diff --git a/README.md b/README.md index a2386f7..72b82c6 100644 --- a/README.md +++ b/README.md @@ -16,21 +16,27 @@ Next.js App Server └── Valkey (API response cache, BullMQ queues) BullMQ Worker (pipeline queue, concurrency 8) - ├── refresh-city → orchestrates full ingest via FlowProducer - ├── download-pbf → streams OSM PBF from Geofabrik - ├── extract-pois → osmium filter + osm2pgsql flex → raw_pois - ├── build-valhalla → clips PBF, builds Valhalla routing tiles - ├── generate-grid → PostGIS 200 m hex grid → grid_points - ├── compute-scores → two-phase orchestrator (see Scoring below) - └── compute-routing → Valhalla matrix → grid_poi_details - (15 parallel jobs: 3 modes × 5 categories) + ├── refresh-city → orchestrates full ingest via FlowProducer + ├── download-pbf → streams OSM PBF from Geofabrik + ├── extract-pois → osmium filter + osm2pgsql flex → raw_pois + ├── build-valhalla → clips PBF, builds Valhalla routing tiles + transit tiles + ├── download-gtfs-de → downloads & extracts GTFS feed for German ÖPNV + ├── generate-grid → PostGIS 200 m hex grid → grid_points + ├── compute-scores → two-phase orchestrator (see Scoring below) + ├── compute-routing → Valhalla matrix → grid_poi_details + │ (15 parallel jobs: 3 modes × 5 categories) + └── compute-transit → Valhalla isochrones → grid_poi_details (travel_mode='transit') + (1 job per city, covers all categories via PostGIS spatial join) BullMQ Worker (valhalla queue, concurrency 1) - └── build-valhalla → runs valhalla_build_tiles, manages valhalla_service + └── build-valhalla → valhalla_ingest_transit + valhalla_convert_transit (GTFS → tiles), + valhalla_build_tiles (road graph + transit connection), + manages valhalla_service Valhalla (child process of valhalla worker) - ├── sources_to_targets matrix → compute-routing jobs - └── isochrones endpoint → user click → /api/isochrones + ├── sources_to_targets matrix → compute-routing jobs (walking/cycling/driving) + ├── isochrone (multimodal) → compute-transit jobs + └── isochrone endpoint → user click → /api/isochrones Protomaps → self-hosted map tiles (PMTiles) ``` @@ -105,52 +111,97 @@ docker compose up postgres valkey -d ### Data pipeline -For each grid point (200 m hexagonal spacing) the pipeline runs in two phases: +For each city the pipeline runs in two phases: -**Phase 1 — Routing** (15 parallel jobs: 3 modes × 5 categories) +**Phase 1 — Routing** (parallel child jobs) -A PostGIS KNN lateral join (`<->` operator) finds the 6 nearest POIs in the category for each grid point. Those POI coordinates are passed to Valhalla's `sources_to_targets` matrix API to obtain real network travel times for the requested travel mode (walking, cycling, driving). The nearest POI per subcategory is persisted to `grid_poi_details`. +*Walking, cycling, driving — 15 jobs (3 modes × 5 categories):* +A PostGIS KNN lateral join (`<->` operator) finds the 6 nearest POIs in the category for each grid point (200 m hexagonal spacing). Those POI coordinates are sent in batches of 20 to Valhalla's `sources_to_targets` matrix API to obtain exact real-network travel times. The nearest POI per subcategory is persisted to `grid_poi_details`. + +*Transit — 1 job per city (`compute-transit`):* +Valhalla's matrix API does not support transit. Instead, for each grid point a multimodal isochrone is fetched from Valhalla at contour intervals of 5, 10, 15, 20, and 30 minutes (fixed departure: Tuesday 08:00 to ensure reproducible GTFS results). PostGIS `ST_Within` then classifies all POIs in the city into the smallest contour they fall within, giving estimated travel times of 300 s / 600 s / 900 s / 1 200 s / 1 800 s respectively. Grid points outside the transit network are silently skipped — transit contributes nothing to their score and the other modes compensate. **Phase 2 — Score aggregation** Scores are precomputed for every combination of: - 5 thresholds: 5, 10, 15, 20, 30 minutes -- 3 travel modes: walking, cycling, driving +- 5 travel modes (see below) - 5 profiles: Universal, Young Family, Senior, Young Professional, Student +### Travel modes + +| Mode | Internal key | How travel time is obtained | +|------|--------------|-----------------------------| +| Best mode | `fifteen` | Synthetic — minimum travel time across walking, cycling, and transit per subcategory. A destination reachable by any of these modes within the threshold counts as accessible. Driving excluded intentionally. | +| Walking | `walking` | Valhalla pedestrian matrix, exact seconds | +| Cycling | `cycling` | Valhalla bicycle matrix, exact seconds | +| Transit | `transit` | Valhalla multimodal isochrone, quantised to 5-min bands (requires GTFS feed) | +| Driving | `driving` | Valhalla auto matrix, exact seconds | + +The `fifteen` mode is computed entirely in memory during Phase 2: for each (grid point, category, subcategory) the minimum travel time across the three active modes is used, then scored normally. No extra routing jobs are needed. + ### Scoring formula -Each subcategory *i* within a category contributes a sigmoid score: +Each subcategory *i* within a category contributes a sigmoid score based on the real travel time `t` and the selected threshold `T` (both in seconds): ``` sigmoid(t, T) = 1 / (1 + exp(4 × (t − T) / T)) ``` -Where `t` is the Valhalla travel time in seconds and `T` is the threshold in seconds. The sigmoid equals 0.5 exactly at the threshold and approaches 1 for very short times. +The sigmoid equals 0.5 exactly at the threshold and approaches 1 for very short times. It is continuous, so a 14-minute trip to a park still contributes nearly as much as a 10-minute trip under a 15-minute threshold. -The category score combines subcategories via a complement-product, weighted by per-profile subcategory importance weights `w_i ∈ [0, 1]`: +The category score combines all subcategories via a **complement product**, weighted by per-profile subcategory importance weights `w_i ∈ [0, 1]`: ``` category_score = 1 − ∏ (1 − w_i × sigmoid(t_i, T)) ``` -This captures diversity of coverage: reaching one supermarket near you already yields a high score, but having a pharmacy, bakery, and bank nearby as well pushes the score higher. +This captures diversity of coverage: one nearby supermarket already yields a high score, but also having a pharmacy and a bakery pushes it higher. Missing subcategories (no POI found) are simply omitted from the product and do not penalise the score. ### Profiles Each profile carries two sets of weights: -- **Category weights** (used as slider presets in the UI, range 0–2): how much relative importance each of the 5 categories gets in the composite score. -- **Subcategory weights** (used during score computation, range 0–1): how much a specific subcategory contributes to its category score. +- **Category weights** (used as slider presets in the UI, range 0–2): how much relative importance each of the 5 categories receives in the composite score. +- **Subcategory weights** (baked into precomputed scores, range 0–1): how strongly a specific subcategory contributes to its parent category score. -| Profile | Focus | -|---------|-------| -| Universal | Balanced across all resident types | -| Young Family | Schools, playgrounds, healthcare, daily shopping | -| Senior | Healthcare, local services, accessible green space, transit | -| Young Professional | Rapid transit, fitness, dining, coworking | -| Student | University, library, cafés, transit, budget services | +| Profile | Emoji | Category emphasis | Notable subcategory boosts | +|---------|-------|-------------------|---------------------------| +| Universal | ⚖️ | All equal (1.0) | Balanced baseline | +| Young Family | 👨‍👩‍👧 | Work & School 1.5×, Recreation 1.4×, Service 1.2× | school, kindergarten, playground, clinic all → 1.0 | +| Senior | 🧓 | Culture & Community 1.5×, Service 1.4×, Transport 1.1× | hospital, clinic, pharmacy, social services → 1.0; school → 0.05 | +| Young Professional | 💼 | Transport 1.5×, Recreation 1.1× | metro, train → 1.0; gym 0.9; coworking 0.85; school → 0.1 | +| Student | 🎓 | Work & School 1.5×, Transport 1.4×, Culture 1.2× | university, library → 1.0; bike share 0.85; school → 0.05 | ### Composite score -The composite shown on the heatmap is a weighted average of the 5 category scores. Category weights come from the selected profile but can be adjusted freely in the UI. All scores are precomputed — changing the profile or weights only triggers a client-side re-blend with no server round-trip. +The composite shown on the heatmap is a weighted average of the 5 category scores. Category weights come from the selected profile but can be adjusted freely in the UI. **All scores are precomputed** — changing the profile, threshold, or travel mode only queries the database; adjusting the category weight sliders re-blends entirely client-side with no round-trip. + +### Per-location score (pin) + +When a user places a pin on the map: + +1. The nearest grid point is found via a PostGIS `<->` KNN query. +2. Precomputed `grid_scores` rows for that grid point, travel mode, threshold, and profile are returned — one row per category. +3. Per-subcategory detail rows from `grid_poi_details` are also fetched, showing the name, straight-line distance, and travel time to the nearest POI in each subcategory for the requested mode. +4. An isochrone overlay is fetched live from Valhalla and shown on the map (walking is used as the representative mode for `fifteen` and `transit` since Valhalla's interactive isochrone only supports single-mode costing). + +The pin panel also shows estate value data (land price in €/m² from the BORIS NI cadastre) for cities in Lower Saxony, including a percentile rank among all zones in the city and a "peer percentile" rank among zones with similar accessibility scores. + +### Hidden gem score + +For cities with BORIS NI estate value data, a **hidden gem score** is precomputed per grid point at the end of Phase 2: + +``` +hidden_gem_score = composite_accessibility × (1 − price_rank_within_decile) +``` + +- `composite_accessibility` — average of all category scores for that grid point (walking / 15 min / universal profile) +- `price_rank_within_decile` — `PERCENT_RANK()` of the nearest zone's land price among all zones in the same accessibility decile (0 = cheapest, 1 = most expensive relative to equally accessible peers) + +The result is in [0, 1]: high only when a location is both accessible *and* priced below its peers. Stored in `grid_points.hidden_gem_score` and served as a separate MVT overlay at `/api/tiles/hidden-gems/`. + +The map offers three mutually exclusive base overlays (switchable in the control panel): +- **Accessibility** — default grid heatmap coloured by composite score +- **Land value** — BORIS NI zones coloured by €/m² (Lower Saxony cities only) +- **Hidden gems** — grid points coloured by hidden gem score (Lower Saxony cities only) diff --git a/apps/web/app/api/admin/jobs/[id]/route.ts b/apps/web/app/api/admin/jobs/[id]/route.ts index 3d94993..21b33d8 100644 --- a/apps/web/app/api/admin/jobs/[id]/route.ts +++ b/apps/web/app/api/admin/jobs/[id]/route.ts @@ -19,7 +19,7 @@ export async function GET( return NextResponse.json({ id: job.id, type: job.data.type, - citySlug: job.data.citySlug, + citySlug: "citySlug" in job.data ? job.data.citySlug : null, state, progress: job.progress ?? null, failedReason: job.failedReason ?? null, diff --git a/apps/web/app/api/admin/jobs/[id]/stream/route.ts b/apps/web/app/api/admin/jobs/[id]/stream/route.ts index 380a1f5..ec87146 100644 --- a/apps/web/app/api/admin/jobs/[id]/stream/route.ts +++ b/apps/web/app/api/admin/jobs/[id]/stream/route.ts @@ -1,7 +1,7 @@ import { NextRequest } from "next/server"; import { Job } from "bullmq"; import { getPipelineQueue, getValhallaQueue } from "@/lib/queue"; -import type { PipelineJobData, JobProgress, ComputeScoresJobData } from "@/lib/queue"; +import type { PipelineJobData, JobProgress, ComputeScoresJobData, RefreshCityJobData } from "@/lib/queue"; import type { SSEEvent } from "@transportationer/shared"; import { CATEGORY_IDS } from "@transportationer/shared"; @@ -20,10 +20,17 @@ export async function GET( const encoder = new TextEncoder(); let timer: ReturnType | null = null; - // Resolve citySlug from the refresh-city job that was returned to the UI. + // Resolve citySlug and creation timestamp from the refresh-city job. // We track progress by citySlug across all pipeline stages because // refresh-city itself completes almost immediately after enqueueing children. + // jobCreatedAt gates failed lookups so we never match results from a + // previous ingest of the same city. + // computeScoresJobId is captured after flow.add() by the worker; once + // available it allows exact-ID matching for the completion check, + // eliminating false positives from previous runs. let citySlug: string; + let jobCreatedAt: number; + let computeScoresJobId: string | undefined; try { const job = await Job.fromId(queue, id); if (!job) { @@ -31,7 +38,9 @@ export async function GET( headers: { "Content-Type": "text/event-stream" }, }); } - citySlug = job.data.citySlug ?? ""; + citySlug = "citySlug" in job.data ? (job.data.citySlug ?? "") : ""; + jobCreatedAt = job.timestamp; + computeScoresJobId = (job.data as RefreshCityJobData).computeScoresJobId; } catch { return new Response(fmt({ type: "failed", jobId: id, error: "Queue unavailable" }), { headers: { "Content-Type": "text/event-stream" }, @@ -57,6 +66,15 @@ export async function GET( const poll = async () => { try { + // If computeScoresJobId wasn't set when the stream opened (race with + // the worker updating job data), re-read the job once to pick it up. + if (!computeScoresJobId) { + const refreshJob = await Job.fromId(queue, id); + computeScoresJobId = refreshJob + ? (refreshJob.data as RefreshCityJobData).computeScoresJobId + : undefined; + } + // 1. Fetch active jobs and waiting-children jobs in parallel. const [pipelineActive, valhallaActive, waitingChildren] = await Promise.all([ queue.getActive(0, 100), @@ -66,32 +84,60 @@ export async function GET( // 1a. Parallel routing phase: compute-scores is waiting for its routing // children to finish. Report aggregate progress instead of one job's pct. + // Only enter this branch when routingDispatched=true (Phase 1 has run). + // Before that, compute-scores is in waiting-children while generate-grid + // is running — fall through to the sequential active-job check instead. + // Match by job ID (exact) when available; fall back to citySlug for the + // brief window before computeScoresJobId is written to the job record. const csWaiting = waitingChildren.find( - (j) => j.data.citySlug === citySlug && j.data.type === "compute-scores", + (j) => + j.data.type === "compute-scores" && + (j.data as ComputeScoresJobData).routingDispatched === true && + (computeScoresJobId ? j.id === computeScoresJobId : j.data.citySlug === citySlug), ); if (csWaiting) { const csData = csWaiting.data as ComputeScoresJobData; - const totalRoutingJobs = csData.modes.length * CATEGORY_IDS.length; + // Transit uses a single compute-transit child, not per-category routing jobs. + const routingModes = csData.modes.filter((m) => m !== "transit"); + const totalRoutingJobs = routingModes.length * CATEGORY_IDS.length; + const hasTransit = csData.modes.includes("transit"); // Count jobs that haven't finished yet (active or still waiting in queue) const pipelineWaiting = await queue.getWaiting(0, 200); - const stillActive = pipelineActive.filter( + const stillRoutingActive = pipelineActive.filter( (j) => j.data.citySlug === citySlug && j.data.type === "compute-routing", ).length; - const stillWaiting = pipelineWaiting.filter( + const stillRoutingWaiting = pipelineWaiting.filter( (j) => j.data.citySlug === citySlug && j.data.type === "compute-routing", ).length; - const completedCount = Math.max(0, totalRoutingJobs - stillActive - stillWaiting); - const pct = totalRoutingJobs > 0 - ? Math.round((completedCount / totalRoutingJobs) * 100) - : 0; + const completedRouting = Math.max(0, totalRoutingJobs - stillRoutingActive - stillRoutingWaiting); - enqueue({ - type: "progress", - stage: "Computing scores", - pct, - message: `${completedCount} / ${totalRoutingJobs} routing jobs`, - }); + // Check if compute-transit is still running + const transitRunning = + hasTransit && + (pipelineActive.some((j) => j.data.citySlug === citySlug && j.data.type === "compute-transit") || + pipelineWaiting.some((j) => j.data.citySlug === citySlug && j.data.type === "compute-transit")); + + // compute-transit job also shows its own progress when active — prefer that + const transitActiveJob = pipelineActive.find( + (j) => j.data.citySlug === citySlug && j.data.type === "compute-transit", + ); + if (transitActiveJob) { + const p = transitActiveJob.progress as JobProgress | undefined; + if (p?.stage) { + enqueue({ type: "progress", stage: p.stage, pct: p.pct, message: p.message }); + return; + } + } + + const pct = totalRoutingJobs > 0 + ? Math.round((completedRouting / totalRoutingJobs) * 100) + : transitRunning ? 99 : 100; + const message = transitRunning && completedRouting >= totalRoutingJobs + ? "Routing done — computing transit isochrones…" + : `${completedRouting} / ${totalRoutingJobs} routing jobs`; + + enqueue({ type: "progress", stage: "Computing scores", pct, message }); return; } @@ -117,7 +163,7 @@ export async function GET( return; } - // 2. No active stage — check for a recent failure in either queue. + // 2. No active stage — check for a failure that occurred after this refresh started. const [pipelineFailed, valhallaFailed] = await Promise.all([ queue.getFailed(0, 50), valhallaQueue.getFailed(0, 50), @@ -126,7 +172,7 @@ export async function GET( (j) => j.data.citySlug === citySlug && j.data.type !== "refresh-city" && - Date.now() - (j.finishedOn ?? 0) < 600_000, + (j.finishedOn ?? 0) > jobCreatedAt, ); if (recentFail) { enqueue({ @@ -138,13 +184,16 @@ export async function GET( return; } - // 3. Check if compute-scores completed recently → full pipeline done. + // 3. Check if the specific compute-scores job completed → pipeline done. + // Use exact job ID match (computeScoresJobId) to avoid false positives + // from a previous run's completed record still in BullMQ's retention window. const completed = await queue.getCompleted(0, 100); - const finalDone = completed.find( - (j) => - j.data.citySlug === citySlug && - j.data.type === "compute-scores" && - Date.now() - (j.finishedOn ?? 0) < 3_600_000, + const finalDone = completed.find((j) => + computeScoresJobId + ? j.id === computeScoresJobId + : j.data.citySlug === citySlug && + j.data.type === "compute-scores" && + (j.finishedOn ?? 0) > jobCreatedAt, ); if (finalDone) { enqueue({ type: "completed", jobId: finalDone.id ?? "" }); diff --git a/apps/web/app/api/location-score/route.ts b/apps/web/app/api/location-score/route.ts index 62c77af..0bb02f3 100644 --- a/apps/web/app/api/location-score/route.ts +++ b/apps/web/app/api/location-score/route.ts @@ -8,7 +8,7 @@ import { export const runtime = "nodejs"; -const VALID_MODES = ["walking", "cycling", "driving"]; +const VALID_MODES = ["walking", "cycling", "driving", "fifteen"]; const VALID_THRESHOLDS = [5, 8, 10, 12, 15, 20, 25, 30]; export async function GET(req: NextRequest) { diff --git a/apps/web/app/api/tiles/grid/[...tile]/route.ts b/apps/web/app/api/tiles/grid/[...tile]/route.ts index f834e11..010d966 100644 --- a/apps/web/app/api/tiles/grid/[...tile]/route.ts +++ b/apps/web/app/api/tiles/grid/[...tile]/route.ts @@ -4,7 +4,7 @@ import { PROFILE_IDS } from "@transportationer/shared"; export const runtime = "nodejs"; -const VALID_MODES = ["walking", "cycling", "driving"]; +const VALID_MODES = ["walking", "cycling", "driving", "transit", "fifteen"]; const VALID_THRESHOLDS = [5, 8, 10, 12, 15, 20, 25, 30]; export async function GET( diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 9de55d9..936ba8f 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -39,7 +39,7 @@ export default function HomePage() { const [cities, setCities] = useState([]); const [selectedCity, setSelectedCity] = useState(null); const [profile, setProfile] = useState("universal"); - const [mode, setMode] = useState("walking"); + const [mode, setMode] = useState("fifteen"); const [threshold, setThreshold] = useState(15); const [weights, setWeights] = useState({ ...PROFILES["universal"].categoryWeights }); const [activeCategory, setActiveCategory] = useState("composite"); @@ -204,7 +204,9 @@ export default function HomePage() { body: JSON.stringify({ lng: pinLocation.lng, lat: pinLocation.lat, - travelMode: mode, + // "fifteen" and "transit" have no direct Valhalla isochrone costing — + // use walking as the representative display mode for both. + travelMode: (mode === "fifteen" || mode === "transit") ? "walking" : mode, contourMinutes: isochroneContours(threshold), }), }) diff --git a/apps/web/components/control-panel.tsx b/apps/web/components/control-panel.tsx index b90adaf..786a110 100644 --- a/apps/web/components/control-panel.tsx +++ b/apps/web/components/control-panel.tsx @@ -5,8 +5,10 @@ import type { CategoryId, TravelMode, ProfileId } from "@transportationer/shared const TRAVEL_MODES: Array<{ value: TravelMode; label: string; icon: string }> = [ + { value: "fifteen", label: "Best mode", icon: "🏆" }, { value: "walking", label: "Walking", icon: "🚶" }, { value: "cycling", label: "Cycling", icon: "🚲" }, + { value: "transit", label: "Transit", icon: "🚌" }, { value: "driving", label: "Driving", icon: "🚗" }, ]; @@ -90,12 +92,12 @@ export function ControlPanel({

Travel Mode

-
+
{TRAVEL_MODES.map((m) => (