feat: add more gtfs sources, chage city addition UI

This commit is contained in:
Jan-Henrik 2026-03-08 00:55:49 +01:00
parent ebe5fec68f
commit 8e323a8ef2
6 changed files with 148 additions and 64 deletions

View file

@ -197,14 +197,32 @@ interface NominatimResult {
class: string;
place_rank: number;
geojson?: { type: string; coordinates: unknown };
address?: {
city?: string;
town?: string;
village?: string;
municipality?: string;
country_code?: string;
};
}
function toSlug(name: string): string {
return name
.toLowerCase()
.normalize("NFD")
.replace(/[\u0300-\u036f]/g, "")
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-|-$/g, "");
}
function LocationSelector({
regionGeometry,
onBoundaryChange,
onResultSelect,
}: {
regionGeometry: GeofabrikFeature["geometry"];
onBoundaryChange: (boundary: { type: string; coordinates: unknown } | null) => void;
onResultSelect?: (result: { name: string; countryCode: string } | null) => void;
}) {
const [query, setQuery] = useState("");
const [results, setResults] = useState<NominatimResult[]>([]);
@ -217,10 +235,24 @@ function LocationSelector({
const onBoundaryChangeRef = useRef(onBoundaryChange);
onBoundaryChangeRef.current = onBoundaryChange;
const onResultSelectRef = useRef(onResultSelect);
onResultSelectRef.current = onResultSelect;
// Notify parent when selection changes.
useEffect(() => {
onBoundaryChangeRef.current(selected?.geojson ?? null);
if (selected) {
const name =
selected.address?.city ??
selected.address?.town ??
selected.address?.village ??
selected.address?.municipality ??
selected.display_name.split(",")[0].trim();
const countryCode = (selected.address?.country_code ?? "").toUpperCase();
onResultSelectRef.current?.({ name, countryCode });
} else {
onResultSelectRef.current?.(null);
}
}, [selected]);
// Debounced Nominatim search — request polygon_geojson + featuretype=settlement
@ -233,7 +265,7 @@ function LocationSelector({
setSearching(true);
try {
const res = await fetch(
`https://nominatim.openstreetmap.org/search?q=${encodeURIComponent(query)}&polygon_geojson=1&format=json&limit=8`,
`https://nominatim.openstreetmap.org/search?q=${encodeURIComponent(query)}&polygon_geojson=1&addressdetails=1&format=json&limit=8`,
{ headers: { "User-Agent": "Transportationer/1.0 (15-minute city analyzer)" } },
);
const data: NominatimResult[] = await res.json();
@ -392,7 +424,7 @@ function LocationSelector({
value={query}
onChange={(e) => {
setQuery(e.target.value);
if (!e.target.value) { setSelected(null); setResults([]); }
if (!e.target.value) { setSelected(null); setResults([]); onResultSelectRef.current?.(null); }
}}
onFocus={() => results.length > 0 && setShowDropdown(true)}
className="block w-full rounded-md border border-gray-300 px-3 py-2 text-sm focus:border-brand-500 focus:outline-none focus:ring-1 focus:ring-brand-500"
@ -442,7 +474,7 @@ function LocationSelector({
)}
{!selected && (
<p className="text-xs text-gray-400">
Search for a city to use its administrative boundary for stats coverage, or leave empty to use the entire dataset.
Search for a city to set its administrative boundary and auto-fill the fields below.
</p>
)}
</div>
@ -477,11 +509,34 @@ function ConfirmStep({
[],
);
const handleResultSelect = useCallback(
(result: { name: string; countryCode: string } | null) => {
if (!result) return;
setName(result.name);
setSlug(toSlug(result.name));
setCountryCode(result.countryCode);
},
[],
);
return (
<div className="card max-w-2xl">
<h2 className="text-lg font-semibold mb-4">Confirm City Details</h2>
<div className="space-y-4">
{/* City boundary selector — at the top so it can auto-fill the fields below */}
<div>
<label className="block text-sm font-medium text-gray-700 mb-2">
City Boundary{" "}
<span className="text-gray-400 font-normal">(optional auto-fills name, slug &amp; country)</span>
</label>
<LocationSelector
regionGeometry={region.geometry}
onBoundaryChange={handleBoundaryChange}
onResultSelect={handleResultSelect}
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">
@ -520,23 +575,6 @@ function ConfirmStep({
/>
</div>
{/* City boundary selector */}
<div>
<label className="block text-sm font-medium text-gray-700 mb-2">
City Boundary{" "}
<span className="text-gray-400 font-normal">(optional for accurate coverage stats)</span>
</label>
<p className="text-xs text-gray-500 mb-3">
Search for the city municipality to use its administrative boundary for
coverage statistics. The bounding box is still used for OSM import, routing,
and grid generation.
</p>
<LocationSelector
regionGeometry={region.geometry}
onBoundaryChange={handleBoundaryChange}
/>
</div>
<div className="bg-gray-50 rounded-md p-3">
<p className="text-xs text-gray-500 font-medium mb-1">Source URL</p>
<p className="text-xs text-gray-700 break-all">

View file

@ -81,8 +81,8 @@ export interface IngestBorisHbJobData {
citySlug: string;
}
export interface DownloadGtfsDeJobData {
type: "download-gtfs-de";
export interface DownloadGtfsJobData {
type: "download-gtfs";
url: string;
/** Re-download even if data already exists */
force?: boolean;
@ -105,7 +105,7 @@ export type PipelineJobData =
| RefreshCityJobData
| IngestBorisNiJobData
| IngestBorisHbJobData
| DownloadGtfsDeJobData
| DownloadGtfsJobData
| ComputeTransitJobData;
// ─── Job options (BullMQ-compatible plain objects) ────────────────────────────
@ -116,7 +116,7 @@ export const JOB_OPTIONS: Record<PipelineJobData["type"], object> = {
removeOnComplete: { age: 86400 * 7 },
removeOnFail: { age: 86400 * 30 },
},
"download-gtfs-de": {
"download-gtfs": {
attempts: 2,
backoff: { type: "fixed", delay: 10000 },
removeOnComplete: { age: 86400 * 7 },

View file

@ -29,7 +29,7 @@ const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? `${VALHALLA_DATA_DIR}/gtfs`;
const TIMEZONE_SQLITE = `${VALHALLA_DATA_DIR}/timezone.sqlite`;
const ADMINS_SQLITE = `${VALHALLA_DATA_DIR}/admins.sqlite`;
/** Per-city GTFS feed dir written by download-gtfs-de. */
/** Per-city GTFS feed dir written by download-gtfs. */
function cityGtfsFeedDir(citySlug: string): string {
return `${GTFS_DATA_DIR}/${citySlug}/feed`;
}
@ -279,7 +279,7 @@ export async function handleBuildValhalla(job: Job<BuildValhallaData>): Promise<
// Stops outside road tile coverage cause valhalla_build_tiles to crash
// with "double free or corruption (fasttop)" when it tries to embed
// road↔transit connection edges for stops with no nearby road data.
// The bbox of seeding stops is written by download-gtfs-de per city.
// The bbox of seeding stops is written by download-gtfs per city.
if (INCLUDE_TRANSIT && citySlug) {
const stopsBboxPath = `${cityGtfsFeedDir(citySlug)}/.stops_bbox`;
if (existsSync(stopsBboxPath)) {

View file

@ -10,6 +10,7 @@
*
* Source: https://download.gtfs.de/germany/nv_free/latest.zip
*/
import { createHash } from "crypto";
import type { Job } from "bullmq";
import {
createReadStream,
@ -31,8 +32,8 @@ import * as path from "path";
import unzipper from "unzipper";
import type { JobProgress } from "@transportationer/shared";
export type DownloadGtfsDeData = {
type: "download-gtfs-de";
export type DownloadGtfsData = {
type: "download-gtfs";
url: string;
citySlug: string;
/** City bbox [minLng, minLat, maxLng, maxLat] already including buffer. */
@ -43,11 +44,37 @@ export type DownloadGtfsDeData = {
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";
/** Global raw (unfiltered) feed — downloaded once, shared across all cities. */
const GTFS_RAW_DIR = `${GTFS_DATA_DIR}/raw`;
const GTFS_ZIP_PATH = `${GTFS_DATA_DIR}/feed.zip`;
/** Records which source URL populated GTFS_RAW_DIR. */
const RAW_MARKER = `${GTFS_RAW_DIR}/.source`;
/**
* Cache key derived from feed URL + current date (YYYY-MM-DD).
* Including the date ensures feeds are re-downloaded daily.
*/
function urlCacheKey(url: string): string {
const today = new Date().toISOString().slice(0, 10);
return createHash("sha256").update(`${url}:${today}`).digest("hex").slice(0, 8);
}
/** Remove prior-day raw-cache dirs for the same source URL. */
function pruneStaleRawDirs(url: string, currentKey: string): void {
const rawBase = `${GTFS_DATA_DIR}/raw`;
if (!existsSync(rawBase)) return;
for (const entry of readdirSync(rawBase)) {
if (entry === currentKey) continue;
const markerFile = path.join(rawBase, entry, ".source");
if (!existsSync(markerFile)) continue;
try {
const marker = JSON.parse(readFileSync(markerFile, "utf8")) as RawMarker;
if (marker.source === url) {
rmSync(path.join(rawBase, entry), { recursive: true, force: true });
console.log(`[download-gtfs] Pruned stale raw cache: ${entry}`);
}
} catch { /* ignore */ }
}
}
/** Per-URL raw (unfiltered) feed directory — one per distinct feed source. */
function rawDir(url: string): string { return `${GTFS_DATA_DIR}/raw/${urlCacheKey(url)}`; }
function zipPath(url: string): string { return `${GTFS_DATA_DIR}/${urlCacheKey(url)}.zip`; }
function rawMarkerPath(url: string): string { return `${rawDir(url)}/.source`; }
/**
* Bump this when the filtering algorithm changes in a way that produces
@ -59,9 +86,10 @@ const FILTER_VERSION = 2;
interface RawMarker { source: string }
interface CityMarker { source: string; bbox: [number, number, number, number]; filterVersion: number }
function readRawMarker(): RawMarker | null {
if (!existsSync(RAW_MARKER)) return null;
try { return JSON.parse(readFileSync(RAW_MARKER, "utf8")) as RawMarker; } catch { return null; }
function readRawMarker(url: string): RawMarker | null {
const p = rawMarkerPath(url);
if (!existsSync(p)) return null;
try { return JSON.parse(readFileSync(p, "utf8")) as RawMarker; } catch { return null; }
}
function cityFeedDir(citySlug: string): string { return `${GTFS_DATA_DIR}/${citySlug}/feed`; }
@ -170,11 +198,11 @@ async function filterGtfsForCity(
destDir: string,
bbox: [number, number, number, number],
): Promise<void> {
console.log(`[download-gtfs-de] Filtering GTFS to bbox [${bbox.map((v) => v.toFixed(3)).join(",")}] → ${destDir}`);
console.log(`[download-gtfs] Filtering GTFS to bbox [${bbox.map((v) => v.toFixed(3)).join(",")}] → ${destDir}`);
const stopsPath = path.join(rawDir, "stops.txt");
if (!existsSync(stopsPath)) {
console.log("[download-gtfs-de] No stops.txt in raw dir — skipping filter");
console.log("[download-gtfs] No stops.txt in raw dir — skipping filter");
return;
}
@ -202,17 +230,17 @@ async function filterGtfsForCity(
}
}
}
console.log(`[download-gtfs-de] ${bboxStopIds.size} stops in bbox`);
console.log(`[download-gtfs] ${bboxStopIds.size} stops in bbox`);
if (bboxStopIds.size === 0) {
console.warn("[download-gtfs-de] No stops found in bbox — GTFS filter skipped");
console.warn("[download-gtfs] No stops found in bbox — GTFS filter skipped");
return;
}
// ── Step 2a: collect trip_ids with ≥2 bbox stops ───────────────────────────
const stopTimesRaw = path.join(rawDir, "stop_times.txt");
if (!existsSync(stopTimesRaw)) {
console.log("[download-gtfs-de] No stop_times.txt — skipping");
console.log("[download-gtfs] No stop_times.txt — skipping");
return;
}
@ -242,7 +270,7 @@ async function filterGtfsForCity(
for (const tripId of validTripIds) {
if ((tripBboxStopCount.get(tripId) ?? 0) < 2) validTripIds.delete(tripId);
}
console.log(`[download-gtfs-de] ${validTripIds.size} trips with ≥2 bbox stops`);
console.log(`[download-gtfs] ${validTripIds.size} trips with ≥2 bbox stops`);
// ── Step 2b: write filtered stop_times (bbox stops on valid trips only) ─────
const allTripStopIds = new Set<string>();
@ -307,7 +335,7 @@ async function filterGtfsForCity(
if (isFinite(seedMinLat)) {
const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat];
writeFileSync(path.join(destDir, ".stops_bbox"), JSON.stringify(stopsBbox));
console.log(`[download-gtfs-de] Transit stops bbox: [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`);
console.log(`[download-gtfs] Transit stops bbox: [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`);
}
// ── Step 4: filter trips.txt → collect route/service/shape IDs ─────────────
@ -373,14 +401,14 @@ async function filterGtfsForCity(
}
console.log(
`[download-gtfs-de] Filter complete: ${allTripStopIds.size} stops, ` +
`[download-gtfs] Filter complete: ${allTripStopIds.size} stops, ` +
`${validTripIds.size} trips, ${validRouteIds.size} routes`,
);
}
// ─── Job handler ──────────────────────────────────────────────────────────────
export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promise<void> {
export async function handleDownloadGtfs(job: Job<DownloadGtfsData>): Promise<void> {
const { url, citySlug, bbox, force = false } = job.data;
const effectiveSource = url;
@ -391,13 +419,15 @@ export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promis
const cityDataExists = existsSync(destDir) && readdirSync(destDir).some((f) => f.endsWith(".txt"));
if (!force && cityDataExists && cityMarker?.source === effectiveSource &&
cityMarker?.filterVersion === FILTER_VERSION && bboxEqual(cityMarker.bbox, bbox)) {
console.log(`[download-gtfs-de] Per-city feed for ${citySlug} is up to date, skipping`);
console.log(`[download-gtfs] Per-city feed for ${citySlug} is up to date, skipping`);
await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: "Feed up to date." } satisfies JobProgress);
return;
}
// ── Ensure raw feed is present ─────────────────────────────────────────────
const rawMarker = readRawMarker();
const GTFS_RAW_DIR = rawDir(url);
const GTFS_ZIP_PATH = zipPath(url);
const rawMarker = readRawMarker(url);
const rawExists = existsSync(GTFS_RAW_DIR) && readdirSync(GTFS_RAW_DIR).some((f) => f.endsWith(".txt"));
if (force || !rawExists || rawMarker?.source !== effectiveSource) {
@ -431,7 +461,7 @@ export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promis
});
await pipeline(nodeReadable, createWriteStream(GTFS_ZIP_PATH));
console.log(`[download-gtfs-de] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`);
console.log(`[download-gtfs] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`);
await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Extracting GTFS feed…" } satisfies JobProgress);
@ -450,10 +480,11 @@ export async function handleDownloadGtfsDe(job: Job<DownloadGtfsDeData>): Promis
rmSync(GTFS_ZIP_PATH, { force: true });
const extractedFiles = readdirSync(GTFS_RAW_DIR);
console.log(`[download-gtfs-de] Extracted ${extractedFiles.length} files to ${GTFS_RAW_DIR}`);
writeFileSync(RAW_MARKER, JSON.stringify({ source: effectiveSource }));
console.log(`[download-gtfs] Extracted ${extractedFiles.length} files to ${GTFS_RAW_DIR}`);
writeFileSync(rawMarkerPath(url), JSON.stringify({ source: effectiveSource }));
pruneStaleRawDirs(url, urlCacheKey(url));
} else {
console.log(`[download-gtfs-de] Raw feed already present (source=${effectiveSource})`);
console.log(`[download-gtfs] Raw feed already present (source=${effectiveSource})`);
await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Using cached raw feed." } satisfies JobProgress);
}

View file

@ -17,6 +17,19 @@ export type RefreshCityData = {
};
// ─── GTFS feed selection ──────────────────────────────────────────────────────
const GTFS_FEEDS: Record<string, string> = {
DE: "https://download.gtfs.de/germany/nv_free/latest.zip",
NL: "http://gtfs.openov.nl/gtfs-rt/gtfs-openov-nl.zip",
DK: "https://www.rejseplanen.info/labs/GTFS.zip",
};
/** Pick the national GTFS feed by country code. Returns undefined if no feed is known. */
function gtfsFeedUrl(countryCode: string): string | undefined {
return GTFS_FEEDS[countryCode.toUpperCase()];
}
/** True when the given bbox intersects Niedersachsen. */
function isInNiedersachsen(minLng: number, minLat: number, maxLng: number, maxLat: number): boolean {
return minLng < 11.779 && maxLng > 6.526 && minLat < 54.033 && maxLat > 51.197;
@ -46,13 +59,14 @@ export async function handleRefreshCity(
// Also read ALL city bboxes for the GTFS filter: each city gets its own bbox
// (with a small buffer) so valhalla_ingest_transit only processes relevant stops.
const bboxRows = await Promise.resolve(sql<{
minlng: number; minlat: number; maxlng: number; maxlat: number;
minlng: number; minlat: number; maxlng: number; maxlat: number; country_code: string;
}[]>`
SELECT
ST_XMin(bbox)::float AS minlng,
ST_YMin(bbox)::float AS minlat,
ST_XMax(bbox)::float AS maxlng,
ST_YMax(bbox)::float AS maxlat
ST_YMax(bbox)::float AS maxlat,
country_code
FROM cities WHERE slug = ${citySlug} AND bbox IS NOT NULL
`);
@ -60,6 +74,7 @@ export async function handleRefreshCity(
bboxRows.length > 0
? [bboxRows[0].minlng, bboxRows[0].minlat, bboxRows[0].maxlng, bboxRows[0].maxlat]
: undefined;
const countryCode: string = bboxRows[0]?.country_code ?? "";
// ~10 km buffer for GTFS stop coverage near city edges (0.09° ≈ 10 km)
const GTFS_BUFFER = 0.09;
@ -102,7 +117,7 @@ export async function handleRefreshCity(
// │
// download-pbf ──┐ │
// └─→ build-valhalla (transit, "valhalla-transit")┘
// download-gtfs-de ──┘
// download-gtfs ──┘
//
// Road tiles are built without GTFS (clean, no transit connections → cycling works).
// Transit tiles are built with GTFS (multimodal routing on the separate instance).
@ -170,17 +185,17 @@ export async function handleRefreshCity(
children: [
downloadNode(),
// Download and filter GTFS feed for this city before building transit tiles.
// Raw feed is cached globally; only the per-city filter re-runs on bbox change.
...(gtfsBbox ? [{
name: "download-gtfs-de",
// Feed URL is selected by country code; raw feed is cached per URL.
...(gtfsBbox && gtfsFeedUrl(countryCode) ? [{
name: "download-gtfs",
queueName: "valhalla-transit",
data: {
type: "download-gtfs-de" as const,
url: "https://download.gtfs.de/germany/nv_free/latest.zip",
type: "download-gtfs" as const,
url: gtfsFeedUrl(countryCode)!,
citySlug,
bbox: gtfsBbox,
},
opts: { ...JOB_OPTIONS["download-gtfs-de"], jobId: `download-gtfs-de.${citySlug}.${iter}` },
opts: { ...JOB_OPTIONS["download-gtfs"], jobId: `download-gtfs.${citySlug}.${iter}` },
}] : []),
],
},
@ -208,6 +223,6 @@ export async function handleRefreshCity(
await job.updateProgress({
stage: "Orchestrating pipeline",
pct: 100,
message: `All pipeline jobs enqueued${niApplicable ? " (includes BORIS NI, parallel with routing)" : ""}. Processing will begin shortly.`,
message: `All pipeline jobs enqueued${niApplicable ? " (includes BORIS NI, parallel with routing)" : ""}${countryCode ? ` [GTFS: ${gtfsFeedUrl(countryCode) ?? "none"}]` : ""}. Processing will begin shortly.`,
} satisfies JobProgress);
}

View file

@ -1,7 +1,7 @@
import { Worker, type Job } from "bullmq";
import { createBullMQConnection } from "./redis.js";
import { handleBuildValhalla } from "./jobs/build-valhalla.js";
import { handleDownloadGtfsDe } from "./jobs/download-gtfs-de.js";
import { handleDownloadGtfs } from "./jobs/download-gtfs.js";
import { actorIsochrone, actorMatrix } from "./valhalla.js";
const VALHALLA_QUEUE_NAME = process.env.VALHALLA_QUEUE_NAME ?? "valhalla";
@ -20,8 +20,8 @@ const tileWorker = new Worker(
VALHALLA_QUEUE_NAME,
async (job: Job) => {
console.log(`[valhalla-worker] Tile job ${job.id} type=${job.data.type} city=${job.data.citySlug ?? "(rebuild)"}`);
if (job.data.type === "download-gtfs-de") {
await handleDownloadGtfsDe(job as any);
if (job.data.type === "download-gtfs") {
await handleDownloadGtfs(job as any);
return;
}
await handleBuildValhalla(job as any);