refactor: fix issues with parallelization and scoring

This commit is contained in:
Jan-Henrik 2026-03-07 18:56:59 +01:00
parent d0b29278bd
commit ebe5fec68f
25 changed files with 1225 additions and 1205 deletions

View file

@ -1,33 +1,30 @@
# ─── Build base (Alpine — small, used for npm install + tsc) ────────────────── # ─── Web app Dockerfile ───────────────────────────────────────────────────────
# Alpine — small final image; no native addons needed for web.
FROM node:22-alpine AS base FROM node:22-alpine AS base
RUN apk add --no-cache libc6-compat RUN apk add --no-cache libc6-compat
WORKDIR /app WORKDIR /app
# ─── Dependencies (include devDeps — needed for tsc, next build, etc.) ──────── # ─── Dependencies ─────────────────────────────────────────────────────────────
FROM base AS deps FROM base AS deps
COPY package.json package-lock.json* tsconfig.base.json ./ COPY package.json package-lock.json* tsconfig.base.json ./
COPY apps/web/package.json ./apps/web/ COPY apps/web/package.json ./apps/web/
COPY worker/package.json ./worker/ COPY worker/package.json ./worker/
COPY shared/package.json ./shared/ COPY shared/package.json ./shared/
# NODE_ENV must NOT be production here — devDependencies (tsc, tsx, etc.) are required to build # NODE_ENV must NOT be production here — devDependencies (tsc, next, etc.) are required
RUN npm install --workspace=apps/web --workspace=worker --workspace=shared RUN npm install --workspace=apps/web --workspace=worker --workspace=shared
# ─── Shared build ──────────────────────────────────────────────────────────── # ─── Shared build ────────────────────────────────────────────────────────────
FROM deps AS shared-build FROM deps AS shared-build
COPY shared/ ./shared/ COPY shared/ ./shared/
RUN npm run build --workspace=shared RUN npm run build --workspace=shared
# ─── Next.js build ────────────────────────────────────────────────────────── # ─── Next.js build ────────────────────────────────────────────────────────────
FROM shared-build AS web-build FROM shared-build AS web-build
COPY apps/web/ ./apps/web/ COPY apps/web/ ./apps/web/
RUN npm run build --workspace=apps/web RUN npm run build --workspace=apps/web
# ─── Worker build ────────────────────────────────────────────────────────── # ─── Web runtime ──────────────────────────────────────────────────────────────
FROM shared-build AS worker-build
COPY worker/ ./worker/
RUN npm run build --workspace=worker
# ─── Web runtime (Alpine) ─────────────────────────────────────────────────────
FROM node:22-alpine AS web FROM node:22-alpine AS web
RUN apk add --no-cache libc6-compat RUN apk add --no-cache libc6-compat
RUN addgroup --system --gid 1001 nodejs && adduser --system --uid 1001 nextjs RUN addgroup --system --gid 1001 nodejs && adduser --system --uid 1001 nextjs
@ -42,59 +39,4 @@ COPY apps/web/package.json ./apps/web/
USER nextjs USER nextjs
WORKDIR /app/apps/web WORKDIR /app/apps/web
EXPOSE 3000 EXPOSE 3000
# Use absolute path — WORKDIR is /app/apps/web but node_modules are at /app/node_modules
CMD ["/app/node_modules/.bin/next", "start"] CMD ["/app/node_modules/.bin/next", "start"]
# ─── Valhalla worker (gis-ops Valhalla image + Node.js 22) ───────────────────
# This container runs both a BullMQ worker (build-valhalla jobs) AND the
# valhalla_service HTTP server. It has valhalla_build_tiles and friends
# pre-installed from the base image. Node.js is added for the BullMQ consumer.
FROM ghcr.io/gis-ops/docker-valhalla/valhalla:latest AS valhalla-worker
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg osmium-tool \
&& mkdir -p /etc/apt/keyrings \
&& curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key \
| gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg \
&& echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_22.x nodistro main" \
> /etc/apt/sources.list.d/nodesource.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends nodejs \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
ENV NODE_ENV=production
# BullMQ and postgres are pure JS — no native add-ons — so Alpine-built
# node_modules from the deps stage work on this Debian/glibc base.
COPY --from=deps /app/node_modules ./node_modules
COPY --from=worker-build /app/worker/dist ./worker/dist
COPY --from=shared-build /app/shared/dist ./shared/dist
COPY shared/package.json ./shared/
COPY worker/package.json ./worker/
# /data/osm is shared with the pipeline worker (osm_data volume).
# Make it world-writable so the worker (UID 1001) can write PBF files here
# regardless of which container initialises the Docker volume first.
# valhalla mounts this volume :ro so it can never write here accidentally.
RUN mkdir -p /data/osm /data/valhalla && chmod 1777 /data/osm
ENTRYPOINT ["/bin/node"]
CMD ["worker/dist/valhalla-main.js"]
# ─── Worker runtime (Debian slim — osmium-tool + osm2pgsql are in apt) ────────
FROM node:22-slim AS worker
RUN apt-get update && apt-get install -y --no-install-recommends \
osmium-tool \
osm2pgsql \
&& rm -rf /var/lib/apt/lists/*
RUN groupadd --system --gid 1001 nodejs && useradd --system --uid 1001 --gid nodejs workeruser
WORKDIR /app
ENV NODE_ENV=production
COPY --from=deps /app/node_modules ./node_modules
COPY --from=worker-build /app/worker/dist ./worker/dist
COPY --from=shared-build /app/shared/dist ./shared/dist
COPY shared/package.json ./shared/
COPY infra/ ./infra/
COPY worker/package.json ./worker/
# Create data directories owned by workeruser so Docker named volumes
# are initialized with the correct permissions on first run.
RUN mkdir -p /data/osm /data/valhalla && chown -R workeruser:nodejs /data
USER workeruser
CMD ["node", "worker/dist/index.js"]

View file

@ -0,0 +1,47 @@
# ─── Valhalla tile-builder Dockerfile ─────────────────────────────────────────
# Builds per-city Valhalla road/transit tile directories.
# The gis-ops Valhalla image provides valhalla_build_tiles and friends.
# Node.js is added for the BullMQ job consumer.
# @valhallajs/valhallajs is NOT used here — tile building uses the CLI tools.
FROM node:22-slim AS build
RUN apt-get update && apt-get install -y --no-install-recommends python3 make g++ \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY package.json package-lock.json* tsconfig.base.json ./
COPY apps/web/package.json ./apps/web/
COPY worker/package.json ./worker/
COPY shared/package.json ./shared/
# NODE_ENV must NOT be production here — devDependencies needed to build
RUN npm install --workspace=apps/web --workspace=worker --workspace=shared
COPY shared/ ./shared/
RUN npm run build --workspace=shared
COPY worker/ ./worker/
RUN npm run build --workspace=worker
# ─── Runtime (Valhalla + Node.js) ─────────────────────────────────────────────
FROM ghcr.io/gis-ops/docker-valhalla/valhalla:latest AS valhalla-worker
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg osmium-tool \
&& mkdir -p /etc/apt/keyrings \
&& curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key \
| gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg \
&& echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_22.x nodistro main" \
> /etc/apt/sources.list.d/nodesource.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends nodejs \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
ENV NODE_ENV=production
# node_modules built on Debian glibc (same ABI as this container)
COPY --from=build /app/node_modules ./node_modules
COPY --from=build /app/worker/dist ./worker/dist
COPY --from=build /app/shared/dist ./shared/dist
COPY shared/package.json ./shared/
COPY worker/package.json ./worker/
RUN mkdir -p /data/osm /data/valhalla && chmod 1777 /data/osm
ENTRYPOINT ["/bin/node"]
CMD ["worker/dist/valhalla-main.js"]

45
Dockerfile.worker Normal file
View file

@ -0,0 +1,45 @@
# ─── Pipeline worker Dockerfile ───────────────────────────────────────────────
# Debian slim — needed for osmium-tool and osm2pgsql (not in Alpine).
# Also serves as the routing queue consumer using @valhallajs/valhallajs
# (glibc prebuilt binary; no separate valhalla HTTP server needed).
FROM node:22-slim AS build
RUN apt-get update && apt-get install -y --no-install-recommends python3 make g++ \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# ─── Dependencies (glibc — required for @valhallajs/valhallajs native addon) ──
COPY package.json package-lock.json* tsconfig.base.json ./
COPY apps/web/package.json ./apps/web/
COPY worker/package.json ./worker/
COPY shared/package.json ./shared/
# NODE_ENV must NOT be production here — devDependencies (tsc, tsx, etc.) needed
RUN npm install --workspace=apps/web --workspace=worker --workspace=shared
# ─── Shared + worker build ────────────────────────────────────────────────────
COPY shared/ ./shared/
RUN npm run build --workspace=shared
COPY worker/ ./worker/
RUN npm run build --workspace=worker
# ─── Runtime ──────────────────────────────────────────────────────────────────
FROM node:22-slim AS worker
RUN apt-get update && apt-get install -y --no-install-recommends \
osmium-tool \
osm2pgsql \
&& rm -rf /var/lib/apt/lists/*
RUN groupadd --system --gid 1001 nodejs && useradd --system --uid 1001 --gid nodejs workeruser
WORKDIR /app
ENV NODE_ENV=production
COPY --from=build /app/node_modules ./node_modules
COPY --from=build /app/worker/dist ./worker/dist
COPY --from=build /app/shared/dist ./shared/dist
COPY shared/package.json ./shared/
COPY infra/ ./infra/
COPY worker/package.json ./worker/
# Create data directories owned by workeruser so Docker named volumes
# are initialized with the correct permissions on first run.
RUN mkdir -p /data/osm /data/valhalla /data/valhalla_road /data/valhalla_transit \
&& chown -R workeruser:nodejs /data
USER workeruser
CMD ["node", "worker/dist/index.js"]

View file

@ -16,39 +16,41 @@ Next.js App Server
└── Valkey (API response cache, BullMQ queues) └── Valkey (API response cache, BullMQ queues)
BullMQ Worker (download queue, concurrency 1) BullMQ Worker (download queue, concurrency 1)
└── download-pbf → streams OSM PBF from Geofabrik (serialised to avoid └── download-pbf → streams OSM PBF from Geofabrik; cached by URL basename
redundant parallel downloads; idempotent if file exists) (multiple cities sharing the same regional PBF download it once),
24 h TTL, serialised to avoid redundant parallel downloads
BullMQ Worker (pipeline queue, concurrency 8) BullMQ Worker (pipeline queue, concurrency 8)
├── refresh-city → orchestrates full ingest via FlowProducer ├── refresh-city → orchestrates full ingest via FlowProducer
├── extract-pois → osmium filter + osm2pgsql flex → raw_pois ├── extract-pois → osmium filter + osm2pgsql flex → raw_pois
├── generate-grid → PostGIS 200 m rectangular grid → grid_points ├── generate-grid → PostGIS 200 m rectangular grid → grid_points
├── compute-scores → two-phase orchestrator (see Scoring below) ├── compute-scores → two-phase orchestrator (see Scoring below)
├── compute-routing → Valhalla matrix → grid_poi_details ├── compute-routing → dispatches matrix job to 'routing' queue → grid_poi_details
│ (15 parallel jobs: 3 modes × 5 categories) │ (15 parallel jobs: 3 modes × 5 categories)
└── compute-transit → Valhalla isochrones → grid_poi_details (travel_mode='transit') └── compute-transit → dispatches isochrone jobs to 'routing-transit' queue →
grid_poi_details (travel_mode='transit')
(1 job per city, covers all categories via PostGIS spatial join) (1 job per city, covers all categories via PostGIS spatial join)
BullMQ Worker (valhalla queue, concurrency 1) — road-only instance BullMQ Worker (valhalla queue, concurrency 1) — road-only tile builder + router
└── build-valhalla → osmium clip + valhalla_build_tiles (road graph only, no transit ├── build-valhalla → osmium clip + valhalla_build_tiles (road graph only, no transit
connections) → manages valhalla_service on :8002 │ connections per city tile dir VALHALLA_TILES_BASE/{citySlug}/)
Clean tiles ensure cycling/walking/driving routing is never │ Clean tiles ensure cycling/walking/driving routing is never
affected by ghost edges from failed transit connections. │ affected by ghost edges from failed transit connections.
└── routing queue → @valhallajs/valhallajs Actor pool (per-city, lazy-loaded)
├── matrix jobs (walking/cycling/driving) ← compute-routing
└── isochrone jobs (non-transit) ← /api/isochrones
BullMQ Worker (valhalla-transit queue, concurrency 1) — transit instance BullMQ Worker (valhalla-transit queue, concurrency 1) — transit tile builder + router
├── download-gtfs-de → downloads & filters GTFS feed for German ÖPNV (bbox-clipped to ├── download-gtfs-de → raw feed cached globally in gtfs/raw/ (keyed by source URL);
│ known cities, single-stop trips removed) │ per-city filtered feed in gtfs/{citySlug}/feed/ (bbox-clipped,
└── build-valhalla → osmium clip + valhalla_ingest_transit + valhalla_convert_transit │ trips with <2 bbox stops removed); 24 h source TTL
+ valhalla_build_tiles (road graph with transit connections) ├── build-valhalla → osmium clip (expanded to transit stops bbox) +
→ manages valhalla_service on :8002 (separate container/port) │ valhalla_ingest_transit + valhalla_convert_transit +
│ valhalla_build_tiles (road graph with transit connections)
Valhalla road instance (child process of valhalla worker, port 8002) │ per city tile dir VALHALLA_TILES_BASE/{citySlug}/
├── sources_to_targets matrix → compute-routing jobs (walking/cycling/driving) └── routing-transit queue → @valhallajs/valhallajs Actor pool (per-city, lazy-loaded)
└── isochrone endpoint → user click → /api/isochrones (non-transit modes) ├── isochrone jobs (multimodal) ← compute-transit
└── isochrone jobs (transit) ← /api/isochrones
Valhalla transit instance (child process of valhalla-transit worker, port 8002)
├── isochrone (multimodal) → compute-transit jobs
└── isochrone endpoint → user click → /api/isochrones (transit mode)
Protomaps → self-hosted map tiles (PMTiles) Protomaps → self-hosted map tiles (PMTiles)
``` ```
@ -127,7 +129,7 @@ Each city is covered by a regular rectangular grid at 200 m spacing, generated i
Travel times are obtained from [Valhalla](https://github.com/valhalla/valhalla), a real-network routing engine built on OSM data: Travel times are obtained from [Valhalla](https://github.com/valhalla/valhalla), a real-network routing engine built on OSM data:
- **Walking, cycling, driving** — Valhalla's `sources_to_targets` matrix endpoint. For each grid point the 6 spatially nearest POIs in the category are sent as targets; the resulting travel-time matrix gives the exact routed time to each. The nearest POI *per subcategory* is retained. - **Walking, cycling, driving** — Valhalla's `sources_to_targets` matrix endpoint. For each grid point the 6 spatially nearest POIs (by KNN) in the category are sent as targets; the resulting travel-time matrix gives the exact routed time to each. The fastest-routed POI *per subcategory* is retained.
- **Transit** — Valhalla's matrix endpoint does not support transit. Instead, a multimodal isochrone is computed per grid point at contour intervals of 5, 10, 15, 20, and 30 minutes (fixed departure: next Tuesday 08:00 for reproducible GTFS results). PostGIS `ST_Within` then classifies every POI in the city into the smallest contour it falls within, giving estimated times of 300 / 600 / 900 / 1200 / 1800 seconds. Grid points outside the transit network are silently skipped — they receive no transit score. - **Transit** — Valhalla's matrix endpoint does not support transit. Instead, a multimodal isochrone is computed per grid point at contour intervals of 5, 10, 15, 20, and 30 minutes (fixed departure: next Tuesday 08:00 for reproducible GTFS results). PostGIS `ST_Within` then classifies every POI in the city into the smallest contour it falls within, giving estimated times of 300 / 600 / 900 / 1200 / 1800 seconds. Grid points outside the transit network are silently skipped — they receive no transit score.
- **Cyclist (`cyclist`)** — synthetic persona: `MIN(walking, cycling, transit)` per POI. Represents someone who cycles and also uses transit when faster. No extra routing calls needed. - **Cyclist (`cyclist`)** — synthetic persona: `MIN(walking, cycling, transit)` per POI. Represents someone who cycles and also uses transit when faster. No extra routing calls needed.
- **Cyclist, no transit (`cycling_walk`)** — synthetic persona: `MIN(walking, cycling)`. Represents someone who cycles but avoids public transit. No extra routing calls needed. - **Cyclist, no transit (`cycling_walk`)** — synthetic persona: `MIN(walking, cycling)`. Represents someone who cycles but avoids public transit. No extra routing calls needed.
@ -141,10 +143,10 @@ All scores are precomputed at ingest time for every combination of threshold (5
Each subcategory *i* contributes a proximity score based on travel time `t` and threshold `T` (both in seconds) using exponential decay: Each subcategory *i* contributes a proximity score based on travel time `t` and threshold `T` (both in seconds) using exponential decay:
``` ```
score(t, T) = exp(-3 × t / T) score(t, T) = exp(-t / T)
``` ```
At t = 0 the score is 1.0. At the threshold it is exp(−3) ≈ 0.05 — a POI reachable in exactly the threshold time barely contributes. Close proximity dominates: a third of the threshold away scores ~0.37, halfway scores ~0.22. This ensures that genuinely nearby POIs are rated much more highly than merely reachable ones. At t = 0 the score is 1.0. At the threshold it is exp(−1) ≈ 0.37 — a POI reachable in exactly the threshold time still contributes meaningfully. A third of the threshold away scores ~0.72, halfway scores ~0.61. The curve rewards proximity without harshly penalising destinations that are merely a bit further than ideal.
The category score aggregates across subcategories **and** across multiple nearby POIs of the same subcategory via a **complement product** weighted by profile-specific importance weights `w_i ∈ [0, 1]`: The category score aggregates across subcategories **and** across multiple nearby POIs of the same subcategory via a **complement product** weighted by profile-specific importance weights `w_i ∈ [0, 1]`:
@ -249,8 +251,8 @@ For each city the worker pipeline runs in two phases:
**Phase 1 — Routing** (parallel child jobs, dispatched by `compute-scores`) **Phase 1 — Routing** (parallel child jobs, dispatched by `compute-scores`)
- *Walking, cycling, driving* — 15 parallel jobs (3 modes × 5 categories). A PostGIS KNN lateral join finds the 6 spatially nearest POIs per grid point in the category; those coordinates are sent to Valhalla's `sources_to_targets` matrix API in batches. The nearest POI per subcategory is persisted to `grid_poi_details`. - *Walking, cycling, driving* — 15 parallel jobs (3 modes × 5 categories). A PostGIS KNN lateral join finds the 6 spatially nearest POIs per grid point in the category; those coordinates are dispatched as a matrix job to the `routing` BullMQ queue, where the valhalla worker's Actor pool runs `sources_to_targets`. The nearest POI per subcategory is persisted to `grid_poi_details`.
- *Transit* — 1 job per city (`compute-transit`). Concurrent isochrone calls (8 at a time) to the dedicated transit Valhalla instance; PostGIS `ST_Within` classifies POIs into contour bands. Runs first so it overlaps with the routing jobs. - *Transit* — 1 job per city (`compute-transit`). Concurrent isochrone jobs (8 at a time) dispatched to the `routing-transit` queue; the transit valhalla worker's Actor pool runs multimodal isochrones and PostGIS `ST_Within` classifies POIs into contour bands. Runs first so it overlaps with the routing jobs.
**Phase 2 — Score aggregation** **Phase 2 — Score aggregation**
@ -274,8 +276,8 @@ When a user places a pin on the map:
1. The nearest grid point is found via a PostGIS `<->` KNN query. 1. The nearest grid point is found via a PostGIS `<->` KNN query.
2. Precomputed `grid_scores` rows for that grid point, travel mode, threshold, and profile are returned — one row per category. 2. Precomputed `grid_scores` rows for that grid point, travel mode, threshold, and profile are returned — one row per category.
3. Per-subcategory detail rows from `grid_poi_details` are also fetched, showing the name, straight-line distance, and travel time to the nearest POI in each subcategory for the requested mode. 3. Per-subcategory detail rows from `grid_poi_details` are also fetched, showing the name, straight-line distance, and travel time to the fastest-routed POI in each subcategory for the requested mode. The category headline time shown in the panel is the minimum across all subcategories.
4. An isochrone overlay is fetched live from Valhalla and shown on the map. For `transit` mode the multimodal isochrone comes from the dedicated transit Valhalla instance. For `fifteen` (Best mode), cycling is used as the representative display isochrone since Valhalla's interactive isochrone only supports single-mode costing. 4. An isochrone overlay is dispatched via BullMQ and computed by the valhalla worker's Actor pool. For `transit` mode the job goes to the `routing-transit` queue (multimodal Actor). For `fifteen` (Best mode), cycling is used as the representative display isochrone since Valhalla's isochrone only supports single-mode costing.
The pin panel also shows estate value data (land price in €/m² from the BORIS NI cadastre) for cities in Lower Saxony, including a percentile rank among all zones in the city and a "peer percentile" rank among zones with similar accessibility scores. The pin panel also shows estate value data (land price in €/m² from the BORIS NI cadastre) for cities in Lower Saxony, including a percentile rank among all zones in the city and a "peer percentile" rank among zones with similar accessibility scores.

View file

@ -106,6 +106,18 @@ export default function CityDetailPage() {
<button onClick={handleReIngest} className="btn-primary"> <button onClick={handleReIngest} className="btn-primary">
Re-ingest Data Re-ingest Data
</button> </button>
<button
onClick={async () => {
const res = await fetch(`/api/admin/cities/${slug}/rerun-scores`, { method: "POST" });
if (res.ok) {
const data = await res.json();
setJobId(data.jobId);
}
}}
className="btn-secondary"
>
Rerun Scores
</button>
<button <button
onClick={handleDelete} onClick={handleDelete}
disabled={deleting} disabled={deleting}

View file

@ -218,7 +218,7 @@ function LocationSelector({
const onBoundaryChangeRef = useRef(onBoundaryChange); const onBoundaryChangeRef = useRef(onBoundaryChange);
onBoundaryChangeRef.current = onBoundaryChange; onBoundaryChangeRef.current = onBoundaryChange;
// Notify parent when selection changes // Notify parent when selection changes.
useEffect(() => { useEffect(() => {
onBoundaryChangeRef.current(selected?.geojson ?? null); onBoundaryChangeRef.current(selected?.geojson ?? null);
}, [selected]); }, [selected]);
@ -237,14 +237,11 @@ function LocationSelector({
{ headers: { "User-Agent": "Transportationer/1.0 (15-minute city analyzer)" } }, { headers: { "User-Agent": "Transportationer/1.0 (15-minute city analyzer)" } },
); );
const data: NominatimResult[] = await res.json(); const data: NominatimResult[] = await res.json();
// Keep only results with a real polygon and place_rank >= 13. // Keep polygon results at municipality level or finer (rank >= 10).
// Landkreise/counties are rank 12; cities/towns/municipalities are 13+. setResults(data.filter(
const polygons = data.filter( (r) => r.place_rank >= 10 &&
(r) =>
r.place_rank >= 12 &&
(r.geojson?.type === "Polygon" || r.geojson?.type === "MultiPolygon"), (r.geojson?.type === "Polygon" || r.geojson?.type === "MultiPolygon"),
); ));
setResults(polygons);
setShowDropdown(true); setShowDropdown(true);
} catch { } catch {
setResults([]); setResults([]);
@ -263,12 +260,12 @@ function LocationSelector({
if (!src) return; if (!src) return;
if (geojson) { if (geojson) {
src.setData({ type: "Feature", geometry: geojson, properties: {} }); src.setData({ type: "Feature", geometry: geojson, properties: {} });
// Fit to boundary bbox // Fit to boundary bbox — flatten all rings/polygons to get full extent
try { try {
const coords: number[][] = const coords: number[][] =
geojson.type === "Polygon" geojson.type === "Polygon"
? (geojson.coordinates as number[][][])[0] ? (geojson.coordinates as number[][][]).flat()
: (geojson.coordinates as number[][][][])[0][0]; : (geojson.coordinates as number[][][][]).flat(2);
const lngs = coords.map((c) => c[0]); const lngs = coords.map((c) => c[0]);
const lats = coords.map((c) => c[1]); const lats = coords.map((c) => c[1]);
map.fitBounds( map.fitBounds(
@ -363,8 +360,8 @@ function LocationSelector({
try { try {
const coords: number[][] = const coords: number[][] =
regionGeometry.type === "Polygon" regionGeometry.type === "Polygon"
? regionGeometry.coordinates[0] ? regionGeometry.coordinates.flat()
: regionGeometry.coordinates[0][0]; : regionGeometry.coordinates.flat(2);
const lngs = coords.map((c) => c[0]); const lngs = coords.map((c) => c[0]);
const lats = coords.map((c) => c[1]); const lats = coords.map((c) => c[1]);
map.fitBounds( map.fitBounds(

View file

@ -1,7 +1,6 @@
import { NextRequest, NextResponse } from "next/server"; import { NextRequest, NextResponse } from "next/server";
import { sql } from "@/lib/db"; import { sql } from "@/lib/db";
import { fetchIsochrone } from "@/lib/valhalla"; import { fetchIsochrone } from "@/lib/valhalla";
import { getValhallaQueue, getValhallaTransitQueue } from "@/lib/queue";
import { nextTuesdayDeparture } from "@transportationer/shared"; import { nextTuesdayDeparture } from "@transportationer/shared";
export const runtime = "nodejs"; export const runtime = "nodejs";
@ -19,10 +18,7 @@ export async function POST(req: NextRequest) {
); );
} }
const { lng, lat, travelMode, contourMinutes } = body as Record< const { lng, lat, travelMode, contourMinutes, city } = body as Record<string, unknown>;
string,
unknown
>;
if (typeof lng !== "number" || typeof lat !== "number") { if (typeof lng !== "number" || typeof lat !== "number") {
return NextResponse.json( return NextResponse.json(
@ -30,6 +26,12 @@ export async function POST(req: NextRequest) {
{ status: 400 }, { status: 400 },
); );
} }
if (typeof city !== "string" || !city) {
return NextResponse.json(
{ error: "city is required", code: "MISSING_CITY" },
{ status: 400 },
);
}
const contours: number[] = Array.isArray(contourMinutes) const contours: number[] = Array.isArray(contourMinutes)
? (contourMinutes as number[]) ? (contourMinutes as number[])
@ -60,19 +62,7 @@ export async function POST(req: NextRequest) {
return NextResponse.json({ ...result, cached: true }); return NextResponse.json({ ...result, cached: true });
} }
// Refuse to call valhalla_service while tiles are being rebuilt — // Dispatch to valhalla routing queue via BullMQ.
// the service is stopped during the build and requests would hang or fail.
// Check the queue that owns the requested mode's instance.
const rebuildQueue = mode === "transit" ? getValhallaTransitQueue() : getValhallaQueue();
const activeValhalla = await rebuildQueue.getActiveCount();
if (activeValhalla > 0) {
return NextResponse.json(
{ error: "Routing engine is rebuilding, please try again shortly.", code: "VALHALLA_REBUILDING" },
{ status: 503, headers: { "Retry-After": "60" } },
);
}
// Fetch from local Valhalla
let geojson: object; let geojson: object;
try { try {
geojson = await fetchIsochrone({ geojson = await fetchIsochrone({
@ -80,23 +70,33 @@ export async function POST(req: NextRequest) {
lat, lat,
travelMode: mode, travelMode: mode,
contourMinutes: contours, contourMinutes: contours,
polygons: true, citySlug: city,
}); });
} catch (err) { } catch (err) {
const msg = err instanceof Error ? err.message : "unknown";
// Distinguish "tiles not built yet" from other errors
if (msg.includes("not ready") || msg.includes("timed out") || msg.includes("timeout")) {
return NextResponse.json(
{ error: "Routing engine unavailable — tiles may still be building.", code: "VALHALLA_UNAVAILABLE", detail: msg },
{ status: 503 },
);
}
return NextResponse.json( return NextResponse.json(
{ { error: "Routing engine error", code: "VALHALLA_ERROR", detail: msg },
error: "Routing engine unavailable", { status: 503 },
code: "VALHALLA_ERROR", );
detail: err instanceof Error ? err.message : "unknown", }
},
// Validate result before caching (Actor may return an error object)
const fc = geojson as { features?: unknown[]; error?: unknown; error_code?: unknown };
if (fc.error || fc.error_code || !Array.isArray(fc.features)) {
return NextResponse.json(
{ error: "Routing engine returned invalid response", code: "VALHALLA_BAD_RESPONSE" },
{ status: 503 }, { status: 503 },
); );
} }
// Store in PostGIS cache. // Store in PostGIS cache.
// Use an explicit ::jsonb cast so PostgreSQL receives a text parameter and
// parses it as JSON itself. Without the cast, postgres.js infers the JSONB
// column type and re-encodes the string as a JSONB string literal.
await Promise.resolve(sql` await Promise.resolve(sql`
INSERT INTO isochrone_cache (origin_geom, travel_mode, contours_min, departure_date, result) INSERT INTO isochrone_cache (origin_geom, travel_mode, contours_min, departure_date, result)
VALUES ( VALUES (

View file

@ -222,6 +222,7 @@ export default function HomePage() {
body: JSON.stringify({ body: JSON.stringify({
lng: pinLocation.lng, lng: pinLocation.lng,
lat: pinLocation.lat, lat: pinLocation.lat,
city: selectedCity,
// Synthetic modes map to a single Valhalla costing for the isochrone display. // Synthetic modes map to a single Valhalla costing for the isochrone display.
// "cyclist" uses cycling (largest catchment); "transit_walk" uses transit. // "cyclist" uses cycling (largest catchment); "transit_walk" uses transit.
travelMode: travelMode:

View file

@ -231,11 +231,19 @@ export function LocationScorePanel({
<div className="space-y-2.5"> <div className="space-y-2.5">
{CATEGORIES.map((cat) => { {CATEGORIES.map((cat) => {
const score = data.categoryScores[cat.id] ?? 0; const score = data.categoryScores[cat.id] ?? 0;
const dist = data.distancesM[cat.id];
const time = data.travelTimesS[cat.id];
const barColor = const barColor =
score >= 0.65 ? "#22c55e" : score >= 0.4 ? "#eab308" : "#ef4444"; score >= 0.65 ? "#22c55e" : score >= 0.4 ? "#eab308" : "#ef4444";
const subcats = data.subcategoryDetails?.[cat.id]; const subcats = data.subcategoryDetails?.[cat.id];
// Use the fastest subcategory entry as the category headline — this is
// consistent with what the user sees in the expanded view and avoids
// the straight-line-nearest POI having an unexpectedly long routed time.
const bestSubcat = subcats?.reduce<SubcategoryDetail | null>((best, d) => {
if (d.travelTimeS == null) return best;
if (best == null || d.travelTimeS < (best.travelTimeS ?? Infinity)) return d;
return best;
}, null);
const time = bestSubcat?.travelTimeS ?? data.travelTimesS[cat.id];
const dist = bestSubcat?.distanceM ?? data.distancesM[cat.id];
const isExpanded = expandedCategory === cat.id; const isExpanded = expandedCategory === cat.id;
const hasDetails = subcats && subcats.length > 0; const hasDetails = subcats && subcats.length > 0;
return ( return (

View file

@ -32,6 +32,16 @@ declare global {
// eslint-disable-next-line no-var // eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
var __valhallaTransitQueue: Queue<any> | undefined; var __valhallaTransitQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __routingQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
var __routingQueueEvents: QueueEvents | undefined;
// eslint-disable-next-line no-var
// eslint-disable-next-line @typescript-eslint/no-explicit-any
var __routingTransitQueue: Queue<any> | undefined;
// eslint-disable-next-line no-var
var __routingTransitQueueEvents: QueueEvents | undefined;
} }
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -102,3 +112,53 @@ export function createQueueEvents(): QueueEvents {
connection: createBullMQConnection(), connection: createBullMQConnection(),
}); });
} }
/** Lazily-created singleton queue for road-mode (walking/cycling/driving) isochrone jobs. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function getRoutingQueue(): Queue<any> {
  globalThis.__routingQueue ??= new Queue("routing", {
    connection: createBullMQConnection(),
    defaultJobOptions: {
      attempts: 1,
      // On-demand jobs are short-lived; expire results quickly to keep Redis small.
      removeOnComplete: { age: 120 },
      removeOnFail: { age: 300 },
    },
  });
  return globalThis.__routingQueue;
}
/** Lazily-created singleton QueueEvents listener for the road routing queue. */
export function getRoutingQueueEvents(): QueueEvents {
  globalThis.__routingQueueEvents ??= new QueueEvents("routing", {
    connection: createBullMQConnection(),
  });
  return globalThis.__routingQueueEvents;
}
/** Lazily-created singleton queue for transit-mode isochrone jobs. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function getRoutingTransitQueue(): Queue<any> {
  globalThis.__routingTransitQueue ??= new Queue("routing-transit", {
    connection: createBullMQConnection(),
    defaultJobOptions: {
      attempts: 1,
      // On-demand jobs are short-lived; expire results quickly to keep Redis small.
      removeOnComplete: { age: 120 },
      removeOnFail: { age: 300 },
    },
  });
  return globalThis.__routingTransitQueue;
}
/** Lazily-created singleton QueueEvents listener for the transit routing queue. */
export function getRoutingTransitQueueEvents(): QueueEvents {
  globalThis.__routingTransitQueueEvents ??= new QueueEvents("routing-transit", {
    connection: createBullMQConnection(),
  });
  return globalThis.__routingTransitQueueEvents;
}

