cewe-price-comparison/cewe.py

282 lines
No EOL
9.5 KiB
Python

# /// script
# dependencies = [
# "requests",
# "textual",
# "thefuzz"
# ]
# ///
import concurrent.futures
import requests
import random
from textual.app import App, ComposeResult
from textual.widgets import DataTable, Footer, Header, Input, TabbedContent, TabPane, LoadingIndicator, Label, ProgressBar
from textual.containers import Vertical
from textual.screen import Screen
from textual.worker import Worker, WorkerState
from textual import on, work
from thefuzz import process
# Vendor name -> numeric id that fetch_price_list() substitutes into the
# price-list URL. The keys are also what the tables show as the vendor.
# NOTE(review): "rossman" presumably refers to the Rossmann chain; the key is
# user-visible, so confirm before changing the spelling.
KIDS = {
    "rossman": 30400,
    "dm": 1320,
    "müller": 3018,
    "budni": 1773,
    "edeka": 24273,
    "mms": 2363,
    "kaufland": 6377,
    "ringfoto": 15079,
    "marktkauf": 4185,
}

# "articleGroupId" of a product -> name of the tab it is listed under.
# Ids missing here are grouped under "Unkategorisiert" by
# find_cheapest_prices_by_category(). Ids 14 and 38 share one tab.
ARTICLE_GROUP_NAMES = {
    4: "Adventskalender",
    5: "CEWE FOTOBUCH",
    6: "Kalender",
    7: "Karten",
    11: "Digitale Produkte",
    13: "Kartensets",
    14: "Zubehör",
    30: "Handyhüllen",
    38: "Zubehör",
}

# German umlauts / sharp s -> ASCII transliteration, applied by slugify() and
# reversed (best effort) by deslugify().
CHARACTER_REPLACEMENTS = {
    "ä": "ae",
    "ö": "oe",
    "ü": "ue",
    "Ä": "Ae",
    "Ö": "Oe",
    "Ü": "Ue",
    "ß": "ss",
}

# Messages the loading screen picks from at random while data is fetched.
FUNNY_SENTENCES = [
    "Einen Moment, die Pixel werden noch handgemalt...",
    "Ladebalken-Rennen: Wer gewinnt?",
    "Gleich geht's los, versprochen!",
    "Wird geladen... oder auch nicht. Scherz!",
    "Ich bin schneller als die deutsche Bahn!",
    "Kaffee wird gekocht...",
    "Nicht verzweifeln, wir sind gleich soweit!",
]
def slugify(text):
    """Return *text* with umlauts/sharp s transliterated and spaces as "_".

    Every key of CHARACTER_REPLACEMENTS is swapped for its ASCII spelling and
    each space becomes an underscore; all other characters pass through
    unchanged. The app uses the result as the widget id of a tab pane.
    """
    # One translation pass is enough: every key is a single character and no
    # replacement text contains a character that is itself a key.
    table = str.maketrans({**CHARACTER_REPLACEMENTS, " ": "_"})
    return text.translate(table)
def deslugify(text):
    """Undo slugify() on a best-effort basis.

    Underscores become spaces, then every ASCII spelling listed in
    CHARACTER_REPLACEMENTS is turned back into its umlaut / sharp s. The
    conversion cannot tell a transliterated umlaut from a genuine "ae", "oe",
    "ue" or "ss" (or a genuine underscore), so those are converted as well.
    """
    restored = " ".join(text.split("_"))
    for umlaut, ascii_form in CHARACTER_REPLACEMENTS.items():
        restored = restored.replace(ascii_form, umlaut)
    return restored
