From 0d740bc12d9c9f5d9b616d168efd545c6ba78a54 Mon Sep 17 00:00:00 2001 From: Jan-Henrik Bruhn Date: Thu, 5 Mar 2026 16:51:19 +0100 Subject: [PATCH] fix: instantiate a second valhalla instance to fix transit ingestion --- apps/web/app/admin/cities/[slug]/page.tsx | 36 +--- apps/web/app/admin/cities/new/page.tsx | 172 +----------------- apps/web/app/api/admin/cities/[slug]/route.ts | 18 +- .../app/api/admin/jobs/[id]/stream/route.ts | 19 +- apps/web/app/api/admin/jobs/route.ts | 17 +- apps/web/app/api/isochrones/route.ts | 12 +- apps/web/components/city-ingest-progress.tsx | 172 ++++++++++++++++++ apps/web/lib/cache.ts | 2 +- apps/web/lib/queue.ts | 19 ++ apps/web/lib/valhalla.ts | 10 +- docker-compose.yml | 47 ++++- infra/schema.sql | 16 +- shared/src/index.ts | 1 + shared/src/utils.ts | 15 ++ worker/src/jobs/build-valhalla.ts | 154 ++++++++++------ worker/src/jobs/compute-scores.ts | 5 + worker/src/jobs/compute-transit.ts | 63 ++++++- worker/src/jobs/download-gtfs-de.ts | 162 +++++++++++++---- worker/src/jobs/refresh-city.ts | 42 +++-- worker/src/valhalla-main.ts | 9 +- worker/src/valhalla.ts | 32 +++- 21 files changed, 654 insertions(+), 369 deletions(-) create mode 100644 apps/web/components/city-ingest-progress.tsx create mode 100644 shared/src/utils.ts diff --git a/apps/web/app/admin/cities/[slug]/page.tsx b/apps/web/app/admin/cities/[slug]/page.tsx index d197d1a..cf9a55d 100644 --- a/apps/web/app/admin/cities/[slug]/page.tsx +++ b/apps/web/app/admin/cities/[slug]/page.tsx @@ -2,7 +2,7 @@ import { useState, useEffect } from "react"; import { useParams, useRouter } from "next/navigation"; -import { useJobProgress } from "@/hooks/use-job-progress"; +import { CityIngestProgress } from "@/components/city-ingest-progress"; interface CityDetail { slug: string; @@ -24,8 +24,6 @@ export default function CityDetailPage() { const [jobId, setJobId] = useState(null); const [deleting, setDeleting] = useState(false); - const { stages, overall } = useJobProgress(jobId); - useEffect(() => { fetch(`/api/admin/cities`) .then((r) => r.json()) @@ -101,37 +99,7 @@ export default function CityDetailPage() { {/* Live progress if ingesting */} - {jobId && ( -
-

Ingestion Progress

-
    - {stages.map((s) => ( -
  1. - - {s.status === "completed" ? "✓" : s.status === "active" ? "…" : "○"} - - - {s.label} - - {s.status === "active" && ( - {s.pct}% - )} -
  2. - ))} -
- {overall === "completed" && ( -

✓ Ingestion complete!

- )} -
- )} + {/* Actions */}
diff --git a/apps/web/app/admin/cities/new/page.tsx b/apps/web/app/admin/cities/new/page.tsx index 709efa5..f2c6898 100644 --- a/apps/web/app/admin/cities/new/page.tsx +++ b/apps/web/app/admin/cities/new/page.tsx @@ -2,8 +2,7 @@ import { useState, useEffect, useMemo, useRef, useCallback } from "react"; import type { GeofabrikFeature, GeofabrikIndex } from "@transportationer/shared"; -import { useJobProgress } from "@/hooks/use-job-progress"; -import type { StageStatus, RoutingDetail as RoutingDetailType } from "@/hooks/use-job-progress"; +import { CityIngestProgress } from "@/components/city-ingest-progress"; type Step = "browse" | "confirm" | "ingest"; @@ -568,173 +567,6 @@ function ConfirmStep({ // ─── Progress step ──────────────────────────────────────────────────────────── -function StageIcon({ status }: { status: StageStatus["status"] }) { - if (status === "completed") - return ( - - ✓ - - ); - if (status === "failed") - return ( - - ✗ - - ); - if (status === "active") - return ( - - - - - - - ); - return ; -} - -function StageRow({ - stage, - error, -}: { - stage: StageStatus; - error?: string; -}) { - return ( -
- -
-

- {stage.label} -

- {stage.status === "active" && ( - <> -
-
-
-

{stage.message}

- - )} - {stage.status === "failed" && error && ( -

{error}

- )} -
-
- ); -} - -function RoutingGrid({ routingDetail }: { routingDetail: RoutingDetailType }) { - const MODE_LABELS: Record = { - walking: "Walking", - cycling: "Cycling", - driving: "Driving", - transit: "Transit", - }; - const entries = Object.entries(routingDetail); - if (entries.length === 0) return null; - return ( -
- {entries.map(([mode, { done, total }]) => ( -
- {MODE_LABELS[mode] ?? mode} -
-
0 ? `${(done / total) * 100}%` : done > 0 ? "100%" : "0%" }} - /> -
- - {total > 1 ? `${done}/${total}` : done >= 1 ? "done" : "…"} - -
- ))} -
- ); -} - -function ProgressStep({ jobId }: { jobId: string | null }) { - const { stages, overall, error, routingDetail } = useJobProgress(jobId); - - // Group consecutive parallel stages together for rendering - type StageGroup = - | { kind: "single"; stage: StageStatus } - | { kind: "parallel"; stages: StageStatus[] }; - - const groups: StageGroup[] = []; - for (const stage of stages) { - if (stage.parallelGroup) { - const last = groups[groups.length - 1]; - if (last?.kind === "parallel" && last.stages[0].parallelGroup === stage.parallelGroup) { - last.stages.push(stage); - } else { - groups.push({ kind: "parallel", stages: [stage] }); - } - } else { - groups.push({ kind: "single", stage }); - } - } - - return ( -
-

Processing City Data

- -
- {groups.map((group, gi) => - group.kind === "single" ? ( -
- - {/* Show per-mode routing grid under the compute-accessibility stage */} - {group.stage.key === "Computing scores" && - group.stage.status === "active" && - routingDetail && ( - - )} -
- ) : ( -
-

- Running in parallel -

- {group.stages.map((s) => ( - - ))} -
- ), - )} -
- - {overall === "completed" && ( -
- ✓ City ingestion complete!{" "} - - Return to dashboard - {" "} - or{" "} - - view on map - - . -
- )} - - {overall === "failed" && ( -
- ✗ Ingestion failed: {error}{" "} - - Return to dashboard - - . -
- )} -
- ); -} // ─── Main page ──────────────────────────────────────────────────────────────── @@ -806,7 +638,7 @@ export default function AddCityPage() { onConfirm={handleConfirm} /> )} - {step === "ingest" && } + {step === "ingest" && }
); } diff --git a/apps/web/app/api/admin/cities/[slug]/route.ts b/apps/web/app/api/admin/cities/[slug]/route.ts index f7a3456..c5bf27b 100644 --- a/apps/web/app/api/admin/cities/[slug]/route.ts +++ b/apps/web/app/api/admin/cities/[slug]/route.ts @@ -1,7 +1,7 @@ import { NextRequest, NextResponse } from "next/server"; import { sql } from "@/lib/db"; import { cacheDel } from "@/lib/cache"; -import { getValhallaQueue } from "@/lib/queue"; +import { getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; export const runtime = "nodejs"; @@ -27,15 +27,13 @@ export async function DELETE( cacheDel(`api:stats:*${slug}*`), ]); - // Remove city from the global Valhalla routing tile set. - // The valhalla-worker will delete the city's clipped PBF and rebuild - // tiles from all remaining cities' PBFs in one pass. - const valhallaQueue = getValhallaQueue(); - await valhallaQueue.add( - "build-valhalla", - { type: "build-valhalla", removeSlugs: [slug] }, - { attempts: 1, removeOnComplete: { age: 86400 } }, - ); + // Remove city from both Valhalla routing tile sets (road + transit). + const rebuildOpts = { attempts: 1, removeOnComplete: { age: 86400 } } as const; + const rebuildData = { type: "build-valhalla" as const, removeSlugs: [slug] }; + await Promise.all([ + getValhallaQueue().add("build-valhalla", rebuildData, rebuildOpts), + getValhallaTransitQueue().add("build-valhalla", rebuildData, rebuildOpts), + ]); return NextResponse.json({ deleted: slug }); } diff --git a/apps/web/app/api/admin/jobs/[id]/stream/route.ts b/apps/web/app/api/admin/jobs/[id]/stream/route.ts index 1ed7459..d073ec0 100644 --- a/apps/web/app/api/admin/jobs/[id]/stream/route.ts +++ b/apps/web/app/api/admin/jobs/[id]/stream/route.ts @@ -1,6 +1,6 @@ import { NextRequest } from "next/server"; import { Job } from "bullmq"; -import { getPipelineQueue, getValhallaQueue } from "@/lib/queue"; +import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; import type { PipelineJobData, JobProgress, ComputeScoresJobData, RefreshCityJobData } from "@/lib/queue"; import type { SSEEvent, RoutingDetail } from "@transportationer/shared"; import { CATEGORY_IDS } from "@transportationer/shared"; @@ -48,7 +48,8 @@ export async function GET( }); } - const valhallaQueue = getValhallaQueue(); + const valhallaQueue = getValhallaQueue(); + const valhallaTransitQueue = getValhallaTransitQueue(); const stream = new ReadableStream({ start(ctrl) { @@ -77,11 +78,13 @@ export async function GET( } // 1. Fetch active jobs and waiting-children jobs in parallel. - const [pipelineActive, valhallaActive, waitingChildren] = await Promise.all([ + const [pipelineActive, valhallaActive, valhallaTransitActive, waitingChildren] = await Promise.all([ queue.getActive(0, 100), valhallaQueue.getActive(0, 100), + valhallaTransitQueue.getActive(0, 100), queue.getWaitingChildren(0, 200), ]); + const allValhallaActive = [...valhallaActive, ...valhallaTransitActive]; // 1a. Parallel routing phase: compute-scores is waiting for its routing // children to finish. Report aggregate progress instead of one job's pct. @@ -158,7 +161,8 @@ export async function GET( const extractPoisJob = pipelineActive.find( (j) => j.data.citySlug === citySlug && j.data.type === "extract-pois", ); - const buildValhallaJob = valhallaActive.find( + // Prefer the road build (valhalla queue) for progress; fall back to transit. + const buildValhallaJob = allValhallaActive.find( (j) => j.data.citySlug === citySlug && j.data.type === "build-valhalla", ); if (extractPoisJob || buildValhallaJob) { @@ -176,7 +180,7 @@ export async function GET( // immediately at pct=100 while the other does the real download. // Prefer the job that has actual byte progress so the UI doesn't // regress from 100% → 5% when the skip job is seen first. - const allCityActive = [...pipelineActive, ...valhallaActive].filter( + const allCityActive = [...pipelineActive, ...allValhallaActive].filter( (j) => j.data.citySlug === citySlug && j.data.type !== "refresh-city", ); const activeJob = @@ -226,11 +230,12 @@ export async function GET( // 3. Check BullMQ failed queue as a secondary signal (catches failures // before the worker's DB update propagates, e.g. DB connection issues). - const [pipelineFailed, valhallaFailed] = await Promise.all([ + const [pipelineFailed, valhallaFailed, valhallaTransitFailed] = await Promise.all([ queue.getFailed(0, 50), valhallaQueue.getFailed(0, 50), + valhallaTransitQueue.getFailed(0, 50), ]); - const recentFail = [...pipelineFailed, ...valhallaFailed].find( + const recentFail = [...pipelineFailed, ...valhallaFailed, ...valhallaTransitFailed].find( (j) => j.data.citySlug === citySlug && j.data.type !== "refresh-city" && diff --git a/apps/web/app/api/admin/jobs/route.ts b/apps/web/app/api/admin/jobs/route.ts index 2999e3b..aa10706 100644 --- a/apps/web/app/api/admin/jobs/route.ts +++ b/apps/web/app/api/admin/jobs/route.ts @@ -1,15 +1,16 @@ import { NextResponse } from "next/server"; -import { getPipelineQueue, getValhallaQueue } from "@/lib/queue"; +import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; import type { JobSummary } from "@transportationer/shared"; export const runtime = "nodejs"; export async function GET() { - const [pQueue, vQueue] = [getPipelineQueue(), getValhallaQueue()]; + const [pQueue, vQueue, vtQueue] = [getPipelineQueue(), getValhallaQueue(), getValhallaTransitQueue()]; const [ pWaiting, pWaitingChildren, pActive, pCompleted, pFailed, vWaiting, vActive, vCompleted, vFailed, + vtWaiting, vtActive, vtCompleted, vtFailed, ] = await Promise.all([ pQueue.getWaiting(0, 20), pQueue.getWaitingChildren(0, 20), @@ -20,13 +21,17 @@ export async function GET() { vQueue.getActive(0, 20), vQueue.getCompleted(0, 20), vQueue.getFailed(0, 20), + vtQueue.getWaiting(0, 20), + vtQueue.getActive(0, 20), + vtQueue.getCompleted(0, 20), + vtQueue.getFailed(0, 20), ]); const waitingChildren = [...pWaitingChildren]; - const waiting = [...pWaiting, ...vWaiting]; - const active = [...pActive, ...vActive]; - const completed = [...pCompleted, ...vCompleted]; - const failed = [...pFailed, ...vFailed]; + const waiting = [...pWaiting, ...vWaiting, ...vtWaiting]; + const active = [...pActive, ...vActive, ...vtActive]; + const completed = [...pCompleted, ...vCompleted, ...vtCompleted]; + const failed = [...pFailed, ...vFailed, ...vtFailed]; const all = [...active, ...waitingChildren, ...waiting, ...completed, ...failed]; diff --git a/apps/web/app/api/isochrones/route.ts b/apps/web/app/api/isochrones/route.ts index e70e942..25e7a57 100644 --- a/apps/web/app/api/isochrones/route.ts +++ b/apps/web/app/api/isochrones/route.ts @@ -1,7 +1,8 @@ import { NextRequest, NextResponse } from "next/server"; import { sql } from "@/lib/db"; import { fetchIsochrone } from "@/lib/valhalla"; -import { getValhallaQueue } from "@/lib/queue"; +import { getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; +import { nextTuesdayDeparture } from "@transportationer/shared"; export const runtime = "nodejs"; @@ -35,6 +36,7 @@ export async function POST(req: NextRequest) { : [5, 10, 15]; const mode = typeof travelMode === "string" ? travelMode : "walking"; + const departureDate = mode === "transit" ? nextTuesdayDeparture() : null; // Check PostGIS isochrone cache const cached = await Promise.resolve(sql<{ result: object }[]>` @@ -42,6 +44,7 @@ export async function POST(req: NextRequest) { FROM isochrone_cache WHERE travel_mode = ${mode} AND contours_min = ${contours} + AND departure_date IS NOT DISTINCT FROM ${departureDate} AND ST_DWithin( origin_geom::geography, ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326)::geography, @@ -59,7 +62,9 @@ export async function POST(req: NextRequest) { // Refuse to call valhalla_service while tiles are being rebuilt — // the service is stopped during the build and requests would hang or fail. - const activeValhalla = await getValhallaQueue().getActiveCount(); + // Check the queue that owns the requested mode's instance. + const rebuildQueue = mode === "transit" ? getValhallaTransitQueue() : getValhallaQueue(); + const activeValhalla = await rebuildQueue.getActiveCount(); if (activeValhalla > 0) { return NextResponse.json( { error: "Routing engine is rebuilding, please try again shortly.", code: "VALHALLA_REBUILDING" }, @@ -93,11 +98,12 @@ export async function POST(req: NextRequest) { // parses it as JSON itself. Without the cast, postgres.js infers the JSONB // column type and re-encodes the string as a JSONB string literal. await Promise.resolve(sql` - INSERT INTO isochrone_cache (origin_geom, travel_mode, contours_min, result) + INSERT INTO isochrone_cache (origin_geom, travel_mode, contours_min, departure_date, result) VALUES ( ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326), ${mode}, ${contours}, + ${departureDate}, ${JSON.stringify(geojson)}::jsonb ) `); diff --git a/apps/web/components/city-ingest-progress.tsx b/apps/web/components/city-ingest-progress.tsx new file mode 100644 index 0000000..4f5ef89 --- /dev/null +++ b/apps/web/components/city-ingest-progress.tsx @@ -0,0 +1,172 @@ +"use client"; + +import { useJobProgress } from "@/hooks/use-job-progress"; +import type { StageStatus, RoutingDetail as RoutingDetailType } from "@/hooks/use-job-progress"; + +function StageIcon({ status }: { status: StageStatus["status"] }) { + if (status === "completed") + return ( + + ✓ + + ); + if (status === "failed") + return ( + + ✗ + + ); + if (status === "active") + return ( + + + + + + + ); + return ; +} + +function StageRow({ stage, error }: { stage: StageStatus; error?: string }) { + return ( +
+ +
+