View file

@ -1,62 +1,39 @@
import { nextTuesdayDeparture } from "@transportationer/shared"; import { nextTuesdayDeparture } from "@transportationer/shared";
import { getRoutingQueue, getRoutingQueueEvents, getRoutingTransitQueue, getRoutingTransitQueueEvents } from "./queue";
const VALHALLA_BASE = process.env.VALHALLA_URL ?? "http://valhalla:8002";
const VALHALLA_TRANSIT_BASE = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_BASE;
export type ValhallaCosting = "pedestrian" | "bicycle" | "auto";
const COSTING_MAP: Record<string, ValhallaCosting> = {
walking: "pedestrian",
cycling: "bicycle",
driving: "auto",
};
export interface IsochroneOpts { export interface IsochroneOpts {
lng: number; lng: number;
lat: number; lat: number;
travelMode: string; travelMode: string;
contourMinutes: number[]; contourMinutes: number[];
citySlug: string;
polygons?: boolean; polygons?: boolean;
} }
export async function fetchIsochrone(opts: IsochroneOpts): Promise<object> { const ISOCHRONE_TIMEOUT_MS = 35_000;
const isTransit = opts.travelMode === "transit";
const costing = isTransit ? "multimodal" : (COSTING_MAP[opts.travelMode] ?? "pedestrian");
const body: Record<string, unknown> = {
locations: [{ lon: opts.lng, lat: opts.lat }],
costing,
contours: opts.contourMinutes.map((time) => ({ time })),
polygons: opts.polygons ?? true,
show_locations: false,
};
if (isTransit) {
body.costing_options = { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 } };
body.date_time = { type: 1, value: nextTuesdayDeparture() };
}
const base = isTransit ? VALHALLA_TRANSIT_BASE : VALHALLA_BASE; /**
const res = await fetch(`${base}/isochrone`, { * Dispatch an isochrone job to the appropriate valhalla routing queue and wait.
method: "POST", * Road modes (walking/cycling/driving) routing queue (valhalla container).
headers: { "Content-Type": "application/json" }, * Transit routing-transit queue (valhalla-transit container).
body: JSON.stringify(body), */
signal: AbortSignal.timeout(30_000), export async function fetchIsochrone(opts: IsochroneOpts): Promise<object> {
const { lng, lat, travelMode, contourMinutes, citySlug } = opts;
const isTransit = travelMode === "transit";
const departureDate = isTransit ? nextTuesdayDeparture() : null;
const queue = isTransit ? getRoutingTransitQueue() : getRoutingQueue();
const events = isTransit ? getRoutingTransitQueueEvents() : getRoutingQueueEvents();
const job = await queue.add("isochrone", {
type: "isochrone",
lng,
lat,
travelMode,
contourMinutes,
citySlug,
departureDate,
}); });
if (!res.ok) { return job.waitUntilFinished(events, ISOCHRONE_TIMEOUT_MS) as Promise<object>;
const text = await res.text();
throw new Error(`Valhalla error ${res.status}: ${text}`);
}
return res.json();
}
export async function checkValhalla(): Promise<boolean> {
try {
const res = await fetch(`${VALHALLA_BASE}/status`, {
signal: AbortSignal.timeout(3000),
});
return res.ok;
} catch {
return false;
}
} }

View file

