fifteen/worker/src/index.ts

75 lines
2.7 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Worker, type Job } from "bullmq";
import { createBullMQConnection } from "./redis.js";
import type { PipelineJobData } from "@transportationer/shared";
import { handleDownloadPbf } from "./jobs/download-pbf.js";
import { handleExtractPois } from "./jobs/extract-pois.js";
import { handleGenerateGrid } from "./jobs/generate-grid.js";
import { handleComputeScores } from "./jobs/compute-scores.js";
import { handleComputeRouting } from "./jobs/compute-routing.js";
import { handleRefreshCity } from "./jobs/refresh-city.js";
import { handleIngestBorisNi } from "./jobs/ingest-boris-ni.js";
console.log("[worker] Starting Transportationer pipeline worker…");

/** Call shape shared by every entry of the job dispatch table. */
type PipelineJobHandler = (job: Job<any>, token?: string) => unknown;

/**
 * Dispatch table: pipeline job type -> function that runs the job.
 *
 * Each entry is a small lambda rather than a bare handler reference so that
 * only `compute-scores` is handed the processor's `token` argument; every
 * other handler is invoked with the job alone.
 */
const jobHandlers = new Map<string, PipelineJobHandler>([
  ["download-pbf", (job) => handleDownloadPbf(job)],
  ["extract-pois", (job) => handleExtractPois(job)],
  ["generate-grid", (job) => handleGenerateGrid(job)],
  ["compute-scores", (job, token) => handleComputeScores(job, token)],
  ["compute-routing", (job) => handleComputeRouting(job)],
  ["refresh-city", (job) => handleRefreshCity(job)],
  ["ingest-boris-ni", (job) => handleIngestBorisNi(job)],
]);

/**
 * Worker bound to the "pipeline" queue.
 *
 * The processor logs each incoming job, looks its type up in `jobHandlers`
 * and returns whatever the matching handler returns. A job whose type has no
 * registered handler makes the processor throw.
 */
const worker = new Worker<PipelineJobData>(
  "pipeline",
  async (job: Job<PipelineJobData>, token?: string) => {
    const { type, citySlug } = job.data;
    console.log(`[worker] Processing job ${job.id} type=${type} city=${citySlug}`);

    const handler = jobHandlers.get(type);
    if (handler === undefined) {
      throw new Error(`Unknown job type: ${type}`);
    }
    return handler(job, token);
  },
  {
    connection: createBullMQConnection(),
    // Several compute-routing jobs may run side by side: a city produces
    // 15 of them (3 modes × 5 categories), which fill these slots. Steps such
    // as download-pbf/extract-pois still execute one at a time because the
    // FlowProducer dependency chain gates them.
    concurrency: 8,
    // 5 minutes — download jobs can be slow.
    lockDuration: 300_000,
    // Renew the lock every 15 seconds.
    lockRenewTime: 15_000,
  },
);
// Lifecycle logging. Every message carries the "[worker]" prefix; successes
// and progress go to stdout, failures and worker-level errors to stderr.
worker
  .on("completed", (finishedJob) => {
    console.log(`[worker] ✓ Job ${finishedJob.id} (${finishedJob.data.type}) completed`);
  })
  .on("failed", (failedJob, error) => {
    // The job argument may be absent here, hence the optional chaining.
    console.error(
      `[worker] ✗ Job ${failedJob?.id} (${failedJob?.data?.type}) failed:`,
      error.message,
    );
  })
  .on("active", (startedJob) => {
    console.log(`[worker] → Job ${startedJob.id} (${startedJob.data.type}) started`);
  })
  .on("error", (error) => {
    console.error("[worker] Worker error:", error.message);
  });
/**
 * Signal handler: closes the worker, then terminates the process.
 *
 * Exits with status 0 once `worker.close()` has resolved. If closing rejects,
 * the failure is logged and the process exits with status 1 instead of
 * letting the rejection escape the signal listener unhandled.
 */
const shutdown = async (): Promise<void> => {
  console.log("[worker] Shutting down gracefully…");
  let exitCode = 0;
  try {
    await worker.close();
  } catch (err: unknown) {
    // Report in the same "[worker] …" format as the other error logs.
    const reason = err instanceof Error ? err.message : err;
    console.error("[worker] Shutdown failed:", reason);
    exitCode = 1;
  }
  process.exit(exitCode);
};

// Both termination signals share the same shutdown path.
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

// NOTE: the concurrency figure below is hard-coded; keep it in sync with the
// `concurrency` option passed to the Worker constructor.
console.log("[worker] Ready — waiting for jobs (concurrency=8)");