import type { Job } from "bullmq";
import { Queue, WaitingChildrenError } from "bullmq";
import { getSql } from "../db.js";
import { createBullMQConnection } from "../redis.js";
import type { JobProgress, ComputeScoresJobData as ComputeScoresData } from "@transportationer/shared";
import {
  CATEGORY_IDS,
  PROFILES,
  PROFILE_IDS,
  DEFAULT_SUBCATEGORY_WEIGHT,
} from "@transportationer/shared";

/** Maximum number of rows sent in a single `INSERT … SELECT FROM unnest(…)`. */
const INSERT_CHUNK = 2000;

/** Travel modes whose per-subcategory times are merged into the synthetic "fifteen" mode. */
const MULTIMODAL_MODES = new Set(["walking", "cycling", "transit"]);

/** One row of `grid_poi_details` as selected in phase 2. */
type DetailRow = {
  grid_point_id: string;
  category: string;
  subcategory: string;
  travel_mode: string;
  nearest_poi_id: string | null;
  distance_m: number | null;
  travel_time_s: number | null;
};

/** Travel time to the nearest POI of one subcategory (`null` when no time is recorded). */
type SubcategoryTime = { subcategory: string; timeS: number | null };

/** All routing results for one (grid point, mode, category) combination. */
type GroupEntry = {
  gpId: string;
  mode: string;
  category: string;
  subcategoryTimes: SubcategoryTime[];
  nearestPoiId: string | null;
  nearestDistM: number | null;
  nearestTimeS: number | null;
};

/**
 * Looks up the weight of `subcategory` in the given profile.
 *
 * Falls back to `DEFAULT_SUBCATEGORY_WEIGHT` when the profile is unknown,
 * has no `subcategoryWeights`, or does not list the subcategory.
 */
function subcategoryWeight(profileId: string, subcategory: string): number {
  const weights = PROFILES[profileId as keyof typeof PROFILES]?.subcategoryWeights;
  if (!weights) return DEFAULT_SUBCATEGORY_WEIGHT;
  return weights[subcategory] ?? DEFAULT_SUBCATEGORY_WEIGHT;
}

/**
 * Decreasing logistic curve over travel time.
 *
 * Returns exactly 0.5 when `t_s === threshold_s`, approaches 1 for shorter
 * times and 0 for longer ones. The slope is scaled by `4 / threshold_s`.
 */
function sigmoid(t_s: number, threshold_s: number): number {
  return 1 / (1 + Math.exp(4 * (t_s - threshold_s) / threshold_s));
}

/**
 * Weighted complement-product score: `1 − Π(1 − wᵢ · sigmoid(tᵢ))`.
 *
 * Subcategories with a `null` time or a non-positive weight are skipped.
 * The product is accumulated in log space; each factor is clamped to at
 * least 1e-10 so the logarithm stays finite.
 *
 * @returns The score, or 0 when no subcategory contributed.
 */
function complementProduct(
  subcategoryTimes: Array<{ subcategory: string; timeS: number | null }>,
  threshold_s: number,
  profileId: string,
): number {
  let logProd = 0;
  let hasAny = false;
  for (const { subcategory, timeS } of subcategoryTimes) {
    const weight = subcategoryWeight(profileId, subcategory);
    if (timeS === null || weight <= 0) continue;
    hasAny = true;
    logProd += Math.log(Math.max(1 - weight * sigmoid(timeS, threshold_s), 1e-10));
  }
  return hasAny ? 1 - Math.exp(logProd) : 0;
}

/**
 * Enqueues every child job of the compute-scores parent on the "pipeline" queue.
 *
 * – One `compute-routing` job per (non-transit mode × category).
 * – One `compute-transit` job when "transit" is among the modes.
 * – One `ingest-boris-ni` job when `job.data.ingestBorisNi` is set.
 *
 * Each child is attached to `job` via `opts.parent`. The temporary Queue
 * handle is always closed, even if enqueueing fails.
 */
