import type { Job } from "bullmq";
import { spawn } from "child_process";
import { existsSync } from "fs";
import path from "path";
import type { JobProgress } from "@transportationer/shared";
import { getSql } from "../db.js";

/** Payload of an "extract-pois" job. */
export type ExtractPoisData = {
  type: "extract-pois";
  citySlug: string;
  pbfPath: string;
  bbox?: [number, number, number, number]; // [minLng, minLat, maxLng, maxLat]
};

const OSM_DATA_DIR = process.env.OSM_DATA_DIR ?? "/data/osm";
const LUA_SCRIPT = process.env.LUA_SCRIPT ?? "/app/infra/osm2pgsql.lua";
// May be undefined; handleExtractPois checks it before any work is started.
const DATABASE_URL = process.env.DATABASE_URL;

// Line terminators recognised in child output. A bare "\r" is included so
// that tools which redraw a progress line in place still yield separate lines.
const LINE_BREAK = /\r\n|\n|\r/;

// Extracts the number preceding the word "node(s)" from a line of tool output.
const NODE_COUNT_PATTERN = /(\d+)\s+nodes?/i;

/**
 * Spawns `cmd` with `args`, echoes every non-empty output line (stdout and
 * stderr) to this process's stdout prefixed with `[cmd]`, and forwards each
 * line to `onLine`.
 *
 * @param cmd    Executable to run.
 * @param args   Command-line arguments.
 * @param env    Extra environment variables, merged over `process.env`.
 * @param onLine Optional callback invoked once per complete output line.
 * @returns A promise that resolves when the process exits with code 0.
 * @throws Rejects if the process cannot be spawned, exits with a non-zero
 *         code, or is terminated by a signal.
 */
function runProcess(
  cmd: string,
  args: string[],
  env: Record<string, string> = {},
  onLine?: (line: string) => void,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const child = spawn(cmd, args, {
      stdio: ["ignore", "pipe", "pipe"],
      env: { ...process.env, ...env },
    });

    const emitLine = (line: string): void => {
      if (!line) return; // blank lines are dropped
      process.stdout.write(`[${cmd}] ${line}\n`);
      onLine?.(line);
    };

    // Each stream keeps its own buffer: a "data" chunk can end in the middle
    // of a line, so the unterminated tail is held until the rest arrives.
    const flushers: Array<() => void> = [];
    for (const stream of [child.stdout, child.stderr]) {
      let pending = "";
      // Decode in the stream so multi-byte characters split across chunks
      // are reassembled instead of being corrupted.
      stream.setEncoding("utf8");
      stream.on("data", (chunk: string) => {
        const parts = (pending + chunk).split(LINE_BREAK);
        pending = parts.pop() ?? "";
        parts.forEach(emitLine);
      });
      flushers.push(() => {
        emitLine(pending);
        pending = "";
      });
    }

    child.on("error", reject);

    // "close" (unlike "exit") fires only after the stdio streams have ended,
    // so no trailing output is lost before the promise settles.
    child.on("close", (code, signal) => {
      flushers.forEach((flush) => flush());
      if (code === 0) {
        resolve();
      } else if (code === null) {
        reject(new Error(`${cmd} was terminated by signal ${signal}`));
      } else {
        reject(new Error(`${cmd} exited with code ${code}`));
      }
    });
  });
}

/**
 * Job handler: extracts points of interest for one city from an OSM PBF file
 * and loads them into the `raw_pois` table.
 *
 * Stages: optional `osmium extract` by bounding box, `osmium tags-filter`,
 * `osm2pgsql` flex import, then a merge of the staging table into `raw_pois`.
 * Progress is reported through `job.updateProgress`.
 *
 * @param job Job whose data names the city, the source PBF and an optional bbox.
 * @throws If the PBF file does not exist, `DATABASE_URL` is not set, an
 *         external tool fails, or a SQL statement fails.
 */