@ -31,55 +31,54 @@ services:
timeout: 5s timeout: 5s
retries: 5 retries: 5
# ─── Valhalla road worker (port 8002) ───────────────────────────────────── # ─── Valhalla road tile-builder + road routing queue ──────────────────────
# Builds road-only tiles (no transit data) → cycling/walking/driving routing. # Builds road-only per-city tile directories (walking/cycling/driving).
# Without GTFS in its volume, valhalla_build_tiles produces clean road tiles # Also serves on-demand routing requests from the web app + pipeline worker
# with no ghost transit edges, so bicycle routing works correctly. # via the BullMQ 'routing' queue using @valhallajs/valhallajs Actor pool.
# No HTTP server — all routing goes through BullMQ.
valhalla: valhalla:
build: build:
context: . context: .
target: valhalla-worker dockerfile: Dockerfile.valhalla-worker
restart: unless-stopped restart: unless-stopped
volumes: volumes:
- osm_data:/data/osm:ro # PBF files downloaded by the main worker - osm_data:/data/osm:ro
- valhalla_tiles:/data/valhalla # Road-only config and tiles - valhalla_tiles:/data/valhalla
environment: environment:
REDIS_HOST: valkey REDIS_HOST: valkey
REDIS_PORT: "6379" REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD} REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_QUEUE_NAME: valhalla VALHALLA_QUEUE_NAME: valhalla
VALHALLA_TILES_BASE: /data/valhalla/tiles
VALHALLA_DATA_DIR: /data/valhalla
OSM_DATA_DIR: /data/osm OSM_DATA_DIR: /data/osm
VALHALLA_CONFIG: /data/valhalla/valhalla.json
VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles
NODE_ENV: production NODE_ENV: production
ports:
- "127.0.0.1:8002:8002" # Valhalla HTTP API (road)
depends_on: depends_on:
valkey: valkey:
condition: service_healthy condition: service_healthy
# ─── Valhalla transit worker (port 8002 internal) ───────────────────────── # ─── Valhalla transit tile-builder + transit routing queue ────────────────
# Builds tiles with GTFS transit data → multimodal routing. # Builds transit per-city tile directories (GTFS multimodal routing).
# Separate volume from the road worker so transit ghost edges never affect # Also serves transit isochrone requests via the 'routing-transit' queue.
# the road instance. # No HTTP server — all routing goes through BullMQ.
valhalla-transit: valhalla-transit:
build: build:
context: . context: .
target: valhalla-worker dockerfile: Dockerfile.valhalla-worker
restart: unless-stopped restart: unless-stopped
volumes: volumes:
- osm_data:/data/osm:ro # PBF files downloaded by the main worker - osm_data:/data/osm:ro
- valhalla_tiles_transit:/data/valhalla # Transit config, tiles and GTFS data - valhalla_tiles_transit:/data/valhalla
environment: environment:
REDIS_HOST: valkey REDIS_HOST: valkey
REDIS_PORT: "6379" REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD} REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_QUEUE_NAME: valhalla-transit VALHALLA_QUEUE_NAME: valhalla-transit
VALHALLA_TILES_BASE: /data/valhalla/tiles
VALHALLA_DATA_DIR: /data/valhalla
VALHALLA_INCLUDE_TRANSIT: "true"
OSM_DATA_DIR: /data/osm OSM_DATA_DIR: /data/osm
VALHALLA_CONFIG: /data/valhalla/valhalla.json
VALHALLA_TILES_DIR: /data/valhalla/valhalla_tiles
NODE_ENV: production NODE_ENV: production
# Optional: connect-info.net token for NDS-specific GTFS feed
CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-} CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-}
depends_on: depends_on:
valkey: valkey:
@ -99,7 +98,7 @@ services:
web: web:
build: build:
context: . context: .
target: web dockerfile: Dockerfile
restart: unless-stopped restart: unless-stopped
ports: ports:
- "3000:3000" - "3000:3000"
@ -108,8 +107,6 @@ services:
REDIS_HOST: valkey REDIS_HOST: valkey
REDIS_PORT: "6379" REDIS_PORT: "6379"
REDIS_PASSWORD: ${VALKEY_PASSWORD} REDIS_PASSWORD: ${VALKEY_PASSWORD}
VALHALLA_URL: http://valhalla:8002
VALHALLA_TRANSIT_URL: http://valhalla-transit:8002
ADMIN_PASSWORD_HASH: ${ADMIN_PASSWORD_HASH} ADMIN_PASSWORD_HASH: ${ADMIN_PASSWORD_HASH}
ADMIN_JWT_SECRET: ${ADMIN_JWT_SECRET} ADMIN_JWT_SECRET: ${ADMIN_JWT_SECRET}
NODE_ENV: production NODE_ENV: production
@ -120,10 +117,12 @@ services:
condition: service_healthy condition: service_healthy
# ─── BullMQ pipeline worker ─────────────────────────────────────────────── # ─── BullMQ pipeline worker ───────────────────────────────────────────────
# Pure pipeline: download, extract, grid, scoring — no Valhalla knowledge.
# Routing requests are dispatched via BullMQ to the valhalla containers.
worker: worker:
build: build:
context: . context: .
target: worker dockerfile: Dockerfile.worker
restart: unless-stopped restart: unless-stopped
environment: environment:
DATABASE_URL: postgres://app:${POSTGRES_PASSWORD}@postgres:5432/fifteenmin DATABASE_URL: postgres://app:${POSTGRES_PASSWORD}@postgres:5432/fifteenmin
@ -132,13 +131,10 @@ services:
REDIS_PASSWORD: ${VALKEY_PASSWORD} REDIS_PASSWORD: ${VALKEY_PASSWORD}
OSM_DATA_DIR: /data/osm OSM_DATA_DIR: /data/osm
LUA_SCRIPT: /app/infra/osm2pgsql.lua LUA_SCRIPT: /app/infra/osm2pgsql.lua
VALHALLA_URL: http://valhalla:8002
VALHALLA_TRANSIT_URL: http://valhalla-transit:8002
NODE_ENV: production NODE_ENV: production
# Optional: enables NDS-specific GTFS source for cities in Niedersachsen
CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-} CONNECT_INFO_TOKEN: ${CONNECT_INFO_TOKEN:-}
volumes: volumes:
- osm_data:/data/osm # Worker downloads PBF here - osm_data:/data/osm
depends_on: depends_on:
postgres: postgres:
condition: service_healthy condition: service_healthy
@ -148,7 +144,7 @@ services:
volumes: volumes:
postgres_data: postgres_data:
valkey_data: valkey_data:
osm_data: # Shared: worker writes, valhalla containers read osm_data: # Shared: worker writes PBF files, valhalla containers read
valhalla_tiles: # Road-only tiles (no transit) — cycling works correctly here valhalla_tiles: # Road-only per-city tiles + Actor pool data
valhalla_tiles_transit: # Transit tiles (with GTFS) — multimodal routing valhalla_tiles_transit: # Transit per-city tiles + GTFS data + Actor pool data
pmtiles_data: pmtiles_data:

10
package-lock.json generated
View file