async function dispatchChildJobs(job: Job<ComputeScoresData>): Promise<void> {
  const { citySlug, modes } = job.data;
  const parentId = job.id;
  if (parentId === undefined) {
    throw new Error("compute-scores: job has no id; cannot attach child jobs to it");
  }

  const queue = new Queue("pipeline", { connection: createBullMQConnection() });
  try {
    // qualifiedName = "bull:pipeline" — the Redis key BullMQ uses to track
    // parent/child relationships.
    const parent = { id: parentId, queue: queue.qualifiedName };

    for (const mode of modes) {
      if (mode === "transit") continue; // handled below as a single job
      for (const category of CATEGORY_IDS) {
        await queue.add(
          "compute-routing",
          { type: "compute-routing", citySlug, mode, category },
          {
            attempts: 2,
            backoff: { type: "fixed", delay: 3000 },
            removeOnComplete: { age: 86400 * 7 },
            removeOnFail: { age: 86400 * 30 },
            parent,
          },
        );
      }
    }

    // Transit scoring is a single sibling child (one job covers all categories
    // via PostGIS isochrone spatial joins, unlike per-category routing jobs).
    if (modes.includes("transit")) {
      await queue.add(
        "compute-transit",
        { type: "compute-transit", citySlug },
        {
          attempts: 1,
          removeOnComplete: { age: 86400 * 7 },
          removeOnFail: { age: 86400 * 30 },
          parent,
        },
      );
    }

    // BORIS NI ingest is dispatched as a sibling child so it runs in parallel
    // with the routing jobs rather than sequentially after them.
    if (job.data.ingestBorisNi) {
      await queue.add(
        "ingest-boris-ni",
        { type: "ingest-boris-ni", citySlug },
        {
          attempts: 2,
          backoff: { type: "fixed", delay: 5000 },
          removeOnComplete: { age: 86400 * 7 },
          removeOnFail: { age: 86400 * 30 },
          parent,
        },
      );
    }
  } finally {
    await queue.close();
  }
}

/**
 * Groups detail rows by "gpId:mode:category".
 *
 * Every row contributes one subcategory time; the row with the smallest
 * non-null `distance_m` in a group supplies the group's nearest POI fields.
 */
function groupDetailRows(rows: readonly DetailRow[]): Map<string, GroupEntry> {
  const groups = new Map<string, GroupEntry>();
  for (const row of rows) {
    const key = `${row.grid_point_id}:${row.travel_mode}:${row.category}`;
    let entry = groups.get(key);
    if (!entry) {
      entry = {
        gpId: row.grid_point_id,
        mode: row.travel_mode,
        category: row.category,
        subcategoryTimes: [],
        nearestPoiId: null,
        nearestDistM: null,
        nearestTimeS: null,
      };
      groups.set(key, entry);
    }
    entry.subcategoryTimes.push({ subcategory: row.subcategory, timeS: row.travel_time_s });

    // Track the overall nearest POI for this category (minimum distance).
    if (
      row.distance_m !== null &&
      (entry.nearestDistM === null || row.distance_m < entry.nearestDistM)
    ) {
      entry.nearestPoiId = row.nearest_poi_id;
      entry.nearestDistM = row.distance_m;
      entry.nearestTimeS = row.travel_time_s;
    }
  }
  return groups;
}

/**
 * Adds synthetic "fifteen" groups to `groups` (mutated in place).
 *
 * For each (gpId, category, subcategory) the minimum travel time across
 * walking, cycling and transit is kept, so a destination reachable by any of
 * those modes counts as accessible. Driving is intentionally excluded
 * (not a 15-min city metric).
 */
