fix: do more in SQL, fix some worker behaviour and scheduling, add boris hb

This commit is contained in:
Jan-Henrik 2026-03-06 10:34:30 +01:00
parent a97ced96ce
commit 06a006691d
11 changed files with 297 additions and 277 deletions

View file

@ -165,7 +165,7 @@ export async function POST(req: NextRequest) {
geofabrikUrl,
resolutionM: resolutionM as number,
},
JOB_OPTIONS["refresh-city"],
{ ...JOB_OPTIONS["refresh-city"], jobId: `refresh-city.${slug}` },
);
// Invalidate city list cache

View file

@ -29,7 +29,7 @@ export async function POST(
const job = await queue.add(
"refresh-city",
{ type: "refresh-city", citySlug: slug, geofabrikUrl },
JOB_OPTIONS["refresh-city"],
{ ...JOB_OPTIONS["refresh-city"], jobId: `refresh-city.${slug}` },
);
return NextResponse.json({ citySlug: slug, jobId: job.id }, { status: 202 });

View file

@ -1,6 +1,6 @@
import { NextRequest } from "next/server";
import { Job } from "bullmq";
import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import { getPipelineQueue, getDownloadQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import type { PipelineJobData, JobProgress, ComputeScoresJobData, RefreshCityJobData } from "@/lib/queue";
import type { SSEEvent, RoutingDetail } from "@transportationer/shared";
import { CATEGORY_IDS } from "@transportationer/shared";
@ -48,6 +48,7 @@ export async function GET(
});
}
const downloadQueue = getDownloadQueue();
const valhallaQueue = getValhallaQueue();
const valhallaTransitQueue = getValhallaTransitQueue();
@ -78,13 +79,15 @@ export async function GET(
}
// 1. Fetch active jobs and waiting-children jobs in parallel.
const [pipelineActive, valhallaActive, valhallaTransitActive, waitingChildren] = await Promise.all([
const [pipelineActive, downloadActive, valhallaActive, valhallaTransitActive, waitingChildren] = await Promise.all([
queue.getActive(0, 100),
downloadQueue.getActive(0, 100),
valhallaQueue.getActive(0, 100),
valhallaTransitQueue.getActive(0, 100),
queue.getWaitingChildren(0, 200),
]);
const allValhallaActive = [...valhallaActive, ...valhallaTransitActive];
void downloadActive; // visible in job overview; not used for SSE progress
// 1a. Parallel routing phase: compute-scores is waiting for its routing
// children to finish. Report aggregate progress instead of one job's pct.

View file

@ -1,37 +1,45 @@
import { NextResponse } from "next/server";
import { getPipelineQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import { getPipelineQueue, getDownloadQueue, getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import type { JobSummary } from "@transportationer/shared";
export const runtime = "nodejs";
export async function GET() {
const [pQueue, vQueue, vtQueue] = [getPipelineQueue(), getValhallaQueue(), getValhallaTransitQueue()];
const [pQueue, dQueue, vQueue, vtQueue] = [getPipelineQueue(), getDownloadQueue(), getValhallaQueue(), getValhallaTransitQueue()];
const [
pWaiting, pWaitingChildren, pActive, pCompleted, pFailed,
vWaiting, vActive, vCompleted, vFailed,
vtWaiting, vtActive, vtCompleted, vtFailed,
dWaiting, dWaitingChildren, dActive, dCompleted, dFailed,
vWaiting, vWaitingChildren, vActive, vCompleted, vFailed,
vtWaiting, vtWaitingChildren, vtActive, vtCompleted, vtFailed,
] = await Promise.all([
pQueue.getWaiting(0, 20),
pQueue.getWaitingChildren(0, 20),
pQueue.getActive(0, 20),
pQueue.getCompleted(0, 20),
pQueue.getFailed(0, 20),
dQueue.getWaiting(0, 20),
dQueue.getWaitingChildren(0, 20),
dQueue.getActive(0, 20),
dQueue.getCompleted(0, 20),
dQueue.getFailed(0, 20),
vQueue.getWaiting(0, 20),
vQueue.getWaitingChildren(0, 20),
vQueue.getActive(0, 20),
vQueue.getCompleted(0, 20),
vQueue.getFailed(0, 20),
vtQueue.getWaiting(0, 20),
vtQueue.getWaitingChildren(0, 20),
vtQueue.getActive(0, 20),
vtQueue.getCompleted(0, 20),
vtQueue.getFailed(0, 20),
]);
const waitingChildren = [...pWaitingChildren];
const waiting = [...pWaiting, ...vWaiting, ...vtWaiting];
const active = [...pActive, ...vActive, ...vtActive];
const completed = [...pCompleted, ...vCompleted, ...vtCompleted];
const failed = [...pFailed, ...vFailed, ...vtFailed];
const waitingChildren = [...pWaitingChildren, ...dWaitingChildren, ...vWaitingChildren, ...vtWaitingChildren];
const waiting = [...pWaiting, ...dWaiting, ...vWaiting, ...vtWaiting];
const active = [...pActive, ...dActive, ...vActive, ...vtActive];
const completed = [...pCompleted, ...dCompleted, ...vCompleted, ...vtCompleted];
const failed = [...pFailed, ...dFailed, ...vFailed, ...vtFailed];
const all = [...active, ...waitingChildren, ...waiting, ...completed, ...failed];

View file

@ -25,6 +25,9 @@ declare global {
var __pipelineQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __downloadQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __valhallaQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -46,6 +49,22 @@ export function getPipelineQueue(): Queue<any> {
return globalThis.__pipelineQueue;
}
/** Queue for download-pbf jobs — concurrency 1 in the worker prevents parallel downloads. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function getDownloadQueue(): Queue<any> {
if (!globalThis.__downloadQueue) {
globalThis.__downloadQueue = new Queue("download", {
connection: createBullMQConnection(),
defaultJobOptions: {
attempts: 2,
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
},
});
}
return globalThis.__downloadQueue;
}
/** Queue for build-valhalla jobs, processed by the valhalla-worker container. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function getValhallaQueue(): Queue<any> {

View file

@ -32,6 +32,8 @@ export interface ComputeScoresJobData {
routingDispatched?: boolean;
/** When true, ingest-boris-ni is dispatched in Phase 1 to run alongside routing jobs. */
ingestBorisNi?: boolean;
/** When true, ingest-boris-hb is dispatched in Phase 1 to run alongside routing jobs. */
ingestBorisHb?: boolean;
}
export interface ComputeRoutingJobData {
@ -71,6 +73,11 @@ export interface IngestBorisNiJobData {
citySlug: string;
}
export interface IngestBorisHbJobData {
type: "ingest-boris-hb";
citySlug: string;
}
export interface DownloadGtfsDeJobData {
type: "download-gtfs-de";
url: string;
@ -94,6 +101,7 @@ export type PipelineJobData =
| BuildValhallaJobData
| RefreshCityJobData
| IngestBorisNiJobData
| IngestBorisHbJobData
| DownloadGtfsDeJobData
| ComputeTransitJobData;
@ -154,4 +162,10 @@ export const JOB_OPTIONS: Record<PipelineJobData["type"], object> = {
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
},
"ingest-boris-hb": {
attempts: 2,
backoff: { type: "fixed", delay: 5000 },
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
},
};

View file

@ -9,10 +9,40 @@ import { handleComputeScores } from "./jobs/compute-scores.js";
import { handleComputeRouting } from "./jobs/compute-routing.js";
import { handleRefreshCity } from "./jobs/refresh-city.js";
import { handleIngestBorisNi } from "./jobs/ingest-boris-ni.js";
import { handleIngestBorisHb } from "./jobs/ingest-boris-hb.js";
import { handleComputeTransit } from "./jobs/compute-transit.js";
console.log("[worker] Starting Transportationer pipeline worker…");
// Dedicated download worker — concurrency 1 ensures at most one PBF download
// runs at a time. The handler is idempotent (skips if file already on disk),
// so the three parallel download-pbf children from a single city refresh
// execute sequentially: the first downloads, the other two skip immediately.
const downloadWorker = new Worker<PipelineJobData>(
  "download",
  async (job: Job<PipelineJobData>) => {
    // Guard clause: this queue only ever carries download-pbf jobs.
    if (job.data.type !== "download-pbf") {
      throw new Error(`Unexpected job type on download queue: ${(job.data as any).type}`);
    }
    return handleDownloadPbf(job as Job<any>);
  },
  {
    connection: createBullMQConnection(),
    concurrency: 1,
    lockDuration: 300_000,
    lockRenewTime: 15_000,
    maxStalledCount: 3,
  },
);
downloadWorker.on("active", (job) =>
  console.log(`[download-worker] → Job ${job.id} (${job.data.type}) started`),
);
downloadWorker.on("completed", (job) =>
  console.log(`[download-worker] ✓ Job ${job.id} (${job.data.type}) completed`),
);
downloadWorker.on("failed", (job, err) =>
  console.error(`[download-worker] ✗ Job ${job?.id} failed:`, err.message),
);
const worker = new Worker<PipelineJobData>(
"pipeline",
async (job: Job<PipelineJobData>, token?: string) => {
@ -33,6 +63,8 @@ const worker = new Worker<PipelineJobData>(
return handleRefreshCity(job as Job<any>);
case "ingest-boris-ni":
return handleIngestBorisNi(job as Job<any>);
case "ingest-boris-hb":
return handleIngestBorisHb(job as Job<any>);
case "compute-transit":
return handleComputeTransit(job as Job<any>);
default:
@ -48,6 +80,7 @@ const worker = new Worker<PipelineJobData>(
concurrency: 8,
lockDuration: 300_000, // 5 minutes — download jobs can be slow
lockRenewTime: 15_000, // Renew every 15s
maxStalledCount: 3, // Allow up to 3 stalls before failing (large city jobs can be slow)
},
);
@ -85,7 +118,7 @@ worker.on("error", (err) => {
const shutdown = async () => {
console.log("[worker] Shutting down gracefully…");
await worker.close();
await Promise.all([worker.close(), downloadWorker.close()]);
process.exit(0);
};

View file

@ -13,7 +13,7 @@ export type ComputeRoutingData = {
/** Number of nearest POI candidates per grid point. */
const K = 6;
/** Grid points per Valhalla matrix call. */
const BATCH_SIZE = 20;
const BATCH_SIZE = 5;
/** Concurrent Valhalla calls within this job. */
const BATCH_CONCURRENCY = 4;
/** Rows per INSERT. */

View file

@ -6,37 +6,9 @@ import type { JobProgress, ComputeScoresJobData as ComputeScoresData } from "@tr
import {
CATEGORY_IDS,
PROFILES,
PROFILE_IDS,
DEFAULT_SUBCATEGORY_WEIGHT,
} from "@transportationer/shared";
const INSERT_CHUNK = 2000;
function subcategoryWeight(profileId: string, subcategory: string): number {
const weights = PROFILES[profileId as keyof typeof PROFILES]?.subcategoryWeights;
if (!weights) return DEFAULT_SUBCATEGORY_WEIGHT;
return weights[subcategory] ?? DEFAULT_SUBCATEGORY_WEIGHT;
}
/**
 * Logistic decay over travel time: ≈1 far below the threshold, exactly 0.5 at
 * the threshold, ≈0 far above it. The steepness scales with the threshold.
 */
function sigmoid(t_s: number, threshold_s: number): number {
  const exponent = 4 * (t_s - threshold_s) / threshold_s;
  return 1 / (1 + Math.exp(exponent));
}
/**
 * Complement-product accessibility score: 1 - PROD(1 - w * sigmoid(t)) over all
 * weighted subcategories, accumulated in log space for numerical stability.
 * Subcategories with no travel time or non-positive weight are skipped; returns
 * 0 when nothing contributes at all.
 */
function complementProduct(
  subcategoryTimes: Array<{ subcategory: string; timeS: number | null }>,
  threshold_s: number,
  profileId: string,
): number {
  const contributing = subcategoryTimes
    .map(({ subcategory, timeS }) => ({
      weight: subcategoryWeight(profileId, subcategory),
      timeS,
    }))
    .filter((c): c is { weight: number; timeS: number } => c.timeS !== null && c.weight > 0);
  if (contributing.length === 0) return 0;
  let logProd = 0;
  for (const { weight, timeS } of contributing) {
    // Clamp at 1e-10 so a near-certain hit cannot produce LN(0) = -Infinity.
    logProd += Math.log(Math.max(1 - weight * sigmoid(timeS, threshold_s), 1e-10));
  }
  return 1 - Math.exp(logProd);
}
/**
* Two-phase orchestrator for accessibility score computation.
@ -91,8 +63,25 @@ export async function handleComputeScores(
// parallel with the routing jobs rather than sequentially after them.
const queue = new Queue("pipeline", { connection: createBullMQConnection() });
try {
// Dispatch transit scoring first so it starts immediately while the
// compute-routing jobs queue up behind it (transit is the slowest leg).
if (modes.includes("transit")) {
await queue.add(
"compute-transit",
{ type: "compute-transit", citySlug },
{
attempts: 1,
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
jobId: `compute-transit.${citySlug}`,
parent: { id: job.id!, queue: queue.qualifiedName },
ignoreDependencyOnFailure: true,
},
);
}
for (const mode of modes) {
if (mode === "transit") continue; // handled below as a single job
if (mode === "transit") continue; // already dispatched above
for (const category of CATEGORY_IDS) {
await queue.add(
"compute-routing",
@ -102,10 +91,9 @@ export async function handleComputeScores(
backoff: { type: "fixed", delay: 3000 },
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
jobId: `compute-routing.${citySlug}.${mode}.${category}`,
parent: {
id: job.id!,
// qualifiedName = "bull:pipeline" — the Redis key BullMQ uses
// to track parent/child relationships.
queue: queue.qualifiedName,
},
},
@ -113,24 +101,6 @@ export async function handleComputeScores(
}
}
// Dispatch transit scoring as a sibling child (one job covers all categories
// via PostGIS isochrone spatial joins, unlike per-category routing jobs).
if (modes.includes("transit")) {
await queue.add(
"compute-transit",
{ type: "compute-transit", citySlug },
{
attempts: 1,
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
parent: { id: job.id!, queue: queue.qualifiedName },
// Transit is optional — a failure should not cascade to the parent.
// The city will be marked ready with walking/cycling scores only.
ignoreDependencyOnFailure: true,
},
);
}
// Dispatch BORIS NI ingest as a sibling child so it runs during routing.
if (job.data.ingestBorisNi) {
await queue.add(
@ -141,8 +111,25 @@ export async function handleComputeScores(
backoff: { type: "fixed", delay: 5000 },
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
jobId: `ingest-boris-ni.${citySlug}`,
parent: { id: job.id!, queue: queue.qualifiedName },
ignoreDependencyOnFailure: true,
},
);
}
// Dispatch BORIS HB ingest (Stadt Bremen + Bremerhaven) similarly.
if (job.data.ingestBorisHb) {
await queue.add(
"ingest-boris-hb",
{ type: "ingest-boris-hb", citySlug },
{
attempts: 2,
backoff: { type: "fixed", delay: 5000 },
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
jobId: `ingest-boris-hb.${citySlug}`,
parent: { id: job.id!, queue: queue.qualifiedName },
// BORIS HB data is optional — failure should not cancel the pipeline.
ignoreDependencyOnFailure: true,
},
);
@ -162,197 +149,141 @@ export async function handleComputeScores(
}
// ── Phase 2: aggregate scores from grid_poi_details ──────────────────────
// All computation runs inside a single SQL CTE chain so the DBMS can
// optimise joins and aggregations without streaming data through Node.js.
await job.updateProgress({
stage: "Aggregating scores",
pct: 70,
message: `All routing complete — computing profile scores…`,
} satisfies JobProgress);
// Load all per-subcategory routing results for this city in one query.
// Ordered by distance so the first row per (gpId, mode, category) is nearest.
const detailRows = await Promise.resolve(sql<{
grid_point_id: string;
category: string;
subcategory: string;
travel_mode: string;
nearest_poi_id: string | null;
distance_m: number | null;
travel_time_s: number | null;
}[]>`
// Build parallel arrays for SQL unnest: one row per (profile, subcategory).
const pwProfiles: string[] = [];
const pwSubcats: string[] = [];
const pwWeights: number[] = [];
for (const [profileId, profile] of Object.entries(PROFILES)) {
for (const [subcategory, weight] of Object.entries(profile.subcategoryWeights)) {
pwProfiles.push(profileId);
pwSubcats.push(subcategory);
pwWeights.push(weight);
}
}
await Promise.resolve(sql`
WITH
base AS (
SELECT
gpd.grid_point_id::text,
gpd.grid_point_id,
gpd.category,
gpd.subcategory,
gpd.travel_mode,
gpd.nearest_poi_id::text,
gpd.nearest_poi_id,
gpd.distance_m,
gpd.travel_time_s
FROM grid_poi_details gpd
JOIN grid_points gp ON gp.id = gpd.grid_point_id
WHERE gp.city_slug = ${citySlug}
ORDER BY gpd.grid_point_id, gpd.travel_mode, gpd.category, gpd.distance_m
`);
// Build in-memory structure keyed by "gpId:mode:category".
type GroupEntry = {
gpId: string;
mode: string;
category: string;
subcategoryTimes: Array<{ subcategory: string; timeS: number | null }>;
nearestPoiId: string | null;
nearestDistM: number | null;
nearestTimeS: number | null;
};
const groups = new Map<string, GroupEntry>();
for (const row of detailRows) {
const key = `${row.grid_point_id}:${row.travel_mode}:${row.category}`;
let entry = groups.get(key);
if (!entry) {
entry = {
gpId: row.grid_point_id,
mode: row.travel_mode,
category: row.category,
subcategoryTimes: [],
nearestPoiId: null,
nearestDistM: null,
nearestTimeS: null,
};
groups.set(key, entry);
}
entry.subcategoryTimes.push({ subcategory: row.subcategory, timeS: row.travel_time_s });
// Track the overall nearest POI for this category (minimum distance).
if (
row.distance_m !== null &&
(entry.nearestDistM === null || row.distance_m < entry.nearestDistM)
) {
entry.nearestPoiId = row.nearest_poi_id;
entry.nearestDistM = row.distance_m;
entry.nearestTimeS = row.travel_time_s;
}
}
// Synthesize "multimodal" groups: for each (gpId, category, subcategory),
// take the minimum travel time across walking and cycling so that a
// destination reachable by either mode counts as accessible.
// Driving is intentionally excluded (not a 15-min city metric).
const MULTIMODAL_MODES = new Set(["walking", "cycling", "transit"]); // modes combined into "fifteen"
const mmAccumulator = new Map<string, {
gpId: string;
category: string;
subTimes: Map<string, number | null>;
nearestDistM: number | null;
nearestPoiId: string | null;
nearestTimeS: number | null;
}>();
for (const entry of groups.values()) {
if (!MULTIMODAL_MODES.has(entry.mode)) continue;
const mmKey = `${entry.gpId}:${entry.category}`;
if (!mmAccumulator.has(mmKey)) {
mmAccumulator.set(mmKey, {
gpId: entry.gpId,
category: entry.category,
subTimes: new Map(),
nearestDistM: null,
nearestPoiId: null,
nearestTimeS: null,
});
}
const acc = mmAccumulator.get(mmKey)!;
// Track nearest POI across all multimodal modes
if (entry.nearestDistM !== null && (acc.nearestDistM === null || entry.nearestDistM < acc.nearestDistM)) {
acc.nearestDistM = entry.nearestDistM;
acc.nearestPoiId = entry.nearestPoiId;
acc.nearestTimeS = entry.nearestTimeS;
}
// For each subcategory, keep the minimum travel time across modes
for (const { subcategory, timeS } of entry.subcategoryTimes) {
const existing = acc.subTimes.get(subcategory);
if (existing === undefined) {
acc.subTimes.set(subcategory, timeS);
} else if (existing === null && timeS !== null) {
acc.subTimes.set(subcategory, timeS);
} else if (timeS !== null && existing !== null && timeS < existing) {
acc.subTimes.set(subcategory, timeS);
}
}
}
for (const acc of mmAccumulator.values()) {
const key = `${acc.gpId}:fifteen:${acc.category}`;
groups.set(key, {
gpId: acc.gpId,
mode: "fifteen",
category: acc.category,
subcategoryTimes: Array.from(acc.subTimes.entries()).map(([subcategory, timeS]) => ({ subcategory, timeS })),
nearestPoiId: acc.nearestPoiId,
nearestDistM: acc.nearestDistM,
nearestTimeS: acc.nearestTimeS,
});
}
// Compute and insert scores for every threshold × profile combination.
// Each threshold writes to distinct rows (threshold_min is part of the PK),
// so all thresholds can be processed concurrently without conflicts.
// Node.js is single-threaded so completedThresholds++ is safe.
let completedThresholds = 0;
await Promise.all(thresholds.map(async (thresholdMin) => {
const threshold_s = thresholdMin * 60;
const gpIdArr: string[] = [];
const catArr: string[] = [];
const modeArr: string[] = [];
const profileArr: string[] = [];
const poiIdArr: (string | null)[] = [];
const distArr: (number | null)[] = [];
const timeArr: (number | null)[] = [];
const scoreArr: number[] = [];
for (const entry of groups.values()) {
for (const profileId of PROFILE_IDS) {
gpIdArr.push(entry.gpId);
catArr.push(entry.category);
modeArr.push(entry.mode);
profileArr.push(profileId);
poiIdArr.push(entry.nearestPoiId);
distArr.push(entry.nearestDistM);
timeArr.push(entry.nearestTimeS);
scoreArr.push(complementProduct(entry.subcategoryTimes, threshold_s, profileId));
}
}
// Chunks within a threshold stay sequential — with all thresholds running
// concurrently we already have up to thresholds.length parallel INSERT
// streams, which saturates the connection pool without overwhelming it.
for (let i = 0; i < gpIdArr.length; i += INSERT_CHUNK) {
const end = Math.min(i + INSERT_CHUNK, gpIdArr.length);
await Promise.resolve(sql`
),
fifteen_subcat AS (
-- "fifteen" mode: best (lowest) travel time across walking / cycling / transit
SELECT
grid_point_id, category, subcategory,
'fifteen'::text AS travel_mode,
MIN(travel_time_s) AS travel_time_s
FROM base
WHERE travel_mode IN ('walking', 'cycling', 'transit')
GROUP BY grid_point_id, category, subcategory
),
all_subcat AS (
SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM base
UNION ALL
SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM fifteen_subcat
),
road_nearest AS (
-- Nearest POI per (grid_point, category, mode) by distance
SELECT DISTINCT ON (grid_point_id, category, travel_mode)
grid_point_id, category, travel_mode, nearest_poi_id, distance_m, travel_time_s
FROM base
WHERE nearest_poi_id IS NOT NULL
ORDER BY grid_point_id, category, travel_mode, distance_m
),
fifteen_nearest AS (
-- Nearest POI for "fifteen": closest across walking / cycling / transit
SELECT DISTINCT ON (grid_point_id, category)
grid_point_id, category,
'fifteen'::text AS travel_mode,
nearest_poi_id, distance_m, travel_time_s
FROM base
WHERE travel_mode IN ('walking', 'cycling', 'transit')
AND nearest_poi_id IS NOT NULL
ORDER BY grid_point_id, category, distance_m
),
all_nearest AS (
SELECT * FROM road_nearest
UNION ALL
SELECT * FROM fifteen_nearest
),
profile_weights AS (
SELECT profile_id, subcategory, weight
FROM unnest(
${pwProfiles}::text[],
${pwSubcats}::text[],
${pwWeights}::float8[]
) AS t(profile_id, subcategory, weight)
),
thresholds_t AS (
SELECT unnest(${thresholds}::int[]) AS threshold_min
),
scores AS (
-- Complement-product score per (grid_point, category, mode, threshold, profile).
-- sigmoid(t) = 1 / (1 + exp((t - T) / (T/6))) where T = threshold in seconds.
-- score = 1 - PROD(1 - w * sigmoid), computed via EXP(SUM(LN(complement))).
-- NULL travel_time_s -> sigmoid = 0 -> complement = 1 -> LN(1) = 0 (no penalty).
SELECT
s.grid_point_id,
s.category,
s.travel_mode,
t.threshold_min,
p.profile_id,
1.0 - EXP(SUM(
LN(GREATEST(
1.0 - COALESCE(pw.weight, ${DEFAULT_SUBCATEGORY_WEIGHT}::float8)
* CASE
WHEN s.travel_time_s IS NULL THEN 0.0
ELSE 1.0 / (1.0 + EXP(
(s.travel_time_s - t.threshold_min * 60.0)
/ (t.threshold_min * 10.0)
))
END,
1e-10
))
)) AS score
FROM all_subcat s
JOIN thresholds_t t ON true
CROSS JOIN (SELECT DISTINCT profile_id FROM profile_weights) p
LEFT JOIN profile_weights pw
ON pw.profile_id = p.profile_id AND pw.subcategory = s.subcategory
GROUP BY s.grid_point_id, s.category, s.travel_mode, t.threshold_min, p.profile_id
)
INSERT INTO grid_scores (
grid_point_id, category, travel_mode, threshold_min, profile,
nearest_poi_id, distance_m, travel_time_s, score
)
SELECT
gp_id::bigint,
cat,
mode_val,
${thresholdMin}::int,
prof,
CASE WHEN poi_id IS NULL THEN NULL ELSE poi_id::bigint END,
dist,
time_s,
score_val
FROM unnest(
${gpIdArr.slice(i, end)}::text[],
${catArr.slice(i, end)}::text[],
${modeArr.slice(i, end)}::text[],
${profileArr.slice(i, end)}::text[],
${poiIdArr.slice(i, end)}::text[],
${distArr.slice(i, end)}::float8[],
${timeArr.slice(i, end)}::float8[],
${scoreArr.slice(i, end)}::float8[]
) AS t(gp_id, cat, mode_val, prof, poi_id, dist, time_s, score_val)
sc.grid_point_id,
sc.category,
sc.travel_mode,
sc.threshold_min,
sc.profile_id,
n.nearest_poi_id,
n.distance_m,
n.travel_time_s,
sc.score
FROM scores sc
LEFT JOIN all_nearest n
ON n.grid_point_id = sc.grid_point_id
AND n.category = sc.category
AND n.travel_mode = sc.travel_mode
ON CONFLICT (grid_point_id, category, travel_mode, threshold_min, profile)
DO UPDATE SET
nearest_poi_id = EXCLUDED.nearest_poi_id,
@ -361,15 +292,12 @@ export async function handleComputeScores(
score = EXCLUDED.score,
computed_at = now()
`);
}
completedThresholds++;
await job.updateProgress({
stage: "Aggregating scores",
pct: 70 + Math.round((completedThresholds / thresholds.length) * 28),
message: `${completedThresholds} / ${thresholds.length} thresholds done…`,
pct: 98,
message: `All scores computed for ${citySlug}`,
} satisfies JobProgress);
}));
await Promise.resolve(sql`
UPDATE cities SET status = 'ready', last_ingested = now()

View file

@ -21,6 +21,17 @@ function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLa
return minLng < 11.779 && maxLng > 6.526 && minLat < 54.033 && maxLat > 51.197;
}
/**
 * True when the given bbox intersects the Freie Hansestadt Bremen (HB), which
 * consists of two non-contiguous cities:
 * - Stadt Bremen: ~52.95-53.23°N, 8.48-8.99°E
 * - Bremerhaven: ~53.46-53.65°N, 8.48-8.68°E
 * The WFS BBOX filter ensures each city only receives its own zones - no overlap.
 */
function isInBremen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean {
  // Standard interval-overlap test against the combined HB extent (both axes).
  const lngOverlaps = minLng < 8.99 && maxLng > 8.48;
  const latOverlaps = minLat < 53.65 && maxLat > 52.95;
  return lngOverlaps && latOverlaps;
}
export async function handleRefreshCity(
job: Job<RefreshCityData>,
): Promise<void> {
@ -81,18 +92,21 @@ export async function handleRefreshCity(
UPDATE cities SET status = 'processing' WHERE slug = ${citySlug}
`);
// Shared download node factory — produces an idempotent download-pbf node.
// Two independent nodes with the same geofabrikUrl are safe: the idempotency
// check in download-pbf.ts skips the download if the file already exists.
// Shared download node factory — each call produces an independent download-pbf
// job (no fixed jobId). Three branches each get their own job so BullMQ can
// track each parent→child dependency correctly; the handler skips the actual
// download if the file already exists (atomic rename + idempotency check).
const downloadNode = () => ({
name: "download-pbf",
queueName: "pipeline",
queueName: "download",
data: { type: "download-pbf" as const, citySlug, geofabrikUrl },
opts: JOB_OPTIONS["download-pbf"],
opts: { ...JOB_OPTIONS["download-pbf"] },
});
// For NI cities: ingest-boris-ni is dispatched in Phase 1 of compute-scores.
const niApplicable = !!(bbox && isInNiedersachsen(...bbox));
// For HB cities (Stadt Bremen + Bremerhaven): ingest-boris-hb runs in Phase 1.
const hbApplicable = !!(bbox && isInBremen(...bbox));
// Parallel pipeline DAG (bottom-up — leaves execute first):
//
@ -119,17 +133,18 @@ export async function handleRefreshCity(
modes: ["walking", "cycling", "driving", "transit"] as const,
thresholds: [...VALID_THRESHOLDS],
ingestBorisNi: niApplicable,
ingestBorisHb: hbApplicable,
},
opts: JOB_OPTIONS["compute-scores"],
opts: { ...JOB_OPTIONS["compute-scores"], jobId: `compute-scores.${citySlug}` },
children: [
{
name: "generate-grid",
queueName: "pipeline",
data: { type: "generate-grid" as const, citySlug, resolutionM },
opts: JOB_OPTIONS["generate-grid"],
opts: { ...JOB_OPTIONS["generate-grid"], jobId: `generate-grid.${citySlug}` },
children: [
// Three parallel branches — each gets its own download-pbf child;
// the idempotency guard ensures only one actually downloads when they race.
// Three parallel branches — each gets its own download-pbf child (no fixed
// jobId); the download queue's concurrency of 1 plus the idempotent handler
// means only the first actually downloads, the rest skip.
{
name: "extract-pois",
queueName: "pipeline",
@ -139,7 +154,7 @@ export async function handleRefreshCity(
pbfPath,
...(bbox ? { bbox } : {}),
},
opts: JOB_OPTIONS["extract-pois"],
opts: { ...JOB_OPTIONS["extract-pois"], jobId: `extract-pois.${citySlug}` },
children: [downloadNode()],
},
// Road-only Valhalla build — no GTFS, produces clean tiles without
@ -153,7 +168,7 @@ export async function handleRefreshCity(
pbfPath,
...(bbox ? { bbox } : {}),
},
opts: JOB_OPTIONS["build-valhalla"],
opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla.${citySlug}` },
children: [downloadNode()],
},
// Transit Valhalla build — depends on GTFS download. Produces tiles with
@ -167,7 +182,7 @@ export async function handleRefreshCity(
pbfPath,
...(bbox ? { bbox } : {}),
},
opts: JOB_OPTIONS["build-valhalla"],
opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla-transit.${citySlug}` },
children: [
downloadNode(),
// Download GTFS feed before building transit tiles. Idempotent —

View file

@ -109,7 +109,7 @@ const MATRIX_MAX_ATTEMPTS = 8;
const MATRIX_RETRY_BASE_MS = 1_000;
const MATRIX_RETRY_MAX_MS = 15_000;
/** Per-request timeout — prevents hanging indefinitely if the service is down. */
const MATRIX_TIMEOUT_MS = 30_000;
const MATRIX_TIMEOUT_MS = 60_000;
/**
* Call Valhalla's sources_to_targets matrix endpoint.