// fifteen/worker/src/jobs/download-gtfs.ts

/**
* Download and filter a GTFS feed per city so Valhalla can build transit tiles.
*
* The raw (unfiltered) feed is downloaded once and cached in GTFS_DATA_DIR/raw/.
* Subsequent calls for other cities re-use the raw cache without re-downloading.
*
* A per-city filtered feed is written to GTFS_DATA_DIR/{citySlug}/feed/ and
* clipped to the city's bounding box. This directory is the transit_feeds_dir
* for that city's valhalla_ingest_transit + valhalla_build_tiles run.
*
* Source: https://download.gtfs.de/germany/nv_free/latest.zip
*/
import type { Job } from "bullmq";
import { createHash } from "crypto";
import { once } from "events";
import {
  copyFileSync,
  createReadStream,
  createWriteStream,
  existsSync,
  mkdirSync,
  readdirSync,
  readFileSync,
  renameSync,
  rmSync,
  writeFileSync,
} from "fs";
import { mkdir } from "fs/promises";
import * as path from "path";
import { createInterface } from "readline";
import { Readable } from "stream";
import { pipeline } from "stream/promises";
import unzipper from "unzipper";
import type { JobProgress } from "@transportationer/shared";
/** Payload for the download-gtfs job (see handleDownloadGtfs). */
export type DownloadGtfsData = {
  /** Job discriminator. */
  type: "download-gtfs";
  /** Source URL of the GTFS zip to download (e.g. the nationwide feed). */
  url: string;
  /** City identifier; filtered feed is written to GTFS_DATA_DIR/{citySlug}/feed. */
  citySlug: string;
  /** City bbox [minLng, minLat, maxLng, maxLat] already including buffer. */
  bbox: [number, number, number, number];
  /** Re-download even if raw data already exists (default: false) */
  force?: boolean;
};
/** Root directory for all GTFS artifacts (raw caches + per-city feeds). */
const GTFS_DATA_DIR = process.env.GTFS_DATA_DIR ?? "/data/valhalla/gtfs";

/**
 * Short (8 hex chars) cache key for a feed URL, salted with today's date
 * (YYYY-MM-DD). The date salt makes every raw cache expire daily: the next
 * day's key differs, so the feed is re-downloaded and the old dir pruned.
 */
function urlCacheKey(url: string): string {
  const datePart = new Date().toISOString().slice(0, 10);
  const digest = createHash("sha256").update(`${url}:${datePart}`).digest("hex");
  return digest.slice(0, 8);
}
/**
 * Remove prior-day raw-cache dirs for the same source URL.
 * Every raw dir carries a `.source` marker naming its feed URL; any sibling
 * of `currentKey` whose marker matches `url` is stale and deleted.
 * Unreadable/invalid markers (and rm failures) are ignored — best effort.
 */
function pruneStaleRawDirs(url: string, currentKey: string): void {
  const rawBase = `${GTFS_DATA_DIR}/raw`;
  if (!existsSync(rawBase)) return;
  const candidates = readdirSync(rawBase).filter((entry) => entry !== currentKey);
  for (const entry of candidates) {
    const markerFile = path.join(rawBase, entry, ".source");
    if (!existsSync(markerFile)) continue;
    try {
      const marker = JSON.parse(readFileSync(markerFile, "utf8")) as RawMarker;
      if (marker.source !== url) continue;
      rmSync(path.join(rawBase, entry), { recursive: true, force: true });
      console.log(`[download-gtfs] Pruned stale raw cache: ${entry}`);
    } catch {
      /* best-effort cleanup — ignore parse/rm errors */
    }
  }
}
/** Per-URL raw (unfiltered) feed directory — one per distinct feed source. */
function rawDir(url: string): string {
  return `${GTFS_DATA_DIR}/raw/${urlCacheKey(url)}`;
}

/** Temporary on-disk location for the downloaded (still zipped) feed. */
function zipPath(url: string): string {
  return `${GTFS_DATA_DIR}/${urlCacheKey(url)}.zip`;
}

