import type { Job } from "bullmq";
import { FlowProducer } from "bullmq";
import { createBullMQConnection } from "../redis.js";
import { getSql } from "../db.js";
import { JOB_OPTIONS, VALID_THRESHOLDS } from "@transportationer/shared";
import type { JobProgress } from "@transportationer/shared";
|
||
/** Payload of a refresh-city orchestrator job. */
export type RefreshCityData = {
  type: "refresh-city";
  /** City identifier; used for DB lookups, file paths, and derived job IDs. */
  citySlug: string;
  /** Geofabrik extract URL forwarded to the download-pbf jobs. */
  geofabrikUrl: string;
  /** Grid resolution in metres; the handler falls back to 200 when omitted. */
  resolutionM?: number;
  /** Set after flow.add() — the ID of the enqueued compute-scores job. */
  computeScoresJobId?: string;
};
|
||
|
||
// Directory where downloaded .osm.pbf extracts live; overridable via env for tests/deployments.
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
|
||
|
||
/** True when the given bbox intersects Niedersachsen. */
|
||
function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean {
|
||
return minLng < 11.779 && maxLng > 6.526 && minLat < 54.033 && maxLat > 51.197;
|
||
}
|
||
|
||
/**
|
||
* True when the given bbox intersects the Freie Hansestadt Bremen (HB), which
|
||
* consists of two non-contiguous cities:
|
||
* - Stadt Bremen: ~52.95–53.23°N, 8.48–8.99°E
|
||
* - Bremerhaven: ~53.46–53.65°N, 8.48–8.68°E
|
||
* The WFS BBOX filter ensures each city only receives its own zones — no overlap.
|
||
*/
|
||
function isInBremen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean {
|
||
return minLng < 8.99 && maxLng > 8.48 && minLat < 53.65 && maxLat > 52.95;
|
||
}
|
||
|
||
/**
 * Orchestrator for a full city data refresh.
 *
 * Reads the city's stored bbox, marks the city as processing, and enqueues the
 * entire ingest pipeline as one BullMQ flow (children complete before their
 * parent runs). Afterwards it writes the root compute-scores job ID back into
 * this job's own data so downstream consumers can match that job by exact ID.
 *
 * @param job BullMQ job carrying the refresh-city payload (citySlug, URL, resolution).
 */
