diff --git a/worker/src/jobs/build-valhalla.ts b/worker/src/jobs/build-valhalla.ts index 461339d..ba922c2 100644 --- a/worker/src/jobs/build-valhalla.ts +++ b/worker/src/jobs/build-valhalla.ts @@ -304,6 +304,67 @@ function isTransitIngestFresh(citySlug: string): boolean { return statSync(transitCacheMarker).mtimeMs >= statSync(gtfsCityMarker).mtimeMs; } +/** + * Run transit ingest + convert for a city. + * Applies quarantine first so bad stops are excluded before the graph is built. + */ +async function runTransitIngestConvert(citySlug: string): Promise { + const feedDir = cityGtfsFeedDir(citySlug); + const gtfsReady = existsSync(feedDir) && readdirSync(feedDir).some((f) => f.endsWith(".txt")); + if (!gtfsReady) { + console.log(`[build-valhalla] No GTFS feed found for ${citySlug} — skipping transit ingest/convert`); + return; + } + + if (!existsSync(TIMEZONE_SQLITE)) { + console.log("[build-valhalla] Building timezone database…"); + try { await buildTimezoneDb(); } catch (err) { console.warn("[build-valhalla] valhalla_build_timezones failed:", err); } + } + + // Remove quarantined stops from the feed before ingest. + applyQuarantine(citySlug); + + const transitCacheDir = cityTransitCacheDir(citySlug); + const transitCacheMarker = `${transitCacheDir}/.ready`; + + if (!isTransitIngestFresh(citySlug) && existsSync(TIMEZONE_SQLITE)) { + console.log(`[build-valhalla] Ingesting GTFS transit feeds for ${citySlug}…`); + try { + rmSync(transitCacheDir, { recursive: true, force: true }); + mkdirSync(transitCacheDir, { recursive: true }); + const cfg = writeCityTransitIngestConfig(citySlug); + await runProcess("valhalla_ingest_transit", ["-c", cfg]); + writeFileSync(transitCacheMarker, new Date().toISOString()); + console.log(`[build-valhalla] valhalla_ingest_transit completed for ${citySlug}`); + } catch (err) { + console.warn(`[build-valhalla] valhalla_ingest_transit failed for ${citySlug}:`, err); + rmSync(transitCacheDir, { recursive: true, force: true }); + mkdirSync(transitCacheDir, { recursive: true }); + } + } else if (isTransitIngestFresh(citySlug)) { + console.log(`[build-valhalla] Transit ingest cache fresh for ${citySlug} — skipping re-ingest`); + } + + console.log(`[build-valhalla] Converting transit staging tiles for ${citySlug}…`); + try { + const cfg = writeCityTransitIngestConfig(citySlug); + await runProcess("valhalla_convert_transit", ["-c", cfg]); + console.log(`[build-valhalla] valhalla_convert_transit completed for ${citySlug}`); + } catch (err) { + console.warn(`[build-valhalla] valhalla_convert_transit failed for ${citySlug}:`, err); + } +} + +/** Extract "Could not find connection point" coordinates from captured output lines. */ +function parseBadTransitCoords(lines: string[]): [number, number][] { + return lines + .filter((l) => l.includes("Could not find connection point for in/egress near:")) + .flatMap((l) => { + const m = l.match(/near:\s*([\d.]+),([\d.]+)/); + return m ? [[parseFloat(m[1]), parseFloat(m[2])] as [number, number]] : []; + }); +} + /** Build per-city tiles. Invalidates Actor pool entry before and writes .ready after. */ async function buildCityTiles(citySlug: string, pbfPath: string): Promise { const cityTileDir = `${VALHALLA_TILES_BASE}/${citySlug}`; @@ -313,53 +374,38 @@ async function buildCityTiles(citySlug: string, pbfPath: string): Promise if (existsSync(readyMarker)) unlinkSync(readyMarker); invalidateActor(INCLUDE_TRANSIT ? "transit" : "road", citySlug); - // ── Transit ingest + convert (transit container only) ───────────────────── - if (INCLUDE_TRANSIT) { - const feedDir = cityGtfsFeedDir(citySlug); - const gtfsReady = existsSync(feedDir) && readdirSync(feedDir).some((f) => f.endsWith(".txt")); - - if (gtfsReady) { - if (!existsSync(TIMEZONE_SQLITE)) { - console.log("[build-valhalla] Building timezone database…"); - try { await buildTimezoneDb(); } catch (err) { console.warn("[build-valhalla] valhalla_build_timezones failed:", err); } - } - - const transitCacheDir = cityTransitCacheDir(citySlug); - const transitCacheMarker = `${transitCacheDir}/.ready`; - - if (!isTransitIngestFresh(citySlug) && existsSync(TIMEZONE_SQLITE)) { - console.log(`[build-valhalla] Ingesting GTFS transit feeds for ${citySlug}…`); - try { - rmSync(transitCacheDir, { recursive: true, force: true }); - mkdirSync(transitCacheDir, { recursive: true }); - const cfg = writeCityTransitIngestConfig(citySlug); - await runProcess("valhalla_ingest_transit", ["-c", cfg]); - writeFileSync(transitCacheMarker, new Date().toISOString()); - console.log(`[build-valhalla] valhalla_ingest_transit completed for ${citySlug}`); - } catch (err) { - console.warn(`[build-valhalla] valhalla_ingest_transit failed for ${citySlug}:`, err); - rmSync(transitCacheDir, { recursive: true, force: true }); - mkdirSync(transitCacheDir, { recursive: true }); - } - } else if (isTransitIngestFresh(citySlug)) { - console.log(`[build-valhalla] Transit ingest cache fresh for ${citySlug} — skipping re-ingest`); - } - - console.log(`[build-valhalla] Converting transit staging tiles for ${citySlug}…`); - try { - const cfg = writeCityTransitIngestConfig(citySlug); - await runProcess("valhalla_convert_transit", ["-c", cfg]); - console.log(`[build-valhalla] valhalla_convert_transit completed for ${citySlug}`); - } catch (err) { - console.warn(`[build-valhalla] valhalla_convert_transit failed for ${citySlug}:`, err); - } - } else { - console.log(`[build-valhalla] No GTFS feed found for ${citySlug} — skipping transit ingest/convert`); - } - } + if (INCLUDE_TRANSIT) await runTransitIngestConvert(citySlug); const configPath = writeCityConfig(citySlug, cityTileDir); - await runProcess("valhalla_build_tiles", ["-c", configPath, pbfPath]); + + // valhalla_build_tiles: capture output so we can quarantine bad stops on crash. + // Transit crashes (SIGSEGV/SIGABRT from unconnectable stops) are retried once + // after quarantining the offending stops and re-running ingest+convert. + const MAX_ATTEMPTS = INCLUDE_TRANSIT ? 5 : 1; + for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + try { + await runProcessCapture("valhalla_build_tiles", ["-c", configPath, pbfPath]); + break; // success + } catch (err) { + const isLast = attempt >= MAX_ATTEMPTS - 1; + if (!INCLUDE_TRANSIT || isLast || !(err instanceof ProcessError) || !err.signal) throw err; + + const badCoords = parseBadTransitCoords(err.lines); + if (badCoords.length === 0) throw err; // unrelated crash — don't retry + + console.warn( + `[build-valhalla] ${err.signal} from ${badCoords.length} unconnectable transit stop(s) ` + + `at ${badCoords.map((c) => c.join(",")).join("; ")} — quarantining and retrying (attempt ${attempt + 1}/${MAX_ATTEMPTS})`, + ); + appendQuarantine(citySlug, badCoords); + + // Re-run ingest+convert with quarantine applied so the bad stops are excluded. + const transitCacheDir = cityTransitCacheDir(citySlug); + rmSync(transitCacheDir, { recursive: true, force: true }); + mkdirSync(transitCacheDir, { recursive: true }); + await runTransitIngestConvert(citySlug); + } + } writeFileSync(readyMarker, new Date().toISOString()); console.log(`[build-valhalla] Tiles ready for ${citySlug} → ${cityTileDir}`);