/** Marker file recording which source URL a raw dir was extracted from. */
function rawMarkerPath(url: string): string {
  return `${rawDir(url)}/.source`;
}
/**
* Bump this when the filtering algorithm changes in a way that produces
* different output from the same source + bbox. Forces a re-filter on the
* existing raw data without re-downloading.
*/
const FILTER_VERSION = 3;
interface RawMarker { source: string }
interface CityMarker { source: string; bbox: [number, number, number, number]; filterVersion: number }
function readRawMarker(url: string): RawMarker | null {
const p = rawMarkerPath(url);
if (!existsSync(p)) return null;
try { return JSON.parse(readFileSync(p, "utf8")) as RawMarker; } catch { return null; }
}
/** Filtered, bbox-clipped feed dir for one city (Valhalla's transit_feeds_dir). */
function cityFeedDir(citySlug: string): string {
  return `${GTFS_DATA_DIR}/${citySlug}/feed`;
}

/** Marker file recording source URL + bbox + filter version of a city feed. */
function cityMarkerPath(citySlug: string): string {
  return `${cityFeedDir(citySlug)}/.source`;
}

/** Parse the city-feed marker; null when absent or unreadable. */
function readCityMarker(citySlug: string): CityMarker | null {
  const markerPath = cityMarkerPath(citySlug);
  if (!existsSync(markerPath)) return null;
  try {
    return JSON.parse(readFileSync(markerPath, "utf8")) as CityMarker;
  } catch {
    return null;
  }
}

/** Record what the city feed was built from, so future runs can skip work. */
function writeCityMarker(citySlug: string, source: string, bbox: [number, number, number, number]): void {
  const marker: CityMarker = { source, bbox, filterVersion: FILTER_VERSION };
  writeFileSync(cityMarkerPath(citySlug), JSON.stringify(marker));
}
/** Exact component-wise equality of two [minLng, minLat, maxLng, maxLat] boxes. */
function bboxEqual(a: [number, number, number, number], b: [number, number, number, number]): boolean {
  for (let i = 0; i < 4; i++) {
    if (a[i] !== b[i]) return false;
  }
  return true;
}
// ─── GTFS bbox filter ─────────────────────────────────────────────────────────
/**
 * Split one CSV line into fields, honoring RFC-4180-style double quotes:
 * commas inside quotes are literal, and `""` inside a quoted field is an
 * escaped quote. Lines without any quote char take a plain-split fast path.
 */
function splitCsv(line: string): string[] {
  if (!line.includes('"')) return line.split(",");
  const fields: string[] = [];
  let buf = "";
  let quoted = false;
  let i = 0;
  while (i < line.length) {
    const ch = line[i];
    if (ch === '"') {
      if (quoted && line[i + 1] === '"') {
        buf += '"'; // escaped quote inside a quoted field
        i += 2;
        continue;
      }
      quoted = !quoted;
    } else if (ch === "," && !quoted) {
      fields.push(buf);
      buf = "";
    } else {
      buf += ch;
    }
    i++;
  }
  fields.push(buf);
  return fields;
}

/** Map header-cell name → column index, trimming and stripping a leading BOM. */
function colIndex(header: string): Map<string, number> {
  const cells = splitCsv(header);
  return new Map(cells.map((cell, i) => [cell.trim().replace(/^\uFEFF/, ""), i]));
}
/**
 * True when (lat, lon) lies inside the inclusive [minLng, minLat, maxLng, maxLat]
 * box. NaN coordinates always fail every comparison and so return false.
 */
function inBbox(lat: number, lon: number, bbox: [number, number, number, number]): boolean {
  const [minLng, minLat, maxLng, maxLat] = bbox;
  const latOk = lat >= minLat && lat <= maxLat;
  const lonOk = lon >= minLng && lon <= maxLng;
  return latOk && lonOk;
}
/**
 * Filter a small CSV file in place: the header plus every row for which
 * `keepRow` returns true survive. `onKept` is invoked for each retained row
 * (e.g. to collect IDs). The whole file is loaded into memory — use
 * filterLargeCsv for big files. Missing and header-only files are untouched.
 */
