import type { Job } from "bullmq";
import { FlowProducer } from "bullmq";
import { createBullMQConnection } from "../redis.js";
import { getSql } from "../db.js";
import { JOB_OPTIONS, VALID_THRESHOLDS } from "@transportationer/shared";
import type { JobProgress } from "@transportationer/shared";

/** Payload of a `refresh-city` job. */
export type RefreshCityData = {
  type: "refresh-city";
  citySlug: string;
  geofabrikUrl: string;
  resolutionM?: number;
  /** Set after flow.add() — the ID of the enqueued compute-scores job. */
  computeScoresJobId?: string;
};

/** Bounding box as `[minLng, minLat, maxLng, maxLat]`. */
type Bbox = [number, number, number, number];

/** Row shape returned by the bbox queries below (column aliases are lowercase). */
type BboxRow = {
  minlng: number;
  minlat: number;
  maxlng: number;
  maxlat: number;
};

/** Directory holding the downloaded `.osm.pbf` files; overridable via `OSM_DATA_DIR`. */
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";

/**
 * True when the given bbox intersects Niedersachsen.
 *
 * The test is a plain rectangle-overlap check against a fixed lng/lat
 * rectangle (6.526–11.779 lng, 51.197–54.033 lat). The comparisons are strict,
 * so a bbox that merely touches an edge of that rectangle does not count.
 */
function isInNiedersachsen(
  minLng: number,
  minLat: number,
  maxLng: number,
  maxLat: number,
): boolean {
  return minLng < 11.779 && maxLng > 6.526 && minLat < 54.033 && maxLat > 51.197;
}

/**
 * Orchestrates a full city refresh by enqueueing the whole ingest pipeline as
 * a single BullMQ flow.
 *
 * Steps, in order:
 *  1. Read this city's bbox and the bboxes of all cities from the `cities` table.
 *  2. Report 0 % progress and set the city's status to `'processing'`.
 *  3. Enqueue the job tree rooted at `compute-scores` via a `FlowProducer`.
 *  4. Store the root job's ID in this job's data as `computeScoresJobId`.
 *  5. Report 100 % progress.
 *
 * This handler only enqueues work; it does not wait for the pipeline to run.
 *
 * @param job - The `refresh-city` job. `resolutionM` defaults to 200 when absent.
 * @returns Resolves once all pipeline jobs are enqueued and progress is updated.
 * @throws Propagates any error from the database queries, `flow.add()`, or the
 *   job update calls. The flow producer is closed even when `flow.add()` fails.
 */