+ {stage.label} +

+ {stage.status === "active" && ( + <> +
+
+
+

{stage.message}

+ + )} + {stage.status === "failed" && error && ( +

{error}

+ )} +
+
+ ); +} + +function RoutingGrid({ routingDetail }: { routingDetail: RoutingDetailType }) { + const MODE_LABELS: Record = { + walking: "Walking", + cycling: "Cycling", + driving: "Driving", + transit: "Transit", + }; + const entries = Object.entries(routingDetail); + if (entries.length === 0) return null; + return ( +
+ {entries.map(([mode, { done, total }]) => ( +
+ {MODE_LABELS[mode] ?? mode} +
+
0 ? `${(done / total) * 100}%` : done > 0 ? "100%" : "0%" }} + /> +
+ + {total > 1 ? `${done}/${total}` : done >= 1 ? "done" : "…"} + +
+ ))} +
+ ); +} + +export function CityIngestProgress({ + jobId, + className = "card max-w-lg", +}: { + jobId: string | null; + className?: string; +}) { + const { stages, overall, error, routingDetail } = useJobProgress(jobId); + + if (!jobId) return null; + + type StageGroup = + | { kind: "single"; stage: StageStatus } + | { kind: "parallel"; stages: StageStatus[] }; + + const groups: StageGroup[] = []; + for (const stage of stages) { + if (stage.parallelGroup) { + const last = groups[groups.length - 1]; + if (last?.kind === "parallel" && last.stages[0].parallelGroup === stage.parallelGroup) { + last.stages.push(stage); + } else { + groups.push({ kind: "parallel", stages: [stage] }); + } + } else { + groups.push({ kind: "single", stage }); + } + } + + return ( +
+

Processing City Data

+ +
+ {groups.map((group, gi) => + group.kind === "single" ? ( +
+ + {group.stage.key === "Computing scores" && + group.stage.status === "active" && + routingDetail && ( + + )} +
+ ) : ( +
+

+ Running in parallel +

+ {group.stages.map((s) => ( + + ))} +
+ ), + )} +
+ + {overall === "completed" && ( +
+ ✓ City ingestion complete!{" "} + + Return to dashboard + {" "} + or{" "} + + view on map + + . +
+ )} + + {overall === "failed" && ( +
+ ✗ Ingestion failed: {error}{" "} + + Return to dashboard + + . +
+ )} +
+ ); +} diff --git a/apps/web/lib/cache.ts b/apps/web/lib/cache.ts index 0ba582e..97132de 100644 --- a/apps/web/lib/cache.ts +++ b/apps/web/lib/cache.ts @@ -2,7 +2,7 @@ import { getRedis } from "./redis"; /** TTL in seconds for each cache category */ const TTL = { - API_CITIES: 3600, // 1 hour + API_CITIES: 30, // 30 seconds — city status changes during ingest API_POIS: 300, // 5 minutes API_GRID: 600, // 10 minutes API_STATS: 120, // 2 minutes diff --git a/apps/web/lib/queue.ts b/apps/web/lib/queue.ts index ee9ed61..dc2aff1 100644 --- a/apps/web/lib/queue.ts +++ b/apps/web/lib/queue.ts @@ -26,6 +26,9 @@ declare global { // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __valhallaQueue: Queue | undefined; + // eslint-disable-next-line no-var + // eslint-disable-next-line @typescript-eslint/no-explicit-any + var __valhallaTransitQueue: Queue | undefined; } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -59,6 +62,22 @@ export function getValhallaQueue(): Queue { return globalThis.__valhallaQueue; } +/** Queue for build-valhalla transit jobs, processed by the valhalla-transit container. */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function getValhallaTransitQueue(): Queue { + if (!globalThis.__valhallaTransitQueue) { + globalThis.__valhallaTransitQueue = new Queue("valhalla-transit", { + connection: createBullMQConnection(), + defaultJobOptions: { + attempts: 1, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + }, + }); + } + return globalThis.__valhallaTransitQueue; +} + export function createQueueEvents(): QueueEvents { return new QueueEvents("pipeline", { connection: createBullMQConnection(), diff --git a/apps/web/lib/valhalla.ts b/apps/web/lib/valhalla.ts index 03650f5..9c82ade 100644 --- a/apps/web/lib/valhalla.ts +++ b/apps/web/lib/valhalla.ts @@ -1,4 +1,7 @@ -const VALHALLA_BASE = process.env.VALHALLA_URL ?? "http://valhalla:8002"; +import { nextTuesdayDeparture } from "@transportationer/shared"; + +const VALHALLA_BASE = process.env.VALHALLA_URL ?? "http://valhalla:8002"; +const VALHALLA_TRANSIT_BASE = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_BASE; export type ValhallaCosting = "pedestrian" | "bicycle" | "auto"; @@ -28,10 +31,11 @@ export async function fetchIsochrone(opts: IsochroneOpts): Promise { }; if (isTransit) { body.costing_options = { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 } }; - body.date_time = { type: 0 }; // current time + body.date_time = { type: 1, value: nextTuesdayDeparture() }; } - const res = await fetch(`${VALHALLA_BASE}/isochrone`, { + const base = isTransit ? VALHALLA_TRANSIT_BASE : VALHALLA_BASE; + const res = await fetch(`${base}/isochrone`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), diff --git a/docker-compose.yml b/docker-compose.yml index 5e5178e..bd30e43 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,10 +31,10 @@ services: timeout: 5s retries: 5 - # ─── Valhalla routing engine + BullMQ worker ────────────────────────────── - # Built from the gis-ops Valhalla image + Node.js. This container does two - # things: processes build-valhalla BullMQ jobs (running valhalla_build_tiles - # natively) and serves the resulting tiles via valhalla_service on port 8002. + # ─── Valhalla road worker (port 8002) ───────────────────────────────────── + # Builds road-only tiles (no transit data) → cycling/walking/driving routing. + # Without GTFS in its volume, valhalla_build_tiles produces clean road tiles + # with no ghost transit edges, so bicycle routing works correctly. valhalla: build: context: . @@ -42,19 +42,45 @@ services: restart: unless-stopped volumes: - osm_data:/data/osm:ro # PBF files downloaded by the main worker - - valhalla_tiles:/data/valhalla # Valhalla config and routing tiles + - valhalla_tiles:/data/valhalla # Road-only config and tiles environment: REDIS_HOST: valkey REDIS_PORT: "6379" REDIS_PASSWORD: ${VALKEY_PASSWORD} + VALHALLA_QUEUE_NAME: valhalla + OSM_DATA_DIR: /data/osm + VALHALLA_CONFIG: /data/valhalla/valhalla.json + VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles + NODE_ENV: production + ports: + - "127.0.0.1:8002:8002" # Valhalla HTTP API (road) + depends_on: + valkey: + condition: service_healthy + + # ─── Valhalla transit worker (port 8002 internal) ───────────────────────── + # Builds tiles with GTFS transit data → multimodal routing. + # Separate volume from the road worker so transit ghost edges never affect + # the road instance. + valhalla-transit: + build: + context: . + target: valhalla-worker + restart: unless-stopped + volumes: + - osm_data:/data/osm:ro # PBF files downloaded by the main worker + - valhalla_tiles_transit:/data/valhalla # Transit config, tiles and GTFS data + environment: + REDIS_HOST: valkey + REDIS_PORT: "6379" + REDIS_PASSWORD: ${VALKEY_PASSWORD} + VALHALLA_QUEUE_NAME: valhalla-transit OSM_DATA_DIR: /data/osm VALHALLA_CONFIG: /data/valhalla/valhalla.json VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles NODE_ENV: production # Optional: connect-info.net token for NDS-specific GTFS feed CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-} - ports: - - "127.0.0.1:8002:8002" # Valhalla HTTP API depends_on: valkey: condition: service_healthy @@ -83,6 +109,7 @@ services: REDIS_PORT: "6379" REDIS_PASSWORD: ${VALKEY_PASSWORD} VALHALLA_URL: http://valhalla:8002 + VALHALLA_TRANSIT_URL: http://valhalla-transit:8002 ADMIN_PASSWORD_HASH: ${ADMIN_PASSWORD_HASH} ADMIN_JWT_SECRET: ${ADMIN_JWT_SECRET} NODE_ENV: production @@ -106,6 +133,7 @@ services: OSM_DATA_DIR: /data/osm LUA_SCRIPT: /app/infra/osm2pgsql.lua VALHALLA_URL: http://valhalla:8002 + VALHALLA_TRANSIT_URL: http://valhalla-transit:8002 NODE_ENV: production # Optional: enables NDS-specific GTFS source for cities in Niedersachsen CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-} @@ -120,6 +148,7 @@ services: volumes: postgres_data: valkey_data: - osm_data: # Shared: worker writes, valhalla reads - valhalla_tiles: + osm_data: # Shared: worker writes, valhalla containers read + valhalla_tiles: # Road-only tiles (no transit) — cycling works correctly here + valhalla_tiles_transit: # Transit tiles (with GTFS) — multimodal routing pmtiles_data: diff --git a/infra/schema.sql b/infra/schema.sql index ec52ea5..cb1e84a 100644 --- a/infra/schema.sql +++ b/infra/schema.sql @@ -117,18 +117,22 @@ CREATE INDEX IF NOT EXISTS idx_grid_poi_details_lookup -- ─── Isochrone cache ────────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS isochrone_cache ( - id BIGSERIAL PRIMARY KEY, - origin_geom geometry(Point, 4326) NOT NULL, - travel_mode TEXT NOT NULL, - contours_min INTEGER[] NOT NULL, - result JSONB NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() + id BIGSERIAL PRIMARY KEY, + origin_geom geometry(Point, 4326) NOT NULL, + travel_mode TEXT NOT NULL, + contours_min INTEGER[] NOT NULL, + departure_date TEXT, -- NULL for non-transit; 'YYYY-MM-DDTHH:mm' for transit + result JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_isochrone_origin ON isochrone_cache USING GIST (origin_geom); CREATE INDEX IF NOT EXISTS idx_isochrone_created ON isochrone_cache (created_at); +CREATE INDEX IF NOT EXISTS idx_isochrone_mode_departure + ON isochrone_cache (travel_mode, departure_date) + WHERE departure_date IS NOT NULL; -- Auto-expire isochrone cache entries older than 30 days -- (handled by periodic cleanup or TTL logic in app) diff --git a/shared/src/index.ts b/shared/src/index.ts index f93602b..d0de709 100644 --- a/shared/src/index.ts +++ b/shared/src/index.ts @@ -2,3 +2,4 @@ export * from "./osm-tags.js"; export * from "./types.js"; export * from "./queue.js"; export * from "./profiles.js"; +export * from "./utils.js"; diff --git a/shared/src/utils.ts b/shared/src/utils.ts new file mode 100644 index 0000000..6dd4de9 --- /dev/null +++ b/shared/src/utils.ts @@ -0,0 +1,15 @@ +/** + * Returns the next Tuesday at 08:00 as a Valhalla-compatible datetime string + * (YYYY-MM-DDTHH:mm). Used for reproducible transit scoring across worker and + * UI — always lands within a freshly-downloaded GTFS feed's validity window. + * Tuesday is a representative mid-week workday; "next" ensures we never pick + * today (which may be a partial day or have no upcoming service). + */ +export function nextTuesdayDeparture(): string { + const now = new Date(); + const daysUntilTue = (2 - now.getDay() + 7) % 7 || 7; // 1–7, always future + const d = new Date(now); + d.setDate(d.getDate() + daysUntilTue); + const pad = (n: number) => String(n).padStart(2, "0"); + return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())}T08:00`; +} diff --git a/worker/src/jobs/build-valhalla.ts b/worker/src/jobs/build-valhalla.ts index a05fc5b..0ac69a0 100644 --- a/worker/src/jobs/build-valhalla.ts +++ b/worker/src/jobs/build-valhalla.ts @@ -35,19 +35,25 @@ const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`; * the Valhalla default (/data/valhalla/transit) and to persist transit tiles * between builds. * - * IMPORTANT build order (per Valhalla docs): - * 1. valhalla_build_tiles — road graph; also downloads timezone.sqlite - * 2. valhalla_ingest_transit — GTFS → transit PBF tiles in transit_dir - * 3. valhalla_convert_transit — reads transit PBFs + road tiles → transit graph + * IMPORTANT build order: + * 1. valhalla_ingest_transit — GTFS → transit PBF staging files in transit_dir + * 2. valhalla_convert_transit — PBF → transit graph tiles (.gph) in transit_dir + * 3. valhalla_build_tiles — road graph + reads transit tiles from transit_dir + * → creates road tiles WITH road-to-transit edges + * → copies transit tiles into tile_dir/3/ * - * valhalla_convert_transit REQUIRES road tiles to exist (it uses GraphReader - * to look up road node IDs for stop connections). Running it before - * valhalla_build_tiles causes it to crash looking for tiles that don't exist. + * valhalla_build_tiles MUST run AFTER valhalla_convert_transit so it can find + * the transit .gph tiles in transit_dir and embed road↔transit connections in + * the road tiles. Running build_tiles before convert results in road tiles with + * no transit connections (transit routing silently falls back to walking). + * + * valhalla_convert_transit does NOT require road tiles — it only reads the + * transit PBF staging files and writes transit graph tiles. It can run on an + * empty tile_dir without crashing. * * TRANSIT_CACHE_MARKER tracks whether ingest PBFs are current relative to the - * GTFS source. valhalla_convert_transit is always re-run after a road build - * because road node IDs change on each rebuild and old transit-to-road - * connections would otherwise be stale. + * GTFS source. valhalla_convert_transit always runs after ingest (or when the + * ingest cache is fresh) so transit_dir has up-to-date .gph before build_tiles. */ const TRANSIT_CACHE_DIR = `${VALHALLA_DATA_DIR}/transit_graph`; /** Written after a successful valhalla_ingest_transit; compared against GTFS source mtime. */ @@ -55,6 +61,19 @@ const TRANSIT_CACHE_MARKER = `${TRANSIT_CACHE_DIR}/.ready`; /** Written by download-gtfs-de after each successful GTFS extraction. */ const GTFS_SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`; +/** + * Buffer added to the city bbox when clipping the road PBF with osmium. + * Transit stops within the city bbox may be in low-road-density areas (parks, + * new developments, demand-responsive zones) where the nearest OSM road is + * outside the exact bbox clip. Without coverage, valhalla_build_tiles crashes + * with a memory corruption error ("double free" / "bad_array_new_length"). + * stop_times is already filtered to bbox-local stops so the buffer only adds + * road coverage — it does NOT let Germany-wide transit stops into the graph. + * 0.2° ≈ 18 km at 53 °N — covers roads for all plausibly in-city stops. + */ +const ROAD_BBOX_BUFFER = 0.2; + + /** * Manifest file: maps citySlug → absolute path of its routing PBF. * Persists in the valhalla_tiles Docker volume across restarts. @@ -201,6 +220,12 @@ function generateConfig(): void { */ function isTransitIngestFresh(): boolean { if (!existsSync(TRANSIT_CACHE_MARKER) || !existsSync(GTFS_SOURCE_MARKER)) return false; + // Verify at least one transit PBF tile exists — the marker can survive a + // cache-dir wipe (crash recovery) and we'd otherwise skip ingest with an + // empty transit dir, causing valhalla_convert_transit to fail silently. + // Valhalla 3.x ingest writes level-3 tiles; check for the directory. + const level3Dir = `${TRANSIT_CACHE_DIR}/3`; + if (!existsSync(level3Dir)) return false; return statSync(TRANSIT_CACHE_MARKER).mtimeMs >= statSync(GTFS_SOURCE_MARKER).mtimeMs; } @@ -236,21 +261,32 @@ export async function handleBuildValhalla( stage: "Building routing graph", pct: 5, message: bbox - ? `Clipping PBF to bbox [${bbox.map((v) => v.toFixed(3)).join(", ")}]…` + ? `Clipping PBF to city bbox (may expand for transit coverage)…` : `Registering full PBF for ${citySlug}…`, } satisfies JobProgress); let routingPbf: string; if (bbox) { - const [minLng, minLat, maxLng, maxLat] = bbox; const clippedPbf = `${VALHALLA_DATA_DIR}/${citySlug}-routing.osm.pbf`; if (!existsSync(pbfPath)) throw new Error(`PBF file not found: ${pbfPath}`); + const extractBbox: [number, number, number, number] = [ + bbox[0] - ROAD_BBOX_BUFFER, + bbox[1] - ROAD_BBOX_BUFFER, + bbox[2] + ROAD_BBOX_BUFFER, + bbox[3] + ROAD_BBOX_BUFFER, + ]; + + console.log( + `[build-valhalla] Road extract bbox: city [${bbox.map((v) => v.toFixed(3)).join(", ")}]` + + ` + ${ROAD_BBOX_BUFFER}° buffer → [${extractBbox.map((v) => v.toFixed(3)).join(", ")}]`, + ); + await runProcess("osmium", [ "extract", - `--bbox=${minLng},${minLat},${maxLng},${maxLat}`, + `--bbox=${extractBbox[0]},${extractBbox[1]},${extractBbox[2]},${extractBbox[3]}`, pbfPath, "-o", clippedPbf, "--overwrite", @@ -289,73 +325,47 @@ export async function handleBuildValhalla( return; } - // ── Step 3: build road tiles ────────────────────────────────────────────── + // ── Step 3: transit ingest + convert ───────────────────────────────────── // - // valhalla_build_tiles MUST run before transit operations: - // • valhalla_convert_transit needs road tiles (GraphReader) to look up road - // node IDs for each transit stop — running it before this step causes the - // "Couldn't load .../0/000/000.pbf" crash. - // - // valhalla_build_tiles ignores any transit tiles in transit_dir (it filters - // them out of the hierarchy build), so there is no "transit connection" pass - // to worry about — transit connectivity is created by convert_transit. - - await job.updateProgress({ - stage: "Building routing graph", - pct: 10, - message: `Building road routing tiles for: ${allSlugs.join(", ")}`, - } satisfies JobProgress); - - await runProcess("valhalla_build_tiles", ["-c", VALHALLA_CONFIG, ...allPbfs]); - - console.log("[build-valhalla] Road tiles built"); - - // ── Step 4: transit tile preparation ───────────────────────────────────── - // - // Transit runs after road tiles exist. Three sub-steps: - // - // 4a. timezone db — valhalla_build_timezones (one-time, skip if exists). - // valhalla_ingest_transit needs it to assign timezone info to stops. - // Without it, ingest skips writing the root index tile (0/000/000.pbf) - // and valhalla_convert_transit crashes trying to load it. - // - // 4b. valhalla_ingest_transit — GTFS → transit PBF tiles in transit_dir. - // Only re-run when GTFS data changed (expensive: can take hours). - // - // 4c. valhalla_convert_transit — transit PBFs + road tiles → transit graph. - // ALWAYS re-run after a road build because road node IDs change on - // every rebuild; old transit-to-road connections would be stale. + // Build order: ingest → convert → road tiles. + // valhalla_build_tiles MUST run after valhalla_convert_transit so it finds + // transit .gph tiles in transit_dir and embeds road↔transit connection edges + // in the road tiles. Without convert running first, road tiles have no transit + // connections and multimodal routing silently falls back to walking. + // valhalla_convert_transit does NOT need road tiles — it only reads the GTFS + // staging PBFs and writes the transit graph tiles. const gtfsReady = existsSync(GTFS_FEED_DIR) && readdirSync(GTFS_FEED_DIR).some((f) => f.endsWith(".txt")); + let ingestPbfsAvailable = false; + if (gtfsReady) { - // 4a: timezone database — one-time setup, persists in VALHALLA_DATA_DIR. - // valhalla_ingest_transit needs this to assign timezone info to stops; + // 3a: timezone database — one-time setup, persists in VALHALLA_DATA_DIR. + // valhalla_ingest_transit needs it to assign timezone info to stops; // without it the root index tile (0/000/000.pbf) is not written and // valhalla_convert_transit crashes trying to load it. if (!existsSync(TIMEZONE_SQLITE)) { await job.updateProgress({ stage: "Building routing graph", - pct: 73, + pct: 10, message: "Building timezone database (one-time setup)…", } satisfies JobProgress); try { await buildTimezoneDb(); } catch (err) { console.warn("[build-valhalla] valhalla_build_timezones failed — skipping transit:", err); - // Can't safely run transit ingest without timezone db. } } - // 4b: ingest (only when GTFS changed, and only when timezone db is ready) - let ingestPbfsAvailable = isTransitIngestFresh(); + // 3b: ingest (only when GTFS changed, and only when timezone db is ready) + ingestPbfsAvailable = isTransitIngestFresh(); if (!ingestPbfsAvailable && existsSync(TIMEZONE_SQLITE)) { await job.updateProgress({ stage: "Building routing graph", - pct: 75, + pct: 12, message: "Ingesting GTFS transit feeds…", } satisfies JobProgress); try { @@ -374,17 +384,23 @@ export async function handleBuildValhalla( mkdirSync(TRANSIT_CACHE_DIR, { recursive: true }); } } else if (ingestPbfsAvailable) { - console.log("[build-valhalla] Transit ingest cache is fresh — skipping ingest"); + await job.updateProgress({ + stage: "Building routing graph", + pct: 12, + message: "Transit ingest cache is fresh — skipping re-ingest", + } satisfies JobProgress); } else { console.log("[build-valhalla] timezone.sqlite unavailable — skipping transit ingest"); } - // 4c: convert (always, to reconnect transit to the new road graph) + // 3c: convert transit PBF staging files → transit graph tiles (.gph) + // Runs even when ingest was skipped (cache fresh) so transit_dir always + // has up-to-date .gph tiles before valhalla_build_tiles reads them. if (ingestPbfsAvailable) { await job.updateProgress({ stage: "Building routing graph", - pct: 85, - message: "Connecting transit tiles to road graph…", + pct: 15, + message: "Converting transit staging tiles to graph tiles…", } satisfies JobProgress); try { await runProcess("valhalla_convert_transit", ["-c", VALHALLA_CONFIG]); @@ -394,9 +410,27 @@ export async function handleBuildValhalla( } } } else { - console.log("[build-valhalla] No GTFS feed found — skipping transit tile prep"); + console.log("[build-valhalla] No GTFS feed found — skipping transit ingest/convert"); } + // ── Step 4: build road tiles ────────────────────────────────────────────── + // + // Runs AFTER valhalla_convert_transit so transit .gph tiles are present in + // transit_dir. valhalla_build_tiles reads them, embeds road↔transit connection + // edges in the road tiles, and copies transit tiles into tile_dir/3/. + // Without transit tiles present at this step, road tiles have no transit + // connections and multimodal routing silently falls back to walking. + + await job.updateProgress({ + stage: "Building routing graph", + pct: 20, + message: `Building road routing tiles for: ${allSlugs.join(", ")}`, + } satisfies JobProgress); + + await runProcess("valhalla_build_tiles", ["-c", VALHALLA_CONFIG, ...allPbfs]); + + console.log("[build-valhalla] Road tiles built"); + // ── Step 5: restart Valhalla service ───────────────────────────────────── await job.updateProgress({ diff --git a/worker/src/jobs/compute-scores.ts b/worker/src/jobs/compute-scores.ts index 201b1dc..6065d1b 100644 --- a/worker/src/jobs/compute-scores.ts +++ b/worker/src/jobs/compute-scores.ts @@ -124,6 +124,9 @@ export async function handleComputeScores( removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, parent: { id: job.id!, queue: queue.qualifiedName }, + // Transit is optional — a failure should not cascade to the parent. + // The city will be marked ready with walking/cycling scores only. + ignoreDependencyOnFailure: true, }, ); } @@ -139,6 +142,8 @@ export async function handleComputeScores( removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, parent: { id: job.id!, queue: queue.qualifiedName }, + // Boris NI data is optional — failure should not cancel the pipeline. + ignoreDependencyOnFailure: true, }, ); } diff --git a/worker/src/jobs/compute-transit.ts b/worker/src/jobs/compute-transit.ts index 32d2136..24b2078 100644 --- a/worker/src/jobs/compute-transit.ts +++ b/worker/src/jobs/compute-transit.ts @@ -20,9 +20,9 @@ */ import type { Job } from "bullmq"; import { getSql } from "../db.js"; -import { fetchTransitIsochrone } from "../valhalla.js"; +import { fetchTransitIsochrone, parseTransitContours, TRANSIT_CONTOUR_MINUTES } from "../valhalla.js"; import type { JobProgress } from "@transportationer/shared"; -import { CATEGORY_IDS } from "@transportationer/shared"; +import { CATEGORY_IDS, nextTuesdayDeparture } from "@transportationer/shared"; export type ComputeTransitData = { type: "compute-transit"; @@ -46,9 +46,66 @@ async function asyncPool( await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker)); } +/** Check isochrone cache then call Valhalla, writing the result back to cache. */ +async function getTransitIsochrone( + sql: ReturnType, + gp: { lat: number; lng: number }, + departureDate: string, +) { + type CacheRow = { result: object }; + const cached = await Promise.resolve(sql` + SELECT result FROM isochrone_cache + WHERE travel_mode = 'transit' + AND departure_date = ${departureDate} + AND contours_min = ${TRANSIT_CONTOUR_MINUTES as unknown as number[]} + AND ST_DWithin( + origin_geom::geography, + ST_SetSRID(ST_MakePoint(${gp.lng}, ${gp.lat}), 4326)::geography, + 50 + ) + LIMIT 1 + `); + + if (cached.length > 0) { + return parseTransitContours(cached[0].result); + } + + const contours = await fetchTransitIsochrone({ lat: gp.lat, lng: gp.lng }, departureDate); + + if (contours) { + const geojson = { + type: "FeatureCollection", + features: contours.map((c) => ({ + type: "Feature", + properties: { contour: c.minutes }, + geometry: c.geojson, + })), + }; + try { + await Promise.resolve(sql` + INSERT INTO isochrone_cache + (origin_geom, travel_mode, contours_min, departure_date, result) + VALUES ( + ST_SetSRID(ST_MakePoint(${gp.lng}, ${gp.lat}), 4326), + 'transit', + ${TRANSIT_CONTOUR_MINUTES as unknown as number[]}, + ${departureDate}, + ${JSON.stringify(geojson)}::jsonb + ) + ON CONFLICT DO NOTHING + `); + } catch { + // Cache write failure is non-fatal + } + } + + return contours; +} + export async function handleComputeTransit(job: Job): Promise { const { citySlug } = job.data; const sql = getSql(); + const departureDate = nextTuesdayDeparture(); const gridPoints = await Promise.resolve(sql<{ id: string; lat: number; lng: number }[]>` SELECT id::text AS id, ST_Y(geom) AS lat, ST_X(geom) AS lng @@ -78,7 +135,7 @@ export async function handleComputeTransit(job: Job): Promis let withTransit = 0; await asyncPool(BATCH_CONCURRENCY, gridPoints, async (gp) => { - const contours = await fetchTransitIsochrone({ lat: gp.lat, lng: gp.lng }); + const contours = await getTransitIsochrone(sql, gp, departureDate); processed++; if (!contours || contours.length === 0) { diff --git a/worker/src/jobs/download-gtfs-de.ts b/worker/src/jobs/download-gtfs-de.ts index 8ae5ea8..09ae1c9 100644 --- a/worker/src/jobs/download-gtfs-de.ts +++ b/worker/src/jobs/download-gtfs-de.ts @@ -51,14 +51,22 @@ export type DownloadGtfsDeData = { const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs"; const GTFS_ZIP_PATH = `${GTFS_DATA_DIR}/feed.zip`; const GTFS_FEED_DIR = `${GTFS_DATA_DIR}/feed`; -/** Records which source/bboxes last populated GTFS_FEED_DIR. JSON format. */ +/** Records which source/bboxes/algorithm last populated GTFS_FEED_DIR. JSON format. */ const SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`; +/** + * Bump this when the filtering algorithm changes in a way that produces + * different output from the same source + bboxes. This forces a re-filter + * on the existing extracted data without re-downloading. + */ +const FILTER_VERSION = 2; + // ─── Source marker helpers ──────────────────────────────────────────────────── interface SourceMarker { source: string; bboxes?: [number, number, number, number][]; + filterVersion?: number; } function readSourceMarker(): SourceMarker | null { @@ -73,7 +81,7 @@ function readSourceMarker(): SourceMarker | null { } function writeSourceMarker(source: string, bboxes?: [number, number, number, number][]): void { - writeFileSync(SOURCE_MARKER, JSON.stringify({ source, bboxes })); + writeFileSync(SOURCE_MARKER, JSON.stringify({ source, bboxes, filterVersion: FILTER_VERSION })); } /** True when `outer` fully contains `inner`. */ @@ -205,7 +213,11 @@ async function filterGtfsByBboxes( renameSync(tmpPath, filePath); } - // ── Step 1: filter stops.txt by bbox → validStopIds ────────────────────── + // ── Step 1: collect bbox stop IDs (read-only — stops.txt not written yet) ── + // + // Build the set of stops within the bbox — used to seed validTripIds (step 2a) + // and to filter stop_times to local stops only (step 2b). stops.txt itself is + // filtered in step 3 to only bbox stops that appear in the final stop_times. const stopsPath = path.join(feedDir, "stops.txt"); if (!existsSync(stopsPath)) { @@ -213,21 +225,35 @@ async function filterGtfsByBboxes( return; } - const validStopIds = new Set(); - filterSmallCsv( - stopsPath, - (idx, fields) => { - const lat = parseFloat(fields[idx.get("stop_lat") ?? -1] ?? "NaN"); - const lon = parseFloat(fields[idx.get("stop_lon") ?? -1] ?? "NaN"); - return inAnyBbox(lat, lon); - }, - (idx, fields) => { - validStopIds.add(fields[idx.get("stop_id") ?? -1] ?? ""); - }, - ); - console.log(`[download-gtfs-de] Bbox filter: ${validStopIds.size} stops in area`); + const bboxStopIds = new Set(); + // Also track the bbox of seeding stops — used later to expand the road tile + // extraction in build-valhalla to cover these stops without expanding to the + // full retained-stops area (which includes Germany-wide long-distance trip stops). + let seedMinLng = Infinity, seedMinLat = Infinity, seedMaxLng = -Infinity, seedMaxLat = -Infinity; + { + const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim()); + if (lines.length >= 2) { + const idx = colIndex(lines[0]); + const stopIdCol = idx.get("stop_id") ?? -1; + const latCol = idx.get("stop_lat") ?? -1; + const lonCol = idx.get("stop_lon") ?? -1; + for (let i = 1; i < lines.length; i++) { + const fields = splitCsv(lines[i]); + const lat = parseFloat(fields[latCol] ?? "NaN"); + const lon = parseFloat(fields[lonCol] ?? "NaN"); + if (inAnyBbox(lat, lon)) { + bboxStopIds.add(fields[stopIdCol] ?? ""); + if (isFinite(lat) && isFinite(lon)) { + seedMinLat = Math.min(seedMinLat, lat); seedMaxLat = Math.max(seedMaxLat, lat); + seedMinLng = Math.min(seedMinLng, lon); seedMaxLng = Math.max(seedMaxLng, lon); + } + } + } + } + } + console.log(`[download-gtfs-de] Bbox filter: ${bboxStopIds.size} stops seed the area`); - if (validStopIds.size === 0) { + if (bboxStopIds.size === 0) { console.warn( "[download-gtfs-de] No stops found in any bbox — GTFS filter skipped " + "(check bbox coverage and feed area)", @@ -235,7 +261,7 @@ async function filterGtfsByBboxes( return; } - // ── Step 2 (pass 1): collect trip_ids that serve the area ───────────────── + // ── Step 2a: collect trip_ids that serve the area (pass 1) ──────────────── const stopTimesPath = path.join(feedDir, "stop_times.txt"); if (!existsSync(stopTimesPath)) { @@ -244,6 +270,9 @@ async function filterGtfsByBboxes( } const validTripIds = new Set(); + // Count how many bbox-local stops each trip has — trips with only 1 bbox + // stop are useless for routing (no O→D pair) and are pruned before step 2b. + const tripBboxStopCount = new Map(); { let stopIdCol = -1; let tripIdCol = -1; @@ -258,24 +287,84 @@ async function filterGtfsByBboxes( tripIdCol = idx.get("trip_id") ?? -1; continue; } - // stop_id and trip_id never contain commas/quotes — fast split is safe const fields = line.split(","); - if (stopIdCol >= 0 && validStopIds.has(fields[stopIdCol] ?? "")) { - validTripIds.add(fields[tripIdCol] ?? ""); + const tripId = fields[tripIdCol] ?? ""; + const stopId = fields[stopIdCol] ?? ""; + if (stopIdCol >= 0 && bboxStopIds.has(stopId)) { + validTripIds.add(tripId); + tripBboxStopCount.set(tripId, (tripBboxStopCount.get(tripId) ?? 0) + 1); } } } - console.log(`[download-gtfs-de] Bbox filter: ${validTripIds.size} trips serve the area`); + // Remove trips with only one bbox stop — they can't provide an O→D pair + for (const tripId of validTripIds) { + if ((tripBboxStopCount.get(tripId) ?? 0) < 2) validTripIds.delete(tripId); + } + console.log(`[download-gtfs-de] Bbox filter: ${validTripIds.size} trips with ≥2 bbox stops serve the area`); - // ── Step 2 (pass 2): write filtered stop_times.txt ──────────────────────── + // ── Step 2b: write filtered stop_times, keeping only bbox stops (pass 2) ─── + // + // We keep a stop_times entry only when BOTH: + // - its trip has ≥2 bbox stops (trip_id ∈ validTripIds), AND + // - the stop itself is within the city bbox (stop_id ∈ bboxStopIds). + // + // Out-of-bbox stops on long-distance routes (e.g. ICE Hamburg↔Bremen passing + // through Oldenburg) are stripped from stop_times. Trips with only one bbox + // stop are removed entirely (no O→D pair, useless for routing). This limits + // the transit graph to local stops only, ensuring valhalla_build_tiles can + // create road connections for all included stops without ghost edge references + // that cause routing errors for other modes (bicycle, driving). - await filterLargeCsv( - stopTimesPath, - (tripIdCol, line) => validTripIds.has(line.split(",")[tripIdCol] ?? ""), - (idx) => idx.get("trip_id") ?? -1, + const allTripStopIds = new Set(); + { + const tmpPath = stopTimesPath + ".tmp"; + const writer = createWriteStream(tmpPath); + let isFirst = true; + let tripIdCol = -1; + let stopIdCol = -1; + const rl = createInterface({ input: createReadStream(stopTimesPath), crlfDelay: Infinity }); + for await (const line of rl) { + if (!line.trim()) continue; + if (isFirst) { + isFirst = false; + const idx = colIndex(line); + tripIdCol = idx.get("trip_id") ?? -1; + stopIdCol = idx.get("stop_id") ?? -1; + writer.write(line + "\n"); + continue; + } + const fields = line.split(","); + if ( + validTripIds.has(fields[tripIdCol] ?? "") && + bboxStopIds.has(fields[stopIdCol] ?? "") + ) { + allTripStopIds.add(fields[stopIdCol] ?? ""); + writer.write(line + "\n"); + } + } + await new Promise((resolve, reject) => + writer.end((err?: unknown) => (err ? reject(err) : resolve())), + ); + renameSync(tmpPath, stopTimesPath); + } + + // ── Step 3: filter stops.txt to bbox stops used by kept trips ────────────── + + filterSmallCsv( + stopsPath, + (idx, fields) => allTripStopIds.has(fields[idx.get("stop_id") ?? -1] ?? ""), ); - // ── Step 3: filter trips.txt ─────────────────────────────────────────────── + if (isFinite(seedMinLat)) { + const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat]; + writeFileSync(path.join(feedDir, ".stops_bbox"), JSON.stringify(stopsBbox)); + console.log( + `[download-gtfs-de] Transit stops bbox (seeding area): [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`, + ); + } + console.log(`[download-gtfs-de] Bbox filter: ${allTripStopIds.size} bbox stops kept across ${validTripIds.size} trips`); + + // ── Step 4: filter trips.txt ─────────────────────────────────────────────── const validRouteIds = new Set(); const validServiceIds = new Set(); @@ -292,7 +381,7 @@ async function filterGtfsByBboxes( }, ); - // ── Step 4: filter remaining files ──────────────────────────────────────── + // ── Step 5: filter remaining files ──────────────────────────────────────── filterSmallCsv( path.join(feedDir, "routes.txt"), @@ -317,7 +406,7 @@ async function filterGtfsByBboxes( console.log( `[download-gtfs-de] GTFS filter complete: ` + - `${validStopIds.size} stops, ${validTripIds.size} trips, ${validRouteIds.size} routes`, + `${allTripStopIds.size} stops, ${validTripIds.size} trips, ${validRouteIds.size} routes`, ); } @@ -348,6 +437,7 @@ export async function handleDownloadGtfsDe(job: Job): Promis if (!force && !sourceChanged && dataExists) { const existingBboxes = existingMarker?.bboxes; + const filterVersionOk = existingMarker?.filterVersion === FILTER_VERSION; // Does the existing filtered data cover all requested bboxes? const bboxesCovered = !bboxes?.length || allBboxesCovered(existingBboxes, bboxes); @@ -356,8 +446,8 @@ export async function handleDownloadGtfsDe(job: Job): Promis // Marker already reflects desired filtering? const markerOk = !bboxes?.length || (existingBboxes && existingBboxes.length > 0); - if (markerOk) { - console.log(`[download-gtfs-de] GTFS feed up to date (source=${effectiveSource}), skipping`); + if (markerOk && filterVersionOk) { + console.log(`[download-gtfs-de] GTFS feed up to date (source=${effectiveSource}, filterVersion=${FILTER_VERSION}), skipping`); await job.updateProgress({ stage: "Downloading GTFS", pct: 100, @@ -366,8 +456,12 @@ export async function handleDownloadGtfsDe(job: Job): Promis return; } - // Data is unfiltered but bboxes are now requested — filter in place. - console.log(`[download-gtfs-de] Applying bbox filter to existing GTFS data`); + // Data needs re-filtering: either unfiltered (bboxes newly requested) + // or filter algorithm changed (filterVersion mismatch). + const reason = !filterVersionOk + ? `filter algorithm updated (v${existingMarker?.filterVersion ?? "none"} → v${FILTER_VERSION})` + : "applying bbox filter to unfiltered data"; + console.log(`[download-gtfs-de] Re-filtering existing GTFS data: ${reason}`); await job.updateProgress({ stage: "Downloading GTFS", pct: 10, diff --git a/worker/src/jobs/refresh-city.ts b/worker/src/jobs/refresh-city.ts index 73098b0..2741bf8 100644 --- a/worker/src/jobs/refresh-city.ts +++ b/worker/src/jobs/refresh-city.ts @@ -96,12 +96,18 @@ export async function handleRefreshCity( // Parallel pipeline DAG (bottom-up — leaves execute first): // - // download-pbf ──────┬─→ extract-pois ────────────────────┐ - // │ ├─→ generate-grid → compute-scores - // download-pbf ──┐ └─→ build-valhalla ──────────────────┘ - // └──→ build-valhalla (waits for both ↑) + // download-pbf ──────┬─→ extract-pois ──────────────────────────┐ + // │ ├─→ generate-grid → compute-scores + // download-pbf ──────┴─→ build-valhalla (road, "valhalla") ┤ + // │ + // download-pbf ──┐ │ + // └─→ build-valhalla (transit, "valhalla-transit")┘ // download-gtfs-de ──┘ // + // Road tiles are built without GTFS (clean, no transit connections → cycling works). + // Transit tiles are built with GTFS (multimodal routing on the separate instance). + // generate-grid waits for all three parallel branches before compute-scores runs. + // // compute-scores Phase 1 also dispatches ingest-boris-ni (NI cities only) // as a child alongside the routing jobs, so it runs during routing. const rootNode = { @@ -122,9 +128,8 @@ export async function handleRefreshCity( data: { type: "generate-grid" as const, citySlug, resolutionM }, opts: JOB_OPTIONS["generate-grid"], children: [ - // extract-pois and build-valhalla run in parallel — each gets its - // own download-pbf child; the idempotency guard ensures only one - // actually downloads when they race. + // Three parallel branches — each gets its own download-pbf child; + // the idempotency guard ensures only one actually downloads when they race. { name: "extract-pois", queueName: "pipeline", @@ -137,6 +142,8 @@ export async function handleRefreshCity( opts: JOB_OPTIONS["extract-pois"], children: [downloadNode()], }, + // Road-only Valhalla build — no GTFS, produces clean tiles without + // transit connections. Cycling/walking/driving route from this instance. { name: "build-valhalla", queueName: "valhalla", @@ -147,14 +154,27 @@ export async function handleRefreshCity( ...(bbox ? { bbox } : {}), }, opts: JOB_OPTIONS["build-valhalla"], + children: [downloadNode()], + }, + // Transit Valhalla build — depends on GTFS download. Produces tiles with + // road↔transit connections. Multimodal routing comes from this instance. + { + name: "build-valhalla", + queueName: "valhalla-transit", + data: { + type: "build-valhalla" as const, + citySlug, + pbfPath, + ...(bbox ? { bbox } : {}), + }, + opts: JOB_OPTIONS["build-valhalla"], children: [ downloadNode(), - // Download GTFS feed before building tiles so valhalla_build_transit - // runs during this build. The job is idempotent — it skips immediately - // if the feed is already present, so subsequent refreshes are cheap. + // Download GTFS feed before building transit tiles. Idempotent — + // skips if the feed is current, so subsequent refreshes are cheap. { name: "download-gtfs-de", - queueName: "valhalla", + queueName: "valhalla-transit", data: { type: "download-gtfs-de" as const, url: "https://download.gtfs.de/germany/nv_free/latest.zip", diff --git a/worker/src/valhalla-main.ts b/worker/src/valhalla-main.ts index af8f77e..02ff36c 100644 --- a/worker/src/valhalla-main.ts +++ b/worker/src/valhalla-main.ts @@ -5,9 +5,10 @@ import { createBullMQConnection } from "./redis.js"; import { handleBuildValhalla } from "./jobs/build-valhalla.js"; import { handleDownloadGtfsDe } from "./jobs/download-gtfs-de.js"; -const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json"; +const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json"; +const VALHALLA_QUEUE_NAME = process.env.VALHALLA_QUEUE_NAME ?? "valhalla"; -console.log("[valhalla-worker] Starting Transportationer Valhalla worker…"); +console.log(`[valhalla-worker] Starting Transportationer Valhalla worker (queue=${VALHALLA_QUEUE_NAME})…`); // ─── Valhalla service process manager ───────────────────────────────────────── // The valhalla_service HTTP server runs as a child process alongside this @@ -49,7 +50,7 @@ function stopValhallaService(): Promise { // ─── BullMQ worker ──────────────────────────────────────────────────────────── const worker = new Worker( - "valhalla", + VALHALLA_QUEUE_NAME, async (job: Job) => { console.log(`[valhalla-worker] Processing job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`); @@ -107,4 +108,4 @@ process.on("SIGINT", shutdown); // Start serving if tiles already exist from a previous run startValhallaService(); -console.log("[valhalla-worker] Ready — waiting for build-valhalla jobs on 'valhalla' queue"); +console.log(`[valhalla-worker] Ready — waiting for jobs on '${VALHALLA_QUEUE_NAME}' queue`); diff --git a/worker/src/valhalla.ts b/worker/src/valhalla.ts index 9a30672..9ecc3c1 100644 --- a/worker/src/valhalla.ts +++ b/worker/src/valhalla.ts @@ -1,4 +1,6 @@ -const VALHALLA_URL = process.env.VALHALLA_URL ?? "http://localhost:8002"; +const VALHALLA_URL = process.env.VALHALLA_URL ?? "http://localhost:8002"; +/** Transit instance (port 8003). Falls back to VALHALLA_URL if not set. */ +const VALHALLA_TRANSIT_URL = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_URL; const COSTING: Record<"walking" | "cycling" | "driving", string> = { walking: "pedestrian", @@ -10,9 +12,7 @@ const COSTING: Record<"walking" | "cycling" | "driving", string> = { // Must match the scoring thresholds used in compute-scores. export const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const; -// Fixed weekday morning departure for reproducible transit scores. -// GTFS schedules repeat weekly, so the exact date doesn't matter — any Tuesday works. -const TRANSIT_DEPARTURE = "2024-01-16T08:00"; +import { nextTuesdayDeparture } from "@transportationer/shared"; export interface TransitContour { minutes: number; @@ -27,6 +27,7 @@ export interface TransitContour { */ export async function fetchTransitIsochrone( source: LatLng, + departureDate: string, ): Promise { const body = { locations: [{ lat: source.lat, lon: source.lng }], @@ -36,12 +37,12 @@ export async function fetchTransitIsochrone( costing_options: { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 }, }, - date_time: { type: 1, value: TRANSIT_DEPARTURE }, + date_time: { type: 1, value: departureDate }, }; let resp: Response; try { - resp = await fetch(`${VALHALLA_URL}/isochrone`, { + resp = await fetch(`${VALHALLA_TRANSIT_URL}/isochrone`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), @@ -71,6 +72,21 @@ export async function fetchTransitIsochrone( return contours.length >= 2 ? contours : null; } +/** + * Parse a cached Valhalla isochrone FeatureCollection back into TransitContour[]. + * Mirrors the extraction logic in fetchTransitIsochrone. + */ +export function parseTransitContours(geojson: object): TransitContour[] | null { + const fc = geojson as { features?: Array<{ properties: { contour: number }; geometry: object }> }; + if (!Array.isArray(fc.features)) return null; + const contours: TransitContour[] = []; + for (const minutes of TRANSIT_CONTOUR_MINUTES) { + const feature = fc.features.find((f) => f.properties?.contour === minutes); + if (feature?.geometry) contours.push({ minutes, geojson: feature.geometry }); + } + return contours.length >= 2 ? contours : null; +} + export interface LatLng { lat: number; lng: number; @@ -110,8 +126,8 @@ export async function fetchMatrix( mode: "walking" | "cycling" | "driving", ): Promise<(number | null)[][]> { const body = { - sources: sources.map(({ lat, lng }) => ({ lat, lon: lng })), - targets: targets.map(({ lat, lng }) => ({ lat, lon: lng })), + sources: sources.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })), + targets: targets.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })), costing: COSTING[mode], }; const bodyJson = JSON.stringify(body);