function filterSmallCsv(
  filePath: string,
  keepRow: (idx: Map<string, number>, fields: string[]) => boolean,
  onKept?: (idx: Map<string, number>, fields: string[]) => void,
): void {
  if (!existsSync(filePath)) return;
  const rows = readFileSync(filePath, "utf8").split(/\r?\n/).filter((l) => l.trim());
  if (rows.length < 2) return;
  const idx = colIndex(rows[0]);
  const kept = rows.filter((row, i) => {
    if (i === 0) return true; // header always survives
    const fields = splitCsv(row);
    if (!keepRow(idx, fields)) return false;
    onKept?.(idx, fields);
    return true;
  });
  writeFileSync(filePath, kept.join("\n") + "\n");
}
/**
 * Stream-filter a large CSV from srcPath to destPath: the header plus every
 * row for which `keepRow` returns true survive. `getTargetCol` receives the
 * parsed header index and returns the column number that `keepRow` should
 * inspect. Writes to a `.tmp` sibling and renames atomically on success.
 * Missing source files are silently skipped.
 */
async function filterLargeCsv(
  srcPath: string,
  destPath: string,
  keepRow: (targetCol: number, line: string) => boolean,
  getTargetCol: (idx: Map<string, number>) => number,
): Promise<void> {
  if (!existsSync(srcPath)) return;
  const tmpPath = destPath + ".tmp";
  const writer = createWriteStream(tmpPath);
  // Honor backpressure: without waiting for "drain" when write() returns
  // false, a multi-hundred-MB file could be buffered almost entirely in RAM.
  const write = async (chunk: string): Promise<void> => {
    if (!writer.write(chunk)) await once(writer, "drain");
  };
  let isFirst = true;
  let targetCol = -1;
  const rl = createInterface({ input: createReadStream(srcPath), crlfDelay: Infinity });
  for await (const line of rl) {
    if (!line.trim()) continue;
    if (isFirst) {
      isFirst = false;
      targetCol = getTargetCol(colIndex(line));
      await write(line + "\n");
      continue;
    }
    if (keepRow(targetCol, line)) await write(line + "\n");
  }
  // Flush and close before the atomic rename.
  await new Promise<void>((resolve, reject) =>
    writer.end((err?: unknown) => (err ? reject(err) : resolve())),
  );
  renameSync(tmpPath, destPath);
}
/**
 * Filter the raw GTFS feed down to a single city bbox.
 *
 * Reads from rawDir, writes a self-consistent feed to destDir:
 *   1. stops inside the bbox are identified;
 *   2. trips serving ≥2 bbox stops are kept (a useful trip needs at least a
 *      boarding and an alighting stop inside the city);
 *   3. stop_times / stops / trips / routes / agency / calendar(_dates) /
 *      shapes are filtered to the surviving trip set; feed_info is copied.
 *
 * Also writes .stops_bbox (tight bbox of retained stops) for build-valhalla.
 * Bails out early (leaving destDir possibly incomplete) when the raw feed
 * lacks stops.txt / stop_times.txt or when no stop falls inside the bbox.
 */