export async function handleRefreshCity(
  job: Job<RefreshCityData>,
): Promise<void> {
  const { citySlug, geofabrikUrl, resolutionM = 200 } = job.data;
  const sql = getSql();

  const pbfPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`;

  // Read the user-specified bbox from the database (set at city creation time).
  // If present, it is passed to extract-pois and build-valhalla.
  // Also read ALL city bboxes for the GTFS filter: each city contributes its
  // own bbox (with a small buffer) to the download-gtfs-de job.
  const [bboxRows, allCityBboxRows] = await Promise.all([
    Promise.resolve(sql<BboxRow[]>`
      SELECT ST_XMin(bbox)::float AS minlng,
             ST_YMin(bbox)::float AS minlat,
             ST_XMax(bbox)::float AS maxlng,
             ST_YMax(bbox)::float AS maxlat
      FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL
    `),
    Promise.resolve(sql<BboxRow[]>`
      SELECT ST_XMin(bbox)::float AS minlng,
             ST_YMin(bbox)::float AS minlat,
             ST_XMax(bbox)::float AS maxlng,
             ST_YMax(bbox)::float AS maxlat
      FROM cities WHERE bbox IS NOT NULL
    `),
  ]);

  // The first row (if any) is this city's bbox; no row means the city has none.
  const [cityRow] = bboxRows;
  const bbox: Bbox | undefined = cityRow
    ? [cityRow.minlng, cityRow.minlat, cityRow.maxlng, cityRow.maxlat]
    : undefined;

  // ~10 km buffer for GTFS stop coverage near city edges (0.09° ≈ 10 km)
  const GTFS_BUFFER = 0.09;
  const gtfsBboxes: Bbox[] = allCityBboxRows.map(
    (r): Bbox => [
      r.minlng - GTFS_BUFFER,
      r.minlat - GTFS_BUFFER,
      r.maxlng + GTFS_BUFFER,
      r.maxlat + GTFS_BUFFER,
    ],
  );

  await job.updateProgress({
    stage: "Orchestrating pipeline",
    pct: 0,
    message: `Starting full ingest for ${citySlug}`,
  } satisfies JobProgress);

  // Mark city as processing
  await Promise.resolve(sql`
    UPDATE cities SET status = 'processing'
    WHERE slug = ${citySlug}
  `);

  // Shared download node factory — produces a fresh download-pbf node per call.
  // Two independent nodes with the same geofabrikUrl are safe: the idempotency
  // check in download-pbf.ts skips the download if the file already exists.
  const downloadNode = () => ({
    name: "download-pbf",
    queueName: "pipeline",
    data: { type: "download-pbf" as const, citySlug, geofabrikUrl },
    opts: JOB_OPTIONS["download-pbf"],
  });

  // For NI cities: ingest-boris-ni is dispatched in Phase 1 of compute-scores.
  // A city without a bbox is never treated as NI.
  const niApplicable = !!(bbox && isInNiedersachsen(...bbox));

  // Parallel pipeline DAG (bottom-up — leaves execute first):
  //
  //   download-pbf ──────→ extract-pois ───┐
  //                                        ├─→ generate-grid ─→ compute-scores
  //   download-pbf ─────┐                  │
  //                     ├─→ build-valhalla ┘
  //   download-gtfs-de ─┘
  //
  // compute-scores Phase 1 also dispatches ingest-boris-ni (NI cities only)
  // as a child alongside the routing jobs, so it runs during routing.
  const rootNode = {
    name: "compute-scores",
    queueName: "pipeline",
    data: {
      type: "compute-scores" as const,
      citySlug,
      modes: ["walking", "cycling", "driving", "transit"] as const,
      thresholds: [...VALID_THRESHOLDS],
      ingestBorisNi: niApplicable,
    },
    opts: JOB_OPTIONS["compute-scores"],
    children: [
      {
        name: "generate-grid",
        queueName: "pipeline",
        data: { type: "generate-grid" as const, citySlug, resolutionM },
        opts: JOB_OPTIONS["generate-grid"],
        children: [
          // extract-pois and build-valhalla run in parallel — each gets its
          // own download-pbf child; the idempotency guard ensures only one
          // actually downloads when they race.
          {
            name: "extract-pois",
            queueName: "pipeline",
            data: {
              type: "extract-pois" as const,
              citySlug,
              pbfPath,
              // Only include the key when the city actually has a bbox.
              ...(bbox ? { bbox } : {}),
            },
            opts: JOB_OPTIONS["extract-pois"],
            children: [downloadNode()],
          },
          {
            name: "build-valhalla",
            queueName: "valhalla",
            data: {
              type: "build-valhalla" as const,
              citySlug,
              pbfPath,
              ...(bbox ? { bbox } : {}),
            },
            opts: JOB_OPTIONS["build-valhalla"],
            children: [
              downloadNode(),
              // Download GTFS feed before building tiles so valhalla_build_transit
              // runs during this build. The job is idempotent — it skips immediately
              // if the feed is already present, so subsequent refreshes are cheap.
              {
                name: "download-gtfs-de",
                queueName: "valhalla",
                data: {
                  type: "download-gtfs-de" as const,
                  url: "https://download.gtfs.de/germany/nv_free/latest.zip",
                  // Per-city bboxes (with ~10 km buffer) so valhalla_ingest_transit
                  // only processes stops/trips relevant to the known cities.
                  ...(gtfsBboxes.length > 0 ? { bboxes: gtfsBboxes } : {}),
                },
                opts: JOB_OPTIONS["download-gtfs-de"],
              },
            ],
          },
        ],
      },
    ],
  };

  const flow = new FlowProducer({ connection: createBullMQConnection() });
  let computeScoresJobId: string | undefined;
  try {
    const jobNode = await flow.add(rootNode);
    // jobNode.job is the root (compute-scores) job. Store its ID in this
    // refresh-city job's data so the SSE stream can match by exact job ID
    // rather than by citySlug (which would match stale completed jobs).
    computeScoresJobId = jobNode.job.id;
  } finally {
    // Always release the producer, whether or not enqueueing succeeded.
    await flow.close();
  }

  if (computeScoresJobId) {
    await job.updateData({ ...job.data, computeScoresJobId });
  }

  await job.updateProgress({
    stage: "Orchestrating pipeline",
    pct: 100,
    message: `All pipeline jobs enqueued${niApplicable ? " (includes BORIS NI, parallel with routing)" : ""}. Processing will begin shortly.`,
  } satisfies JobProgress);
}