function addMultimodalGroups(groups: Map<string, GroupEntry>): void {
  type Accumulator = {
    gpId: string;
    category: string;
    subTimes: Map<string, number | null>;
    nearestDistM: number | null;
    nearestPoiId: string | null;
    nearestTimeS: number | null;
  };
  const accumulators = new Map<string, Accumulator>();

  for (const entry of groups.values()) {
    if (!MULTIMODAL_MODES.has(entry.mode)) continue;

    const mmKey = `${entry.gpId}:${entry.category}`;
    let acc = accumulators.get(mmKey);
    if (!acc) {
      acc = {
        gpId: entry.gpId,
        category: entry.category,
        subTimes: new Map(),
        nearestDistM: null,
        nearestPoiId: null,
        nearestTimeS: null,
      };
      accumulators.set(mmKey, acc);
    }

    // Track nearest POI across all multimodal modes.
    if (
      entry.nearestDistM !== null &&
      (acc.nearestDistM === null || entry.nearestDistM < acc.nearestDistM)
    ) {
      acc.nearestDistM = entry.nearestDistM;
      acc.nearestPoiId = entry.nearestPoiId;
      acc.nearestTimeS = entry.nearestTimeS;
    }

    // For each subcategory, keep the minimum travel time across modes.
    // A null time is only stored when the subcategory has not been seen yet.
    for (const { subcategory, timeS } of entry.subcategoryTimes) {
      const existing = acc.subTimes.get(subcategory);
      if (
        existing === undefined ||
        (timeS !== null && (existing === null || timeS < existing))
      ) {
        acc.subTimes.set(subcategory, timeS);
      }
    }
  }

  // Inserted only after the scan above so the new groups are not re-visited.
  for (const acc of accumulators.values()) {
    groups.set(`${acc.gpId}:fifteen:${acc.category}`, {
      gpId: acc.gpId,
      mode: "fifteen",
      category: acc.category,
      subcategoryTimes: Array.from(acc.subTimes.entries()).map(
        ([subcategory, timeS]) => ({ subcategory, timeS }),
      ),
      nearestPoiId: acc.nearestPoiId,
      nearestDistM: acc.nearestDistM,
      nearestTimeS: acc.nearestTimeS,
    });
  }
}

/**
 * Two-phase orchestrator for accessibility score computation.
 *
 * Phase 1 (first activation, after generate-grid completes):
 *   – Clears stale data.
 *   – Enqueues one `compute-routing` child job per (mode × category) pair,
 *     plus the optional transit and BORIS NI children.
 *   – Suspends itself via moveToWaitingChildren; BullMQ re-queues it when
 *     all children finish. If no child is pending any more at that point,
 *     phase 2 runs immediately in the same activation.
 *
 * Phase 2 (re-activation after all routing children complete):
 *   – Reads grid_poi_details (populated by the routing jobs).
 *   – Computes weighted complement-product scores for every
 *     (grid_point × mode × category × threshold × profile) combination.
 *   – Bulk-inserts into grid_scores and marks the city ready.
 *   – Computes hidden gem scores when the city has estate value zones.
 *
 * @param job   The compute-scores job; `job.data.routingDispatched` selects the phase.
 * @param token Worker lock token; required in phase 1 to move the job to waiting-children.
 * @throws WaitingChildrenError in phase 1 while children are still pending, which
 *         tells the worker not to mark the job completed.
 * @throws Error when phase 1 is entered without a lock token or job id.
 */