async function filterGtfsForCity(
  rawDir: string,
  destDir: string,
  bbox: [number, number, number, number],
): Promise<void> {
  console.log(`[download-gtfs] Filtering GTFS to bbox [${bbox.map((v) => v.toFixed(3)).join(",")}] → ${destDir}`);
  const stopsPath = path.join(rawDir, "stops.txt");
  if (!existsSync(stopsPath)) {
    console.log("[download-gtfs] No stops.txt in raw dir — skipping filter");
    return;
  }

  // ── Step 1: collect bbox stop IDs + tight bbox of those stops ─────────────
  const bboxStopIds = new Set<string>();
  let seedMinLng = Infinity, seedMinLat = Infinity, seedMaxLng = -Infinity, seedMaxLat = -Infinity;
  {
    const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
    if (lines.length >= 2) {
      const idx = colIndex(lines[0]);
      const stopIdCol = idx.get("stop_id") ?? -1;
      const latCol = idx.get("stop_lat") ?? -1;
      const lonCol = idx.get("stop_lon") ?? -1;
      for (let i = 1; i < lines.length; i++) {
        const fields = splitCsv(lines[i]);
        const lat = parseFloat(fields[latCol] ?? "NaN");
        const lon = parseFloat(fields[lonCol] ?? "NaN");
        if (inBbox(lat, lon, bbox)) {
          bboxStopIds.add(fields[stopIdCol] ?? "");
          // inBbox already rejects NaN, so lat/lon are finite here; the guard
          // stays as cheap insurance against future inBbox changes.
          if (isFinite(lat) && isFinite(lon)) {
            seedMinLat = Math.min(seedMinLat, lat); seedMaxLat = Math.max(seedMaxLat, lat);
            seedMinLng = Math.min(seedMinLng, lon); seedMaxLng = Math.max(seedMaxLng, lon);
          }
        }
      }
    }
  }
  console.log(`[download-gtfs] ${bboxStopIds.size} stops in bbox`);
  if (bboxStopIds.size === 0) {
    console.warn("[download-gtfs] No stops found in bbox — GTFS filter skipped");
    return;
  }

  // ── Step 2a: collect trip_ids with ≥2 bbox stops ──────────────────────────
  const stopTimesRaw = path.join(rawDir, "stop_times.txt");
  if (!existsSync(stopTimesRaw)) {
    console.log("[download-gtfs] No stop_times.txt — skipping");
    return;
  }
  const tripBboxStopCount = new Map<string, number>();
  {
    let stopIdCol = -1, tripIdCol = -1, isFirst = true;
    const rl = createInterface({ input: createReadStream(stopTimesRaw), crlfDelay: Infinity });
    for await (const line of rl) {
      if (!line.trim()) continue;
      if (isFirst) {
        isFirst = false;
        const idx = colIndex(line);
        stopIdCol = idx.get("stop_id") ?? -1;
        tripIdCol = idx.get("trip_id") ?? -1;
        continue;
      }
      // Plain split(",") — assumes stop_times.txt has no quoted fields
      // (splitCsv would be far slower on this, the largest GTFS file).
      const fields = line.split(",");
      const stopId = fields[stopIdCol] ?? "";
      if (bboxStopIds.has(stopId)) {
        const tripId = fields[tripIdCol] ?? "";
        tripBboxStopCount.set(tripId, (tripBboxStopCount.get(tripId) ?? 0) + 1);
      }
    }
  }
  const validTripIds = new Set<string>();
  for (const [tripId, count] of tripBboxStopCount) {
    if (count >= 2) validTripIds.add(tripId);
  }
  console.log(`[download-gtfs] ${validTripIds.size} trips with ≥2 bbox stops`);

  // ── Step 2b: write filtered stop_times in ONE streaming pass ──────────────
  // Keep rows whose trip survived AND whose stop is inside the bbox, and
  // collect the stop IDs that actually remain (drives the stops.txt filter).
  const allTripStopIds = new Set<string>();
  {
    const destPath = path.join(destDir, "stop_times.txt");
    const tmpPath = destPath + ".tmp";
    const writer = createWriteStream(tmpPath);
    // Honor backpressure so the huge stop_times file isn't buffered in RAM.
    const write = async (chunk: string): Promise<void> => {
      if (!writer.write(chunk)) await once(writer, "drain");
    };
    let isFirst = true;
    let tripIdCol = -1, stopIdCol = -1;
    const rl = createInterface({ input: createReadStream(stopTimesRaw), crlfDelay: Infinity });
    for await (const line of rl) {
      if (!line.trim()) continue;
      if (isFirst) {
        isFirst = false;
        const idx = colIndex(line);
        tripIdCol = idx.get("trip_id") ?? -1;
        stopIdCol = idx.get("stop_id") ?? -1;
        await write(line + "\n");
        continue;
      }
      const fields = line.split(",");
      const tripId = fields[tripIdCol] ?? "";
      const stopId = fields[stopIdCol] ?? "";
      if (validTripIds.has(tripId) && bboxStopIds.has(stopId)) {
        allTripStopIds.add(stopId);
        await write(line + "\n");
      }
    }
    await new Promise<void>((resolve, reject) =>
      writer.end((err?: unknown) => (err ? reject(err) : resolve())),
    );
    renameSync(tmpPath, destPath);
  }

  // ── Step 3: filter stops.txt (only stops that appear in final stop_times) ──
  {
    const dest = path.join(destDir, "stops.txt");
    const lines = readFileSync(stopsPath, "utf8").split(/\r?\n/).filter((l) => l.trim());
    if (lines.length >= 2) {
      const idx = colIndex(lines[0]);
      const stopIdCol = idx.get("stop_id") ?? -1;
      const out = [lines[0]];
      for (let i = 1; i < lines.length; i++) {
        const fields = splitCsv(lines[i]);
        if (allTripStopIds.has(fields[stopIdCol] ?? "")) out.push(lines[i]);
      }
      writeFileSync(dest, out.join("\n") + "\n");
    }
  }

  // Tight bbox of the bbox-matching stops, consumed by the build-valhalla job.
  if (isFinite(seedMinLat)) {
    const stopsBbox: [number, number, number, number] = [seedMinLng, seedMinLat, seedMaxLng, seedMaxLat];
    writeFileSync(path.join(destDir, ".stops_bbox"), JSON.stringify(stopsBbox));
    console.log(`[download-gtfs] Transit stops bbox: [${stopsBbox.map((v) => v.toFixed(3)).join(", ")}]`);
  }

  // ── Step 4: filter trips.txt → collect route/service/shape IDs ────────────
  const validRouteIds = new Set<string>();
  const validServiceIds = new Set<string>();
  const validShapeIds = new Set<string>();
  {
    const src = path.join(rawDir, "trips.txt");
    const dest = path.join(destDir, "trips.txt");
    if (existsSync(src)) {
      const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
      if (lines.length >= 2) {
        const idx = colIndex(lines[0]);
        const tripIdCol = idx.get("trip_id") ?? -1;
        const routeIdCol = idx.get("route_id") ?? -1;
        const serviceIdCol = idx.get("service_id") ?? -1;
        const shapeIdCol = idx.get("shape_id") ?? -1;
        const out = [lines[0]];
        for (let i = 1; i < lines.length; i++) {
          const fields = splitCsv(lines[i]);
          if (validTripIds.has(fields[tripIdCol] ?? "")) {
            out.push(lines[i]);
            validRouteIds.add(fields[routeIdCol] ?? "");
            validServiceIds.add(fields[serviceIdCol] ?? "");
            const shapeId = fields[shapeIdCol] ?? ""; // shape_id is optional in GTFS
            if (shapeId) validShapeIds.add(shapeId);
          }
        }
        writeFileSync(dest, out.join("\n") + "\n");
      }
    }
  }

  // ── Step 5: filter routes, calendar, calendar_dates; copy agency/feed_info ──
  // Collect agency IDs from the filtered routes so we can filter agency.txt.
  const validAgencyIds = new Set<string>();
  {
    const src = path.join(rawDir, "routes.txt");
    if (existsSync(src)) {
      const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
      if (lines.length >= 2) {
        const idx = colIndex(lines[0]);
        const routeIdCol = idx.get("route_id") ?? -1;
        const agencyIdCol = idx.get("agency_id") ?? -1;
        for (let i = 1; i < lines.length; i++) {
          const fields = splitCsv(lines[i]);
          if (validRouteIds.has(fields[routeIdCol] ?? "")) {
            const aid = fields[agencyIdCol] ?? "";
            if (aid) validAgencyIds.add(aid);
          }
        }
      }
    }
  }
  // Each of these files is small enough to load whole; keep rows whose ID
  // column matches the corresponding valid-ID set collected above.
  for (const [file, idCol, validIds] of [
    ["agency.txt", "agency_id", validAgencyIds],
    ["routes.txt", "route_id", validRouteIds],
    ["calendar.txt", "service_id", validServiceIds],
    ["calendar_dates.txt", "service_id", validServiceIds],
  ] as const) {
    const src = path.join(rawDir, file);
    const dest = path.join(destDir, file);
    if (!existsSync(src)) continue;
    const lines = readFileSync(src, "utf8").split(/\r?\n/).filter((l) => l.trim());
    if (lines.length < 2) { writeFileSync(dest, lines[0] + "\n"); continue; }
    const idx = colIndex(lines[0]);
    const col = idx.get(idCol) ?? -1;
    const out = [lines[0]];
    for (let i = 1; i < lines.length; i++) {
      const fields = splitCsv(lines[i]);
      if ((validIds as Set<string>).has(fields[col] ?? "")) out.push(lines[i]);
    }
    writeFileSync(dest, out.join("\n") + "\n");
  }

  // ── Step 5b: copy feed_info.txt verbatim (not filterable, Valhalla may need it) ──
  {
    const src = path.join(rawDir, "feed_info.txt");
    if (existsSync(src)) copyFileSync(src, path.join(destDir, "feed_info.txt"));
  }

  // ── Step 6: shapes.txt (large — stream-filter by shape_id) ────────────────
  if (validShapeIds.size > 0) {
    await filterLargeCsv(
      path.join(rawDir, "shapes.txt"),
      path.join(destDir, "shapes.txt"),
      (col, line) => validShapeIds.has(line.split(",")[col] ?? ""),
      (idx) => idx.get("shape_id") ?? -1,
    );
  }

  console.log(
    `[download-gtfs] Filter complete: ${allTripStopIds.size} stops, ` +
    `${validTripIds.size} trips, ${validRouteIds.size} routes`,
  );
}
// ─── Job handler ──────────────────────────────────────────────────────────────
/**
 * Job handler: ensure a bbox-filtered GTFS feed exists for `citySlug`.
 *
 * Fast path: when the per-city feed was already built from the same source
 * URL, same bbox, and same FILTER_VERSION, the job is a no-op.
 * Otherwise the raw feed is downloaded (or reused from the date-salted raw
 * cache), extracted, and filtered to the city's bbox.
 *
 * Progress is reported via job.updateProgress: 5–55 download, 60 extract,
 * 65 filter, 100 done.
 */
