// Pipeline worker entry point — registers BullMQ workers for the
// "download" and "pipeline" queues and wires up lifecycle logging.
import { Worker, type Job } from "bullmq";
import { createBullMQConnection } from "./redis.js";
import { getSql } from "./db.js";
import type { PipelineJobData } from "@transportationer/shared";
import { handleDownloadPbf } from "./jobs/download-pbf.js";
import { handleExtractPois } from "./jobs/extract-pois.js";
import { handleGenerateGrid } from "./jobs/generate-grid.js";
import { handleComputeScores } from "./jobs/compute-scores.js";
import { handleComputeRouting } from "./jobs/compute-routing.js";
import { handleRefreshCity } from "./jobs/refresh-city.js";
import { handleIngestBorisNi } from "./jobs/ingest-boris-ni.js";
import { handleIngestBorisHb } from "./jobs/ingest-boris-hb.js";
import { handleComputeTransit } from "./jobs/compute-transit.js";
console.log("[worker] Starting Transportationer pipeline worker…");
|
||
|
||
// Dedicated download worker — concurrency 1 ensures at most one PBF download
|
||
// runs at a time. The handler is idempotent (skips if file already on disk),
|
||
// so the three parallel download-pbf children from a single city refresh
|
||
// execute sequentially: the first downloads, the other two skip immediately.
|
||
const downloadWorker = new Worker<PipelineJobData>(
|
||
"download",
|
||
async (job: Job<PipelineJobData>) => {
|
||
if (job.data.type === "download-pbf") return handleDownloadPbf(job as Job<any>);
|
||
throw new Error(`Unexpected job type on download queue: ${(job.data as any).type}`);
|
||
},
|
||
{
|
||
connection: createBullMQConnection(),
|
||
concurrency: 1,
|
||
lockDuration: 300_000,
|
||
lockRenewTime: 15_000,
|
||
maxStalledCount: 3,
|
||
},
|
||
);
|
||
|
||
downloadWorker.on("completed", (job) =>
|
||
console.log(`[download-worker] ✓ Job ${job.id} (${job.data.type}) completed`),
|
||
);
|
||
downloadWorker.on("failed", (job, err) =>
|
||
console.error(`[download-worker] ✗ Job ${job?.id} failed:`, err.message),
|
||
);
|
||
downloadWorker.on("active", (job) =>
|
||
console.log(`[download-worker] → Job ${job.id} (${job.data.type}) started`),
|
||
);
|
||
|
||
const worker = new Worker<PipelineJobData>(
|
||
"pipeline",
|
||
async (job: Job<PipelineJobData>, token?: string) => {
|
||
console.log(`[worker] Processing job ${job.id} type=${job.data.type} city=${"citySlug" in job.data ? job.data.citySlug : "n/a"}`);
|
||
|
||
switch (job.data.type) {
|
||
case "download-pbf":
|
||
return handleDownloadPbf(job as Job<any>);
|
||
case "extract-pois":
|
||
return handleExtractPois(job as Job<any>);
|
||
case "generate-grid":
|
||
return handleGenerateGrid(job as Job<any>);
|
||
case "compute-scores":
|
||
return handleComputeScores(job as Job<any>, token);
|
||
case "compute-routing":
|
||
return handleComputeRouting(job as Job<any>);
|
||
case "refresh-city":
|
||
return handleRefreshCity(job as Job<any>);
|
||
case "ingest-boris-ni":
|
||
return handleIngestBorisNi(job as Job<any>);
|
||
case "ingest-boris-hb":
|
||
return handleIngestBorisHb(job as Job<any>);
|
||
case "compute-transit":
|
||
return handleComputeTransit(job as Job<any>);
|
||
default:
|
||
throw new Error(`Unknown job type: ${(job.data as any).type}`);
|
||
}
|
||
},
|
||
{
|
||
connection: createBullMQConnection(),
|
||
// Higher concurrency lets multiple compute-routing jobs run in parallel.
|
||
// The 15 routing jobs per city (3 modes × 5 categories) will fill these
|
||
// slots; sequential jobs like download-pbf/extract-pois still run one at a time
|
||
// because they're gated by the FlowProducer dependency chain.
|
||
concurrency: 8,
|
||
lockDuration: 300_000, // 5 minutes — download jobs can be slow
|
||
lockRenewTime: 15_000, // Renew every 15s
|
||
maxStalledCount: 3, // Allow up to 3 stalls before failing (large city jobs can be slow)
|
||
},
|
||
);
|
||
|
||
worker.on("completed", (job) => {
|
||
console.log(`[worker] ✓ Job ${job.id} (${job.data.type}) completed`);
|
||
});
|
||
|
||
worker.on("failed", async (job, err) => {
|
||
console.error(`[worker] ✗ Job ${job?.id} (${job?.data?.type}) failed:`, err.message);
|
||
// On final failure (all retries exhausted), mark the city as errored so
|
||
// the SSE stream and admin UI can reflect the true state immediately.
|
||
if (job?.data && "citySlug" in job.data && job.data.citySlug) {
|
||
const attemptsExhausted = job.attemptsMade >= (job.opts.attempts ?? 1);
|
||
if (attemptsExhausted) {
|
||
try {
|
||
const sql = getSql();
|
||
await sql`
|
||
UPDATE cities SET status = 'error', error_message = ${err.message.slice(0, 500)}
|
||
WHERE slug = ${job.data.citySlug}
|
||
`;
|
||
} catch (e) {
|
||
console.error("[worker] Failed to update city error status:", e);
|
||
}
|
||
}
|
||
}
|
||
});
|
||
|
||
worker.on("active", (job) => {
|
||
console.log(`[worker] → Job ${job.id} (${job.data.type}) started`);
|
||
});
|
||
|
||
worker.on("error", (err) => {
|
||
console.error("[worker] Worker error:", err.message);
|
||
});
|
||
|
||
const shutdown = async () => {
|
||
console.log("[worker] Shutting down gracefully…");
|
||
await Promise.all([worker.close(), downloadWorker.close()]);
|
||
process.exit(0);
|
||
};
|
||
|
||
process.on("SIGTERM", shutdown);
|
||
process.on("SIGINT", shutdown);
|
||
|
||
console.log("[worker] Ready — waiting for jobs (concurrency=8)");
|