@ -1492,6 +1492,15 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"node_modules/@valhallajs/valhallajs": {
"version": "3.6.1",
"resolved": "https://registry.npmjs.org/@valhallajs/valhallajs/-/valhallajs-3.6.1.tgz",
"integrity": "sha512-od2CkY58XW2Frf2/6KTaqgd567XwGKkwFu4ga4YjYOAdu0u3aJIODEr6zlY6sSKW3qGylW1wwg3joZHidIbaiw==",
"license": "MIT",
"bin": {
"valhalla": "bin/valhalla-cli.js"
}
},
"node_modules/any-promise": { "node_modules/any-promise": {
"version": "1.3.0", "version": "1.3.0",
"resolved": "https://registry.npmjs.org/any-promise/-/any-promise-1.3.0.tgz", "resolved": "https://registry.npmjs.org/any-promise/-/any-promise-1.3.0.tgz",
@ -3579,6 +3588,7 @@
"version": "0.1.0", "version": "0.1.0",
"dependencies": { "dependencies": {
"@transportationer/shared": "*", "@transportationer/shared": "*",
"@valhallajs/valhallajs": "^3.6.1",
"bullmq": "^5.13.0", "bullmq": "^5.13.0",
"postgres": "^3.4.4", "postgres": "^3.4.4",
"unzipper": "^0.12.3" "unzipper": "^0.12.3"

View file

@ -11,6 +11,7 @@
}, },
"dependencies": { "dependencies": {
"@transportationer/shared": "*", "@transportationer/shared": "*",
"@valhallajs/valhallajs": "^3.6.1",
"bullmq": "^5.13.0", "bullmq": "^5.13.0",
"postgres": "^3.4.4", "postgres": "^3.4.4",
"unzipper": "^0.12.3" "unzipper": "^0.12.3"

93
worker/src/actor-pool.ts Normal file
View file

@ -0,0 +1,93 @@
/**
* Per-city Valhalla Actor pool using @valhallajs/valhallajs native bindings.
*
* Each city gets its own Actor instance pointing to its per-city tile directory.
* Actors are lazy-loaded and invalidated when the .ready marker is updated
* (i.e. after a fresh tile build).
*
* Road tiles live under ROAD_TILES_BASE/{citySlug}/
* Transit tiles live under TRANSIT_TILES_BASE/{citySlug}/
*
* Both directories must contain a config.json generated by build-valhalla.ts.
*/
import { existsSync, statSync, readFileSync } from "fs";
import type { Actor } from "@valhallajs/valhallajs";
/** Where road tile directories live (inside the valhalla_tiles volume). */
const ROAD_TILES_BASE = process.env.ROAD_TILES_BASE ?? "/data/valhalla/tiles";
/** Where transit tile directories live. Defaults to the road base; overridable via the TRANSIT_TILES_BASE env var. NOTE(review): the compose file sets VALHALLA_TILES_BASE instead — this only works because the defaults coincide; confirm the intended variable name. */
const TRANSIT_TILES_BASE = process.env.TRANSIT_TILES_BASE ?? ROAD_TILES_BASE;
// Memoized Actor class — the native addon is dynamically imported at most once.
let cachedActorClass: typeof Actor | null = null;

/** Import the valhalla native bindings on first use and return the Actor class. */
async function getActorCtor(): Promise<typeof Actor> {
  if (cachedActorClass === null) {
    cachedActorClass = (await import("@valhallajs/valhallajs")).Actor;
  }
  return cachedActorClass;
}
interface CachedEntry {
actor: Actor;
loadedAt: number;
}
const pool = new Map<string, CachedEntry>();
export type ActorType = "road" | "transit";
/** Resolve the base directory containing per-city tile directories for this actor type. */
function tilesBase(type: ActorType): string {
  if (type === "road") return ROAD_TILES_BASE;
  return TRANSIT_TILES_BASE;
}
/** Absolute per-city tile directory for the given actor type. */
export function tileDir(type: ActorType, citySlug: string): string {
  return [tilesBase(type), citySlug].join("/");
}
/** Path of the marker file written once a city's tiles have been fully built. */
export function readyMarkerPath(type: ActorType, citySlug: string): string {
  return tileDir(type, citySlug) + "/.ready";
}
/** Drop a cached Actor so the next getActor() call re-loads it from disk. */
export function invalidateActor(type: ActorType, citySlug: string): void {
  const key = `${type}:${citySlug}`;
  const wasCached = pool.delete(key);
  if (wasCached) console.log(`[actor-pool] Invalidated ${key}`);
}
/**
 * Return (or lazily load) a Valhalla Actor for the given city and type.
 * Throws if tiles are not yet built (.ready marker absent) or config is missing.
 *
 * Cache freshness: the .ready marker's mtime is compared against the time the
 * cached Actor was constructed, so a tile rebuild (which rewrites the marker)
 * transparently forces a reload on the next call.
 */
export async function getActor(type: ActorType, citySlug: string): Promise<Actor> {
  const key = `${type}:${citySlug}`;
  const marker = readyMarkerPath(type, citySlug);
  if (!existsSync(marker)) {
    throw new Error(`Valhalla tiles not ready for ${key} — run build-valhalla first (marker: ${marker})`);
  }
  // Marker mtime is the "tiles last built" timestamp; a cached Actor loaded
  // before it is stale.
  const markerMtime = statSync(marker).mtimeMs;
  const cached = pool.get(key);
  if (cached && cached.loadedAt >= markerMtime) {
    return cached.actor;
  }
  const dir = tileDir(type, citySlug);
  const configPath = `${dir}/config.json`;
  if (!existsSync(configPath)) {
    throw new Error(`Valhalla config missing for ${key}: ${configPath}`);
  }
  console.log(`[actor-pool] Loading Actor for ${key} from ${configPath}`);
  const Ctor = await getActorCtor();
  // Actor constructor takes the config JSON as a string (not a file path).
  const configJson = readFileSync(configPath, "utf8");
  const actor = new Ctor(configJson);
  // NOTE(review): two concurrent callers can both reach this point and each
  // construct an Actor for the same key; the second set() wins. Presumably
  // harmless — confirm the Actor holds no exclusive native resources.
  pool.set(key, { actor, loadedAt: Date.now() });
  console.log(`[actor-pool] Actor for ${key} ready`);
  return actor;
}

View file

@ -1,8 +1,8 @@
import type { Job } from "bullmq"; import type { Job } from "bullmq";
import { execSync, spawn } from "child_process"; import { execSync, spawn } from "child_process";
import { existsSync, mkdirSync, readFileSync, readdirSync, rmSync, statSync, unlinkSync, writeFileSync } from "fs"; import { existsSync, mkdirSync, readFileSync, readdirSync, rmSync, statSync, unlinkSync, writeFileSync } from "fs";
import * as path from "path";
import type { JobProgress } from "@transportationer/shared"; import type { JobProgress } from "@transportationer/shared";
import { invalidateActor } from "../actor-pool.js";
export type BuildValhallaData = { export type BuildValhallaData = {
type: "build-valhalla"; type: "build-valhalla";
@ -10,74 +10,48 @@ export type BuildValhallaData = {
citySlug?: string; citySlug?: string;
pbfPath?: string; pbfPath?: string;
bbox?: [number, number, number, number]; bbox?: [number, number, number, number];
/** Slugs to drop from the global routing tile set before rebuilding */ /** Slugs to drop from the routing tile set before rebuilding. */
removeSlugs?: string[]; removeSlugs?: string[];
}; };
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm"; const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json"; const VALHALLA_DATA_DIR = process.env.VALHALLA_DATA_DIR ?? "/data/valhalla";
const VALHALLA_TILES_DIR = process.env.VALHALLA_TILES_DIR ?? "/data/valhalla/valhalla_tiles";
const VALHALLA_DATA_DIR = "/data/valhalla";
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";
const GTFS_FEED_DIR = `${GTFS_DATA_DIR}/feed`;
/** /**
* Auxiliary databases downloaded by valhalla_build_tiles on first run. * Per-city tile directories live here: VALHALLA_TILES_BASE/{citySlug}/
* Stored OUTSIDE VALHALLA_TILES_DIR so they survive crash-recovery tile * Each valhalla container mounts its own Docker volume at /data/valhalla,
* wipes and don't need to be re-downloaded on retries. * so both road and transit containers use the same env var with different data.
*/ */
const VALHALLA_TILES_BASE = process.env.VALHALLA_TILES_BASE ?? `${VALHALLA_DATA_DIR}/tiles`;
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? `${VALHALLA_DATA_DIR}/gtfs`;
const TIMEZONE_SQLITE = `${VALHALLA_DATA_DIR}/timezone.sqlite`; const TIMEZONE_SQLITE = `${VALHALLA_DATA_DIR}/timezone.sqlite`;
const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`; const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`;
/** /** Per-city GTFS feed dir written by download-gtfs-de. */
* Explicit mjolnir.transit_dir used by all four transit-aware Valhalla function cityGtfsFeedDir(citySlug: string): string {
* operations (ingest, convert, build_tiles, service). Pinned here to avoid return `${GTFS_DATA_DIR}/${citySlug}/feed`;
* the Valhalla default (/data/valhalla/transit) and to persist transit tiles }
* between builds.
*
* IMPORTANT build order:
* 1. valhalla_ingest_transit GTFS transit PBF staging files in transit_dir
* 2. valhalla_convert_transit PBF transit graph tiles (.gph) in transit_dir
* 3. valhalla_build_tiles road graph + reads transit tiles from transit_dir
* creates road tiles WITH road-to-transit edges
* copies transit tiles into tile_dir/3/
*
* valhalla_build_tiles MUST run AFTER valhalla_convert_transit so it can find
* the transit .gph tiles in transit_dir and embed roadtransit connections in
* the road tiles. Running build_tiles before convert results in road tiles with
* no transit connections (transit routing silently falls back to walking).
*
* valhalla_convert_transit does NOT require road tiles it only reads the
* transit PBF staging files and writes transit graph tiles. It can run on an
* empty tile_dir without crashing.
*
* TRANSIT_CACHE_MARKER tracks whether ingest PBFs are current relative to the
* GTFS source. valhalla_convert_transit always runs after ingest (or when the
* ingest cache is fresh) so transit_dir has up-to-date .gph before build_tiles.
*/
const TRANSIT_CACHE_DIR = `${VALHALLA_DATA_DIR}/transit_graph`;
/** Written after a successful valhalla_ingest_transit; compared against GTFS source mtime. */
const TRANSIT_CACHE_MARKER = `${TRANSIT_CACHE_DIR}/.ready`;
/** Written by download-gtfs-de after each successful GTFS extraction. */
const GTFS_SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`;
/** /**
* Buffer added to the city bbox when clipping the road PBF with osmium. * Per-city transit staging directory.
* Transit stops within the city bbox may be in low-road-density areas (parks, * valhalla_ingest_transit writes PBF staging tiles here.
* new developments, demand-responsive zones) where the nearest OSM road is * valhalla_convert_transit converts them to transit graph tiles (.gph) here.
* outside the exact bbox clip. Without coverage, valhalla_build_tiles crashes * valhalla_build_tiles reads from here when INCLUDE_TRANSIT=true.
* with a memory corruption error ("double free" / "bad_array_new_length").
* stop_times is already filtered to bbox-local stops so the buffer only adds
* road coverage it does NOT let Germany-wide transit stops into the graph.
* 0.2° 18 km at 53 °N covers roads for all plausibly in-city stops.
*/ */
/** Per-city transit graph staging directory (ingest/convert read and write here). */
function cityTransitCacheDir(citySlug: string): string {
  return VALHALLA_DATA_DIR + "/transit_graph/" + citySlug;
}
/**
* Set VALHALLA_INCLUDE_TRANSIT=true in the transit container.
* The road container leaves it unset so tiles are clean (no ghost transit edges).
*/
const INCLUDE_TRANSIT = (process.env.VALHALLA_INCLUDE_TRANSIT ?? "").toLowerCase() === "true";
const ROAD_BBOX_BUFFER = 0.2; const ROAD_BBOX_BUFFER = 0.2;
/**
* Manifest file: maps citySlug absolute path of its routing PBF.
* Persists in the valhalla_tiles Docker volume across restarts.
*/
const ROUTING_MANIFEST = `${VALHALLA_DATA_DIR}/routing-sources.json`; const ROUTING_MANIFEST = `${VALHALLA_DATA_DIR}/routing-sources.json`;
function readManifest(): Record<string, string> { function readManifest(): Record<string, string> {
@ -104,35 +78,19 @@ function runProcess(cmd: string, args: string[]): Promise<void> {
}); });
} }
/**
* Build the IANA timezone SQLite database required by valhalla_ingest_transit.
* Without it, ingest does not write the root index tile (0/000/000.pbf) and
* valhalla_convert_transit crashes trying to load it.
*
* valhalla_build_timezones writes the SQLite database to stdout (no args),
* so we capture stdout and write it to TIMEZONE_SQLITE.
*/
function buildTimezoneDb(): Promise<void> { function buildTimezoneDb(): Promise<void> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
console.log("[build-valhalla] Running: valhalla_build_timezones (output → " + TIMEZONE_SQLITE + ")"); console.log("[build-valhalla] Running: valhalla_build_timezones → " + TIMEZONE_SQLITE);
const child = spawn("valhalla_build_timezones", [], { const child = spawn("valhalla_build_timezones", [], { stdio: ["ignore", "pipe", "inherit"] });
stdio: ["ignore", "pipe", "inherit"],
});
const chunks: Buffer[] = []; const chunks: Buffer[] = [];
child.stdout!.on("data", (chunk: Buffer) => chunks.push(chunk)); child.stdout!.on("data", (chunk: Buffer) => chunks.push(chunk));
child.on("error", reject); child.on("error", reject);
child.on("exit", (code) => { child.on("exit", (code) => {
if (code !== 0) { if (code !== 0) { reject(new Error(`valhalla_build_timezones exited ${code}`)); return; }
reject(new Error(`valhalla_build_timezones exited with code ${code}`));
return;
}
const db = Buffer.concat(chunks); const db = Buffer.concat(chunks);
if (db.length < 1024) { if (db.length < 1024) { reject(new Error(`valhalla_build_timezones output too small`)); return; }
reject(new Error(`valhalla_build_timezones output too small (${db.length} B) — likely failed silently`));
return;
}
writeFileSync(TIMEZONE_SQLITE, db); writeFileSync(TIMEZONE_SQLITE, db);
console.log(`[build-valhalla] Timezone database written to ${TIMEZONE_SQLITE} (${(db.length / 1024 / 1024).toFixed(1)} MB)`); console.log(`[build-valhalla] Timezone database written (${(db.length / 1024 / 1024).toFixed(1)} MB)`);
resolve(); resolve();
}); });
}); });
@ -140,16 +98,12 @@ function buildTimezoneDb(): Promise<void> {
type JsonObject = Record<string, unknown>; type JsonObject = Record<string, unknown>;
/** Deep-merge override into base. Objects are merged recursively; arrays and
* scalars in override replace the corresponding base value entirely. */
function deepMerge(base: JsonObject, override: JsonObject): JsonObject { function deepMerge(base: JsonObject, override: JsonObject): JsonObject {
const result: JsonObject = { ...base }; const result: JsonObject = { ...base };
for (const [key, val] of Object.entries(override)) { for (const [key, val] of Object.entries(override)) {
const baseVal = base[key]; const baseVal = base[key];
if ( if (val !== null && typeof val === "object" && !Array.isArray(val) &&
val !== null && typeof val === "object" && !Array.isArray(val) && baseVal !== null && typeof baseVal === "object" && !Array.isArray(baseVal)) {
baseVal !== null && typeof baseVal === "object" && !Array.isArray(baseVal)
) {
result[key] = deepMerge(baseVal as JsonObject, val as JsonObject); result[key] = deepMerge(baseVal as JsonObject, val as JsonObject);
} else { } else {
result[key] = val; result[key] = val;
@ -158,291 +112,232 @@ function deepMerge(base: JsonObject, override: JsonObject): JsonObject {
return result; return result;
} }
/** function buildBase(): JsonObject {
* Generate valhalla.json by starting from the canonical defaults produced by
* valhalla_build_config, then overlaying only the deployment-specific settings.
*/
function generateConfig(): void {
mkdirSync(VALHALLA_TILES_DIR, { recursive: true });
mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
let base: JsonObject = {};
try { try {
const out = execSync("valhalla_build_config", { const out = execSync("valhalla_build_config", { encoding: "utf8", maxBuffer: 10 * 1024 * 1024 });
encoding: "utf8", return JSON.parse(out) as JsonObject;
maxBuffer: 10 * 1024 * 1024,
});
base = JSON.parse(out) as JsonObject;
console.log("[build-valhalla] Loaded defaults from valhalla_build_config");
} catch (err) { } catch (err) {
console.warn("[build-valhalla] valhalla_build_config failed, using empty base:", err); console.warn("[build-valhalla] valhalla_build_config failed, using empty base:", err);
return {};
} }
const overrides: JsonObject = {
mjolnir: {
tile_dir: VALHALLA_TILES_DIR,
tile_extract: `${VALHALLA_TILES_DIR}.tar`,
// Stored outside tile_dir so they survive crash-recovery wipes.
timezone: TIMEZONE_SQLITE,
admin: ADMINS_SQLITE,
// All transit operations (ingest, convert, service) read/write here.
transit_dir: TRANSIT_CACHE_DIR,
// valhalla_ingest_transit expects a directory whose subdirectories are
// individual GTFS feeds. feed/ (inside GTFS_DATA_DIR) is one such feed.
transit_feeds_dir: GTFS_DATA_DIR,
},
additional_data: {
elevation: "/data/elevation/",
},
httpd: {
service: {
listen: "tcp://*:8002",
timeout_seconds: 26,
},
},
service_limits: {
isochrone: {
// Transit scoring uses 5 contours [5,10,15,20,30]; Valhalla default is 4.
max_contours: 5,
},
},
};
const config = deepMerge(base, overrides);
writeFileSync(VALHALLA_CONFIG, JSON.stringify(config, null, 2));
console.log(`[build-valhalla] Config written to ${VALHALLA_CONFIG}`);
} }
/** /**
* True when valhalla_ingest_transit has been run against the current GTFS data. * Write a per-city config for transit ingest + convert operations.
* Compares the ingest marker mtime against the GTFS source marker mtime written * These tools only read/write from transit_dir/transit_feeds_dir;
* by download-gtfs-de after each successful extraction. * tile_dir is a dummy that won't receive any road tiles.
*/ */
function isTransitIngestFresh(): boolean { function writeCityTransitIngestConfig(citySlug: string): string {
if (!existsSync(TRANSIT_CACHE_MARKER) || !existsSync(GTFS_SOURCE_MARKER)) return false; const configPath = `${VALHALLA_DATA_DIR}/transit-ingest-config-${citySlug}.json`;
// Verify at least one transit PBF tile exists — the marker can survive a const dummyTileDir = `${VALHALLA_DATA_DIR}/tiles/_transit_ingest_tmp_${citySlug}`;
// cache-dir wipe (crash recovery) and we'd otherwise skip ingest with an mkdirSync(dummyTileDir, { recursive: true });
// empty transit dir, causing valhalla_convert_transit to fail silently.
// Valhalla 3.x ingest writes level-3 tiles; check for the directory. const config = deepMerge(buildBase(), {
const level3Dir = `${TRANSIT_CACHE_DIR}/3`; mjolnir: {
if (!existsSync(level3Dir)) return false; tile_dir: dummyTileDir,
return statSync(TRANSIT_CACHE_MARKER).mtimeMs >= statSync(GTFS_SOURCE_MARKER).mtimeMs; timezone: TIMEZONE_SQLITE,
admin: ADMINS_SQLITE,
transit_dir: cityTransitCacheDir(citySlug),
transit_feeds_dir: cityGtfsFeedDir(citySlug),
},
service_limits: { isochrone: { max_contours: 5 } },
} as JsonObject);
writeFileSync(configPath, JSON.stringify(config, null, 2));
return configPath;
} }
export async function handleBuildValhalla( /**
job: Job<BuildValhallaData>, * Write a per-city config for valhalla_build_tiles.
restartService: () => Promise<void>, * Road builds omit transit_dir so no transit edges are embedded.
): Promise<void> { * Transit builds include transit_dir so roadtransit connections are added.
*/
/**
 * Write the per-city valhalla config used by valhalla_build_tiles and return its path.
 * Transit directories are only embedded when INCLUDE_TRANSIT is set for this
 * container, so road builds stay free of transit edges.
 */
function writeCityConfig(citySlug: string, cityTileDir: string): string {
  mkdirSync(cityTileDir, { recursive: true });
  const mjolnir: JsonObject = {
    tile_dir: cityTileDir,
    tile_extract: `${cityTileDir}.tar`,
    timezone: TIMEZONE_SQLITE,
    admin: ADMINS_SQLITE,
  };
  if (INCLUDE_TRANSIT) {
    mjolnir.transit_dir = cityTransitCacheDir(citySlug);
    mjolnir.transit_feeds_dir = cityGtfsFeedDir(citySlug);
  }
  const overrides: JsonObject = {
    mjolnir,
    additional_data: { elevation: "/data/elevation/" },
    service_limits: { isochrone: { max_contours: 5 } },
  };
  const config = deepMerge(buildBase(), overrides);
  const configPath = `${cityTileDir}/config.json`;
  writeFileSync(configPath, JSON.stringify(config, null, 2));
  return configPath;
}
function isTransitIngestFresh(citySlug: string): boolean {
const transitCacheDir = cityTransitCacheDir(citySlug);
const transitCacheMarker = `${transitCacheDir}/.ready`;
const gtfsCityMarker = `${cityGtfsFeedDir(citySlug)}/.source`;
if (!existsSync(transitCacheMarker) || !existsSync(gtfsCityMarker)) return false;
const level3Dir = `${transitCacheDir}/3`;
if (!existsSync(level3Dir)) return false;
return statSync(transitCacheMarker).mtimeMs >= statSync(gtfsCityMarker).mtimeMs;
}
/**
 * Build per-city Valhalla tiles from a clipped PBF.
 *
 * Lifecycle: the .ready marker is removed and the Actor pool entry invalidated
 * up front so routing consumers stop using the old tiles; the marker is only
 * rewritten after valhalla_build_tiles succeeds. Transit ingest/convert failures
 * are logged and tolerated (the road graph still builds); build_tiles failure
 * propagates to the caller.
 */
async function buildCityTiles(citySlug: string, pbfPath: string): Promise<void> {
  const cityTileDir = `${VALHALLA_TILES_BASE}/${citySlug}`;
  const readyMarker = `${cityTileDir}/.ready`;
  // Signal Actor pool: tiles are being rebuilt.
  if (existsSync(readyMarker)) unlinkSync(readyMarker);
  invalidateActor(INCLUDE_TRANSIT ? "transit" : "road", citySlug);
  // ── Transit ingest + convert (transit container only) ─────────────────────
  if (INCLUDE_TRANSIT) {
    const feedDir = cityGtfsFeedDir(citySlug);
    // GTFS feeds are plain .txt files; any present means the feed was extracted.
    const gtfsReady = existsSync(feedDir) && readdirSync(feedDir).some((f) => f.endsWith(".txt"));
    if (gtfsReady) {
      // Timezone DB is required by valhalla_ingest_transit; build once and reuse.
      if (!existsSync(TIMEZONE_SQLITE)) {
        console.log("[build-valhalla] Building timezone database…");
        try { await buildTimezoneDb(); } catch (err) { console.warn("[build-valhalla] valhalla_build_timezones failed:", err); }
      }
      const transitCacheDir = cityTransitCacheDir(citySlug);
      const transitCacheMarker = `${transitCacheDir}/.ready`;
      // Re-ingest only when the cached staging tiles are stale relative to the
      // GTFS source marker (and only if the timezone DB is actually available).
      if (!isTransitIngestFresh(citySlug) && existsSync(TIMEZONE_SQLITE)) {
        console.log(`[build-valhalla] Ingesting GTFS transit feeds for ${citySlug}`);
        try {
          // Start from a clean staging dir so stale tiles never mix with new ones.
          rmSync(transitCacheDir, { recursive: true, force: true });
          mkdirSync(transitCacheDir, { recursive: true });
          const cfg = writeCityTransitIngestConfig(citySlug);
          await runProcess("valhalla_ingest_transit", ["-c", cfg]);
          writeFileSync(transitCacheMarker, new Date().toISOString());
          console.log(`[build-valhalla] valhalla_ingest_transit completed for ${citySlug}`);
        } catch (err) {
          // Wipe the partial output so a half-finished ingest is never treated as fresh.
          console.warn(`[build-valhalla] valhalla_ingest_transit failed for ${citySlug}:`, err);
          rmSync(transitCacheDir, { recursive: true, force: true });
          mkdirSync(transitCacheDir, { recursive: true });
        }
      } else if (isTransitIngestFresh(citySlug)) {
        console.log(`[build-valhalla] Transit ingest cache fresh for ${citySlug} — skipping re-ingest`);
      }
      // Convert always runs after ingest (or a fresh cache) so transit_dir has
      // up-to-date .gph graph tiles before valhalla_build_tiles reads them.
      console.log(`[build-valhalla] Converting transit staging tiles for ${citySlug}`);
      try {
        const cfg = writeCityTransitIngestConfig(citySlug);
        await runProcess("valhalla_convert_transit", ["-c", cfg]);
        console.log(`[build-valhalla] valhalla_convert_transit completed for ${citySlug}`);
      } catch (err) {
        console.warn(`[build-valhalla] valhalla_convert_transit failed for ${citySlug}:`, err);
      }
    } else {
      console.log(`[build-valhalla] No GTFS feed found for ${citySlug} — skipping transit ingest/convert`);
    }
  }
  // Road (or road+transit) tile build; .ready is written only on success.
  const configPath = writeCityConfig(citySlug, cityTileDir);
  await runProcess("valhalla_build_tiles", ["-c", configPath, pbfPath]);
  writeFileSync(readyMarker, new Date().toISOString());
  console.log(`[build-valhalla] Tiles ready for ${citySlug}${cityTileDir}`);
}
export async function handleBuildValhalla(job: Job<BuildValhallaData>): Promise<void> {
const { citySlug, pbfPath, bbox, removeSlugs = [] } = job.data; const { citySlug, pbfPath, bbox, removeSlugs = [] } = job.data;
// Always regenerate config to ensure it's valid JSON (not stale/corrupted). mkdirSync(VALHALLA_TILES_BASE, { recursive: true });
await job.updateProgress({
stage: "Building routing graph",
pct: 2,
message: "Writing Valhalla configuration…",
} satisfies JobProgress);
generateConfig();
// ── Step 1: update the routing manifest ────────────────────────────────── // ── Step 1: manifest + removals ───────────────────────────────────────────
const manifest = readManifest(); const manifest = readManifest();
for (const slug of removeSlugs) { for (const slug of removeSlugs) {
const clippedPbf = `${VALHALLA_DATA_DIR}/${slug}-routing.osm.pbf`; const clippedPbf = `${VALHALLA_DATA_DIR}/${slug}-routing.osm.pbf`;
if (existsSync(clippedPbf)) { if (existsSync(clippedPbf)) { unlinkSync(clippedPbf); console.log(`[build-valhalla] Removed clipped PBF for ${slug}`); }
unlinkSync(clippedPbf); const cityTileDir = `${VALHALLA_TILES_BASE}/${slug}`;
console.log(`[build-valhalla] Removed clipped PBF for ${slug}`); if (existsSync(cityTileDir)) { rmSync(cityTileDir, { recursive: true, force: true }); console.log(`[build-valhalla] Removed tile dir for ${slug}`); }
} invalidateActor(INCLUDE_TRANSIT ? "transit" : "road", slug);
delete manifest[slug]; delete manifest[slug];
} }
if (citySlug && pbfPath) { if (citySlug && pbfPath) {
await job.updateProgress({ await job.updateProgress({ stage: "Building routing graph", pct: 5,
stage: "Building routing graph", message: bbox ? `Clipping PBF to city bbox…` : `Registering full PBF for ${citySlug}`,
pct: 5,
message: bbox
? `Clipping PBF to city bbox (may expand for transit coverage)…`
: `Registering full PBF for ${citySlug}`,
} satisfies JobProgress); } satisfies JobProgress);
let routingPbf: string; let routingPbf: string;
if (bbox) { if (bbox) {
const clippedPbf = `${VALHALLA_DATA_DIR}/${citySlug}-routing.osm.pbf`; const clippedPbf = `${VALHALLA_DATA_DIR}/${citySlug}-routing.osm.pbf`;
if (!existsSync(pbfPath)) throw new Error(`PBF file not found: ${pbfPath}`); if (!existsSync(pbfPath)) throw new Error(`PBF file not found: ${pbfPath}`);
let eb: [number, number, number, number] = [bbox[0] - ROAD_BBOX_BUFFER, bbox[1] - ROAD_BBOX_BUFFER, bbox[2] + ROAD_BBOX_BUFFER, bbox[3] + ROAD_BBOX_BUFFER];
const extractBbox: [number, number, number, number] = [ // Transit container: expand road clip to cover transit stop locations.
bbox[0] - ROAD_BBOX_BUFFER, // Stops outside road tile coverage cause valhalla_build_tiles to crash
bbox[1] - ROAD_BBOX_BUFFER, // with "double free or corruption (fasttop)" when it tries to embed
bbox[2] + ROAD_BBOX_BUFFER, // road↔transit connection edges for stops with no nearby road data.
bbox[3] + ROAD_BBOX_BUFFER, // The bbox of seeding stops is written by download-gtfs-de per city.
]; if (INCLUDE_TRANSIT && citySlug) {
const stopsBboxPath = `${cityGtfsFeedDir(citySlug)}/.stops_bbox`;
console.log( if (existsSync(stopsBboxPath)) {
`[build-valhalla] Road extract bbox: city [${bbox.map((v) => v.toFixed(3)).join(", ")}]` + try {
` + ${ROAD_BBOX_BUFFER}° buffer → [${extractBbox.map((v) => v.toFixed(3)).join(", ")}]`, const [sMinLng, sMinLat, sMaxLng, sMaxLat] = JSON.parse(readFileSync(stopsBboxPath, "utf8")) as [number, number, number, number];
); const before = eb.map((v) => v.toFixed(3)).join(", ");
eb = [Math.min(eb[0], sMinLng), Math.min(eb[1], sMinLat), Math.max(eb[2], sMaxLng), Math.max(eb[3], sMaxLat)];
await runProcess("osmium", [ console.log(`[build-valhalla] Transit stops bbox expansion: [${before}] → [${eb.map((v) => v.toFixed(3)).join(", ")}]`);
"extract", } catch (e) {
`--bbox=${extractBbox[0]},${extractBbox[1]},${extractBbox[2]},${extractBbox[3]}`, console.warn("[build-valhalla] Could not read .stops_bbox — using default buffer:", e);
pbfPath, }
"-o", clippedPbf, } else {
"--overwrite", console.warn("[build-valhalla] .stops_bbox not found — GTFS may not be downloaded yet, or stops are outside bbox");
]); }
routingPbf = clippedPbf;
} else {
if (existsSync(pbfPath)) {
routingPbf = pbfPath;
} else {
const { readdirSync } = await import("fs");
const found = readdirSync(OSM_DATA_DIR)
.filter((f) => f.endsWith("-latest.osm.pbf"))
.map((f) => `${OSM_DATA_DIR}/${f}`);
if (found.length === 0) throw new Error(`No PBF files found in ${OSM_DATA_DIR}`);
routingPbf = found[0];
} }
}
await runProcess("osmium", ["extract", `--bbox=${eb[0]},${eb[1]},${eb[2]},${eb[3]}`, pbfPath, "-o", clippedPbf, "--overwrite"]);
routingPbf = clippedPbf;
} else if (existsSync(pbfPath)) {
routingPbf = pbfPath;
} else {
const found = readdirSync(OSM_DATA_DIR).filter((f) => f.endsWith("-latest.osm.pbf")).map((f) => `${OSM_DATA_DIR}/${f}`);
if (found.length === 0) throw new Error(`No PBF files found in ${OSM_DATA_DIR}`);
routingPbf = found[0];
}
manifest[citySlug] = routingPbf; manifest[citySlug] = routingPbf;
} }
writeManifest(manifest); writeManifest(manifest);
// ── Step 2: check for cities to build ──────────────────────────────────── // ── Step 2: cities to build ───────────────────────────────────────────────
const allPbfs = Object.values(manifest).filter(existsSync); const citiesForBuild: { slug: string; pbf: string }[] = [];
const allSlugs = Object.keys(manifest); if (citySlug && manifest[citySlug] && existsSync(manifest[citySlug])) {
citiesForBuild.push({ slug: citySlug, pbf: manifest[citySlug] });
} else if (!citySlug) {
for (const [slug, pbf] of Object.entries(manifest)) {
if (existsSync(pbf)) citiesForBuild.push({ slug, pbf });
}
}
if (allPbfs.length === 0) { if (citiesForBuild.length === 0) {
console.log("[build-valhalla] Manifest is empty — no cities to build routing tiles for."); await job.updateProgress({ stage: "Building routing graph", pct: 100, message: "No cities in manifest, skipping." } satisfies JobProgress);
await job.updateProgress({
stage: "Building routing graph",
pct: 100,
message: "No cities in manifest, skipping tile build.",
} satisfies JobProgress);
return; return;
} }
// ── Step 3: transit ingest + convert ───────────────────────────────────── for (let i = 0; i < citiesForBuild.length; i++) {
// const { slug, pbf } = citiesForBuild[i];
// Build order: ingest → convert → road tiles. const pct = Math.round(20 + (i / citiesForBuild.length) * 75);
// valhalla_build_tiles MUST run after valhalla_convert_transit so it finds await job.updateProgress({
// transit .gph tiles in transit_dir and embeds road↔transit connection edges stage: "Building routing graph", pct,
// in the road tiles. Without convert running first, road tiles have no transit message: `Building ${INCLUDE_TRANSIT ? "transit" : "road"} tiles for ${slug} (${i + 1}/${citiesForBuild.length})…`,
// connections and multimodal routing silently falls back to walking. } satisfies JobProgress);
// valhalla_convert_transit does NOT need road tiles — it only reads the GTFS await buildCityTiles(slug, pbf);
// staging PBFs and writes the transit graph tiles.
const gtfsReady =
existsSync(GTFS_FEED_DIR) &&
readdirSync(GTFS_FEED_DIR).some((f) => f.endsWith(".txt"));
let ingestPbfsAvailable = false;
if (gtfsReady) {
// 3a: timezone database — one-time setup, persists in VALHALLA_DATA_DIR.
// valhalla_ingest_transit needs it to assign timezone info to stops;
// without it the root index tile (0/000/000.pbf) is not written and
// valhalla_convert_transit crashes trying to load it.
if (!existsSync(TIMEZONE_SQLITE)) {
await job.updateProgress({
stage: "Building routing graph",
pct: 10,
message: "Building timezone database (one-time setup)…",
} satisfies JobProgress);
try {
await buildTimezoneDb();
} catch (err) {
console.warn("[build-valhalla] valhalla_build_timezones failed — skipping transit:", err);
}
}
// 3b: ingest (only when GTFS changed, and only when timezone db is ready)
ingestPbfsAvailable = isTransitIngestFresh();
if (!ingestPbfsAvailable && existsSync(TIMEZONE_SQLITE)) {
await job.updateProgress({
stage: "Building routing graph",
pct: 12,
message: "Ingesting GTFS transit feeds…",
} satisfies JobProgress);
try {
// Wipe stale/partial PBF tiles before ingesting.
rmSync(TRANSIT_CACHE_DIR, { recursive: true, force: true });
mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
await runProcess("valhalla_ingest_transit", ["-c", VALHALLA_CONFIG]);
writeFileSync(TRANSIT_CACHE_MARKER, new Date().toISOString());
ingestPbfsAvailable = true;
console.log("[build-valhalla] valhalla_ingest_transit completed");
} catch (err) {
console.warn("[build-valhalla] valhalla_ingest_transit failed (road routing unaffected):", err);
// Wipe partial output so convert doesn't try to read corrupt PBFs.
rmSync(TRANSIT_CACHE_DIR, { recursive: true, force: true });
mkdirSync(TRANSIT_CACHE_DIR, { recursive: true });
}
} else if (ingestPbfsAvailable) {
await job.updateProgress({
stage: "Building routing graph",
pct: 12,
message: "Transit ingest cache is fresh — skipping re-ingest",
} satisfies JobProgress);
} else {
console.log("[build-valhalla] timezone.sqlite unavailable — skipping transit ingest");
}
// 3c: convert transit PBF staging files → transit graph tiles (.gph)
// Runs even when ingest was skipped (cache fresh) so transit_dir always
// has up-to-date .gph tiles before valhalla_build_tiles reads them.
if (ingestPbfsAvailable) {
await job.updateProgress({
stage: "Building routing graph",
pct: 15,
message: "Converting transit staging tiles to graph tiles…",
} satisfies JobProgress);
try {
await runProcess("valhalla_convert_transit", ["-c", VALHALLA_CONFIG]);
console.log("[build-valhalla] valhalla_convert_transit completed");
} catch (err) {
console.warn("[build-valhalla] valhalla_convert_transit failed (road routing unaffected):", err);
}
}
} else {
console.log("[build-valhalla] No GTFS feed found — skipping transit ingest/convert");
} }
// ── Step 4: build road tiles ──────────────────────────────────────────────
//
// Runs AFTER valhalla_convert_transit so transit .gph tiles are present in
// transit_dir. valhalla_build_tiles reads them, embeds road↔transit connection
// edges in the road tiles, and copies transit tiles into tile_dir/3/.
// Without transit tiles present at this step, road tiles have no transit
// connections and multimodal routing silently falls back to walking.
await job.updateProgress({ await job.updateProgress({
stage: "Building routing graph", stage: "Building routing graph", pct: 100,
pct: 20, message: `Routing graph ready — covers: ${citiesForBuild.map((c) => c.slug).join(", ")}`,
message: `Building road routing tiles for: ${allSlugs.join(", ")}`,
} satisfies JobProgress);
await runProcess("valhalla_build_tiles", ["-c", VALHALLA_CONFIG, ...allPbfs]);
console.log("[build-valhalla] Road tiles built");
// ── Step 5: restart Valhalla service ─────────────────────────────────────
await job.updateProgress({
stage: "Building routing graph",
pct: 95,
message: "Tiles built — restarting Valhalla service…",
} satisfies JobProgress);
await restartService();
await job.updateProgress({
stage: "Building routing graph",
pct: 100,
message: `Routing graph ready — covers: ${allSlugs.join(", ")}`,
} satisfies JobProgress); } satisfies JobProgress);
} }

View file

@ -1,7 +1,7 @@
import type { Job } from "bullmq"; import type { Job } from "bullmq";
import { getSql } from "../db.js"; import { getSql } from "../db.js";
import type { JobProgress } from "@transportationer/shared"; import type { JobProgress } from "@transportationer/shared";
import { fetchMatrix } from "../valhalla.js"; import { dispatchMatrix } from "../routing-client.js";
export type ComputeRoutingData = { export type ComputeRoutingData = {
type: "compute-routing"; type: "compute-routing";
@ -120,10 +120,10 @@ export async function handleComputeRouting(job: Job<ComputeRoutingData>): Promis
const sources = batch.map((gp) => ({ lat: gp.lat, lng: gp.lng })); const sources = batch.map((gp) => ({ lat: gp.lat, lng: gp.lng }));
let matrix: (number | null)[][]; let matrix: (number | null)[][];
try { try {
matrix = await fetchMatrix(sources, targets, mode); matrix = await dispatchMatrix(sources, targets, mode, citySlug);
} catch (err) { } catch (err) {
console.error( console.error(
`[compute-routing] Valhalla failed (${mode}/${category}, batch ${batchesDone}):`, `[compute-routing] Matrix dispatch failed (${mode}/${category}, batch ${batchesDone}):`,
(err as Error).message, (err as Error).message,
); );
return; return;

View file

@ -220,12 +220,12 @@ export async function handleComputeScores(
SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM transit_walk_subcat SELECT grid_point_id, category, subcategory, travel_mode, travel_time_s FROM transit_walk_subcat
), ),
road_nearest AS ( road_nearest AS (
-- Nearest POI per (grid_point, category, mode) by distance -- Nearest POI per (grid_point, category, mode) by travel time
SELECT DISTINCT ON (grid_point_id, category, travel_mode) SELECT DISTINCT ON (grid_point_id, category, travel_mode)
grid_point_id, category, travel_mode, nearest_poi_id, distance_m, travel_time_s grid_point_id, category, travel_mode, nearest_poi_id, distance_m, travel_time_s
FROM base FROM base
WHERE nearest_poi_id IS NOT NULL WHERE nearest_poi_id IS NOT NULL
ORDER BY grid_point_id, category, travel_mode, distance_m ORDER BY grid_point_id, category, travel_mode, travel_time_s ASC NULLS LAST
), ),
cyclist_nearest AS ( cyclist_nearest AS (
SELECT DISTINCT ON (grid_point_id, category) SELECT DISTINCT ON (grid_point_id, category)
@ -235,7 +235,7 @@ export async function handleComputeScores(
FROM base FROM base
WHERE travel_mode IN ('walking', 'cycling', 'transit') WHERE travel_mode IN ('walking', 'cycling', 'transit')
AND nearest_poi_id IS NOT NULL AND nearest_poi_id IS NOT NULL
ORDER BY grid_point_id, category, distance_m ORDER BY grid_point_id, category, travel_time_s ASC NULLS LAST
), ),
cycling_walk_nearest AS ( cycling_walk_nearest AS (
SELECT DISTINCT ON (grid_point_id, category) SELECT DISTINCT ON (grid_point_id, category)
@ -245,7 +245,7 @@ export async function handleComputeScores(
FROM base FROM base
WHERE travel_mode IN ('walking', 'cycling') WHERE travel_mode IN ('walking', 'cycling')
AND nearest_poi_id IS NOT NULL AND nearest_poi_id IS NOT NULL
ORDER BY grid_point_id, category, distance_m ORDER BY grid_point_id, category, travel_time_s ASC NULLS LAST
), ),
transit_walk_nearest AS ( transit_walk_nearest AS (
SELECT DISTINCT ON (grid_point_id, category) SELECT DISTINCT ON (grid_point_id, category)
@ -255,7 +255,7 @@ export async function handleComputeScores(
FROM base FROM base
WHERE travel_mode IN ('walking', 'transit') WHERE travel_mode IN ('walking', 'transit')
AND nearest_poi_id IS NOT NULL AND nearest_poi_id IS NOT NULL
ORDER BY grid_point_id, category, distance_m ORDER BY grid_point_id, category, travel_time_s ASC NULLS LAST
), ),
all_nearest AS ( all_nearest AS (
SELECT * FROM road_nearest SELECT * FROM road_nearest
@ -279,9 +279,9 @@ export async function handleComputeScores(
), ),
scores AS ( scores AS (
-- Complement-product score per (grid_point, category, mode, threshold, profile). -- Complement-product score per (grid_point, category, mode, threshold, profile).
-- sigmoid(t) = 1 / (1 + exp((t T) / (T/6))) where T = threshold in seconds. -- proximity(t) = exp(t / T) where T = threshold in seconds.
-- score = 1 (1 w·sigmoid) computed via EXP(SUM(LN(complement))). -- score = 1 (1 w·proximity) computed via EXP(SUM(LN(complement))).
-- NULL travel_time_s sigmoid = 0 complement = 1 LN(1) = 0 (no penalty). -- NULL travel_time_s proximity = 0 complement = 1 LN(1) = 0 (no penalty).
SELECT SELECT
s.grid_point_id, s.grid_point_id,
s.category, s.category,
@ -293,7 +293,7 @@ export async function handleComputeScores(
1.0 - COALESCE(pw.weight, ${DEFAULT_SUBCATEGORY_WEIGHT}::float8) 1.0 - COALESCE(pw.weight, ${DEFAULT_SUBCATEGORY_WEIGHT}::float8)
* CASE * CASE
WHEN s.travel_time_s IS NULL THEN 0.0 WHEN s.travel_time_s IS NULL THEN 0.0
ELSE EXP(-3.0 * s.travel_time_s / (t.threshold_min * 60.0)) ELSE EXP(-1.0 * s.travel_time_s / (t.threshold_min * 60.0))
END, END,
1e-10 1e-10
)) ))

View file

@ -20,10 +20,12 @@
*/ */
import type { Job } from "bullmq"; import type { Job } from "bullmq";
import { getSql } from "../db.js"; import { getSql } from "../db.js";
import { fetchTransitIsochrone, parseTransitContours, TRANSIT_CONTOUR_MINUTES } from "../valhalla.js"; import { dispatchTransitIsochrone, parseTransitContours } from "../routing-client.js";
import type { JobProgress } from "@transportationer/shared"; import type { JobProgress } from "@transportationer/shared";
import { CATEGORY_IDS, nextTuesdayDeparture } from "@transportationer/shared"; import { CATEGORY_IDS, nextTuesdayDeparture } from "@transportationer/shared";
const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
export type ComputeTransitData = { export type ComputeTransitData = {
type: "compute-transit"; type: "compute-transit";
citySlug: string; citySlug: string;
@ -46,11 +48,12 @@ async function asyncPool<T>(
await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker)); await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker));
} }
/** Check isochrone cache then call Valhalla, writing the result back to cache. */ /** Check DB isochrone cache, then dispatch to routing-transit queue on miss. */
async function getTransitIsochrone( async function getTransitIsochrone(
sql: ReturnType<typeof getSql>, sql: ReturnType<typeof getSql>,
gp: { lat: number; lng: number }, gp: { lat: number; lng: number },
departureDate: string, departureDate: string,
citySlug: string,
) { ) {
type CacheRow = { result: object }; type CacheRow = { result: object };
const cached = await Promise.resolve(sql<CacheRow[]>` const cached = await Promise.resolve(sql<CacheRow[]>`
@ -70,7 +73,8 @@ async function getTransitIsochrone(
return parseTransitContours(cached[0].result); return parseTransitContours(cached[0].result);
} }
const contours = await fetchTransitIsochrone({ lat: gp.lat, lng: gp.lng }, departureDate); // Dispatch to valhalla-transit routing queue and wait for result.
const contours = await dispatchTransitIsochrone(gp, departureDate, citySlug);
if (contours) { if (contours) {
const geojson = { const geojson = {
@ -145,7 +149,7 @@ export async function handleComputeTransit(job: Job<ComputeTransitData>): Promis
let withTransit = 0; let withTransit = 0;
await asyncPool(BATCH_CONCURRENCY, gridPoints, async (gp) => { await asyncPool(BATCH_CONCURRENCY, gridPoints, async (gp) => {
const contours = await getTransitIsochrone(sql, gp, departureDate); const contours = await getTransitIsochrone(sql, gp, departureDate, citySlug);
processed++; processed++;
if (!contours || contours.length === 0) { if (!contours || contours.length === 0) {

View file

@ -1,19 +1,14 @@
/** /**
* Download and extract a GTFS feed ZIP so Valhalla can build transit tiles. * Download and filter a GTFS feed per city so Valhalla can build transit tiles.
* *
* The feed is saved to GTFS_DATA_DIR (default /data/valhalla/gtfs) inside the * The raw (unfiltered) feed is downloaded once and cached in GTFS_DATA_DIR/raw/.
* valhalla container, which owns the valhalla_tiles Docker volume. * Subsequent calls for other cities re-use the raw cache without re-downloading.
* *
* After extraction the feed is clipped to the bounding boxes of each known city * A per-city filtered feed is written to GTFS_DATA_DIR/{citySlug}/feed/ and
* (plus a small buffer) so that valhalla_ingest_transit only processes stops and * clipped to the city's bounding box. This directory is the transit_feeds_dir
* trips that are relevant reducing ingest time from hours to minutes for * for that city's valhalla_ingest_transit + valhalla_build_tiles run.
* country-wide feeds like gtfs.de.
*
* After this job completes, the next build-valhalla run will automatically
* call valhalla_ingest_transit and produce transit-capable routing tiles.
* *
* Source: https://download.gtfs.de/germany/nv_free/latest.zip * Source: https://download.gtfs.de/germany/nv_free/latest.zip
* Covers all German ÖPNV (local public transport), updated regularly.
*/ */
import type { Job } from "bullmq"; import type { Job } from "bullmq";
import { import {
@ -26,6 +21,7 @@ import {
rmSync, rmSync,
readFileSync, readFileSync,
writeFileSync, writeFileSync,
copyFileSync,
} from "fs"; } from "fs";
import { mkdir } from "fs/promises"; import { mkdir } from "fs/promises";
import { pipeline } from "stream/promises"; import { pipeline } from "stream/promises";
@ -38,197 +34,152 @@ import type { JobProgress } from "@transportationer/shared";
export type DownloadGtfsDeData = { export type DownloadGtfsDeData = {
type: "download-gtfs-de"; type: "download-gtfs-de";
url: string; url: string;
/** Re-download even if data already exists (default: false) */ citySlug: string;
/** City bbox [minLng, minLat, maxLng, maxLat] already including buffer. */
bbox: [number, number, number, number];
/** Re-download even if raw data already exists (default: false) */
force?: boolean; force?: boolean;
/**
* Per-city bounding boxes [minLng, minLat, maxLng, maxLat] used to clip the
* feed after extraction. Each bbox should already include a buffer. A stop is
* kept when it falls inside ANY of the boxes. When absent the full feed is kept.
*/
bboxes?: [number, number, number, number][];
}; };
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs"; const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";
/** Global raw (unfiltered) feed — downloaded once, shared across all cities. */
const GTFS_RAW_DIR = `${GTFS_DATA_DIR}/raw`;
const GTFS_ZIP_PATH = `${GTFS_DATA_DIR}/feed.zip`; const GTFS_ZIP_PATH = `${GTFS_DATA_DIR}/feed.zip`;
const GTFS_FEED_DIR = `${GTFS_DATA_DIR}/feed`; /** Records which source URL populated GTFS_RAW_DIR. */
/** Records which source/bboxes/algorithm last populated GTFS_FEED_DIR. JSON format. */ const RAW_MARKER = `${GTFS_RAW_DIR}/.source`;
const SOURCE_MARKER = `${GTFS_FEED_DIR}/.source`;
/** /**
* Bump this when the filtering algorithm changes in a way that produces * Bump this when the filtering algorithm changes in a way that produces
* different output from the same source + bboxes. This forces a re-filter * different output from the same source + bbox. Forces a re-filter on the
* on the existing extracted data without re-downloading. * existing raw data without re-downloading.
*/ */
const FILTER_VERSION = 2; const FILTER_VERSION = 2;
// ─── Source marker helpers ──────────────────────────────────────────────────── interface RawMarker { source: string }
interface CityMarker { source: string; bbox: [number, number, number, number]; filterVersion: number }
interface SourceMarker { function readRawMarker(): RawMarker | null {
source: string; if (!existsSync(RAW_MARKER)) return null;
bboxes?: [number, number, number, number][]; try { return JSON.parse(readFileSync(RAW_MARKER, "utf8")) as RawMarker; } catch { return null; }
filterVersion?: number;
} }
function readSourceMarker(): SourceMarker | null { function cityFeedDir(citySlug: string): string { return `${GTFS_DATA_DIR}/${citySlug}/feed`; }
if (!existsSync(SOURCE_MARKER)) return null; function cityMarkerPath(citySlug: string): string { return `${cityFeedDir(citySlug)}/.source`; }
const content = readFileSync(SOURCE_MARKER, "utf8").trim();
try { function readCityMarker(citySlug: string): CityMarker | null {
return JSON.parse(content) as SourceMarker; const p = cityMarkerPath(citySlug);
} catch { if (!existsSync(p)) return null;
// Legacy format: plain string written by older versions try { return JSON.parse(readFileSync(p, "utf8")) as CityMarker; } catch { return null; }
return { source: content };
}
} }
function writeSourceMarker(source: string, bboxes?: [number, number, number, number][]): void { function writeCityMarker(citySlug: string, source: string, bbox: [number, number, number, number]): void {
writeFileSync(SOURCE_MARKER, JSON.stringify({ source, bboxes, filterVersion: FILTER_VERSION })); writeFileSync(cityMarkerPath(citySlug), JSON.stringify({ source, bbox, filterVersion: FILTER_VERSION }));
} }
/** True when `outer` fully contains `inner`. */ function bboxEqual(a: [number,number,number,number], b: [number,number,number,number]): boolean {
function bboxContains( return a[0] === b[0] && a[1] === b[1] && a[2] === b[2] && a[3] === b[3];
outer: [number, number, number, number],
inner: [number, number, number, number],
): boolean {
return outer[0] <= inner[0] && outer[1] <= inner[1] && outer[2] >= inner[2] && outer[3] >= inner[3];
}
/**
* True when every bbox in `requested` is covered by at least one bbox in `existing`.
* If `existing` is empty/absent the data was unfiltered, which covers everything.
*/
function allBboxesCovered(
existing: [number, number, number, number][] | undefined,
requested: [number, number, number, number][],
): boolean {
if (!existing || existing.length === 0) return true; // unfiltered → covers all
return requested.every((req) => existing.some((ex) => bboxContains(ex, req)));
} }
// ─── GTFS bbox filter ───────────────────────────────────────────────────────── // ─── GTFS bbox filter ─────────────────────────────────────────────────────────
/** function splitCsv(line: string): string[] {
* Clip an extracted GTFS feed in-place to the union of the given bboxes. if (!line.includes('"')) return line.split(",");
* const result: string[] = [];
* Algorithm: let current = "";
* 1. Filter stops.txt: keep stops whose lat/lon falls inside ANY bbox. let inQuotes = false;
* 2. Pass 1 over stop_times.txt (streaming): collect trip_ids with 1 stop for (let i = 0; i < line.length; i++) {
* inside a bbox. const ch = line[i];
* 3. Pass 2 over stop_times.txt (streaming): write filtered rows to a temp if (ch === '"') {
* file, then replace the original. if (inQuotes && line[i + 1] === '"') { current += '"'; i++; }
* 4. Filter trips.txt collect validRouteIds / validServiceIds / validShapeIds. else inQuotes = !inQuotes;
* 5. Filter routes.txt, calendar.txt, calendar_dates.txt. } else if (ch === "," && !inQuotes) {
* 6. Stream-filter shapes.txt (can be large). result.push(current); current = "";
*/ } else {
async function filterGtfsByBboxes( current += ch;
feedDir: string, }
bboxes: [number, number, number, number][], }
result.push(current);
return result;
}
function colIndex(header: string): Map<string, number> {
return new Map(splitCsv(header).map((c, i) => [c.trim().replace(/^\uFEFF/, ""), i]));
}
function inBbox(lat: number, lon: number, bbox: [number,number,number,number]): boolean {
const [minLng, minLat, maxLng, maxLat] = bbox;
return lat >= minLat && lat <= maxLat && lon >= minLng && lon <= maxLng;
}
function filterSmallCsv(
filePath: string,
keepRow: (idx: Map<string, number>, fields: string[]) => boolean,
onKept?: (idx: Map<string, number>, fields: string[]) => void,
): void {
if (!existsSync(filePath)) return;
const lines = readFileSync(filePath, "utf8").split(/\r?\n/).filter((l) => l.trim());
if (lines.length < 2) return;
const idx = colIndex(lines[0]);
const out = [lines[0]];
for (let i = 1; i < lines.length; i++) {
const fields = splitCsv(lines[i]);
if (keepRow(idx, fields)) {
if (onKept) onKept(idx, fields);
out.push(lines[i]);
}
}
writeFileSync(filePath, out.join("\n") + "\n");
}
async function filterLargeCsv(
srcPath: string,
destPath: string,
keepRow: (targetCol: number, line: string) => boolean,
getTargetCol: (idx: Map<string, number>) => number,
): Promise<void> { ): Promise<void> {
if (bboxes.length === 0) return; if (!existsSync(srcPath)) return;
const tmpPath = destPath + ".tmp";
console.log( const writer = createWriteStream(tmpPath);
`[download-gtfs-de] Filtering GTFS to ${bboxes.length} bbox(es):`, let isFirst = true;
bboxes.map((b) => `[${b.map((v) => v.toFixed(3)).join(",")}]`).join(" "), let targetCol = -1;
const rl = createInterface({ input: createReadStream(srcPath), crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
if (isFirst) {
isFirst = false;
targetCol = getTargetCol(colIndex(line));
writer.write(line + "\n");
continue;
}
if (keepRow(targetCol, line)) writer.write(line + "\n");
}
await new Promise<void>((resolve, reject) =>
writer.end((err?: unknown) => (err ? reject(err) : resolve())),
); );
renameSync(tmpPath, destPath);
}
// ── CSV helpers ───────────────────────────────────────────────────────────── /**
* Filter raw GTFS feed to a single city bbox.
* Reads from rawDir, writes to destDir.
* Also writes .stops_bbox (tight bbox of retained stops) for build-valhalla.
*/
async function filterGtfsForCity(
rawDir: string,
destDir: string,
bbox: [number, number, number, number],
): Promise<void> {
console.log(`[download-gtfs-de] Filtering GTFS to bbox [${bbox.map((v) => v.toFixed(3)).join(",")}] → ${destDir}`);
function splitCsv(line: string): string[] { const stopsPath = path.join(rawDir, "stops.txt");
if (!line.includes('"')) return line.split(",");
const result: string[] = [];
let current = "";
let inQuotes = false;
for (let i = 0; i < line.length; i++) {
const ch = line[i];
if (ch === '"') {
if (inQuotes && line[i + 1] === '"') { current += '"'; i++; }
else inQuotes = !inQuotes;
} else if (ch === "," && !inQuotes) {
result.push(current); current = "";
} else {
current += ch;
}
}
result.push(current);
return result;
}
function colIndex(header: string): Map<string, number> {
return new Map(splitCsv(header).map((c, i) => [c.trim().replace(/^\uFEFF/, ""), i]));
}
function inAnyBbox(lat: number, lon: number): boolean {
return bboxes.some(([minLng, minLat, maxLng, maxLat]) =>
lat >= minLat && lat <= maxLat && lon >= minLng && lon <= maxLng,
);
}
/** Filter a small CSV file (fits in memory) in-place. */
function filterSmallCsv(
filePath: string,
keepRow: (idx: Map<string, number>, fields: string[]) => boolean,
onKept?: (idx: Map<string, number>, fields: string[]) => void,
): void {
if (!existsSync(filePath)) return;
const lines = readFileSync(filePath, "utf8").split(/\r?\n/).filter((l) => l.trim());
if (lines.length < 2) return;
const idx = colIndex(lines[0]);
const out = [lines[0]];
for (let i = 1; i < lines.length; i++) {
const fields = splitCsv(lines[i]);
if (keepRow(idx, fields)) {
if (onKept) onKept(idx, fields);
out.push(lines[i]);
}
}
writeFileSync(filePath, out.join("\n") + "\n");
}
/** Stream-filter a large CSV file in-place via a temp file. */
async function filterLargeCsv(
filePath: string,
keepRow: (targetCol: number, line: string) => boolean,
getTargetCol: (idx: Map<string, number>) => number,
): Promise<void> {
if (!existsSync(filePath)) return;
const tmpPath = filePath + ".tmp";
const writer = createWriteStream(tmpPath);
let isFirst = true;
let targetCol = -1;
const rl = createInterface({ input: createReadStream(filePath), crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
if (isFirst) {
isFirst = false;
targetCol = getTargetCol(colIndex(line));
writer.write(line + "\n");
continue;
}
if (keepRow(targetCol, line)) writer.write(line + "\n");
}
await new Promise<void>((resolve, reject) =>
writer.end((err?: unknown) => (err ? reject(err) : resolve())),
);
renameSync(tmpPath, filePath);
}
// ── Step 1: collect bbox stop IDs (read-only — stops.txt not written yet) ──
//
// Build the set of stops within the bbox — used to seed validTripIds (step 2a)
// and to filter stop_times to local stops only (step 2b). stops.txt itself is
// filtered in step 3 to only bbox stops that appear in the final stop_times.
const stopsPath = path.join(feedDir, "stops.txt");
if (!existsSync(stopsPath)) { if (!existsSync(stopsPath)) {
console.log("[download-gtfs-de] No stops.txt — skipping GTFS bbox filter"); console.log("[download-gtfs-de] No stops.txt in raw dir — skipping filter");
return; return;
} }
// ── Step 1: collect bbox stop IDs ──────────────────────────────────────────
const bboxStopIds = new Set<string>(); const bboxStopIds = new Set<string>();
// Also track the bbox of seeding stops — used later to expand the road tile
// extraction in build-valhalla to cover these stops without expanding to the
// full retained-stops area (which includes Germany-wide long-distance trip stops).
let seedMinLng = Infinity, seedMinLat = Infinity, seedMaxLng = -Infinity, seedMaxLat = -Infinity; let seedMinLng = Infinity, seedMinLat = Infinity, seedMaxLng = -Infinity, seedMaxLat = -Infinity;
{ {
const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim()); const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
@ -241,7 +192,7 @@ async function filterGtfsByBboxes(
const fields = splitCsv(lines[i]); const fields = splitCsv(lines[i]);
const lat = parseFloat(fields[latCol] ?? "NaN"); const lat = parseFloat(fields[latCol] ?? "NaN");
const lon = parseFloat(fields[lonCol] ?? "NaN"); const lon = parseFloat(fields[lonCol] ?? "NaN");
if (inAnyBbox(lat, lon)) { if (inBbox(lat, lon, bbox)) {
bboxStopIds.add(fields[stopIdCol] ?? ""); bboxStopIds.add(fields[stopIdCol] ?? "");
if (isFinite(lat) && isFinite(lon)) { if (isFinite(lat) && isFinite(lon)) {
seedMinLat = Math.min(seedMinLat, lat); seedMaxLat = Math.max(seedMaxLat, lat); seedMinLat = Math.min(seedMinLat, lat); seedMaxLat = Math.max(seedMaxLat, lat);
@ -251,33 +202,25 @@ async function filterGtfsByBboxes(
} }
} }
} }
console.log(`[download-gtfs-de] Bbox filter: ${bboxStopIds.size} stops seed the area`); console.log(`[download-gtfs-de] ${bboxStopIds.size} stops in bbox`);
if (bboxStopIds.size === 0) { if (bboxStopIds.size === 0) {
console.warn( console.warn("[download-gtfs-de] No stops found in bbox — GTFS filter skipped");
"[download-gtfs-de] No stops found in any bbox — GTFS filter skipped " +
"(check bbox coverage and feed area)",
);
return; return;
} }
// ── Step 2a: collect trip_ids that serve the area (pass 1) ──────────────── // ── Step 2a: collect trip_ids with ≥2 bbox stops ───────────────────────────
const stopTimesRaw = path.join(rawDir, "stop_times.txt");
const stopTimesPath = path.join(feedDir, "stop_times.txt"); if (!existsSync(stopTimesRaw)) {
if (!existsSync(stopTimesPath)) { console.log("[download-gtfs-de] No stop_times.txt — skipping");
console.log("[download-gtfs-de] No stop_times.txt — skipping trip filter");
return; return;
} }
const validTripIds = new Set<string>(); const validTripIds = new Set<string>();
// Count how many bbox-local stops each trip has — trips with only 1 bbox
// stop are useless for routing (no O→D pair) and are pruned before step 2b.
const tripBboxStopCount = new Map<string, number>(); const tripBboxStopCount = new Map<string, number>();
{ {
let stopIdCol = -1; let stopIdCol = -1, tripIdCol = -1, isFirst = true;
let tripIdCol = -1; const rl = createInterface({ input: createReadStream(stopTimesRaw), crlfDelay: Infinity });
let isFirst = true;
const rl = createInterface({ input: createReadStream(stopTimesPath), crlfDelay: Infinity });
for await (const line of rl) { for await (const line of rl) {
if (!line.trim()) continue; if (!line.trim()) continue;
if (isFirst) { if (isFirst) {
@ -290,39 +233,37 @@ async function filterGtfsByBboxes(
const fields = line.split(","); const fields = line.split(",");
const tripId = fields[tripIdCol] ?? ""; const tripId = fields[tripIdCol] ?? "";
const stopId = fields[stopIdCol] ?? ""; const stopId = fields[stopIdCol] ?? "";
if (stopIdCol >= 0 && bboxStopIds.has(stopId)) { if (bboxStopIds.has(stopId)) {
validTripIds.add(tripId); validTripIds.add(tripId);
tripBboxStopCount.set(tripId, (tripBboxStopCount.get(tripId) ?? 0) + 1); tripBboxStopCount.set(tripId, (tripBboxStopCount.get(tripId) ?? 0) + 1);
} }
} }
} }
// Remove trips with only one bbox stop — they can't provide an O→D pair
for (const tripId of validTripIds) { for (const tripId of validTripIds) {
if ((tripBboxStopCount.get(tripId) ?? 0) < 2) validTripIds.delete(tripId); if ((tripBboxStopCount.get(tripId) ?? 0) < 2) validTripIds.delete(tripId);
} }
console.log(`[download-gtfs-de] Bbox filter: ${validTripIds.size} trips with ≥2 bbox stops serve the area`); console.log(`[download-gtfs-de] ${validTripIds.size} trips with ≥2 bbox stops`);
// ── Step 2b: write filtered stop_times, keeping only bbox stops (pass 2) ───
//
// We keep a stop_times entry only when BOTH:
// - its trip has ≥2 bbox stops (trip_id ∈ validTripIds), AND
// - the stop itself is within the city bbox (stop_id ∈ bboxStopIds).
//
// Out-of-bbox stops on long-distance routes (e.g. ICE Hamburg↔Bremen passing
// through Oldenburg) are stripped from stop_times. Trips with only one bbox
// stop are removed entirely (no O→D pair, useless for routing). This limits
// the transit graph to local stops only, ensuring valhalla_build_tiles can
// create road connections for all included stops without ghost edge references
// that cause routing errors for other modes (bicycle, driving).
// ── Step 2b: write filtered stop_times (bbox stops on valid trips only) ─────
const allTripStopIds = new Set<string>(); const allTripStopIds = new Set<string>();
await filterLargeCsv(
stopTimesRaw,
path.join(destDir, "stop_times.txt"),
(tripCol, line) => {
const fields = line.split(",");
const tripId = fields[tripCol] ?? "";
const stopId = fields[tripCol + 1] ?? ""; // wrong index — handled below
return validTripIds.has(tripId);
},
(idx) => idx.get("trip_id") ?? -1,
);
// Re-read filtered stop_times to collect actual stop IDs and also filter to bbox stops only
{ {
const tmpPath = stopTimesPath + ".tmp"; const tmpPath = path.join(destDir, "stop_times.txt") + ".tmp2";
const writer = createWriteStream(tmpPath); const writer = createWriteStream(tmpPath);
let isFirst = true; let isFirst = true;
let tripIdCol = -1; let tripIdCol = -1, stopIdCol = -1;
let stopIdCol = -1; const rl = createInterface({ input: createReadStream(path.join(destDir, "stop_times.txt")), crlfDelay: Infinity });
const rl = createInterface({ input: createReadStream(stopTimesPath), crlfDelay: Infinity });
for await (const line of rl) { for await (const line of rl) {
if (!line.trim()) continue; if (!line.trim()) continue;
if (isFirst) { if (isFirst) {
@ -334,242 +275,196 @@ async function filterGtfsByBboxes(
continue; continue;
} }
const fields = line.split(","); const fields = line.split(",");
if ( const tripId = fields[tripIdCol] ?? "";
validTripIds.has(fields[tripIdCol] ?? "") && const stopId = fields[stopIdCol] ?? "";
bboxStopIds.has(fields[stopIdCol] ?? "") if (validTripIds.has(tripId) && bboxStopIds.has(stopId)) {
) { allTripStopIds.add(stopId);
allTripStopIds.add(fields[stopIdCol] ?? "");
writer.write(line + "\n"); writer.write(line + "\n");
} }
} }
await new Promise<void>((resolve, reject) => await new Promise<void>((resolve, reject) =>
writer.end((err?: unknown) => (err ? reject(err) : resolve())), writer.end((err?: unknown) => (err ? reject(err) : resolve())),
); );
renameSync(tmpPath, stopTimesPath); renameSync(tmpPath, path.join(destDir, "stop_times.txt"));
} }
// ── Step 3: filter stops.txt to bbox stops used by kept trips ────────────── // ── Step 3: filter stops.txt (only stops that appear in final stop_times) ───
{
filterSmallCsv( const dest = path.join(destDir, "stops.txt");
stopsPath, const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
(idx, fields) => allTripStopIds.has(fields[idx.get("stop_id") ?? -1] ?? ""), if (lines.length >= 2) {
); const idx = colIndex(lines[0]);
const stopIdCol = idx.get("stop_id") ?? -1;
const out = [lines[0]];
for (let i = 1; i < lines.length; i++) {
const fields = splitCsv(lines[i]);
if (allTripStopIds.has(fields[stopIdCol] ?? "")) out.push(lines[i]);
}
writeFileSync(dest, out.join("\n") + "\n");
}
}
if (isFinite(seedMinLat)) { if (isFinite(seedMinLat)) {
const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat]; const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat];
writeFileSync(path.join(feedDir, ".stops_bbox"), JSON.stringify(stopsBbox)); writeFileSync(path.join(destDir, ".stops_bbox"), JSON.stringify(stopsBbox));
console.log( console.log(`[download-gtfs-de] Transit stops bbox: [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`);
`[download-gtfs-de] Transit stops bbox (seeding area): [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`,
);
} }
console.log(`[download-gtfs-de] Bbox filter: ${allTripStopIds.size} bbox stops kept across ${validTripIds.size} trips`);
// ── Step 4: filter trips.txt ───────────────────────────────────────────────
// ── Step 4: filter trips.txt → collect route/service/shape IDs ─────────────
const validRouteIds = new Set<string>(); const validRouteIds = new Set<string>();
const validServiceIds = new Set<string>(); const validServiceIds = new Set<string>();
const validShapeIds = new Set<string>(); const validShapeIds = new Set<string>();
{
filterSmallCsv( const src = path.join(rawDir, "trips.txt");
path.join(feedDir, "trips.txt"), const dest = path.join(destDir, "trips.txt");
(idx, fields) => validTripIds.has(fields[idx.get("trip_id") ?? -1] ?? ""), if (existsSync(src)) {
(idx, fields) => { const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
validRouteIds.add(fields[idx.get("route_id") ?? -1] ?? ""); if (lines.length >= 2) {
validServiceIds.add(fields[idx.get("service_id") ?? -1] ?? ""); const idx = colIndex(lines[0]);
const shapeId = fields[idx.get("shape_id") ?? -1] ?? ""; const tripIdCol = idx.get("trip_id") ?? -1;
if (shapeId) validShapeIds.add(shapeId); const routeIdCol = idx.get("route_id") ?? -1;
}, const serviceIdCol = idx.get("service_id") ?? -1;
); const shapeIdCol = idx.get("shape_id") ?? -1;
const out = [lines[0]];
// ── Step 5: filter remaining files ──────────────────────────────────────── for (let i = 1; i < lines.length; i++) {
const fields = splitCsv(lines[i]);
filterSmallCsv( if (validTripIds.has(fields[tripIdCol] ?? "")) {
path.join(feedDir, "routes.txt"), out.push(lines[i]);
(idx, fields) => validRouteIds.has(fields[idx.get("route_id") ?? -1] ?? ""), validRouteIds.add(fields[routeIdCol] ?? "");
); validServiceIds.add(fields[serviceIdCol] ?? "");
const shapeId = fields[shapeIdCol] ?? "";
for (const name of ["calendar.txt", "calendar_dates.txt"] as const) { if (shapeId) validShapeIds.add(shapeId);
filterSmallCsv( }
path.join(feedDir, name), }
(idx, fields) => validServiceIds.has(fields[idx.get("service_id") ?? -1] ?? ""), writeFileSync(dest, out.join("\n") + "\n");
); }
}
} }
// shapes.txt can be large — stream it // ── Step 5: filter routes, calendar, calendar_dates ────────────────────────
for (const [file, idCol, validIds] of [
["routes.txt", "route_id", validRouteIds],
["calendar.txt", "service_id", validServiceIds],
["calendar_dates.txt", "service_id", validServiceIds],
] as const) {
const src = path.join(rawDir, file);
const dest = path.join(destDir, file);
if (!existsSync(src)) continue;
const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
if (lines.length < 2) { writeFileSync(dest, lines[0] + "\n"); continue; }
const idx = colIndex(lines[0]);
const col = idx.get(idCol) ?? -1;
const out = [lines[0]];
for (let i = 1; i < lines.length; i++) {
const fields = splitCsv(lines[i]);
if ((validIds as Set<string>).has(fields[col] ?? "")) out.push(lines[i]);
}
writeFileSync(dest, out.join("\n") + "\n");
}
// ── Step 6: shapes.txt (large — stream-filter) ─────────────────────────────
if (validShapeIds.size > 0) { if (validShapeIds.size > 0) {
await filterLargeCsv( await filterLargeCsv(
path.join(feedDir, "shapes.txt"), path.join(rawDir, "shapes.txt"),
path.join(destDir, "shapes.txt"),
(col, line) => validShapeIds.has(line.split(",")[col] ?? ""), (col, line) => validShapeIds.has(line.split(",")[col] ?? ""),
(idx) => idx.get("shape_id") ?? -1, (idx) => idx.get("shape_id") ?? -1,
); );
} }
console.log( console.log(
`[download-gtfs-de] GTFS filter complete: ` + `[download-gtfs-de] Filter complete: ${allTripStopIds.size} stops, ` +
`${allTripStopIds.size} stops, ${validTripIds.size} trips, ${validRouteIds.size} routes`, `${validTripIds.size} trips, ${validRouteIds.size} routes`,
); );
} }
// ─── Job handler ────────────────────────────────────────────────────────────── // ─── Job handler ──────────────────────────────────────────────────────────────
export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promise<void> { export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promise<void> {
const { url, force = false, bboxes } = job.data; const { url, citySlug, bbox, force = false } = job.data;
const effectiveSource = "gtfs-de"; const effectiveSource = url;
// ── Idempotency check ────────────────────────────────────────────────────── const destDir = cityFeedDir(citySlug);
// const cityMarker = readCityMarker(citySlug);
// Skip entirely when source is unchanged AND data is present AND the existing
// filter already covers all requested bboxes.
//
// Filter-only (no re-download) when data is present with the same source but
// the existing data is unfiltered (marker has no bboxes) while bboxes are now
// requested. The unfiltered data on disk is the superset we need.
//
// Re-download when source changes OR the existing filter bbox set no longer
// covers all requested bboxes (e.g. a new city was added outside the
// previously covered area).
const existingMarker = readSourceMarker(); // ── Check if per-city feed is already up to date ───────────────────────────
const sourceChanged = existingMarker?.source !== effectiveSource; const cityDataExists = existsSync(destDir) && readdirSync(destDir).some((f) => f.endsWith(".txt"));
if (!force && cityDataExists && cityMarker?.source === effectiveSource &&
cityMarker?.filterVersion === FILTER_VERSION && bboxEqual(cityMarker.bbox, bbox)) {
console.log(`[download-gtfs-de] Per-city feed for ${citySlug} is up to date, skipping`);
await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: "Feed up to date." } satisfies JobProgress);
return;
}
const dataExists = existsSync(GTFS_FEED_DIR) && // ── Ensure raw feed is present ─────────────────────────────────────────────
readdirSync(GTFS_FEED_DIR).some((f) => f.endsWith(".txt")); const rawMarker = readRawMarker();
const rawExists = existsSync(GTFS_RAW_DIR) && readdirSync(GTFS_RAW_DIR).some((f) => f.endsWith(".txt"));
if (!force && !sourceChanged && dataExists) { if (force || !rawExists || rawMarker?.source !== effectiveSource) {
const existingBboxes = existingMarker?.bboxes; await job.updateProgress({ stage: "Downloading GTFS", pct: 5, message: `Downloading GTFS feed…` } satisfies JobProgress);
const filterVersionOk = existingMarker?.filterVersion === FILTER_VERSION;
// Does the existing filtered data cover all requested bboxes? mkdirSync(GTFS_DATA_DIR, { recursive: true });
const bboxesCovered = !bboxes?.length || allBboxesCovered(existingBboxes, bboxes);
if (bboxesCovered) { const response = await fetch(url, { signal: AbortSignal.timeout(600_000) });
// Marker already reflects desired filtering? if (!response.ok || !response.body) {
const markerOk = !bboxes?.length || (existingBboxes && existingBboxes.length > 0); throw new Error(`Failed to download GTFS: HTTP ${response.status} ${response.statusText}`);
if (markerOk && filterVersionOk) {
console.log(`[download-gtfs-de] GTFS feed up to date (source=${effectiveSource}, filterVersion=${FILTER_VERSION}), skipping`);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 100,
message: "GTFS data already present and up to date.",
} satisfies JobProgress);
return;
}
// Data needs re-filtering: either unfiltered (bboxes newly requested)
// or filter algorithm changed (filterVersion mismatch).
const reason = !filterVersionOk
? `filter algorithm updated (v${existingMarker?.filterVersion ?? "none"} → v${FILTER_VERSION})`
: "applying bbox filter to unfiltered data";
console.log(`[download-gtfs-de] Re-filtering existing GTFS data: ${reason}`);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 10,
message: "Filtering existing GTFS feed to city areas…",
} satisfies JobProgress);
await filterGtfsByBboxes(GTFS_FEED_DIR, bboxes!);
writeSourceMarker(effectiveSource, bboxes);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 100,
message: "GTFS feed filtered to city areas.",
} satisfies JobProgress);
return;
} }
// Existing filter too small — need fresh data from network. const totalBytes = Number(response.headers.get("content-length") ?? 0);
console.log(`[download-gtfs-de] Existing GTFS filter too small for new areas — re-downloading`); let downloadedBytes = 0;
} let lastReportedPct = 5;
if (sourceChanged) { const nodeReadable = Readable.fromWeb(response.body as Parameters<typeof Readable.fromWeb>[0]);
console.log( nodeReadable.on("data", (chunk: Buffer) => {
`[download-gtfs-de] Source changed ` + downloadedBytes += chunk.length;
`(${existingMarker?.source ?? "none"}${effectiveSource}), re-downloading`, if (totalBytes > 0) {
); const pct = Math.min(55, 5 + Math.round((downloadedBytes / totalBytes) * 50));
} if (pct > lastReportedPct + 4) {
lastReportedPct = pct;
mkdirSync(GTFS_DATA_DIR, { recursive: true }); void job.updateProgress({
stage: "Downloading GTFS", pct,
// ── Download ─────────────────────────────────────────────────────────────── message: `Downloading… ${(downloadedBytes / 1024 / 1024).toFixed(1)} / ${(totalBytes / 1024 / 1024).toFixed(1)} MB`,
await job.updateProgress({ bytesDownloaded: downloadedBytes, totalBytes,
stage: "Downloading GTFS", } satisfies JobProgress);
pct: 5, }
message: `Downloading GTFS feed (source: ${effectiveSource})…`,
} satisfies JobProgress);
const response = await fetch(url, { signal: AbortSignal.timeout(600_000) });
if (!response.ok || !response.body) {
throw new Error(`Failed to download GTFS: HTTP ${response.status} ${response.statusText}`);
}
const totalBytes = Number(response.headers.get("content-length") ?? 0);
let downloadedBytes = 0;
let lastReportedPct = 5;
const nodeReadable = Readable.fromWeb(response.body as Parameters<typeof Readable.fromWeb>[0]);
nodeReadable.on("data", (chunk: Buffer) => {
downloadedBytes += chunk.length;
if (totalBytes > 0) {
const pct = Math.min(55, 5 + Math.round((downloadedBytes / totalBytes) * 50));
if (pct > lastReportedPct + 4) {
lastReportedPct = pct;
void job.updateProgress({
stage: "Downloading GTFS",
pct,
message: `Downloading… ${(downloadedBytes / 1024 / 1024).toFixed(1)} / ${(totalBytes / 1024 / 1024).toFixed(1)} MB`,
bytesDownloaded: downloadedBytes,
totalBytes,
} satisfies JobProgress);
} }
});
await pipeline(nodeReadable, createWriteStream(GTFS_ZIP_PATH));
console.log(`[download-gtfs-de] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`);
await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Extracting GTFS feed…" } satisfies JobProgress);
if (existsSync(GTFS_RAW_DIR)) rmSync(GTFS_RAW_DIR, { recursive: true, force: true });
mkdirSync(GTFS_RAW_DIR, { recursive: true });
const zip = unzipper.Parse({ forceStream: true });
createReadStream(GTFS_ZIP_PATH).pipe(zip);
for await (const entry of zip) {
const e = entry as unzipper.Entry;
const destPath = path.join(GTFS_RAW_DIR, path.basename(e.path));
if (e.type === "Directory") { e.autodrain(); continue; }
await mkdir(path.dirname(destPath), { recursive: true });
await pipeline(e as unknown as NodeJS.ReadableStream, createWriteStream(destPath));
} }
}); rmSync(GTFS_ZIP_PATH, { force: true });
await pipeline(nodeReadable, createWriteStream(GTFS_ZIP_PATH)); const extractedFiles = readdirSync(GTFS_RAW_DIR);
console.log(`[download-gtfs-de] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`); console.log(`[download-gtfs-de] Extracted ${extractedFiles.length} files to ${GTFS_RAW_DIR}`);
writeFileSync(RAW_MARKER, JSON.stringify({ source: effectiveSource }));
// ── Extract ──────────────────────────────────────────────────────────────── } else {
await job.updateProgress({ console.log(`[download-gtfs-de] Raw feed already present (source=${effectiveSource})`);
stage: "Downloading GTFS", await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Using cached raw feed." } satisfies JobProgress);
pct: 60,
message: "Extracting GTFS feed…",
} satisfies JobProgress);
if (existsSync(GTFS_FEED_DIR)) rmSync(GTFS_FEED_DIR, { recursive: true, force: true });
mkdirSync(GTFS_FEED_DIR, { recursive: true });
const zip = unzipper.Parse({ forceStream: true });
createReadStream(GTFS_ZIP_PATH).pipe(zip);
for await (const entry of zip) {
const e = entry as unzipper.Entry;
const destPath = path.join(GTFS_FEED_DIR, path.basename(e.path));
if (e.type === "Directory") { e.autodrain(); continue; }
await mkdir(path.dirname(destPath), { recursive: true });
await pipeline(e as unknown as NodeJS.ReadableStream, createWriteStream(destPath));
} }
const extractedFiles = readdirSync(GTFS_FEED_DIR); // ── Filter raw feed for this city ──────────────────────────────────────────
console.log(`[download-gtfs-de] Extracted ${extractedFiles.length} files to ${GTFS_FEED_DIR}`); await job.updateProgress({ stage: "Downloading GTFS", pct: 65, message: `Filtering GTFS for ${citySlug}` } satisfies JobProgress);
rmSync(GTFS_ZIP_PATH, { force: true }); if (existsSync(destDir)) rmSync(destDir, { recursive: true, force: true });
mkdirSync(destDir, { recursive: true });
// ── Bbox filter ──────────────────────────────────────────────────────────── await filterGtfsForCity(GTFS_RAW_DIR, destDir, bbox);
if (bboxes && bboxes.length > 0) { writeCityMarker(citySlug, effectiveSource, bbox);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 65,
message: `Filtering GTFS feed to ${bboxes.length} city area(s)…`,
} satisfies JobProgress);
await filterGtfsByBboxes(GTFS_FEED_DIR, bboxes);
}
writeSourceMarker(effectiveSource, bboxes?.length ? bboxes : undefined); await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: `GTFS ready for ${citySlug}.` } satisfies JobProgress);
await job.updateProgress({
stage: "Downloading GTFS",
pct: 100,
message: bboxes?.length
? `GTFS feed ready and filtered to ${bboxes.length} city area(s) (source: ${effectiveSource}).`
: `GTFS feed ready: ${extractedFiles.length} files (source: ${effectiveSource}).`,
} satisfies JobProgress);
} }

View file

@ -1,6 +1,7 @@
import type { Job } from "bullmq"; import type { Job } from "bullmq";
import { createWriteStream, mkdirSync, statSync, renameSync } from "fs"; import { createWriteStream, mkdirSync, statSync, renameSync } from "fs";
import { Writable } from "stream"; import { Writable } from "stream";
import { basename } from "path";
import type { JobProgress } from "@transportationer/shared"; import type { JobProgress } from "@transportationer/shared";
export type DownloadPbfData = { export type DownloadPbfData = {
@ -15,31 +16,39 @@ const ALLOWED_PATTERN =
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm"; const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
/** PBFs older than this are re-downloaded. */
const MAX_AGE_MS = 24 * 3600 * 1000;
export function pbfPath(geofabrikUrl: string): string {
return `${OSM_DATA_DIR}/${basename(new URL(geofabrikUrl).pathname)}`;
}
export async function handleDownloadPbf( export async function handleDownloadPbf(
job: Job<DownloadPbfData>, job: Job<DownloadPbfData>,
): Promise<void> { ): Promise<void> {
const { citySlug, geofabrikUrl, expectedBytes } = job.data; const { geofabrikUrl, expectedBytes } = job.data;
if (!ALLOWED_PATTERN.test(geofabrikUrl)) { if (!ALLOWED_PATTERN.test(geofabrikUrl)) {
throw new Error(`Rejected URL (must be a Geofabrik PBF): ${geofabrikUrl}`); throw new Error(`Rejected URL (must be a Geofabrik PBF): ${geofabrikUrl}`);
} }
mkdirSync(OSM_DATA_DIR, { recursive: true }); mkdirSync(OSM_DATA_DIR, { recursive: true });
const outputPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`; const outputPath = pbfPath(geofabrikUrl);
// Use job.id in the tmp path so two concurrent download-pbf jobs for the // Use job.id in the tmp path so concurrent download-pbf jobs for the same
// same city (one under extract-pois, one under build-valhalla) don't write // URL don't write to the same temp file and corrupt each other.
// to the same file and corrupt each other.
const tmpPath = `${outputPath}.${job.id}.tmp`; const tmpPath = `${outputPath}.${job.id}.tmp`;
// Idempotency: skip if a complete file is already on disk (supports // Cache hit: skip if file is on disk and younger than MAX_AGE_MS.
// parallel download-pbf instances for the same city PBF). // Keyed by URL basename so multiple cities sharing the same regional PBF
// (e.g. all cities in Niedersachsen) only download it once.
try { try {
const stat = statSync(outputPath); const stat = statSync(outputPath);
if (stat.size > 0) { const ageMs = Date.now() - stat.mtimeMs;
if (stat.size > 0 && ageMs < MAX_AGE_MS) {
await job.updateProgress({ await job.updateProgress({
stage: "Downloading PBF", stage: "Downloading PBF",
pct: 100, pct: 100,
message: `Already on disk: ${outputPath} (${(stat.size / 1_048_576).toFixed(1)} MB)`, message: `Cached: ${outputPath} (${(stat.size / 1_048_576).toFixed(1)} MB, ${Math.round(ageMs / 3600_000)}h old)`,
} satisfies JobProgress); } satisfies JobProgress);
return; return;
} }

View file

@ -4,6 +4,7 @@ import { createBullMQConnection } from "../redis.js";
import { getSql } from "../db.js"; import { getSql } from "../db.js";
import { JOB_OPTIONS, VALID_THRESHOLDS } from "@transportationer/shared"; import { JOB_OPTIONS, VALID_THRESHOLDS } from "@transportationer/shared";
import type { JobProgress } from "@transportationer/shared"; import type { JobProgress } from "@transportationer/shared";
import { pbfPath } from "./download-pbf.js";
export type RefreshCityData = { export type RefreshCityData = {
type: "refresh-city"; type: "refresh-city";
@ -15,7 +16,6 @@ export type RefreshCityData = {
computeScoresJobId?: string; computeScoresJobId?: string;
}; };
const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
/** True when the given bbox intersects Niedersachsen. */ /** True when the given bbox intersects Niedersachsen. */
function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean { function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean {
@ -39,34 +39,22 @@ export async function handleRefreshCity(
const { citySlug, geofabrikUrl, resolutionM = 200, iter = 0 } = job.data; const { citySlug, geofabrikUrl, resolutionM = 200, iter = 0 } = job.data;
const sql = getSql(); const sql = getSql();
const pbfPath = `${OSM_DATA_DIR}/${citySlug}-latest.osm.pbf`; const localPbfPath = pbfPath(geofabrikUrl);
// Read the user-specified bbox from the database (set at city creation time). // Read the user-specified bbox from the database (set at city creation time).
// If present, it will be passed to extract-pois to clip the PBF before import. // If present, it will be passed to extract-pois to clip the PBF before import.
// Also read ALL city bboxes for the GTFS filter: each city gets its own bbox // Also read ALL city bboxes for the GTFS filter: each city gets its own bbox
// (with a small buffer) so valhalla_ingest_transit only processes relevant stops. // (with a small buffer) so valhalla_ingest_transit only processes relevant stops.
const [bboxRows, allCityBboxRows] = await Promise.all([ const bboxRows = await Promise.resolve(sql<{
Promise.resolve(sql<{ minlng: number; minlat: number; maxlng: number; maxlat: number;
minlng: number; minlat: number; maxlng: number; maxlat: number; }[]>`
}[]>` SELECT
SELECT ST_XMin(bbox)::float AS minlng,
ST_XMin(bbox)::float AS minlng, ST_YMin(bbox)::float AS minlat,
ST_YMin(bbox)::float AS minlat, ST_XMax(bbox)::float AS maxlng,
ST_XMax(bbox)::float AS maxlng, ST_YMax(bbox)::float AS maxlat
ST_YMax(bbox)::float AS maxlat FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL
FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL `);
`),
Promise.resolve(sql<{
minlng: number; minlat: number; maxlng: number; maxlat: number;
}[]>`
SELECT
ST_XMin(bbox)::float AS minlng,
ST_YMin(bbox)::float AS minlat,
ST_XMax(bbox)::float AS maxlng,
ST_YMax(bbox)::float AS maxlat
FROM cities WHERE bbox IS NOT NULL
`),
]);
const bbox: [number, number, number, number] | undefined = const bbox: [number, number, number, number] | undefined =
bboxRows.length > 0 bboxRows.length > 0
@ -75,12 +63,9 @@ export async function handleRefreshCity(
// ~10 km buffer for GTFS stop coverage near city edges (0.09° ≈ 10 km) // ~10 km buffer for GTFS stop coverage near city edges (0.09° ≈ 10 km)
const GTFS_BUFFER = 0.09; const GTFS_BUFFER = 0.09;
const gtfsBboxes: [number, number, number, number][] = allCityBboxRows.map((r) => [ const gtfsBbox: [number, number, number, number] | undefined = bbox
r.minlng - GTFS_BUFFER, ? [bbox[0] - GTFS_BUFFER, bbox[1] - GTFS_BUFFER, bbox[2] + GTFS_BUFFER, bbox[3] + GTFS_BUFFER]
r.minlat - GTFS_BUFFER, : undefined;
r.maxlng + GTFS_BUFFER,
r.maxlat + GTFS_BUFFER,
]);
await job.updateProgress({ await job.updateProgress({
stage: "Orchestrating pipeline", stage: "Orchestrating pipeline",
@ -150,7 +135,7 @@ export async function handleRefreshCity(
data: { data: {
type: "extract-pois" as const, type: "extract-pois" as const,
citySlug, citySlug,
pbfPath, pbfPath: localPbfPath,
...(bbox ? { bbox } : {}), ...(bbox ? { bbox } : {}),
}, },
opts: { ...JOB_OPTIONS["extract-pois"], jobId: `extract-pois.${citySlug}.${iter}` }, opts: { ...JOB_OPTIONS["extract-pois"], jobId: `extract-pois.${citySlug}.${iter}` },
@ -164,7 +149,7 @@ export async function handleRefreshCity(
data: { data: {
type: "build-valhalla" as const, type: "build-valhalla" as const,
citySlug, citySlug,
pbfPath, pbfPath: localPbfPath,
...(bbox ? { bbox } : {}), ...(bbox ? { bbox } : {}),
}, },
opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla.${citySlug}.${iter}` }, opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla.${citySlug}.${iter}` },
@ -178,26 +163,25 @@ export async function handleRefreshCity(
data: { data: {
type: "build-valhalla" as const, type: "build-valhalla" as const,
citySlug, citySlug,
pbfPath, pbfPath: localPbfPath,
...(bbox ? { bbox } : {}), ...(bbox ? { bbox } : {}),
}, },
opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla-transit.${citySlug}.${iter}` }, opts: { ...JOB_OPTIONS["build-valhalla"], jobId: `build-valhalla-transit.${citySlug}.${iter}` },
children: [ children: [
downloadNode(), downloadNode(),
// Download GTFS feed before building transit tiles. Idempotent — // Download and filter GTFS feed for this city before building transit tiles.
// skips if the feed is current, so subsequent refreshes are cheap. // Raw feed is cached globally; only the per-city filter re-runs on bbox change.
{ ...(gtfsBbox ? [{
name: "download-gtfs-de", name: "download-gtfs-de",
queueName: "valhalla-transit", queueName: "valhalla-transit",
data: { data: {
type: "download-gtfs-de" as const, type: "download-gtfs-de" as const,
url: "https://download.gtfs.de/germany/nv_free/latest.zip", url: "https://download.gtfs.de/germany/nv_free/latest.zip",
// Per-city bboxes (with ~10 km buffer) so valhalla_ingest_transit citySlug,
// only processes stops/trips relevant to the known cities. bbox: gtfsBbox,
...(gtfsBboxes.length > 0 ? { bboxes: gtfsBboxes } : {}),
}, },
opts: JOB_OPTIONS["download-gtfs-de"], opts: { ...JOB_OPTIONS["download-gtfs-de"], jobId: `download-gtfs-de.${citySlug}.${iter}` },
}, }] : []),
], ],
}, },
], ],

View file

@ -0,0 +1,127 @@
/**
* Routing client for the main pipeline worker.
* Dispatches matrix/isochrone jobs to the valhalla routing queues and
* waits for the results. The main worker has zero Valhalla tile knowledge.
*/
import { Queue, QueueEvents } from "bullmq";
import { createBullMQConnection } from "./redis.js";
/** Geographic point in WGS84 decimal degrees. */
export interface LatLng {
  /** Latitude in decimal degrees. */
  lat: number;
  /** Longitude in decimal degrees. */
  lng: number;
}

/** One transit isochrone contour: the area reachable within `minutes`. */
export interface TransitContour {
  /** Contour threshold in minutes (drawn from TRANSIT_CONTOUR_MINUTES). */
  minutes: number;
  /** GeoJSON geometry of the reachable area (as returned by Valhalla). */
  geojson: object;
}
// Contour times that must match valhalla.ts TRANSIT_CONTOUR_MINUTES.
// parseTransitContours below only extracts features whose `contour`
// property equals one of these values.
const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
// ─── Queue singletons ──────────────────────────────────────────────────────────
// Lazily-created BullMQ queue/event handles, one pair per routing queue.
// Held at module scope so repeated dispatch calls reuse the same Redis
// connections instead of opening new ones per request.
let roadQueue: Queue | null = null;
let roadQueueEvents: QueueEvents | null = null;
let transitQueue: Queue | null = null;
let transitQueueEvents: QueueEvents | null = null;
/** Lazily create (once) and return the road-routing ("routing") queue handle. */
function getRoadQueue(): Queue {
  roadQueue ??= new Queue("routing", {
    connection: createBullMQConnection(),
    // Keep completed/failed jobs only briefly — results are consumed immediately.
    defaultJobOptions: { removeOnComplete: { age: 120 }, removeOnFail: { age: 300 } },
  });
  return roadQueue;
}
/** Lazily create (once) and return the QueueEvents listener for the road queue. */
function getRoadQueueEvents(): QueueEvents {
  roadQueueEvents ??= new QueueEvents("routing", { connection: createBullMQConnection() });
  return roadQueueEvents;
}
/** Lazily create (once) and return the transit-routing ("routing-transit") queue handle. */
function getTransitQueue(): Queue {
  transitQueue ??= new Queue("routing-transit", {
    connection: createBullMQConnection(),
    // Keep completed/failed jobs only briefly — results are consumed immediately.
    defaultJobOptions: { removeOnComplete: { age: 120 }, removeOnFail: { age: 300 } },
  });
  return transitQueue;
}
/** Lazily create (once) and return the QueueEvents listener for the transit queue. */
function getTransitQueueEvents(): QueueEvents {
  transitQueueEvents ??= new QueueEvents("routing-transit", { connection: createBullMQConnection() });
  return transitQueueEvents;
}
// How long the dispatch helpers below wait for a remote job result before
// waitUntilFinished rejects.
const MATRIX_TIMEOUT_MS = 90_000;
const ISOCHRONE_TIMEOUT_MS = 60_000;
/**
 * Dispatch a sources_to_targets matrix job to the road routing queue and wait
 * for its result.
 *
 * @param sources  M origin points.
 * @param targets  N destination points.
 * @param mode     Road travel mode handled by the routing worker.
 * @param citySlug City whose tiles the routing worker should use.
 * @returns M×N matrix of travel times in seconds (null = unreachable).
 */
export async function dispatchMatrix(
  sources: LatLng[],
  targets: LatLng[],
  mode: "walking" | "cycling" | "driving",
  citySlug: string,
): Promise<(number | null)[][]> {
  const payload = { type: "matrix", sources, targets, mode, citySlug };
  const queue = getRoadQueue();
  const job = await queue.add("matrix", payload);
  // Block until the remote routing worker returns, or the timeout rejects.
  const matrix = await job.waitUntilFinished(getRoadQueueEvents(), MATRIX_TIMEOUT_MS);
  return matrix as (number | null)[][];
}
/**
 * Dispatch a transit isochrone job to the transit routing queue and wait.
 *
 * @param gp            Origin point for the isochrone.
 * @param departureDate Departure date/time string forwarded to the worker.
 * @param citySlug      City whose transit tiles the worker should use.
 * @returns Parsed contour array, or null when the job fails, times out,
 *          or the point has no transit coverage.
 */
export async function dispatchTransitIsochrone(
  gp: { lat: number; lng: number },
  departureDate: string,
  citySlug: string,
): Promise<TransitContour[] | null> {
  const job = await getTransitQueue().add("isochrone", {
    type: "isochrone",
    lat: gp.lat,
    lng: gp.lng,
    travelMode: "transit",
    contourMinutes: [...TRANSIT_CONTOUR_MINUTES],
    citySlug,
    departureDate,
  });

  let featureCollection: object | null;
  try {
    // Job failures and timeouts are deliberately mapped to null — callers
    // only distinguish "got contours" from "no usable transit result".
    featureCollection =
      (await job.waitUntilFinished(getTransitQueueEvents(), ISOCHRONE_TIMEOUT_MS)) as object | null;
  } catch {
    return null;
  }
  if (featureCollection == null) return null;
  return parseTransitContours(featureCollection);
}
/**
 * Parse a Valhalla isochrone FeatureCollection into TransitContour[].
 * Only features whose `contour` matches one of TRANSIT_CONTOUR_MINUTES are
 * kept; returns null when the input is malformed or fewer than two contours
 * were found.
 */
export function parseTransitContours(geojson: object): TransitContour[] | null {
  const collection = geojson as {
    features?: Array<{ properties: { contour: number }; geometry: object }>;
  };
  const features = collection.features;
  if (!Array.isArray(features)) return null;

  const contours: TransitContour[] = TRANSIT_CONTOUR_MINUTES.flatMap((minutes) => {
    const match = features.find((f) => f.properties?.contour === minutes);
    return match?.geometry ? [{ minutes, geojson: match.geometry }] : [];
  });
  // Require at least two contours so downstream scoring has a usable gradient.
  return contours.length >= 2 ? contours : null;
}

View file

@ -1,112 +1,100 @@
import { Worker, type Job } from "bullmq"; import { Worker, type Job } from "bullmq";
import { spawn, type ChildProcess } from "child_process";
import { existsSync } from "fs";
import { createBullMQConnection } from "./redis.js"; import { createBullMQConnection } from "./redis.js";
import { handleBuildValhalla } from "./jobs/build-valhalla.js"; import { handleBuildValhalla } from "./jobs/build-valhalla.js";
import { handleDownloadGtfsDe } from "./jobs/download-gtfs-de.js"; import { handleDownloadGtfsDe } from "./jobs/download-gtfs-de.js";
import { actorIsochrone, actorMatrix } from "./valhalla.js";
const VALHALLA_CONFIG = process.env.VALHALLA_CONFIG ?? "/data/valhalla/valhalla.json";
const VALHALLA_QUEUE_NAME = process.env.VALHALLA_QUEUE_NAME ?? "valhalla"; const VALHALLA_QUEUE_NAME = process.env.VALHALLA_QUEUE_NAME ?? "valhalla";
console.log(`[valhalla-worker] Starting Transportationer Valhalla worker (queue=${VALHALLA_QUEUE_NAME})…`); // Derive routing queue name from build queue name.
// valhalla → routing (road: walking/cycling/driving)
// valhalla-transit → routing-transit (multimodal)
const ROUTING_QUEUE_NAME =
VALHALLA_QUEUE_NAME === "valhalla-transit" ? "routing-transit" : "routing";
// ─── Valhalla service process manager ───────────────────────────────────────── console.log(`[valhalla-worker] Starting tile-builder+router (tile-queue=${VALHALLA_QUEUE_NAME}, routing-queue=${ROUTING_QUEUE_NAME})…`);
// The valhalla_service HTTP server runs as a child process alongside this
// BullMQ worker. When a build-valhalla job arrives, we stop the server, rebuild
// the routing tiles (using the Valhalla tools installed in this container),
// then restart the server.
let valhallaProc: ChildProcess | null = null; // ─── Tile-builder worker ───────────────────────────────────────────────────────
function startValhallaService(): void { const tileWorker = new Worker(
if (!existsSync(VALHALLA_CONFIG)) {
console.log("[valhalla-worker] No config yet — will start after first tile build");
return;
}
console.log("[valhalla-worker] Starting valhalla_service…");
// valhalla_service <config_file> [concurrency] — positional arg, not -c flag
valhallaProc = spawn("valhalla_service", [VALHALLA_CONFIG], {
stdio: "inherit",
});
valhallaProc.on("exit", (code, signal) => {
console.log(`[valhalla-worker] valhalla_service exited (code=${code}, signal=${signal})`);
valhallaProc = null;
});
}
function stopValhallaService(): Promise<void> {
return new Promise((resolve) => {
if (!valhallaProc) { resolve(); return; }
const proc = valhallaProc;
proc.once("exit", () => resolve());
proc.kill("SIGTERM");
// Force kill after 10 s if it doesn't exit cleanly
setTimeout(() => {
if (valhallaProc === proc) proc.kill("SIGKILL");
}, 10_000);
});
}
// ─── BullMQ worker ────────────────────────────────────────────────────────────
const worker = new Worker(
VALHALLA_QUEUE_NAME, VALHALLA_QUEUE_NAME,
async (job: Job) => { async (job: Job) => {
console.log(`[valhalla-worker] Processing job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`); console.log(`[valhalla-worker] Tile job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`);
// Valhalla keeps serving old tiles while the new tiles are being built.
// restartService is called from inside handleBuildValhalla only after the
// tile build completes — the service is only down for the few seconds it
// takes to restart, and compute-routing jobs retry transparently across that
// window via fetchMatrix's built-in retry logic.
async function restartService(): Promise<void> {
await stopValhallaService();
startValhallaService();
}
if (job.data.type === "download-gtfs-de") { if (job.data.type === "download-gtfs-de") {
await handleDownloadGtfsDe(job as any); await handleDownloadGtfsDe(job as any);
return; return;
} }
await handleBuildValhalla(job as any, restartService); await handleBuildValhalla(job as any);
}, },
{ {
connection: createBullMQConnection(), connection: createBullMQConnection(),
concurrency: 1, concurrency: 1,
lockDuration: 1_800_000, // 30 min — large-region tile builds can be very slow lockDuration: 1_800_000,
lockRenewTime: 60_000, lockRenewTime: 60_000,
maxStalledCount: 3, maxStalledCount: 3,
}, },
); );
worker.on("completed", (job) => { // ─── On-demand routing worker ──────────────────────────────────────────────────
console.log(`[valhalla-worker] ✓ Job ${job.id} (${job.data.type}) completed`); // Handles isochrone + matrix requests from the web app and pipeline workers.
}); // Uses the per-city Actor pool — no HTTP server involved.
worker.on("failed", (job, err) => { const routingWorker = new Worker(
console.error(`[valhalla-worker] ✗ Job ${job?.id} (${job?.data?.type}) failed:`, err.message); ROUTING_QUEUE_NAME,
}); async (job: Job) => {
const { type, citySlug } = job.data as { type: string; citySlug: string };
worker.on("active", (job) => { if (type === "isochrone") {
const { lng, lat, travelMode, contourMinutes, departureDate } = job.data as {
lng: number; lat: number; travelMode: string; contourMinutes: number[];
departureDate?: string | null;
};
return actorIsochrone({ lng, lat, travelMode, contourMinutes, citySlug, departureDate });
}
if (type === "matrix") {
const { sources, targets, mode } = job.data as {
sources: { lat: number; lng: number }[];
targets: { lat: number; lng: number }[];
mode: "walking" | "cycling" | "driving";
};
return actorMatrix(sources, targets, mode, citySlug);
}
throw new Error(`Unknown routing job type: ${type}`);
},
{
connection: createBullMQConnection(),
concurrency: 8,
lockDuration: 60_000,
lockRenewTime: 15_000,
maxStalledCount: 1,
},
);
// ─── Event handlers ────────────────────────────────────────────────────────────
tileWorker.on("completed", (job) =>
console.log(`[valhalla-worker] ✓ Tile job ${job.id} (${job.data.type}) completed`));
tileWorker.on("failed", (job, err) =>
console.error(`[valhalla-worker] ✗ Tile job ${job?.id} (${job?.data?.type}) failed:`, err.message));
tileWorker.on("active", (job) => {
const city = job.data.citySlug ?? job.data.removeSlugs?.join(",") ?? "rebuild"; const city = job.data.citySlug ?? job.data.removeSlugs?.join(",") ?? "rebuild";
console.log(`[valhalla-worker] → Job ${job.id} (${job.data.type}) started city=${city}`); console.log(`[valhalla-worker] → Tile job ${job.id} (${job.data.type}) city=${city}`);
}); });
tileWorker.on("error", (err) => console.error("[valhalla-worker] Tile worker error:", err.message));
worker.on("error", (err) => { routingWorker.on("failed", (job, err) =>
console.error("[valhalla-worker] Worker error:", err.message); console.warn(`[routing-worker] ✗ Job ${job?.id} (${job?.data?.type}) failed:`, err.message));
}); routingWorker.on("error", (err) => console.error("[routing-worker] Worker error:", err.message));
const shutdown = async () => { const shutdown = async () => {
console.log("[valhalla-worker] Shutting down gracefully…"); console.log("[valhalla-worker] Shutting down gracefully…");
await worker.close(); await Promise.all([tileWorker.close(), routingWorker.close()]);
await stopValhallaService();
process.exit(0); process.exit(0);
}; };
process.on("SIGTERM", shutdown); process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown); process.on("SIGINT", shutdown);
// Start serving if tiles already exist from a previous run console.log(`[valhalla-worker] Ready — tile-builds on '${VALHALLA_QUEUE_NAME}', routing on '${ROUTING_QUEUE_NAME}'`);
startValhallaService();
console.log(`[valhalla-worker] Ready — waiting for jobs on '${VALHALLA_QUEUE_NAME}' queue`);

View file

@ -1,81 +1,24 @@
const VALHALLA_URL = process.env.VALHALLA_URL ?? "http://localhost:8002"; /**
/** Transit instance (port 8003). Falls back to VALHALLA_URL if not set. */ * Valhalla Actor helpers used by valhalla-main.ts routing queue worker.
const VALHALLA_TRANSIT_URL = process.env.VALHALLA_TRANSIT_URL ?? VALHALLA_URL; * These functions run in the valhalla containers which have the tile volumes.
* The main pipeline worker never imports this file.
*/
import { nextTuesdayDeparture } from "@transportationer/shared";
import { getActor } from "./actor-pool.js";
const COSTING: Record<"walking" | "cycling" | "driving", string> = { const ROAD_COSTING: Record<"walking" | "cycling" | "driving", string> = {
walking: "pedestrian", walking: "pedestrian",
cycling: "bicycle", cycling: "bicycle",
driving: "auto", driving: "auto",
}; };
// Standard contour times used for transit isochrones.
// Must match the scoring thresholds used in compute-scores.
export const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const; export const TRANSIT_CONTOUR_MINUTES = [5, 10, 15, 20, 30] as const;
import { nextTuesdayDeparture } from "@transportationer/shared";
export interface TransitContour { export interface TransitContour {
minutes: number; minutes: number;
/** GeoJSON Polygon or MultiPolygon geometry of the reachable area */
geojson: object; geojson: object;
} }
/**
* Fetch a transit isochrone for a point using Valhalla's multimodal costing.
* Returns an array of contour polygons sorted from smallest to largest,
* or null if transit routing fails (e.g. no GTFS data loaded in Valhalla).
*/
export async function fetchTransitIsochrone(
source: LatLng,
departureDate: string,
): Promise<TransitContour[] | null> {
const body = {
locations: [{ lat: source.lat, lon: source.lng }],
costing: "multimodal",
contours: TRANSIT_CONTOUR_MINUTES.map((t) => ({ time: t })),
polygons: true,
costing_options: {
transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 },
},
date_time: { type: 1, value: departureDate },
};
let resp: Response;
try {
resp = await fetch(`${VALHALLA_TRANSIT_URL}/isochrone`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
signal: AbortSignal.timeout(30_000),
});
} catch {
return null;
}
if (!resp.ok) return null;
let data: { features?: Array<{ properties: { contour: number }; geometry: object }>; error?: unknown; error_code?: unknown };
try {
data = await resp.json() as typeof data;
} catch {
return null;
}
if (data.error || data.error_code || !Array.isArray(data.features)) return null;
const contours: TransitContour[] = [];
for (const minutes of TRANSIT_CONTOUR_MINUTES) {
const feature = data.features.find((f) => f.properties?.contour === minutes);
if (feature?.geometry) contours.push({ minutes, geojson: feature.geometry });
}
return contours.length >= 2 ? contours : null;
}
/**
* Parse a cached Valhalla isochrone FeatureCollection back into TransitContour[].
* Mirrors the extraction logic in fetchTransitIsochrone.
*/
export function parseTransitContours(geojson: object): TransitContour[] | null { export function parseTransitContours(geojson: object): TransitContour[] | null {
const fc = geojson as { features?: Array<{ properties: { contour: number }; geometry: object }> }; const fc = geojson as { features?: Array<{ properties: { contour: number }; geometry: object }> };
if (!Array.isArray(fc.features)) return null; if (!Array.isArray(fc.features)) return null;
@ -101,76 +44,60 @@ interface MatrixResponse {
sources_to_targets: MatrixCell[][]; sources_to_targets: MatrixCell[][];
} }
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
/** Max attempts for retrying transient Valhalla failures (e.g. service restart). */
const MATRIX_MAX_ATTEMPTS = 8;
/** Exponential backoff: delay = min(BASE * 2^attempt, MAX) ms. */
const MATRIX_RETRY_BASE_MS = 1_000;
const MATRIX_RETRY_MAX_MS = 15_000;
/** Per-request timeout — prevents hanging indefinitely if the service is down. */
const MATRIX_TIMEOUT_MS = 60_000;
/** /**
* Call Valhalla's sources_to_targets matrix endpoint. * Run a sources_to_targets matrix via the city's road Actor.
* Returns an M×N matrix where [i][j] is travel time in seconds * Returns an M×N matrix of travel times in seconds (null = unreachable).
* from sources[i] to targets[j], or null if unreachable.
*
* Retries automatically on connection errors and 5xx responses to survive
* brief Valhalla service restarts (tile rebuilds). After MATRIX_MAX_ATTEMPTS
* the last error is rethrown.
*/ */
export async function fetchMatrix( export async function actorMatrix(
sources: LatLng[], sources: LatLng[],
targets: LatLng[], targets: LatLng[],
mode: "walking" | "cycling" | "driving", mode: "walking" | "cycling" | "driving",
citySlug: string,
): Promise<(number | null)[][]> { ): Promise<(number | null)[][]> {
const body = { const body = {
sources: sources.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })), sources: sources.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })),
targets: targets.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })), targets: targets.map(({ lat, lng }) => ({ lat, lon: lng, radius: 20 })),
costing: COSTING[mode], costing: ROAD_COSTING[mode],
}; };
const bodyJson = JSON.stringify(body); const actor = await getActor("road", citySlug);
// Actor.matrix() is async and returns Promise<string>
let lastErr: unknown; const resultStr = await actor.matrix(JSON.stringify(body));
for (let attempt = 1; attempt <= MATRIX_MAX_ATTEMPTS; attempt++) { const data = JSON.parse(resultStr) as MatrixResponse;
try { return data.sources_to_targets.map((row) => row.map((cell) => cell.time ?? null));
const resp = await fetch(`${VALHALLA_URL}/sources_to_targets`, { }
method: "POST",
headers: { "Content-Type": "application/json" }, /**
body: bodyJson, * Compute an isochrone via the city's Actor (road or transit based on travelMode).
signal: AbortSignal.timeout(MATRIX_TIMEOUT_MS), * Returns a GeoJSON FeatureCollection.
}); */
export async function actorIsochrone(opts: {
if (!resp.ok) { lng: number;
// 5xx: service may be restarting — retry lat: number;
if (resp.status >= 500 && attempt < MATRIX_MAX_ATTEMPTS) { travelMode: string;
const delay = Math.min(MATRIX_RETRY_BASE_MS * 2 ** (attempt - 1), MATRIX_RETRY_MAX_MS); contourMinutes: number[];
await sleep(delay); citySlug: string;
continue; departureDate?: string | null;
} }): Promise<object> {
const text = await resp.text(); const { lng, lat, travelMode, contourMinutes, citySlug, departureDate } = opts;
throw new Error(`Valhalla matrix ${resp.status}: ${text.slice(0, 300)}`); const isTransit = travelMode === "transit";
} const costing = isTransit
? "multimodal"
const data = (await resp.json()) as MatrixResponse; : (ROAD_COSTING as Record<string, string>)[travelMode] ?? "pedestrian";
return data.sources_to_targets.map((row) => row.map((cell) => cell.time ?? null));
} catch (err) { const body: Record<string, unknown> = {
lastErr = err; locations: [{ lon: lng, lat }],
if (attempt >= MATRIX_MAX_ATTEMPTS) break; costing,
// TypeError from fetch = network-level failure (ECONNREFUSED, reset, timeout) contours: contourMinutes.map((time) => ({ time })),
// AbortError = our per-request timeout fired polygons: true,
// Both are transient during a service restart. show_locations: false,
const isTransient = };
err instanceof TypeError || if (isTransit) {
(err instanceof Error && (err.name === "AbortError" || err.name === "TimeoutError")); body.costing_options = { transit: { use_bus: 1.0, use_rail: 1.0, use_transfers: 1.0 } };
if (!isTransient) throw err; body.date_time = { type: 1, value: departureDate ?? nextTuesdayDeparture() };
const delay = Math.min(MATRIX_RETRY_BASE_MS * 2 ** (attempt - 1), MATRIX_RETRY_MAX_MS); }
console.warn(
`[valhalla] fetchMatrix attempt ${attempt}/${MATRIX_MAX_ATTEMPTS} failed (${(err as Error).message}) — retrying in ${delay / 1000}s…`, const actor = await getActor(isTransit ? "transit" : "road", citySlug);
); // Actor.isochrone() is async and returns Promise<string>
await sleep(delay); const resultStr = await actor.isochrone(JSON.stringify(body));
} return JSON.parse(resultStr);
}
throw lastErr;
} }