export async function handleExtractPois(
  job: Job<ExtractPoisData>,
): Promise<void> {
  const { citySlug, pbfPath, bbox } = job.data;
  const filteredPbf = path.join(OSM_DATA_DIR, `${citySlug}-filtered.osm.pbf`);

  if (!existsSync(pbfPath)) {
    throw new Error(`PBF file not found: ${pbfPath}`);
  }

  // Fail before the expensive osmium stages rather than handing
  // "--database=undefined" to osm2pgsql at the end.
  if (!DATABASE_URL) {
    throw new Error("DATABASE_URL environment variable is not set");
  }
  const databaseUrl = DATABASE_URL;

  // Stage 0 (optional): osmium extract --bbox to clip to the area of interest.
  // This dramatically reduces memory and processing time for large geofabrik regions.
  let sourcePbf = pbfPath;
  if (bbox) {
    const [minLng, minLat, maxLng, maxLat] = bbox;
    const bboxPbf = path.join(OSM_DATA_DIR, `${citySlug}-bbox.osm.pbf`);

    await job.updateProgress({
      stage: "Clipping to bounding box",
      pct: 2,
      message: `Clipping region to [${minLng},${minLat},${maxLng},${maxLat}]…`,
    } satisfies JobProgress);

    await runProcess(
      "osmium",
      [
        "extract",
        `--bbox=${minLng},${minLat},${maxLng},${maxLat}`,
        pbfPath,
        "-o",
        bboxPbf,
        "--overwrite",
      ],
      {},
    );
    sourcePbf = bboxPbf;
  }

  // Stage 1: osmium tags-filter
  await job.updateProgress({
    stage: "Filtering OSM tags",
    pct: 5,
    message: "Running osmium tags-filter…",
  } satisfies JobProgress);

  await runProcess(
    "osmium",
    [
      "tags-filter",
      sourcePbf,
      // Include all relevant tag groups
      "nwr/amenity=pharmacy,bank,atm,cafe,restaurant,fast_food,post_office,marketplace",
      "nwr/amenity=bicycle_rental,car_sharing,ferry_terminal",
      "nwr/amenity=kindergarten,school,university,college",
      "nwr/amenity=library,theatre,cinema,community_centre,place_of_worship",
      "nwr/amenity=hospital,clinic,doctors,social_facility,townhall,police",
      "nwr/amenity=swimming_pool",
      "nwr/shop=supermarket,convenience,bakery,pharmacy,laundry,dry_cleaning,greengrocer,butcher",
      "nwr/highway=bus_stop",
      "nwr/railway=station,halt,tram_stop,subway_entrance",
      "nwr/public_transport=stop_position,platform",
      "nwr/office=coworking,company,government",
      "nwr/tourism=museum",
      "nwr/leisure=park,playground,sports_centre,fitness_centre,swimming_pool,garden,nature_reserve,pitch,arts_centre",
      "-o",
      filteredPbf,
      "--overwrite",
    ],
    {},
  );

  await job.updateProgress({
    stage: "Filtering OSM tags",
    pct: 30,
    message: "osmium complete, starting osm2pgsql…",
  } satisfies JobProgress);

  // Stage 2: osm2pgsql with flex output
  // The Lua script writes to a per-city staging table (raw_pois_import_{slug})
  // in create mode, so osm2pgsql can drop/recreate it freely without touching
  // other cities' rows in raw_pois. We merge afterwards.
  await runProcess(
    "osm2pgsql",
    [
      "--output=flex",
      `--style=${LUA_SCRIPT}`,
      `--database=${databaseUrl}`,
      "--slim",
      "--drop",
      filteredPbf,
    ],
    { CITY_SLUG: citySlug },
    (line) => {
      const digits = NODE_COUNT_PATTERN.exec(line)?.[1];
      if (digits === undefined) return;

      const nodeCount = Number.parseInt(digits, 10);
      // One percentage point per 10,000 nodes on top of the 30% already
      // reported, capped at 95% so the final update is the only one above it.
      const pct = Math.min(30 + Math.floor(nodeCount / 10_000), 95);

      // Progress reporting is best-effort: a failed update must not abort
      // the import, so the rejection is deliberately ignored.
      void job
        .updateProgress({
          stage: "Importing to PostGIS",
          pct,
          message: line.trim(),
        } satisfies JobProgress)
        .catch(() => {});
    },
  );

  // Merge staging table into raw_pois, replacing only this city's rows.
  // The staging table name mirrors what the Lua script uses.
  // NOTE(review): the three statements below are not wrapped in a transaction,
  // so a failure in the INSERT leaves this city with no rows in raw_pois.
  // Consider running them atomically if the SQL client supports it — confirm
  // against the client returned by getSql().
  const stagingTable = `raw_pois_import_${citySlug.replace(/[^a-z0-9]/gi, "_")}`;
  const sql = getSql();

  await Promise.resolve(sql`DELETE FROM raw_pois WHERE city_slug = ${citySlug}`);
  await Promise.resolve(sql`
    INSERT INTO raw_pois (osm_id, osm_type, city_slug, category, subcategory, name, tags, geom)
    SELECT osm_id, osm_type, city_slug, category, subcategory, name, tags, geom
    FROM ${sql(stagingTable)}
  `);
  await Promise.resolve(sql`DROP TABLE IF EXISTS ${sql(stagingTable)}`);

  await job.updateProgress({
    stage: "Importing to PostGIS",
    pct: 100,
    message: `POI extraction complete for ${citySlug}`,
  } satisfies JobProgress);
}