{CATEGORIES.map((cat) => {
const score = data.categoryScores[cat.id] ?? 0;
- const dist = data.distancesM[cat.id];
- const time = data.travelTimesS[cat.id];
const barColor =
score >= 0.65 ? "#22c55e" : score >= 0.4 ? "#eab308" : "#ef4444";
const subcats = data.subcategoryDetails?.[cat.id];
+ // Use the fastest subcategory entry as the category headline — this is
+ // consistent with what the user sees in the expanded view and avoids
+ // the straight-line-nearest POI having an unexpectedly long routed time.
+ const bestSubcat = subcats?.reduce((best, d) => {
+ if (d.travelTimeS == null) return best;
+ if (best == null || d.travelTimeS < (best.travelTimeS ?? Infinity)) return d;
+ return best;
+ }, null);
+ const time = bestSubcat?.travelTimeS ?? data.travelTimesS[cat.id];
+ const dist = bestSubcat?.distanceM ?? data.distancesM[cat.id];
const isExpanded = expandedCategory === cat.id;
const hasDetails = subcats && subcats.length > 0;
return (
diff --git a/apps/web/lib/queue.ts b/apps/web/lib/queue.ts
index ef9b0d2..01bd1c5 100644
--- a/apps/web/lib/queue.ts
+++ b/apps/web/lib/queue.ts
@@ -32,6 +32,16 @@ declare global {
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __valhallaTransitQueue: Queue | undefined;
+ // eslint-disable-next-line no-var
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ var __routingQueue: Queue | undefined;
+ // eslint-disable-next-line no-var
+ var __routingQueueEvents: QueueEvents | undefined;
+ // eslint-disable-next-line no-var
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ var __routingTransitQueue: Queue | undefined;
+ // eslint-disable-next-line no-var
+ var __routingTransitQueueEvents: QueueEvents | undefined;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -102,3 +112,53 @@ export function createQueueEvents(): QueueEvents {
connection: createBullMQConnection(),
});
}
+
+/** On-demand isochrone queue for road modes (walking/cycling/driving). */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export function getRoutingQueue(): Queue {
+ if (!globalThis.__routingQueue) {
+ globalThis.__routingQueue = new Queue("routing", {
+ connection: createBullMQConnection(),
+ defaultJobOptions: {
+ attempts: 1,
+ removeOnComplete: { age: 120 },
+ removeOnFail: { age: 300 },
+ },
+ });
+ }
+ return globalThis.__routingQueue;
+}
+
+export function getRoutingQueueEvents(): QueueEvents {
+ if (!globalThis.__routingQueueEvents) {
+ globalThis.__routingQueueEvents = new QueueEvents("routing", {
+ connection: createBullMQConnection(),
+ });
+ }
+ return globalThis.__routingQueueEvents;
+}
+
+/** On-demand isochrone queue for transit mode. */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export function getRoutingTransitQueue(): Queue {
+ if (!globalThis.__routingTransitQueue) {
+ globalThis.__routingTransitQueue = new Queue("routing-transit", {
+ connection: createBullMQConnection(),
+ defaultJobOptions: {
+ attempts: 1,
+ removeOnComplete: { age: 120 },
+ removeOnFail: { age: 300 },
+ },
+ });
+ }
+ return globalThis.__routingTransitQueue;
+}
+
+export function getRoutingTransitQueueEvents(): QueueEvents {
+ if (!globalThis.__routingTransitQueueEvents) {
+ globalThis.__routingTransitQueueEvents = new QueueEvents("routing-transit", {
+ connection: createBullMQConnection(),
+ });
+ }
+ return globalThis.__routingTransitQueueEvents;
+}
diff --git a/apps/web/lib/valhalla.ts b/apps/web/lib/valhalla.ts
index 9c82ade..f35993e 100644
--- a/apps/web/lib/valhalla.ts
+++ b/apps/web/lib/valhalla.ts
@@ -1,62 +1,39 @@
import { nextTuesdayDeparture } from "@transportationer/shared";
-
-const VALHALLA_BASE = process.env.VALHALLA_URL ?? "http://valhalla:8002";
-const VALHALLA_TRANSIT_BASE = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_BASE;
-
-export type ValhallaCosting = "pedestrian" | "bicycle" | "auto";
-
-const COSTING_MAP: Record<string, ValhallaCosting> = {
- walking: "pedestrian",
- cycling: "bicycle",
- driving: "auto",
-};
+import { getRoutingQueue, getRoutingQueueEvents, getRoutingTransitQueue, getRoutingTransitQueueEvents } from "./queue";
export interface IsochroneOpts {
lng: number;
lat: number;
travelMode: string;
contourMinutes: number[];
+ citySlug: string;
polygons?: boolean;
}
-export async function fetchIsochrone(opts: IsochroneOpts): Promise<unknown> {
- const isTransit = opts.travelMode === "transit";
- const costing = isTransit ? "multimodal" : (COSTING_MAP[opts.travelMode] ?? "pedestrian");
- const body: Record<string, unknown> = {
- locations: [{ lon: opts.lng, lat: opts.lat }],
- costing,
- contours: opts.contourMinutes.map((time) => ({ time })),
- polygons: opts.polygons ?? true,
- show_locations: false,
- };
- if (isTransit) {
- body.costing_options = { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 } };
- body.date_time = { type: 1, value: nextTuesdayDeparture() };
- }
+const ISOCHRONE_TIMEOUT_MS = 35_000;
- const base = isTransit ? VALHALLA_TRANSIT_BASE : VALHALLA_BASE;
- const res = await fetch(`${base}/isochrone`, {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify(body),
- signal: AbortSignal.timeout(30_000),
+/**
+ * Dispatch an isochrone job to the appropriate valhalla routing queue and wait.
+ * Road modes (walking/cycling/driving) → routing queue (valhalla container).
+ * Transit → routing-transit queue (valhalla-transit container).
+ */
+export async function fetchIsochrone(opts: IsochroneOpts): Promise<unknown> {
+ const { lng, lat, travelMode, contourMinutes, citySlug } = opts;
+ const isTransit = travelMode === "transit";
+ const departureDate = isTransit ? nextTuesdayDeparture() : null;
+
+ const queue = isTransit ? getRoutingTransitQueue() : getRoutingQueue();
+ const events = isTransit ? getRoutingTransitQueueEvents() : getRoutingQueueEvents();
+
+ const job = await queue.add("isochrone", {
+ type: "isochrone",
+ lng,
+ lat,
+ travelMode,
+ contourMinutes,
+ citySlug,
+ departureDate,
});
- if (!res.ok) {
- const text = await res.text();
- throw new Error(`Valhalla error ${res.status}: ${text}`);
- }
-
- return res.json();
-}
-
-export async function checkValhalla(): Promise<boolean> {
- try {
- const res = await fetch(`${VALHALLA_BASE}/status`, {
- signal: AbortSignal.timeout(3000),
- });
- return res.ok;
- } catch {
- return false;
- }
+ return job.waitUntilFinished(events, ISOCHRONE_TIMEOUT_MS) as Promise<unknown>;
}
diff --git a/docker-compose.yml b/docker-compose.yml
index bd30e43..2fa911e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -31,55 +31,54 @@ services:
timeout: 5s
retries: 5
- # ─── Valhalla road worker (port 8002) ─────────────────────────────────────
- # Builds road-only tiles (no transit data) → cycling/walking/driving routing.
- # Without GTFS in its volume, valhalla_build_tiles produces clean road tiles
- # with no ghost transit edges, so bicycle routing works correctly.
+ # ─── Valhalla road tile-builder + road routing queue ──────────────────────
+ # Builds road-only per-city tile directories (walking/cycling/driving).
+ # Also serves on-demand routing requests from the web app + pipeline worker
+ # via the BullMQ 'routing' queue using @valhallajs/valhallajs Actor pool.
+ # No HTTP server — all routing goes through BullMQ.
valhalla:
build:
context: .
- target: valhalla-worker
+ dockerfile: Dockerfile.valhalla-worker
restart: unless-stopped
volumes:
- - osm_data:/data/osm:ro # PBF files downloaded by the main worker
- - valhalla_tiles:/data/valhalla # Road-only config and tiles
+ - osm_data:/data/osm:ro
+ - valhalla_tiles:/data/valhalla
environment:
REDIS_HOST: valkey
REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_QUEUE_NAME: valhalla
+ VALHALLA_TILES_BASE: /data/valhalla/tiles
+ VALHALLA_DATA_DIR: /data/valhalla
OSM_DATA_DIR: /data/osm
- VALHALLA_CONFIG: /data/valhalla/valhalla.json
- VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles
NODE_ENV: production
- ports:
- - "127.0.0.1:8002:8002" # Valhalla HTTP API (road)
depends_on:
valkey:
condition: service_healthy
- # ─── Valhalla transit worker (port 8002 internal) ─────────────────────────
- # Builds tiles with GTFS transit data → multimodal routing.
- # Separate volume from the road worker so transit ghost edges never affect
- # the road instance.
+ # ─── Valhalla transit tile-builder + transit routing queue ────────────────
+ # Builds transit per-city tile directories (GTFS multimodal routing).
+ # Also serves transit isochrone requests via the 'routing-transit' queue.
+ # No HTTP server — all routing goes through BullMQ.
valhalla-transit:
build:
context: .
- target: valhalla-worker
+ dockerfile: Dockerfile.valhalla-worker
restart: unless-stopped
volumes:
- - osm_data:/data/osm:ro # PBF files downloaded by the main worker
- - valhalla_tiles_transit:/data/valhalla # Transit config, tiles and GTFS data
+ - osm_data:/data/osm:ro
+ - valhalla_tiles_transit:/data/valhalla
environment:
REDIS_HOST: valkey
REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_QUEUE_NAME: valhalla-transit
+ VALHALLA_TILES_BASE: /data/valhalla/tiles
+ VALHALLA_DATA_DIR: /data/valhalla
+ VALHALLA_INCLUDE_TRANSIT: "true"
OSM_DATA_DIR: /data/osm
- VALHALLA_CONFIG: /data/valhalla/valhalla.json
- VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles
NODE_ENV: production
- # Optional: connect-info.net token for NDS-specific GTFS feed
CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-}
depends_on:
valkey:
@@ -99,7 +98,7 @@ services:
web:
build:
context: .
- target: web
+ dockerfile: Dockerfile
restart: unless-stopped
ports:
- "3000:3000"
@@ -108,8 +107,6 @@ services:
REDIS_HOST: valkey
REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD}
- VALHALLA_URL: http://valhalla:8002
- VALHALLA_TRANSIT_URL: http://valhalla-transit:8002
ADMIN_PASSWORD_HASH: ${ADMIN_PASSWORD_HASH}
ADMIN_JWT_SECRET: ${ADMIN_JWT_SECRET}
NODE_ENV: production
@@ -120,10 +117,12 @@ services:
condition: service_healthy
# ─── BullMQ pipeline worker ───────────────────────────────────────────────
+ # Pure pipeline: download, extract, grid, scoring — no Valhalla knowledge.
+ # Routing requests are dispatched via BullMQ to the valhalla containers.
worker:
build:
context: .
- target: worker
+ dockerfile: Dockerfile.worker
restart: unless-stopped
environment:
DATABASE_URL: postgres://app:${POSTGRES_PASSWORD}@postgres:5432/fifteenmin
@@ -132,13 +131,10 @@ services:
REDIS_PASSWORD: ${VALKEY_PASSWORD}
OSM_DATA_DIR: /data/osm
LUA_SCRIPT: /app/infra/osm2pgsql.lua
- VALHALLA_URL: http://valhalla:8002
- VALHALLA_TRANSIT_URL: http://valhalla-transit:8002
NODE_ENV: production
- # Optional: enables NDS-specific GTFS source for cities in Niedersachsen
CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-}
volumes:
- - osm_data:/data/osm # Worker downloads PBF here
+ - osm_data:/data/osm
depends_on:
postgres:
condition: service_healthy
@@ -148,7 +144,7 @@ services:
volumes:
postgres_data:
valkey_data:
- osm_data: # Shared: worker writes, valhalla containers read
- valhalla_tiles: # Road-only tiles (no transit) — cycling works correctly here
- valhalla_tiles_transit: # Transit tiles (with GTFS) — multimodal routing
+ osm_data: # Shared: worker writes PBF files, valhalla containers read
+ valhalla_tiles: # Road-only per-city tiles + Actor pool data
+ valhalla_tiles_transit: # Transit per-city tiles + GTFS data + Actor pool data
pmtiles_data:
diff --git a/package-lock.json b/package-lock.json
index 3077cd1..20522df 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1492,6 +1492,15 @@
"@types/node": "*"
}
},
+ "node_modules/@valhallajs/valhallajs": {
+ "version": "3.6.1",
+ "resolved": "https://registry.npmjs.org/@valhallajs/valhallajs/-/valhallajs-3.6.1.tgz",
+ "integrity": "sha512-od2CkY58XW2Frf2/6KTaqgd567XwGKkwFu4ga4YjYOAdu0u3aJIODEr6zlY6sSKW3qGylW1wwg3joZHidIbaiw==",
+ "license": "MIT",
+ "bin": {
+ "valhalla": "bin/valhalla-cli.js"
+ }
+ },
"node_modules/any-promise": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/any-promise/-/any-promise-1.3.0.tgz",
@@ -3579,6 +3588,7 @@
"version": "0.1.0",
"dependencies": {
"@transportationer/shared": "*",
+ "@valhallajs/valhallajs": "^3.6.1",
"bullmq": "^5.13.0",
"postgres": "^3.4.4",
"unzipper": "^0.12.3"
diff --git a/worker/package.json b/worker/package.json
index 2057604..65656f4 100644
--- a/worker/package.json
+++ b/worker/package.json
@@ -11,6 +11,7 @@
},
"dependencies": {
"@transportationer/shared": "*",
+ "@valhallajs/valhallajs": "^3.6.1",
"bullmq": "^5.13.0",
"postgres": "^3.4.4",
"unzipper": "^0.12.3"
diff --git a/worker/src/actor-pool.ts b/worker/src/actor-pool.ts
new file mode 100644
index 0000000..a8bc8ee
--- /dev/null
+++ b/worker/src/actor-pool.ts
@@ -0,0 +1,93 @@
+/**
+ * Per-city Valhalla Actor pool using @valhallajs/valhallajs native bindings.
+ *
+ * Each city gets its own Actor instance pointing to its per-city tile directory.
+ * Actors are lazy-loaded and invalidated when the .ready marker is updated
+ * (i.e. after a fresh tile build).
+ *
+ * Road tiles live under ROAD_TILES_BASE/{citySlug}/
+ * Transit tiles live under TRANSIT_TILES_BASE/{citySlug}/
+ *
+ * Both directories must contain a config.json generated by build-valhalla.ts.
+ */
+import { existsSync, statSync, readFileSync } from "fs";
+import type { Actor } from "@valhallajs/valhallajs";
+
+/** Where road tile directories live (inside the valhalla_tiles volume). */
+const ROAD_TILES_BASE = process.env.ROAD_TILES_BASE ?? "/data/valhalla/tiles";
+/** Where transit tile directories live. Set same default; overridden by VALHALLA_TILES_BASE env in each container. */
+const TRANSIT_TILES_BASE = process.env.TRANSIT_TILES_BASE ?? ROAD_TILES_BASE;
+
+let actorCtor: typeof Actor | null = null;
+
+async function getActorCtor(): Promise<typeof Actor> {
+ if (!actorCtor) {
+ const mod = await import("@valhallajs/valhallajs");
+ actorCtor = mod.Actor;
+ }
+ return actorCtor;
+}
+
+interface CachedEntry {
+ actor: Actor;
+ loadedAt: number;
+}
+
+const pool = new Map<string, CachedEntry>();
+
+export type ActorType = "road" | "transit";
+
+function tilesBase(type: ActorType): string {
+ return type === "road" ? ROAD_TILES_BASE : TRANSIT_TILES_BASE;
+}
+
+export function tileDir(type: ActorType, citySlug: string): string {
+ return `${tilesBase(type)}/${citySlug}`;
+}
+
+export function readyMarkerPath(type: ActorType, citySlug: string): string {
+ return `${tileDir(type, citySlug)}/.ready`;
+}
+
+/** Remove a cached Actor entry so the next call to getActor() reloads it. */
+export function invalidateActor(type: ActorType, citySlug: string): void {
+ const key = `${type}:${citySlug}`;
+ if (pool.delete(key)) {
+ console.log(`[actor-pool] Invalidated ${key}`);
+ }
+}
+
+/**
+ * Return (or lazily load) a Valhalla Actor for the given city and type.
+ * Throws if tiles are not yet built (.ready marker absent) or config is missing.
+ */
+export async function getActor(type: ActorType, citySlug: string): Promise<Actor> {
+ const key = `${type}:${citySlug}`;
+ const marker = readyMarkerPath(type, citySlug);
+
+ if (!existsSync(marker)) {
+ throw new Error(`Valhalla tiles not ready for ${key} — run build-valhalla first (marker: ${marker})`);
+ }
+
+ const markerMtime = statSync(marker).mtimeMs;
+ const cached = pool.get(key);
+
+ if (cached && cached.loadedAt >= markerMtime) {
+ return cached.actor;
+ }
+
+ const dir = tileDir(type, citySlug);
+ const configPath = `${dir}/config.json`;
+ if (!existsSync(configPath)) {
+ throw new Error(`Valhalla config missing for ${key}: ${configPath}`);
+ }
+
+ console.log(`[actor-pool] Loading Actor for ${key} from ${configPath}…`);
+ const Ctor = await getActorCtor();
+ // Actor constructor takes the config JSON as a string (not a file path).
+ const configJson = readFileSync(configPath, "utf8");
+ const actor = new Ctor(configJson);
+ pool.set(key, { actor, loadedAt: Date.now() });
+ console.log(`[actor-pool] Actor for ${key} ready`);
+ return actor;
+}
diff --git a/worker/src/jobs/build-valhalla.ts b/worker/src/jobs/build-valhalla.ts
index 0ac69a0..869952e 100644
--- a/worker/src/jobs/build-valhalla.ts
+++ b/worker/src/jobs/build-valhalla.ts
@@ -1,8 +1,8 @@
import type { Job } from "bullmq";
import { execSync, spawn } from "child_process";
import { existsSync, mkdirSync, readFileSync, readdirSync, rmSync, statSync, unlinkSync, writeFileSync } from "fs";
-import * as path from "path";
import type { JobProgress } from "@transportationer/shared";
+import { invalidateActor } from "../actor-pool.js";
export type BuildValhallaData = {
type: "build-valhalla";
@@ -10,74 +10,48 @@ export type BuildValhallaData = {
citySlug?: string;
pbfPath?: string;
bbox?: [number, number, number, number];
- /** Slugs to drop from the global routing tile set before rebuilding */
+ /** Slugs to drop from the routing tile set before rebuilding. */
removeSlugs?: string[];
};
-const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
-const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json";
-const VALHALLA_TILES_DIR = process.env.VALHALLA_TILES_DIR ?? "/data/valhalla/valhalla_tiles";
-const VALHALLA_DATA_DIR = "/data/valhalla";
-const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";
-const GTFS_FEED_DIR = `${GTFS_DATA_DIR}/feed`;
+const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
+const VALHALLA_DATA_DIR = process.env.VALHALLA_DATA_DIR ?? "/data/valhalla";
/**
- * Auxiliary databases downloaded by valhalla_build_tiles on first run.
- * Stored OUTSIDE VALHALLA_TILES_DIR so they survive crash-recovery tile
- * wipes and don't need to be re-downloaded on retries.
+ * Per-city tile directories live here: VALHALLA_TILES_BASE/{citySlug}/
+ * Each valhalla container mounts its own Docker volume at /data/valhalla,
+ * so both road and transit containers use the same env var with different data.
*/
+const VALHALLA_TILES_BASE = process.env.VALHALLA_TILES_BASE ?? `${VALHALLA_DATA_DIR}/tiles`;
+
+const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? `${VALHALLA_DATA_DIR}/gtfs`;
+
const TIMEZONE_SQLITE = `${VALHALLA_DATA_DIR}/timezone.sqlite`;
const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`;
-/**
- * Explicit mjolnir.transit_dir — used by all four transit-aware Valhalla
- * operations (ingest, convert, build_tiles, service). Pinned here to avoid
- * the Valhalla default (/data/valhalla/transit) and to persist transit tiles
- * between builds.
- *
- * IMPORTANT build order:
- * 1. valhalla_ingest_transit — GTFS → transit PBF staging files in transit_dir
- * 2. valhalla_convert_transit — PBF → transit graph tiles (.gph) in transit_dir
- * 3. valhalla_build_tiles — road graph + reads transit tiles from transit_dir
- * → creates road tiles WITH road-to-transit edges
- * → copies transit tiles into tile_dir/3/
- *
- * valhalla_build_tiles MUST run AFTER valhalla_convert_transit so it can find
- * the transit .gph tiles in transit_dir and embed road↔transit connections in
- * the road tiles. Running build_tiles before convert results in road tiles with
- * no transit connections (transit routing silently falls back to walking).
- *
- * valhalla_convert_transit does NOT require road tiles — it only reads the
- * transit PBF staging files and writes transit graph tiles. It can run on an
- * empty tile_dir without crashing.
- *
- * TRANSIT_CACHE_MARKER tracks whether ingest PBFs are current relative to the
- * GTFS source. valhalla_convert_transit always runs after ingest (or when the
- * ingest cache is fresh) so transit_dir has up-to-date .gph before build_tiles.
- */
-const TRANSIT_CACHE_DIR = `${VALHALLA_DATA_DIR}/transit_graph`;
-/** Written after a successful valhalla_ingest_transit; compared against GTFS source mtime. */
-const TRANSIT_CACHE_MARKER = `${TRANSIT_CACHE_DIR}/.ready`;
-/** Written by download-gtfs-de after each successful GTFS extraction. */
-const GTFS_SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`;
+/** Per-city GTFS feed dir written by download-gtfs-de. */
+function cityGtfsFeedDir(citySlug: string): string {
+ return `${GTFS_DATA_DIR}/${citySlug}/feed`;
+}
/**
- * Buffer added to the city bbox when clipping the road PBF with osmium.
- * Transit stops within the city bbox may be in low-road-density areas (parks,
- * new developments, demand-responsive zones) where the nearest OSM road is
- * outside the exact bbox clip. Without coverage, valhalla_build_tiles crashes
- * with a memory corruption error ("double free" / "bad_array_new_length").
- * stop_times is already filtered to bbox-local stops so the buffer only adds
- * road coverage — it does NOT let Germany-wide transit stops into the graph.
- * 0.2° ≈ 18 km at 53 °N — covers roads for all plausibly in-city stops.
+ * Per-city transit staging directory.
+ * valhalla_ingest_transit writes PBF staging tiles here.
+ * valhalla_convert_transit converts them to transit graph tiles (.gph) here.
+ * valhalla_build_tiles reads from here when INCLUDE_TRANSIT=true.
*/
+function cityTransitCacheDir(citySlug: string): string {
+ return `${VALHALLA_DATA_DIR}/transit_graph/${citySlug}`;
+}
+
+/**
+ * Set VALHALLA_INCLUDE_TRANSIT=true in the transit container.
+ * The road container leaves it unset so tiles are clean (no ghost transit edges).
+ */
+const INCLUDE_TRANSIT = (process.env.VALHALLA_INCLUDE_TRANSIT ?? "").toLowerCase() === "true";
+
const ROAD_BBOX_BUFFER = 0.2;
-
-/**
- * Manifest file: maps citySlug → absolute path of its routing PBF.
- * Persists in the valhalla_tiles Docker volume across restarts.
- */
const ROUTING_MANIFEST = `${VALHALLA_DATA_DIR}/routing-sources.json`;
function readManifest(): Record<string, string> {
function runProcess(cmd: string, args: string[]): Promise<void> {
});
}
-/**
- * Build the IANA timezone SQLite database required by valhalla_ingest_transit.
- * Without it, ingest does not write the root index tile (0/000/000.pbf) and
- * valhalla_convert_transit crashes trying to load it.
- *
- * valhalla_build_timezones writes the SQLite database to stdout (no args),
- * so we capture stdout and write it to TIMEZONE_SQLITE.
- */
function buildTimezoneDb(): Promise<void> {
return new Promise((resolve, reject) => {
- console.log("[build-valhalla] Running: valhalla_build_timezones (output → " + TIMEZONE_SQLITE + ")");
- const child = spawn("valhalla_build_timezones", [], {
- stdio: ["ignore", "pipe", "inherit"],
- });
+ console.log("[build-valhalla] Running: valhalla_build_timezones → " + TIMEZONE_SQLITE);
+ const child = spawn("valhalla_build_timezones", [], { stdio: ["ignore", "pipe", "inherit"] });
const chunks: Buffer[] = [];
child.stdout!.on("data", (chunk: Buffer) => chunks.push(chunk));
child.on("error", reject);
child.on("exit", (code) => {
- if (code !== 0) {
- reject(new Error(`valhalla_build_timezones exited with code ${code}`));
- return;
- }
+ if (code !== 0) { reject(new Error(`valhalla_build_timezones exited ${code}`)); return; }
const db = Buffer.concat(chunks);
- if (db.length < 1024) {
- reject(new Error(`valhalla_build_timezones output too small (${db.length} B) — likely failed silently`));
- return;
- }
+ if (db.length < 1024) { reject(new Error(`valhalla_build_timezones output too small`)); return; }
writeFileSync(TIMEZONE_SQLITE, db);
- console.log(`[build-valhalla] Timezone database written to ${TIMEZONE_SQLITE} (${(db.length / 1024 / 1024).toFixed(1)} MB)`);
+ console.log(`[build-valhalla] Timezone database written (${(db.length / 1024 / 1024).toFixed(1)} MB)`);
resolve();
});
});
@@ -140,16 +98,12 @@ function buildTimezoneDb(): Promise {
type JsonObject = Record<string, unknown>;
-/** Deep-merge override into base. Objects are merged recursively; arrays and
- * scalars in override replace the corresponding base value entirely. */
function deepMerge(base: JsonObject, override: JsonObject): JsonObject {
const result: JsonObject = { ...base };
for (const [key, val] of Object.entries(override)) {
const baseVal = base[key];
- if (
- val !== null && typeof val === "object" && !Array.isArray(val) &&
- baseVal !== null && typeof baseVal === "object" && !Array.isArray(baseVal)
- ) {
+ if (val !== null && typeof val === "object" && !Array.isArray(val) &&
+ baseVal !== null && typeof baseVal === "object" && !Array.isArray(baseVal)) {
result[key] = deepMerge(baseVal as JsonObject, val as JsonObject);
} else {
result[key] = val;
@@ -158,291 +112,232 @@ function deepMerge(base: JsonObject, override: JsonObject): JsonObject {
return result;
}
-/**
- * Generate valhalla.json by starting from the canonical defaults produced by
- * valhalla_build_config, then overlaying only the deployment-specific settings.
- */
-function generateConfig(): void {
- mkdirSync(VALHALLA_TILES_DIR, { recursive: true });
- mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
-
- let base: JsonObject = {};
+function buildBase(): JsonObject {
try {
- const out = execSync("valhalla_build_config", {
- encoding: "utf8",
- maxBuffer: 10 * 1024 * 1024,
- });
- base = JSON.parse(out) as JsonObject;
- console.log("[build-valhalla] Loaded defaults from valhalla_build_config");
+ const out = execSync("valhalla_build_config", { encoding: "utf8", maxBuffer: 10 * 1024 * 1024 });
+ return JSON.parse(out) as JsonObject;
} catch (err) {
console.warn("[build-valhalla] valhalla_build_config failed, using empty base:", err);
+ return {};
}
-
- const overrides: JsonObject = {
- mjolnir: {
- tile_dir: VALHALLA_TILES_DIR,
- tile_extract: `${VALHALLA_TILES_DIR}.tar`,
- // Stored outside tile_dir so they survive crash-recovery wipes.
- timezone: TIMEZONE_SQLITE,
- admin: ADMINS_SQLITE,
- // All transit operations (ingest, convert, service) read/write here.
- transit_dir: TRANSIT_CACHE_DIR,
- // valhalla_ingest_transit expects a directory whose subdirectories are
- // individual GTFS feeds. feed/ (inside GTFS_DATA_DIR) is one such feed.
- transit_feeds_dir: GTFS_DATA_DIR,
- },
- additional_data: {
- elevation: "/data/elevation/",
- },
- httpd: {
- service: {
- listen: "tcp://*:8002",
- timeout_seconds: 26,
- },
- },
- service_limits: {
- isochrone: {
- // Transit scoring uses 5 contours [5,10,15,20,30]; Valhalla default is 4.
- max_contours: 5,
- },
- },
- };
-
- const config = deepMerge(base, overrides);
- writeFileSync(VALHALLA_CONFIG, JSON.stringify(config, null, 2));
- console.log(`[build-valhalla] Config written to ${VALHALLA_CONFIG}`);
}
/**
- * True when valhalla_ingest_transit has been run against the current GTFS data.
- * Compares the ingest marker mtime against the GTFS source marker mtime written
- * by download-gtfs-de after each successful extraction.
+ * Write a per-city config for transit ingest + convert operations.
+ * These tools only read/write from transit_dir/transit_feeds_dir;
+ * tile_dir is a dummy that won't receive any road tiles.
*/
-function isTransitIngestFresh(): boolean {
- if (!existsSync(TRANSIT_CACHE_MARKER) || !existsSync(GTFS_SOURCE_MARKER)) return false;
- // Verify at least one transit PBF tile exists — the marker can survive a
- // cache-dir wipe (crash recovery) and we'd otherwise skip ingest with an
- // empty transit dir, causing valhalla_convert_transit to fail silently.
- // Valhalla 3.x ingest writes level-3 tiles; check for the directory.
- const level3Dir = `${TRANSIT_CACHE_DIR}/3`;
- if (!existsSync(level3Dir)) return false;
- return statSync(TRANSIT_CACHE_MARKER).mtimeMs >= statSync(GTFS_SOURCE_MARKER).mtimeMs;
+function writeCityTransitIngestConfig(citySlug: string): string {
+ const configPath = `${VALHALLA_DATA_DIR}/transit-ingest-config-${citySlug}.json`;
+ const dummyTileDir = `${VALHALLA_DATA_DIR}/tiles/_transit_ingest_tmp_${citySlug}`;
+ mkdirSync(dummyTileDir, { recursive: true });
+
+ const config = deepMerge(buildBase(), {
+ mjolnir: {
+ tile_dir: dummyTileDir,
+ timezone: TIMEZONE_SQLITE,
+ admin: ADMINS_SQLITE,
+ transit_dir: cityTransitCacheDir(citySlug),
+ transit_feeds_dir: cityGtfsFeedDir(citySlug),
+ },
+ service_limits: { isochrone: { max_contours: 5 } },
+ } as JsonObject);
+
+ writeFileSync(configPath, JSON.stringify(config, null, 2));
+ return configPath;
}
-export async function handleBuildValhalla(
- job: Job<BuildValhallaData>,
- restartService: () => Promise<void>,
-): Promise<void> {
+/**
+ * Write a per-city config for valhalla_build_tiles.
+ * Road builds omit transit_dir so no transit edges are embedded.
+ * Transit builds include transit_dir so road↔transit connections are added.
+ */
+function writeCityConfig(citySlug: string, cityTileDir: string): string {
+ mkdirSync(cityTileDir, { recursive: true });
+
+ const config = deepMerge(buildBase(), {
+ mjolnir: {
+ tile_dir: cityTileDir,
+ tile_extract: `${cityTileDir}.tar`,
+ timezone: TIMEZONE_SQLITE,
+ admin: ADMINS_SQLITE,
+ ...(INCLUDE_TRANSIT ? {
+ transit_dir: cityTransitCacheDir(citySlug),
+ transit_feeds_dir: cityGtfsFeedDir(citySlug),
+ } : {}),
+ },
+ additional_data: { elevation: "/data/elevation/" },
+ service_limits: { isochrone: { max_contours: 5 } },
+ } as JsonObject);
+
+ const configPath = `${cityTileDir}/config.json`;
+ writeFileSync(configPath, JSON.stringify(config, null, 2));
+ return configPath;
+}
+
+function isTransitIngestFresh(citySlug: string): boolean {
+ const transitCacheDir = cityTransitCacheDir(citySlug);
+ const transitCacheMarker = `${transitCacheDir}/.ready`;
+ const gtfsCityMarker = `${cityGtfsFeedDir(citySlug)}/.source`;
+ if (!existsSync(transitCacheMarker) || !existsSync(gtfsCityMarker)) return false;
+ const level3Dir = `${transitCacheDir}/3`;
+ if (!existsSync(level3Dir)) return false;
+ return statSync(transitCacheMarker).mtimeMs >= statSync(gtfsCityMarker).mtimeMs;
+}
+
+/** Build per-city tiles. Invalidates Actor pool entry before and writes .ready after. */
+async function buildCityTiles(citySlug: string, pbfPath: string): Promise<void> {
+ const cityTileDir = `${VALHALLA_TILES_BASE}/${citySlug}`;
+ const readyMarker = `${cityTileDir}/.ready`;
+
+ // Signal Actor pool: tiles are being rebuilt.
+ if (existsSync(readyMarker)) unlinkSync(readyMarker);
+ invalidateActor(INCLUDE_TRANSIT ? "transit" : "road", citySlug);
+
+ // ── Transit ingest + convert (transit container only) ─────────────────────
+ if (INCLUDE_TRANSIT) {
+ const feedDir = cityGtfsFeedDir(citySlug);
+ const gtfsReady = existsSync(feedDir) && readdirSync(feedDir).some((f) => f.endsWith(".txt"));
+
+ if (gtfsReady) {
+ if (!existsSync(TIMEZONE_SQLITE)) {
+ console.log("[build-valhalla] Building timezone database…");
+ try { await buildTimezoneDb(); } catch (err) { console.warn("[build-valhalla] valhalla_build_timezones failed:", err); }
+ }
+
+ const transitCacheDir = cityTransitCacheDir(citySlug);
+ const transitCacheMarker = `${transitCacheDir}/.ready`;
+
+ if (!isTransitIngestFresh(citySlug) && existsSync(TIMEZONE_SQLITE)) {
+ console.log(`[build-valhalla] Ingesting GTFS transit feeds for ${citySlug}…`);
+ try {
+ rmSync(transitCacheDir, { recursive: true, force: true });
+ mkdirSync(transitCacheDir, { recursive: true });
+ const cfg = writeCityTransitIngestConfig(citySlug);
+ await runProcess("valhalla_ingest_transit", ["-c", cfg]);
+ writeFileSync(transitCacheMarker, new Date().toISOString());
+ console.log(`[build-valhalla] valhalla_ingest_transit completed for ${citySlug}`);
+ } catch (err) {
+ console.warn(`[build-valhalla] valhalla_ingest_transit failed for ${citySlug}:`, err);
+ rmSync(transitCacheDir, { recursive: true, force: true });
+ mkdirSync(transitCacheDir, { recursive: true });
+ }
+ } else if (isTransitIngestFresh(citySlug)) {
+ console.log(`[build-valhalla] Transit ingest cache fresh for ${citySlug} — skipping re-ingest`);
+ }
+
+ console.log(`[build-valhalla] Converting transit staging tiles for ${citySlug}…`);
+ try {
+ const cfg = writeCityTransitIngestConfig(citySlug);
+ await runProcess("valhalla_convert_transit", ["-c", cfg]);
+ console.log(`[build-valhalla] valhalla_convert_transit completed for ${citySlug}`);
+ } catch (err) {
+ console.warn(`[build-valhalla] valhalla_convert_transit failed for ${citySlug}:`, err);
+ }
+ } else {
+ console.log(`[build-valhalla] No GTFS feed found for ${citySlug} — skipping transit ingest/convert`);
+ }
+ }
+
+ const configPath = writeCityConfig(citySlug, cityTileDir);
+ await runProcess("valhalla_build_tiles", ["-c", configPath, pbfPath]);
+
+ writeFileSync(readyMarker, new Date().toISOString());
+ console.log(`[build-valhalla] Tiles ready for ${citySlug} → ${cityTileDir}`);
+}
+
+export async function handleBuildValhalla(job: Job<BuildValhallaData>): Promise<void> {
const { citySlug, pbfPath, bbox, removeSlugs = [] } = job.data;
- // Always regenerate config to ensure it's valid JSON (not stale/corrupted).
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 2,
- message: "Writing Valhalla configuration…",
- } satisfies JobProgress);
- generateConfig();
+ mkdirSync(VALHALLA_TILES_BASE, { recursive: true });
- // ── Step 1: update the routing manifest ──────────────────────────────────
+ // ── Step 1: manifest + removals ───────────────────────────────────────────
const manifest = readManifest();
for (const slug of removeSlugs) {
const clippedPbf = `${VALHALLA_DATA_DIR}/${slug}-routing.osm.pbf`;
- if (existsSync(clippedPbf)) {
- unlinkSync(clippedPbf);
- console.log(`[build-valhalla] Removed clipped PBF for ${slug}`);
- }
+ if (existsSync(clippedPbf)) { unlinkSync(clippedPbf); console.log(`[build-valhalla] Removed clipped PBF for ${slug}`); }
+ const cityTileDir = `${VALHALLA_TILES_BASE}/${slug}`;
+ if (existsSync(cityTileDir)) { rmSync(cityTileDir, { recursive: true, force: true }); console.log(`[build-valhalla] Removed tile dir for ${slug}`); }
+ invalidateActor(INCLUDE_TRANSIT ? "transit" : "road", slug);
delete manifest[slug];
}
if (citySlug && pbfPath) {
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 5,
- message: bbox
- ? `Clipping PBF to city bbox (may expand for transit coverage)…`
- : `Registering full PBF for ${citySlug}…`,
+ await job.updateProgress({ stage: "Building routing graph", pct: 5,
+ message: bbox ? `Clipping PBF to city bbox…` : `Registering full PBF for ${citySlug}…`,
} satisfies JobProgress);
let routingPbf: string;
-
if (bbox) {
const clippedPbf = `${VALHALLA_DATA_DIR}/${citySlug}-routing.osm.pbf`;
-
if (!existsSync(pbfPath)) throw new Error(`PBF file not found: ${pbfPath}`);
+ let eb: [number, number, number, number] = [bbox[0] - ROAD_BBOX_BUFFER, bbox[1] - ROAD_BBOX_BUFFER, bbox[2] + ROAD_BBOX_BUFFER, bbox[3] + ROAD_BBOX_BUFFER];
- const extractBbox: [number, number, number, number] = [
- bbox[0] - ROAD_BBOX_BUFFER,
- bbox[1] - ROAD_BBOX_BUFFER,
- bbox[2] + ROAD_BBOX_BUFFER,
- bbox[3] + ROAD_BBOX_BUFFER,
- ];
-
- console.log(
- `[build-valhalla] Road extract bbox: city [${bbox.map((v) => v.toFixed(3)).join(", ")}]` +
- ` + ${ROAD_BBOX_BUFFER}° buffer → [${extractBbox.map((v) => v.toFixed(3)).join(", ")}]`,
- );
-
- await runProcess("osmium", [
- "extract",
- `--bbox=${extractBbox[0]},${extractBbox[1]},${extractBbox[2]},${extractBbox[3]}`,
- pbfPath,
- "-o", clippedPbf,
- "--overwrite",
- ]);
- routingPbf = clippedPbf;
- } else {
- if (existsSync(pbfPath)) {
- routingPbf = pbfPath;
- } else {
- const { readdirSync } = await import("fs");
- const found = readdirSync(OSM_DATA_DIR)
- .filter((f) => f.endsWith("-latest.osm.pbf"))
- .map((f) => `${OSM_DATA_DIR}/${f}`);
- if (found.length === 0) throw new Error(`No PBF files found in ${OSM_DATA_DIR}`);
- routingPbf = found[0];
+ // Transit container: expand road clip to cover transit stop locations.
+ // Stops outside road tile coverage cause valhalla_build_tiles to crash
+ // with "double free or corruption (fasttop)" when it tries to embed
+ // road↔transit connection edges for stops with no nearby road data.
+ // The bbox of seeding stops is written by download-gtfs-de per city.
+ if (INCLUDE_TRANSIT && citySlug) {
+ const stopsBboxPath = `${cityGtfsFeedDir(citySlug)}/.stops_bbox`;
+ if (existsSync(stopsBboxPath)) {
+ try {
+ const [sMinLng, sMinLat, sMaxLng, sMaxLat] = JSON.parse(readFileSync(stopsBboxPath, "utf8")) as [number, number, number, number];
+ const before = eb.map((v) => v.toFixed(3)).join(", ");
+ eb = [Math.min(eb[0], sMinLng), Math.min(eb[1], sMinLat), Math.max(eb[2], sMaxLng), Math.max(eb[3], sMaxLat)];
+ console.log(`[build-valhalla] Transit stops bbox expansion: [${before}] → [${eb.map((v) => v.toFixed(3)).join(", ")}]`);
+ } catch (e) {
+ console.warn("[build-valhalla] Could not read .stops_bbox — using default buffer:", e);
+ }
+ } else {
+ console.warn("[build-valhalla] .stops_bbox not found — GTFS may not be downloaded yet, or stops are outside bbox");
+ }
}
- }
+ await runProcess("osmium", ["extract", `--bbox=${eb[0]},${eb[1]},${eb[2]},${eb[3]}`, pbfPath, "-o", clippedPbf, "--overwrite"]);
+ routingPbf = clippedPbf;
+ } else if (existsSync(pbfPath)) {
+ routingPbf = pbfPath;
+ } else {
+ const found = readdirSync(OSM_DATA_DIR).filter((f) => f.endsWith("-latest.osm.pbf")).map((f) => `${OSM_DATA_DIR}/${f}`);
+ if (found.length === 0) throw new Error(`No PBF files found in ${OSM_DATA_DIR}`);
+ routingPbf = found[0];
+ }
manifest[citySlug] = routingPbf;
}
writeManifest(manifest);
- // ── Step 2: check for cities to build ────────────────────────────────────
+ // ── Step 2: cities to build ───────────────────────────────────────────────
- const allPbfs = Object.values(manifest).filter(existsSync);
- const allSlugs = Object.keys(manifest);
+ const citiesForBuild: { slug: string; pbf: string }[] = [];
+ if (citySlug && manifest[citySlug] && existsSync(manifest[citySlug])) {
+ citiesForBuild.push({ slug: citySlug, pbf: manifest[citySlug] });
+ } else if (!citySlug) {
+ for (const [slug, pbf] of Object.entries(manifest)) {
+ if (existsSync(pbf)) citiesForBuild.push({ slug, pbf });
+ }
+ }
- if (allPbfs.length === 0) {
- console.log("[build-valhalla] Manifest is empty — no cities to build routing tiles for.");
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 100,
- message: "No cities in manifest, skipping tile build.",
- } satisfies JobProgress);
+ if (citiesForBuild.length === 0) {
+ await job.updateProgress({ stage: "Building routing graph", pct: 100, message: "No cities in manifest, skipping." } satisfies JobProgress);
return;
}
- // ── Step 3: transit ingest + convert ─────────────────────────────────────
- //
- // Build order: ingest → convert → road tiles.
- // valhalla_build_tiles MUST run after valhalla_convert_transit so it finds
- // transit .gph tiles in transit_dir and embeds road↔transit connection edges
- // in the road tiles. Without convert running first, road tiles have no transit
- // connections and multimodal routing silently falls back to walking.
- // valhalla_convert_transit does NOT need road tiles — it only reads the GTFS
- // staging PBFs and writes the transit graph tiles.
-
- const gtfsReady =
- existsSync(GTFS_FEED_DIR) &&
- readdirSync(GTFS_FEED_DIR).some((f) => f.endsWith(".txt"));
-
- let ingestPbfsAvailable = false;
-
- if (gtfsReady) {
- // 3a: timezone database — one-time setup, persists in VALHALLA_DATA_DIR.
- // valhalla_ingest_transit needs it to assign timezone info to stops;
- // without it the root index tile (0/000/000.pbf) is not written and
- // valhalla_convert_transit crashes trying to load it.
- if (!existsSync(TIMEZONE_SQLITE)) {
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 10,
- message: "Building timezone database (one-time setup)…",
- } satisfies JobProgress);
- try {
- await buildTimezoneDb();
- } catch (err) {
- console.warn("[build-valhalla] valhalla_build_timezones failed — skipping transit:", err);
- }
- }
-
- // 3b: ingest (only when GTFS changed, and only when timezone db is ready)
- ingestPbfsAvailable = isTransitIngestFresh();
-
- if (!ingestPbfsAvailable && existsSync(TIMEZONE_SQLITE)) {
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 12,
- message: "Ingesting GTFS transit feeds…",
- } satisfies JobProgress);
- try {
- // Wipe stale/partial PBF tiles before ingesting.
- rmSync(TRANSIT_CACHE_DIR, { recursive: true, force: true });
- mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
-
- await runProcess("valhalla_ingest_transit", ["-c", VALHALLA_CONFIG]);
- writeFileSync(TRANSIT_CACHE_MARKER, new Date().toISOString());
- ingestPbfsAvailable = true;
- console.log("[build-valhalla] valhalla_ingest_transit completed");
- } catch (err) {
- console.warn("[build-valhalla] valhalla_ingest_transit failed (road routing unaffected):", err);
- // Wipe partial output so convert doesn't try to read corrupt PBFs.
- rmSync(TRANSIT_CACHE_DIR, { recursive: true, force: true });
- mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
- }
- } else if (ingestPbfsAvailable) {
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 12,
- message: "Transit ingest cache is fresh — skipping re-ingest",
- } satisfies JobProgress);
- } else {
- console.log("[build-valhalla] timezone.sqlite unavailable — skipping transit ingest");
- }
-
- // 3c: convert transit PBF staging files → transit graph tiles (.gph)
- // Runs even when ingest was skipped (cache fresh) so transit_dir always
- // has up-to-date .gph tiles before valhalla_build_tiles reads them.
- if (ingestPbfsAvailable) {
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 15,
- message: "Converting transit staging tiles to graph tiles…",
- } satisfies JobProgress);
- try {
- await runProcess("valhalla_convert_transit", ["-c", VALHALLA_CONFIG]);
- console.log("[build-valhalla] valhalla_convert_transit completed");
- } catch (err) {
- console.warn("[build-valhalla] valhalla_convert_transit failed (road routing unaffected):", err);
- }
- }
- } else {
- console.log("[build-valhalla] No GTFS feed found — skipping transit ingest/convert");
+ for (let i = 0; i < citiesForBuild.length; i++) {
+ const { slug, pbf } = citiesForBuild[i];
+ const pct = Math.round(20 + (i / citiesForBuild.length) * 75);
+ await job.updateProgress({
+ stage: "Building routing graph", pct,
+ message: `Building ${INCLUDE_TRANSIT ? "transit" : "road"} tiles for ${slug} (${i + 1}/${citiesForBuild.length})…`,
+ } satisfies JobProgress);
+ await buildCityTiles(slug, pbf);
}
- // ── Step 4: build road tiles ──────────────────────────────────────────────
- //
- // Runs AFTER valhalla_convert_transit so transit .gph tiles are present in
- // transit_dir. valhalla_build_tiles reads them, embeds road↔transit connection
- // edges in the road tiles, and copies transit tiles into tile_dir/3/.
- // Without transit tiles present at this step, road tiles have no transit
- // connections and multimodal routing silently falls back to walking.
-
await job.updateProgress({
- stage: "Building routing graph",
- pct: 20,
- message: `Building road routing tiles for: ${allSlugs.join(", ")}`,
- } satisfies JobProgress);
-
- await runProcess("valhalla_build_tiles", ["-c", VALHALLA_CONFIG, ...allPbfs]);
-
- console.log("[build-valhalla] Road tiles built");
-
- // ── Step 5: restart Valhalla service ─────────────────────────────────────
-
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 95,
- message: "Tiles built — restarting Valhalla service…",
- } satisfies JobProgress);
- await restartService();
-
- await job.updateProgress({
- stage: "Building routing graph",
- pct: 100,
- message: `Routing graph ready — covers: ${allSlugs.join(", ")}`,
+ stage: "Building routing graph", pct: 100,
+ message: `Routing graph ready — covers: ${citiesForBuild.map((c) => c.slug).join(", ")}`,
} satisfies JobProgress);
}
diff --git a/worker/src/jobs/compute-routing.ts b/worker/src/jobs/compute-routing.ts
index 5ddc903..4f8c2c0 100644
--- a/worker/src/jobs/compute-routing.ts
+++ b/worker/src/jobs/compute-routing.ts
@@ -1,7 +1,7 @@
import type { Job } from "bullmq";
import { getSql } from "../db.js";
import type { JobProgress } from "@transportationer/shared";
-import { fetchMatrix } from "../valhalla.js";
+import { dispatchMatrix } from "../routing-client.js";
export type ComputeRoutingData = {
type: "compute-routing";
@@ -120,10 +120,10 @@ export async function handleComputeRouting(job: Job): Promise<void> {
const sources = batch.map((gp) => ({ lat: gp.lat, lng: gp.lng }));
let matrix: (number | null)[][];
try {
- matrix = await fetchMatrix(sources, targets, mode);
+ matrix = await dispatchMatrix(sources, targets, mode, citySlug);
} catch (err) {
console.error(
- `[compute-routing] Valhalla failed (${mode}/${category}, batch ${batchesDone}):`,
+ `[compute-routing] Matrix dispatch failed (${mode}/${category}, batch ${batchesDone}):`,
(err as Error).message,
);
return;
diff --git a/worker/src/jobs/compute-scores.ts b/worker/src/jobs/compute-scores.ts
index ddce4f8..598a70a 100644
--- a/worker/src/jobs/compute-scores.ts
+++ b/worker/src/jobs/compute-scores.ts
@@ -220,12 +220,12 @@ export async function handleComputeScores(
SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM transit_walk_subcat
),
road_nearest AS (
- -- Nearest POI per (grid_point, category, mode) by distance
+ -- Nearest POI per (grid_point, category, mode) by travel time
SELECT DISTINCT ON (grid_point_id, category, travel_mode)
grid_point_id, category, travel_mode, nearest_poi_id, distance_m, travel_time_s
FROM base
WHERE nearest_poi_id IS NOT NULL
- ORDER BY grid_point_id, category, travel_mode, distance_m
+ ORDER BY grid_point_id, category, travel_mode, travel_time_s ASC NULLS LAST
),
cyclist_nearest AS (
SELECT DISTINCT ON (grid_point_id, category)
@@ -235,7 +235,7 @@ export async function handleComputeScores(
FROM base
WHERE travel_mode IN ('walking', 'cycling', 'transit')
AND nearest_poi_id IS NOT NULL
- ORDER BY grid_point_id, category, distance_m
+ ORDER BY grid_point_id, category, travel_time_s ASC NULLS LAST
),
cycling_walk_nearest AS (
SELECT DISTINCT ON (grid_point_id, category)
@@ -245,7 +245,7 @@ export async function handleComputeScores(
FROM base
WHERE travel_mode IN ('walking', 'cycling')
AND nearest_poi_id IS NOT NULL
- ORDER BY grid_point_id, category, distance_m
+ ORDER BY grid_point_id, category, travel_time_s ASC NULLS LAST
),
transit_walk_nearest AS (
SELECT DISTINCT ON (grid_point_id, category)
@@ -255,7 +255,7 @@ export async function handleComputeScores(
FROM base
WHERE travel_mode IN ('walking', 'transit')
AND nearest_poi_id IS NOT NULL
- ORDER BY grid_point_id, category, distance_m
+ ORDER BY grid_point_id, category, travel_time_s ASC NULLS LAST
),
all_nearest AS (
SELECT * FROM road_nearest
@@ -279,9 +279,9 @@ export async function handleComputeScores(
),
scores AS (
-- Complement-product score per (grid_point, category, mode, threshold, profile).
- -- sigmoid(t) = 1 / (1 + exp((t − T) / (T/6))) where T = threshold in seconds.
- -- score = 1 − ∏(1 − w·sigmoid) computed via EXP(SUM(LN(complement))).
- -- NULL travel_time_s → sigmoid = 0 → complement = 1 → LN(1) = 0 (no penalty).
+ -- proximity(t) = exp(−t / T) where T = threshold in seconds.
+ -- score = 1 − ∏(1 − w·proximity) computed via EXP(SUM(LN(complement))).
+ -- NULL travel_time_s → proximity = 0 → complement = 1 → LN(1) = 0 (no penalty).
SELECT
s.grid_point_id,
s.category,
@@ -293,7 +293,7 @@ export async function handleComputeScores(
1.0 - COALESCE(pw.weight, ${DEFAULT_SUBCATEGORY_WEIGHT}::float8)
* CASE
WHEN s.travel_time_s IS NULL THEN 0.0
- ELSE EXP(-3.0 * s.travel_time_s / (t.threshold_min * 60.0))
+ ELSE EXP(-1.0 * s.travel_time_s / (t.threshold_min * 60.0))
END,
1e-10
))
diff --git a/worker/src/jobs/compute-transit.ts b/worker/src/jobs/compute-transit.ts
index 7771500..250b5a6 100644
--- a/worker/src/jobs/compute-transit.ts
+++ b/worker/src/jobs/compute-transit.ts
@@ -20,10 +20,12 @@
*/
import type { Job } from "bullmq";
import { getSql } from "../db.js";
-import { fetchTransitIsochrone, parseTransitContours, TRANSIT_CONTOUR_MINUTES } from "../valhalla.js";
+import { dispatchTransitIsochrone, parseTransitContours } from "../routing-client.js";
import type { JobProgress } from "@transportationer/shared";
import { CATEGORY_IDS, nextTuesdayDeparture } from "@transportationer/shared";
+const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
+
export type ComputeTransitData = {
type: "compute-transit";
citySlug: string;
@@ -46,11 +48,12 @@ async function asyncPool(
await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker));
}
-/** Check isochrone cache then call Valhalla, writing the result back to cache. */
+/** Check DB isochrone cache, then dispatch to routing-transit queue on miss. */
async function getTransitIsochrone(
  sql: ReturnType<typeof getSql>,
gp: { lat: number; lng: number },
departureDate: string,
+ citySlug: string,
) {
type CacheRow = { result: object };
const cached = await Promise.resolve(sql`
@@ -70,7 +73,8 @@ async function getTransitIsochrone(
return parseTransitContours(cached[0].result);
}
- const contours = await fetchTransitIsochrone({ lat: gp.lat, lng: gp.lng }, departureDate);
+ // Dispatch to valhalla-transit routing queue and wait for result.
+ const contours = await dispatchTransitIsochrone(gp, departureDate, citySlug);
if (contours) {
const geojson = {
@@ -145,7 +149,7 @@ export async function handleComputeTransit(job: Job): Promise<void> {
let withTransit = 0;
await asyncPool(BATCH_CONCURRENCY, gridPoints, async (gp) => {
- const contours = await getTransitIsochrone(sql, gp, departureDate);
+ const contours = await getTransitIsochrone(sql, gp, departureDate, citySlug);
processed++;
if (!contours || contours.length === 0) {
diff --git a/worker/src/jobs/download-gtfs-de.ts b/worker/src/jobs/download-gtfs-de.ts
index 09ae1c9..aebf36b 100644
--- a/worker/src/jobs/download-gtfs-de.ts
+++ b/worker/src/jobs/download-gtfs-de.ts
@@ -1,19 +1,14 @@
/**
- * Download and extract a GTFS feed ZIP so Valhalla can build transit tiles.
+ * Download and filter a GTFS feed per city so Valhalla can build transit tiles.
*
- * The feed is saved to GTFS_DATA_DIR (default /data/valhalla/gtfs) inside the
- * valhalla container, which owns the valhalla_tiles Docker volume.
+ * The raw (unfiltered) feed is downloaded once and cached in GTFS_DATA_DIR/raw/.
+ * Subsequent calls for other cities re-use the raw cache without re-downloading.
*
- * After extraction the feed is clipped to the bounding boxes of each known city
- * (plus a small buffer) so that valhalla_ingest_transit only processes stops and
- * trips that are relevant — reducing ingest time from hours to minutes for
- * country-wide feeds like gtfs.de.
- *
- * After this job completes, the next build-valhalla run will automatically
- * call valhalla_ingest_transit and produce transit-capable routing tiles.
+ * A per-city filtered feed is written to GTFS_DATA_DIR/{citySlug}/feed/ and
+ * clipped to the city's bounding box. This directory is the transit_feeds_dir
+ * for that city's valhalla_ingest_transit + valhalla_build_tiles run.
*
* Source: https://download.gtfs.de/germany/nv_free/latest.zip
- * Covers all German ÖPNV (local public transport), updated regularly.
*/
import type { Job } from "bullmq";
import {
@@ -26,6 +21,7 @@ import {
rmSync,
readFileSync,
writeFileSync,
+ copyFileSync,
} from "fs";
import { mkdir } from "fs/promises";
import { pipeline } from "stream/promises";
@@ -38,197 +34,152 @@ import type { JobProgress } from "@transportationer/shared";
export type DownloadGtfsDeData = {
type: "download-gtfs-de";
url: string;
- /** Re-download even if data already exists (default: false) */
+ citySlug: string;
+ /** City bbox [minLng, minLat, maxLng, maxLat] already including buffer. */
+ bbox: [number, number, number, number];
+ /** Re-download even if raw data already exists (default: false) */
force?: boolean;
- /**
- * Per-city bounding boxes [minLng, minLat, maxLng, maxLat] used to clip the
- * feed after extraction. Each bbox should already include a buffer. A stop is
- * kept when it falls inside ANY of the boxes. When absent the full feed is kept.
- */
- bboxes?: [number, number, number, number][];
};
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";
+
+/** Global raw (unfiltered) feed — downloaded once, shared across all cities. */
+const GTFS_RAW_DIR = `${GTFS_DATA_DIR}/raw`;
const GTFS_ZIP_PATH = `${GTFS_DATA_DIR}/feed.zip`;
-const GTFS_FEED_DIR = `${GTFS_DATA_DIR}/feed`;
-/** Records which source/bboxes/algorithm last populated GTFS_FEED_DIR. JSON format. */
-const SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`;
+/** Records which source URL populated GTFS_RAW_DIR. */
+const RAW_MARKER = `${GTFS_RAW_DIR}/.source`;
/**
* Bump this when the filtering algorithm changes in a way that produces
- * different output from the same source + bboxes. This forces a re-filter
- * on the existing extracted data without re-downloading.
+ * different output from the same source + bbox. Forces a re-filter on the
+ * existing raw data without re-downloading.
*/
const FILTER_VERSION = 2;
-// ─── Source marker helpers ────────────────────────────────────────────────────
+interface RawMarker { source: string }
+interface CityMarker { source: string; bbox: [number, number, number, number]; filterVersion: number }
-interface SourceMarker {
- source: string;
- bboxes?: [number, number, number, number][];
- filterVersion?: number;
+function readRawMarker(): RawMarker | null {
+ if (!existsSync(RAW_MARKER)) return null;
+ try { return JSON.parse(readFileSync(RAW_MARKER, "utf8")) as RawMarker; } catch { return null; }
}
-function readSourceMarker(): SourceMarker | null {
- if (!existsSync(SOURCE_MARKER)) return null;
- const content = readFileSync(SOURCE_MARKER, "utf8").trim();
- try {
- return JSON.parse(content) as SourceMarker;
- } catch {
- // Legacy format: plain string written by older versions
- return { source: content };
- }
+function cityFeedDir(citySlug: string): string { return `${GTFS_DATA_DIR}/${citySlug}/feed`; }
+function cityMarkerPath(citySlug: string): string { return `${cityFeedDir(citySlug)}/.source`; }
+
+function readCityMarker(citySlug: string): CityMarker | null {
+ const p = cityMarkerPath(citySlug);
+ if (!existsSync(p)) return null;
+ try { return JSON.parse(readFileSync(p, "utf8")) as CityMarker; } catch { return null; }
}
-function writeSourceMarker(source: string, bboxes?: [number, number, number, number][]): void {
- writeFileSync(SOURCE_MARKER, JSON.stringify({ source, bboxes, filterVersion: FILTER_VERSION }));
+function writeCityMarker(citySlug: string, source: string, bbox: [number, number, number, number]): void {
+ writeFileSync(cityMarkerPath(citySlug), JSON.stringify({ source, bbox, filterVersion: FILTER_VERSION }));
}
-/** True when `outer` fully contains `inner`. */
-function bboxContains(
- outer: [number, number, number, number],
- inner: [number, number, number, number],
-): boolean {
- return outer[0] <= inner[0] && outer[1] <= inner[1] && outer[2] >= inner[2] && outer[3] >= inner[3];
-}
-
-/**
- * True when every bbox in `requested` is covered by at least one bbox in `existing`.
- * If `existing` is empty/absent the data was unfiltered, which covers everything.
- */
-function allBboxesCovered(
- existing: [number, number, number, number][] | undefined,
- requested: [number, number, number, number][],
-): boolean {
- if (!existing || existing.length === 0) return true; // unfiltered → covers all
- return requested.every((req) => existing.some((ex) => bboxContains(ex, req)));
+function bboxEqual(a: [number,number,number,number], b: [number,number,number,number]): boolean {
+ return a[0] === b[0] && a[1] === b[1] && a[2] === b[2] && a[3] === b[3];
}
// ─── GTFS bbox filter ─────────────────────────────────────────────────────────
-/**
- * Clip an extracted GTFS feed in-place to the union of the given bboxes.
- *
- * Algorithm:
- * 1. Filter stops.txt: keep stops whose lat/lon falls inside ANY bbox.
- * 2. Pass 1 over stop_times.txt (streaming): collect trip_ids with ≥1 stop
- * inside a bbox.
- * 3. Pass 2 over stop_times.txt (streaming): write filtered rows to a temp
- * file, then replace the original.
- * 4. Filter trips.txt → collect validRouteIds / validServiceIds / validShapeIds.
- * 5. Filter routes.txt, calendar.txt, calendar_dates.txt.
- * 6. Stream-filter shapes.txt (can be large).
- */
-async function filterGtfsByBboxes(
- feedDir: string,
- bboxes: [number, number, number, number][],
+function splitCsv(line: string): string[] {
+ if (!line.includes('"')) return line.split(",");
+ const result: string[] = [];
+ let current = "";
+ let inQuotes = false;
+ for (let i = 0; i < line.length; i++) {
+ const ch = line[i];
+ if (ch === '"') {
+ if (inQuotes && line[i + 1] === '"') { current += '"'; i++; }
+ else inQuotes = !inQuotes;
+ } else if (ch === "," && !inQuotes) {
+ result.push(current); current = "";
+ } else {
+ current += ch;
+ }
+ }
+ result.push(current);
+ return result;
+}
+
+function colIndex(header: string): Map<string, number> {
+ return new Map(splitCsv(header).map((c, i) => [c.trim().replace(/^\uFEFF/, ""), i]));
+}
+
+function inBbox(lat: number, lon: number, bbox: [number,number,number,number]): boolean {
+ const [minLng, minLat, maxLng, maxLat] = bbox;
+ return lat >= minLat && lat <= maxLat && lon >= minLng && lon <= maxLng;
+}
+
+function filterSmallCsv(
+ filePath: string,
+ keepRow: (idx: Map<string, number>, fields: string[]) => boolean,
+ onKept?: (idx: Map<string, number>, fields: string[]) => void,
+): void {
+ if (!existsSync(filePath)) return;
+ const lines = readFileSync(filePath, "utf8").split(/\r?\n/).filter((l) => l.trim());
+ if (lines.length < 2) return;
+ const idx = colIndex(lines[0]);
+ const out = [lines[0]];
+ for (let i = 1; i < lines.length; i++) {
+ const fields = splitCsv(lines[i]);
+ if (keepRow(idx, fields)) {
+ if (onKept) onKept(idx, fields);
+ out.push(lines[i]);
+ }
+ }
+ writeFileSync(filePath, out.join("\n") + "\n");
+}
+
+async function filterLargeCsv(
+ srcPath: string,
+ destPath: string,
+ keepRow: (targetCol: number, line: string) => boolean,
+ getTargetCol: (idx: Map<string, number>) => number,
 ): Promise<void> {
- if (bboxes.length === 0) return;
-
- console.log(
- `[download-gtfs-de] Filtering GTFS to ${bboxes.length} bbox(es):`,
- bboxes.map((b) => `[${b.map((v) => v.toFixed(3)).join(",")}]`).join(" "),
+ if (!existsSync(srcPath)) return;
+ const tmpPath = destPath + ".tmp";
+ const writer = createWriteStream(tmpPath);
+ let isFirst = true;
+ let targetCol = -1;
+ const rl = createInterface({ input: createReadStream(srcPath), crlfDelay: Infinity });
+ for await (const line of rl) {
+ if (!line.trim()) continue;
+ if (isFirst) {
+ isFirst = false;
+ targetCol = getTargetCol(colIndex(line));
+ writer.write(line + "\n");
+ continue;
+ }
+ if (keepRow(targetCol, line)) writer.write(line + "\n");
+ }
+ await new Promise<void>((resolve, reject) =>
+ writer.end((err?: unknown) => (err ? reject(err) : resolve())),
);
+ renameSync(tmpPath, destPath);
+}
- // ── CSV helpers ─────────────────────────────────────────────────────────────
+/**
+ * Filter raw GTFS feed to a single city bbox.
+ * Reads from rawDir, writes to destDir.
+ * Also writes .stops_bbox (tight bbox of retained stops) for build-valhalla.
+ */
+async function filterGtfsForCity(
+ rawDir: string,
+ destDir: string,
+ bbox: [number, number, number, number],
+): Promise<void> {
+ console.log(`[download-gtfs-de] Filtering GTFS to bbox [${bbox.map((v) => v.toFixed(3)).join(",")}] → ${destDir}`);
- function splitCsv(line: string): string[] {
- if (!line.includes('"')) return line.split(",");
- const result: string[] = [];
- let current = "";
- let inQuotes = false;
- for (let i = 0; i < line.length; i++) {
- const ch = line[i];
- if (ch === '"') {
- if (inQuotes && line[i + 1] === '"') { current += '"'; i++; }
- else inQuotes = !inQuotes;
- } else if (ch === "," && !inQuotes) {
- result.push(current); current = "";
- } else {
- current += ch;
- }
- }
- result.push(current);
- return result;
- }
-
- function colIndex(header: string): Map<string, number> {
- return new Map(splitCsv(header).map((c, i) => [c.trim().replace(/^\uFEFF/, ""), i]));
- }
-
- function inAnyBbox(lat: number, lon: number): boolean {
- return bboxes.some(([minLng, minLat, maxLng, maxLat]) =>
- lat >= minLat && lat <= maxLat && lon >= minLng && lon <= maxLng,
- );
- }
-
- /** Filter a small CSV file (fits in memory) in-place. */
- function filterSmallCsv(
- filePath: string,
- keepRow: (idx: Map<string, number>, fields: string[]) => boolean,
- onKept?: (idx: Map<string, number>, fields: string[]) => void,
- ): void {
- if (!existsSync(filePath)) return;
- const lines = readFileSync(filePath, "utf8").split(/\r?\n/).filter((l) => l.trim());
- if (lines.length < 2) return;
- const idx = colIndex(lines[0]);
- const out = [lines[0]];
- for (let i = 1; i < lines.length; i++) {
- const fields = splitCsv(lines[i]);
- if (keepRow(idx, fields)) {
- if (onKept) onKept(idx, fields);
- out.push(lines[i]);
- }
- }
- writeFileSync(filePath, out.join("\n") + "\n");
- }
-
- /** Stream-filter a large CSV file in-place via a temp file. */
- async function filterLargeCsv(
- filePath: string,
- keepRow: (targetCol: number, line: string) => boolean,
- getTargetCol: (idx: Map<string, number>) => number,
- ): Promise<void> {
- if (!existsSync(filePath)) return;
- const tmpPath = filePath + ".tmp";
- const writer = createWriteStream(tmpPath);
- let isFirst = true;
- let targetCol = -1;
-
- const rl = createInterface({ input: createReadStream(filePath), crlfDelay: Infinity });
- for await (const line of rl) {
- if (!line.trim()) continue;
- if (isFirst) {
- isFirst = false;
- targetCol = getTargetCol(colIndex(line));
- writer.write(line + "\n");
- continue;
- }
- if (keepRow(targetCol, line)) writer.write(line + "\n");
- }
-
- await new Promise<void>((resolve, reject) =>
- writer.end((err?: unknown) => (err ? reject(err) : resolve())),
- );
- renameSync(tmpPath, filePath);
- }
-
- // ── Step 1: collect bbox stop IDs (read-only — stops.txt not written yet) ──
- //
- // Build the set of stops within the bbox — used to seed validTripIds (step 2a)
- // and to filter stop_times to local stops only (step 2b). stops.txt itself is
- // filtered in step 3 to only bbox stops that appear in the final stop_times.
-
- const stopsPath = path.join(feedDir, "stops.txt");
+ const stopsPath = path.join(rawDir, "stops.txt");
if (!existsSync(stopsPath)) {
- console.log("[download-gtfs-de] No stops.txt — skipping GTFS bbox filter");
+ console.log("[download-gtfs-de] No stops.txt in raw dir — skipping filter");
return;
}
+ // ── Step 1: collect bbox stop IDs ──────────────────────────────────────────
+ const bboxStopIds = new Set<string>();
- // Also track the bbox of seeding stops — used later to expand the road tile
- // extraction in build-valhalla to cover these stops without expanding to the
- // full retained-stops area (which includes Germany-wide long-distance trip stops).
let seedMinLng = Infinity, seedMinLat = Infinity, seedMaxLng = -Infinity, seedMaxLat = -Infinity;
{
const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
@@ -241,7 +192,7 @@ async function filterGtfsByBboxes(
const fields = splitCsv(lines[i]);
const lat = parseFloat(fields[latCol] ?? "NaN");
const lon = parseFloat(fields[lonCol] ?? "NaN");
- if (inAnyBbox(lat, lon)) {
+ if (inBbox(lat, lon, bbox)) {
bboxStopIds.add(fields[stopIdCol] ?? "");
if (isFinite(lat) && isFinite(lon)) {
seedMinLat = Math.min(seedMinLat, lat); seedMaxLat = Math.max(seedMaxLat, lat);
@@ -251,33 +202,25 @@ async function filterGtfsByBboxes(
}
}
}
- console.log(`[download-gtfs-de] Bbox filter: ${bboxStopIds.size} stops seed the area`);
+ console.log(`[download-gtfs-de] ${bboxStopIds.size} stops in bbox`);
if (bboxStopIds.size === 0) {
- console.warn(
- "[download-gtfs-de] No stops found in any bbox — GTFS filter skipped " +
- "(check bbox coverage and feed area)",
- );
+ console.warn("[download-gtfs-de] No stops found in bbox — GTFS filter skipped");
return;
}
- // ── Step 2a: collect trip_ids that serve the area (pass 1) ────────────────
-
- const stopTimesPath = path.join(feedDir, "stop_times.txt");
- if (!existsSync(stopTimesPath)) {
- console.log("[download-gtfs-de] No stop_times.txt — skipping trip filter");
+ // ── Step 2a: collect trip_ids with ≥2 bbox stops ───────────────────────────
+ const stopTimesRaw = path.join(rawDir, "stop_times.txt");
+ if (!existsSync(stopTimesRaw)) {
+ console.log("[download-gtfs-de] No stop_times.txt — skipping");
return;
}
 const validTripIds = new Set<string>();
- // Count how many bbox-local stops each trip has — trips with only 1 bbox
- // stop are useless for routing (no O→D pair) and are pruned before step 2b.
   const tripBboxStopCount = new Map<string, number>();
{
- let stopIdCol = -1;
- let tripIdCol = -1;
- let isFirst = true;
- const rl = createInterface({ input: createReadStream(stopTimesPath), crlfDelay: Infinity });
+ let stopIdCol = -1, tripIdCol = -1, isFirst = true;
+ const rl = createInterface({ input: createReadStream(stopTimesRaw), crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
if (isFirst) {
@@ -290,39 +233,37 @@ async function filterGtfsByBboxes(
const fields = line.split(",");
const tripId = fields[tripIdCol] ?? "";
const stopId = fields[stopIdCol] ?? "";
- if (stopIdCol >= 0 && bboxStopIds.has(stopId)) {
+ if (bboxStopIds.has(stopId)) {
validTripIds.add(tripId);
tripBboxStopCount.set(tripId, (tripBboxStopCount.get(tripId) ?? 0) + 1);
}
}
}
- // Remove trips with only one bbox stop — they can't provide an O→D pair
for (const tripId of validTripIds) {
if ((tripBboxStopCount.get(tripId) ?? 0) < 2) validTripIds.delete(tripId);
}
- console.log(`[download-gtfs-de] Bbox filter: ${validTripIds.size} trips with ≥2 bbox stops serve the area`);
-
- // ── Step 2b: write filtered stop_times, keeping only bbox stops (pass 2) ───
- //
- // We keep a stop_times entry only when BOTH:
- // - its trip has ≥2 bbox stops (trip_id ∈ validTripIds), AND
- // - the stop itself is within the city bbox (stop_id ∈ bboxStopIds).
- //
- // Out-of-bbox stops on long-distance routes (e.g. ICE Hamburg↔Bremen passing
- // through Oldenburg) are stripped from stop_times. Trips with only one bbox
- // stop are removed entirely (no O→D pair, useless for routing). This limits
- // the transit graph to local stops only, ensuring valhalla_build_tiles can
- // create road connections for all included stops without ghost edge references
- // that cause routing errors for other modes (bicycle, driving).
+ console.log(`[download-gtfs-de] ${validTripIds.size} trips with ≥2 bbox stops`);
+ // ── Step 2b: write filtered stop_times (bbox stops on valid trips only) ─────
   const allTripStopIds = new Set<string>();
+ await filterLargeCsv(
+ stopTimesRaw,
+ path.join(destDir, "stop_times.txt"),
+ (tripCol, line) => {
+ const fields = line.split(",");
+ const tripId = fields[tripCol] ?? "";
+      // stop_id is deliberately not checked here — the second pass below filters rows to bbox stops
+ return validTripIds.has(tripId);
+ },
+ (idx) => idx.get("trip_id") ?? -1,
+ );
+ // Re-read filtered stop_times to collect actual stop IDs and also filter to bbox stops only
{
- const tmpPath = stopTimesPath + ".tmp";
+ const tmpPath = path.join(destDir, "stop_times.txt") + ".tmp2";
const writer = createWriteStream(tmpPath);
let isFirst = true;
- let tripIdCol = -1;
- let stopIdCol = -1;
- const rl = createInterface({ input: createReadStream(stopTimesPath), crlfDelay: Infinity });
+ let tripIdCol = -1, stopIdCol = -1;
+ const rl = createInterface({ input: createReadStream(path.join(destDir, "stop_times.txt")), crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
if (isFirst) {
@@ -334,242 +275,196 @@ async function filterGtfsByBboxes(
continue;
}
const fields = line.split(",");
- if (
- validTripIds.has(fields[tripIdCol] ?? "") &&
- bboxStopIds.has(fields[stopIdCol] ?? "")
- ) {
- allTripStopIds.add(fields[stopIdCol] ?? "");
+ const tripId = fields[tripIdCol] ?? "";
+ const stopId = fields[stopIdCol] ?? "";
+ if (validTripIds.has(tripId) && bboxStopIds.has(stopId)) {
+ allTripStopIds.add(stopId);
writer.write(line + "\n");
}
}
     await new Promise<void>((resolve, reject) =>
writer.end((err?: unknown) => (err ? reject(err) : resolve())),
);
- renameSync(tmpPath, stopTimesPath);
+ renameSync(tmpPath, path.join(destDir, "stop_times.txt"));
}
- // ── Step 3: filter stops.txt to bbox stops used by kept trips ──────────────
-
- filterSmallCsv(
- stopsPath,
- (idx, fields) => allTripStopIds.has(fields[idx.get("stop_id") ?? -1] ?? ""),
- );
+ // ── Step 3: filter stops.txt (only stops that appear in final stop_times) ───
+ {
+ const dest = path.join(destDir, "stops.txt");
+ const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
+ if (lines.length >= 2) {
+ const idx = colIndex(lines[0]);
+ const stopIdCol = idx.get("stop_id") ?? -1;
+ const out = [lines[0]];
+ for (let i = 1; i < lines.length; i++) {
+ const fields = splitCsv(lines[i]);
+ if (allTripStopIds.has(fields[stopIdCol] ?? "")) out.push(lines[i]);
+ }
+ writeFileSync(dest, out.join("\n") + "\n");
+ }
+ }
if (isFinite(seedMinLat)) {
const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat];
- writeFileSync(path.join(feedDir, ".stops_bbox"), JSON.stringify(stopsBbox));
- console.log(
- `[download-gtfs-de] Transit stops bbox (seeding area): [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`,
- );
+ writeFileSync(path.join(destDir, ".stops_bbox"), JSON.stringify(stopsBbox));
+ console.log(`[download-gtfs-de] Transit stops bbox: [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`);
}
- console.log(`[download-gtfs-de] Bbox filter: ${allTripStopIds.size} bbox stops kept across ${validTripIds.size} trips`);
-
- // ── Step 4: filter trips.txt ───────────────────────────────────────────────
+ // ── Step 4: filter trips.txt → collect route/service/shape IDs ─────────────
   const validRouteIds = new Set<string>();
   const validServiceIds = new Set<string>();
   const validShapeIds = new Set<string>();
-
- filterSmallCsv(
- path.join(feedDir, "trips.txt"),
- (idx, fields) => validTripIds.has(fields[idx.get("trip_id") ?? -1] ?? ""),
- (idx, fields) => {
- validRouteIds.add(fields[idx.get("route_id") ?? -1] ?? "");
- validServiceIds.add(fields[idx.get("service_id") ?? -1] ?? "");
- const shapeId = fields[idx.get("shape_id") ?? -1] ?? "";
- if (shapeId) validShapeIds.add(shapeId);
- },
- );
-
- // ── Step 5: filter remaining files ────────────────────────────────────────
-
- filterSmallCsv(
- path.join(feedDir, "routes.txt"),
- (idx, fields) => validRouteIds.has(fields[idx.get("route_id") ?? -1] ?? ""),
- );
-
- for (const name of ["calendar.txt", "calendar_dates.txt"] as const) {
- filterSmallCsv(
- path.join(feedDir, name),
- (idx, fields) => validServiceIds.has(fields[idx.get("service_id") ?? -1] ?? ""),
- );
+ {
+ const src = path.join(rawDir, "trips.txt");
+ const dest = path.join(destDir, "trips.txt");
+ if (existsSync(src)) {
+ const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
+ if (lines.length >= 2) {
+ const idx = colIndex(lines[0]);
+ const tripIdCol = idx.get("trip_id") ?? -1;
+ const routeIdCol = idx.get("route_id") ?? -1;
+ const serviceIdCol = idx.get("service_id") ?? -1;
+ const shapeIdCol = idx.get("shape_id") ?? -1;
+ const out = [lines[0]];
+ for (let i = 1; i < lines.length; i++) {
+ const fields = splitCsv(lines[i]);
+ if (validTripIds.has(fields[tripIdCol] ?? "")) {
+ out.push(lines[i]);
+ validRouteIds.add(fields[routeIdCol] ?? "");
+ validServiceIds.add(fields[serviceIdCol] ?? "");
+ const shapeId = fields[shapeIdCol] ?? "";
+ if (shapeId) validShapeIds.add(shapeId);
+ }
+ }
+ writeFileSync(dest, out.join("\n") + "\n");
+ }
+ }
}
- // shapes.txt can be large — stream it
+ // ── Step 5: filter routes, calendar, calendar_dates ────────────────────────
+ for (const [file, idCol, validIds] of [
+ ["routes.txt", "route_id", validRouteIds],
+ ["calendar.txt", "service_id", validServiceIds],
+ ["calendar_dates.txt", "service_id", validServiceIds],
+ ] as const) {
+ const src = path.join(rawDir, file);
+ const dest = path.join(destDir, file);
+ if (!existsSync(src)) continue;
+ const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
+    if (lines.length < 2) { writeFileSync(dest, (lines[0] ?? "") + "\n"); continue; }
+ const idx = colIndex(lines[0]);
+ const col = idx.get(idCol) ?? -1;
+ const out = [lines[0]];
+ for (let i = 1; i < lines.length; i++) {
+ const fields = splitCsv(lines[i]);
+      if ((validIds as Set<string>).has(fields[col] ?? "")) out.push(lines[i]);
+ }
+ writeFileSync(dest, out.join("\n") + "\n");
+ }
+
+ // ── Step 6: shapes.txt (large — stream-filter) ─────────────────────────────
if (validShapeIds.size > 0) {
await filterLargeCsv(
- path.join(feedDir, "shapes.txt"),
+ path.join(rawDir, "shapes.txt"),
+ path.join(destDir, "shapes.txt"),
(col, line) => validShapeIds.has(line.split(",")[col] ?? ""),
(idx) => idx.get("shape_id") ?? -1,
);
}
console.log(
- `[download-gtfs-de] GTFS filter complete: ` +
- `${allTripStopIds.size} stops, ${validTripIds.size} trips, ${validRouteIds.size} routes`,
+ `[download-gtfs-de] Filter complete: ${allTripStopIds.size} stops, ` +
+ `${validTripIds.size} trips, ${validRouteIds.size} routes`,
);
}
// ─── Job handler ──────────────────────────────────────────────────────────────
 export async function handleDownloadGtfsDe(job: Job): Promise<void> {
- const { url, force = false, bboxes } = job.data;
- const effectiveSource = "gtfs-de";
+ const { url, citySlug, bbox, force = false } = job.data;
+ const effectiveSource = url;
- // ── Idempotency check ──────────────────────────────────────────────────────
- //
- // Skip entirely when source is unchanged AND data is present AND the existing
- // filter already covers all requested bboxes.
- //
- // Filter-only (no re-download) when data is present with the same source but
- // the existing data is unfiltered (marker has no bboxes) while bboxes are now
- // requested. The unfiltered data on disk is the superset we need.
- //
- // Re-download when source changes OR the existing filter bbox set no longer
- // covers all requested bboxes (e.g. a new city was added outside the
- // previously covered area).
+ const destDir = cityFeedDir(citySlug);
+ const cityMarker = readCityMarker(citySlug);
- const existingMarker = readSourceMarker();
- const sourceChanged = existingMarker?.source !== effectiveSource;
+ // ── Check if per-city feed is already up to date ───────────────────────────
+ const cityDataExists = existsSync(destDir) && readdirSync(destDir).some((f) => f.endsWith(".txt"));
+ if (!force && cityDataExists && cityMarker?.source === effectiveSource &&
+ cityMarker?.filterVersion === FILTER_VERSION && bboxEqual(cityMarker.bbox, bbox)) {
+ console.log(`[download-gtfs-de] Per-city feed for ${citySlug} is up to date, skipping`);
+ await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: "Feed up to date." } satisfies JobProgress);
+ return;
+ }
- const dataExists = existsSync(GTFS_FEED_DIR) &&
- readdirSync(GTFS_FEED_DIR).some((f) => f.endsWith(".txt"));
+ // ── Ensure raw feed is present ─────────────────────────────────────────────
+ const rawMarker = readRawMarker();
+ const rawExists = existsSync(GTFS_RAW_DIR) && readdirSync(GTFS_RAW_DIR).some((f) => f.endsWith(".txt"));
- if (!force && !sourceChanged && dataExists) {
- const existingBboxes = existingMarker?.bboxes;
- const filterVersionOk = existingMarker?.filterVersion === FILTER_VERSION;
+ if (force || !rawExists || rawMarker?.source !== effectiveSource) {
+ await job.updateProgress({ stage: "Downloading GTFS", pct: 5, message: `Downloading GTFS feed…` } satisfies JobProgress);
- // Does the existing filtered data cover all requested bboxes?
- const bboxesCovered = !bboxes?.length || allBboxesCovered(existingBboxes, bboxes);
+ mkdirSync(GTFS_DATA_DIR, { recursive: true });
- if (bboxesCovered) {
- // Marker already reflects desired filtering?
- const markerOk = !bboxes?.length || (existingBboxes && existingBboxes.length > 0);
-
- if (markerOk && filterVersionOk) {
- console.log(`[download-gtfs-de] GTFS feed up to date (source=${effectiveSource}, filterVersion=${FILTER_VERSION}), skipping`);
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 100,
- message: "GTFS data already present and up to date.",
- } satisfies JobProgress);
- return;
- }
-
- // Data needs re-filtering: either unfiltered (bboxes newly requested)
- // or filter algorithm changed (filterVersion mismatch).
- const reason = !filterVersionOk
- ? `filter algorithm updated (v${existingMarker?.filterVersion ?? "none"} → v${FILTER_VERSION})`
- : "applying bbox filter to unfiltered data";
- console.log(`[download-gtfs-de] Re-filtering existing GTFS data: ${reason}`);
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 10,
- message: "Filtering existing GTFS feed to city areas…",
- } satisfies JobProgress);
- await filterGtfsByBboxes(GTFS_FEED_DIR, bboxes!);
- writeSourceMarker(effectiveSource, bboxes);
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 100,
- message: "GTFS feed filtered to city areas.",
- } satisfies JobProgress);
- return;
+ const response = await fetch(url, { signal: AbortSignal.timeout(600_000) });
+ if (!response.ok || !response.body) {
+ throw new Error(`Failed to download GTFS: HTTP ${response.status} ${response.statusText}`);
}
- // Existing filter too small — need fresh data from network.
- console.log(`[download-gtfs-de] Existing GTFS filter too small for new areas — re-downloading`);
- }
+ const totalBytes = Number(response.headers.get("content-length") ?? 0);
+ let downloadedBytes = 0;
+ let lastReportedPct = 5;
- if (sourceChanged) {
- console.log(
- `[download-gtfs-de] Source changed ` +
- `(${existingMarker?.source ?? "none"} → ${effectiveSource}), re-downloading`,
- );
- }
-
- mkdirSync(GTFS_DATA_DIR, { recursive: true });
-
- // ── Download ───────────────────────────────────────────────────────────────
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 5,
- message: `Downloading GTFS feed (source: ${effectiveSource})…`,
- } satisfies JobProgress);
-
- const response = await fetch(url, { signal: AbortSignal.timeout(600_000) });
- if (!response.ok || !response.body) {
- throw new Error(`Failed to download GTFS: HTTP ${response.status} ${response.statusText}`);
- }
-
- const totalBytes = Number(response.headers.get("content-length") ?? 0);
- let downloadedBytes = 0;
- let lastReportedPct = 5;
-
-  const nodeReadable = Readable.fromWeb(response.body as Parameters<typeof Readable.fromWeb>[0]);
- nodeReadable.on("data", (chunk: Buffer) => {
- downloadedBytes += chunk.length;
- if (totalBytes > 0) {
- const pct = Math.min(55, 5 + Math.round((downloadedBytes / totalBytes) * 50));
- if (pct > lastReportedPct + 4) {
- lastReportedPct = pct;
- void job.updateProgress({
- stage: "Downloading GTFS",
- pct,
- message: `Downloading… ${(downloadedBytes / 1024 / 1024).toFixed(1)} / ${(totalBytes / 1024 / 1024).toFixed(1)} MB`,
- bytesDownloaded: downloadedBytes,
- totalBytes,
- } satisfies JobProgress);
+    const nodeReadable = Readable.fromWeb(response.body as Parameters<typeof Readable.fromWeb>[0]);
+ nodeReadable.on("data", (chunk: Buffer) => {
+ downloadedBytes += chunk.length;
+ if (totalBytes > 0) {
+ const pct = Math.min(55, 5 + Math.round((downloadedBytes / totalBytes) * 50));
+ if (pct > lastReportedPct + 4) {
+ lastReportedPct = pct;
+ void job.updateProgress({
+ stage: "Downloading GTFS", pct,
+ message: `Downloading… ${(downloadedBytes / 1024 / 1024).toFixed(1)} / ${(totalBytes / 1024 / 1024).toFixed(1)} MB`,
+ bytesDownloaded: downloadedBytes, totalBytes,
+ } satisfies JobProgress);
+ }
}
+ });
+
+ await pipeline(nodeReadable, createWriteStream(GTFS_ZIP_PATH));
+ console.log(`[download-gtfs-de] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`);
+
+ await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Extracting GTFS feed…" } satisfies JobProgress);
+
+ if (existsSync(GTFS_RAW_DIR)) rmSync(GTFS_RAW_DIR, { recursive: true, force: true });
+ mkdirSync(GTFS_RAW_DIR, { recursive: true });
+
+ const zip = unzipper.Parse({ forceStream: true });
+ createReadStream(GTFS_ZIP_PATH).pipe(zip);
+ for await (const entry of zip) {
+ const e = entry as unzipper.Entry;
+ const destPath = path.join(GTFS_RAW_DIR, path.basename(e.path));
+ if (e.type === "Directory") { e.autodrain(); continue; }
+ await mkdir(path.dirname(destPath), { recursive: true });
+ await pipeline(e as unknown as NodeJS.ReadableStream, createWriteStream(destPath));
}
- });
+ rmSync(GTFS_ZIP_PATH, { force: true });
- await pipeline(nodeReadable, createWriteStream(GTFS_ZIP_PATH));
- console.log(`[download-gtfs-de] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`);
-
- // ── Extract ────────────────────────────────────────────────────────────────
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 60,
- message: "Extracting GTFS feed…",
- } satisfies JobProgress);
-
- if (existsSync(GTFS_FEED_DIR)) rmSync(GTFS_FEED_DIR, { recursive: true, force: true });
- mkdirSync(GTFS_FEED_DIR, { recursive: true });
-
- const zip = unzipper.Parse({ forceStream: true });
- createReadStream(GTFS_ZIP_PATH).pipe(zip);
-
- for await (const entry of zip) {
- const e = entry as unzipper.Entry;
- const destPath = path.join(GTFS_FEED_DIR, path.basename(e.path));
- if (e.type === "Directory") { e.autodrain(); continue; }
- await mkdir(path.dirname(destPath), { recursive: true });
- await pipeline(e as unknown as NodeJS.ReadableStream, createWriteStream(destPath));
+ const extractedFiles = readdirSync(GTFS_RAW_DIR);
+ console.log(`[download-gtfs-de] Extracted ${extractedFiles.length} files to ${GTFS_RAW_DIR}`);
+ writeFileSync(RAW_MARKER, JSON.stringify({ source: effectiveSource }));
+ } else {
+ console.log(`[download-gtfs-de] Raw feed already present (source=${effectiveSource})`);
+ await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Using cached raw feed." } satisfies JobProgress);
}
- const extractedFiles = readdirSync(GTFS_FEED_DIR);
- console.log(`[download-gtfs-de] Extracted ${extractedFiles.length} files to ${GTFS_FEED_DIR}`);
+ // ── Filter raw feed for this city ──────────────────────────────────────────
+ await job.updateProgress({ stage: "Downloading GTFS", pct: 65, message: `Filtering GTFS for ${citySlug}…` } satisfies JobProgress);
- rmSync(GTFS_ZIP_PATH, { force: true });
+ if (existsSync(destDir)) rmSync(destDir, { recursive: true, force: true });
+ mkdirSync(destDir, { recursive: true });
- // ── Bbox filter ────────────────────────────────────────────────────────────
- if (bboxes && bboxes.length > 0) {
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 65,
- message: `Filtering GTFS feed to ${bboxes.length} city area(s)…`,
- } satisfies JobProgress);
- await filterGtfsByBboxes(GTFS_FEED_DIR, bboxes);
- }
+ await filterGtfsForCity(GTFS_RAW_DIR, destDir, bbox);
+ writeCityMarker(citySlug, effectiveSource, bbox);
- writeSourceMarker(effectiveSource, bboxes?.length ? bboxes : undefined);
-
- await job.updateProgress({
- stage: "Downloading GTFS",
- pct: 100,
- message: bboxes?.length
- ? `GTFS feed ready and filtered to ${bboxes.length} city area(s) (source: ${effectiveSource}).`
- : `GTFS feed ready: ${extractedFiles.length} files (source: ${effectiveSource}).`,
- } satisfies JobProgress);
+ await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: `GTFS ready for ${citySlug}.` } satisfies JobProgress);
}
diff --git a/worker/src/jobs/download-pbf.ts b/worker/src/jobs/download-pbf.ts
index a34a8de..2d30110 100644
--- a/worker/src/jobs/download-pbf.ts
+++ b/worker/src/jobs/download-pbf.ts
@@ -1,6 +1,7 @@
import type { Job } from "bullmq";
import { createWriteStream, mkdirSync, statSync, renameSync } from "fs";
import { Writable } from "stream";
+import { basename } from "path";
import type { JobProgress } from "@transportationer/shared";
export type DownloadPbfData = {
@@ -15,31 +16,39 @@ const ALLOWED_PATTERN =
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
+/** PBFs older than this are re-downloaded. */
+const MAX_AGE_MS = 24 * 3600 * 1000;
+
+export function pbfPath(geofabrikUrl: string): string {
+ return `${OSM_DATA_DIR}/${basename(new URL(geofabrikUrl).pathname)}`;
+}
+
export async function handleDownloadPbf(
   job: Job<DownloadPbfData>,
 ): Promise<void> {
- const { citySlug, geofabrikUrl, expectedBytes } = job.data;
+ const { geofabrikUrl, expectedBytes } = job.data;
if (!ALLOWED_PATTERN.test(geofabrikUrl)) {
throw new Error(`Rejected URL (must be a Geofabrik PBF): ${geofabrikUrl}`);
}
mkdirSync(OSM_DATA_DIR, { recursive: true });
- const outputPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`;
- // Use job.id in the tmp path so two concurrent download-pbf jobs for the
- // same city (one under extract-pois, one under build-valhalla) don't write
- // to the same file and corrupt each other.
+ const outputPath = pbfPath(geofabrikUrl);
+ // Use job.id in the tmp path so concurrent download-pbf jobs for the same
+ // URL don't write to the same temp file and corrupt each other.
const tmpPath = `${outputPath}.${job.id}.tmp`;
- // Idempotency: skip if a complete file is already on disk (supports
- // parallel download-pbf instances for the same city PBF).
+ // Cache hit: skip if file is on disk and younger than MAX_AGE_MS.
+ // Keyed by URL basename so multiple cities sharing the same regional PBF
+ // (e.g. all cities in Niedersachsen) only download it once.
try {
const stat = statSync(outputPath);
- if (stat.size > 0) {
+ const ageMs = Date.now() - stat.mtimeMs;
+ if (stat.size > 0 && ageMs < MAX_AGE_MS) {
await job.updateProgress({
stage: "Downloading PBF",
pct: 100,
- message: `Already on disk: ${outputPath} (${(stat.size / 1_048_576).toFixed(1)} MB)`,
+ message: `Cached: ${outputPath} (${(stat.size / 1_048_576).toFixed(1)} MB, ${Math.round(ageMs / 3600_000)}h old)`,
} satisfies JobProgress);
return;
}
diff --git a/worker/src/jobs/refresh-city.ts b/worker/src/jobs/refresh-city.ts
index 0650911..fd3853c 100644
--- a/worker/src/jobs/refresh-city.ts
+++ b/worker/src/jobs/refresh-city.ts
@@ -4,6 +4,7 @@ import { createBullMQConnection } from "../redis.js";
import { getSql } from "../db.js";
import { JOB_OPTIONS, VALID_THRESHOLDS } from "@transportationer/shared";
import type { JobProgress } from "@transportationer/shared";
+import { pbfPath } from "./download-pbf.js";
export type RefreshCityData = {
type: "refresh-city";
@@ -15,7 +16,6 @@ export type RefreshCityData = {
computeScoresJobId?: string;
};
-const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
/** True when the given bbox intersects Niedersachsen. */
function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean {
@@ -39,34 +39,22 @@ export async function handleRefreshCity(
const { citySlug, geofabrikUrl, resolutionM = 200, iter = 0 } = job.data;
const sql = getSql();
- const pbfPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`;
+ const localPbfPath = pbfPath(geofabrikUrl);
// Read the user-specified bbox from the database (set at city creation time).
// If present, it will be passed to extract-pois to clip the PBF before import.
// Also read ALL city bboxes for the GTFS filter: each city gets its own bbox
// (with a small buffer) so valhalla_ingest_transit only processes relevant stops.
- const [bboxRows, allCityBboxRows] = await Promise.all([
- Promise.resolve(sql<{
- minlng: number; minlat: number; maxlng: number; maxlat: number;
- }[]>`
- SELECT
- ST_XMin(bbox)::float AS minlng,
- ST_YMin(bbox)::float AS minlat,
- ST_XMax(bbox)::float AS maxlng,
- ST_YMax(bbox)::float AS maxlat
- FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL
- `),
- Promise.resolve(sql<{
- minlng: number; minlat: number; maxlng: number; maxlat: number;
- }[]>`
- SELECT
- ST_XMin(bbox)::float AS minlng,
- ST_YMin(bbox)::float AS minlat,
- ST_XMax(bbox)::float AS maxlng,
- ST_YMax(bbox)::float AS maxlat
- FROM cities WHERE bbox IS NOT NULL
- `),
- ]);
+ const bboxRows = await Promise.resolve(sql<{
+ minlng: number; minlat: number; maxlng: number; maxlat: number;
+ }[]>`
+ SELECT
+ ST_XMin(bbox)::float AS minlng,
+ ST_YMin(bbox)::float AS minlat,
+ ST_XMax(bbox)::float AS maxlng,
+ ST_YMax(bbox)::float AS maxlat
+ FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL
+ `);
const bbox: [number, number, number, number] | undefined =
bboxRows.length > 0
@@ -75,12 +63,9 @@ export async function handleRefreshCity(
// ~10 km buffer for GTFS stop coverage near city edges (0.09° ≈ 10 km)
const GTFS_BUFFER = 0.09;
- const gtfsBboxes: [number, number, number, number][] = allCityBboxRows.map((r) => [
- r.minlng - GTFS_BUFFER,
- r.minlat - GTFS_BUFFER,
- r.maxlng + GTFS_BUFFER,
- r.maxlat + GTFS_BUFFER,
- ]);
+ const gtfsBbox: [number, number, number, number] | undefined = bbox
+ ? [bbox[0] - GTFS_BUFFER, bbox[1] - GTFS_BUFFER, bbox[2] + GTFS_BUFFER, bbox[3] + GTFS_BUFFER]
+ : undefined;
await job.updateProgress({
stage: "Orchestrating pipeline",
@@ -150,7 +135,7 @@ export async function handleRefreshCity(
data: {
type: "extract-pois" as const,
citySlug,
- pbfPath,
+ pbfPath: localPbfPath,
...(bbox ? { bbox } : {}),
},
opts: { ...JOB_OPTIONS["extract-pois"], jobId: `extract-pois.${citySlug}.${iter}` },
@@ -164,7 +149,7 @@ export async function handleRefreshCity(
data: {
type: "build-valhalla" as const,
citySlug,
- pbfPath,
+ pbfPath: localPbfPath,
...(bbox ? { bbox } : {}),
},
opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla.${citySlug}.${iter}` },
@@ -178,26 +163,25 @@ export async function handleRefreshCity(
data: {
type: "build-valhalla" as const,
citySlug,
- pbfPath,
+ pbfPath: localPbfPath,
...(bbox ? { bbox } : {}),
},
opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla-transit.${citySlug}.${iter}` },
children: [
downloadNode(),
- // Download GTFS feed before building transit tiles. Idempotent —
- // skips if the feed is current, so subsequent refreshes are cheap.
- {
+ // Download and filter GTFS feed for this city before building transit tiles.
+ // Raw feed is cached globally; only the per-city filter re-runs on bbox change.
+ ...(gtfsBbox ? [{
name: "download-gtfs-de",
queueName: "valhalla-transit",
data: {
type: "download-gtfs-de" as const,
url: "https://download.gtfs.de/germany/nv_free/latest.zip",
- // Per-city bboxes (with ~10 km buffer) so valhalla_ingest_transit
- // only processes stops/trips relevant to the known cities.
- ...(gtfsBboxes.length > 0 ? { bboxes: gtfsBboxes } : {}),
+ citySlug,
+ bbox: gtfsBbox,
},
- opts: JOB_OPTIONS["download-gtfs-de"],
- },
+ opts: { ...JOB_OPTIONS["download-gtfs-de"], jobId: `download-gtfs-de.${citySlug}.${iter}` },
+ }] : []),
],
},
],
diff --git a/worker/src/routing-client.ts b/worker/src/routing-client.ts
new file mode 100644
index 0000000..f4624b1
--- /dev/null
+++ b/worker/src/routing-client.ts
@@ -0,0 +1,127 @@
+/**
+ * Routing client for the main pipeline worker.
+ * Dispatches matrix/isochrone jobs to the valhalla routing queues and
+ * waits for the results. The main worker has zero Valhalla tile knowledge.
+ */
+import { Queue, QueueEvents } from "bullmq";
+import { createBullMQConnection } from "./redis.js";
+
+export interface LatLng {
+ lat: number;
+ lng: number;
+}
+
+export interface TransitContour {
+ minutes: number;
+ geojson: object;
+}
+
+// Contour times that must match valhalla.ts TRANSIT_CONTOUR_MINUTES.
+const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
+
+// ─── Queue singletons ──────────────────────────────────────────────────────────
+
+let roadQueue: Queue | null = null;
+let roadQueueEvents: QueueEvents | null = null;
+let transitQueue: Queue | null = null;
+let transitQueueEvents: QueueEvents | null = null;
+
+function getRoadQueue(): Queue {
+ if (!roadQueue) {
+ roadQueue = new Queue("routing", {
+ connection: createBullMQConnection(),
+ defaultJobOptions: { removeOnComplete: { age: 120 }, removeOnFail: { age: 300 } },
+ });
+ }
+ return roadQueue;
+}
+
+function getRoadQueueEvents(): QueueEvents {
+ if (!roadQueueEvents) {
+ roadQueueEvents = new QueueEvents("routing", { connection: createBullMQConnection() });
+ }
+ return roadQueueEvents;
+}
+
+function getTransitQueue(): Queue {
+ if (!transitQueue) {
+ transitQueue = new Queue("routing-transit", {
+ connection: createBullMQConnection(),
+ defaultJobOptions: { removeOnComplete: { age: 120 }, removeOnFail: { age: 300 } },
+ });
+ }
+ return transitQueue;
+}
+
+function getTransitQueueEvents(): QueueEvents {
+ if (!transitQueueEvents) {
+ transitQueueEvents = new QueueEvents("routing-transit", { connection: createBullMQConnection() });
+ }
+ return transitQueueEvents;
+}
+
+const MATRIX_TIMEOUT_MS = 90_000;
+const ISOCHRONE_TIMEOUT_MS = 60_000;
+
+/**
+ * Dispatch a sources_to_targets matrix job to the road routing queue and wait.
+ * Returns an M×N matrix of travel times in seconds (null = unreachable).
+ */
+export async function dispatchMatrix(
+ sources: LatLng[],
+ targets: LatLng[],
+ mode: "walking" | "cycling" | "driving",
+ citySlug: string,
+): Promise<(number | null)[][]> {
+ const job = await getRoadQueue().add("matrix", {
+ type: "matrix",
+ sources,
+ targets,
+ mode,
+ citySlug,
+ });
+ const result = await job.waitUntilFinished(getRoadQueueEvents(), MATRIX_TIMEOUT_MS) as (number | null)[][];
+ return result;
+}
+
+/**
+ * Dispatch a transit isochrone job to the transit routing queue and wait.
+ * Returns parsed contour array or null if the point has no transit coverage.
+ */
+export async function dispatchTransitIsochrone(
+ gp: { lat: number; lng: number },
+ departureDate: string,
+ citySlug: string,
+): Promise<TransitContour[] | null> {
+ const job = await getTransitQueue().add("isochrone", {
+ type: "isochrone",
+ lat: gp.lat,
+ lng: gp.lng,
+ travelMode: "transit",
+ contourMinutes: [...TRANSIT_CONTOUR_MINUTES],
+ citySlug,
+ departureDate,
+ });
+
+ let geojson: object | null;
+ try {
+ geojson = await job.waitUntilFinished(getTransitQueueEvents(), ISOCHRONE_TIMEOUT_MS) as object | null;
+ } catch {
+ return null;
+ }
+
+ if (!geojson) return null;
+ return parseTransitContours(geojson);
+}
+
+/** Parse a Valhalla isochrone FeatureCollection into TransitContour[]. */
+export function parseTransitContours(geojson: object): TransitContour[] | null {
+ const fc = geojson as { features?: Array<{ properties: { contour: number }; geometry: object }> };
+ if (!Array.isArray(fc.features)) return null;
+ const contours: TransitContour[] = [];
+ for (const minutes of TRANSIT_CONTOUR_MINUTES) {
+ const feature = fc.features.find((f) => f.properties?.contour === minutes);
+ if (feature?.geometry) contours.push({ minutes, geojson: feature.geometry });
+ }
+ return contours.length >= 2 ? contours : null;
+}
diff --git a/worker/src/valhalla-main.ts b/worker/src/valhalla-main.ts
index b395285..6e68f56 100644
--- a/worker/src/valhalla-main.ts
+++ b/worker/src/valhalla-main.ts
@@ -1,112 +1,100 @@
import { Worker, type Job } from "bullmq";
-import { spawn, type ChildProcess } from "child_process";
-import { existsSync } from "fs";
import { createBullMQConnection } from "./redis.js";
import { handleBuildValhalla } from "./jobs/build-valhalla.js";
import { handleDownloadGtfsDe } from "./jobs/download-gtfs-de.js";
+import { actorIsochrone, actorMatrix } from "./valhalla.js";
-const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json";
const VALHALLA_QUEUE_NAME = process.env.VALHALLA_QUEUE_NAME ?? "valhalla";
-console.log(`[valhalla-worker] Starting Transportationer Valhalla worker (queue=${VALHALLA_QUEUE_NAME})…`);
+// Derive routing queue name from build queue name.
+// valhalla → routing (road: walking/cycling/driving)
+// valhalla-transit → routing-transit (multimodal)
+const ROUTING_QUEUE_NAME =
+ VALHALLA_QUEUE_NAME === "valhalla-transit" ? "routing-transit" : "routing";
-// ─── Valhalla service process manager ─────────────────────────────────────────
-// The valhalla_service HTTP server runs as a child process alongside this
-// BullMQ worker. When a build-valhalla job arrives, we stop the server, rebuild
-// the routing tiles (using the Valhalla tools installed in this container),
-// then restart the server.
+console.log(`[valhalla-worker] Starting tile-builder+router (tile-queue=${VALHALLA_QUEUE_NAME}, routing-queue=${ROUTING_QUEUE_NAME})…`);
-let valhallaProc: ChildProcess | null = null;
+// ─── Tile-builder worker ───────────────────────────────────────────────────────
-function startValhallaService(): void {
- if (!existsSync(VALHALLA_CONFIG)) {
- console.log("[valhalla-worker] No config yet — will start after first tile build");
- return;
- }
- console.log("[valhalla-worker] Starting valhalla_service…");
- // valhalla_service [concurrency] — positional arg, not -c flag
- valhallaProc = spawn("valhalla_service", [VALHALLA_CONFIG], {
- stdio: "inherit",
- });
- valhallaProc.on("exit", (code, signal) => {
- console.log(`[valhalla-worker] valhalla_service exited (code=${code}, signal=${signal})`);
- valhallaProc = null;
- });
-}
-
-function stopValhallaService(): Promise<void> {
- return new Promise((resolve) => {
- if (!valhallaProc) { resolve(); return; }
- const proc = valhallaProc;
- proc.once("exit", () => resolve());
- proc.kill("SIGTERM");
- // Force kill after 10 s if it doesn't exit cleanly
- setTimeout(() => {
- if (valhallaProc === proc) proc.kill("SIGKILL");
- }, 10_000);
- });
-}
-
-// ─── BullMQ worker ────────────────────────────────────────────────────────────
-
-const worker = new Worker(
+const tileWorker = new Worker(
VALHALLA_QUEUE_NAME,
async (job: Job) => {
- console.log(`[valhalla-worker] Processing job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`);
-
- // Valhalla keeps serving old tiles while the new tiles are being built.
- // restartService is called from inside handleBuildValhalla only after the
- // tile build completes — the service is only down for the few seconds it
- // takes to restart, and compute-routing jobs retry transparently across that
- // window via fetchMatrix's built-in retry logic.
- async function restartService(): Promise {
- await stopValhallaService();
- startValhallaService();
- }
-
+ console.log(`[valhalla-worker] Tile job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`);
if (job.data.type === "download-gtfs-de") {
await handleDownloadGtfsDe(job as any);
return;
}
- await handleBuildValhalla(job as any, restartService);
+ await handleBuildValhalla(job as any);
},
{
connection: createBullMQConnection(),
concurrency: 1,
- lockDuration: 1_800_000, // 30 min — large-region tile builds can be very slow
+ lockDuration: 1_800_000,
lockRenewTime: 60_000,
maxStalledCount: 3,
},
);
-worker.on("completed", (job) => {
- console.log(`[valhalla-worker] ✓ Job ${job.id} (${job.data.type}) completed`);
-});
+// ─── On-demand routing worker ──────────────────────────────────────────────────
+// Handles isochrone + matrix requests from the web app and pipeline workers.
+// Uses the per-city Actor pool — no HTTP server involved.
-worker.on("failed", (job, err) => {
- console.error(`[valhalla-worker] ✗ Job ${job?.id} (${job?.data?.type}) failed:`, err.message);
-});
+const routingWorker = new Worker(
+ ROUTING_QUEUE_NAME,
+ async (job: Job) => {
+ const { type, citySlug } = job.data as { type: string; citySlug: string };
-worker.on("active", (job) => {
+ if (type === "isochrone") {
+ const { lng, lat, travelMode, contourMinutes, departureDate } = job.data as {
+ lng: number; lat: number; travelMode: string; contourMinutes: number[];
+ departureDate?: string | null;
+ };
+ return actorIsochrone({ lng, lat, travelMode, contourMinutes, citySlug, departureDate });
+ }
+
+ if (type === "matrix") {
+ const { sources, targets, mode } = job.data as {
+ sources: { lat: number; lng: number }[];
+ targets: { lat: number; lng: number }[];
+ mode: "walking" | "cycling" | "driving";
+ };
+ return actorMatrix(sources, targets, mode, citySlug);
+ }
+
+ throw new Error(`Unknown routing job type: ${type}`);
+ },
+ {
+ connection: createBullMQConnection(),
+ concurrency: 8,
+ lockDuration: 60_000,
+ lockRenewTime: 15_000,
+ maxStalledCount: 1,
+ },
+);
+
+// ─── Event handlers ────────────────────────────────────────────────────────────
+
+tileWorker.on("completed", (job) =>
+ console.log(`[valhalla-worker] ✓ Tile job ${job.id} (${job.data.type}) completed`));
+tileWorker.on("failed", (job, err) =>
+ console.error(`[valhalla-worker] ✗ Tile job ${job?.id} (${job?.data?.type}) failed:`, err.message));
+tileWorker.on("active", (job) => {
const city = job.data.citySlug ?? job.data.removeSlugs?.join(",") ?? "rebuild";
- console.log(`[valhalla-worker] → Job ${job.id} (${job.data.type}) started city=${city}`);
+ console.log(`[valhalla-worker] → Tile job ${job.id} (${job.data.type}) city=${city}`);
});
+tileWorker.on("error", (err) => console.error("[valhalla-worker] Tile worker error:", err.message));
-worker.on("error", (err) => {
- console.error("[valhalla-worker] Worker error:", err.message);
-});
+routingWorker.on("failed", (job, err) =>
+ console.warn(`[routing-worker] ✗ Job ${job?.id} (${job?.data?.type}) failed:`, err.message));
+routingWorker.on("error", (err) => console.error("[routing-worker] Worker error:", err.message));
const shutdown = async () => {
console.log("[valhalla-worker] Shutting down gracefully…");
- await worker.close();
- await stopValhallaService();
+ await Promise.all([tileWorker.close(), routingWorker.close()]);
process.exit(0);
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
-// Start serving if tiles already exist from a previous run
-startValhallaService();
-
-console.log(`[valhalla-worker] Ready — waiting for jobs on '${VALHALLA_QUEUE_NAME}' queue`);
+console.log(`[valhalla-worker] Ready — tile-builds on '${VALHALLA_QUEUE_NAME}', routing on '${ROUTING_QUEUE_NAME}'`);
diff --git a/worker/src/valhalla.ts b/worker/src/valhalla.ts
index 974fc07..dc51709 100644
--- a/worker/src/valhalla.ts
+++ b/worker/src/valhalla.ts
@@ -1,81 +1,24 @@
-const VALHALLA_URL = process.env.VALHALLA_URL ?? "http://localhost:8002";
-/** Transit instance (port 8003). Falls back to VALHALLA_URL if not set. */
-const VALHALLA_TRANSIT_URL = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_URL;
+/**
+ * Valhalla Actor helpers — used by valhalla-main.ts routing queue worker.
+ * These functions run in the valhalla containers which have the tile volumes.
+ * The main pipeline worker never imports this file.
+ */
+import { nextTuesdayDeparture } from "@transportationer/shared";
+import { getActor } from "./actor-pool.js";
-const COSTING: Record<"walking" | "cycling" | "driving", string> = {
+const ROAD_COSTING: Record<"walking" | "cycling" | "driving", string> = {
walking: "pedestrian",
cycling: "bicycle",
driving: "auto",
};
-// Standard contour times used for transit isochrones.
-// Must match the scoring thresholds used in compute-scores.
export const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
-import { nextTuesdayDeparture } from "@transportationer/shared";
-
export interface TransitContour {
minutes: number;
- /** GeoJSON Polygon or MultiPolygon geometry of the reachable area */
geojson: object;
}
-/**
- * Fetch a transit isochrone for a point using Valhalla's multimodal costing.
- * Returns an array of contour polygons sorted from smallest to largest,
- * or null if transit routing fails (e.g. no GTFS data loaded in Valhalla).
- */
-export async function fetchTransitIsochrone(
- source: LatLng,
- departureDate: string,
-): Promise<TransitContour[] | null> {
- const body = {
- locations: [{ lat: source.lat, lon: source.lng }],
- costing: "multimodal",
- contours: TRANSIT_CONTOUR_MINUTES.map((t) => ({ time: t })),
- polygons: true,
- costing_options: {
- transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 },
- },
- date_time: { type: 1, value: departureDate },
- };
-
- let resp: Response;
- try {
- resp = await fetch(`${VALHALLA_TRANSIT_URL}/isochrone`, {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify(body),
- signal: AbortSignal.timeout(30_000),
- });
- } catch {
- return null;
- }
-
- if (!resp.ok) return null;
-
- let data: { features?: Array<{ properties: { contour: number }; geometry: object }>; error?: unknown; error_code?: unknown };
- try {
- data = await resp.json() as typeof data;
- } catch {
- return null;
- }
-
- if (data.error || data.error_code || !Array.isArray(data.features)) return null;
-
- const contours: TransitContour[] = [];
- for (const minutes of TRANSIT_CONTOUR_MINUTES) {
- const feature = data.features.find((f) => f.properties?.contour === minutes);
- if (feature?.geometry) contours.push({ minutes, geojson: feature.geometry });
- }
-
- return contours.length >= 2 ? contours : null;
-}
-
-/**
- * Parse a cached Valhalla isochrone FeatureCollection back into TransitContour[].
- * Mirrors the extraction logic in fetchTransitIsochrone.
- */
export function parseTransitContours(geojson: object): TransitContour[] | null {
const fc = geojson as { features?: Array<{ properties: { contour: number }; geometry: object }> };
if (!Array.isArray(fc.features)) return null;
@@ -101,76 +44,60 @@ interface MatrixResponse {
sources_to_targets: MatrixCell[][];
}
-const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
-
-/** Max attempts for retrying transient Valhalla failures (e.g. service restart). */
-const MATRIX_MAX_ATTEMPTS = 8;
-/** Exponential backoff: delay = min(BASE * 2^attempt, MAX) ms. */
-const MATRIX_RETRY_BASE_MS = 1_000;
-const MATRIX_RETRY_MAX_MS = 15_000;
-/** Per-request timeout — prevents hanging indefinitely if the service is down. */
-const MATRIX_TIMEOUT_MS = 60_000;
-
/**
- * Call Valhalla's sources_to_targets matrix endpoint.
- * Returns an M×N matrix where [i][j] is travel time in seconds
- * from sources[i] to targets[j], or null if unreachable.
- *
- * Retries automatically on connection errors and 5xx responses to survive
- * brief Valhalla service restarts (tile rebuilds). After MATRIX_MAX_ATTEMPTS
- * the last error is rethrown.
+ * Run a sources_to_targets matrix via the city's road Actor.
+ * Returns an M×N matrix of travel times in seconds (null = unreachable).
*/
-export async function fetchMatrix(
+export async function actorMatrix(
sources: LatLng[],
targets: LatLng[],
mode: "walking" | "cycling" | "driving",
+ citySlug: string,
): Promise<(number | null)[][]> {
const body = {
sources: sources.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })),
targets: targets.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })),
- costing: COSTING[mode],
+ costing: ROAD_COSTING[mode],
};
- const bodyJson = JSON.stringify(body);
-
- let lastErr: unknown;
- for (let attempt = 1; attempt <= MATRIX_MAX_ATTEMPTS; attempt++) {
- try {
- const resp = await fetch(`${VALHALLA_URL}/sources_to_targets`, {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: bodyJson,
- signal: AbortSignal.timeout(MATRIX_TIMEOUT_MS),
- });
-
- if (!resp.ok) {
- // 5xx: service may be restarting — retry
- if (resp.status >= 500 && attempt < MATRIX_MAX_ATTEMPTS) {
- const delay = Math.min(MATRIX_RETRY_BASE_MS * 2 ** (attempt - 1), MATRIX_RETRY_MAX_MS);
- await sleep(delay);
- continue;
- }
- const text = await resp.text();
- throw new Error(`Valhalla matrix ${resp.status}: ${text.slice(0, 300)}`);
- }
-
- const data = (await resp.json()) as MatrixResponse;
- return data.sources_to_targets.map((row) => row.map((cell) => cell.time ?? null));
- } catch (err) {
- lastErr = err;
- if (attempt >= MATRIX_MAX_ATTEMPTS) break;
- // TypeError from fetch = network-level failure (ECONNREFUSED, reset, timeout)
- // AbortError = our per-request timeout fired
- // Both are transient during a service restart.
- const isTransient =
- err instanceof TypeError ||
- (err instanceof Error && (err.name === "AbortError" || err.name === "TimeoutError"));
- if (!isTransient) throw err;
- const delay = Math.min(MATRIX_RETRY_BASE_MS * 2 ** (attempt - 1), MATRIX_RETRY_MAX_MS);
- console.warn(
- `[valhalla] fetchMatrix attempt ${attempt}/${MATRIX_MAX_ATTEMPTS} failed (${(err as Error).message}) — retrying in ${delay / 1000}s…`,
- );
- await sleep(delay);
- }
- }
- throw lastErr;
+ const actor = await getActor("road", citySlug);
+  // Actor.matrix() is async and returns Promise<string>
+ const resultStr = await actor.matrix(JSON.stringify(body));
+ const data = JSON.parse(resultStr) as MatrixResponse;
+ return data.sources_to_targets.map((row) => row.map((cell) => cell.time ?? null));
+}
+
+/**
+ * Compute an isochrone via the city's Actor (road or transit based on travelMode).
+ * Returns a GeoJSON FeatureCollection.
+ */
+export async function actorIsochrone(opts: {
+ lng: number;
+ lat: number;
+ travelMode: string;
+ contourMinutes: number[];
+ citySlug: string;
+ departureDate?: string | null;
+}): Promise<object> {
+ const { lng, lat, travelMode, contourMinutes, citySlug, departureDate } = opts;
+ const isTransit = travelMode === "transit";
+ const costing = isTransit
+ ? "multimodal"
+    : (ROAD_COSTING as Record<string, string>)[travelMode] ?? "pedestrian";
+
+  const body: Record<string, unknown> = {
+ locations: [{ lon: lng, lat }],
+ costing,
+ contours: contourMinutes.map((time) => ({ time })),
+ polygons: true,
+ show_locations: false,
+ };
+ if (isTransit) {
+ body.costing_options = { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 } };
+ body.date_time = { type: 1, value: departureDate ?? nextTuesdayDeparture() };
+ }
+
+ const actor = await getActor(isTransit ? "transit" : "road", citySlug);
+  // Actor.isochrone() is async and returns Promise<string>
+ const resultStr = await actor.isochrone(JSON.stringify(body));
+ return JSON.parse(resultStr);
}