commit 60014089af28b67232ba74c84a13d68b01c82d02 Author: Jan-Henrik Bruhn Date: Sun Mar 29 18:22:00 2026 +0200 chore: initial commit diff --git a/cewe.py b/cewe.py new file mode 100644 index 0000000..fe7f28a --- /dev/null +++ b/cewe.py @@ -0,0 +1,282 @@ +# /// script +# dependencies = [ +# "requests", +# "textual", +# "thefuzz" +# ] +# /// + +import concurrent.futures +import requests +import random +from textual.app import App, ComposeResult +from textual.widgets import DataTable, Footer, Header, Input, TabbedContent, TabPane, LoadingIndicator, Label, ProgressBar +from textual.containers import Vertical +from textual.screen import Screen +from textual.worker import Worker, WorkerState +from textual import on, work +from thefuzz import process + +KIDS = { + "rossman": 30400, + "dm": 1320, + "müller": 3018, + "budni": 1773, + "edeka": 24273, + "mms": 2363, + "kaufland": 6377, + "ringfoto": 15079, + "marktkauf": 4185 +} + +ARTICLE_GROUP_NAMES = { + 4: "Adventskalender", + 5: "CEWE FOTOBUCH", + 6: "Kalender", + 7: "Karten", + 11: "Digitale Produkte", + 13: "Kartensets", + 14: "Zubehör", + 30: "Handyhüllen", + 38: "Zubehör", +} + +CHARACTER_REPLACEMENTS = { + 'ä': 'ae', + 'ö': 'oe', + 'ü': 'ue', + 'Ä': 'Ae', + 'Ö': 'Oe', + 'Ü': 'Ue', + 'ß': 'ss', +} + +FUNNY_SENTENCES = [ + "Einen Moment, die Pixel werden noch handgemalt...", + "Ladebalken-Rennen: Wer gewinnt?", + "Gleich geht's los, versprochen!", + "Wird geladen... oder auch nicht. Scherz!", + "Ich bin schneller als die deutsche Bahn!", + "Kaffee wird gekocht...", + "Nicht verzweifeln, wir sind gleich soweit!", +] + +def slugify(text): + for old, new in CHARACTER_REPLACEMENTS.items(): + text = text.replace(old, new) + return text.replace(" ", "_") + +def deslugify(text): + text = text.replace("_", " ") + for old, new in CHARACTER_REPLACEMENTS.items(): + text = text.replace(new, old) + return text + +class LoadingScreen(Screen): + CSS = """ + #loading_container { + align: center top; + padding: 2; + align-horizontal: center; + } + #indicator_container { + height: 10; + align: center middle; + } + #loading_label { + width: 100%; + content-align-horizontal: center; + } + #status_label { + width: 100%; + content-align-horizontal: center; + color: $text-muted; + } + #progress_bar { + width: 80%; + margin-top: 1; + align: center top; + } + """ + def compose(self) -> ComposeResult: + with Vertical(id="loading_container"): + with Vertical(id="indicator_container"): + yield LoadingIndicator() + yield Label(id="loading_label") + yield Label(id="status_label") + yield ProgressBar(total=len(KIDS), id="progress_bar") + + def on_mount(self) -> None: + self.query_one("#loading_label").update(random.choice(FUNNY_SENTENCES)) + self.set_interval(2, self.change_loading_sentence) + + def change_loading_sentence(self) -> None: + self.query_one("#loading_label").update(random.choice(FUNNY_SENTENCES)) + + def update_progress(self, i, vendor): + self.query_one(ProgressBar).advance(1) + self.query_one("#status_label").update(f"Fetching {vendor} ({i}/{len(KIDS)})") + +class PriceApp(App): + """A Textual app to display product prices.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.categorized_prices = {} + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Header() + yield Input(placeholder="Suche nach einem Produkt...", disabled=True) + yield Footer() + yield TabbedContent(id="tabs") + + def on_mount(self) -> None: + """Called when the app is mounted.""" + loading_screen = LoadingScreen() + self.push_screen(loading_screen) + self.fetch_prices(loading_screen) + + @work(thread=True) + def fetch_prices(self, loading_screen): + """Fetch prices in the background.""" + all_prices = {} + with concurrent.futures.ThreadPoolExecutor() as executor: + future_to_vendor = {executor.submit(self.fetch_price_list, name, kid): name for name, kid in KIDS.items()} + for i, future in enumerate(concurrent.futures.as_completed(future_to_vendor)): + vendor, data = future.result() + if data: + all_prices[vendor] = data + self.call_from_thread(loading_screen.update_progress, i + 1, vendor) + return find_cheapest_prices_by_category(all_prices) + + def fetch_price_list(self, vendor, kid): + """Fetches the price list for a single vendor.""" + url = f"https://wci.photoprintit.com/wci/api/pricelists/{kid}?cors=https:foto.mueller.de:&withSupplements=true&languageId=de" + try: + resp = requests.get(url=url) + resp.raise_for_status() + return vendor, resp.json() + except requests.exceptions.RequestException as e: + print(f"Request failed for {vendor}: {e}") + except (KeyError, IndexError): + print(f"Unexpected API response from {vendor}") + return vendor, None + + @on(Worker.StateChanged) + def on_worker_state_changed(self, event: Worker.StateChanged) -> None: + """Called when the worker state changes.""" + if event.state == WorkerState.SUCCESS: + self.pop_screen() + self.categorized_prices = event.worker.result + self.query_one(Input).disabled = False + tabs = self.query_one("#tabs") + tabs.visible = True + + # Add the "Alle" tab first + products = self.categorized_prices.get("Alle", {}) + table = DataTable() + table.add_column("Produkt") + table.add_column("Günstigster Anbieter") + table.add_column("Preis", key="price") + for product, info in sorted(products.items()): + table.add_row( + product, + info["vendor"], + f"{info['price']:.2f} €", + key=product + ) + pane = TabPane("Alle", table, id=slugify("Alle")) + tabs.add_pane(pane) + + for category, products in sorted(self.categorized_prices.items()): + if category == "Alle": + continue + table = DataTable() + table.add_column("Produkt") + table.add_column("Günstigster Anbieter") + table.add_column("Preis", key="price") + for product, info in sorted(products.items()): + table.add_row( + product, + info["vendor"], + f"{info['price']:.2f} €", + key=product + ) + pane = TabPane(category, table, id=slugify(category)) + tabs.add_pane(pane) + + def on_input_changed(self, event: Input.Changed) -> None: + """Called when the input changes.""" + search_term = event.value.lower() + tabs = self.query_one("#tabs") + + if search_term: + tabs.active = "Alle" + table = self.query_one("#Alle > DataTable") + table.clear() + products = self.categorized_prices.get("Alle", {}) + choices = list(products.keys()) + results = process.extract(search_term, choices, limit=50) + for result, score in results: + info = products[result] + table.add_row( + result, + info["vendor"], + f"{info['price']:.2f} €", + key=result + ) + else: + # Repopulate all tabs + for pane in self.query(TabPane): + table = pane.query_one(DataTable) + table.clear() + category = deslugify(pane.id) + products = self.categorized_prices.get(category, {}) + for product, info in sorted(products.items()): + table.add_row( + product, + info["vendor"], + f"{info['price']:.2f} €", + key=product + ) + +def find_cheapest_prices_by_category(all_prices): + """ + Finds the cheapest price for each product and groups them by category. + """ + categorized_prices = {"Alle": {}} + for vendor, price_lists in all_prices.items(): + for price_list in price_lists: + for product in price_list.get("priceListProducts", []): + product_name = product.get("name") + if not product_name: + continue + + price_info = product.get("mailOrderPrices") + if not price_info: + continue + + price = price_info[0].get("price") + if price is None: + continue + + price /= 100 + + category_id = product.get("articleGroupId") + category_name = ARTICLE_GROUP_NAMES.get(category_id, "Unkategorisiert") + + if category_name not in categorized_prices: + categorized_prices[category_name] = {} + + if product_name not in categorized_prices[category_name] or price < categorized_prices[category_name][product_name]["price"]: + categorized_prices[category_name][product_name] = {"vendor": vendor, "price": price} + + if product_name not in categorized_prices["Alle"] or price < categorized_prices["Alle"][product_name]["price"]: + categorized_prices["Alle"][product_name] = {"vendor": vendor, "price": price} + + return categorized_prices + +if __name__ == "__main__": + app = PriceApp() + app.run() \ No newline at end of file