export async function handleRefreshCity(
  job: Job<RefreshCityData>,
): Promise<void> {
  const { citySlug, geofabrikUrl, resolutionM = 200 } = job.data;
  const sql = getSql();

  // Local path the download-pbf job writes to and the extract/build jobs read from.
  const pbfPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`;

  // Read the user-specified bbox from the database (set at city creation time).
  // If present, it will be passed to extract-pois to clip the PBF before import.
  // Also read ALL city bboxes for the GTFS filter: each city gets its own bbox
  // (with a small buffer) so valhalla_ingest_transit only processes relevant stops.
  // NOTE(review): the Promise.resolve() wrappers suggest the sql tagged template
  // returns a lazy thenable rather than a Promise — confirm against the db client.
  const [bboxRows, allCityBboxRows] = await Promise.all([
    Promise.resolve(sql<{
      minlng: number; minlat: number; maxlng: number; maxlat: number;
    }[]>`
      SELECT
        ST_XMin(bbox)::float AS minlng,
        ST_YMin(bbox)::float AS minlat,
        ST_XMax(bbox)::float AS maxlng,
        ST_YMax(bbox)::float AS maxlat
      FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL
    `),
    Promise.resolve(sql<{
      minlng: number; minlat: number; maxlng: number; maxlat: number;
    }[]>`
      SELECT
        ST_XMin(bbox)::float AS minlng,
        ST_YMin(bbox)::float AS minlat,
        ST_XMax(bbox)::float AS maxlng,
        ST_YMax(bbox)::float AS maxlat
      FROM cities WHERE bbox IS NOT NULL
    `),
  ]);

  // This city's bbox as [minLng, minLat, maxLng, maxLat], or undefined when
  // none was stored (then no clipping is applied downstream).
  const bbox: [number, number, number, number] | undefined =
    bboxRows.length > 0
      ? [bboxRows[0].minlng, bboxRows[0].minlat, bboxRows[0].maxlng, bboxRows[0].maxlat]
      : undefined;

  // ~10 km buffer for GTFS stop coverage near city edges (0.09° ≈ 10 km)
  const GTFS_BUFFER = 0.09;
  const gtfsBboxes: [number, number, number, number][] = allCityBboxRows.map((r) => [
    r.minlng - GTFS_BUFFER,
    r.minlat - GTFS_BUFFER,
    r.maxlng + GTFS_BUFFER,
    r.maxlat + GTFS_BUFFER,
  ]);

  await job.updateProgress({
    stage: "Orchestrating pipeline",
    pct: 0,
    message: `Starting full ingest for ${citySlug}`,
  } satisfies JobProgress);

  // Mark city as processing
  await Promise.resolve(sql`
    UPDATE cities SET status = 'processing' WHERE slug = ${citySlug}
  `);

  // Shared download node factory — each call produces an independent download-pbf
  // job (no fixed jobId). Three branches each get their own job so BullMQ can
  // track each parent→child dependency correctly; the handler skips the actual
  // download if the file already exists (atomic rename + idempotency check).
  const downloadNode = () => ({
    name: "download-pbf",
    queueName: "download",
    data: { type: "download-pbf" as const, citySlug, geofabrikUrl },
    opts: { ...JOB_OPTIONS["download-pbf"] },
  });

  // For NI cities: ingest-boris-ni is dispatched in Phase 1 of compute-scores.
  const niApplicable = !!(bbox && isInNiedersachsen(...bbox));
  // For HB cities (Stadt Bremen + Bremerhaven): ingest-boris-hb runs in Phase 1.
  const hbApplicable = !!(bbox && isInBremen(...bbox));

  // Parallel pipeline DAG (bottom-up — leaves execute first):
  //
  // download-pbf ──────┬─→ extract-pois ──────────────────────────┐
  //                    │                                          ├─→ generate-grid → compute-scores
  // download-pbf ──────┴─→ build-valhalla (road, "valhalla")      ┤
  //                                                               │
  // download-pbf ──┐                                              │
  //                └─→ build-valhalla (transit, "valhalla-transit")┘
  // download-gtfs-de ──┘
  //
  // Road tiles are built without GTFS (clean, no transit connections → cycling works).
  // Transit tiles are built with GTFS (multimodal routing on the separate instance).
  // generate-grid waits for all three parallel branches before compute-scores runs.
  //
  // compute-scores Phase 1 also dispatches ingest-boris-ni (NI cities only)
  // as a child alongside the routing jobs, so it runs during routing.
  const rootNode = {
    name: "compute-scores",
    queueName: "pipeline",
    data: {
      type: "compute-scores" as const,
      citySlug,
      modes: ["walking", "cycling", "driving", "transit"] as const,
      thresholds: [...VALID_THRESHOLDS],
      ingestBorisNi: niApplicable,
      ingestBorisHb: hbApplicable,
    },
    opts: { ...JOB_OPTIONS["compute-scores"], jobId: `compute-scores.${citySlug}` },
    children: [
      {
        name: "generate-grid",
        queueName: "pipeline",
        data: { type: "generate-grid" as const, citySlug, resolutionM },
        opts: { ...JOB_OPTIONS["generate-grid"], jobId: `generate-grid.${citySlug}` },
        children: [
          // Three parallel branches — each gets its own independent download-pbf
          // child via downloadNode() above (no fixed jobId is set on those jobs);
          // the download handler itself is idempotent and skips work when the
          // PBF file is already present.
          {
            name: "extract-pois",
            queueName: "pipeline",
            data: {
              type: "extract-pois" as const,
              citySlug,
              pbfPath,
              // Only attach the bbox key when a bbox exists for the city.
              ...(bbox ? { bbox } : {}),
            },
            opts: { ...JOB_OPTIONS["extract-pois"], jobId: `extract-pois.${citySlug}` },
            children: [downloadNode()],
          },
          // Road-only Valhalla build — no GTFS, produces clean tiles without
          // transit connections. Cycling/walking/driving route from this instance.
          {
            name: "build-valhalla",
            queueName: "valhalla",
            data: {
              type: "build-valhalla" as const,
              citySlug,
              pbfPath,
              ...(bbox ? { bbox } : {}),
            },
            opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla.${citySlug}` },
            children: [downloadNode()],
          },
          // Transit Valhalla build — depends on GTFS download. Produces tiles with
          // road↔transit connections. Multimodal routing comes from this instance.
          {
            name: "build-valhalla",
            queueName: "valhalla-transit",
            data: {
              type: "build-valhalla" as const,
              citySlug,
              pbfPath,
              ...(bbox ? { bbox } : {}),
            },
            // Distinct jobId from the road build so the two builds never collide.
            opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla-transit.${citySlug}` },
            children: [
              downloadNode(),
              // Download GTFS feed before building transit tiles. Idempotent —
              // skips if the feed is current, so subsequent refreshes are cheap.
              {
                name: "download-gtfs-de",
                queueName: "valhalla-transit",
                data: {
                  type: "download-gtfs-de" as const,
                  url: "https://download.gtfs.de/germany/nv_free/latest.zip",
                  // Per-city bboxes (with ~10 km buffer) so valhalla_ingest_transit
                  // only processes stops/trips relevant to the known cities.
                  ...(gtfsBboxes.length > 0 ? { bboxes: gtfsBboxes } : {}),
                },
                opts: JOB_OPTIONS["download-gtfs-de"],
              },
            ],
          },
        ],
      },
    ],
  };

  const flow = new FlowProducer({ connection: createBullMQConnection() });
  let computeScoresJobId: string | undefined;
  try {
    const jobNode = await flow.add(rootNode);
    // jobNode.job is the root (compute-scores) job. Store its ID in this
    // refresh-city job's data so the SSE stream can match by exact job ID
    // rather than by citySlug (which would match stale completed jobs).
    computeScoresJobId = jobNode.job.id;
  } finally {
    // Always release the producer's Redis connection, even when flow.add throws.
    await flow.close();
  }

  if (computeScoresJobId) {
    await job.updateData({ ...job.data, computeScoresJobId });
  }

  await job.updateProgress({
    stage: "Orchestrating pipeline",
    pct: 100,
    message: `All pipeline jobs enqueued${niApplicable ? " (includes BORIS NI, parallel with routing)" : ""}. Processing will begin shortly.`,
  } satisfies JobProgress);
}
|