diff --git a/apps/web/app/api/admin/jobs/[id]/stream/route.ts b/apps/web/app/api/admin/jobs/[id]/stream/route.ts index 3ffa5be..380a1f5 100644 --- a/apps/web/app/api/admin/jobs/[id]/stream/route.ts +++ b/apps/web/app/api/admin/jobs/[id]/stream/route.ts @@ -1,8 +1,9 @@ import { NextRequest } from "next/server"; import { Job } from "bullmq"; import { getPipelineQueue, getValhallaQueue } from "@/lib/queue"; -import type { PipelineJobData, JobProgress } from "@/lib/queue"; +import type { PipelineJobData, JobProgress, ComputeScoresJobData } from "@/lib/queue"; import type { SSEEvent } from "@transportationer/shared"; +import { CATEGORY_IDS } from "@transportationer/shared"; export const runtime = "nodejs"; @@ -56,11 +57,45 @@ export async function GET( const poll = async () => { try { - // 1. Find the currently active stage across both queues. - const [pipelineActive, valhallaActive] = await Promise.all([ + // 1. Fetch active jobs and waiting-children jobs in parallel. + const [pipelineActive, valhallaActive, waitingChildren] = await Promise.all([ queue.getActive(0, 100), valhallaQueue.getActive(0, 100), + queue.getWaitingChildren(0, 100), ]); + + // 1a. Parallel routing phase: compute-scores is waiting for its routing + // children to finish. Report aggregate progress instead of one job's pct. + const csWaiting = waitingChildren.find( + (j) => j.data.citySlug === citySlug && j.data.type === "compute-scores", + ); + if (csWaiting) { + const csData = csWaiting.data as ComputeScoresJobData; + const totalRoutingJobs = csData.modes.length * CATEGORY_IDS.length; + + // Count jobs that haven't finished yet (active or still waiting in queue) + const pipelineWaiting = await queue.getWaiting(0, 200); + const stillActive = pipelineActive.filter( + (j) => j.data.citySlug === citySlug && j.data.type === "compute-routing", + ).length; + const stillWaiting = pipelineWaiting.filter( + (j) => j.data.citySlug === citySlug && j.data.type === "compute-routing", + ).length; + const completedCount = Math.max(0, totalRoutingJobs - stillActive - stillWaiting); + const pct = totalRoutingJobs > 0 + ? Math.round((completedCount / totalRoutingJobs) * 100) + : 0; + + enqueue({ + type: "progress", + stage: "Computing scores", + pct, + message: `${completedCount} / ${totalRoutingJobs} routing jobs`, + }); + return; + } + + // 1b. Sequential phase: report whichever single job is currently active. const activeJob = [...pipelineActive, ...valhallaActive].find( (j) => j.data.citySlug === citySlug && j.data.type !== "refresh-city", ); diff --git a/apps/web/app/api/boris/route.ts b/apps/web/app/api/boris/route.ts new file mode 100644 index 0000000..7ea785f --- /dev/null +++ b/apps/web/app/api/boris/route.ts @@ -0,0 +1,11 @@ +// Renamed to /api/estate-value — this file can be deleted. +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; + +export const runtime = "nodejs"; + +export function GET(request: NextRequest) { + const url = new URL(request.url); + url.pathname = "/api/estate-value"; + return NextResponse.redirect(url, 301); +} diff --git a/apps/web/app/api/estate-value/route.ts b/apps/web/app/api/estate-value/route.ts new file mode 100644 index 0000000..461a685 --- /dev/null +++ b/apps/web/app/api/estate-value/route.ts @@ -0,0 +1,213 @@ +import { NextResponse } from "next/server"; +import { sql } from "@/lib/db"; + +export const runtime = "nodejs"; + +export async function GET(request: Request) { + const { searchParams } = new URL(request.url); + + // ── Point query: ?lat=&lng= ─────────────────────────────────────────────── + const latParam = searchParams.get("lat"); + const lngParam = searchParams.get("lng"); + + if (latParam !== null && lngParam !== null) { + const lat = Number(latParam); + const lng = Number(lngParam); + if (isNaN(lat) || isNaN(lng)) { + return NextResponse.json({ error: "invalid lat/lng" }, { status: 400 }); + } + + // Optional params for score-based percentile comparison. + const cityParam = searchParams.get("city"); + const modeParam = searchParams.get("mode"); + const profileParam = searchParams.get("profile"); + const thresholdNum = Number(searchParams.get("threshold") ?? ""); + const hasScoreParams = + cityParam && modeParam && profileParam && !isNaN(thresholdNum) && thresholdNum > 0; + + // Run both queries in parallel: zone info + 5km radius percentile, and + // (optionally) score-based percentile among zones with similar accessibility. + const [mainRows, scoreRows] = await Promise.all([ + Promise.resolve(sql<{ + geom_json: string; + value: number | null; + zone_name: string | null; + usage_type: string | null; + usage_detail: string | null; + dev_state: string | null; + stichtag: string | null; + percentile_rank: number | null; + nearby_count: number; + }[]>` + WITH pt AS ( + SELECT ST_SetSRID(ST_Point(${lng}, ${lat}), 4326) AS geom + ), + nearby AS ( + SELECT value_eur_m2 + FROM estate_value_zones, pt + WHERE value_eur_m2 IS NOT NULL + AND ST_DWithin(estate_value_zones.geom::geography, pt.geom::geography, 5000) + ) + SELECT + ST_AsGeoJSON(ez.geom) AS geom_json, + ez.value_eur_m2::float AS value, + ez.zone_name, ez.usage_type, ez.usage_detail, ez.dev_state, ez.stichtag, + ( + SELECT ROUND( + count(*) FILTER (WHERE nearby.value_eur_m2 <= ez.value_eur_m2) * 100.0 / + NULLIF(count(*), 0) + )::int + FROM nearby + ) AS percentile_rank, + (SELECT count(*)::int FROM nearby) AS nearby_count + FROM estate_value_zones ez, pt + WHERE ST_Within(pt.geom, ez.geom) + LIMIT 1 + `), + + // Score-based percentile: find all zones in the city, attach each zone's + // accessibility score (composite = average across categories for the + // nearest grid point), then compute the percentile of this zone's value + // among zones with a similar accessibility score (±0.1 band). + hasScoreParams + ? Promise.resolve(sql<{ + score_percentile_rank: number | null; + similar_count: number; + }[]>` + WITH pt AS ( + SELECT ST_SetSRID(ST_Point(${lng}, ${lat}), 4326) AS geom + ), + clicked_zone AS ( + SELECT value_eur_m2 + FROM estate_value_zones, pt + WHERE ST_Within(pt.geom, estate_value_zones.geom) + LIMIT 1 + ), + clicked_gp_score AS ( + SELECT AVG(gs.score) AS composite_score + FROM grid_scores gs + WHERE gs.grid_point_id = ( + SELECT gp.id + FROM grid_points gp, pt + WHERE gp.city_slug = ${cityParam} + ORDER BY gp.geom <-> pt.geom + LIMIT 1 + ) + AND gs.travel_mode = ${modeParam} + AND gs.threshold_min = ${thresholdNum} + AND gs.profile = ${profileParam} + ), + zone_scores AS ( + SELECT ez.value_eur_m2, nearest.composite_score + FROM estate_value_zones ez + JOIN LATERAL ( + SELECT AVG(gs.score) AS composite_score + FROM grid_scores gs + WHERE gs.grid_point_id = ( + SELECT gp.id + FROM grid_points gp + WHERE gp.city_slug = ${cityParam} + ORDER BY gp.geom <-> ST_PointOnSurface(ez.geom) + LIMIT 1 + ) + AND gs.travel_mode = ${modeParam} + AND gs.threshold_min = ${thresholdNum} + AND gs.profile = ${profileParam} + ) nearest ON true + WHERE ez.value_eur_m2 IS NOT NULL + AND ez.city_slug = ${cityParam} + ), + peer_zones AS ( + SELECT value_eur_m2 + FROM zone_scores + WHERE ABS(composite_score - (SELECT composite_score FROM clicked_gp_score)) <= 0.1 + ) + SELECT + ROUND( + count(*) FILTER ( + WHERE value_eur_m2 <= (SELECT value_eur_m2 FROM clicked_zone) + ) * 100.0 / NULLIF(count(*), 0) + )::int AS score_percentile_rank, + count(*)::int AS similar_count + FROM peer_zones + `) + : Promise.resolve([] as { score_percentile_rank: number | null; similar_count: number }[]), + ]); + + const scoreRow = scoreRows[0] ?? null; + + const features = mainRows.map((row) => ({ + type: "Feature", + geometry: JSON.parse(row.geom_json), + properties: { + value: row.value, + zoneName: row.zone_name, + usageType: row.usage_type, + usageDetail: row.usage_detail, + devState: row.dev_state, + stichtag: row.stichtag, + percentileRank: row.percentile_rank, + nearbyCount: row.nearby_count, + scorePercentileRank: scoreRow?.score_percentile_rank ?? null, + similarCount: scoreRow?.similar_count ?? 0, + }, + })); + + return NextResponse.json( + { type: "FeatureCollection", features }, + { headers: { "Cache-Control": "no-store" } }, + ); + } + + // ── Bbox query: ?bbox=minLng,minLat,maxLng,maxLat ───────────────────────── + const bboxParam = searchParams.get("bbox"); + if (!bboxParam) { + return NextResponse.json({ error: "bbox or lat/lng required" }, { status: 400 }); + } + + const parts = bboxParam.split(",").map(Number); + if (parts.length !== 4 || parts.some(isNaN)) { + return NextResponse.json({ error: "invalid bbox" }, { status: 400 }); + } + + const [minLng, minLat, maxLng, maxLat] = parts; + + const rows = await Promise.resolve(sql<{ + geom_json: string; + value: number | null; + zone_name: string | null; + usage_type: string | null; + usage_detail: string | null; + dev_state: string | null; + stichtag: string | null; + }[]>` + SELECT + ST_AsGeoJSON(geom) AS geom_json, + value_eur_m2::float AS value, + zone_name, usage_type, usage_detail, dev_state, stichtag + FROM estate_value_zones + WHERE ST_Intersects( + geom, + ST_MakeEnvelope(${minLng}, ${minLat}, ${maxLng}, ${maxLat}, 4326) + ) + LIMIT 5000 + `); + + const features = rows.map((row) => ({ + type: "Feature", + geometry: JSON.parse(row.geom_json), + properties: { + value: row.value, + zoneName: row.zone_name, + usageType: row.usage_type, + usageDetail: row.usage_detail, + devState: row.dev_state, + stichtag: row.stichtag, + }, + })); + + return NextResponse.json( + { type: "FeatureCollection", features }, + { headers: { "Cache-Control": "public, max-age=3600" } }, + ); +} diff --git a/apps/web/app/api/tiles/hidden-gems/[...tile]/route.ts b/apps/web/app/api/tiles/hidden-gems/[...tile]/route.ts new file mode 100644 index 0000000..2ef247d --- /dev/null +++ b/apps/web/app/api/tiles/hidden-gems/[...tile]/route.ts @@ -0,0 +1,66 @@ +import { NextRequest, NextResponse } from "next/server"; +import { sql } from "@/lib/db"; + +export const runtime = "nodejs"; + +export async function GET( + req: NextRequest, + { params }: { params: Promise<{ tile: string[] }> }, +) { + const { tile } = await params; + if (tile.length !== 3) { + return new NextResponse("Invalid tile path", { status: 400 }); + } + const z = parseInt(tile[0], 10); + const x = parseInt(tile[1], 10); + const y = parseInt(tile[2], 10); + + if ([z, x, y].some(isNaN)) { + return new NextResponse("Invalid tile coordinates", { status: 400 }); + } + + const city = req.nextUrl.searchParams.get("city") ?? ""; + if (!city) return new NextResponse("Missing city", { status: 400 }); + + try { + const rows = await Promise.resolve(sql<{ mvt: Uint8Array }[]>` + WITH + envelope AS (SELECT ST_TileEnvelope(${z}, ${x}, ${y}) AS env), + city_info AS (SELECT COALESCE(resolution_m, 200) AS resolution_m FROM cities WHERE slug = ${city}) + SELECT ST_AsMVT(t, 'hidden-gems', 4096, 'geom') AS mvt + FROM ( + SELECT + ST_AsMVTGeom( + ST_Expand(ST_Transform(gp.geom, 3857), ci.resolution_m::float / 2), + e.env, + 4096, 0, true + ) AS geom, + ROUND(gp.hidden_gem_score * 100)::int AS score + FROM grid_points gp + CROSS JOIN envelope e + CROSS JOIN city_info ci + WHERE gp.city_slug = ${city} + AND gp.hidden_gem_score IS NOT NULL + AND ST_Intersects( + ST_Transform(gp.geom, 3857), + ST_Expand(e.env, ci.resolution_m::float / 2) + ) + ) t + WHERE t.geom IS NOT NULL + `); + + const buf = rows[0]?.mvt; + const data = buf ? new Uint8Array(buf) : new Uint8Array(0); + + return new NextResponse(data, { + headers: { + "Content-Type": "application/x-protobuf", + "Cache-Control": "public, max-age=3600", + "Access-Control-Allow-Origin": "*", + }, + }); + } catch (err) { + console.error("[tiles/hidden-gems] Error:", err); + return new NextResponse("Internal Server Error", { status: 500 }); + } +} diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 82d36f3..e0c265f 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -49,12 +49,33 @@ export default function HomePage() { const [pinLocation, setPinLocation] = useState<{ lat: number; lng: number } | null>(null); const [pinData, setPinData] = useState(null); const [pinAddress, setPinAddress] = useState(undefined); + const [pinEstateValue, setPinEstateValue] = useState(null); + const [pinEstatePercentile, setPinEstatePercentile] = useState(null); + const [pinEstateScorePercentile, setPinEstateScorePercentile] = useState(null); // Overlay mode: isochrone (new default) or relative heatmap const [overlayMode, setOverlayMode] = useState("isochrone"); const [isochroneData, setIsochroneData] = useState(null); const [isochroneLoading, setIsochroneLoading] = useState(false); + // Base overlay: which layer to show when no pin is active + const [baseOverlay, setBaseOverlay] = useState<"accessibility" | "estate-value" | "hidden-gem">("accessibility"); + + // Derived city data — used in effects below so must be declared before them + const selectedCityData = cities.find((c) => c.slug === selectedCity); + const cityBbox = selectedCityData?.bbox; + const estateValueAvailable = + cityBbox != null && + cityBbox[0] < 11.779 && + cityBbox[2] > 6.526 && + cityBbox[1] < 54.033 && + cityBbox[3] > 51.197; + + // Reset base overlay when city changes (availability depends on city) + useEffect(() => { + setBaseOverlay("accessibility"); + }, [selectedCity]); + // Load city list useEffect(() => { fetch("/api/cities") @@ -79,7 +100,13 @@ export default function HomePage() { // Fetch location score + reverse geocode when pin changes useEffect(() => { - if (!pinLocation || !selectedCity) return; + if (!pinLocation || !selectedCity) { + setPinData(null); + setPinAddress(undefined); + return; + } + + let cancelled = false; const params = new URLSearchParams({ lat: String(pinLocation.lat), @@ -101,13 +128,59 @@ export default function HomePage() { .catch(() => undefined), ]) .then(([scoreData, address]) => { - if (scoreData?.error) return; + if (cancelled) return; + if (scoreData?.error) { + // No grid data for this location — clear the pin so the skeleton doesn't persist. + setPinLocation(null); + return; + } setPinData(scoreData as LocationScoreData); setPinAddress(address); }) - .catch(console.error); + .catch(() => { if (!cancelled) setPinLocation(null); }); + + return () => { cancelled = true; }; }, [pinLocation, selectedCity, mode, threshold, profile]); + // Fetch estate value + percentile ratings for the clicked location + useEffect(() => { + if (!pinLocation || !estateValueAvailable || !selectedCity) { + setPinEstateValue(null); + setPinEstatePercentile(null); + setPinEstateScorePercentile(null); + return; + } + const { lat, lng } = pinLocation; + let cancelled = false; + const params = new URLSearchParams({ + lat: String(lat), + lng: String(lng), + city: selectedCity, + mode, + threshold: String(threshold), + profile, + }); + fetch(`/api/estate-value?${params}`) + .then((r) => r.json()) + .then((geojson) => { + if (cancelled) return; + const props = geojson?.features?.[0]?.properties; + setPinEstateValue(typeof props?.value === "number" ? props.value : null); + setPinEstatePercentile(typeof props?.percentileRank === "number" ? props.percentileRank : null); + setPinEstateScorePercentile(typeof props?.scorePercentileRank === "number" ? props.scorePercentileRank : null); + }) + .catch(() => { + if (!cancelled) { + setPinEstateValue(null); + setPinEstatePercentile(null); + setPinEstateScorePercentile(null); + } + }); + return () => { + cancelled = true; + }; + }, [pinLocation, estateValueAvailable, selectedCity, mode, threshold, profile]); + // Fetch isochrone when in isochrone mode with an active pin useEffect(() => { if (!pinLocation || overlayMode !== "isochrone") { @@ -115,6 +188,7 @@ export default function HomePage() { return; } + let cancelled = false; setIsochroneLoading(true); setIsochroneData(null); @@ -130,14 +204,20 @@ export default function HomePage() { }) .then((r) => r.json()) .then((data) => { + if (cancelled) return; // Valhalla may return 200 OK with error_code (not error) for unroutable locations. // Only accept valid FeatureCollections. if (!data.error && !data.error_code && Array.isArray(data.features)) { setIsochroneData(data); } }) - .catch(console.error) - .finally(() => setIsochroneLoading(false)); + .catch(() => {}) + .finally(() => { if (!cancelled) setIsochroneLoading(false); }); + + return () => { + cancelled = true; + setIsochroneLoading(false); + }; }, [pinLocation, overlayMode, mode, threshold]); function handleProfileChange(newProfile: ProfileId) { @@ -147,13 +227,19 @@ export default function HomePage() { setPinLocation(null); setPinData(null); setPinAddress(undefined); + setPinEstateValue(null); + setPinEstatePercentile(null); + setPinEstateScorePercentile(null); setIsochroneData(null); } - function handleLocationClick(lat: number, lng: number) { + function handleLocationClick(lat: number, lng: number, estateValue: number | null) { setPinLocation({ lat, lng }); setPinData(null); setPinAddress(undefined); + setPinEstateValue(estateValue); + setPinEstatePercentile(null); + setPinEstateScorePercentile(null); setIsochroneData(null); } @@ -161,10 +247,12 @@ export default function HomePage() { setPinLocation(null); setPinData(null); setPinAddress(undefined); + setPinEstateValue(null); + setPinEstatePercentile(null); + setPinEstateScorePercentile(null); setIsochroneData(null); } - const selectedCityData = cities.find((c) => c.slug === selectedCity); return (
@@ -184,11 +272,14 @@ export default function HomePage() { threshold={threshold} weights={weights} activeCategory={activeCategory} + baseOverlay={baseOverlay} + estateValueAvailable={estateValueAvailable} onProfileChange={handleProfileChange} onModeChange={setMode} onThresholdChange={setThreshold} onWeightChange={(cat, w) => setWeights((prev) => ({ ...prev, [cat]: w }))} onCategoryChange={setActiveCategory} + onBaseOverlayChange={setBaseOverlay} />
@@ -219,21 +310,57 @@ export default function HomePage() { overlayMode === "relative" ? (pinData?.categoryScores ?? null) : null } isochrones={overlayMode === "isochrone" ? isochroneData : null} + baseOverlay={baseOverlay} onLocationClick={handleLocationClick} /> )} + {pinLocation && !pinData && ( +
+ +
+
+
+
+
+
+
+
+
+
+ {[0, 1, 2, 3, 4].map((i) => ( +
+
+
+
+
+
+
+ ))} +
+
+
+ )} + {pinData && ( = const THRESHOLDS = [5, 8, 10, 12, 15, 20, 25, 30]; +type BaseOverlay = "accessibility" | "estate-value" | "hidden-gem"; + interface ControlPanelProps { profile: ProfileId; mode: TravelMode; threshold: number; weights: Record; activeCategory: CategoryId | "composite"; + baseOverlay: BaseOverlay; + estateValueAvailable: boolean; onProfileChange: (p: ProfileId) => void; onModeChange: (m: TravelMode) => void; onThresholdChange: (t: number) => void; onWeightChange: (cat: CategoryId, w: number) => void; onCategoryChange: (cat: CategoryId | "composite") => void; + onBaseOverlayChange: (o: BaseOverlay) => void; } export function ControlPanel({ @@ -31,11 +36,14 @@ export function ControlPanel({ threshold, weights, activeCategory, + baseOverlay, + estateValueAvailable, onProfileChange, onModeChange, onThresholdChange, onWeightChange, onCategoryChange, + onBaseOverlayChange, }: ControlPanelProps) { return (
+ {/* Base overlay selector */} +
+

+ Map Overlay +

+
+ + {estateValueAvailable && ( + <> + + + + )} +
+ {baseOverlay === "estate-value" && estateValueAvailable && ( +
+
+
+ 0 + 100 + 300 + 700 + 1500+ +
+
€/m²
+
+ )} + {baseOverlay === "hidden-gem" && estateValueAvailable && ( +
+
+
+ Poor + Gem score + High +
+
+ )} +
+ {/* Category weights */}

diff --git a/apps/web/components/location-score-panel.tsx b/apps/web/components/location-score-panel.tsx index 7172b65..d8b76e7 100644 --- a/apps/web/components/location-score-panel.tsx +++ b/apps/web/components/location-score-panel.tsx @@ -112,6 +112,9 @@ export function LocationScorePanel({ data, weights, address, + estateValue, + estatePercentile, + estateScorePercentile, overlayMode, isochroneLoading, onOverlayModeChange, @@ -120,6 +123,11 @@ export function LocationScorePanel({ data: LocationScoreData; weights: Weights; address?: string; + estateValue?: number | null; + /** % of zones within 5 km with a lower value (0–100). */ + estatePercentile?: number | null; + /** % of zones with similar accessibility score that have a lower value (0–100). */ + estateScorePercentile?: number | null; overlayMode: OverlayMode; isochroneLoading: boolean; onOverlayModeChange: (mode: OverlayMode) => void; @@ -136,6 +144,28 @@ export function LocationScorePanel({

{g}
{Math.round(composite * 100)} / 100
+ {estateValue != null && ( +
+
+ {estateValue.toLocaleString("de-DE")} €/m² + land value +
+ {(estatePercentile != null || estateScorePercentile != null) && ( +
+ {estatePercentile != null && ( +
+ Pricier than {estatePercentile}% of zones · 5 km radius +
+ )} + {estateScorePercentile != null && ( +
+ Pricier than {estateScorePercentile}% · similar accessibility +
+ )} +
+ )} +
+ )} {address && (
{address} diff --git a/apps/web/components/map-legend.tsx b/apps/web/components/map-legend.tsx index d43db8e..a41a9a9 100644 --- a/apps/web/components/map-legend.tsx +++ b/apps/web/components/map-legend.tsx @@ -1,9 +1,11 @@ "use client"; import type { OverlayMode } from "./location-score-panel"; +import type { BaseOverlay } from "./map-view"; interface MapLegendProps { overlayMode: OverlayMode; + baseOverlay: BaseOverlay; threshold: number; hasPinData: boolean; } @@ -21,7 +23,7 @@ function gradientCss(stops: [number, string][]): string { return `linear-gradient(to right, ${stops.map(([p, c]) => `${c} ${p * 100}%`).join(", ")})`; } -export function MapLegend({ overlayMode, threshold, hasPinData }: MapLegendProps) { +export function MapLegend({ overlayMode, baseOverlay, threshold, hasPinData }: MapLegendProps) { if (overlayMode === "isochrone" && hasPinData) { // Travel-time legend: green (near) → red (far) const stops: [number, string][] = [ @@ -67,6 +69,53 @@ export function MapLegend({ overlayMode, threshold, hasPinData }: MapLegendProps ); } + // Hidden gem score + if (baseOverlay === "hidden-gem" && !hasPinData) { + const stops: [number, string][] = [ + [0, "#d73027"], + [0.4, "#fee08b"], + [0.7, "#d9ef8b"], + [1, "#1a9850"], + ]; + return ( +
+
Hidden Gem Score
+
+ Poor value +
+ High value +
+
+ ); + } + + // Estate value (land price) + if (baseOverlay === "estate-value" && !hasPinData) { + const stops: [number, string][] = [ + [0, "#ffffb2"], + [0.07, "#fecc5c"], + [0.2, "#fd8d3c"], + [0.47, "#f03b20"], + [1, "#bd0026"], + ]; + return ( +
+
Land Value
+
+ 0 +
+ 1500+ €/m² +
+
+ ); + } + // Default: absolute accessibility score return (
diff --git a/apps/web/components/map-view.tsx b/apps/web/components/map-view.tsx index 9dd6746..baa870a 100644 --- a/apps/web/components/map-view.tsx +++ b/apps/web/components/map-view.tsx @@ -1,10 +1,14 @@ "use client"; -import { useEffect, useRef } from "react"; +import { useEffect, useRef, useState } from "react"; +import * as maplibregl from "maplibre-gl"; +import { Protocol } from "pmtiles"; import type { CategoryId, TravelMode, ProfileId } from "@transportationer/shared"; type Weights = Record; +export type BaseOverlay = "accessibility" | "estate-value" | "hidden-gem"; + export interface MapViewProps { citySlug: string; cityBbox: [number, number, number, number]; @@ -18,7 +22,9 @@ export interface MapViewProps { pinCategoryScores?: Record | null; /** Set in isochrone mode: GeoJSON FeatureCollection from Valhalla. */ isochrones?: object | null; - onLocationClick?: (lat: number, lng: number) => void; + /** Which base overlay to show (accessibility grid, estate value, or hidden gem). */ + baseOverlay?: BaseOverlay; + onLocationClick?: (lat: number, lng: number, estateValue: number | null) => void; } // Red → yellow → green score ramp @@ -66,7 +72,6 @@ function makeRelativeColorExpr( pinScore = pinCategoryScores[cat] ?? 0; } - // Diverging: negative = worse than pin (red), positive = better (green) return [ "interpolate", ["linear"], ["-", scoreExpr, pinScore], -0.5, "#d73027", @@ -78,16 +83,52 @@ function makeRelativeColorExpr( } function tileUrl(city: string, mode: string, threshold: number, profile: string) { - const origin = typeof window !== "undefined" ? window.location.origin : ""; - return `${origin}/api/tiles/grid/{z}/{x}/{y}?city=${encodeURIComponent(city)}&mode=${mode}&threshold=${threshold}&profile=${profile}`; + return `${window.location.origin}/api/tiles/grid/{z}/{x}/{y}?city=${encodeURIComponent(city)}&mode=${mode}&threshold=${threshold}&profile=${profile}`; } -/** Remove isochrone layer/source if they exist. */ -function removeIsochroneLayers(map: import("maplibre-gl").Map) { +function removeIsochroneLayers(map: maplibregl.Map) { if (map.getLayer("isochrone-fill")) map.removeLayer("isochrone-fill"); if (map.getSource("isochrone")) map.removeSource("isochrone"); } +function removeEstateValueLayers(map: maplibregl.Map) { + if (map.getLayer("estate-value-outline")) map.removeLayer("estate-value-outline"); + if (map.getLayer("estate-value-fill")) map.removeLayer("estate-value-fill"); + if (map.getSource("estate-value-zones")) map.removeSource("estate-value-zones"); +} + +function removeHiddenGemLayers(map: maplibregl.Map) { + if (map.getLayer("hidden-gems-fill")) map.removeLayer("hidden-gems-fill"); + if (map.getSource("hidden-gems-tiles")) map.removeSource("hidden-gems-tiles"); +} + +function buildEstateValuePopupHtml(props: Record): string { + const value = props.value != null ? `${props.value} €/m²` : "–"; + const usageLabels: Record = { + W: "Residential", G: "Commercial", LF: "Agricultural", + SF: "Special Use", B: "Mixed", GIF: "Mixed Infill", + }; + const detailLabels: Record = { + EFH: "Single-family", ZFH: "Two-family", MFH: "Multi-family", + RH: "Terraced", GW: "Mixed-use", + }; + const usage = props.usageType + ? (usageLabels[props.usageType as string] ?? props.usageType as string) + : null; + const detail = props.usageDetail + ? (detailLabels[props.usageDetail as string] ?? props.usageDetail as string) + : null; + const zone = props.zoneName as string | null; + const stichtag = props.stichtag as string | null; + + return `
+
${value}
+ ${usage ? `
${usage}${detail ? ` · ${detail}` : ""}
` : ""} + ${zone ? `
${zone}
` : ""} + ${stichtag ? `
${stichtag}
` : ""} +
`; +} + export function MapView({ citySlug, cityBbox, @@ -99,181 +140,350 @@ export function MapView({ pinLocation, pinCategoryScores, isochrones, + baseOverlay = "accessibility", onLocationClick, }: MapViewProps) { const containerRef = useRef(null); - const mapRef = useRef(null); - const markerRef = useRef(null); + const mapRef = useRef(null); + const markerRef = useRef(null); + const estateValuePopupRef = useRef(null); + const hiddenGemPopupRef = useRef(null); const mountedRef = useRef(false); + // Ref for estate-value event handlers so cleanup can call map.off() even after async fetch + const evHandlersRef = useRef<{ + map: maplibregl.Map; + onEnter: () => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + onMove: (e: any) => void; + onLeave: () => void; + } | null>(null); + + // Tracked as state so effects re-run once the map style finishes loading. + // Without this, effects that check isStyleLoaded() return early on first render + // and never reapply state after the async on("load") fires. + const [mapLoaded, setMapLoaded] = useState(false); + + // Derive which layers should be visible + const showGrid = !isochrones && (baseOverlay === "accessibility" || !!pinCategoryScores); + const showEstateValue = !isochrones && !pinCategoryScores && baseOverlay === "estate-value"; + const showHiddenGem = !isochrones && !pinCategoryScores && baseOverlay === "hidden-gem"; const stateRef = useRef({ - citySlug, profile, mode, threshold, activeCategory, weights, onLocationClick, + citySlug, cityBbox, profile, mode, threshold, activeCategory, weights, onLocationClick, }); - stateRef.current = { citySlug, profile, mode, threshold, activeCategory, weights, onLocationClick }; + stateRef.current = { citySlug, cityBbox, profile, mode, threshold, activeCategory, weights, onLocationClick }; - // Update heatmap paint when category, weights, or pin scores change + // ── Update heatmap paint when category / weights / pin scores change ───── useEffect(() => { + if (!mapLoaded) return; const map = mapRef.current; - if (!map?.isStyleLoaded() || !map.getLayer("grid-fill")) return; + if (!map?.getLayer("grid-fill")) return; const colorExpr = pinCategoryScores ? makeRelativeColorExpr(activeCategory, weights, pinCategoryScores) : makeColorExpr(activeCategory, weights); // eslint-disable-next-line @typescript-eslint/no-explicit-any map.setPaintProperty("grid-fill", "fill-color", colorExpr as any); - }, [activeCategory, weights, pinCategoryScores]); + }, [mapLoaded, activeCategory, weights, pinCategoryScores]); - // Update tile source when city/mode/threshold/profile change + // ── Update tile source when city / mode / threshold / profile change ────── useEffect(() => { + if (!mapLoaded) return; const map = mapRef.current; - if (!map?.isStyleLoaded()) return; + if (!map) return; // eslint-disable-next-line @typescript-eslint/no-explicit-any const src = map.getSource("grid-tiles") as any; if (src?.setTiles) src.setTiles([tileUrl(citySlug, mode, threshold, profile)]); - }, [citySlug, mode, threshold, profile]); + }, [mapLoaded, citySlug, mode, threshold, profile]); - // Add/remove pin marker when pin location changes + // ── Pin marker ───────────────────────────────────────────────────────────── + // Markers are DOM elements — no need to wait for style to be loaded. useEffect(() => { const map = mapRef.current; if (!map) return; markerRef.current?.remove(); markerRef.current = null; if (pinLocation) { - import("maplibre-gl").then(({ Marker }) => { - const marker = new Marker({ color: "#2563eb" }) - .setLngLat([pinLocation.lng, pinLocation.lat]) - .addTo(map); - markerRef.current = marker; - }); + markerRef.current = new maplibregl.Marker({ color: "#2563eb" }) + .setLngLat([pinLocation.lng, pinLocation.lat]) + .addTo(map); } }, [pinLocation]); - // Add/remove isochrone layer when isochrones data changes. - // The grid-fill layer is hidden while isochrones are shown so only one - // overlay is visible at a time. + // ── Grid visibility ─────────────────────────────────────────────────────── useEffect(() => { + if (!mapLoaded) return; const map = mapRef.current; - if (!map?.isStyleLoaded()) return; + if (!map?.getLayer("grid-fill")) return; + map.setLayoutProperty("grid-fill", "visibility", showGrid ? "visible" : "none"); + }, [mapLoaded, showGrid]); + + // ── Isochrone layer ─────────────────────────────────────────────────────── + useEffect(() => { + if (!mapLoaded) return; + const map = mapRef.current!; removeIsochroneLayers(map); - if (!isochrones) { - // Restore grid when leaving isochrone mode. - if (map.getLayer("grid-fill")) { - map.setLayoutProperty("grid-fill", "visibility", "visible"); - } - return; - } + if (!isochrones) return; - // Hide the grid heatmap — the isochrone replaces it visually. if (map.getLayer("grid-fill")) { map.setLayoutProperty("grid-fill", "visibility", "none"); } - // Sort largest contour first — smaller (inner, more accessible) polygons - // are drawn on top, so each pixel shows the color of the smallest contour - // that covers it (i.e. the fastest reachable zone wins visually). const geojson = isochrones as { type: string; features: { properties: { contour: number } }[] }; - if (!Array.isArray(geojson.features) || geojson.features.length === 0) { - // Malformed response (e.g. Valhalla error body with no features) — restore grid. - if (map.getLayer("grid-fill")) map.setLayoutProperty("grid-fill", "visibility", "visible"); - return; - } + if (!Array.isArray(geojson.features) || geojson.features.length === 0) return; + const contourValues = geojson.features.map((f) => f.properties.contour); const maxContour = Math.max(...contourValues); - const sorted = { ...geojson, - features: [...geojson.features].sort( - (a, b) => b.properties.contour - a.properties.contour, - ), + features: [...geojson.features].sort((a, b) => b.properties.contour - a.properties.contour), }; - try { - map.addSource("isochrone", { type: "geojson", data: sorted as never }); - // Color each zone using the same green→red ramp: - // small contour (close) = green, large contour (far) = red. - map.addLayer({ - id: "isochrone-fill", - type: "fill", - source: "isochrone", - paint: { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - "fill-color": ["interpolate", ["linear"], ["get", "contour"], - 0, "#1a9850", - maxContour * 0.5, "#fee08b", - maxContour, "#d73027", - ] as any, - "fill-opacity": 0.65, - "fill-outline-color": "rgba(0,0,0,0.15)", - }, - }); - } catch (err) { - console.warn("[map-view] Error adding isochrone layer:", err); - } + map.addSource("isochrone", { type: "geojson", data: sorted as never }); + map.addLayer({ + id: "isochrone-fill", + type: "fill", + source: "isochrone", + paint: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + "fill-color": ["interpolate", ["linear"], ["get", "contour"], + 0, "#1a9850", + maxContour * 0.5, "#fee08b", + maxContour, "#d73027", + ] as any, + "fill-opacity": 0.5, + "fill-outline-color": "rgba(0,0,0,0.1)", + }, + }); + + return () => { removeIsochroneLayers(map); }; + }, [mapLoaded, isochrones]); + + // ── Hidden gem overlay ──────────────────────────────────────────────────── + useEffect(() => { + if (!mapLoaded) return; + const map = mapRef.current!; + + removeHiddenGemLayers(map); + hiddenGemPopupRef.current?.remove(); + hiddenGemPopupRef.current = null; + + if (!showHiddenGem) return; + + const tileBase = `${window.location.origin}/api/tiles/hidden-gems/{z}/{x}/{y}?city=${encodeURIComponent(citySlug)}`; + + map.addSource("hidden-gems-tiles", { + type: "vector", + tiles: [tileBase], + minzoom: 0, + maxzoom: 16, + }); + map.addLayer({ + id: "hidden-gems-fill", + type: "fill", + source: "hidden-gems-tiles", + "source-layer": "hidden-gems", + paint: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + "fill-color": ["interpolate", ["linear"], ["coalesce", ["get", "score"], 0], + 0, "#d73027", + 40, "#fee08b", + 70, "#d9ef8b", + 100, "#1a9850", + ] as any, + "fill-opacity": 0.5, + "fill-outline-color": "rgba(0,0,0,0.1)", + }, + }); + + const popup = new maplibregl.Popup({ closeButton: false, closeOnClick: false, maxWidth: "200px" }); + hiddenGemPopupRef.current = popup; + + const onEnter = () => { map.getCanvas().style.cursor = "pointer"; }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const onMove = (e: any) => { + const feature = e.features?.[0]; + if (!feature) return; + const score = feature.properties?.score as number | null; + popup.setLngLat(e.lngLat).setHTML( + `
+
${score != null ? score + "%" : "–"}
+
Hidden gem score
+
` + ).addTo(map); + }; + const onLeave = () => { map.getCanvas().style.cursor = "crosshair"; popup.remove(); }; + + map.on("mouseenter", "hidden-gems-fill", onEnter); + map.on("mousemove", "hidden-gems-fill", onMove); + map.on("mouseleave", "hidden-gems-fill", onLeave); return () => { - const m = mapRef.current; - if (!m?.isStyleLoaded()) return; - removeIsochroneLayers(m); - if (m.getLayer("grid-fill")) { - m.setLayoutProperty("grid-fill", "visibility", "visible"); - } + map.off("mouseenter", "hidden-gems-fill", onEnter); + map.off("mousemove", "hidden-gems-fill", onMove); + map.off("mouseleave", "hidden-gems-fill", onLeave); + removeHiddenGemLayers(map); + try { map.getCanvas().style.cursor = "crosshair"; } catch { /* map removed */ } + hiddenGemPopupRef.current?.remove(); + hiddenGemPopupRef.current = null; }; - }, [isochrones]); + }, [mapLoaded, showHiddenGem, citySlug]); // eslint-disable-line react-hooks/exhaustive-deps - // Initialize map once on mount + // ── Estate value overlay ────────────────────────────────────────────────── + useEffect(() => { + if (!mapLoaded) return; + const map = mapRef.current!; + + // Clean up any previously registered handlers before tearing down layers + if (evHandlersRef.current) { + const { map: m, onEnter, onMove, onLeave } = evHandlersRef.current; + m.off("mouseenter", "estate-value-fill", onEnter); + m.off("mousemove", "estate-value-fill", onMove); + m.off("mouseleave", "estate-value-fill", onLeave); + evHandlersRef.current = null; + } + removeEstateValueLayers(map); + estateValuePopupRef.current?.remove(); + estateValuePopupRef.current = null; + + if (!showEstateValue) return; + + const [minLng, minLat, maxLng, maxLat] = stateRef.current.cityBbox; + let cancelled = false; + + fetch(`/api/estate-value?bbox=${minLng},${minLat},${maxLng},${maxLat}`) + .then((r) => { + if (!r.ok) throw new Error(`estate-value API ${r.status}`); + return r.json(); + }) + .then((geojson) => { + if (cancelled || !map.isStyleLoaded()) return; + + map.addSource("estate-value-zones", { type: "geojson", data: geojson }); + map.addLayer({ + id: "estate-value-fill", + type: "fill", + source: "estate-value-zones", + paint: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + "fill-color": ["interpolate", ["linear"], ["coalesce", ["get", "value"], 0], + 0, "#ffffb2", + 100, "#fecc5c", + 300, "#fd8d3c", + 700, "#f03b20", + 1500, "#bd0026", + ] as any, + "fill-opacity": 0.5, + }, + }); + map.addLayer({ + id: "estate-value-outline", + type: "line", + source: "estate-value-zones", + paint: { "line-color": "rgba(0,0,0,0.25)", "line-width": 0.6 }, + }); + + const popup = new maplibregl.Popup({ closeButton: false, closeOnClick: false, maxWidth: "240px" }); + estateValuePopupRef.current = popup; + + const onEnter = () => { map.getCanvas().style.cursor = "pointer"; }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const onMove = (e: any) => { + const feature = e.features?.[0]; + if (!feature) return; + popup.setLngLat(e.lngLat) + .setHTML(buildEstateValuePopupHtml(feature.properties as Record)) + .addTo(map); + }; + const onLeave = () => { map.getCanvas().style.cursor = "crosshair"; popup.remove(); }; + + map.on("mouseenter", "estate-value-fill", onEnter); + map.on("mousemove", "estate-value-fill", onMove); + map.on("mouseleave", "estate-value-fill", onLeave); + evHandlersRef.current = { map, onEnter, onMove, onLeave }; + }) + .catch((err) => { + if (!cancelled) console.warn("[map-view] estate-value fetch failed:", err); + }); + + return () => { + cancelled = true; + if (evHandlersRef.current) { + const { map: m, onEnter, onMove, onLeave } = evHandlersRef.current; + m.off("mouseenter", "estate-value-fill", onEnter); + m.off("mousemove", "estate-value-fill", onMove); + m.off("mouseleave", "estate-value-fill", onLeave); + evHandlersRef.current = null; + } + try { + if (map.isStyleLoaded()) { + removeEstateValueLayers(map); + map.getCanvas().style.cursor = "crosshair"; + } + } catch { /* map removed */ } + estateValuePopupRef.current?.remove(); + estateValuePopupRef.current = null; + }; + }, [mapLoaded, showEstateValue, citySlug]); // eslint-disable-line react-hooks/exhaustive-deps + + // ── Initialize map (runs once on mount) ─────────────────────────────────── useEffect(() => { if (mountedRef.current || !containerRef.current) return; mountedRef.current = true; - (async () => { - const mgl = await import("maplibre-gl"); - const { Protocol } = await import("pmtiles"); + const protocol = new Protocol(); + maplibregl.addProtocol("pmtiles", protocol.tile); - const protocol = new Protocol(); - mgl.addProtocol("pmtiles", protocol.tile); + const map = new maplibregl.Map({ + container: containerRef.current, + style: "/tiles/style.json", + bounds: cityBbox, + fitBoundsOptions: { padding: 40 }, + }); - const map = new mgl.Map({ - container: containerRef.current!, - style: "/tiles/style.json", - bounds: cityBbox, - fitBoundsOptions: { padding: 40 }, + mapRef.current = map; + + map.on("load", () => { + const { citySlug: city, profile: prof, mode: m, threshold: t, activeCategory: cat, weights: w } = stateRef.current; + + map.addSource("grid-tiles", { + type: "vector", + tiles: [tileUrl(city, m, t, prof)], + minzoom: 0, + maxzoom: 16, }); - mapRef.current = map; - - map.on("load", () => { - const { citySlug: city, profile: prof, mode: m, threshold: t, activeCategory: cat, weights: w } = stateRef.current; - - map.addSource("grid-tiles", { - type: "vector", - tiles: [tileUrl(city, m, t, prof)], - minzoom: 0, - maxzoom: 16, - }); - - map.addLayer({ - id: "grid-fill", - type: "fill", - source: "grid-tiles", - "source-layer": "grid", - paint: { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - "fill-color": makeColorExpr(cat, w) as any, - "fill-opacity": 0.8, - "fill-outline-color": "rgba(0,0,0,0.06)", - }, - }); - - map.on("click", (e) => { - stateRef.current.onLocationClick?.(e.lngLat.lat, e.lngLat.lng); - }); - - map.getCanvas().style.cursor = "crosshair"; + map.addLayer({ + id: "grid-fill", + type: "fill", + source: "grid-tiles", + "source-layer": "grid", + paint: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + "fill-color": makeColorExpr(cat, w) as any, + "fill-opacity": 0.5, + "fill-outline-color": "rgba(0,0,0,0.06)", + }, }); - })(); + + map.on("click", (e) => { + const evFeatures = map.getLayer("estate-value-fill") + ? map.queryRenderedFeatures(e.point, { layers: ["estate-value-fill"] }) + : []; + const estateValue = (evFeatures[0]?.properties?.value as number | null) ?? null; + stateRef.current.onLocationClick?.(e.lngLat.lat, e.lngLat.lng, estateValue); + }); + + map.getCanvas().style.cursor = "crosshair"; + + // Signal all reactive effects that the map is ready. + setMapLoaded(true); + }); return () => { + // Reset mapLoaded so effects re-run if the map is recreated (e.g. StrictMode). + setMapLoaded(false); markerRef.current?.remove(); markerRef.current = null; mapRef.current?.remove(); diff --git a/apps/web/hooks/use-job-progress.ts b/apps/web/hooks/use-job-progress.ts index 9fc7e4c..ba7b0ef 100644 --- a/apps/web/hooks/use-job-progress.ts +++ b/apps/web/hooks/use-job-progress.ts @@ -19,15 +19,35 @@ export interface StageStatus { message: string; } +// Four logical UI stages that map to the actual (parallel) pipeline jobs. +// extract-pois and build-valhalla run concurrently — they share "Processing OSM" +// so the linear mark-prior-as-completed logic stays correct. const STAGE_ORDER: Array<{ key: string; label: string }> = [ - { key: "Downloading PBF", label: "Download OSM data" }, - { key: "Filtering OSM tags", label: "Filter & extract POIs" }, - { key: "Importing to PostGIS", label: "Import to database" }, - { key: "Building routing graph", label: "Build routing graph" }, - { key: "Generating grid", label: "Generate analysis grid" }, - { key: "Computing scores", label: "Compute accessibility scores" }, + { key: "Downloading PBF", label: "Download OSM data" }, + { key: "Processing OSM", label: "Process OSM & build routes" }, + { key: "Generating grid", label: "Generate analysis grid" }, + { key: "Computing scores", label: "Compute accessibility scores" }, ]; +/** + * Maps raw worker stage strings → UI stage keys. + * All three parallel worker stages (extract-pois sub-stages + build-valhalla) + * fold into "Processing OSM". Routing sub-jobs and BORIS NI ingest fold + * into "Computing scores" (they run during compute-scores Phase 1). + */ +function normalizeStage(raw: string): string { + if ( + raw === "Clipping to bounding box" || + raw === "Filtering OSM tags" || + raw === "Importing to PostGIS" || + raw === "Building routing graph" + ) return "Processing OSM"; + if (raw.startsWith("Routing ") || raw === "Ingesting BORIS NI") { + return "Computing scores"; + } + return raw; +} + export type OverallStatus = "pending" | "active" | "completed" | "failed"; interface ProgressState { @@ -57,9 +77,10 @@ function initialState(): ProgressState { function reducer(state: ProgressState, action: Action): ProgressState { switch (action.type) { case "progress": { + const stageKey = normalizeStage(action.stage); let found = false; const stages = state.stages.map((s) => { - if (s.key === action.stage) { + if (s.key === stageKey) { found = true; return { ...s, @@ -94,9 +115,13 @@ function reducer(state: ProgressState, action: Action): ProgressState { export function useJobProgress(jobId: string | null): ProgressState { const [state, dispatch] = useReducer(reducer, undefined, initialState); const esRef = useRef(null); + // Tracks whether the stream ended with a legitimate "completed" event so + // the subsequent connection-close (which fires onerror) is ignored. + const completedRef = useRef(false); useEffect(() => { if (!jobId) return; + completedRef.current = false; const es = new EventSource(`/api/admin/jobs/${jobId}/stream`); esRef.current = es; @@ -104,10 +129,18 @@ export function useJobProgress(jobId: string | null): ProgressState { es.onmessage = (event) => { const payload = JSON.parse(event.data) as SSEEvent; if (payload.type === "heartbeat") return; + if (payload.type === "completed") { + // Close before the server closes so the subsequent connection-close + // does not trigger onerror and overwrite the completed state. + completedRef.current = true; + es.close(); + esRef.current = null; + } dispatch(payload as Action); }; es.onerror = () => { + if (completedRef.current) return; // Normal close after completion — ignore dispatch({ type: "failed", error: "Lost connection to job stream" }); es.close(); }; diff --git a/infra/schema.sql b/infra/schema.sql index c280a5b..f8b66d0 100644 --- a/infra/schema.sql +++ b/infra/schema.sql @@ -52,16 +52,23 @@ CREATE INDEX IF NOT EXISTS idx_raw_pois_name -- ─── Grid points ───────────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS grid_points ( - id BIGSERIAL PRIMARY KEY, - city_slug TEXT NOT NULL REFERENCES cities(slug) ON DELETE CASCADE, - geom geometry(Point, 4326) NOT NULL, - grid_x INTEGER NOT NULL, - grid_y INTEGER NOT NULL, + id BIGSERIAL PRIMARY KEY, + city_slug TEXT NOT NULL REFERENCES cities(slug) ON DELETE CASCADE, + geom geometry(Point, 4326) NOT NULL, + grid_x INTEGER NOT NULL, + grid_y INTEGER NOT NULL, + hidden_gem_score FLOAT4, UNIQUE (city_slug, grid_x, grid_y) ); +-- Migration for existing databases +ALTER TABLE grid_points ADD COLUMN IF NOT EXISTS hidden_gem_score FLOAT4; + CREATE INDEX IF NOT EXISTS idx_grid_city ON grid_points (city_slug); CREATE INDEX IF NOT EXISTS idx_grid_geom ON grid_points USING GIST (geom); +CREATE INDEX IF NOT EXISTS idx_grid_hidden_gem + ON grid_points (city_slug, hidden_gem_score) + WHERE hidden_gem_score IS NOT NULL; -- ─── Pre-computed accessibility scores ─────────────────────────────────────── @@ -123,3 +130,31 @@ CREATE INDEX IF NOT EXISTS idx_isochrone_created -- Auto-expire isochrone cache entries older than 30 days -- (handled by periodic cleanup or TTL logic in app) + +-- ─── Estate value zones ─────────────────────────────────────────────────────── +-- Populated by datasource-specific worker jobs (currently: ingest-boris-ni). +-- Only present for cities whose bbox intersects a supported region. + +-- Migration: rename if upgrading from the previous schema version +DO $$ BEGIN + IF EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'boris_zones') + AND NOT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'estate_value_zones') + THEN ALTER TABLE boris_zones RENAME TO estate_value_zones; END IF; +END $$; + +CREATE TABLE IF NOT EXISTS estate_value_zones ( + id BIGSERIAL PRIMARY KEY, + city_slug TEXT NOT NULL REFERENCES cities(slug) ON DELETE CASCADE, + geom geometry(GEOMETRY, 4326) NOT NULL, + value_eur_m2 NUMERIC, + zone_name TEXT, + usage_type TEXT, + usage_detail TEXT, + dev_state TEXT, + stichtag TEXT, + source TEXT NOT NULL DEFAULT 'boris-ni', + ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_estate_value_zones_city ON estate_value_zones (city_slug); +CREATE INDEX IF NOT EXISTS idx_estate_value_zones_geom ON estate_value_zones USING GIST (geom); diff --git a/shared/src/queue.ts b/shared/src/queue.ts index 1ef2429..1e31a0c 100644 --- a/shared/src/queue.ts +++ b/shared/src/queue.ts @@ -28,6 +28,8 @@ export interface ComputeScoresJobData { thresholds: number[]; /** Set after compute-routing children are dispatched (internal two-phase state). */ routingDispatched?: boolean; + /** When true, ingest-boris-ni is dispatched in Phase 1 to run alongside routing jobs. */ + ingestBorisNi?: boolean; } export interface ComputeRoutingJobData { @@ -55,6 +57,11 @@ export interface RefreshCityJobData { resolutionM?: number; } +export interface IngestBorisNiJobData { + type: "ingest-boris-ni"; + citySlug: string; +} + export type PipelineJobData = | DownloadPbfJobData | ExtractPoisJobData @@ -62,7 +69,8 @@ export type PipelineJobData = | ComputeScoresJobData | ComputeRoutingJobData | BuildValhallaJobData - | RefreshCityJobData; + | RefreshCityJobData + | IngestBorisNiJobData; // ─── Job options (BullMQ-compatible plain objects) ──────────────────────────── @@ -104,4 +112,10 @@ export const JOB_OPTIONS: Record = { removeOnComplete: { age: 86400 * 7 }, removeOnFail: { age: 86400 * 30 }, }, + "ingest-boris-ni": { + attempts: 2, + backoff: { type: "fixed", delay: 5000 }, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + }, }; diff --git a/worker/src/index.ts b/worker/src/index.ts index 02090bb..3c57450 100644 --- a/worker/src/index.ts +++ b/worker/src/index.ts @@ -7,6 +7,7 @@ import { handleGenerateGrid } from "./jobs/generate-grid.js"; import { handleComputeScores } from "./jobs/compute-scores.js"; import { handleComputeRouting } from "./jobs/compute-routing.js"; import { handleRefreshCity } from "./jobs/refresh-city.js"; +import { handleIngestBorisNi } from "./jobs/ingest-boris-ni.js"; console.log("[worker] Starting Transportationer pipeline worker…"); @@ -28,6 +29,8 @@ const worker = new Worker( return handleComputeRouting(job as Job); case "refresh-city": return handleRefreshCity(job as Job); + case "ingest-boris-ni": + return handleIngestBorisNi(job as Job); default: throw new Error(`Unknown job type: ${(job.data as any).type}`); } diff --git a/worker/src/jobs/compute-scores.ts b/worker/src/jobs/compute-scores.ts index f30b1b3..3c7f309 100644 --- a/worker/src/jobs/compute-scores.ts +++ b/worker/src/jobs/compute-scores.ts @@ -17,6 +17,8 @@ export type ComputeScoresData = { thresholds: number[]; /** Persisted after routing children are dispatched to distinguish phase 1 from phase 2. */ routingDispatched?: boolean; + /** When true, ingest-boris-ni is dispatched in Phase 1 to run alongside routing jobs. */ + ingestBorisNi?: boolean; }; const INSERT_CHUNK = 2000; @@ -94,6 +96,8 @@ export async function handleComputeScores( // Enqueue one routing child per (mode, category). Each child registers // itself to this parent job via opts.parent, so BullMQ tracks completion. + // For NI cities, ingest-boris-ni is also enqueued here so it runs in + // parallel with the routing jobs rather than sequentially after them. const queue = new Queue("pipeline", { connection: createBullMQConnection() }); try { for (const mode of modes) { @@ -116,6 +120,21 @@ export async function handleComputeScores( ); } } + + // Dispatch BORIS NI ingest as a sibling child so it runs during routing. + if (job.data.ingestBorisNi) { + await queue.add( + "ingest-boris-ni", + { type: "ingest-boris-ni", citySlug }, + { + attempts: 2, + backoff: { type: "fixed", delay: 5000 }, + removeOnComplete: { age: 86400 * 7 }, + removeOnFail: { age: 86400 * 30 }, + parent: { id: job.id!, queue: queue.qualifiedName }, + }, + ); + } } finally { await queue.close(); } @@ -202,15 +221,13 @@ export async function handleComputeScores( } // Compute and insert scores for every threshold × profile combination. - for (let ti = 0; ti < thresholds.length; ti++) { - const thresholdMin = thresholds[ti]; - const threshold_s = thresholdMin * 60; + // Each threshold writes to distinct rows (threshold_min is part of the PK), + // so all thresholds can be processed concurrently without conflicts. + // Node.js is single-threaded so completedThresholds++ is safe. + let completedThresholds = 0; - await job.updateProgress({ - stage: "Computing scores", - pct: 70 + Math.round(((ti + 1) / thresholds.length) * 28), - message: `${thresholdMin}min — inserting scores for all profiles…`, - } satisfies JobProgress); + await Promise.all(thresholds.map(async (thresholdMin) => { + const threshold_s = thresholdMin * 60; const gpIdArr: string[] = []; const catArr: string[] = []; @@ -234,6 +251,9 @@ export async function handleComputeScores( } } + // Chunks within a threshold stay sequential — with all thresholds running + // concurrently we already have up to thresholds.length parallel INSERT + // streams, which saturates the connection pool without overwhelming it. for (let i = 0; i < gpIdArr.length; i += INSERT_CHUNK) { const end = Math.min(i + INSERT_CHUNK, gpIdArr.length); await Promise.resolve(sql` @@ -270,13 +290,70 @@ export async function handleComputeScores( computed_at = now() `); } - } + + completedThresholds++; + await job.updateProgress({ + stage: "Computing scores", + pct: 70 + Math.round((completedThresholds / thresholds.length) * 28), + message: `${completedThresholds} / ${thresholds.length} thresholds done…`, + } satisfies JobProgress); + })); await Promise.resolve(sql` UPDATE cities SET status = 'ready', last_ingested = now() WHERE slug = ${citySlug} `); + // Compute hidden gem scores per grid point for cities that have estate value zones. + // Each grid point looks up the nearest zone's price, ranks it within its accessibility + // decile, and stores hidden_gem_score = composite_accessibility × (1 − price_rank). + const gemThreshold = thresholds.includes(15) ? 15 : thresholds[0]; + const [{ n }] = await Promise.resolve(sql<{ n: number }[]>` + SELECT count(*)::int AS n FROM estate_value_zones + WHERE city_slug = ${citySlug} AND value_eur_m2 IS NOT NULL + `); + if (n > 0) { + await job.updateProgress({ + stage: "Computing scores", + pct: 99, + message: "Computing hidden gem scores…", + } satisfies JobProgress); + await Promise.resolve(sql` + WITH grid_with_price AS ( + -- For each grid point, get composite accessibility score and nearest zone price + SELECT + gp.id, + COALESCE(AVG(gs.score), 0) AS composite_score, + ROUND(COALESCE(AVG(gs.score), 0) * 10)::int AS score_decile, + ( + SELECT ez.value_eur_m2 + FROM estate_value_zones ez + WHERE ez.city_slug = ${citySlug} AND ez.value_eur_m2 IS NOT NULL + ORDER BY gp.geom <-> ez.geom + LIMIT 1 + ) AS value_eur_m2 + FROM grid_points gp + JOIN grid_scores gs ON gs.grid_point_id = gp.id + WHERE gp.city_slug = ${citySlug} + AND gs.travel_mode = 'walking' + AND gs.threshold_min = ${gemThreshold} + AND gs.profile = 'universal' + GROUP BY gp.id + ), + ranked AS ( + SELECT + id, + composite_score, + PERCENT_RANK() OVER (PARTITION BY score_decile ORDER BY value_eur_m2) AS price_rank + FROM grid_with_price + WHERE value_eur_m2 IS NOT NULL + ) + UPDATE grid_points gp + SET hidden_gem_score = (ranked.composite_score * (1.0 - ranked.price_rank))::float4 + FROM ranked WHERE gp.id = ranked.id + `); + } + await job.updateProgress({ stage: "Computing scores", pct: 100, diff --git a/worker/src/jobs/download-pbf.ts b/worker/src/jobs/download-pbf.ts index f9e7ede..9f0baf6 100644 --- a/worker/src/jobs/download-pbf.ts +++ b/worker/src/jobs/download-pbf.ts @@ -1,6 +1,5 @@ import type { Job } from "bullmq"; -import { createWriteStream, mkdirSync } from "fs"; -import { pipeline } from "stream/promises"; +import { createWriteStream, mkdirSync, statSync, renameSync } from "fs"; import { Writable } from "stream"; import type { JobProgress } from "@transportationer/shared"; @@ -27,6 +26,23 @@ export async function handleDownloadPbf( mkdirSync(OSM_DATA_DIR, { recursive: true }); const outputPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`; + const tmpPath = `${outputPath}.tmp`; + + // Idempotency: skip if a complete file is already on disk (supports + // parallel download-pbf instances for the same city PBF). + try { + const stat = statSync(outputPath); + if (stat.size > 0) { + await job.updateProgress({ + stage: "Downloading PBF", + pct: 100, + message: `Already on disk: ${outputPath} (${(stat.size / 1_048_576).toFixed(1)} MB)`, + } satisfies JobProgress); + return; + } + } catch { + // File doesn't exist — fall through to download. + } await job.updateProgress({ stage: "Downloading PBF", @@ -49,7 +65,9 @@ export async function handleDownloadPbf( let downloaded = 0; let lastPct = -1; - const fileStream = createWriteStream(outputPath); + // Write to a temp path; rename to final path on completion so concurrent + // instances see a complete file or nothing (never a partial file). + const fileStream = createWriteStream(tmpPath); // Count bytes through a transform, then write to file const reader = response.body.getReader(); @@ -94,6 +112,9 @@ export async function handleDownloadPbf( ); })(); + // Atomically promote the temp file to the final path. + renameSync(tmpPath, outputPath); + await job.updateProgress({ stage: "Downloading PBF", pct: 100, diff --git a/worker/src/jobs/extract-pois.ts b/worker/src/jobs/extract-pois.ts index 3bf9a80..9a87504 100644 --- a/worker/src/jobs/extract-pois.ts +++ b/worker/src/jobs/extract-pois.ts @@ -165,6 +165,7 @@ export async function handleExtractPois( INSERT INTO raw_pois (osm_id, osm_type, city_slug, category, subcategory, name, tags, geom) SELECT osm_id, osm_type, city_slug, category, subcategory, name, tags, geom FROM ${sql(stagingTable)} + WHERE geom IS NOT NULL `); await Promise.resolve(sql`DROP TABLE IF EXISTS ${sql(stagingTable)}`); diff --git a/worker/src/jobs/ingest-boris-ni.ts b/worker/src/jobs/ingest-boris-ni.ts new file mode 100644 index 0000000..1443f6b --- /dev/null +++ b/worker/src/jobs/ingest-boris-ni.ts @@ -0,0 +1,262 @@ +/** + * Ingest BORIS NI (Niedersachsen Bodenrichtwerte) estate value zones. + * + * Fetches from the LGLN WFS and stores into estate_value_zones. + * Only enqueued by refresh-city when the city's bbox intersects Niedersachsen. + */ +import type { Job } from "bullmq"; +import { getSql } from "../db.js"; +import type { JobProgress } from "@transportationer/shared"; + +export type IngestBorisNiData = { + type: "ingest-boris-ni"; + citySlug: string; +}; + +const WFS_BASE = "https://opendata.lgln.niedersachsen.de/doorman/noauth/boris_wfs"; +const CHUNK = 500; + +// ─── GML Parser (WFS 2.0 / GML 3.2) ────────────────────────────────────────── + +function extractFirst(xml: string, localName: string): string | null { + const re = new RegExp(`<(?:[\\w]+:)?${localName}(?:\\s[^>]*)?>([^<]*)`, ""); + const m = re.exec(xml); + return m ? m[1].trim() || null : null; +} + +function parsePosListCoords(posList: string): [number, number][] { + const nums = posList.trim().split(/\s+/).map(Number); + const coords: [number, number][] = []; + for (let i = 0; i + 1 < nums.length; i += 2) { + coords.push([nums[i + 1], nums[i]]); // lat,lon → lon,lat (GeoJSON) + } + return coords; +} + +function parseGmlPolygon(xml: string): { type: "Polygon"; coordinates: [number, number][][] } | null { + const extMatch = /]*>([\s\S]*?)<\/gml:posList>/.exec(xml); + if (!extMatch) return null; + const exterior = parsePosListCoords(extMatch[1]); + if (exterior.length < 3) return null; + + const rings: [number, number][][] = [exterior]; + const intRe = /]*>([\s\S]*?)<\/gml:posList>/g; + let intMatch: RegExpExecArray | null; + while ((intMatch = intRe.exec(xml)) !== null) { + const interior = parsePosListCoords(intMatch[1]); + if (interior.length >= 3) rings.push(interior); + } + return { type: "Polygon", coordinates: rings }; +} + +type GeoJsonGeometry = + | { type: "Polygon"; coordinates: [number, number][][] } + | { type: "MultiPolygon"; coordinates: [number, number][][][] }; + +type EstateValueFeature = { + geometry: GeoJsonGeometry; + value: number | null; + zoneName: string | null; + usageType: string | null; + usageDetail: string | null; + devState: string | null; + stichtag: string | null; +}; + +function parseMember(xml: string): EstateValueFeature | null { + const value = extractFirst(xml, "bodenrichtwert"); + const zoneName = extractFirst(xml, "bodenrichtwertzoneName"); + const art = extractFirst(xml, "art"); + const ergaenzung = extractFirst(xml, "ergaenzung"); + const devState = extractFirst(xml, "entwicklungszustand"); + const stichtag = extractFirst(xml, "stichtag"); + + // Only keep Bauland (B) zones designated for residential or mixed use. + // entwicklungszustand=LF covers Landwirtschaft, Forst (F), Grünland (GR), + // and Ackerland (A) — all priced €0.65–€4/m², irrelevant for housing. + // Within Bauland, art W* = Wohnbaufläche (residential) and M* = Mischgebiet + // (mixed use, housing permitted); GE/GI (commercial/industrial) and SO + // (special use) are excluded as houses cannot be built there. + if (devState !== "B") return null; + if (!art || (!art.startsWith("W") && !art.startsWith("M"))) return null; + + let geometry: GeoJsonGeometry | null = null; + + const polygonMatch = /]*>([\s\S]*?)<\/gml:Polygon>/.exec(xml); + if (polygonMatch) { + geometry = parseGmlPolygon(polygonMatch[0]); + } else { + const multiMatch = /]*>([\s\S]*?)<\/gml:MultiSurface>/.exec(xml); + if (multiMatch) { + const polys: [number, number][][][] = []; + const surfRe = /]*>([\s\S]*?)<\/gml:surfaceMember>/g; + let surfMatch: RegExpExecArray | null; + while ((surfMatch = surfRe.exec(multiMatch[0])) !== null) { + const polyFrag = /]*>([\s\S]*?)<\/gml:Polygon>/.exec(surfMatch[0]); + if (polyFrag) { + const poly = parseGmlPolygon(polyFrag[0]); + if (poly) polys.push(poly.coordinates); + } + } + if (polys.length > 0) { + geometry = { type: "MultiPolygon", coordinates: polys }; + } + } + } + + if (!geometry) return null; + + return { + geometry, + value: value !== null ? parseFloat(value) : null, + zoneName, + usageType: art, + usageDetail: ergaenzung, + devState, + stichtag, + }; +} + +function parseGmlFeatures(gml: string): EstateValueFeature[] { + const features: EstateValueFeature[] = []; + const memberRe = /([\s\S]*?)<\/wfs:member>/g; + let match: RegExpExecArray | null; + while ((match = memberRe.exec(gml)) !== null) { + const f = parseMember(match[1]); + if (f) features.push(f); + } + return features; +} + +// ─── Job handler ────────────────────────────────────────────────────────────── + +export async function handleIngestBorisNi(job: Job): Promise { + const { citySlug } = job.data; + const sql = getSql(); + + await job.updateProgress({ + stage: "Ingesting BORIS NI", + pct: 0, + message: `Reading bbox for ${citySlug}`, + } satisfies JobProgress); + + const bboxRows = await Promise.resolve(sql<{ + minlng: number; minlat: number; maxlng: number; maxlat: number; + }[]>` + SELECT + ST_XMin(bbox)::float AS minlng, + ST_YMin(bbox)::float AS minlat, + ST_XMax(bbox)::float AS maxlng, + ST_YMax(bbox)::float AS maxlat + FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL + `); + + if (bboxRows.length === 0) { + throw new Error(`No bbox set for city ${citySlug}`); + } + + const { minlng, minlat, maxlng, maxlat } = bboxRows[0]; + + await job.updateProgress({ + stage: "Ingesting BORIS NI", + pct: 10, + message: `Fetching BORIS NI WFS for ${citySlug}`, + } satisfies JobProgress); + + // Fetch from BORIS NI WFS (lat/lon axis order for EPSG:4326) + const wfsUrl = new URL(WFS_BASE); + wfsUrl.searchParams.set("SERVICE", "WFS"); + wfsUrl.searchParams.set("VERSION", "2.0.0"); + wfsUrl.searchParams.set("REQUEST", "GetFeature"); + wfsUrl.searchParams.set("TYPENAMES", "boris:BR_BodenrichtwertZonal"); + wfsUrl.searchParams.set("SRSNAME", "urn:ogc:def:crs:EPSG::4326"); + wfsUrl.searchParams.set("BBOX", `${minlat},${minlng},${maxlat},${maxlng},urn:ogc:def:crs:EPSG::4326`); + wfsUrl.searchParams.set("COUNT", "10000"); + + const response = await fetch(wfsUrl.toString(), { + headers: { Accept: "application/gml+xml; version=3.2" }, + signal: AbortSignal.timeout(60_000), + }); + + if (!response.ok) { + throw new Error(`BORIS NI WFS returned HTTP ${response.status}`); + } + + const gml = await response.text(); + + if (gml.includes(" JSON.stringify(f.geometry)); + const values = chunk.map((f) => f.value); + const zoneNames = chunk.map((f) => f.zoneName); + const usageTypes = chunk.map((f) => f.usageType); + const usageDetails = chunk.map((f) => f.usageDetail); + const devStates = chunk.map((f) => f.devState); + const stichtags = chunk.map((f) => f.stichtag); + + await Promise.resolve(sql` + INSERT INTO estate_value_zones + (city_slug, geom, value_eur_m2, zone_name, usage_type, usage_detail, dev_state, stichtag, source) + SELECT + ${citySlug}, + ST_SetSRID(ST_GeomFromGeoJSON(g), 4326), + v, + zn, ut, ud, ds, st, + 'boris-ni' + FROM unnest( + ${geomJsons}::text[], + ${values}::numeric[], + ${zoneNames}::text[], + ${usageTypes}::text[], + ${usageDetails}::text[], + ${devStates}::text[], + ${stichtags}::text[] + ) AS t(g, v, zn, ut, ud, ds, st) + `); + + inserted += chunk.length; + await job.updateProgress({ + stage: "Ingesting BORIS NI", + pct: 50 + Math.round((inserted / features.length) * 50), + message: `Stored ${inserted}/${features.length} zones`, + } satisfies JobProgress); + } + + console.log(`[ingest-boris-ni] ✓ Stored ${inserted} BORIS NI zones for ${citySlug}`); + + await job.updateProgress({ + stage: "Ingesting BORIS NI", + pct: 100, + message: `BORIS NI ingest complete: ${inserted} zones`, + } satisfies JobProgress); +} diff --git a/worker/src/jobs/refresh-city.ts b/worker/src/jobs/refresh-city.ts index 77cea31..f31e3c6 100644 --- a/worker/src/jobs/refresh-city.ts +++ b/worker/src/jobs/refresh-city.ts @@ -14,6 +14,11 @@ export type RefreshCityData = { const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm"; +/** True when the given bbox intersects Niedersachsen. */ +function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean { + return minLng < 11.779 && maxLng > 6.526 && minLat < 54.033 && maxLat > 51.197; +} + export async function handleRefreshCity( job: Job, ): Promise { @@ -50,72 +55,81 @@ export async function handleRefreshCity( UPDATE cities SET status = 'processing' WHERE slug = ${citySlug} `); - // FlowProducer creates a dependency chain evaluated bottom-up: - // download → extract → generate-grid → build-valhalla → compute-scores - const flow = new FlowProducer({ connection: createBullMQConnection() }); + // Shared download node factory — produces an idempotent download-pbf node. + // Two independent nodes with the same geofabrikUrl are safe: the idempotency + // check in download-pbf.ts skips the download if the file already exists. + const downloadNode = () => ({ + name: "download-pbf", + queueName: "pipeline", + data: { type: "download-pbf" as const, citySlug, geofabrikUrl }, + opts: JOB_OPTIONS["download-pbf"], + }); - try { - await flow.add({ - name: "compute-scores", - queueName: "pipeline", - data: { - type: "compute-scores", - citySlug, - modes: ["walking", "cycling", "driving"], - thresholds: [5, 10, 15, 20, 30], - }, - opts: JOB_OPTIONS["compute-scores"], - children: [ - { - name: "generate-grid", - queueName: "pipeline", - data: { - type: "generate-grid", - citySlug, - resolutionM, - }, - opts: JOB_OPTIONS["generate-grid"], - children: [ - { - name: "build-valhalla", - queueName: "valhalla", // handled by the dedicated valhalla-worker - data: { - type: "build-valhalla", - citySlug, - pbfPath, - ...(bbox ? { bbox } : {}), - }, - opts: JOB_OPTIONS["build-valhalla"], - children: [ - { - name: "extract-pois", - queueName: "pipeline", - data: { - type: "extract-pois", - citySlug, - pbfPath, - ...(bbox ? { bbox } : {}), - }, - opts: JOB_OPTIONS["extract-pois"], - children: [ - { - name: "download-pbf", - queueName: "pipeline", - data: { - type: "download-pbf", - citySlug, - geofabrikUrl, - }, - opts: JOB_OPTIONS["download-pbf"], - }, - ], - }, - ], + // For cities in Niedersachsen, ingest-boris-ni is dispatched in Phase 1 + // of compute-scores so it runs in parallel with the routing jobs. + const niApplicable = !!(bbox && isInNiedersachsen(...bbox)); + + // Parallel pipeline DAG (bottom-up — leaves execute first): + // + // download-pbf ─┬─→ extract-pois ──┐ + // │ ├─→ generate-grid → compute-scores + // download-pbf ─└─→ build-valhalla ─┘ + // + // compute-scores Phase 1 also dispatches ingest-boris-ni (NI cities only) + // as a child alongside the routing jobs, so it runs during routing. + const rootNode = { + name: "compute-scores", + queueName: "pipeline", + data: { + type: "compute-scores" as const, + citySlug, + modes: ["walking", "cycling", "driving"] as const, + thresholds: [5, 10, 15, 20, 30], + ingestBorisNi: niApplicable, + }, + opts: JOB_OPTIONS["compute-scores"], + children: [ + { + name: "generate-grid", + queueName: "pipeline", + data: { type: "generate-grid" as const, citySlug, resolutionM }, + opts: JOB_OPTIONS["generate-grid"], + children: [ + // extract-pois and build-valhalla run in parallel — each gets its + // own download-pbf child; the idempotency guard ensures only one + // actually downloads when they race. + { + name: "extract-pois", + queueName: "pipeline", + data: { + type: "extract-pois" as const, + citySlug, + pbfPath, + ...(bbox ? { bbox } : {}), }, - ], - }, - ], - }); + opts: JOB_OPTIONS["extract-pois"], + children: [downloadNode()], + }, + { + name: "build-valhalla", + queueName: "valhalla", + data: { + type: "build-valhalla" as const, + citySlug, + pbfPath, + ...(bbox ? { bbox } : {}), + }, + opts: JOB_OPTIONS["build-valhalla"], + children: [downloadNode()], + }, + ], + }, + ], + }; + + const flow = new FlowProducer({ connection: createBullMQConnection() }); + try { + await flow.add(rootNode); } finally { await flow.close(); } @@ -123,6 +137,6 @@ export async function handleRefreshCity( await job.updateProgress({ stage: "Orchestrating pipeline", pct: 100, - message: "All pipeline jobs enqueued. Processing will begin shortly.", + message: `All pipeline jobs enqueued${niApplicable ? " (includes BORIS NI, parallel with routing)" : ""}. Processing will begin shortly.`, } satisfies JobProgress); }