export async function handleDownloadGtfs(job: Job<DownloadGtfsData>): Promise<void> {
  const { url, citySlug, bbox, force = false } = job.data;
  const effectiveSource = url;
  const destDir = cityFeedDir(citySlug);
  const cityMarker = readCityMarker(citySlug);
  // ── Check if per-city feed is already up to date ───────────────────────────
  // "Up to date" = at least one .txt present AND marker matches source, bbox
  // and filter version. Any mismatch (or force) rebuilds the city feed.
  const cityDataExists = existsSync(destDir) && readdirSync(destDir).some((f) => f.endsWith(".txt"));
  if (!force && cityDataExists && cityMarker?.source === effectiveSource &&
      cityMarker?.filterVersion === FILTER_VERSION && bboxEqual(cityMarker.bbox, bbox)) {
    console.log(`[download-gtfs] Per-city feed for ${citySlug} is up to date, skipping`);
    await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: "Feed up to date." } satisfies JobProgress);
    return;
  }
  // ── Ensure raw feed is present ─────────────────────────────────────────────
  // rawDir/zipPath embed a date-salted cache key (urlCacheKey), so "raw
  // exists" is naturally false on the first run of each day → daily refresh.
  const GTFS_RAW_DIR = rawDir(url);
  const GTFS_ZIP_PATH = zipPath(url);
  const rawMarker = readRawMarker(url);
  const rawExists = existsSync(GTFS_RAW_DIR) && readdirSync(GTFS_RAW_DIR).some((f) => f.endsWith(".txt"));
  if (force || !rawExists || rawMarker?.source !== effectiveSource) {
    await job.updateProgress({ stage: "Downloading GTFS", pct: 5, message: `Downloading GTFS feed…` } satisfies JobProgress);
    mkdirSync(GTFS_DATA_DIR, { recursive: true });
    // 600_000 ms = 10-minute overall timeout for the whole download.
    const response = await fetch(url, { signal: AbortSignal.timeout(600_000) });
    if (!response.ok || !response.body) {
      throw new Error(`Failed to download GTFS: HTTP ${response.status} ${response.statusText}`);
    }
    // content-length may be absent (0) — progress reporting is skipped then.
    const totalBytes = Number(response.headers.get("content-length") ?? 0);
    let downloadedBytes = 0;
    let lastReportedPct = 5;
    const nodeReadable = Readable.fromWeb(response.body as Parameters<typeof Readable.fromWeb>[0]);
    // Map download fraction onto the 5–55 pct band; report at most every
    // 5 points. updateProgress here is fire-and-forget (void).
    nodeReadable.on("data", (chunk: Buffer) => {
      downloadedBytes += chunk.length;
      if (totalBytes > 0) {
        const pct = Math.min(55, 5 + Math.round((downloadedBytes / totalBytes) * 50));
        if (pct > lastReportedPct + 4) {
          lastReportedPct = pct;
          void job.updateProgress({
            stage: "Downloading GTFS", pct,
            message: `Downloading… ${(downloadedBytes / 1024 / 1024).toFixed(1)} / ${(totalBytes / 1024 / 1024).toFixed(1)} MB`,
            bytesDownloaded: downloadedBytes, totalBytes,
          } satisfies JobProgress);
        }
      }
    });
    await pipeline(nodeReadable, createWriteStream(GTFS_ZIP_PATH));
    console.log(`[download-gtfs] Downloaded ${(downloadedBytes / 1024 / 1024).toFixed(1)} MB`);
    await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Extracting GTFS feed…" } satisfies JobProgress);
    // Wipe any stale raw dir before extracting into it.
    if (existsSync(GTFS_RAW_DIR)) rmSync(GTFS_RAW_DIR, { recursive: true, force: true });
    mkdirSync(GTFS_RAW_DIR, { recursive: true });
    const zip = unzipper.Parse({ forceStream: true });
    createReadStream(GTFS_ZIP_PATH).pipe(zip);
    for await (const entry of zip) {
      const e = entry as unzipper.Entry;
      // path.basename() flattens nested zip layouts AND blocks zip-slip
      // path traversal — entries can never escape GTFS_RAW_DIR.
      const destPath = path.join(GTFS_RAW_DIR, path.basename(e.path));
      if (e.type === "Directory") { e.autodrain(); continue; }
      await mkdir(path.dirname(destPath), { recursive: true });
      await pipeline(e as unknown as NodeJS.ReadableStream, createWriteStream(destPath));
    }
    rmSync(GTFS_ZIP_PATH, { force: true }); // zip no longer needed once extracted
    const extractedFiles = readdirSync(GTFS_RAW_DIR);
    console.log(`[download-gtfs] Extracted ${extractedFiles.length} files to ${GTFS_RAW_DIR}`);
    // Record the source, then evict prior-day raw caches for this URL.
    writeFileSync(rawMarkerPath(url), JSON.stringify({ source: effectiveSource }));
    pruneStaleRawDirs(url, urlCacheKey(url));
  } else {
    console.log(`[download-gtfs] Raw feed already present (source=${effectiveSource})`);
    await job.updateProgress({ stage: "Downloading GTFS", pct: 60, message: "Using cached raw feed." } satisfies JobProgress);
  }
  // ── Filter raw feed for this city ──────────────────────────────────────────
  // Rebuild the city dir from scratch so no stale files survive a re-filter;
  // the marker is written only after filtering succeeds.
  await job.updateProgress({ stage: "Downloading GTFS", pct: 65, message: `Filtering GTFS for ${citySlug}` } satisfies JobProgress);
  if (existsSync(destDir)) rmSync(destDir, { recursive: true, force: true });
  mkdirSync(destDir, { recursive: true });
  await filterGtfsForCity(GTFS_RAW_DIR, destDir, bbox);
  writeCityMarker(citySlug, effectiveSource, bbox);
  await job.updateProgress({ stage: "Downloading GTFS", pct: 100, message: `GTFS ready for ${citySlug}.` } satisfies JobProgress);
}