export async function handleComputeScores(
  job: Job<ComputeScoresData>,
  token?: string,
): Promise<void> {
  const { citySlug, modes, thresholds } = job.data;
  const sql = getSql();

  // ── Phase 1: dispatch compute-routing children ────────────────────────────
  if (!job.data.routingDispatched) {
    // Fail before deleting anything: without the token the job cannot be
    // suspended after its children have been enqueued.
    if (token === undefined) {
      throw new Error("compute-scores: lock token is required to wait for child jobs");
    }

    // Transit is one job regardless of category count; every other mode
    // gets one job per category.
    let matrixModeCount = 0;
    for (const mode of modes) {
      if (mode !== "transit") matrixModeCount++;
    }
    const totalRoutingJobs =
      matrixModeCount * CATEGORY_IDS.length + (modes.includes("transit") ? 1 : 0);

    await job.updateProgress({
      stage: "Computing scores",
      pct: 2,
      message: `Dispatching ${totalRoutingJobs} routing jobs for ${citySlug}…`,
    } satisfies JobProgress);

    // Clear any stale scores from a previous run.
    await Promise.resolve(sql`
      DELETE FROM grid_scores
      USING grid_points gp
      WHERE grid_scores.grid_point_id = gp.id
        AND gp.city_slug = ${citySlug}
    `);
    await Promise.resolve(sql`
      DELETE FROM grid_poi_details
      USING grid_points gp
      WHERE grid_poi_details.grid_point_id = gp.id
        AND gp.city_slug = ${citySlug}
    `);

    await dispatchChildJobs(job);

    // Persist the dispatched flag so phase 2 is triggered on re-activation.
    await job.updateData({ ...job.data, routingDispatched: true });

    // Suspend until all children complete. moveToWaitingChildren reports
    // whether the job was actually parked; only then may WaitingChildrenError
    // be thrown. When nothing is pending, continue straight into phase 2.
    const shouldWait = await job.moveToWaitingChildren(token);
    if (shouldWait) {
      throw new WaitingChildrenError();
    }
  }

  // ── Phase 2: aggregate scores from grid_poi_details ──────────────────────
  await job.updateProgress({
    stage: "Computing scores",
    pct: 70,
    message: `All routing complete — computing profile scores…`,
  } satisfies JobProgress);

  // Load all per-subcategory routing results for this city in one query.
  const detailRows = await Promise.resolve(sql<DetailRow[]>`
    SELECT
      gpd.grid_point_id::text,
      gpd.category,
      gpd.subcategory,
      gpd.travel_mode,
      gpd.nearest_poi_id::text,
      gpd.distance_m,
      gpd.travel_time_s
    FROM grid_poi_details gpd
    JOIN grid_points gp ON gp.id = gpd.grid_point_id
    WHERE gp.city_slug = ${citySlug}
    ORDER BY gpd.grid_point_id, gpd.travel_mode, gpd.category, gpd.distance_m
  `);

  // In-memory structure keyed by "gpId:mode:category", plus synthetic "fifteen" groups.
  const groups = groupDetailRows(detailRows);
  addMultimodalGroups(groups);

  // Columns that do not depend on the threshold are built once and shared by
  // all concurrent threshold inserts; only the score column differs.
  const gpIdArr: string[] = [];
  const catArr: string[] = [];
  const modeArr: string[] = [];
  const profileArr: string[] = [];
  const poiIdArr: (string | null)[] = [];
  const distArr: (number | null)[] = [];
  const timeArr: (number | null)[] = [];
  for (const entry of groups.values()) {
    for (const profileId of PROFILE_IDS) {
      gpIdArr.push(entry.gpId);
      catArr.push(entry.category);
      modeArr.push(entry.mode);
      profileArr.push(profileId);
      poiIdArr.push(entry.nearestPoiId);
      distArr.push(entry.nearestDistM);
      timeArr.push(entry.nearestTimeS);
    }
  }

  // Compute and insert scores for every threshold × profile combination.
  // Each threshold writes to distinct rows (threshold_min is part of the
  // conflict key), so all thresholds can be processed concurrently.
  // Node.js is single-threaded so completedThresholds++ is safe.
  let completedThresholds = 0;
  await Promise.all(thresholds.map(async (thresholdMin: number) => {
    const threshold_s = thresholdMin * 60;

    // Same iteration order as the shared column arrays above.
    const scoreArr: number[] = [];
    for (const entry of groups.values()) {
      for (const profileId of PROFILE_IDS) {
        scoreArr.push(complementProduct(entry.subcategoryTimes, threshold_s, profileId));
      }
    }

    // Chunks within a threshold stay sequential — with all thresholds running
    // concurrently there are already up to thresholds.length parallel INSERT
    // streams.
    for (let i = 0; i < gpIdArr.length; i += INSERT_CHUNK) {
      const end = Math.min(i + INSERT_CHUNK, gpIdArr.length);
      await Promise.resolve(sql`
        INSERT INTO grid_scores (
          grid_point_id, category, travel_mode, threshold_min, profile,
          nearest_poi_id, distance_m, travel_time_s, score
        )
        SELECT
          gp_id::bigint,
          cat,
          mode_val,
          ${thresholdMin}::int,
          prof,
          CASE WHEN poi_id IS NULL THEN NULL ELSE poi_id::bigint END,
          dist,
          time_s,
          score_val
        FROM unnest(
          ${gpIdArr.slice(i, end)}::text[],
          ${catArr.slice(i, end)}::text[],
          ${modeArr.slice(i, end)}::text[],
          ${profileArr.slice(i, end)}::text[],
          ${poiIdArr.slice(i, end)}::text[],
          ${distArr.slice(i, end)}::float8[],
          ${timeArr.slice(i, end)}::float8[],
          ${scoreArr.slice(i, end)}::float8[]
        ) AS t(gp_id, cat, mode_val, prof, poi_id, dist, time_s, score_val)
        ON CONFLICT (grid_point_id, category, travel_mode, threshold_min, profile)
        DO UPDATE SET
          nearest_poi_id = EXCLUDED.nearest_poi_id,
          distance_m = EXCLUDED.distance_m,
          travel_time_s = EXCLUDED.travel_time_s,
          score = EXCLUDED.score,
          computed_at = now()
      `);
    }

    completedThresholds++;
    await job.updateProgress({
      stage: "Computing scores",
      pct: 70 + Math.round((completedThresholds / thresholds.length) * 28),
      message: `${completedThresholds} / ${thresholds.length} thresholds done…`,
    } satisfies JobProgress);
  }));

  await Promise.resolve(sql`
    UPDATE cities SET status = 'ready', last_ingested = now()
    WHERE slug = ${citySlug}
  `);

  // Compute hidden gem scores per grid point for cities that have estate value zones.
  // Each grid point looks up the nearest zone's price, ranks it within its accessibility
  // decile, and stores hidden_gem_score = composite_accessibility × (1 − price_rank).
  // gemThreshold is undefined when no thresholds were requested; the step is skipped then.
  const gemThreshold: number | undefined = thresholds.includes(15) ? 15 : thresholds[0];

  // Count only the latest year's zones so historical rows don't skew the check.
  const zoneCountRows = await Promise.resolve(sql<{ n: number }[]>`
    SELECT count(*)::int AS n
    FROM estate_value_zones ez
    WHERE ez.city_slug = ${citySlug}
      AND ez.value_eur_m2 IS NOT NULL
      AND (ez.year IS NULL OR ez.year = (
        SELECT MAX(year) FROM estate_value_zones
        WHERE city_slug = ${citySlug} AND source = 'boris-ni' AND year IS NOT NULL
      ))
  `);
  const zoneCount = zoneCountRows[0]?.n ?? 0;

  if (gemThreshold !== undefined && zoneCount > 0) {
    await job.updateProgress({
      stage: "Computing scores",
      pct: 99,
      message: "Computing hidden gem scores…",
    } satisfies JobProgress);

    await Promise.resolve(sql`
      WITH latest_year AS (
        SELECT MAX(year) AS yr
        FROM estate_value_zones
        WHERE city_slug = ${citySlug} AND source = 'boris-ni'
      ),
      grid_with_price AS (
        -- For each grid point, get composite accessibility score and nearest latest-year zone price
        SELECT
          gp.id,
          COALESCE(AVG(gs.score), 0) AS composite_score,
          ROUND(COALESCE(AVG(gs.score), 0) * 10)::int AS score_decile,
          (
            SELECT ez.value_eur_m2
            FROM estate_value_zones ez, latest_year
            WHERE ez.city_slug = ${citySlug}
              AND ez.value_eur_m2 IS NOT NULL
              AND (ez.year IS NULL OR ez.year = latest_year.yr)
            ORDER BY gp.geom <-> ez.geom
            LIMIT 1
          ) AS value_eur_m2
        FROM grid_points gp
        JOIN grid_scores gs ON gs.grid_point_id = gp.id
        WHERE gp.city_slug = ${citySlug}
          AND gs.travel_mode = 'walking'
          AND gs.threshold_min = ${gemThreshold}
          AND gs.profile = 'universal'
        GROUP BY gp.id
      ),
      ranked AS (
        SELECT
          id,
          composite_score,
          PERCENT_RANK() OVER (PARTITION BY score_decile ORDER BY value_eur_m2) AS price_rank
        FROM grid_with_price
        WHERE value_eur_m2 IS NOT NULL
      )
      UPDATE grid_points gp
      SET hidden_gem_score = (ranked.composite_score * (1.0 - ranked.price_rank))::float4
      FROM ranked
      WHERE gp.id = ranked.id
    `);
  }

  await job.updateProgress({
    stage: "Computing scores",
    pct: 100,
    message: `All scores computed for ${citySlug}`,
  } satisfies JobProgress);
}