"""
|
||
AI engine for P5G Marvis.
|
||
|
||
Phase 1: rule-based with real network data.
|
||
Phase 2: swap MARVIS_AI_MODE=openai or MARVIS_AI_MODE=ollama to route through LLM.
|
||
"""
|
||
|
||
from datetime import datetime
|
||
from app.config import (
|
||
AI_MODE,
|
||
CONTAINER_RUNTIME,
|
||
OPENAI_API_KEY,
|
||
OPENAI_MODEL,
|
||
OPENAI_BASE_URL,
|
||
OLLAMA_MODEL,
|
||
OLLAMA_URL,
|
||
)
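
# Data-shape notes (a descriptive sketch inferred from how the helpers below read these
# structures, not an enforced schema):
#   network_state: {"nfs": [{"name": "AMF", "state": "up" / "down" / "unknown", "instance": "..."}, ...]}
#   alerts:        [{"name": "...", "severity": "critical" (anything else counts as a warning),
#                    "summary": "...", "instance": "..."}, ...]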


async def answer(query: str, network_state: dict, alerts: list) -> str:
    """Route the query to the configured backend, falling back to the rule-based engine."""
    if AI_MODE == "openai":
        return await _call_openai(query, network_state, alerts)
    if AI_MODE == "ollama":
        return await _call_ollama(query, network_state, alerts)
    return _rule_based(query, network_state, alerts)


# ── Rule-based engine ──────────────────────────────────────────────────────

def _rule_based(query: str, network_state: dict, alerts: list) -> str:
    q = query.lower()
    nfs = network_state.get("nfs", [])
    up = [n for n in nfs if n["state"] == "up"]
    down = [n for n in nfs if n["state"] == "down"]

    if any(w in q for w in ["hello", "hi ", "hey", "howdy"]):
        return ("Hello! I'm **P5G Marvis**, your AI network assistant for HPE Private 5G.\n"
                "Ask me about network health, specific functions, alerts, or performance.")

    if any(w in q for w in ["help", "what can", "capabilities", "commands", "features"]):
        return (
            "Here's what I can help with:\n\n"
            "• **Network health** — overall P5G status overview\n"
            "• **Network functions** — ask about AMF, SMF, UPF, UDM, NRF, etc.\n"
            "• **Alerts** — active alarms and their severity\n"
            "• **Subscribers** — UE registration and session analysis\n"
            "• **Sessions** — PDU session and data plane health\n\n"
            "_Tip: Connect an LLM by setting `MARVIS_AI_MODE=openai` or `=ollama`._"
        )

    # Specific NF query
    from app.config import ALL_NFS
    for nf_name in ALL_NFS:
        if nf_name.lower() in q:
            return _nf_detail(nf_name, nfs, alerts)

    if any(w in q for w in ["alert", "alarm", "warning", "critical", "incident", "problem", "issue"]):
        return _alerts_summary(alerts)

    if any(w in q for w in ["subscriber", "ue ", "device", "phone", "handset", "registration", "attach"]):
        return _subscriber_analysis(nfs, alerts)

    if any(w in q for w in ["session", "pdu", "bearer", "user plane", "traffic", "throughput"]):
        return _session_analysis(nfs, alerts)

    # Default → health summary
    return _health_summary(up, down, alerts)


def _health_summary(up: list, down: list, alerts: list) -> str:
    ts = datetime.now().strftime("%H:%M:%S")
    crit = [a for a in alerts if a.get("severity") == "critical"]
    warn = [a for a in alerts if a.get("severity") != "critical"]
    lines = [f"**P5G Network Health — {ts}**\n"]

    if up:
        lines.append(f"✅ **{len(up)} UP**: {', '.join(n['name'] for n in up)}")
    if down:
        lines.append(f"🔴 **{len(down)} DOWN**: {', '.join(n['name'] for n in down)}")
        lines.append(f"  ⚡ Action: check `{CONTAINER_RUNTIME} logs <nf>` on the runtime host")

    if alerts:
        lines.append(f"\n⚠️ **{len(alerts)} alert(s)** — {len(crit)} critical, {len(warn)} warning")
        for a in alerts[:4]:
            icon = "🔴" if a.get("severity") == "critical" else "🟡"
            lines.append(f"  {icon} {a['name']}: {a.get('summary', a.get('instance', ''))}")
    else:
        lines.append("\n✅ **No active alerts**")

    if not down and not alerts:
        lines.append("\n🟢 All systems nominal.")
    return "\n".join(lines)


def _nf_detail(nf_name: str, nfs: list, alerts: list) -> str:
    nf = next((n for n in nfs if n["name"] == nf_name), None)
    nf_alerts = [a for a in alerts
                 if nf_name in a.get("name", "") or nf_name.lower() in a.get("instance", "").lower()]

    if not nf or nf["state"] == "unknown":
        return (f"ℹ️ No Prometheus data found for **{nf_name}**.\n"
                f"Check: `{CONTAINER_RUNTIME} ps | grep {nf_name.lower()}`")

    icon = "✅" if nf["state"] == "up" else "🔴"
    lines = [f"{icon} **{nf_name}** is **{nf['state'].upper()}**",
             f"Instance: `{nf.get('instance', 'n/a')}`"]
    if nf_alerts:
        lines.append(f"\n⚠️ {len(nf_alerts)} alert(s) for {nf_name}:")
        for a in nf_alerts:
            lines.append(f"  → {a['name']}: {a.get('summary', '')}")
    else:
        lines.append("No active alerts for this function.")
    return "\n".join(lines)


def _alerts_summary(alerts: list) -> str:
    if not alerts:
        return "✅ **No active alerts.** Network is running cleanly."
    crit = [a for a in alerts if a.get("severity") == "critical"]
    warn = [a for a in alerts if a.get("severity") != "critical"]
    lines = [f"⚠️ **{len(alerts)} active alert(s)** — {len(crit)} critical, {len(warn)} warning\n"]
    for a in alerts:
        icon = "🔴" if a.get("severity") == "critical" else "🟡"
        lines.append(f"{icon} **{a['name']}**")
        if a.get("summary"):
            lines.append(f"  {a['summary']}")
        if a.get("instance"):
            lines.append(f"  `{a['instance']}`")
    return "\n".join(lines)


def _subscriber_analysis(nfs: list, alerts: list) -> str:
    amf = next((n for n in nfs if n["name"] == "AMF"), None)
    smf = next((n for n in nfs if n["name"] == "SMF"), None)
    lines = ["**Subscriber & Registration Analysis**\n"]
    lines.append(f"AMF (registration/mobility): {'✅ UP' if amf and amf['state'] == 'up' else '🔴 DOWN — subscribers cannot register'}")
    lines.append(f"SMF (session management): {'✅ UP' if smf and smf['state'] == 'up' else '🔴 DOWN — no new data sessions'}")
    sub_alerts = [a for a in alerts if any(k in a.get("name", "").lower()
                                           for k in ["ue", "subscriber", "session", "attach", "registration"])]
    if sub_alerts:
        lines.append(f"\n⚠️ {len(sub_alerts)} subscriber-related alert(s) active.")
    else:
        lines.append("\nNo subscriber-related alerts detected.")
    return "\n".join(lines)


def _session_analysis(nfs: list, alerts: list) -> str:
    smf = next((n for n in nfs if n["name"] == "SMF"), None)
    upf = next((n for n in nfs if n["name"] == "UPF"), None)
    lines = ["**PDU Session & Data Plane Analysis**\n"]
    lines.append(f"SMF: {'✅ UP' if smf and smf['state'] == 'up' else '🔴 DOWN'}")
    lines.append(f"UPF: {'✅ UP' if upf and upf['state'] == 'up' else '🔴 DOWN'}")
    if (not smf or smf["state"] != "up") or (not upf or upf["state"] != "up"):
        lines.append("\n⚡ **Impact**: PDU sessions will fail until both SMF and UPF are operational.")
    else:
        lines.append("\nBoth SMF and UPF operational — sessions should be establishing normally.")
    return "\n".join(lines)


# ── LLM backends ──────────────────────────────────────────────────────────

def _build_context(network_state: dict, alerts: list) -> str:
    nfs = network_state.get("nfs", [])
    up = [n["name"] for n in nfs if n["state"] == "up"]
    down = [n["name"] for n in nfs if n["state"] == "down"]
    return (
        f"NFs UP: {', '.join(up) or 'none'}\n"
        f"NFs DOWN: {', '.join(down) or 'none'}\n"
        f"Active alerts: {', '.join(a.get('name','') for a in alerts[:5]) or 'none'}"
    )


async def _call_openai(query: str, network_state: dict, alerts: list) -> str:
    try:
        import httpx
        ctx = _build_context(network_state, alerts)
        messages = [
            {"role": "system", "content":
                f"You are P5G Marvis, an AI network assistant for HPE Private 5G.\n"
                f"Current network state:\n{ctx}\n\nRespond concisely, use markdown."},
            {"role": "user", "content": query},
        ]
        base = OPENAI_BASE_URL.rstrip("/")
        headers = {"Content-Type": "application/json"}
        if OPENAI_API_KEY:
            headers["Authorization"] = f"Bearer {OPENAI_API_KEY}"
        # disable cert verification for self-signed local LLM servers
        verify = base.startswith("https://api.openai.com")
        async with httpx.AsyncClient(timeout=120, verify=verify) as client:
            resp = await client.post(
                f"{base}/v1/chat/completions",
                headers=headers,
                json={"model": OPENAI_MODEL, "messages": messages, "max_tokens": 1024},
            )
        resp.raise_for_status()  # surface HTTP errors clearly instead of a KeyError on "choices"
        msg = resp.json()["choices"][0]["message"]
        # some reasoning models put the answer in content, others in reasoning_content
        return msg.get("content") or msg.get("reasoning_content") or "(empty response)"
    except Exception as e:
        return f"LLM error: {e}\n\n" + _rule_based(query, network_state, alerts)


async def _call_ollama(query: str, network_state: dict, alerts: list) -> str:
    try:
        import httpx
        ctx = _build_context(network_state, alerts)
        prompt = (f"You are P5G Marvis, an AI network assistant.\n"
                  f"Network state:\n{ctx}\n\nUser: {query}\nAssistant:")
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
            )
        return resp.json().get("response", "No response.")
    except Exception as e:
        return f"Ollama error: {e}\n\n" + _rule_based(query, network_state, alerts)
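

# ── Local smoke test ───────────────────────────────────────────────────────
# Minimal, illustrative sketch: exercises the rule-based path without Prometheus or an LLM
# backend. The sample state and alert below are fabricated for the demo and only mirror the
# field names the helpers above read; any AI_MODE other than "openai" or "ollama" falls
# through to the rule-based engine.
if __name__ == "__main__":
    import asyncio

    demo_state = {
        "nfs": [
            {"name": "AMF", "state": "up", "instance": "amf:9090"},
            {"name": "SMF", "state": "up", "instance": "smf:9090"},
            {"name": "UPF", "state": "down", "instance": "upf:9090"},
        ]
    }
    demo_alerts = [
        {"name": "UPFDown", "severity": "critical",
         "summary": "UPF metrics target is down", "instance": "upf:9090"},
    ]

    print(asyncio.run(answer("How are PDU sessions doing?", demo_state, demo_alerts)))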