import { Queue, QueueEvents } from "bullmq"; import { createBullMQConnection } from "./redis"; // Re-export shared job types so web code can import from one place export type { PipelineJobData, DownloadPbfJobData, ExtractPoisJobData, GenerateGridJobData, ComputeScoresJobData, BuildValhallaJobData, RefreshCityJobData, JobProgress, } from "@transportationer/shared"; export { JOB_OPTIONS } from "@transportationer/shared"; import type { PipelineJobData } from "@transportationer/shared"; // ─── Queue singleton ────────────────────────────────────────────────────────── declare global { // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __pipelineQueue: Queue | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __downloadQueue: Queue | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __valhallaQueue: Queue | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __valhallaTransitQueue: Queue | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __routingQueue: Queue | undefined; // eslint-disable-next-line no-var var __routingQueueEvents: QueueEvents | undefined; // eslint-disable-next-line no-var // eslint-disable-next-line @typescript-eslint/no-explicit-any var __routingTransitQueue: Queue | undefined; // eslint-disable-next-line no-var var __routingTransitQueueEvents: QueueEvents | undefined; } // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getPipelineQueue(): Queue { if (!globalThis.__pipelineQueue) { globalThis.__pipelineQueue = new Queue("pipeline", { connection: createBullMQConnection(), defaultJobOptions: { attempts: 1, removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, }, }); } return globalThis.__pipelineQueue; } /** Queue for download-pbf jobs — concurrency 1 in the worker prevents parallel downloads. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getDownloadQueue(): Queue { if (!globalThis.__downloadQueue) { globalThis.__downloadQueue = new Queue("download", { connection: createBullMQConnection(), defaultJobOptions: { attempts: 2, removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, }, }); } return globalThis.__downloadQueue; } /** Queue for build-valhalla jobs, processed by the valhalla-worker container. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getValhallaQueue(): Queue { if (!globalThis.__valhallaQueue) { globalThis.__valhallaQueue = new Queue("valhalla", { connection: createBullMQConnection(), defaultJobOptions: { attempts: 1, removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, }, }); } return globalThis.__valhallaQueue; } /** Queue for build-valhalla transit jobs, processed by the valhalla-transit container. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getValhallaTransitQueue(): Queue { if (!globalThis.__valhallaTransitQueue) { globalThis.__valhallaTransitQueue = new Queue("valhalla-transit", { connection: createBullMQConnection(), defaultJobOptions: { attempts: 1, removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, }, }); } return globalThis.__valhallaTransitQueue; } export function createQueueEvents(): QueueEvents { return new QueueEvents("pipeline", { connection: createBullMQConnection(), }); } /** On-demand isochrone queue for road modes (walking/cycling/driving). */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getRoutingQueue(): Queue { if (!globalThis.__routingQueue) { globalThis.__routingQueue = new Queue("routing", { connection: createBullMQConnection(), defaultJobOptions: { attempts: 1, removeOnComplete: { age: 120 }, removeOnFail: { age: 300 }, }, }); } return globalThis.__routingQueue; } export function getRoutingQueueEvents(): QueueEvents { if (!globalThis.__routingQueueEvents) { globalThis.__routingQueueEvents = new QueueEvents("routing", { connection: createBullMQConnection(), }); } return globalThis.__routingQueueEvents; } /** On-demand isochrone queue for transit mode. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any export function getRoutingTransitQueue(): Queue { if (!globalThis.__routingTransitQueue) { globalThis.__routingTransitQueue = new Queue("routing-transit", { connection: createBullMQConnection(), defaultJobOptions: { attempts: 1, removeOnComplete: { age: 120 }, removeOnFail: { age: 300 }, }, }); } return globalThis.__routingTransitQueue; } export function getRoutingTransitQueueEvents(): QueueEvents { if (!globalThis.__routingTransitQueueEvents) { globalThis.__routingTransitQueueEvents = new QueueEvents("routing-transit", { connection: createBullMQConnection(), }); } return globalThis.__routingTransitQueueEvents; }