class LoadingScreen(Screen):
    """Screen shown while the vendor price lists are being fetched.

    It stacks a loading indicator, a randomly rotating message, a status line
    naming the vendor that just finished, and a progress bar whose total is
    the number of vendors in KIDS.
    """

    # The stylesheet text is reproduced verbatim, hence the flush-left lines.
    CSS = """
#loading_container {
align: center top;
padding: 2;
align-horizontal: center;
}
#indicator_container {
height: 10;
align: center middle;
}
#loading_label {
width: 100%;
content-align-horizontal: center;
}
#status_label {
width: 100%;
content-align-horizontal: center;
color: $text-muted;
}
#progress_bar {
width: 80%;
margin-top: 1;
align: center top;
}
"""

    def compose(self) -> ComposeResult:
        """Yield the widget tree: indicator, two labels and the progress bar."""
        yield Vertical(
            Vertical(LoadingIndicator(), id="indicator_container"),
            Label(id="loading_label"),
            Label(id="status_label"),
            ProgressBar(total=len(KIDS), id="progress_bar"),
            id="loading_container",
        )

    def on_mount(self) -> None:
        """Show a first message and swap it for a new one every 2 seconds."""
        self.change_loading_sentence()
        self.set_interval(2, self.change_loading_sentence)

    def change_loading_sentence(self) -> None:
        """Replace the loading label text with a random FUNNY_SENTENCES entry."""
        sentence = random.choice(FUNNY_SENTENCES)
        self.query_one("#loading_label", Label).update(sentence)

    def update_progress(self, i, vendor):
        """Advance the bar by one step and report which vendor was fetched.

        Args:
            i: Number of vendors completed so far (shown as "i/total").
            vendor: Name of the vendor whose request just completed.
        """
        progress_bar = self.query_one(ProgressBar)
        progress_bar.advance(1)
        status_text = f"Fetching {vendor} ({i}/{len(KIDS)})"
        self.query_one("#status_label", Label).update(status_text)
class PriceApp(App):
    """A Textual app to display product prices.

    On start-up a LoadingScreen is pushed while a thread worker downloads one
    price list per entry in KIDS. Once the worker succeeds, the result is
    shown as one tab per category, preceded by an "Alle" tab that holds every
    product, and the search input is enabled. Typing into the input
    fuzzy-filters the "Alle" tab; clearing it restores all tabs.
    """

    # Seconds to wait for a vendor (applies to connecting and to each read).
    # Without a limit a single stalled request would keep the worker, and
    # therefore the loading screen, alive forever.
    REQUEST_TIMEOUT_SECONDS = 15

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # category name -> product name -> {"vendor": ..., "price": ...}
        self.categorized_prices = {}
        # Tab pane id -> category name. Kept explicitly because slugify() is
        # not reversible in general (deslugify() would also rewrite a genuine
        # "ss" or "ue" inside a category name).
        self._pane_categories = {}

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header()
        # Disabled until the price data has arrived.
        yield Input(placeholder="Suche nach einem Produkt...", disabled=True)
        yield Footer()
        yield TabbedContent(id="tabs")

    def on_mount(self) -> None:
        """Show the loading screen and start fetching prices."""
        loading_screen = LoadingScreen()
        self.push_screen(loading_screen)
        self.fetch_prices(loading_screen)

    @work(thread=True)
    def fetch_prices(self, loading_screen):
        """Fetch all vendor price lists in the background.

        Requests run concurrently in a thread pool; after each one completes
        the loading screen's progress is advanced on the app's thread.

        Args:
            loading_screen: The LoadingScreen whose progress is updated.

        Returns:
            The result of find_cheapest_prices_by_category() for all vendors
            that delivered data.
        """
        all_prices = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_vendor = {
                executor.submit(self.fetch_price_list, name, kid): name
                for name, kid in KIDS.items()
            }
            completed = concurrent.futures.as_completed(future_to_vendor)
            for done, future in enumerate(completed, start=1):
                vendor, data = future.result()
                if data:
                    all_prices[vendor] = data
                self.call_from_thread(loading_screen.update_progress, done, vendor)
        return find_cheapest_prices_by_category(all_prices)

    def fetch_price_list(self, vendor, kid):
        """Fetch the price list for a single vendor.

        Args:
            vendor: Vendor name, passed through to the result.
            kid: Id inserted into the price-list URL.

        Returns:
            A ``(vendor, data)`` tuple where ``data`` is the decoded JSON
            body, or ``None`` if the request failed, timed out or did not
            return valid JSON.
        """
        url = f"https://wci.photoprintit.com/wci/api/pricelists/{kid}?cors=https:foto.mueller.de:&withSupplements=true&languageId=de"
        try:
            resp = requests.get(url=url, timeout=self.REQUEST_TIMEOUT_SECONDS)
            resp.raise_for_status()
            return vendor, resp.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed for {vendor}: {e}")
        except (KeyError, IndexError, ValueError):
            # ValueError: body was not valid JSON (raised by ``resp.json()``
            # in requests versions whose decode error is not a
            # RequestException).
            print(f"Unexpected API response from {vendor}")
        return vendor, None

    @staticmethod
    def _new_price_table():
        """Return an empty DataTable with the three price columns."""
        table = DataTable()
        table.add_column("Produkt")
        table.add_column("Günstigster Anbieter")
        table.add_column("Preis", key="price")
        return table

    @staticmethod
    def _add_price_rows(table, rows):
        """Append one row per ``(product name, offer)`` pair to *table*.

        Each row is keyed by its product name and shows the price with two
        decimals.
        """
        for product, info in rows:
            table.add_row(
                product,
                info["vendor"],
                f"{info['price']:.2f}",
                key=product,
            )

    @on(Worker.StateChanged)
    def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
        """Build the tabs once the fetch worker has finished successfully."""
        if event.state != WorkerState.SUCCESS:
            return

        self.pop_screen()
        self.categorized_prices = event.worker.result
        self.query_one(Input).disabled = False
        tabs = self.query_one("#tabs")
        tabs.visible = True

        # "Alle" always comes first; the remaining categories follow sorted.
        categories = ["Alle"]
        categories.extend(
            sorted(name for name in self.categorized_prices if name != "Alle")
        )

        self._pane_categories = {}
        for category in categories:
            products = self.categorized_prices.get(category, {})
            table = self._new_price_table()
            self._add_price_rows(table, sorted(products.items()))
            pane_id = slugify(category)
            self._pane_categories[pane_id] = category
            tabs.add_pane(TabPane(category, table, id=pane_id))

    def on_input_changed(self, event: Input.Changed) -> None:
        """Filter the "Alle" tab by the search term, or restore all tabs."""
        search_term = event.value.lower()
        tabs = self.query_one("#tabs")

        if search_term:
            # Searching always happens on the tab that holds every product.
            tabs.active = "Alle"
            table = self.query_one("#Alle > DataTable")
            table.clear()
            products = self.categorized_prices.get("Alle", {})
            matches = process.extract(search_term, list(products), limit=50)
            self._add_price_rows(
                table, ((name, products[name]) for name, _score in matches)
            )
            return

        # Empty search: repopulate every tab with its full, sorted contents.
        for pane in self.query(TabPane):
            table = pane.query_one(DataTable)
            table.clear()
            category = self._pane_categories.get(pane.id)
            if category is None:
                # Pane not created by on_worker_state_changed(); fall back to
                # the best-effort reverse of slugify().
                category = deslugify(pane.id)
            products = self.categorized_prices.get(category, {})
            self._add_price_rows(table, sorted(products.items()))
