"use client"; import { useEffect, useReducer, useRef } from "react"; import type { SSEEvent, RoutingDetail } from "@transportationer/shared"; export type { RoutingDetail }; // ─── Stage specification ────────────────────────────────────────────────────── interface StageSpec { key: string; label: string; /** Stages sharing the same parallelGroup run concurrently. */ parallelGroup?: string; } export const STAGE_SPECS: StageSpec[] = [ { key: "Downloading PBF", label: "Download OSM & GTFS data" }, { key: "Extract POIs", label: "Extract & import POIs", parallelGroup: "osm" }, { key: "Building routing graph",label: "Build routing graph", parallelGroup: "osm" }, { key: "Generating grid", label: "Generate analysis grid" }, { key: "Computing scores", label: "Compute accessibility" }, { key: "Aggregating scores", label: "Aggregate scores" }, ]; export interface StageStatus { key: string; label: string; status: "pending" | "active" | "completed" | "failed"; pct: number; message: string; parallelGroup?: string; } /** * Maps raw worker stage strings → UI stage keys. * Returns null for stages that should be silently ignored * (e.g. the brief "Orchestrating pipeline" from refresh-city). */ function normalizeStage(raw: string): string | null { if (raw === "Downloading GTFS") return "Downloading PBF"; if ( raw === "Clipping to bounding box" || raw === "Filtering OSM tags" || raw === "Importing to PostGIS" ) return "Extract POIs"; // "Building routing graph" → direct key match // "Generating grid" → direct key match if ( raw.startsWith("Routing ") || raw === "Transit routing" || raw === "Ingesting BORIS NI" ) return "Computing scores"; // "Computing scores" (Phase 1 brief dispatch update) → direct key match // "Aggregating scores" (Phase 2) → direct key match if (raw === "Orchestrating pipeline") return null; // ignore, handled by refresh-city return raw; } // ─── State ──────────────────────────────────────────────────────────────────── export type OverallStatus = "pending" | "active" | "completed" | "failed"; interface ProgressState { stages: StageStatus[]; overall: OverallStatus; error?: string; routingDetail?: RoutingDetail; } type Action = | { type: "progress"; stage: string; pct: number; message: string; routingDetail?: RoutingDetail } | { type: "completed" } | { type: "failed"; error: string }; function initialState(): ProgressState { return { stages: STAGE_SPECS.map((s) => ({ key: s.key, label: s.label, status: "pending", pct: 0, message: "", parallelGroup: s.parallelGroup, })), overall: "pending", }; } function reducer(state: ProgressState, action: Action): ProgressState { switch (action.type) { case "progress": { const stageKey = normalizeStage(action.stage); if (stageKey === null) return { ...state, overall: "active" }; const stageIdx = STAGE_SPECS.findIndex((s) => s.key === stageKey); if (stageIdx === -1) { // Unknown stage — keep pipeline active but don't corrupt stage state return { ...state, overall: "active" }; } const activeGroup = STAGE_SPECS[stageIdx].parallelGroup; let found = false; const stages = state.stages.map((s, i) => { const spec = STAGE_SPECS[i]; if (s.key === stageKey) { found = true; return { ...s, status: "active" as const, pct: action.pct, message: action.message }; } // Don't auto-complete parallel siblings — they track their own lifecycle if (activeGroup && spec.parallelGroup === activeGroup) return s; // Mark all prior (non-sibling) stages as completed if (!found) return { ...s, status: "completed" as const, pct: 100 }; return s; }); return { ...state, stages, overall: "active", routingDetail: action.routingDetail ?? state.routingDetail, }; } case "completed": return { ...state, overall: "completed", stages: state.stages.map((s) => ({ ...s, status: "completed", pct: 100 })), }; case "failed": return { ...state, overall: "failed", error: action.error }; default: return state; } } // ─── Hook ───────────────────────────────────────────────────────────────────── export function useJobProgress(jobId: string | null): ProgressState { const [state, dispatch] = useReducer(reducer, undefined, initialState); const esRef = useRef(null); const completedRef = useRef(false); useEffect(() => { if (!jobId) return; completedRef.current = false; const es = new EventSource(`/api/admin/jobs/${jobId}/stream`); esRef.current = es; es.onmessage = (event) => { const payload = JSON.parse(event.data) as SSEEvent; if (payload.type === "heartbeat") return; if (payload.type === "completed") { completedRef.current = true; es.close(); esRef.current = null; } dispatch(payload as Action); }; es.onerror = () => { if (completedRef.current) return; dispatch({ type: "failed", error: "Lost connection to job stream" }); es.close(); }; return () => { es.close(); esRef.current = null; }; }, [jobId]); return state; }