From 06a006691d14f393d1cf726ac79aed7c80a18389 Mon Sep 17 00:00:00 2001 From: Jan-Henrik Bruhn Date: Fri, 6 Mar 2026 10:34:30 +0100 Subject: [PATCH] fix: do more in SQL, fix some worker behaviour and scheduling, add boris hb --- apps/web/app/api/admin/cities/route.ts | 2 +- apps/web/app/api/admin/ingest/[slug]/route.ts | 2 +- .../app/api/admin/jobs/[id]/stream/route.ts | 7 +- apps/web/app/api/admin/jobs/route.ts | 26 +- apps/web/lib/queue.ts | 19 + shared/src/queue.ts | 14 + worker/src/index.ts | 35 +- worker/src/jobs/compute-routing.ts | 2 +- worker/src/jobs/compute-scores.ts | 426 ++++++++---------- worker/src/jobs/refresh-city.ts | 39 +- worker/src/valhalla.ts | 2 +- 11 files changed, 297 insertions(+), 277 deletions(-) diff --git a/apps/web/app/api/admin/cities/route.ts b/apps/web/app/api/admin/cities/route.ts index cc33053..fdec431 100644 --- a/apps/web/app/api/admin/cities/route.ts +++ b/apps/web/app/api/admin/cities/route.ts @@ -165,7 +165,7 @@ export async function POST(req: NextRequest) { geofabrikUrl, resolutionM: resolutionM as number, }, - JOB_OPTIONS["refresh-city"], + { ...JOB_OPTIONS["refresh-city"], jobId: `refresh-city.${slug}` }, ); // Invalidate city list cache diff --git a/apps/web/app/api/admin/ingest/[slug]/route.ts b/apps/web/app/api/admin/ingest/[slug]/route.ts index 52122d0..68fbf84 100644 --- a/apps/web/app/api/admin/ingest/[slug]/route.ts +++ b/apps/web/app/api/admin/ingest/[slug]/route.ts @@ -29,7 +29,7 @@ export async function POST( const job = await queue.add( "refresh-city", { type: "refresh-city", citySlug: slug, geofabrikUrl }, - JOB_OPTIONS["refresh-city"], + { ...JOB_OPTIONS["refresh-city"], jobId: `refresh-city.${slug}` }, ); return NextResponse.json({ citySlug: slug, jobId: job.id }, { status: 202 }); diff --git a/apps/web/app/api/admin/jobs/[id]/stream/route.ts b/apps/web/app/api/admin/jobs/[id]/stream/route.ts index d073ec0..22f938b 100644 --- a/apps/web/app/api/admin/jobs/[id]/stream/route.ts +++ b/apps/web/app/api/admin/jobs/[id]/stream/route.ts @@ -1,6 +1,6 @@ import { NextRequest } from "next/server"; import { Job } from "bullmq"; -import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; +import { getPipelineQueue, getDownloadQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; import type { PipelineJobData, JobProgress, ComputeScoresJobData, RefreshCityJobData } from "@/lib/queue"; import type { SSEEvent, RoutingDetail } from "@transportationer/shared"; import { CATEGORY_IDS } from "@transportationer/shared"; @@ -48,6 +48,7 @@ export async function GET( }); } + const downloadQueue = getDownloadQueue(); const valhallaQueue = getValhallaQueue(); const valhallaTransitQueue = getValhallaTransitQueue(); @@ -78,13 +79,15 @@ export async function GET( } // 1. Fetch active jobs and waiting-children jobs in parallel. - const [pipelineActive, valhallaActive, valhallaTransitActive, waitingChildren] = await Promise.all([ + const [pipelineActive, downloadActive, valhallaActive, valhallaTransitActive, waitingChildren] = await Promise.all([ queue.getActive(0, 100), + downloadQueue.getActive(0, 100), valhallaQueue.getActive(0, 100), valhallaTransitQueue.getActive(0, 100), queue.getWaitingChildren(0, 200), ]); const allValhallaActive = [...valhallaActive, ...valhallaTransitActive]; + void downloadActive; // visible in job overview; not used for SSE progress // 1a. Parallel routing phase: compute-scores is waiting for its routing // children to finish. Report aggregate progress instead of one job's pct. diff --git a/apps/web/app/api/admin/jobs/route.ts b/apps/web/app/api/admin/jobs/route.ts index aa10706..1eb80af 100644 --- a/apps/web/app/api/admin/jobs/route.ts +++ b/apps/web/app/api/admin/jobs/route.ts @@ -1,37 +1,45 @@ import { NextResponse } from "next/server"; -import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; +import { getPipelineQueue, getDownloadQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue"; import type { JobSummary } from "@transportationer/shared"; export const runtime = "nodejs"; export async function GET() { - const [pQueue, vQueue, vtQueue] = [getPipelineQueue(), getValhallaQueue(), getValhallaTransitQueue()]; + const [pQueue, dQueue, vQueue, vtQueue] = [getPipelineQueue(), getDownloadQueue(), getValhallaQueue(), getValhallaTransitQueue()]; const [ pWaiting, pWaitingChildren, pActive, pCompleted, pFailed, - vWaiting, vActive, vCompleted, vFailed, - vtWaiting, vtActive, vtCompleted, vtFailed, + dWaiting, dWaitingChildren, dActive, dCompleted, dFailed, + vWaiting, vWaitingChildren, vActive, vCompleted, vFailed, + vtWaiting, vtWaitingChildren, vtActive, vtCompleted, vtFailed, ] = await Promise.all([ pQueue.getWaiting(0, 20), pQueue.getWaitingChildren(0, 20), pQueue.getActive(0, 20), pQueue.getCompleted(0, 20), pQueue.getFailed(0, 20), + dQueue.getWaiting(0, 20), + dQueue.getWaitingChildren(0, 20), + dQueue.getActive(0, 20), + dQueue.getCompleted(0, 20), + dQueue.getFailed(0, 20), vQueue.getWaiting(0, 20), + vQueue.getWaitingChildren(0, 20), vQueue.getActive(0, 20), vQueue.getCompleted(0, 20), vQueue.getFailed(0, 20), vtQueue.getWaiting(0, 20), + vtQueue.getWaitingChildren(0, 20), vtQueue.getActive(0, 20), vtQueue.getCompleted(0, 20), vtQueue.getFailed(0, 20), ]); - const waitingChildren = [...pWaitingChildren]; - const waiting = [...pWaiting, ...vWaiting, ...vtWaiting]; - const active = [...pActive, ...vActive, ...vtActive]; - const completed = [...pCompleted, ...vCompleted, ...vtCompleted]; - const failed = [...pFailed, ...vFailed, ...vtFailed]; + const waitingChildren = [...pWaitingChildren, ...dWaitingChildren, ...vWaitingChildren, ...vtWaitingChildren]; + const waiting = [...pWaiting, ...dWaiting, ...vWaiting, ...vtWaiting]; + const active = [...pActive, ...dActive, ...vActive, ...vtActive]; + const completed = [...pCompleted, ...dCompleted, ...vCompleted, ...vtCompleted]; + const failed = [...pFailed, ...dFailed, ...vFailed, ...vtFailed]; const all = [...active, ...waitingChildren, ...waiting, ...completed, ...failed]; diff --git a/apps/web/lib/queue.ts b/apps/web/lib/queue.ts index dc2aff1..ef9b0d2 100644 --- a/apps/web/lib/queue.ts +++ b/apps/web/lib/queue.ts @@ -25,6 +25,9 @@ declare global { var __pipelineQueue: Queue | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any + var __downloadQueue: Queue | undefined; + // eslint-disable-next-line no-var + // eslint-disable-next-line @typescript-eslint/no-explicit-any var __valhallaQueue: Queue | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -46,6 +49,22 @@ export function getPipelineQueue(): Queue { return globalThis.__pipelineQueue; } +/** Queue for download-pbf jobs — concurrency 1 in the worker prevents parallel downloads. */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function getDownloadQueue(): Queue { + if (!globalThis.__downloadQueue) { + globalThis.__downloadQueue = new Queue("download", { + connection: createBullMQConnection(), + defaultJobOptions: { + attempts: 2, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + }, + }); + } + return globalThis.__downloadQueue; +} + /** Queue for build-valhalla jobs, processed by the valhalla-worker container. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getValhallaQueue(): Queue { diff --git a/shared/src/queue.ts b/shared/src/queue.ts index 1f57954..45de497 100644 --- a/shared/src/queue.ts +++ b/shared/src/queue.ts @@ -32,6 +32,8 @@ export interface ComputeScoresJobData { routingDispatched?: boolean; /** When true, ingest-boris-ni is dispatched in Phase 1 to run alongside routing jobs. */ ingestBorisNi?: boolean; + /** When true, ingest-boris-hb is dispatched in Phase 1 to run alongside routing jobs. */ + ingestBorisHb?: boolean; } export interface ComputeRoutingJobData { @@ -71,6 +73,11 @@ export interface IngestBorisNiJobData { citySlug: string; } +export interface IngestBorisHbJobData { + type: "ingest-boris-hb"; + citySlug: string; +} + export interface DownloadGtfsDeJobData { type: "download-gtfs-de"; url: string; @@ -94,6 +101,7 @@ export type PipelineJobData = | BuildValhallaJobData | RefreshCityJobData | IngestBorisNiJobData + | IngestBorisHbJobData | DownloadGtfsDeJobData | ComputeTransitJobData; @@ -154,4 +162,10 @@ export const JOB_OPTIONS: Record = { removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, }, + "ingest-boris-hb": { + attempts: 2, + backoff: { type: "fixed", delay: 5000 }, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + }, }; diff --git a/worker/src/index.ts b/worker/src/index.ts index 7c1f862..ad2ebfc 100644 --- a/worker/src/index.ts +++ b/worker/src/index.ts @@ -9,10 +9,40 @@ import { handleComputeScores } from "./jobs/compute-scores.js"; import { handleComputeRouting } from "./jobs/compute-routing.js"; import { handleRefreshCity } from "./jobs/refresh-city.js"; import { handleIngestBorisNi } from "./jobs/ingest-boris-ni.js"; +import { handleIngestBorisHb } from "./jobs/ingest-boris-hb.js"; import { handleComputeTransit } from "./jobs/compute-transit.js"; console.log("[worker] Starting Transportationer pipeline worker…"); +// Dedicated download worker — concurrency 1 ensures at most one PBF download +// runs at a time. The handler is idempotent (skips if file already on disk), +// so the three parallel download-pbf children from a single city refresh +// execute sequentially: the first downloads, the other two skip immediately. +const downloadWorker = new Worker( + "download", + async (job: Job) => { + if (job.data.type === "download-pbf") return handleDownloadPbf(job as Job); + throw new Error(`Unexpected job type on download queue: ${(job.data as any).type}`); + }, + { + connection: createBullMQConnection(), + concurrency: 1, + lockDuration: 300_000, + lockRenewTime: 15_000, + maxStalledCount: 3, + }, +); + +downloadWorker.on("completed", (job) => + console.log(`[download-worker] ✓ Job ${job.id} (${job.data.type}) completed`), +); +downloadWorker.on("failed", (job, err) => + console.error(`[download-worker] ✗ Job ${job?.id} failed:`, err.message), +); +downloadWorker.on("active", (job) => + console.log(`[download-worker] → Job ${job.id} (${job.data.type}) started`), +); + const worker = new Worker( "pipeline", async (job: Job, token?: string) => { @@ -33,6 +63,8 @@ const worker = new Worker( return handleRefreshCity(job as Job); case "ingest-boris-ni": return handleIngestBorisNi(job as Job); + case "ingest-boris-hb": + return handleIngestBorisHb(job as Job); case "compute-transit": return handleComputeTransit(job as Job); default: @@ -48,6 +80,7 @@ const worker = new Worker( concurrency: 8, lockDuration: 300_000, // 5 minutes — download jobs can be slow lockRenewTime: 15_000, // Renew every 15s + maxStalledCount: 3, // Allow up to 3 stalls before failing (large city jobs can be slow) }, ); @@ -85,7 +118,7 @@ worker.on("error", (err) => { const shutdown = async () => { console.log("[worker] Shutting down gracefully…"); - await worker.close(); + await Promise.all([worker.close(), downloadWorker.close()]); process.exit(0); }; diff --git a/worker/src/jobs/compute-routing.ts b/worker/src/jobs/compute-routing.ts index 9918e54..12d09ad 100644 --- a/worker/src/jobs/compute-routing.ts +++ b/worker/src/jobs/compute-routing.ts @@ -13,7 +13,7 @@ export type ComputeRoutingData = { /** Number of nearest POI candidates per grid point. */ const K = 6; /** Grid points per Valhalla matrix call. */ -const BATCH_SIZE = 20; +const BATCH_SIZE = 5; /** Concurrent Valhalla calls within this job. */ const BATCH_CONCURRENCY = 4; /** Rows per INSERT. */ diff --git a/worker/src/jobs/compute-scores.ts b/worker/src/jobs/compute-scores.ts index 6065d1b..0585a58 100644 --- a/worker/src/jobs/compute-scores.ts +++ b/worker/src/jobs/compute-scores.ts @@ -6,37 +6,9 @@ import type { JobProgress, ComputeScoresJobData as ComputeScoresData } from "@tr import { CATEGORY_IDS, PROFILES, - PROFILE_IDS, DEFAULT_SUBCATEGORY_WEIGHT, } from "@transportationer/shared"; -const INSERT_CHUNK = 2000; - -function subcategoryWeight(profileId: string, subcategory: string): number { - const weights = PROFILES[profileId as keyof typeof PROFILES]?.subcategoryWeights; - if (!weights) return DEFAULT_SUBCATEGORY_WEIGHT; - return weights[subcategory] ?? DEFAULT_SUBCATEGORY_WEIGHT; -} - -function sigmoid(t_s: number, threshold_s: number): number { - return 1 / (1 + Math.exp(4 * (t_s - threshold_s) / threshold_s)); -} - -function complementProduct( - subcategoryTimes: Array<{ subcategory: string; timeS: number | null }>, - threshold_s: number, - profileId: string, -): number { - let logProd = 0; - let hasAny = false; - for (const { subcategory, timeS } of subcategoryTimes) { - const weight = subcategoryWeight(profileId, subcategory); - if (timeS === null || weight <= 0) continue; - hasAny = true; - logProd += Math.log(Math.max(1 - weight * sigmoid(timeS, threshold_s), 1e-10)); - } - return hasAny ? 1 - Math.exp(logProd) : 0; -} /** * Two-phase orchestrator for accessibility score computation. @@ -91,8 +63,25 @@ export async function handleComputeScores( // parallel with the routing jobs rather than sequentially after them. const queue = new Queue("pipeline", { connection: createBullMQConnection() }); try { + // Dispatch transit scoring first so it starts immediately while the + // compute-routing jobs queue up behind it (transit is the slowest leg). + if (modes.includes("transit")) { + await queue.add( + "compute-transit", + { type: "compute-transit", citySlug }, + { + attempts: 1, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + jobId: `compute-transit.${citySlug}`, + parent: { id: job.id!, queue: queue.qualifiedName }, + ignoreDependencyOnFailure: true, + }, + ); + } + for (const mode of modes) { - if (mode === "transit") continue; // handled below as a single job + if (mode === "transit") continue; // already dispatched above for (const category of CATEGORY_IDS) { await queue.add( "compute-routing", @@ -102,10 +91,9 @@ export async function handleComputeScores( backoff: { type: "fixed", delay: 3000 }, removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, + jobId: `compute-routing.${citySlug}.${mode}.${category}`, parent: { id: job.id!, - // qualifiedName = "bull:pipeline" — the Redis key BullMQ uses - // to track parent/child relationships. queue: queue.qualifiedName, }, }, @@ -113,24 +101,6 @@ export async function handleComputeScores( } } - // Dispatch transit scoring as a sibling child (one job covers all categories - // via PostGIS isochrone spatial joins, unlike per-category routing jobs). - if (modes.includes("transit")) { - await queue.add( - "compute-transit", - { type: "compute-transit", citySlug }, - { - attempts: 1, - removeOnComplete: { age: 86400 * 7 }, - removeOnFail: { age: 86400 * 30 }, - parent: { id: job.id!, queue: queue.qualifiedName }, - // Transit is optional — a failure should not cascade to the parent. - // The city will be marked ready with walking/cycling scores only. - ignoreDependencyOnFailure: true, - }, - ); - } - // Dispatch BORIS NI ingest as a sibling child so it runs during routing. if (job.data.ingestBorisNi) { await queue.add( @@ -141,8 +111,25 @@ export async function handleComputeScores( backoff: { type: "fixed", delay: 5000 }, removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, + jobId: `ingest-boris-ni.${citySlug}`, + parent: { id: job.id!, queue: queue.qualifiedName }, + ignoreDependencyOnFailure: true, + }, + ); + } + + // Dispatch BORIS HB ingest (Stadt Bremen + Bremerhaven) similarly. + if (job.data.ingestBorisHb) { + await queue.add( + "ingest-boris-hb", + { type: "ingest-boris-hb", citySlug }, + { + attempts: 2, + backoff: { type: "fixed", delay: 5000 }, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + jobId: `ingest-boris-hb.${citySlug}`, parent: { id: job.id!, queue: queue.qualifiedName }, - // Boris NI data is optional — failure should not cancel the pipeline. ignoreDependencyOnFailure: true, }, ); @@ -162,214 +149,155 @@ export async function handleComputeScores( } // ── Phase 2: aggregate scores from grid_poi_details ────────────────────── + // All computation runs inside a single SQL CTE chain so the DBMS can + // optimise joins and aggregations without streaming data through Node.js. await job.updateProgress({ stage: "Aggregating scores", pct: 70, message: `All routing complete — computing profile scores…`, } satisfies JobProgress); - // Load all per-subcategory routing results for this city in one query. - // Ordered by distance so the first row per (gpId, mode, category) is nearest. - const detailRows = await Promise.resolve(sql<{ - grid_point_id: string; - category: string; - subcategory: string; - travel_mode: string; - nearest_poi_id: string | null; - distance_m: number | null; - travel_time_s: number | null; - }[]>` + // Build parallel arrays for SQL unnest: one row per (profile, subcategory). + const pwProfiles: string[] = []; + const pwSubcats: string[] = []; + const pwWeights: number[] = []; + for (const [profileId, profile] of Object.entries(PROFILES)) { + for (const [subcategory, weight] of Object.entries(profile.subcategoryWeights)) { + pwProfiles.push(profileId); + pwSubcats.push(subcategory); + pwWeights.push(weight); + } + } + + await Promise.resolve(sql` + WITH + base AS ( + SELECT + gpd.grid_point_id, + gpd.category, + gpd.subcategory, + gpd.travel_mode, + gpd.nearest_poi_id, + gpd.distance_m, + gpd.travel_time_s + FROM grid_poi_details gpd + JOIN grid_points gp ON gp.id = gpd.grid_point_id + WHERE gp.city_slug = ${citySlug} + ), + fifteen_subcat AS ( + -- "fifteen" mode: best (lowest) travel time across walking / cycling / transit + SELECT + grid_point_id, category, subcategory, + 'fifteen'::text AS travel_mode, + MIN(travel_time_s) AS travel_time_s + FROM base + WHERE travel_mode IN ('walking', 'cycling', 'transit') + GROUP BY grid_point_id, category, subcategory + ), + all_subcat AS ( + SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM base + UNION ALL + SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM fifteen_subcat + ), + road_nearest AS ( + -- Nearest POI per (grid_point, category, mode) by distance + SELECT DISTINCT ON (grid_point_id, category, travel_mode) + grid_point_id, category, travel_mode, nearest_poi_id, distance_m, travel_time_s + FROM base + WHERE nearest_poi_id IS NOT NULL + ORDER BY grid_point_id, category, travel_mode, distance_m + ), + fifteen_nearest AS ( + -- Nearest POI for "fifteen": closest across walking / cycling / transit + SELECT DISTINCT ON (grid_point_id, category) + grid_point_id, category, + 'fifteen'::text AS travel_mode, + nearest_poi_id, distance_m, travel_time_s + FROM base + WHERE travel_mode IN ('walking', 'cycling', 'transit') + AND nearest_poi_id IS NOT NULL + ORDER BY grid_point_id, category, distance_m + ), + all_nearest AS ( + SELECT * FROM road_nearest + UNION ALL + SELECT * FROM fifteen_nearest + ), + profile_weights AS ( + SELECT profile_id, subcategory, weight + FROM unnest( + ${pwProfiles}::text[], + ${pwSubcats}::text[], + ${pwWeights}::float8[] + ) AS t(profile_id, subcategory, weight) + ), + thresholds_t AS ( + SELECT unnest(${thresholds}::int[]) AS threshold_min + ), + scores AS ( + -- Complement-product score per (grid_point, category, mode, threshold, profile). + -- sigmoid(t) = 1 / (1 + exp((t − T) / (T/6))) where T = threshold in seconds. + -- score = 1 − ∏(1 − w·sigmoid) computed via EXP(SUM(LN(complement))). + -- NULL travel_time_s → sigmoid = 0 → complement = 1 → LN(1) = 0 (no penalty). + SELECT + s.grid_point_id, + s.category, + s.travel_mode, + t.threshold_min, + p.profile_id, + 1.0 - EXP(SUM( + LN(GREATEST( + 1.0 - COALESCE(pw.weight, ${DEFAULT_SUBCATEGORY_WEIGHT}::float8) + * CASE + WHEN s.travel_time_s IS NULL THEN 0.0 + ELSE 1.0 / (1.0 + EXP( + (s.travel_time_s - t.threshold_min * 60.0) + / (t.threshold_min * 10.0) + )) + END, + 1e-10 + )) + )) AS score + FROM all_subcat s + JOIN thresholds_t t ON true + CROSS JOIN (SELECT DISTINCT profile_id FROM profile_weights) p + LEFT JOIN profile_weights pw + ON pw.profile_id = p.profile_id AND pw.subcategory = s.subcategory + GROUP BY s.grid_point_id, s.category, s.travel_mode, t.threshold_min, p.profile_id + ) + INSERT INTO grid_scores ( + grid_point_id, category, travel_mode, threshold_min, profile, + nearest_poi_id, distance_m, travel_time_s, score + ) SELECT - gpd.grid_point_id::text, - gpd.category, - gpd.subcategory, - gpd.travel_mode, - gpd.nearest_poi_id::text, - gpd.distance_m, - gpd.travel_time_s - FROM grid_poi_details gpd - JOIN grid_points gp ON gp.id = gpd.grid_point_id - WHERE gp.city_slug = ${citySlug} - ORDER BY gpd.grid_point_id, gpd.travel_mode, gpd.category, gpd.distance_m + sc.grid_point_id, + sc.category, + sc.travel_mode, + sc.threshold_min, + sc.profile_id, + n.nearest_poi_id, + n.distance_m, + n.travel_time_s, + sc.score + FROM scores sc + LEFT JOIN all_nearest n + ON n.grid_point_id = sc.grid_point_id + AND n.category = sc.category + AND n.travel_mode = sc.travel_mode + ON CONFLICT (grid_point_id, category, travel_mode, threshold_min, profile) + DO UPDATE SET + nearest_poi_id = EXCLUDED.nearest_poi_id, + distance_m = EXCLUDED.distance_m, + travel_time_s = EXCLUDED.travel_time_s, + score = EXCLUDED.score, + computed_at = now() `); - // Build in-memory structure keyed by "gpId:mode:category". - type GroupEntry = { - gpId: string; - mode: string; - category: string; - subcategoryTimes: Array<{ subcategory: string; timeS: number | null }>; - nearestPoiId: string | null; - nearestDistM: number | null; - nearestTimeS: number | null; - }; - const groups = new Map(); - - for (const row of detailRows) { - const key = `${row.grid_point_id}:${row.travel_mode}:${row.category}`; - let entry = groups.get(key); - if (!entry) { - entry = { - gpId: row.grid_point_id, - mode: row.travel_mode, - category: row.category, - subcategoryTimes: [], - nearestPoiId: null, - nearestDistM: null, - nearestTimeS: null, - }; - groups.set(key, entry); - } - entry.subcategoryTimes.push({ subcategory: row.subcategory, timeS: row.travel_time_s }); - // Track the overall nearest POI for this category (minimum distance). - if ( - row.distance_m !== null && - (entry.nearestDistM === null || row.distance_m < entry.nearestDistM) - ) { - entry.nearestPoiId = row.nearest_poi_id; - entry.nearestDistM = row.distance_m; - entry.nearestTimeS = row.travel_time_s; - } - } - - // Synthesize "multimodal" groups: for each (gpId, category, subcategory), - // take the minimum travel time across walking and cycling so that a - // destination reachable by either mode counts as accessible. - // Driving is intentionally excluded (not a 15-min city metric). - const MULTIMODAL_MODES = new Set(["walking", "cycling", "transit"]); // modes combined into "fifteen" - const mmAccumulator = new Map; - nearestDistM: number | null; - nearestPoiId: string | null; - nearestTimeS: number | null; - }>(); - - for (const entry of groups.values()) { - if (!MULTIMODAL_MODES.has(entry.mode)) continue; - const mmKey = `${entry.gpId}:${entry.category}`; - if (!mmAccumulator.has(mmKey)) { - mmAccumulator.set(mmKey, { - gpId: entry.gpId, - category: entry.category, - subTimes: new Map(), - nearestDistM: null, - nearestPoiId: null, - nearestTimeS: null, - }); - } - const acc = mmAccumulator.get(mmKey)!; - // Track nearest POI across all multimodal modes - if (entry.nearestDistM !== null && (acc.nearestDistM === null || entry.nearestDistM < acc.nearestDistM)) { - acc.nearestDistM = entry.nearestDistM; - acc.nearestPoiId = entry.nearestPoiId; - acc.nearestTimeS = entry.nearestTimeS; - } - // For each subcategory, keep the minimum travel time across modes - for (const { subcategory, timeS } of entry.subcategoryTimes) { - const existing = acc.subTimes.get(subcategory); - if (existing === undefined) { - acc.subTimes.set(subcategory, timeS); - } else if (existing === null && timeS !== null) { - acc.subTimes.set(subcategory, timeS); - } else if (timeS !== null && existing !== null && timeS < existing) { - acc.subTimes.set(subcategory, timeS); - } - } - } - - for (const acc of mmAccumulator.values()) { - const key = `${acc.gpId}:fifteen:${acc.category}`; - groups.set(key, { - gpId: acc.gpId, - mode: "fifteen", - category: acc.category, - subcategoryTimes: Array.from(acc.subTimes.entries()).map(([subcategory, timeS]) => ({ subcategory, timeS })), - nearestPoiId: acc.nearestPoiId, - nearestDistM: acc.nearestDistM, - nearestTimeS: acc.nearestTimeS, - }); - } - - // Compute and insert scores for every threshold × profile combination. - // Each threshold writes to distinct rows (threshold_min is part of the PK), - // so all thresholds can be processed concurrently without conflicts. - // Node.js is single-threaded so completedThresholds++ is safe. - let completedThresholds = 0; - - await Promise.all(thresholds.map(async (thresholdMin) => { - const threshold_s = thresholdMin * 60; - - const gpIdArr: string[] = []; - const catArr: string[] = []; - const modeArr: string[] = []; - const profileArr: string[] = []; - const poiIdArr: (string | null)[] = []; - const distArr: (number | null)[] = []; - const timeArr: (number | null)[] = []; - const scoreArr: number[] = []; - - for (const entry of groups.values()) { - for (const profileId of PROFILE_IDS) { - gpIdArr.push(entry.gpId); - catArr.push(entry.category); - modeArr.push(entry.mode); - profileArr.push(profileId); - poiIdArr.push(entry.nearestPoiId); - distArr.push(entry.nearestDistM); - timeArr.push(entry.nearestTimeS); - scoreArr.push(complementProduct(entry.subcategoryTimes, threshold_s, profileId)); - } - } - - // Chunks within a threshold stay sequential — with all thresholds running - // concurrently we already have up to thresholds.length parallel INSERT - // streams, which saturates the connection pool without overwhelming it. - for (let i = 0; i < gpIdArr.length; i += INSERT_CHUNK) { - const end = Math.min(i + INSERT_CHUNK, gpIdArr.length); - await Promise.resolve(sql` - INSERT INTO grid_scores ( - grid_point_id, category, travel_mode, threshold_min, profile, - nearest_poi_id, distance_m, travel_time_s, score - ) - SELECT - gp_id::bigint, - cat, - mode_val, - ${thresholdMin}::int, - prof, - CASE WHEN poi_id IS NULL THEN NULL ELSE poi_id::bigint END, - dist, - time_s, - score_val - FROM unnest( - ${gpIdArr.slice(i, end)}::text[], - ${catArr.slice(i, end)}::text[], - ${modeArr.slice(i, end)}::text[], - ${profileArr.slice(i, end)}::text[], - ${poiIdArr.slice(i, end)}::text[], - ${distArr.slice(i, end)}::float8[], - ${timeArr.slice(i, end)}::float8[], - ${scoreArr.slice(i, end)}::float8[] - ) AS t(gp_id, cat, mode_val, prof, poi_id, dist, time_s, score_val) - ON CONFLICT (grid_point_id, category, travel_mode, threshold_min, profile) - DO UPDATE SET - nearest_poi_id = EXCLUDED.nearest_poi_id, - distance_m = EXCLUDED.distance_m, - travel_time_s = EXCLUDED.travel_time_s, - score = EXCLUDED.score, - computed_at = now() - `); - } - - completedThresholds++; - await job.updateProgress({ - stage: "Aggregating scores", - pct: 70 + Math.round((completedThresholds / thresholds.length) * 28), - message: `${completedThresholds} / ${thresholds.length} thresholds done…`, - } satisfies JobProgress); - })); + await job.updateProgress({ + stage: "Aggregating scores", + pct: 98, + message: `All scores computed for ${citySlug}`, + } satisfies JobProgress); await Promise.resolve(sql` UPDATE cities SET status = 'ready', last_ingested = now() diff --git a/worker/src/jobs/refresh-city.ts b/worker/src/jobs/refresh-city.ts index 2741bf8..75622b0 100644 --- a/worker/src/jobs/refresh-city.ts +++ b/worker/src/jobs/refresh-city.ts @@ -21,6 +21,17 @@ function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLa return minLng < 11.779 && maxLng > 6.526 && minLat < 54.033 && maxLat > 51.197; } +/** + * True when the given bbox intersects the Freie Hansestadt Bremen (HB), which + * consists of two non-contiguous cities: + * - Stadt Bremen: ~52.95–53.23°N, 8.48–8.99°E + * - Bremerhaven: ~53.46–53.65°N, 8.48–8.68°E + * The WFS BBOX filter ensures each city only receives its own zones — no overlap. + */ +function isInBremen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean { + return minLng < 8.99 && maxLng > 8.48 && minLat < 53.65 && maxLat > 52.95; +} + export async function handleRefreshCity( job: Job, ): Promise { @@ -81,18 +92,21 @@ export async function handleRefreshCity( UPDATE cities SET status = 'processing' WHERE slug = ${citySlug} `); - // Shared download node factory — produces an idempotent download-pbf node. - // Two independent nodes with the same geofabrikUrl are safe: the idempotency - // check in download-pbf.ts skips the download if the file already exists. + // Shared download node factory — each call produces an independent download-pbf + // job (no fixed jobId). Three branches each get their own job so BullMQ can + // track each parent→child dependency correctly; the handler skips the actual + // download if the file already exists (atomic rename + idempotency check). const downloadNode = () => ({ name: "download-pbf", - queueName: "pipeline", + queueName: "download", data: { type: "download-pbf" as const, citySlug, geofabrikUrl }, - opts: JOB_OPTIONS["download-pbf"], + opts: { ...JOB_OPTIONS["download-pbf"] }, }); // For NI cities: ingest-boris-ni is dispatched in Phase 1 of compute-scores. const niApplicable = !!(bbox && isInNiedersachsen(...bbox)); + // For HB cities (Stadt Bremen + Bremerhaven): ingest-boris-hb runs in Phase 1. + const hbApplicable = !!(bbox && isInBremen(...bbox)); // Parallel pipeline DAG (bottom-up — leaves execute first): // @@ -119,17 +133,18 @@ export async function handleRefreshCity( modes: ["walking", "cycling", "driving", "transit"] as const, thresholds: [...VALID_THRESHOLDS], ingestBorisNi: niApplicable, + ingestBorisHb: hbApplicable, }, - opts: JOB_OPTIONS["compute-scores"], + opts: { ...JOB_OPTIONS["compute-scores"], jobId: `compute-scores.${citySlug}` }, children: [ { name: "generate-grid", queueName: "pipeline", data: { type: "generate-grid" as const, citySlug, resolutionM }, - opts: JOB_OPTIONS["generate-grid"], + opts: { ...JOB_OPTIONS["generate-grid"], jobId: `generate-grid.${citySlug}` }, children: [ - // Three parallel branches — each gets its own download-pbf child; - // the idempotency guard ensures only one actually downloads when they race. + // Three parallel branches — all share a single download-pbf job via + // the deterministic jobId; BullMQ deduplicates them to one download. { name: "extract-pois", queueName: "pipeline", @@ -139,7 +154,7 @@ export async function handleRefreshCity( pbfPath, ...(bbox ? { bbox } : {}), }, - opts: JOB_OPTIONS["extract-pois"], + opts: { ...JOB_OPTIONS["extract-pois"], jobId: `extract-pois.${citySlug}` }, children: [downloadNode()], }, // Road-only Valhalla build — no GTFS, produces clean tiles without @@ -153,7 +168,7 @@ export async function handleRefreshCity( pbfPath, ...(bbox ? { bbox } : {}), }, - opts: JOB_OPTIONS["build-valhalla"], + opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla.${citySlug}` }, children: [downloadNode()], }, // Transit Valhalla build — depends on GTFS download. Produces tiles with @@ -167,7 +182,7 @@ export async function handleRefreshCity( pbfPath, ...(bbox ? { bbox } : {}), }, - opts: JOB_OPTIONS["build-valhalla"], + opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla-transit.${citySlug}` }, children: [ downloadNode(), // Download GTFS feed before building transit tiles. Idempotent — diff --git a/worker/src/valhalla.ts b/worker/src/valhalla.ts index 9ecc3c1..974fc07 100644 --- a/worker/src/valhalla.ts +++ b/worker/src/valhalla.ts @@ -109,7 +109,7 @@ const MATRIX_MAX_ATTEMPTS = 8; const MATRIX_RETRY_BASE_MS = 1_000; const MATRIX_RETRY_MAX_MS = 15_000; /** Per-request timeout — prevents hanging indefinitely if the service is down. */ -const MATRIX_TIMEOUT_MS = 30_000; +const MATRIX_TIMEOUT_MS = 60_000; /** * Call Valhalla's sources_to_targets matrix endpoint.