def find_cheapest_prices_by_category(all_prices):
    """Pick the cheapest offer per product, grouped by category.

    Args:
        all_prices: Mapping of vendor name to an iterable of price-list
            dicts. Products are read from each dict's "priceListProducts"
            entry.

    Returns:
        A dict mapping category name to ``{product name: {"vendor": ...,
        "price": ...}}``. The "Alle" category is always present and holds
        the cheapest offer for every product regardless of category.

    Products without a name, without "mailOrderPrices", or whose first
    mail-order price has no "price" are skipped. Prices are divided by 100
    (presumably cents to euros; confirm against the API). On equal prices
    the offer seen first is kept.
    """
    cheapest = {"Alle": {}}
    overall = cheapest["Alle"]

    for vendor, price_lists in all_prices.items():
        for price_list in price_lists:
            for product in price_list.get("priceListProducts", []):
                name = product.get("name")
                if not name:
                    continue
                mail_order_prices = product.get("mailOrderPrices")
                if not mail_order_prices:
                    continue
                raw_price = mail_order_prices[0].get("price")
                if raw_price is None:
                    continue
                price = raw_price / 100

                group_id = product.get("articleGroupId")
                group_name = ARTICLE_GROUP_NAMES.get(group_id, "Unkategorisiert")

                # The same rule applies to the product's own category and to
                # the catch-all bucket: keep the offer only if it is new or
                # strictly cheaper than the one already recorded.
                for bucket in (cheapest.setdefault(group_name, {}), overall):
                    if name not in bucket or price < bucket[name]["price"]:
                        bucket[name] = {"vendor": vendor, "price": price}

    return cheapest
# Start the TUI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    PriceApp().run()