""" AI engine for P5G Marvis. Phase 1: rule-based with real network data. Phase 2: swap MARVIS_AI_MODE=openai or MARVIS_AI_MODE=ollama to route through LLM. """ from datetime import datetime from app.config import ( AI_MODE, CONTAINER_RUNTIME, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL, OLLAMA_MODEL, OLLAMA_URL, ) async def answer(query: str, network_state: dict, alerts: list) -> str: if AI_MODE == "openai": return await _call_openai(query, network_state, alerts) if AI_MODE == "ollama": return await _call_ollama(query, network_state, alerts) return _rule_based(query, network_state, alerts) # ── Rule-based engine ────────────────────────────────────────────────────── def _rule_based(query: str, network_state: dict, alerts: list) -> str: q = query.lower() nfs = network_state.get("nfs", []) up = [n for n in nfs if n["state"] == "up"] down = [n for n in nfs if n["state"] == "down"] if any(w in q for w in ["hello", "hi ", "hey", "howdy"]): return ("Hello! I'm **P5G Marvis**, your AI network assistant for HPE Private 5G.\n" "Ask me about network health, specific functions, alerts, or performance.") if any(w in q for w in ["help", "what can", "capabilities", "commands", "features"]): return ( "Here's what I can help with:\n\n" "• **Network health** — overall P5G status overview\n" "• **Network functions** — ask about AMF, SMF, UPF, UDM, NRF, etc.\n" "• **Alerts** — active alarms and their severity\n" "• **Subscribers** — UE registration and session analysis\n" "• **Sessions** — PDU session and data plane health\n\n" "_Tip: Connect an LLM by setting `MARVIS_AI_MODE=openai` or `=ollama`._" ) # Specific NF query from app.config import ALL_NFS for nf_name in ALL_NFS: if nf_name.lower() in q: return _nf_detail(nf_name, nfs, alerts) if any(w in q for w in ["alert", "alarm", "warning", "critical", "incident", "problem", "issue"]): return _alerts_summary(alerts) if any(w in q for w in ["subscriber", "ue ", "device", "phone", "handset", "registration", "attach"]): return _subscriber_analysis(nfs, alerts) if any(w in q for w in ["session", "pdu", "bearer", "user plane", "traffic", "throughput"]): return _session_analysis(nfs, alerts) # Default → health summary return _health_summary(up, down, alerts) def _health_summary(up: list, down: list, alerts: list) -> str: ts = datetime.now().strftime("%H:%M:%S") crit = [a for a in alerts if a.get("severity") == "critical"] warn = [a for a in alerts if a.get("severity") != "critical"] lines = [f"**P5G Network Health — {ts}**\n"] if up: lines.append(f"✅ **{len(up)} UP**: {', '.join(n['name'] for n in up)}") if down: lines.append(f"🔴 **{len(down)} DOWN**: {', '.join(n['name'] for n in down)}") lines.append(f" ⚡ Action: check `{CONTAINER_RUNTIME} logs ` in the runtime host") if alerts: lines.append(f"\n⚠️ **{len(alerts)} alert(s)** — {len(crit)} critical, {len(warn)} warning") for a in alerts[:4]: icon = "🔴" if a.get("severity") == "critical" else "🟡" lines.append(f" {icon} {a['name']}: {a.get('summary', a.get('instance', ''))}") else: lines.append("\n✅ **No active alerts**") if not down and not alerts: lines.append("\n🟢 All systems nominal.") return "\n".join(lines) def _nf_detail(nf_name: str, nfs: list, alerts: list) -> str: nf = next((n for n in nfs if n["name"] == nf_name), None) nf_alerts = [a for a in alerts if nf_name in a.get("name", "") or nf_name.lower() in a.get("instance", "").lower()] if not nf or nf["state"] == "unknown": return (f"ℹ️ No Prometheus data found for **{nf_name}**.\n" f"Check: `{CONTAINER_RUNTIME} ps | grep {nf_name.lower()}`") icon = "✅" if nf["state"] == "up" else "🔴" lines = [f"{icon} **{nf_name}** is **{nf['state'].upper()}**", f"Instance: `{nf.get('instance', 'n/a')}`"] if nf_alerts: lines.append(f"\n⚠️ {len(nf_alerts)} alert(s) for {nf_name}:") for a in nf_alerts: lines.append(f" → {a['name']}: {a.get('summary', '')}") else: lines.append("No active alerts for this function.") return "\n".join(lines) def _alerts_summary(alerts: list) -> str: if not alerts: return "✅ **No active alerts.** Network is running cleanly." crit = [a for a in alerts if a.get("severity") == "critical"] warn = [a for a in alerts if a.get("severity") != "critical"] lines = [f"⚠️ **{len(alerts)} active alert(s)** — {len(crit)} critical, {len(warn)} warning\n"] for a in alerts: icon = "🔴" if a.get("severity") == "critical" else "🟡" lines.append(f"{icon} **{a['name']}**") if a.get("summary"): lines.append(f" {a['summary']}") if a.get("instance"): lines.append(f" `{a['instance']}`") return "\n".join(lines) def _subscriber_analysis(nfs: list, alerts: list) -> str: amf = next((n for n in nfs if n["name"] == "AMF"), None) smf = next((n for n in nfs if n["name"] == "SMF"), None) lines = ["**Subscriber & Registration Analysis**\n"] lines.append(f"AMF (registration/mobility): {'✅ UP' if amf and amf['state'] == 'up' else '🔴 DOWN — subscribers cannot register'}") lines.append(f"SMF (session management): {'✅ UP' if smf and smf['state'] == 'up' else '🔴 DOWN — no new data sessions'}") sub_alerts = [a for a in alerts if any(k in a.get("name", "").lower() for k in ["ue", "subscriber", "session", "attach", "registration"])] if sub_alerts: lines.append(f"\n⚠️ {len(sub_alerts)} subscriber-related alert(s) active.") else: lines.append("\nNo subscriber-related alerts detected.") return "\n".join(lines) def _session_analysis(nfs: list, alerts: list) -> str: smf = next((n for n in nfs if n["name"] == "SMF"), None) upf = next((n for n in nfs if n["name"] == "UPF"), None) lines = ["**PDU Session & Data Plane Analysis**\n"] lines.append(f"SMF: {'✅ UP' if smf and smf['state'] == 'up' else '🔴 DOWN'}") lines.append(f"UPF: {'✅ UP' if upf and upf['state'] == 'up' else '🔴 DOWN'}") if (not smf or smf["state"] != "up") or (not upf or upf["state"] != "up"): lines.append("\n⚡ **Impact**: PDU sessions will fail until both SMF and UPF are operational.") else: lines.append("\nBoth SMF and UPF operational — sessions should be establishing normally.") return "\n".join(lines) # ── LLM backends ────────────────────────────────────────────────────────── def _build_context(network_state: dict, alerts: list) -> str: nfs = network_state.get("nfs", []) up = [n["name"] for n in nfs if n["state"] == "up"] down = [n["name"] for n in nfs if n["state"] == "down"] return ( f"NFs UP: {', '.join(up) or 'none'}\n" f"NFs DOWN: {', '.join(down) or 'none'}\n" f"Active alerts: {', '.join(a.get('name','') for a in alerts[:5]) or 'none'}" ) async def _call_openai(query: str, network_state: dict, alerts: list) -> str: try: import httpx ctx = _build_context(network_state, alerts) messages = [ {"role": "system", "content": f"You are P5G Marvis, an AI network assistant for HPE Private 5G.\n" f"Current network state:\n{ctx}\n\nRespond concisely, use markdown."}, {"role": "user", "content": query}, ] base = OPENAI_BASE_URL.rstrip("/") headers = {"Content-Type": "application/json"} if OPENAI_API_KEY: headers["Authorization"] = f"Bearer {OPENAI_API_KEY}" # disable cert verification for self-signed local LLM servers verify = base.startswith("https://api.openai.com") async with httpx.AsyncClient(timeout=120, verify=verify) as client: resp = await client.post( f"{base}/v1/chat/completions", headers=headers, json={"model": OPENAI_MODEL, "messages": messages, "max_tokens": 1024}, ) msg = resp.json()["choices"][0]["message"] # some reasoning models put the answer in content, others in reasoning_content return msg.get("content") or msg.get("reasoning_content") or "(empty response)" except Exception as e: return f"LLM error: {e}\n\n" + _rule_based(query, network_state, alerts) async def _call_ollama(query: str, network_state: dict, alerts: list) -> str: try: import httpx ctx = _build_context(network_state, alerts) prompt = (f"You are P5G Marvis, an AI network assistant.\n" f"Network state:\n{ctx}\n\nUser: {query}\nAssistant:") async with httpx.AsyncClient(timeout=60) as client: resp = await client.post( f"{OLLAMA_URL}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, ) return resp.json().get("response", "No response.") except Exception as e: return f"Ollama error: {e}\n\n" + _rule_based(query, network_state, alerts)