import type { Job } from "bullmq";
import { FlowProducer } from "bullmq";
import { createBullMQConnection } from "../redis.js";
import { getSql } from "../db.js";
import { JOB_OPTIONS } from "@transportationer/shared";
import type { JobProgress } from "@transportationer/shared";

/** Payload carried by a `refresh-city` job. */
export type RefreshCityData = {
  type: "refresh-city";
  /** Slug used to look the city up in `cities` and to name the PBF file. */
  citySlug: string;
  /** Forwarded unchanged to the `download-pbf` job. */
  geofabrikUrl: string;
  /**
   * Forwarded to the `generate-grid` job; the handler falls back to 200 when
   * omitted. Presumably metres, judging by the name — TODO confirm.
   */
  resolutionM?: number;
};

/** Directory PBF paths are built under; overridable via the `OSM_DATA_DIR` env var. */
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";

/**
 * Orchestrates a full ingest for one city by enqueueing the pipeline jobs as a
 * single BullMQ flow.
 *
 * Steps, in order:
 *  1. Reads the city's bbox (if any) from the `cities` table.
 *  2. Reports 0% progress and sets the city's status to `'processing'`.
 *  3. Adds a five-job flow (see the ordering comment in the body).
 *  4. Reports 100% progress once the flow has been added.
 *
 * This handler only enqueues work; it does not wait for any pipeline job to run.
 *
 * @param job - The `refresh-city` job whose `data` names the city and source URL.
 * @throws Propagates any rejection from the database queries, `updateProgress`,
 *   `flow.add` or `flow.close`.
 */
export async function handleRefreshCity(
  job: Job<RefreshCityData>,
): Promise<void> {
  const { citySlug, geofabrikUrl, resolutionM = 200 } = job.data;
  const sql = getSql();

  const pbfPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`;

  // Read the city's bbox from the database. When present it is included in the
  // payloads of both the extract-pois and build-valhalla jobs; when the city
  // has no bbox (or no matching row) the field is omitted from both.
  // NOTE(review): presumably the bbox is set at city creation time and the
  // downstream workers use it to clip the PBF — confirm against those workers.
  const bboxRows = await Promise.resolve(sql<{
    minlng: number;
    minlat: number;
    maxlng: number;
    maxlat: number;
  }[]>` SELECT ST_XMin(bbox)::float AS minlng, ST_YMin(bbox)::float AS minlat, ST_XMax(bbox)::float AS maxlng, ST_YMax(bbox)::float AS maxlat FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL `);

  // Destructure the first row once instead of indexing `bboxRows[0]` repeatedly;
  // `firstBboxRow` is undefined when the query returned nothing.
  const [firstBboxRow] = bboxRows;
  const bbox: [number, number, number, number] | undefined = firstBboxRow
    ? [
        firstBboxRow.minlng,
        firstBboxRow.minlat,
        firstBboxRow.maxlng,
        firstBboxRow.maxlat,
      ]
    : undefined;

  await job.updateProgress({
    stage: "Orchestrating pipeline",
    pct: 0,
    message: `Starting full ingest for ${citySlug}`,
  } satisfies JobProgress);

  // Mark city as processing.
  // NOTE(review): if enqueueing below throws, the status is left as
  // 'processing'; confirm whether a caller or retry resets it.
  await Promise.resolve(sql` UPDATE cities SET status = 'processing' WHERE slug = ${citySlug} `);

  // A flow is declared parent-first but runs children-first: each parent job
  // waits for its children. With the nesting below the execution order is
  //   download-pbf → extract-pois → build-valhalla → generate-grid → compute-scores
  const flow = new FlowProducer({ connection: createBullMQConnection() });

  try {
    await flow.add({
      name: "compute-scores",
      queueName: "pipeline",
      data: {
        type: "compute-scores",
        citySlug,
        modes: ["walking", "cycling", "driving"],
        thresholds: [5, 10, 15, 20, 30],
      },
      opts: JOB_OPTIONS["compute-scores"],
      children: [
        {
          name: "generate-grid",
          queueName: "pipeline",
          data: {
            type: "generate-grid",
            citySlug,
            resolutionM,
          },
          opts: JOB_OPTIONS["generate-grid"],
          children: [
            {
              name: "build-valhalla",
              queueName: "valhalla", // handled by the dedicated valhalla-worker
              data: {
                type: "build-valhalla",
                citySlug,
                pbfPath,
                ...(bbox ? { bbox } : {}),
              },
              opts: JOB_OPTIONS["build-valhalla"],
              children: [
                {
                  name: "extract-pois",
                  queueName: "pipeline",
                  data: {
                    type: "extract-pois",
                    citySlug,
                    pbfPath,
                    ...(bbox ? { bbox } : {}),
                  },
                  opts: JOB_OPTIONS["extract-pois"],
                  children: [
                    {
                      name: "download-pbf",
                      queueName: "pipeline",
                      data: {
                        type: "download-pbf",
                        citySlug,
                        geofabrikUrl,
                      },
                      opts: JOB_OPTIONS["download-pbf"],
                    },
                  ],
                },
              ],
            },
          ],
        },
      ],
    });
  } finally {
    // Always release the producer's connection, whether or not the add succeeded.
    await flow.close();
  }

  await job.updateProgress({
    stage: "Orchestrating pipeline",
    pct: 100,
    message: "All pipeline jobs enqueued. Processing will begin shortly.",
  } satisfies JobProgress);
}