fifteen/apps/web/hooks/use-job-progress.ts

155 lines
4.4 KiB
TypeScript

"use client";
import { useEffect, useReducer, useRef } from "react";
import type { SSEEvent } from "@transportationer/shared";
export type PipelineStageKey =
| "download-pbf"
| "extract-pois"
| "generate-grid"
| "build-valhalla"
| "compute-scores"
| "refresh-city";
export interface StageStatus {
key: string;
label: string;
status: "pending" | "active" | "completed" | "failed";
pct: number;
message: string;
}
// Four logical UI stages that map to the actual (parallel) pipeline jobs.
// extract-pois and build-valhalla run concurrently — they share "Processing OSM"
// so the linear mark-prior-as-completed logic stays correct.
const STAGE_ORDER: Array<{ key: string; label: string }> = [
{ key: "Downloading PBF", label: "Download OSM data" },
{ key: "Processing OSM", label: "Process OSM & build routes" },
{ key: "Generating grid", label: "Generate analysis grid" },
{ key: "Computing scores", label: "Compute accessibility scores" },
];
/**
* Maps raw worker stage strings → UI stage keys.
* All three parallel worker stages (extract-pois sub-stages + build-valhalla)
* fold into "Processing OSM". Routing sub-jobs and BORIS NI ingest fold
* into "Computing scores" (they run during compute-scores Phase 1).
*/
function normalizeStage(raw: string): string {
if (
raw === "Clipping to bounding box" ||
raw === "Filtering OSM tags" ||
raw === "Importing to PostGIS" ||
raw === "Building routing graph"
) return "Processing OSM";
if (raw.startsWith("Routing ") || raw === "Ingesting BORIS NI") {
return "Computing scores";
}
return raw;
}
export type OverallStatus = "pending" | "active" | "completed" | "failed";
interface ProgressState {
stages: StageStatus[];
overall: OverallStatus;
error?: string;
}
type Action =
| { type: "progress"; stage: string; pct: number; message: string }
| { type: "completed" }
| { type: "failed"; error: string };
function initialState(): ProgressState {
return {
stages: STAGE_ORDER.map((s) => ({
key: s.key,
label: s.label,
status: "pending",
pct: 0,
message: "",
})),
overall: "pending",
};
}
function reducer(state: ProgressState, action: Action): ProgressState {
switch (action.type) {
case "progress": {
const stageKey = normalizeStage(action.stage);
let found = false;
const stages = state.stages.map((s) => {
if (s.key === stageKey) {
found = true;
return {
...s,
status: "active" as const,
pct: action.pct,
message: action.message,
};
}
// Mark prior stages completed once a later stage is active
if (!found) return { ...s, status: "completed" as const, pct: 100 };
return s;
});
return { ...state, stages, overall: "active" };
}
case "completed":
return {
...state,
overall: "completed",
stages: state.stages.map((s) => ({
...s,
status: "completed",
pct: 100,
})),
};
case "failed":
return { ...state, overall: "failed", error: action.error };
default:
return state;
}
}
export function useJobProgress(jobId: string | null): ProgressState {
const [state, dispatch] = useReducer(reducer, undefined, initialState);
const esRef = useRef<EventSource | null>(null);
// Tracks whether the stream ended with a legitimate "completed" event so
// the subsequent connection-close (which fires onerror) is ignored.
const completedRef = useRef(false);
useEffect(() => {
if (!jobId) return;
completedRef.current = false;
const es = new EventSource(`/api/admin/jobs/${jobId}/stream`);
esRef.current = es;
es.onmessage = (event) => {
const payload = JSON.parse(event.data) as SSEEvent;
if (payload.type === "heartbeat") return;
if (payload.type === "completed") {
// Close before the server closes so the subsequent connection-close
// does not trigger onerror and overwrite the completed state.
completedRef.current = true;
es.close();
esRef.current = null;
}
dispatch(payload as Action);
};
es.onerror = () => {
if (completedRef.current) return; // Normal close after completion — ignore
dispatch({ type: "failed", error: "Lost connection to job stream" });
es.close();
};
return () => {
es.close();
esRef.current = null;
};
}, [jobId]);
return state;
}