import { cpus } from "os";
import type { Job } from "bullmq";
import { execSync, spawn } from "child_process";
import {
  existsSync,
  mkdirSync,
  readFileSync,
  readdirSync,
  rmSync,
  statSync,
  unlinkSync,
  writeFileSync,
} from "fs";
import type { JobProgress } from "@transportationer/shared";
import { invalidateActor } from "../actor-pool.js";

export type BuildValhallaData = {
  type: "build-valhalla";
  /** City being added/updated. Absent for removal-only rebuilds. */
  citySlug?: string;
  pbfPath?: string;
  bbox?: [number, number, number, number];
  /** Slugs to drop from the routing tile set before rebuilding. */
  removeSlugs?: string[];
};

const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
const VALHALLA_DATA_DIR = process.env.VALHALLA_DATA_DIR ?? "/data/valhalla";

/**
 * Per-city tile directories live here: VALHALLA_TILES_BASE/{citySlug}/
 * Each valhalla container mounts its own Docker volume at /data/valhalla,
 * so both road and transit containers use the same env var with different data.
 */
const VALHALLA_TILES_BASE = process.env.VALHALLA_TILES_BASE ?? `${VALHALLA_DATA_DIR}/tiles`;
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? `${VALHALLA_DATA_DIR}/gtfs`;
const TIMEZONE_SQLITE = `${VALHALLA_DATA_DIR}/timezone.sqlite`;
const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`;

/** Per-city GTFS feed dir written by download-gtfs (contains .txt files). */
function cityGtfsFeedDir(citySlug: string): string {
  return `${GTFS_DATA_DIR}/${citySlug}/feed`;
}

/**
 * Parent directory passed as transit_feeds_dir to Valhalla.
 * Valhalla expects transit_feeds_dir to contain subdirectories — one per GTFS feed.
 * Our structure: .../gtfs/{citySlug}/feed/{*.txt}, so the parent is .../gtfs/{citySlug}.
 */
function cityGtfsFeedsParentDir(citySlug: string): string {
  return `${GTFS_DATA_DIR}/${citySlug}`;
}

/**
 * Per-city transit staging directory.
 * valhalla_ingest_transit writes PBF staging tiles here.
 * valhalla_convert_transit converts them to transit graph tiles (.gph) here.
 * valhalla_build_tiles reads from here when INCLUDE_TRANSIT=true.
 */
function cityTransitCacheDir(citySlug: string): string {
  return `${VALHALLA_DATA_DIR}/transit_graph/${citySlug}`;
}

/**
 * Set VALHALLA_INCLUDE_TRANSIT=true in the transit container.
 * The road container leaves it unset so tiles are clean (no ghost transit edges).
 */
const INCLUDE_TRANSIT = (process.env.VALHALLA_INCLUDE_TRANSIT ?? "").toLowerCase() === "true";

/** Extra padding (degrees) applied on every side of a city's bbox when clipping road data. */
const ROAD_BBOX_BUFFER = 0.2;

/**
 * Cap Valhalla worker thread count. Defaults to CPU count which can be 256+
 * on large hosts — causes OOM for cities with large transit datasets (Berlin etc.).
 * Override with VALHALLA_CONCURRENCY env var.
 */
const VALHALLA_CONCURRENCY = (() => {
  const fallback = Math.max(1, Math.floor(cpus().length / 2));
  const parsed = parseInt(process.env.VALHALLA_CONCURRENCY ?? String(fallback), 10);
  // Guard against a garbage env value: NaN/zero/negative would otherwise leak
  // into the Valhalla config as an invalid concurrency setting.
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
})();

/** JSON file mapping citySlug → path of the PBF used for that city's routing tiles. */
const ROUTING_MANIFEST = `${VALHALLA_DATA_DIR}/routing-sources.json`;

/** Read the routing-sources manifest; an absent or unparsable file yields {}. */
function readManifest(): Record<string, string> {
  try {
    return JSON.parse(readFileSync(ROUTING_MANIFEST, "utf8")) as Record<string, string>;
  } catch {
    return {};
  }
}

/** Persist the routing-sources manifest (pretty-printed for easy inspection). */
function writeManifest(manifest: Record<string, string>): void {
  writeFileSync(ROUTING_MANIFEST, JSON.stringify(manifest, null, 2));
}

/**
 * Spawn a child process with inherited stdio.
 * Resolves on exit code 0; rejects with the exit code/signal otherwise.
 */
function runProcess(cmd: string, args: string[]): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    console.log(`[build-valhalla] Running: ${cmd} ${args.join(" ")}`);
    const child = spawn(cmd, args, { stdio: "inherit" });
    child.on("error", reject);
    child.on("exit", (code, signal) => {
      if (code === 0) resolve();
      else reject(new Error(`${cmd} exited with code ${code}${signal ? ` (signal: ${signal})` : ""}`));
    });
  });
}

/** Error carrying the captured output lines of a failed child process. */
class ProcessError extends Error {
  constructor(
    message: string,
    public readonly lines: string[],
    public readonly signal: string | null,
  ) {
    super(message);
    this.name = "ProcessError";
  }
}

/** Like runProcess but captures stdout+stderr (still printed) for error analysis.
*/ function runProcessCapture(cmd: string, args: string[]): Promise { return new Promise((resolve, reject) => { console.log(`[build-valhalla] Running: ${cmd} ${args.join(" ")}`); const child = spawn(cmd, args, { stdio: ["ignore", "pipe", "pipe"] }); const lines: string[] = []; const onData = (d: Buffer) => { process.stdout.write(d); lines.push(...d.toString().split(/\r?\n/).filter((l) => l.length > 0)); }; child.stdout!.on("data", onData); child.stderr!.on("data", onData); child.on("error", reject); child.on("exit", (code, signal) => { if (code === 0) resolve(); else reject(new ProcessError( `${cmd} exited with code ${code}${signal ? ` (signal: ${signal})` : ""}`, lines, signal, )); }); }); } // ─── Transit stop quarantine ─────────────────────────────────────────────────── /** Path to the quarantine file (outside feed/ so it survives re-filters). */ function quarantineFilePath(citySlug: string): string { return `${GTFS_DATA_DIR}/${citySlug}/.quarantine`; } function readQuarantine(citySlug: string): [number, number][] { const p = quarantineFilePath(citySlug); if (!existsSync(p)) return []; try { return JSON.parse(readFileSync(p, "utf8")) as [number, number][]; } catch { return []; } } /** Append new bad coordinates to the quarantine file. */ function appendQuarantine(citySlug: string, badCoords: [number, number][]): void { const merged = [...readQuarantine(citySlug), ...badCoords]; writeFileSync(quarantineFilePath(citySlug), JSON.stringify(merged)); console.log(`[build-valhalla] Quarantine updated for ${citySlug}: ${merged.length} stop(s) total`); } /** * Remove quarantined stops from the city's filtered GTFS feed (stops.txt + stop_times.txt). * Called before valhalla_ingest_transit so the bad stops never enter the transit graph. 
*/ function applyQuarantine(citySlug: string): void { const badCoords = readQuarantine(citySlug); if (badCoords.length === 0) return; const feedDir = cityGtfsFeedDir(citySlug); const stopsPath = `${feedDir}/stops.txt`; if (!existsSync(stopsPath)) return; const stopLines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim()); if (stopLines.length < 2) return; const headers = stopLines[0].split(",").map((h) => h.trim().replace(/^\uFEFF/, "")); const stopIdCol = headers.indexOf("stop_id"); const latCol = headers.indexOf("stop_lat"); const lonCol = headers.indexOf("stop_lon"); if (stopIdCol < 0 || latCol < 0 || lonCol < 0) return; const removedIds = new Set(); const keptLines = [stopLines[0]]; for (let i = 1; i < stopLines.length; i++) { const fields = stopLines[i].split(","); const lat = parseFloat(fields[latCol] ?? ""); const lon = parseFloat(fields[lonCol] ?? ""); if (badCoords.some(([bLat, bLon]) => Math.abs(lat - bLat) < 1e-5 && Math.abs(lon - bLon) < 1e-5)) { removedIds.add(fields[stopIdCol] ?? ""); } else { keptLines.push(stopLines[i]); } } if (removedIds.size === 0) return; console.log(`[build-valhalla] Quarantine: removing stop(s) ${[...removedIds].join(", ")} from feed`); writeFileSync(stopsPath, keptLines.join("\n") + "\n"); const stPath = `${feedDir}/stop_times.txt`; if (!existsSync(stPath)) return; const stLines = readFileSync(stPath, "utf8").split(/\r?\n/).filter((l) => l.trim()); if (stLines.length < 2) return; const stHeaders = stLines[0].split(",").map((h) => h.trim()); const stStopCol = stHeaders.indexOf("stop_id"); if (stStopCol < 0) return; const stOut = [stLines[0]]; for (let i = 1; i < stLines.length; i++) { if (!removedIds.has(stLines[i].split(",")[stStopCol] ?? 
"")) stOut.push(stLines[i]); } writeFileSync(stPath, stOut.join("\n") + "\n"); } function buildTimezoneDb(): Promise { return new Promise((resolve, reject) => { console.log("[build-valhalla] Running: valhalla_build_timezones → " + TIMEZONE_SQLITE); const child = spawn("valhalla_build_timezones", [], { stdio: ["ignore", "pipe", "inherit"] }); const chunks: Buffer[] = []; child.stdout!.on("data", (chunk: Buffer) => chunks.push(chunk)); child.on("error", reject); child.on("exit", (code) => { if (code !== 0) { reject(new Error(`valhalla_build_timezones exited ${code}`)); return; } const db = Buffer.concat(chunks); if (db.length < 1024) { reject(new Error(`valhalla_build_timezones output too small`)); return; } writeFileSync(TIMEZONE_SQLITE, db); console.log(`[build-valhalla] Timezone database written (${(db.length / 1024 / 1024).toFixed(1)} MB)`); resolve(); }); }); } type JsonObject = Record; function deepMerge(base: JsonObject, override: JsonObject): JsonObject { const result: JsonObject = { ...base }; for (const [key, val] of Object.entries(override)) { const baseVal = base[key]; if (val !== null && typeof val === "object" && !Array.isArray(val) && baseVal !== null && typeof baseVal === "object" && !Array.isArray(baseVal)) { result[key] = deepMerge(baseVal as JsonObject, val as JsonObject); } else { result[key] = val; } } return result; } function buildBase(): JsonObject { try { const out = execSync("valhalla_build_config", { encoding: "utf8", maxBuffer: 10 * 1024 * 1024 }); return JSON.parse(out) as JsonObject; } catch (err) { console.warn("[build-valhalla] valhalla_build_config failed, using empty base:", err); return {}; } } /** * Write a per-city config for transit ingest + convert operations. * These tools only read/write from transit_dir/transit_feeds_dir; * tile_dir is a dummy that won't receive any road tiles. 
*/ function writeCityTransitIngestConfig(citySlug: string): string { const configPath = `${VALHALLA_DATA_DIR}/transit-ingest-config-${citySlug}.json`; const dummyTileDir = `${VALHALLA_DATA_DIR}/tiles/_transit_ingest_tmp_${citySlug}`; mkdirSync(dummyTileDir, { recursive: true }); const config = deepMerge(buildBase(), { mjolnir: { tile_dir: dummyTileDir, timezone: TIMEZONE_SQLITE, admin: ADMINS_SQLITE, transit_dir: cityTransitCacheDir(citySlug), transit_feeds_dir: cityGtfsFeedsParentDir(citySlug), concurrency: VALHALLA_CONCURRENCY, }, service_limits: { isochrone: { max_contours: 5 } }, } as JsonObject); writeFileSync(configPath, JSON.stringify(config, null, 2)); return configPath; } /** * Write a per-city config for valhalla_build_tiles. * Road builds omit transit_dir so no transit edges are embedded. * Transit builds include transit_dir so road↔transit connections are added. */ function writeCityConfig(citySlug: string, cityTileDir: string): string { mkdirSync(cityTileDir, { recursive: true }); const config = deepMerge(buildBase(), { mjolnir: { tile_dir: cityTileDir, tile_extract: `${cityTileDir}.tar`, timezone: TIMEZONE_SQLITE, admin: ADMINS_SQLITE, concurrency: VALHALLA_CONCURRENCY, ...(INCLUDE_TRANSIT ? 
{ transit_dir: cityTransitCacheDir(citySlug), transit_feeds_dir: cityGtfsFeedsParentDir(citySlug), } : {}), }, additional_data: { elevation: "/data/elevation/" }, service_limits: { isochrone: { max_contours: 5 } }, } as JsonObject); const configPath = `${cityTileDir}/config.json`; writeFileSync(configPath, JSON.stringify(config, null, 2)); return configPath; } function isTransitIngestFresh(citySlug: string): boolean { const transitCacheDir = cityTransitCacheDir(citySlug); const transitCacheMarker = `${transitCacheDir}/.ready`; const gtfsCityMarker = `${cityGtfsFeedDir(citySlug)}/.source`; if (!existsSync(transitCacheMarker) || !existsSync(gtfsCityMarker)) return false; const level3Dir = `${transitCacheDir}/3`; if (!existsSync(level3Dir)) return false; return statSync(transitCacheMarker).mtimeMs >= statSync(gtfsCityMarker).mtimeMs; } /** Build per-city tiles. Invalidates Actor pool entry before and writes .ready after. */ async function buildCityTiles(citySlug: string, pbfPath: string): Promise { const cityTileDir = `${VALHALLA_TILES_BASE}/${citySlug}`; const readyMarker = `${cityTileDir}/.ready`; // Signal Actor pool: tiles are being rebuilt. if (existsSync(readyMarker)) unlinkSync(readyMarker); invalidateActor(INCLUDE_TRANSIT ? 
"transit" : "road", citySlug); // ── Transit ingest + convert (transit container only) ───────────────────── if (INCLUDE_TRANSIT) { const feedDir = cityGtfsFeedDir(citySlug); const gtfsReady = existsSync(feedDir) && readdirSync(feedDir).some((f) => f.endsWith(".txt")); if (gtfsReady) { if (!existsSync(TIMEZONE_SQLITE)) { console.log("[build-valhalla] Building timezone database…"); try { await buildTimezoneDb(); } catch (err) { console.warn("[build-valhalla] valhalla_build_timezones failed:", err); } } const transitCacheDir = cityTransitCacheDir(citySlug); const transitCacheMarker = `${transitCacheDir}/.ready`; if (!isTransitIngestFresh(citySlug) && existsSync(TIMEZONE_SQLITE)) { console.log(`[build-valhalla] Ingesting GTFS transit feeds for ${citySlug}…`); try { rmSync(transitCacheDir, { recursive: true, force: true }); mkdirSync(transitCacheDir, { recursive: true }); const cfg = writeCityTransitIngestConfig(citySlug); await runProcess("valhalla_ingest_transit", ["-c", cfg]); writeFileSync(transitCacheMarker, new Date().toISOString()); console.log(`[build-valhalla] valhalla_ingest_transit completed for ${citySlug}`); } catch (err) { console.warn(`[build-valhalla] valhalla_ingest_transit failed for ${citySlug}:`, err); rmSync(transitCacheDir, { recursive: true, force: true }); mkdirSync(transitCacheDir, { recursive: true }); } } else if (isTransitIngestFresh(citySlug)) { console.log(`[build-valhalla] Transit ingest cache fresh for ${citySlug} — skipping re-ingest`); } console.log(`[build-valhalla] Converting transit staging tiles for ${citySlug}…`); try { const cfg = writeCityTransitIngestConfig(citySlug); await runProcess("valhalla_convert_transit", ["-c", cfg]); console.log(`[build-valhalla] valhalla_convert_transit completed for ${citySlug}`); } catch (err) { console.warn(`[build-valhalla] valhalla_convert_transit failed for ${citySlug}:`, err); } } else { console.log(`[build-valhalla] No GTFS feed found for ${citySlug} — skipping transit ingest/convert`); } } 
const configPath = writeCityConfig(citySlug, cityTileDir); await runProcess("valhalla_build_tiles", ["-c", configPath, pbfPath]); writeFileSync(readyMarker, new Date().toISOString()); console.log(`[build-valhalla] Tiles ready for ${citySlug} → ${cityTileDir}`); } export async function handleBuildValhalla(job: Job): Promise { const { citySlug, pbfPath, bbox, removeSlugs = [] } = job.data; mkdirSync(VALHALLA_TILES_BASE, { recursive: true }); // ── Step 1: manifest + removals ─────────────────────────────────────────── const manifest = readManifest(); for (const slug of removeSlugs) { const clippedPbf = `${VALHALLA_DATA_DIR}/${slug}-routing.osm.pbf`; if (existsSync(clippedPbf)) { unlinkSync(clippedPbf); console.log(`[build-valhalla] Removed clipped PBF for ${slug}`); } const cityTileDir = `${VALHALLA_TILES_BASE}/${slug}`; if (existsSync(cityTileDir)) { rmSync(cityTileDir, { recursive: true, force: true }); console.log(`[build-valhalla] Removed tile dir for ${slug}`); } invalidateActor(INCLUDE_TRANSIT ? "transit" : "road", slug); delete manifest[slug]; } if (citySlug && pbfPath) { await job.updateProgress({ stage: "Building routing graph", pct: 5, message: bbox ? `Clipping PBF to city bbox…` : `Registering full PBF for ${citySlug}…`, } satisfies JobProgress); let routingPbf: string; if (bbox) { const clippedPbf = `${VALHALLA_DATA_DIR}/${citySlug}-routing.osm.pbf`; if (!existsSync(pbfPath)) throw new Error(`PBF file not found: ${pbfPath}`); let eb: [number, number, number, number] = [bbox[0] - ROAD_BBOX_BUFFER, bbox[1] - ROAD_BBOX_BUFFER, bbox[2] + ROAD_BBOX_BUFFER, bbox[3] + ROAD_BBOX_BUFFER]; // Transit container: expand road clip to cover transit stop locations. // Stops outside road tile coverage cause valhalla_build_tiles to crash // with "double free or corruption (fasttop)" when it tries to embed // road↔transit connection edges for stops with no nearby road data. // The bbox of seeding stops is written by download-gtfs per city. 
if (INCLUDE_TRANSIT && citySlug) { const stopsBboxPath = `${cityGtfsFeedDir(citySlug)}/.stops_bbox`; if (existsSync(stopsBboxPath)) { try { const [sMinLng, sMinLat, sMaxLng, sMaxLat] = JSON.parse(readFileSync(stopsBboxPath, "utf8")) as [number, number, number, number]; const before = eb.map((v) => v.toFixed(3)).join(", "); eb = [Math.min(eb[0], sMinLng), Math.min(eb[1], sMinLat), Math.max(eb[2], sMaxLng), Math.max(eb[3], sMaxLat)]; console.log(`[build-valhalla] Transit stops bbox expansion: [${before}] → [${eb.map((v) => v.toFixed(3)).join(", ")}]`); } catch (e) { console.warn("[build-valhalla] Could not read .stops_bbox — using default buffer:", e); } } else { console.warn("[build-valhalla] .stops_bbox not found — GTFS may not be downloaded yet, or stops are outside bbox"); } } await runProcess("osmium", ["extract", `--bbox=${eb[0]},${eb[1]},${eb[2]},${eb[3]}`, pbfPath, "-o", clippedPbf, "--overwrite"]); routingPbf = clippedPbf; } else if (existsSync(pbfPath)) { routingPbf = pbfPath; } else { const found = readdirSync(OSM_DATA_DIR).filter((f) => f.endsWith("-latest.osm.pbf")).map((f) => `${OSM_DATA_DIR}/${f}`); if (found.length === 0) throw new Error(`No PBF files found in ${OSM_DATA_DIR}`); routingPbf = found[0]; } manifest[citySlug] = routingPbf; } writeManifest(manifest); // ── Step 2: cities to build ─────────────────────────────────────────────── const citiesForBuild: { slug: string; pbf: string }[] = []; if (citySlug && manifest[citySlug] && existsSync(manifest[citySlug])) { citiesForBuild.push({ slug: citySlug, pbf: manifest[citySlug] }); } else if (!citySlug) { for (const [slug, pbf] of Object.entries(manifest)) { if (existsSync(pbf)) citiesForBuild.push({ slug, pbf }); } } if (citiesForBuild.length === 0) { await job.updateProgress({ stage: "Building routing graph", pct: 100, message: "No cities in manifest, skipping." 
} satisfies JobProgress); return; } for (let i = 0; i < citiesForBuild.length; i++) { const { slug, pbf } = citiesForBuild[i]; const pct = Math.round(20 + (i / citiesForBuild.length) * 75); await job.updateProgress({ stage: "Building routing graph", pct, message: `Building ${INCLUDE_TRANSIT ? "transit" : "road"} tiles for ${slug} (${i + 1}/${citiesForBuild.length})…`, } satisfies JobProgress); await buildCityTiles(slug, pbf); } await job.updateProgress({ stage: "Building routing graph", pct: 100, message: `Routing graph ready — covers: ${citiesForBuild.map((c) => c.slug).join(", ")}`, } satisfies JobProgress); }