# /// script # dependencies = [ # "requests", # "textual", # "thefuzz" # ] # /// import concurrent.futures import requests import random from textual.app import App, ComposeResult from textual.widgets import DataTable, Footer, Header, Input, TabbedContent, TabPane, LoadingIndicator, Label, ProgressBar from textual.containers import Vertical from textual.screen import Screen from textual.worker import Worker, WorkerState from textual import on, work from thefuzz import process KIDS = { "rossman": 30400, "dm": 1320, "müller": 3018, "budni": 1773, "edeka": 24273, "mms": 2363, "kaufland": 6377, "ringfoto": 15079, "marktkauf": 4185 } ARTICLE_GROUP_NAMES = { 4: "Adventskalender", 5: "CEWE FOTOBUCH", 6: "Kalender", 7: "Karten", 11: "Digitale Produkte", 13: "Kartensets", 14: "Zubehör", 30: "Handyhüllen", 38: "Zubehör", } CHARACTER_REPLACEMENTS = { 'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue', 'ß': 'ss', } FUNNY_SENTENCES = [ "Einen Moment, die Pixel werden noch handgemalt...", "Ladebalken-Rennen: Wer gewinnt?", "Gleich geht's los, versprochen!", "Wird geladen... oder auch nicht. Scherz!", "Ich bin schneller als die deutsche Bahn!", "Kaffee wird gekocht...", "Nicht verzweifeln, wir sind gleich soweit!", ] def slugify(text): for old, new in CHARACTER_REPLACEMENTS.items(): text = text.replace(old, new) return text.replace(" ", "_") def deslugify(text): text = text.replace("_", " ") for old, new in CHARACTER_REPLACEMENTS.items(): text = text.replace(new, old) return text class LoadingScreen(Screen): CSS = """ #loading_container { align: center top; padding: 2; align-horizontal: center; } #indicator_container { height: 10; align: center middle; } #loading_label { width: 100%; content-align-horizontal: center; } #status_label { width: 100%; content-align-horizontal: center; color: $text-muted; } #progress_bar { width: 80%; margin-top: 1; align: center top; } """ def compose(self) -> ComposeResult: with Vertical(id="loading_container"): with Vertical(id="indicator_container"): yield LoadingIndicator() yield Label(id="loading_label") yield Label(id="status_label") yield ProgressBar(total=len(KIDS), id="progress_bar") def on_mount(self) -> None: self.query_one("#loading_label").update(random.choice(FUNNY_SENTENCES)) self.set_interval(2, self.change_loading_sentence) def change_loading_sentence(self) -> None: self.query_one("#loading_label").update(random.choice(FUNNY_SENTENCES)) def update_progress(self, i, vendor): self.query_one(ProgressBar).advance(1) self.query_one("#status_label").update(f"Fetching {vendor} ({i}/{len(KIDS)})") class PriceApp(App): """A Textual app to display product prices.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.categorized_prices = {} def compose(self) -> ComposeResult: """Create child widgets for the app.""" yield Header() yield Input(placeholder="Suche nach einem Produkt...", disabled=True) yield Footer() yield TabbedContent(id="tabs") def on_mount(self) -> None: """Called when the app is mounted.""" loading_screen = LoadingScreen() self.push_screen(loading_screen) self.fetch_prices(loading_screen) @work(thread=True) def fetch_prices(self, loading_screen): """Fetch prices in the background.""" all_prices = {} with concurrent.futures.ThreadPoolExecutor() as executor: future_to_vendor = {executor.submit(self.fetch_price_list, name, kid): name for name, kid in KIDS.items()} for i, future in enumerate(concurrent.futures.as_completed(future_to_vendor)): vendor, data = future.result() if data: all_prices[vendor] = data self.call_from_thread(loading_screen.update_progress, i + 1, vendor) return find_cheapest_prices_by_category(all_prices) def fetch_price_list(self, vendor, kid): """Fetches the price list for a single vendor.""" url = f"https://wci.photoprintit.com/wci/api/pricelists/{kid}?cors=https:foto.mueller.de:&withSupplements=true&languageId=de" try: resp = requests.get(url=url) resp.raise_for_status() return vendor, resp.json() except requests.exceptions.RequestException as e: print(f"Request failed for {vendor}: {e}") except (KeyError, IndexError): print(f"Unexpected API response from {vendor}") return vendor, None @on(Worker.StateChanged) def on_worker_state_changed(self, event: Worker.StateChanged) -> None: """Called when the worker state changes.""" if event.state == WorkerState.SUCCESS: self.pop_screen() self.categorized_prices = event.worker.result self.query_one(Input).disabled = False tabs = self.query_one("#tabs") tabs.visible = True # Add the "Alle" tab first products = self.categorized_prices.get("Alle", {}) table = DataTable() table.add_column("Produkt") table.add_column("Günstigster Anbieter") table.add_column("Preis", key="price") for product, info in sorted(products.items()): table.add_row( product, info["vendor"], f"{info['price']:.2f} €", key=product ) pane = TabPane("Alle", table, id=slugify("Alle")) tabs.add_pane(pane) for category, products in sorted(self.categorized_prices.items()): if category == "Alle": continue table = DataTable() table.add_column("Produkt") table.add_column("Günstigster Anbieter") table.add_column("Preis", key="price") for product, info in sorted(products.items()): table.add_row( product, info["vendor"], f"{info['price']:.2f} €", key=product ) pane = TabPane(category, table, id=slugify(category)) tabs.add_pane(pane) def on_input_changed(self, event: Input.Changed) -> None: """Called when the input changes.""" search_term = event.value.lower() tabs = self.query_one("#tabs") if search_term: tabs.active = "Alle" table = self.query_one("#Alle > DataTable") table.clear() products = self.categorized_prices.get("Alle", {}) choices = list(products.keys()) results = process.extract(search_term, choices, limit=50) for result, score in results: info = products[result] table.add_row( result, info["vendor"], f"{info['price']:.2f} €", key=result ) else: # Repopulate all tabs for pane in self.query(TabPane): table = pane.query_one(DataTable) table.clear() category = deslugify(pane.id) products = self.categorized_prices.get(category, {}) for product, info in sorted(products.items()): table.add_row( product, info["vendor"], f"{info['price']:.2f} €", key=product ) def find_cheapest_prices_by_category(all_prices): """ Finds the cheapest price for each product and groups them by category. """ categorized_prices = {"Alle": {}} for vendor, price_lists in all_prices.items(): for price_list in price_lists: for product in price_list.get("priceListProducts", []): product_name = product.get("name") if not product_name: continue price_info = product.get("mailOrderPrices") if not price_info: continue price = price_info[0].get("price") if price is None: continue price /= 100 category_id = product.get("articleGroupId") category_name = ARTICLE_GROUP_NAMES.get(category_id, "Unkategorisiert") if category_name not in categorized_prices: categorized_prices[category_name] = {} if product_name not in categorized_prices[category_name] or price < categorized_prices[category_name][product_name]["price"]: categorized_prices[category_name][product_name] = {"vendor": vendor, "price": price} if product_name not in categorized_prices["Alle"] or price < categorized_prices["Alle"][product_name]["price"]: categorized_prices["Alle"][product_name] = {"vendor": vendor, "price": price} return categorized_prices if __name__ == "__main__": app = PriceApp() app.run()