fix: instantiate a second valhalla instance to fix transit ingestion

This commit is contained in:
Jan-Henrik 2026-03-05 16:51:19 +01:00
parent e94d660686
commit 0d740bc12d
21 changed files with 654 additions and 369 deletions

View file

@ -2,7 +2,7 @@
import { useState, useEffect } from "react";
import { useParams, useRouter } from "next/navigation";
import { useJobProgress } from "@/hooks/use-job-progress";
import { CityIngestProgress } from "@/components/city-ingest-progress";
interface CityDetail {
slug: string;
@ -24,8 +24,6 @@ export default function CityDetailPage() {
const [jobId, setJobId] = useState<string | null>(null);
const [deleting, setDeleting] = useState(false);
const { stages, overall } = useJobProgress(jobId);
useEffect(() => {
fetch(`/api/admin/cities`)
.then((r) => r.json())
@ -101,37 +99,7 @@ export default function CityDetailPage() {
</div>
{/* Live progress if ingesting */}
{jobId && (
<div className="card mb-6">
<h2 className="text-sm font-semibold mb-4">Ingestion Progress</h2>
<ol className="space-y-3">
{stages.map((s) => (
<li key={s.key} className="flex items-center gap-3 text-sm">
<span
className={`w-4 h-4 rounded-full flex items-center justify-center text-xs ${
s.status === "completed"
? "bg-green-200 text-green-700"
: s.status === "active"
? "bg-brand-100 text-brand-700"
: "bg-gray-100 text-gray-400"
}`}
>
{s.status === "completed" ? "✓" : s.status === "active" ? "…" : "○"}
</span>
<span className={s.status === "active" ? "font-medium" : "text-gray-500"}>
{s.label}
</span>
{s.status === "active" && (
<span className="text-xs text-gray-400">{s.pct}%</span>
)}
</li>
))}
</ol>
{overall === "completed" && (
<p className="text-sm text-green-700 mt-4"> Ingestion complete!</p>
)}
</div>
)}
<CityIngestProgress jobId={jobId} className="card mb-6" />
{/* Actions */}
<div className="flex gap-3">

View file

@ -2,8 +2,7 @@
import { useState, useEffect, useMemo, useRef, useCallback } from "react";
import type { GeofabrikFeature, GeofabrikIndex } from "@transportationer/shared";
import { useJobProgress } from "@/hooks/use-job-progress";
import type { StageStatus, RoutingDetail as RoutingDetailType } from "@/hooks/use-job-progress";
import { CityIngestProgress } from "@/components/city-ingest-progress";
type Step = "browse" | "confirm" | "ingest";
@ -568,173 +567,6 @@ function ConfirmStep({
// ─── Progress step ────────────────────────────────────────────────────────────
function StageIcon({ status }: { status: StageStatus["status"] }) {
if (status === "completed")
return (
<span className="w-5 h-5 flex items-center justify-center rounded-full bg-green-100 text-green-600 text-xs shrink-0">
</span>
);
if (status === "failed")
return (
<span className="w-5 h-5 flex items-center justify-center rounded-full bg-red-100 text-red-600 text-xs shrink-0">
</span>
);
if (status === "active")
return (
<span className="w-5 h-5 flex items-center justify-center shrink-0">
<svg className="animate-spin w-4 h-4 text-brand-600" viewBox="0 0 24 24" fill="none">
<circle cx="12" cy="12" r="10" stroke="currentColor" strokeWidth="2" opacity="0.25" />
<path d="M12 2a10 10 0 0 1 10 10" stroke="currentColor" strokeWidth="2" strokeLinecap="round" />
</svg>
</span>
);
return <span className="w-5 h-5 rounded-full border-2 border-gray-300 shrink-0" />;
}
function StageRow({
stage,
error,
}: {
stage: StageStatus;
error?: string;
}) {
return (
<div className="flex items-start gap-3">
<StageIcon status={stage.status} />
<div className="flex-1 min-w-0">
<p className={`text-sm font-medium ${stage.status === "pending" ? "text-gray-400" : "text-gray-900"}`}>
{stage.label}
</p>
{stage.status === "active" && (
<>
<div className="w-full bg-gray-200 rounded-full h-1 mt-1.5">
<div
className="bg-brand-600 h-1 rounded-full transition-all duration-500"
style={{ width: `${stage.pct}%` }}
/>
</div>
<p className="text-xs text-gray-500 mt-0.5 truncate">{stage.message}</p>
</>
)}
{stage.status === "failed" && error && (
<p className="text-xs text-red-600 mt-0.5">{error}</p>
)}
</div>
</div>
);
}
function RoutingGrid({ routingDetail }: { routingDetail: RoutingDetailType }) {
const MODE_LABELS: Record<string, string> = {
walking: "Walking",
cycling: "Cycling",
driving: "Driving",
transit: "Transit",
};
const entries = Object.entries(routingDetail);
if (entries.length === 0) return null;
return (
<div className="mt-2 space-y-1.5 pl-8">
{entries.map(([mode, { done, total }]) => (
<div key={mode} className="flex items-center gap-2">
<span className="text-xs text-gray-500 w-14 shrink-0">{MODE_LABELS[mode] ?? mode}</span>
<div className="flex-1 bg-gray-200 rounded-full h-1.5">
<div
className="bg-brand-500 h-1.5 rounded-full transition-all duration-500"
style={{ width: total > 0 ? `${(done / total) * 100}%` : done > 0 ? "100%" : "0%" }}
/>
</div>
<span className="text-xs text-gray-400 w-10 text-right shrink-0">
{total > 1 ? `${done}/${total}` : done >= 1 ? "done" : "…"}
</span>
</div>
))}
</div>
);
}
function ProgressStep({ jobId }: { jobId: string | null }) {
const { stages, overall, error, routingDetail } = useJobProgress(jobId);
// Group consecutive parallel stages together for rendering
type StageGroup =
| { kind: "single"; stage: StageStatus }
| { kind: "parallel"; stages: StageStatus[] };
const groups: StageGroup[] = [];
for (const stage of stages) {
if (stage.parallelGroup) {
const last = groups[groups.length - 1];
if (last?.kind === "parallel" && last.stages[0].parallelGroup === stage.parallelGroup) {
last.stages.push(stage);
} else {
groups.push({ kind: "parallel", stages: [stage] });
}
} else {
groups.push({ kind: "single", stage });
}
}
return (
<div className="card max-w-lg">
<h2 className="text-lg font-semibold mb-6">Processing City Data</h2>
<div className="space-y-4">
{groups.map((group, gi) =>
group.kind === "single" ? (
<div key={group.stage.key}>
<StageRow stage={group.stage} error={error} />
{/* Show per-mode routing grid under the compute-accessibility stage */}
{group.stage.key === "Computing scores" &&
group.stage.status === "active" &&
routingDetail && (
<RoutingGrid routingDetail={routingDetail} />
)}
</div>
) : (
<div
key={`group-${gi}`}
className="rounded-lg border border-gray-200 bg-gray-50 p-3 space-y-3"
>
<p className="text-xs font-medium text-gray-400 uppercase tracking-wide">
Running in parallel
</p>
{group.stages.map((s) => (
<StageRow key={s.key} stage={s} error={error} />
))}
</div>
),
)}
</div>
{overall === "completed" && (
<div className="mt-6 p-4 bg-green-50 rounded-lg text-green-800 text-sm">
City ingestion complete!{" "}
<a href="/admin" className="underline font-medium">
Return to dashboard
</a>{" "}
or{" "}
<a href="/" className="underline font-medium">
view on map
</a>
.
</div>
)}
{overall === "failed" && (
<div className="mt-6 p-4 bg-red-50 rounded-lg text-red-800 text-sm">
Ingestion failed: {error}{" "}
<a href="/admin" className="underline">
Return to dashboard
</a>
.
</div>
)}
</div>
);
}
// ─── Main page ────────────────────────────────────────────────────────────────
@ -806,7 +638,7 @@ export default function AddCityPage() {
onConfirm={handleConfirm}
/>
)}
{step === "ingest" && <ProgressStep jobId={jobId} />}
{step === "ingest" && <CityIngestProgress jobId={jobId} />}
</div>
);
}

View file

@ -1,7 +1,7 @@
import { NextRequest, NextResponse } from "next/server";
import { sql } from "@/lib/db";
import { cacheDel } from "@/lib/cache";
import { getValhallaQueue } from "@/lib/queue";
import { getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
export const runtime = "nodejs";
@ -27,15 +27,13 @@ export async function DELETE(
cacheDel(`api:stats:*${slug}*`),
]);
// Remove city from the global Valhalla routing tile set.
// The valhalla-worker will delete the city's clipped PBF and rebuild
// tiles from all remaining cities' PBFs in one pass.
const valhallaQueue = getValhallaQueue();
await valhallaQueue.add(
"build-valhalla",
{ type: "build-valhalla", removeSlugs: [slug] },
{ attempts: 1, removeOnComplete: { age: 86400 } },
);
// Remove city from both Valhalla routing tile sets (road + transit).
const rebuildOpts = { attempts: 1, removeOnComplete: { age: 86400 } } as const;
const rebuildData = { type: "build-valhalla" as const, removeSlugs: [slug] };
await Promise.all([
getValhallaQueue().add("build-valhalla", rebuildData, rebuildOpts),
getValhallaTransitQueue().add("build-valhalla", rebuildData, rebuildOpts),
]);
return NextResponse.json({ deleted: slug });
}

View file

@ -1,6 +1,6 @@
import { NextRequest } from "next/server";
import { Job } from "bullmq";
import { getPipelineQueue, getValhallaQueue } from "@/lib/queue";
import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import type { PipelineJobData, JobProgress, ComputeScoresJobData, RefreshCityJobData } from "@/lib/queue";
import type { SSEEvent, RoutingDetail } from "@transportationer/shared";
import { CATEGORY_IDS } from "@transportationer/shared";
@ -49,6 +49,7 @@ export async function GET(
}
const valhallaQueue = getValhallaQueue();
const valhallaTransitQueue = getValhallaTransitQueue();
const stream = new ReadableStream({
start(ctrl) {
@ -77,11 +78,13 @@ export async function GET(
}
// 1. Fetch active jobs and waiting-children jobs in parallel.
const [pipelineActive, valhallaActive, waitingChildren] = await Promise.all([
const [pipelineActive, valhallaActive, valhallaTransitActive, waitingChildren] = await Promise.all([
queue.getActive(0, 100),
valhallaQueue.getActive(0, 100),
valhallaTransitQueue.getActive(0, 100),
queue.getWaitingChildren(0, 200),
]);
const allValhallaActive = [...valhallaActive, ...valhallaTransitActive];
// 1a. Parallel routing phase: compute-scores is waiting for its routing
// children to finish. Report aggregate progress instead of one job's pct.
@ -158,7 +161,8 @@ export async function GET(
const extractPoisJob = pipelineActive.find(
(j) => j.data.citySlug === citySlug && j.data.type === "extract-pois",
);
const buildValhallaJob = valhallaActive.find(
// Prefer the road build (valhalla queue) for progress; fall back to transit.
const buildValhallaJob = allValhallaActive.find(
(j) => j.data.citySlug === citySlug && j.data.type === "build-valhalla",
);
if (extractPoisJob || buildValhallaJob) {
@ -176,7 +180,7 @@ export async function GET(
// immediately at pct=100 while the other does the real download.
// Prefer the job that has actual byte progress so the UI doesn't
// regress from 100% → 5% when the skip job is seen first.
const allCityActive = [...pipelineActive, ...valhallaActive].filter(
const allCityActive = [...pipelineActive, ...allValhallaActive].filter(
(j) => j.data.citySlug === citySlug && j.data.type !== "refresh-city",
);
const activeJob =
@ -226,11 +230,12 @@ export async function GET(
// 3. Check BullMQ failed queue as a secondary signal (catches failures
// before the worker's DB update propagates, e.g. DB connection issues).
const [pipelineFailed, valhallaFailed] = await Promise.all([
const [pipelineFailed, valhallaFailed, valhallaTransitFailed] = await Promise.all([
queue.getFailed(0, 50),
valhallaQueue.getFailed(0, 50),
valhallaTransitQueue.getFailed(0, 50),
]);
const recentFail = [...pipelineFailed, ...valhallaFailed].find(
const recentFail = [...pipelineFailed, ...valhallaFailed, ...valhallaTransitFailed].find(
(j) =>
j.data.citySlug === citySlug &&
j.data.type !== "refresh-city" &&

View file

@ -1,15 +1,16 @@
import { NextResponse } from "next/server";
import { getPipelineQueue, getValhallaQueue } from "@/lib/queue";
import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import type { JobSummary } from "@transportationer/shared";
export const runtime = "nodejs";
export async function GET() {
const [pQueue, vQueue] = [getPipelineQueue(), getValhallaQueue()];
const [pQueue, vQueue, vtQueue] = [getPipelineQueue(), getValhallaQueue(), getValhallaTransitQueue()];
const [
pWaiting, pWaitingChildren, pActive, pCompleted, pFailed,
vWaiting, vActive, vCompleted, vFailed,
vtWaiting, vtActive, vtCompleted, vtFailed,
] = await Promise.all([
pQueue.getWaiting(0, 20),
pQueue.getWaitingChildren(0, 20),
@ -20,13 +21,17 @@ export async function GET() {
vQueue.getActive(0, 20),
vQueue.getCompleted(0, 20),
vQueue.getFailed(0, 20),
vtQueue.getWaiting(0, 20),
vtQueue.getActive(0, 20),
vtQueue.getCompleted(0, 20),
vtQueue.getFailed(0, 20),
]);
const waitingChildren = [...pWaitingChildren];
const waiting = [...pWaiting, ...vWaiting];
const active = [...pActive, ...vActive];
const completed = [...pCompleted, ...vCompleted];
const failed = [...pFailed, ...vFailed];
const waiting = [...pWaiting, ...vWaiting, ...vtWaiting];
const active = [...pActive, ...vActive, ...vtActive];
const completed = [...pCompleted, ...vCompleted, ...vtCompleted];
const failed = [...pFailed, ...vFailed, ...vtFailed];
const all = [...active, ...waitingChildren, ...waiting, ...completed, ...failed];

View file

@ -1,7 +1,8 @@
import { NextRequest, NextResponse } from "next/server";
import { sql } from "@/lib/db";
import { fetchIsochrone } from "@/lib/valhalla";
import { getValhallaQueue } from "@/lib/queue";
import { getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import { nextTuesdayDeparture } from "@transportationer/shared";
export const runtime = "nodejs";
@ -35,6 +36,7 @@ export async function POST(req: NextRequest) {
: [5, 10, 15];
const mode = typeof travelMode === "string" ? travelMode : "walking";
const departureDate = mode === "transit" ? nextTuesdayDeparture() : null;
// Check PostGIS isochrone cache
const cached = await Promise.resolve(sql<{ result: object }[]>`
@ -42,6 +44,7 @@ export async function POST(req: NextRequest) {
FROM isochrone_cache
WHERE travel_mode = ${mode}
AND contours_min = ${contours}
AND departure_date IS NOT DISTINCT FROM ${departureDate}
AND ST_DWithin(
origin_geom::geography,
ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326)::geography,
@ -59,7 +62,9 @@ export async function POST(req: NextRequest) {
// Refuse to call valhalla_service while tiles are being rebuilt —
// the service is stopped during the build and requests would hang or fail.
const activeValhalla = await getValhallaQueue().getActiveCount();
// Check the queue that owns the requested mode's instance.
const rebuildQueue = mode === "transit" ? getValhallaTransitQueue() : getValhallaQueue();
const activeValhalla = await rebuildQueue.getActiveCount();
if (activeValhalla > 0) {
return NextResponse.json(
{ error: "Routing engine is rebuilding, please try again shortly.", code: "VALHALLA_REBUILDING" },
@ -93,11 +98,12 @@ export async function POST(req: NextRequest) {
// parses it as JSON itself. Without the cast, postgres.js infers the JSONB
// column type and re-encodes the string as a JSONB string literal.
await Promise.resolve(sql`
INSERT INTO isochrone_cache (origin_geom, travel_mode, contours_min, result)
INSERT INTO isochrone_cache (origin_geom, travel_mode, contours_min, departure_date, result)
VALUES (
ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326),
${mode},
${contours},
${departureDate},
${JSON.stringify(geojson)}::jsonb
)
`);

View file

@ -0,0 +1,172 @@
"use client";
import { useJobProgress } from "@/hooks/use-job-progress";
import type { StageStatus, RoutingDetail as RoutingDetailType } from "@/hooks/use-job-progress";
function StageIcon({ status }: { status: StageStatus["status"] }) {
if (status === "completed")
return (
<span className="w-5 h-5 flex items-center justify-center rounded-full bg-green-100 text-green-600 text-xs shrink-0">
</span>
);
if (status === "failed")
return (
<span className="w-5 h-5 flex items-center justify-center rounded-full bg-red-100 text-red-600 text-xs shrink-0">
</span>
);
if (status === "active")
return (
<span className="w-5 h-5 flex items-center justify-center shrink-0">
<svg className="animate-spin w-4 h-4 text-brand-600" viewBox="0 0 24 24" fill="none">
<circle cx="12" cy="12" r="10" stroke="currentColor" strokeWidth="2" opacity="0.25" />
<path d="M12 2a10 10 0 0 1 10 10" stroke="currentColor" strokeWidth="2" strokeLinecap="round" />
</svg>
</span>
);
return <span className="w-5 h-5 rounded-full border-2 border-gray-300 shrink-0" />;
}
function StageRow({ stage, error }: { stage: StageStatus; error?: string }) {
return (
<div className="flex items-start gap-3">
<StageIcon status={stage.status} />
<div className="flex-1 min-w-0">
<p className={`text-sm font-medium ${stage.status === "pending" ? "text-gray-400" : "text-gray-900"}`}>
{stage.label}
</p>
{stage.status === "active" && (
<>
<div className="w-full bg-gray-200 rounded-full h-1 mt-1.5">
<div
className="bg-brand-600 h-1 rounded-full transition-all duration-500"
style={{ width: `${stage.pct}%` }}
/>
</div>
<p className="text-xs text-gray-500 mt-0.5 truncate">{stage.message}</p>
</>
)}
{stage.status === "failed" && error && (
<p className="text-xs text-red-600 mt-0.5">{error}</p>
)}
</div>
</div>
);
}
function RoutingGrid({ routingDetail }: { routingDetail: RoutingDetailType }) {
const MODE_LABELS: Record<string, string> = {
walking: "Walking",
cycling: "Cycling",
driving: "Driving",
transit: "Transit",
};
const entries = Object.entries(routingDetail);
if (entries.length === 0) return null;
return (
<div className="mt-2 space-y-1.5 pl-8">
{entries.map(([mode, { done, total }]) => (
<div key={mode} className="flex items-center gap-2">
<span className="text-xs text-gray-500 w-14 shrink-0">{MODE_LABELS[mode] ?? mode}</span>
<div className="flex-1 bg-gray-200 rounded-full h-1.5">
<div
className="bg-brand-500 h-1.5 rounded-full transition-all duration-500"
style={{ width: total > 0 ? `${(done / total) * 100}%` : done > 0 ? "100%" : "0%" }}
/>
</div>
<span className="text-xs text-gray-400 w-10 text-right shrink-0">
{total > 1 ? `${done}/${total}` : done >= 1 ? "done" : "…"}
</span>
</div>
))}
</div>
);
}
export function CityIngestProgress({
jobId,
className = "card max-w-lg",
}: {
jobId: string | null;
className?: string;
}) {
const { stages, overall, error, routingDetail } = useJobProgress(jobId);
if (!jobId) return null;
type StageGroup =
| { kind: "single"; stage: StageStatus }
| { kind: "parallel"; stages: StageStatus[] };
const groups: StageGroup[] = [];
for (const stage of stages) {
if (stage.parallelGroup) {
const last = groups[groups.length - 1];
if (last?.kind === "parallel" && last.stages[0].parallelGroup === stage.parallelGroup) {
last.stages.push(stage);
} else {
groups.push({ kind: "parallel", stages: [stage] });
}
} else {
groups.push({ kind: "single", stage });
}
}
return (
<div className={className}>
<h2 className="text-lg font-semibold mb-6">Processing City Data</h2>
<div className="space-y-4">
{groups.map((group, gi) =>
group.kind === "single" ? (
<div key={group.stage.key}>
<StageRow stage={group.stage} error={error} />
{group.stage.key === "Computing scores" &&
group.stage.status === "active" &&
routingDetail && (
<RoutingGrid routingDetail={routingDetail} />
)}
</div>
) : (
<div
key={`group-${gi}`}
className="rounded-lg border border-gray-200 bg-gray-50 p-3 space-y-3"
>
<p className="text-xs font-medium text-gray-400 uppercase tracking-wide">
Running in parallel
</p>
{group.stages.map((s) => (
<StageRow key={s.key} stage={s} error={error} />
))}
</div>
),
)}
</div>
{overall === "completed" && (
<div className="mt-6 p-4 bg-green-50 rounded-lg text-green-800 text-sm">
City ingestion complete!{" "}
<a href="/admin" className="underline font-medium">
Return to dashboard
</a>{" "}
or{" "}
<a href="/" className="underline font-medium">
view on map
</a>
.
</div>
)}
{overall === "failed" && (
<div className="mt-6 p-4 bg-red-50 rounded-lg text-red-800 text-sm">
Ingestion failed: {error}{" "}
<a href="/admin" className="underline">
Return to dashboard
</a>
.
</div>
)}
</div>
);
}

View file

@ -2,7 +2,7 @@ import { getRedis } from "./redis";
/** TTL in seconds for each cache category */
const TTL = {
API_CITIES: 3600, // 1 hour
API_CITIES: 30, // 30 seconds — city status changes during ingest
API_POIS: 300, // 5 minutes
API_GRID: 600, // 10 minutes
API_STATS: 120, // 2 minutes

View file

@ -26,6 +26,9 @@ declare global {
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __valhallaQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __valhallaTransitQueue: Queue<any> | undefined;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -59,6 +62,22 @@ export function getValhallaQueue(): Queue<any> {
return globalThis.__valhallaQueue;
}
/** Queue for build-valhalla transit jobs, processed by the valhalla-transit container. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function getValhallaTransitQueue(): Queue<any> {
if (!globalThis.__valhallaTransitQueue) {
globalThis.__valhallaTransitQueue = new Queue("valhalla-transit", {
connection: createBullMQConnection(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
},
});
}
return globalThis.__valhallaTransitQueue;
}
export function createQueueEvents(): QueueEvents {
return new QueueEvents("pipeline", {
connection: createBullMQConnection(),

View file

@ -1,4 +1,7 @@
import { nextTuesdayDeparture } from "@transportationer/shared";
const VALHALLA_BASE = process.env.VALHALLA_URL ?? "http://valhalla:8002";
const VALHALLA_TRANSIT_BASE = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_BASE;
export type ValhallaCosting = "pedestrian" | "bicycle" | "auto";
@ -28,10 +31,11 @@ export async function fetchIsochrone(opts: IsochroneOpts): Promise<object> {
};
if (isTransit) {
body.costing_options = { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 } };
body.date_time = { type: 0 }; // current time
body.date_time = { type: 1, value: nextTuesdayDeparture() };
}
const res = await fetch(`${VALHALLA_BASE}/isochrone`, {
const base = isTransit ? VALHALLA_TRANSIT_BASE : VALHALLA_BASE;
const res = await fetch(`${base}/isochrone`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),

View file

@ -31,10 +31,10 @@ services:
timeout: 5s
retries: 5
# ─── Valhalla routing engine + BullMQ worker ──────────────────────────────
# Built from the gis-ops Valhalla image + Node.js. This container does two
# things: processes build-valhalla BullMQ jobs (running valhalla_build_tiles
# natively) and serves the resulting tiles via valhalla_service on port 8002.
# ─── Valhalla road worker (port 8002) ─────────────────────────────────────
# Builds road-only tiles (no transit data) → cycling/walking/driving routing.
# Without GTFS in its volume, valhalla_build_tiles produces clean road tiles
# with no ghost transit edges, so bicycle routing works correctly.
valhalla:
build:
context: .
@ -42,19 +42,45 @@ services:
restart: unless-stopped
volumes:
- osm_data:/data/osm:ro # PBF files downloaded by the main worker
- valhalla_tiles:/data/valhalla # Valhalla config and routing tiles
- valhalla_tiles:/data/valhalla # Road-only config and tiles
environment:
REDIS_HOST: valkey
REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_QUEUE_NAME: valhalla
OSM_DATA_DIR: /data/osm
VALHALLA_CONFIG: /data/valhalla/valhalla.json
VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles
NODE_ENV: production
ports:
- "127.0.0.1:8002:8002" # Valhalla HTTP API (road)
depends_on:
valkey:
condition: service_healthy
# ─── Valhalla transit worker (port 8002 internal) ─────────────────────────
# Builds tiles with GTFS transit data → multimodal routing.
# Separate volume from the road worker so transit ghost edges never affect
# the road instance.
valhalla-transit:
build:
context: .
target: valhalla-worker
restart: unless-stopped
volumes:
- osm_data:/data/osm:ro # PBF files downloaded by the main worker
- valhalla_tiles_transit:/data/valhalla # Transit config, tiles and GTFS data
environment:
REDIS_HOST: valkey
REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_QUEUE_NAME: valhalla-transit
OSM_DATA_DIR: /data/osm
VALHALLA_CONFIG: /data/valhalla/valhalla.json
VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles
NODE_ENV: production
# Optional: connect-info.net token for NDS-specific GTFS feed
CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-}
ports:
- "127.0.0.1:8002:8002" # Valhalla HTTP API
depends_on:
valkey:
condition: service_healthy
@ -83,6 +109,7 @@ services:
REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_URL: http://valhalla:8002
VALHALLA_TRANSIT_URL: http://valhalla-transit:8002
ADMIN_PASSWORD_HASH: ${ADMIN_PASSWORD_HASH}
ADMIN_JWT_SECRET: ${ADMIN_JWT_SECRET}
NODE_ENV: production
@ -106,6 +133,7 @@ services:
OSM_DATA_DIR: /data/osm
LUA_SCRIPT: /app/infra/osm2pgsql.lua
VALHALLA_URL: http://valhalla:8002
VALHALLA_TRANSIT_URL: http://valhalla-transit:8002
NODE_ENV: production
# Optional: enables NDS-specific GTFS source for cities in Niedersachsen
CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-}
@ -120,6 +148,7 @@ services:
volumes:
postgres_data:
valkey_data:
osm_data: # Shared: worker writes, valhalla reads
valhalla_tiles:
osm_data: # Shared: worker writes, valhalla containers read
valhalla_tiles: # Road-only tiles (no transit) — cycling works correctly here
valhalla_tiles_transit: # Transit tiles (with GTFS) — multimodal routing
pmtiles_data:

View file

@ -121,6 +121,7 @@ CREATE TABLE IF NOT EXISTS isochrone_cache (
origin_geom geometry(Point, 4326) NOT NULL,
travel_mode TEXT NOT NULL,
contours_min INTEGER[] NOT NULL,
departure_date TEXT, -- NULL for non-transit; 'YYYY-MM-DDTHH:mm' for transit
result JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@ -129,6 +130,9 @@ CREATE INDEX IF NOT EXISTS idx_isochrone_origin
ON isochrone_cache USING GIST (origin_geom);
CREATE INDEX IF NOT EXISTS idx_isochrone_created
ON isochrone_cache (created_at);
CREATE INDEX IF NOT EXISTS idx_isochrone_mode_departure
ON isochrone_cache (travel_mode, departure_date)
WHERE departure_date IS NOT NULL;
-- Auto-expire isochrone cache entries older than 30 days
-- (handled by periodic cleanup or TTL logic in app)

View file

@ -2,3 +2,4 @@ export * from "./osm-tags.js";
export * from "./types.js";
export * from "./queue.js";
export * from "./profiles.js";
export * from "./utils.js";

15
shared/src/utils.ts Normal file
View file

@ -0,0 +1,15 @@
/**
* Returns the next Tuesday at 08:00 as a Valhalla-compatible datetime string
* (YYYY-MM-DDTHH:mm). Used for reproducible transit scoring across worker and
* UI always lands within a freshly-downloaded GTFS feed's validity window.
* Tuesday is a representative mid-week workday; "next" ensures we never pick
* today (which may be a partial day or have no upcoming service).
*/
export function nextTuesdayDeparture(): string {
const now = new Date();
const daysUntilTue = (2 - now.getDay() + 7) % 7 || 7; // 17, always future
const d = new Date(now);
d.setDate(d.getDate() + daysUntilTue);
const pad = (n: number) => String(n).padStart(2, "0");
return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())}T08:00`;
}

View file

@ -35,19 +35,25 @@ const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`;
* the Valhalla default (/data/valhalla/transit) and to persist transit tiles
* between builds.
*
* IMPORTANT build order (per Valhalla docs):
* 1. valhalla_build_tiles road graph; also downloads timezone.sqlite
* 2. valhalla_ingest_transit GTFS transit PBF tiles in transit_dir
* 3. valhalla_convert_transit reads transit PBFs + road tiles transit graph
* IMPORTANT build order:
* 1. valhalla_ingest_transit GTFS transit PBF staging files in transit_dir
* 2. valhalla_convert_transit PBF transit graph tiles (.gph) in transit_dir
* 3. valhalla_build_tiles road graph + reads transit tiles from transit_dir
* creates road tiles WITH road-to-transit edges
* copies transit tiles into tile_dir/3/
*
* valhalla_convert_transit REQUIRES road tiles to exist (it uses GraphReader
* to look up road node IDs for stop connections). Running it before
* valhalla_build_tiles causes it to crash looking for tiles that don't exist.
* valhalla_build_tiles MUST run AFTER valhalla_convert_transit so it can find
* the transit .gph tiles in transit_dir and embed roadtransit connections in
* the road tiles. Running build_tiles before convert results in road tiles with
* no transit connections (transit routing silently falls back to walking).
*
* valhalla_convert_transit does NOT require road tiles it only reads the
* transit PBF staging files and writes transit graph tiles. It can run on an
* empty tile_dir without crashing.
*
* TRANSIT_CACHE_MARKER tracks whether ingest PBFs are current relative to the
* GTFS source. valhalla_convert_transit is always re-run after a road build
* because road node IDs change on each rebuild and old transit-to-road
* connections would otherwise be stale.
* GTFS source. valhalla_convert_transit always runs after ingest (or when the
* ingest cache is fresh) so transit_dir has up-to-date .gph before build_tiles.
*/
const TRANSIT_CACHE_DIR = `${VALHALLA_DATA_DIR}/transit_graph`;
/** Written after a successful valhalla_ingest_transit; compared against GTFS source mtime. */
@ -55,6 +61,19 @@ const TRANSIT_CACHE_MARKER = `${TRANSIT_CACHE_DIR}/.ready`;
/** Written by download-gtfs-de after each successful GTFS extraction. */
const GTFS_SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`;
/**
* Buffer added to the city bbox when clipping the road PBF with osmium.
* Transit stops within the city bbox may be in low-road-density areas (parks,
* new developments, demand-responsive zones) where the nearest OSM road is
* outside the exact bbox clip. Without coverage, valhalla_build_tiles crashes
* with a memory corruption error ("double free" / "bad_array_new_length").
* stop_times is already filtered to bbox-local stops so the buffer only adds
* road coverage it does NOT let Germany-wide transit stops into the graph.
* 0.2° 18 km at 53 °N covers roads for all plausibly in-city stops.
*/
const ROAD_BBOX_BUFFER = 0.2;
/**
* Manifest file: maps citySlug absolute path of its routing PBF.
* Persists in the valhalla_tiles Docker volume across restarts.
@ -201,6 +220,12 @@ function generateConfig(): void {
*/
function isTransitIngestFresh(): boolean {
if (!existsSync(TRANSIT_CACHE_MARKER) || !existsSync(GTFS_SOURCE_MARKER)) return false;
// Verify at least one transit PBF tile exists — the marker can survive a
// cache-dir wipe (crash recovery) and we'd otherwise skip ingest with an
// empty transit dir, causing valhalla_convert_transit to fail silently.
// Valhalla 3.x ingest writes level-3 tiles; check for the directory.
const level3Dir = `${TRANSIT_CACHE_DIR}/3`;
if (!existsSync(level3Dir)) return false;
return statSync(TRANSIT_CACHE_MARKER).mtimeMs >= statSync(GTFS_SOURCE_MARKER).mtimeMs;
}
@ -236,21 +261,32 @@ export async function handleBuildValhalla(
stage: "Building routing graph",
pct: 5,
message: bbox
? `Clipping PBF to bbox [${bbox.map((v) => v.toFixed(3)).join(", ")}]`
? `Clipping PBF to city bbox (may expand for transit coverage)`
: `Registering full PBF for ${citySlug}`,
} satisfies JobProgress);
let routingPbf: string;
if (bbox) {
const [minLng, minLat, maxLng, maxLat] = bbox;
const clippedPbf = `${VALHALLA_DATA_DIR}/${citySlug}-routing.osm.pbf`;
if (!existsSync(pbfPath)) throw new Error(`PBF file not found: ${pbfPath}`);
const extractBbox: [number, number, number, number] = [
bbox[0] - ROAD_BBOX_BUFFER,
bbox[1] - ROAD_BBOX_BUFFER,
bbox[2] + ROAD_BBOX_BUFFER,
bbox[3] + ROAD_BBOX_BUFFER,
];
console.log(
`[build-valhalla] Road extract bbox: city [${bbox.map((v) => v.toFixed(3)).join(", ")}]` +
` + ${ROAD_BBOX_BUFFER}° buffer → [${extractBbox.map((v) => v.toFixed(3)).join(", ")}]`,
);
await runProcess("osmium", [
"extract",
`--bbox=${minLng},${minLat},${maxLng},${maxLat}`,
`--bbox=${extractBbox[0]},${extractBbox[1]},${extractBbox[2]},${extractBbox[3]}`,
pbfPath,
"-o", clippedPbf,
"--overwrite",
@ -289,73 +325,47 @@ export async function handleBuildValhalla(
return;
}
// ── Step 3: build road tiles ──────────────────────────────────────────────
// ── Step 3: transit ingest + convert ─────────────────────────────────────
//
// valhalla_build_tiles MUST run before transit operations:
// • valhalla_convert_transit needs road tiles (GraphReader) to look up road
// node IDs for each transit stop — running it before this step causes the
// "Couldn't load .../0/000/000.pbf" crash.
//
// valhalla_build_tiles ignores any transit tiles in transit_dir (it filters
// them out of the hierarchy build), so there is no "transit connection" pass
// to worry about — transit connectivity is created by convert_transit.
await job.updateProgress({
stage: "Building routing graph",
pct: 10,
message: `Building road routing tiles for: ${allSlugs.join(", ")}`,
} satisfies JobProgress);
await runProcess("valhalla_build_tiles", ["-c", VALHALLA_CONFIG, ...allPbfs]);
console.log("[build-valhalla] Road tiles built");
// ── Step 4: transit tile preparation ─────────────────────────────────────
//
// Transit runs after road tiles exist. Three sub-steps:
//
// 4a. timezone db — valhalla_build_timezones (one-time, skip if exists).
// valhalla_ingest_transit needs it to assign timezone info to stops.
// Without it, ingest skips writing the root index tile (0/000/000.pbf)
// and valhalla_convert_transit crashes trying to load it.
//
// 4b. valhalla_ingest_transit — GTFS → transit PBF tiles in transit_dir.
// Only re-run when GTFS data changed (expensive: can take hours).
//
// 4c. valhalla_convert_transit — transit PBFs + road tiles → transit graph.
// ALWAYS re-run after a road build because road node IDs change on
// every rebuild; old transit-to-road connections would be stale.
// Build order: ingest → convert → road tiles.
// valhalla_build_tiles MUST run after valhalla_convert_transit so it finds
// transit .gph tiles in transit_dir and embeds road↔transit connection edges
// in the road tiles. Without convert running first, road tiles have no transit
// connections and multimodal routing silently falls back to walking.
// valhalla_convert_transit does NOT need road tiles — it only reads the GTFS
// staging PBFs and writes the transit graph tiles.
const gtfsReady =
existsSync(GTFS_FEED_DIR) &&
readdirSync(GTFS_FEED_DIR).some((f) => f.endsWith(".txt"));
let ingestPbfsAvailable = false;
if (gtfsReady) {
// 4a: timezone database — one-time setup, persists in VALHALLA_DATA_DIR.
// valhalla_ingest_transit needs this to assign timezone info to stops;
// 3a: timezone database — one-time setup, persists in VALHALLA_DATA_DIR.
// valhalla_ingest_transit needs it to assign timezone info to stops;
// without it the root index tile (0/000/000.pbf) is not written and
// valhalla_convert_transit crashes trying to load it.
if (!existsSync(TIMEZONE_SQLITE)) {
await job.updateProgress({
stage: "Building routing graph",
pct: 73,
pct: 10,
message: "Building timezone database (one-time setup)…",
} satisfies JobProgress);
try {
await buildTimezoneDb();
} catch (err) {
console.warn("[build-valhalla] valhalla_build_timezones failed — skipping transit:", err);
// Can't safely run transit ingest without timezone db.
}
}
// 4b: ingest (only when GTFS changed, and only when timezone db is ready)
let ingestPbfsAvailable = isTransitIngestFresh();
// 3b: ingest (only when GTFS changed, and only when timezone db is ready)
ingestPbfsAvailable = isTransitIngestFresh();
if (!ingestPbfsAvailable && existsSync(TIMEZONE_SQLITE)) {
await job.updateProgress({
stage: "Building routing graph",
pct: 75,
pct: 12,
message: "Ingesting GTFS transit feeds…",
} satisfies JobProgress);
try {
@ -374,17 +384,23 @@ export async function handleBuildValhalla(
mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
}
} else if (ingestPbfsAvailable) {
console.log("[build-valhalla] Transit ingest cache is fresh — skipping ingest");
await job.updateProgress({
stage: "Building routing graph",
pct: 12,
message: "Transit ingest cache is fresh — skipping re-ingest",
} satisfies JobProgress);
} else {
console.log("[build-valhalla] timezone.sqlite unavailable — skipping transit ingest");
}
// 4c: convert (always, to reconnect transit to the new road graph)
// 3c: convert transit PBF staging files → transit graph tiles (.gph)
// Runs even when ingest was skipped (cache fresh) so transit_dir always
// has up-to-date .gph tiles before valhalla_build_tiles reads them.
if (ingestPbfsAvailable) {
await job.updateProgress({
stage: "Building routing graph",
pct: 85,
message: "Connecting transit tiles to road graph…",
pct: 15,
message: "Converting transit staging tiles to graph tiles…",
} satisfies JobProgress);
try {
await runProcess("valhalla_convert_transit", ["-c", VALHALLA_CONFIG]);
@ -394,9 +410,27 @@ export async function handleBuildValhalla(
}
}
} else {
console.log("[build-valhalla] No GTFS feed found — skipping transit tile prep");
console.log("[build-valhalla] No GTFS feed found — skipping transit ingest/convert");
}
// ── Step 4: build road tiles ──────────────────────────────────────────────
//
// Runs AFTER valhalla_convert_transit so transit .gph tiles are present in
// transit_dir. valhalla_build_tiles reads them, embeds road↔transit connection
// edges in the road tiles, and copies transit tiles into tile_dir/3/.
// Without transit tiles present at this step, road tiles have no transit
// connections and multimodal routing silently falls back to walking.
await job.updateProgress({
stage: "Building routing graph",
pct: 20,
message: `Building road routing tiles for: ${allSlugs.join(", ")}`,
} satisfies JobProgress);
await runProcess("valhalla_build_tiles", ["-c", VALHALLA_CONFIG, ...allPbfs]);
console.log("[build-valhalla] Road tiles built");
// ── Step 5: restart Valhalla service ─────────────────────────────────────
await job.updateProgress({

View file

@ -124,6 +124,9 @@ export async function handleComputeScores(
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
parent: { id: job.id!, queue: queue.qualifiedName },
// Transit is optional — a failure should not cascade to the parent.
// The city will be marked ready with walking/cycling scores only.
ignoreDependencyOnFailure: true,
},
);
}
@ -139,6 +142,8 @@ export async function handleComputeScores(
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
parent: { id: job.id!, queue: queue.qualifiedName },
// Boris NI data is optional — failure should not cancel the pipeline.
ignoreDependencyOnFailure: true,
},
);
}

View file

@ -20,9 +20,9 @@
*/
import type { Job } from "bullmq";
import { getSql } from "../db.js";
import { fetchTransitIsochrone } from "../valhalla.js";
import { fetchTransitIsochrone, parseTransitContours, TRANSIT_CONTOUR_MINUTES } from "../valhalla.js";
import type { JobProgress } from "@transportationer/shared";
import { CATEGORY_IDS } from "@transportationer/shared";
import { CATEGORY_IDS, nextTuesdayDeparture } from "@transportationer/shared";
export type ComputeTransitData = {
type: "compute-transit";
@ -46,9 +46,66 @@ async function asyncPool<T>(
await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker));
}
/** Check isochrone cache then call Valhalla, writing the result back to cache. */
async function getTransitIsochrone(
sql: ReturnType<typeof getSql>,
gp: { lat: number; lng: number },
departureDate: string,
) {
type CacheRow = { result: object };
const cached = await Promise.resolve(sql<CacheRow[]>`
SELECT result FROM isochrone_cache
WHERE travel_mode = 'transit'
AND departure_date = ${departureDate}
AND contours_min = ${TRANSIT_CONTOUR_MINUTES as unknown as number[]}
AND ST_DWithin(
origin_geom::geography,
ST_SetSRID(ST_MakePoint(${gp.lng}, ${gp.lat}), 4326)::geography,
50
)
LIMIT 1
`);
if (cached.length > 0) {
return parseTransitContours(cached[0].result);
}
const contours = await fetchTransitIsochrone({ lat: gp.lat, lng: gp.lng }, departureDate);
if (contours) {
const geojson = {
type: "FeatureCollection",
features: contours.map((c) => ({
type: "Feature",
properties: { contour: c.minutes },
geometry: c.geojson,
})),
};
try {
await Promise.resolve(sql`
INSERT INTO isochrone_cache
(origin_geom, travel_mode, contours_min, departure_date, result)
VALUES (
ST_SetSRID(ST_MakePoint(${gp.lng}, ${gp.lat}), 4326),
'transit',
${TRANSIT_CONTOUR_MINUTES as unknown as number[]},
${departureDate},
${JSON.stringify(geojson)}::jsonb
)
ON CONFLICT DO NOTHING
`);
} catch {
// Cache write failure is non-fatal
}
}
return contours;
}
export async function handleComputeTransit(job: Job<ComputeTransitData>): Promise<void> {
const { citySlug } = job.data;
const sql = getSql();
const departureDate = nextTuesdayDeparture();
const gridPoints = await Promise.resolve(sql<{ id: string; lat: number; lng: number }[]>`
SELECT id::text AS id, ST_Y(geom) AS lat, ST_X(geom) AS lng
@ -78,7 +135,7 @@ export async function handleComputeTransit(job: Job<ComputeTransitData>): Promis
let withTransit = 0;
await asyncPool(BATCH_CONCURRENCY, gridPoints, async (gp) => {
const contours = await fetchTransitIsochrone({ lat: gp.lat, lng: gp.lng });
const contours = await getTransitIsochrone(sql, gp, departureDate);
processed++;
if (!contours || contours.length === 0) {

View file

@ -51,14 +51,22 @@ export type DownloadGtfsDeData = {
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";
const GTFS_ZIP_PATH = `${GTFS_DATA_DIR}/feed.zip`;
const GTFS_FEED_DIR = `${GTFS_DATA_DIR}/feed`;
/** Records which source/bboxes last populated GTFS_FEED_DIR. JSON format. */
/** Records which source/bboxes/algorithm last populated GTFS_FEED_DIR. JSON format. */
const SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`;
/**
* Bump this when the filtering algorithm changes in a way that produces
* different output from the same source + bboxes. This forces a re-filter
* on the existing extracted data without re-downloading.
*/
const FILTER_VERSION = 2;
// ─── Source marker helpers ────────────────────────────────────────────────────
interface SourceMarker {
source: string;
bboxes?: [number, number, number, number][];
filterVersion?: number;
}
function readSourceMarker(): SourceMarker | null {
@ -73,7 +81,7 @@ function readSourceMarker(): SourceMarker | null {
}
function writeSourceMarker(source: string, bboxes?: [number, number, number, number][]): void {
writeFileSync(SOURCE_MARKER, JSON.stringify({ source, bboxes }));
writeFileSync(SOURCE_MARKER, JSON.stringify({ source, bboxes, filterVersion: FILTER_VERSION }));
}
/** True when `outer` fully contains `inner`. */
@ -205,7 +213,11 @@ async function filterGtfsByBboxes(
renameSync(tmpPath, filePath);
}
// ── Step 1: filter stops.txt by bbox → validStopIds ──────────────────────
// ── Step 1: collect bbox stop IDs (read-only — stops.txt not written yet) ──
//
// Build the set of stops within the bbox — used to seed validTripIds (step 2a)
// and to filter stop_times to local stops only (step 2b). stops.txt itself is
// filtered in step 3 to only bbox stops that appear in the final stop_times.
const stopsPath = path.join(feedDir, "stops.txt");
if (!existsSync(stopsPath)) {
@ -213,21 +225,35 @@ async function filterGtfsByBboxes(
return;
}
const validStopIds = new Set<string>();
filterSmallCsv(
stopsPath,
(idx, fields) => {
const lat = parseFloat(fields[idx.get("stop_lat") ?? -1] ?? "NaN");
const lon = parseFloat(fields[idx.get("stop_lon") ?? -1] ?? "NaN");
return inAnyBbox(lat, lon);
},
(idx, fields) => {
validStopIds.add(fields[idx.get("stop_id") ?? -1] ?? "");
},
);
console.log(`[download-gtfs-de] Bbox filter: ${validStopIds.size} stops in area`);
const bboxStopIds = new Set<string>();
// Also track the bbox of seeding stops — used later to expand the road tile
// extraction in build-valhalla to cover these stops without expanding to the
// full retained-stops area (which includes Germany-wide long-distance trip stops).
let seedMinLng = Infinity, seedMinLat = Infinity, seedMaxLng = -Infinity, seedMaxLat = -Infinity;
{
const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
if (lines.length >= 2) {
const idx = colIndex(lines[0]);
const stopIdCol = idx.get("stop_id") ?? -1;
const latCol = idx.get("stop_lat") ?? -1;
const lonCol = idx.get("stop_lon") ?? -1;
for (let i = 1; i < lines.length; i++) {
const fields = splitCsv(lines[i]);
const lat = parseFloat(fields[latCol] ?? "NaN");
const lon = parseFloat(fields[lonCol] ?? "NaN");
if (inAnyBbox(lat, lon)) {
bboxStopIds.add(fields[stopIdCol] ?? "");
if (isFinite(lat) && isFinite(lon)) {
seedMinLat = Math.min(seedMinLat, lat); seedMaxLat = Math.max(seedMaxLat, lat);
seedMinLng = Math.min(seedMinLng, lon); seedMaxLng = Math.max(seedMaxLng, lon);
}
}
}
}
}
console.log(`[download-gtfs-de] Bbox filter: ${bboxStopIds.size} stops seed the area`);
if (validStopIds.size === 0) {
if (bboxStopIds.size === 0) {
console.warn(
"[download-gtfs-de] No stops found in any bbox — GTFS filter skipped " +
"(check bbox coverage and feed area)",
@ -235,7 +261,7 @@ async function filterGtfsByBboxes(
return;
}
// ── Step 2 (pass 1): collect trip_ids that serve the area ─────────────────
// ── Step 2a: collect trip_ids that serve the area (pass 1) ────────────────
const stopTimesPath = path.join(feedDir, "stop_times.txt");
if (!existsSync(stopTimesPath)) {
@ -244,6 +270,9 @@ async function filterGtfsByBboxes(
}
const validTripIds = new Set<string>();
// Count how many bbox-local stops each trip has — trips with only 1 bbox
// stop are useless for routing (no O→D pair) and are pruned before step 2b.
const tripBboxStopCount = new Map<string, number>();
{
let stopIdCol = -1;
let tripIdCol = -1;
@ -258,24 +287,84 @@ async function filterGtfsByBboxes(
tripIdCol = idx.get("trip_id") ?? -1;
continue;
}
// stop_id and trip_id never contain commas/quotes — fast split is safe
const fields = line.split(",");
if (stopIdCol >= 0 && validStopIds.has(fields[stopIdCol] ?? "")) {
validTripIds.add(fields[tripIdCol] ?? "");
const tripId = fields[tripIdCol] ?? "";
const stopId = fields[stopIdCol] ?? "";
if (stopIdCol >= 0 && bboxStopIds.has(stopId)) {
validTripIds.add(tripId);
tripBboxStopCount.set(tripId, (tripBboxStopCount.get(tripId) ?? 0) + 1);
}
}
}
console.log(`[download-gtfs-de] Bbox filter: ${validTripIds.size} trips serve the area`);
// Remove trips with only one bbox stop — they can't provide an O→D pair
for (const tripId of validTripIds) {
if ((tripBboxStopCount.get(tripId) ?? 0) < 2) validTripIds.delete(tripId);
}
console.log(`[download-gtfs-de] Bbox filter: ${validTripIds.size} trips with ≥2 bbox stops serve the area`);
// ── Step 2 (pass 2): write filtered stop_times.txt ────────────────────────
// ── Step 2b: write filtered stop_times, keeping only bbox stops (pass 2) ───
//
// We keep a stop_times entry only when BOTH:
// - its trip has ≥2 bbox stops (trip_id ∈ validTripIds), AND
// - the stop itself is within the city bbox (stop_id ∈ bboxStopIds).
//
// Out-of-bbox stops on long-distance routes (e.g. ICE Hamburg↔Bremen passing
// through Oldenburg) are stripped from stop_times. Trips with only one bbox
// stop are removed entirely (no O→D pair, useless for routing). This limits
// the transit graph to local stops only, ensuring valhalla_build_tiles can
// create road connections for all included stops without ghost edge references
// that cause routing errors for other modes (bicycle, driving).
await filterLargeCsv(
stopTimesPath,
(tripIdCol, line) => validTripIds.has(line.split(",")[tripIdCol] ?? ""),
(idx) => idx.get("trip_id") ?? -1,
const allTripStopIds = new Set<string>();
{
const tmpPath = stopTimesPath + ".tmp";
const writer = createWriteStream(tmpPath);
let isFirst = true;
let tripIdCol = -1;
let stopIdCol = -1;
const rl = createInterface({ input: createReadStream(stopTimesPath), crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
if (isFirst) {
isFirst = false;
const idx = colIndex(line);
tripIdCol = idx.get("trip_id") ?? -1;
stopIdCol = idx.get("stop_id") ?? -1;
writer.write(line + "\n");
continue;
}
const fields = line.split(",");
if (
validTripIds.has(fields[tripIdCol] ?? "") &&
bboxStopIds.has(fields[stopIdCol] ?? "")
) {
allTripStopIds.add(fields[stopIdCol] ?? "");
writer.write(line + "\n");
}
}
await new Promise<void>((resolve, reject) =>
writer.end((err?: unknown) => (err ? reject(err) : resolve())),
);
renameSync(tmpPath, stopTimesPath);
}
// ── Step 3: filter stops.txt to bbox stops used by kept trips ──────────────
filterSmallCsv(
stopsPath,
(idx, fields) => allTripStopIds.has(fields[idx.get("stop_id") ?? -1] ?? ""),
);
// ── Step 3: filter trips.txt ───────────────────────────────────────────────
if (isFinite(seedMinLat)) {
const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat];
writeFileSync(path.join(feedDir, ".stops_bbox"), JSON.stringify(stopsBbox));
console.log(
`[download-gtfs-de] Transit stops bbox (seeding area): [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`,
);
}
console.log(`[download-gtfs-de] Bbox filter: ${allTripStopIds.size} bbox stops kept across ${validTripIds.size} trips`);
// ── Step 4: filter trips.txt ───────────────────────────────────────────────
const validRouteIds = new Set<string>();
const validServiceIds = new Set<string>();
@ -292,7 +381,7 @@ async function filterGtfsByBboxes(
},
);
// ── Step 4: filter remaining files ────────────────────────────────────────
// ── Step 5: filter remaining files ────────────────────────────────────────
filterSmallCsv(
path.join(feedDir, "routes.txt"),
@ -317,7 +406,7 @@ async function filterGtfsByBboxes(
console.log(
`[download-gtfs-de] GTFS filter complete: ` +
`${validStopIds.size} stops, ${validTripIds.size} trips, ${validRouteIds.size} routes`,
`${allTripStopIds.size} stops, ${validTripIds.size} trips, ${validRouteIds.size} routes`,
);
}
@ -348,6 +437,7 @@ export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promis
if (!force && !sourceChanged && dataExists) {
const existingBboxes = existingMarker?.bboxes;
const filterVersionOk = existingMarker?.filterVersion === FILTER_VERSION;
// Does the existing filtered data cover all requested bboxes?
const bboxesCovered = !bboxes?.length || allBboxesCovered(existingBboxes, bboxes);
@ -356,8 +446,8 @@ export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promis
// Marker already reflects desired filtering?
const markerOk = !bboxes?.length || (existingBboxes && existingBboxes.length > 0);
if (markerOk) {
console.log(`[download-gtfs-de] GTFS feed up to date (source=${effectiveSource}), skipping`);
if (markerOk && filterVersionOk) {
console.log(`[download-gtfs-de] GTFS feed up to date (source=${effectiveSource}, filterVersion=${FILTER_VERSION}), skipping`);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 100,
@ -366,8 +456,12 @@ export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promis
return;
}
// Data is unfiltered but bboxes are now requested — filter in place.
console.log(`[download-gtfs-de] Applying bbox filter to existing GTFS data`);
// Data needs re-filtering: either unfiltered (bboxes newly requested)
// or filter algorithm changed (filterVersion mismatch).
const reason = !filterVersionOk
? `filter algorithm updated (v${existingMarker?.filterVersion ?? "none"} → v${FILTER_VERSION})`
: "applying bbox filter to unfiltered data";
console.log(`[download-gtfs-de] Re-filtering existing GTFS data: ${reason}`);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 10,

View file

@ -96,12 +96,18 @@ export async function handleRefreshCity(
// Parallel pipeline DAG (bottom-up — leaves execute first):
//
// download-pbf ──────┬─→ extract-pois ────────────────────
// download-pbf ──────┬─→ extract-pois ──────────────────────────
// │ ├─→ generate-grid → compute-scores
// download-pbf ──┐ └─→ build-valhalla ──────────────────┘
// └──→ build-valhalla (waits for both ↑)
// download-pbf ──────┴─→ build-valhalla (road, "valhalla") ┤
// │
// download-pbf ──┐ │
// └─→ build-valhalla (transit, "valhalla-transit")┘
// download-gtfs-de ──┘
//
// Road tiles are built without GTFS (clean, no transit connections → cycling works).
// Transit tiles are built with GTFS (multimodal routing on the separate instance).
// generate-grid waits for all three parallel branches before compute-scores runs.
//
// compute-scores Phase 1 also dispatches ingest-boris-ni (NI cities only)
// as a child alongside the routing jobs, so it runs during routing.
const rootNode = {
@ -122,9 +128,8 @@ export async function handleRefreshCity(
data: { type: "generate-grid" as const, citySlug, resolutionM },
opts: JOB_OPTIONS["generate-grid"],
children: [
// extract-pois and build-valhalla run in parallel — each gets its
// own download-pbf child; the idempotency guard ensures only one
// actually downloads when they race.
// Three parallel branches — each gets its own download-pbf child;
// the idempotency guard ensures only one actually downloads when they race.
{
name: "extract-pois",
queueName: "pipeline",
@ -137,6 +142,8 @@ export async function handleRefreshCity(
opts: JOB_OPTIONS["extract-pois"],
children: [downloadNode()],
},
// Road-only Valhalla build — no GTFS, produces clean tiles without
// transit connections. Cycling/walking/driving route from this instance.
{
name: "build-valhalla",
queueName: "valhalla",
@ -147,14 +154,27 @@ export async function handleRefreshCity(
...(bbox ? { bbox } : {}),
},
opts: JOB_OPTIONS["build-valhalla"],
children: [downloadNode()],
},
// Transit Valhalla build — depends on GTFS download. Produces tiles with
// road↔transit connections. Multimodal routing comes from this instance.
{
name: "build-valhalla",
queueName: "valhalla-transit",
data: {
type: "build-valhalla" as const,
citySlug,
pbfPath,
...(bbox ? { bbox } : {}),
},
opts: JOB_OPTIONS["build-valhalla"],
children: [
downloadNode(),
// Download GTFS feed before building tiles so valhalla_build_transit
// runs during this build. The job is idempotent — it skips immediately
// if the feed is already present, so subsequent refreshes are cheap.
// Download GTFS feed before building transit tiles. Idempotent —
// skips if the feed is current, so subsequent refreshes are cheap.
{
name: "download-gtfs-de",
queueName: "valhalla",
queueName: "valhalla-transit",
data: {
type: "download-gtfs-de" as const,
url: "https://download.gtfs.de/germany/nv_free/latest.zip",

View file

@ -6,8 +6,9 @@ import { handleBuildValhalla } from "./jobs/build-valhalla.js";
import { handleDownloadGtfsDe } from "./jobs/download-gtfs-de.js";
const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json";
const VALHALLA_QUEUE_NAME = process.env.VALHALLA_QUEUE_NAME ?? "valhalla";
console.log("[valhalla-worker] Starting Transportationer Valhalla worker…");
console.log(`[valhalla-worker] Starting Transportationer Valhalla worker (queue=${VALHALLA_QUEUE_NAME})…`);
// ─── Valhalla service process manager ─────────────────────────────────────────
// The valhalla_service HTTP server runs as a child process alongside this
@ -49,7 +50,7 @@ function stopValhallaService(): Promise<void> {
// ─── BullMQ worker ────────────────────────────────────────────────────────────
const worker = new Worker(
"valhalla",
VALHALLA_QUEUE_NAME,
async (job: Job) => {
console.log(`[valhalla-worker] Processing job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`);
@ -107,4 +108,4 @@ process.on("SIGINT", shutdown);
// Start serving if tiles already exist from a previous run
startValhallaService();
console.log("[valhalla-worker] Ready — waiting for build-valhalla jobs on 'valhalla' queue");
console.log(`[valhalla-worker] Ready — waiting for jobs on '${VALHALLA_QUEUE_NAME}' queue`);

View file

@ -1,4 +1,6 @@
const VALHALLA_URL = process.env.VALHALLA_URL ?? "http://localhost:8002";
/** Transit instance (port 8003). Falls back to VALHALLA_URL if not set. */
const VALHALLA_TRANSIT_URL = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_URL;
const COSTING: Record<"walking" | "cycling" | "driving", string> = {
walking: "pedestrian",
@ -10,9 +12,7 @@ const COSTING: Record<"walking" | "cycling" | "driving", string> = {
// Must match the scoring thresholds used in compute-scores.
export const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
// Fixed weekday morning departure for reproducible transit scores.
// GTFS schedules repeat weekly, so the exact date doesn't matter — any Tuesday works.
const TRANSIT_DEPARTURE = "2024-01-16T08:00";
import { nextTuesdayDeparture } from "@transportationer/shared";
export interface TransitContour {
minutes: number;
@ -27,6 +27,7 @@ export interface TransitContour {
*/
export async function fetchTransitIsochrone(
source: LatLng,
departureDate: string,
): Promise<TransitContour[] | null> {
const body = {
locations: [{ lat: source.lat, lon: source.lng }],
@ -36,12 +37,12 @@ export async function fetchTransitIsochrone(
costing_options: {
transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 },
},
date_time: { type: 1, value: TRANSIT_DEPARTURE },
date_time: { type: 1, value: departureDate },
};
let resp: Response;
try {
resp = await fetch(`${VALHALLA_URL}/isochrone`, {
resp = await fetch(`${VALHALLA_TRANSIT_URL}/isochrone`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
@ -71,6 +72,21 @@ export async function fetchTransitIsochrone(
return contours.length >= 2 ? contours : null;
}
/**
* Parse a cached Valhalla isochrone FeatureCollection back into TransitContour[].
* Mirrors the extraction logic in fetchTransitIsochrone.
*/
export function parseTransitContours(geojson: object): TransitContour[] | null {
const fc = geojson as { features?: Array<{ properties: { contour: number }; geometry: object }> };
if (!Array.isArray(fc.features)) return null;
const contours: TransitContour[] = [];
for (const minutes of TRANSIT_CONTOUR_MINUTES) {
const feature = fc.features.find((f) => f.properties?.contour === minutes);
if (feature?.geometry) contours.push({ minutes, geojson: feature.geometry });
}
return contours.length >= 2 ? contours : null;
}
export interface LatLng {
lat: number;
lng: number;
@ -110,8 +126,8 @@ export async function fetchMatrix(
mode: "walking" | "cycling" | "driving",
): Promise<(number | null)[][]> {
const body = {
sources: sources.map(({ lat, lng }) => ({ lat, lon: lng })),
targets: targets.map(({ lat, lng }) => ({ lat, lon: lng })),
sources: sources.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })),
targets: targets.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })),
costing: COSTING[mode],
};
const bodyJson = JSON.stringify(body);