""" BAN YARO — KI-Abstraktions-Layer Routing-Logik: 1. Immer lokal (LM Studio) zuerst versuchen 2. Falls lokal nicht erreichbar → Fallback auf Cloud (Claude), wenn ANTHROPIC_API_KEY gesetzt 3. Falls beides nicht geht → KIUnavailableError Modi (KI_MODE Umgebungsvariable): KI_MODE=local → lokal + Cloud-Fallback wenn Key vorhanden KI_MODE=cloud → lokal + Cloud-Fallback (gleiche Logik, anderer Label) KI_MODE=off → kein KI verfügbar requires_premium=True schützt Features vor Free-Usern, ändert aber nicht das Routing. """ import os import logging from typing import Optional from functools import wraps logger = logging.getLogger(__name__) KI_MODE = os.getenv("KI_MODE", "local") # off | local | cloud LOCAL_BASE_URL = os.getenv("KI_LOCAL_URL", "http://10.47.11.70:11435/v1") LOCAL_MODEL = os.getenv("KI_LOCAL_MODEL", "gemma-4-31b-it") CLOUD_MODEL = os.getenv("KI_CLOUD_MODEL", "claude-sonnet-4-6") VISION_MODEL = os.getenv("KI_VISION_MODEL", "claude-opus-4-8") # Bild-Analyse (Rassenerkennung) ANTHROPIC_KEY = os.getenv("ANTHROPIC_API_KEY", "") CLOUD_WEEKLY_LIMIT = int(os.getenv("KI_CLOUD_WEEKLY_LIMIT", "20")) # Lazy Imports — nur laden wenn wirklich benötigt _openai_client = None _anthropic_client = None def _get_local_client(): global _openai_client if _openai_client is None: from openai import OpenAI _openai_client = OpenAI(base_url=LOCAL_BASE_URL, api_key="lm-studio") return _openai_client def _get_cloud_client(): global _anthropic_client if _anthropic_client is None: import anthropic _anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_KEY) return _anthropic_client class KIUnavailableError(Exception): """KI-Feature nicht verfügbar (Off-Modus oder kein Premium).""" pass class KIPremiumRequired(Exception): """Dieses Feature erfordert Ban Yaro Premium.""" pass # ------------------------------------------------------------------ # Haupt-Aufruf-Funktion — zentraler Eingang für alle KI-Requests # ------------------------------------------------------------------ def _track_usage(user_id: int | None, source: str) -> None: """Schreibt einen KI-Aufruf in ki_daily_calls (fire-and-forget, swallows errors).""" if user_id is None: return try: from database import db from datetime import date today = date.today().isoformat() with db() as conn: conn.execute( """INSERT INTO ki_daily_calls (user_id, date, count, source) VALUES (?, ?, 1, ?) ON CONFLICT(user_id, date, source) DO UPDATE SET count = count + 1""", (user_id, today, source) ) except Exception as exc: logger.warning(f"KI-Tracking fehlgeschlagen: {exc}") def _is_cloud_priority_user(user_id: int | None) -> bool: """Privilegierte Rollen (Admin, Moderator, Züchter, Manager) nutzen Cloud-KI primär.""" if not user_id: return False try: from database import db with db() as conn: user = conn.execute( "SELECT rolle, is_moderator, is_social_media FROM users WHERE id=?", (user_id,) ).fetchone() if not user: return False return bool( user["rolle"] in ("admin", "breeder", "moderator") or user["is_moderator"] or user["is_social_media"] ) except Exception: return False def _check_weekly_cloud_limit(user_id: int | None) -> None: """Wirft KIPremiumRequired wenn user_id das wöchentliche Cloud-Limit erreicht hat.""" if user_id is None or CLOUD_WEEKLY_LIMIT <= 0: return try: from database import db with db() as conn: user = conn.execute( "SELECT rolle, is_moderator FROM users WHERE id=?", (user_id,) ).fetchone() # Admins, Moderatoren, Züchter und Media Manager haben kein Limit if user and ( user["rolle"] in ("admin", "breeder", "moderator", "media_manager") or user["is_moderator"] ): return used = conn.execute( """SELECT COALESCE(SUM(count), 0) FROM ki_daily_calls WHERE user_id=? AND source='cloud' AND date >= DATE('now', '-6 days')""", (user_id,) ).fetchone()[0] if used >= CLOUD_WEEKLY_LIMIT: raise KIPremiumRequired( f"Wöchentliches KI-Limit erreicht ({CLOUD_WEEKLY_LIMIT} Cloud-Anfragen/Woche). " "Morgen wieder verfügbar." ) except KIPremiumRequired: raise except Exception as exc: logger.warning(f"KI-Limit-Check fehlgeschlagen: {exc}") async def complete( prompt: str, system: str = "Du bist ein hilfreicher Assistent für Hundebesitzer.", max_tokens: int = 512, requires_premium: bool = False, user_is_premium: bool = False, json_mode: bool = False, return_model: bool = False, return_source: bool = False, user_id: int | None = None, ) -> str: """ KI-Completion. Wählt automatisch den richtigen Backend. Zählt jeden Aufruf in ki_daily_calls wenn user_id übergeben wird. Prüft wöchentliches Cloud-Limit (CLOUD_WEEKLY_LIMIT) für Cloud-Aufrufe. """ if KI_MODE == "off": raise KIUnavailableError("KI ist deaktiviert.") if requires_premium and not user_is_premium: raise KIPremiumRequired("Dieses Feature ist Teil von Ban Yaro Premium.") if KI_MODE in ("local", "cloud"): # Privilegierte Rollen (Admin, Moderator, Züchter, Manager) → Cloud zuerst if _is_cloud_priority_user(user_id): try: _check_weekly_cloud_limit(user_id) text = await _cloud_complete(prompt, system, max_tokens, json_mode) _track_usage(user_id, "cloud") if return_model: return (text, CLOUD_MODEL) return (text, "cloud") if return_source else text except KIPremiumRequired: raise except Exception as e: logger.warning(f"Cloud-KI nicht erreichbar für privilegierten User, Fallback lokal: {e}") try: text = await _local_complete(prompt, system, max_tokens, json_mode) except Exception as local_e: raise KIUnavailableError("KI-Modell nicht erreichbar.") from local_e _track_usage(user_id, "local") if return_model: return (text, LOCAL_MODEL) return (text, "local") if return_source else text # Standard-User → lokal zuerst, Cloud als Fallback try: text = await _local_complete(prompt, system, max_tokens, json_mode) _track_usage(user_id, "local") if return_model: return (text, LOCAL_MODEL) return (text, "local") if return_source else text except Exception as e: logger.warning(f"Lokales KI-Modell nicht erreichbar: {e}") if ANTHROPIC_KEY: logger.info("Fallback auf Cloud-KI.") _check_weekly_cloud_limit(user_id) text = await _cloud_complete(prompt, system, max_tokens, json_mode) _track_usage(user_id, "cloud") if return_model: return (text, CLOUD_MODEL) return (text, "cloud") if return_source else text raise KIUnavailableError( "KI-Modell momentan nicht erreichbar. Bitte später erneut versuchen." ) from e raise KIUnavailableError("Unbekannter KI-Modus.") async def _local_complete( prompt: str, system: str, max_tokens: int, json_mode: bool ) -> str: """LM Studio lokale Completion (synchron in Thread-Pool).""" import asyncio def _sync(): client = _get_local_client() kwargs = dict( model=LOCAL_MODEL, messages=[ {"role": "system", "content": system}, {"role": "user", "content": prompt}, ], max_tokens=max_tokens, temperature=0.3, ) if json_mode: kwargs["response_format"] = {"type": "json_object"} response = client.chat.completions.create(**kwargs) return response.choices[0].message.content.strip() return await asyncio.get_event_loop().run_in_executor(None, _sync) async def _cloud_complete( prompt: str, system: str, max_tokens: int, json_mode: bool ) -> str: """Claude API Completion mit Prompt Caching.""" import asyncio def _sync(): client = _get_cloud_client() system_content = [ { "type": "text", "text": system, "cache_control": {"type": "ephemeral"}, # Prompt Caching } ] response = client.messages.create( model=CLOUD_MODEL, max_tokens=max_tokens, system=system_content, messages=[{"role": "user", "content": prompt}], ) return response.content[0].text.strip() return await asyncio.get_event_loop().run_in_executor(None, _sync) # ------------------------------------------------------------------ # Spezialisierte KI-Funktionen # (hier liegen die konkreten Prompts — zentral, nicht in den Routes) # ------------------------------------------------------------------ async def symptom_check(symptoms: str, dog_info: dict, user_id: int | None = None, user_is_premium: bool = False) -> dict: """ Symptom-Triage für Hunde. Lokal für alle, Premium bekommt detailliertere Analyse. """ rasse = dog_info.get("rasse", "unbekannt") alter = dog_info.get("alter_jahre", "unbekannt") system = ( "Du bist ein erfahrener Veterinär-Assistent. " "Deine Aufgabe ist eine erste Einschätzung von Symptomen beim Hund. " "Antworte IMMER auf Deutsch und IMMER im angegebenen JSON-Format. " "Überschreite nie deine Kompetenz — weise bei Unsicherheit zum Tierarzt." ) prompt = f""" Hund: {rasse}, {alter} Jahre alt. Symptome: {symptoms} Antworte NUR als JSON: {{ "dringlichkeit": "beobachten" | "tierarzt_heute" | "notfall", "einschaetzung": "Kurze Einschätzung in 1-2 Sätzen", "hinweise": ["Hinweis 1", "Hinweis 2"], "zum_tierarzt_wenn": "Wann unbedingt zum Tierarzt" }} """ result = await complete( prompt, system, max_tokens=400, requires_premium=False, user_is_premium=user_is_premium, json_mode=True, user_id=user_id, ) import json, re # Cloud-Modelle wrappen JSON manchmal in ```json … ``` — stripppen cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", result.strip(), flags=re.DOTALL) try: return json.loads(cleaned) except json.JSONDecodeError: return { "dringlichkeit": "tierarzt_heute", "einschaetzung": cleaned, "hinweise": [], "zum_tierarzt_wenn": "Bei Verschlechterung sofort.", } async def training_plan(problem: str, dog_info: dict, user_is_premium: bool = False) -> str: """ Trainingsplan generieren — Premium-Feature. """ system = ( "Du bist ein zertifizierter Hundetrainer. " "Erstelle konkrete, positive, gewaltfreie Trainingspläne auf Deutsch." ) prompt = f""" Hund: {dog_info.get('rasse')}, {dog_info.get('alter_jahre')} Jahre. Problem: {problem} Erstelle einen 2-Wochen-Trainingsplan mit täglichen Übungen (max. 15 Min/Tag). Strukturiert, konkret, motivierend. """ return await complete( prompt, system, max_tokens=800, requires_premium=True, # Cloud-Feature — kostet Geld user_is_premium=user_is_premium, ) async def diary_tags(entry_text: str, user_id: int | None = None) -> list[str]: """ Automatische Tags für Tagebucheinträge — läuft lokal, kostenlos. """ if KI_MODE == "off": return [] prompt = f""" Tagebucheintrag: "{entry_text}" Vergib 2-4 passende Tags aus dieser Liste: training, spaziergang, gesundheit, freunde, spielen, futter, tierarzt, reise, meilenstein, lustig, traurig, wetter, sonstige Antworte NUR mit den Tags, kommagetrennt, keine Erklärung. """ try: result = await complete(prompt, max_tokens=50, requires_premium=False, user_id=user_id) return [t.strip().lower() for t in result.split(",") if t.strip()] except Exception: return [] async def poison_description_check(description: str) -> dict: """ Giftköder-Beschreibung auf Plausibilität und Typ prüfen — lokal, kostenlos. """ if KI_MODE == "off": return {"valid": True, "typ": "unbekannt"} prompt = f""" Giftköder-Meldung: "{description}" Bewerte als JSON: {{ "plausibel": true/false, "typ": "fleisch" | "koeder" | "objekt" | "unbekannt", "hinweis": "Kurzer Hinweis für andere Hundebesitzer (max 1 Satz)" }} """ try: import json result = await complete(prompt, max_tokens=150, json_mode=True) return json.loads(result) except Exception: return {"plausibel": True, "typ": "unbekannt", "hinweis": description} async def health_summary(health_data: list, dog_info: dict, user_is_premium: bool = False, user_id: int | None = None) -> str: """ Tierärztliche Zusammenfassung aller Gesundheitsdaten — lokal für alle. Fasst Impfungen, Besuche, Medikamente etc. in einem lesbaren Bericht zusammen. """ system = ( "Du bist ein erfahrener Veterinär-Assistent. " "Erstelle präzise, strukturierte Gesundheitsberichte für Hunde auf Deutsch. " "Weise auf fällige Impfungen und wichtige Termine hin." ) # Daten nach Typ gruppieren für den Prompt def _fmt(entries, typ): subset = [e for e in entries if e.get("typ") == typ] if not subset: return " (keine Einträge)" lines = [] for e in subset[:5]: # maximal 5 pro Typ — Kontextfenster schonen line = f" - {e.get('datum', '?')}: {e.get('bezeichnung', '?')}" if e.get("naechstes"): line += f" (nächste Fälligkeit: {e['naechstes']})" if e.get("notiz"): line += f" — {e['notiz']}" lines.append(line) return "\n".join(lines) heute = __import__("datetime").date.today().isoformat() prompt = f""" Hund: {dog_info.get('name')}, {dog_info.get('rasse', 'unbekannt')}, Geburtstag: {dog_info.get('geburtstag', 'unbekannt')}, Gewicht: {dog_info.get('gewicht_kg', '?')} kg Heutiges Datum: {heute} === IMPFUNGEN === {_fmt(health_data, 'impfung')} === ENTWURMUNG === {_fmt(health_data, 'entwurmung')} === TIERARZTBESUCHE === {_fmt(health_data, 'tierarzt')} === MEDIKAMENTE === {_fmt(health_data, 'medikament')} === ALLERGIEN === {_fmt(health_data, 'allergie')} Erstelle einen strukturierten Gesundheitsbericht mit: 1. Aktueller Status (2-3 Sätze) 2. Fällige / überfällige Impfungen und Termine 3. Wichtige Hinweise für den nächsten Tierarztbesuch 4. Allgemeine Empfehlungen Schreibe klar und verständlich für den Hundebesitzer. """ return await complete( prompt, system, max_tokens=700, requires_premium=False, user_is_premium=user_is_premium, user_id=user_id, ) # ------------------------------------------------------------------ # Status-Endpoint (für Admin/Debug) # ------------------------------------------------------------------ def status() -> dict: return { "mode": KI_MODE, "local_url": LOCAL_BASE_URL if KI_MODE != "off" else None, "local_model": LOCAL_MODEL if KI_MODE != "off" else None, "cloud_model": CLOUD_MODEL, "cloud_key_set": bool(ANTHROPIC_KEY), }