Files
p5g-marvis/app/services/ai.py
2026-04-24 12:33:52 -04:00

267 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AI engine for P5G Marvis.
Phase 1: rule-based with real network data.
Phase 2: swap MARVIS_AI_MODE=openai or MARVIS_AI_MODE=ollama to route through LLM.
"""
from datetime import datetime
from app.config import (
AI_MODE,
CONTAINER_RUNTIME,
OPENAI_API_KEY,
OPENAI_MODEL,
OPENAI_BASE_URL,
OLLAMA_MODEL,
OLLAMA_URL,
)
async def answer(query: str, network_state: dict, alerts: list) -> str:
    """Route a user question to the configured AI backend.

    Dispatches on MARVIS_AI_MODE: "openai" and "ollama" go through the
    corresponding LLM client; any other value uses the built-in
    rule-based engine over the live network data.
    """
    llm_backends = {
        "openai": _call_openai,
        "ollama": _call_ollama,
    }
    backend = llm_backends.get(AI_MODE)
    if backend is not None:
        return await backend(query, network_state, alerts)
    return _rule_based(query, network_state, alerts)
# ── Rule-based engine ──────────────────────────────────────────────────────
def _rule_based(query: str, network_state: dict, alerts: list) -> str:
q = query.lower()
nfs = network_state.get("nfs", [])
cluster = network_state.get("cluster", {})
up = [n for n in nfs if n["state"] == "up"]
down = [n for n in nfs if n["state"] == "down"]
if any(w in q for w in ["hello", "hi ", "hey", "howdy"]):
return ("Hello! I'm **P5G Marvis**, your AI network assistant for HPE Private 5G.\n"
"Ask me about network health, specific functions, alerts, or performance.")
if any(w in q for w in ["help", "what can", "capabilities", "commands", "features"]):
return (
"Here's what I can help with:\n\n"
"• **Network health** — overall P5G status overview\n"
"• **Network functions** — ask about AMF, SMF, UPF, UDM, NRF, etc.\n"
"• **Alerts** — active alarms and their severity\n"
"• **Subscribers** — UE registration and session analysis\n"
"• **Sessions** — PDU session and data plane health\n\n"
"_Tip: Connect an LLM by setting `MARVIS_AI_MODE=openai` or `=ollama`._"
)
# Specific NF query
from app.config import ALL_NFS
for nf_name in ALL_NFS:
if nf_name.lower() in q:
return _nf_detail(nf_name, nfs, alerts)
if any(w in q for w in ["alert", "alarm", "warning", "critical", "incident", "problem", "issue"]):
return _alerts_summary(alerts)
if any(w in q for w in ["subscriber", "ue ", "device", "phone", "handset", "registration", "attach"]):
return _subscriber_analysis(nfs, alerts, cluster)
if any(w in q for w in ["session", "pdu", "bearer", "user plane", "traffic", "throughput"]):
return _session_analysis(nfs, alerts, cluster)
# Default → health summary
return _health_summary(up, down, alerts, cluster)
def _health_summary(up: list, down: list, alerts: list, cluster: dict) -> str:
ts = datetime.now().strftime("%H:%M:%S")
crit = [a for a in alerts if a.get("severity") == "critical"]
warn = [a for a in alerts if a.get("severity") != "critical"]
lines = [f"**P5G Network Health — {ts}**\n"]
nodes = cluster.get("nodes", [])
if up:
lines.append(f"✅ **{len(up)} UP**: {', '.join(_nf_label(n) for n in up)}")
if down:
lines.append(f"🔴 **{len(down)} DOWN**: {', '.join(_nf_label(n) for n in down)}")
lines.append(" ⚡ Action: inspect the node shown for each affected NF before pulling logs.")
if nodes:
lines.append(f"\n**Cluster nodes ({len(nodes)})**")
for node in nodes:
running = [nf["name"] for nf in node.get("nfs", []) if nf.get("state") == "up"]
down_nfs = [nf["name"] for nf in node.get("nfs", []) if nf.get("state") == "down"]
role = node.get("role", "AP")
lines.append(
f"• **{node['hostname']}** ({role}{', local' if node.get('current') else ''})"
f" — running: {', '.join(running) or 'none'}"
)
if down_nfs:
lines.append(f" down here: {', '.join(down_nfs)}")
if alerts:
lines.append(f"\n⚠️ **{len(alerts)} alert(s)** — {len(crit)} critical, {len(warn)} warning")
for a in alerts[:4]:
icon = "🔴" if a.get("severity") == "critical" else "🟡"
lines.append(f" {icon} {a['name']}: {a.get('summary', a.get('instance', ''))}")
else:
lines.append("\n✅ **No active alerts**")
if not down and not alerts:
lines.append("\n🟢 All systems nominal.")
return "\n".join(lines)
def _nf_detail(nf_name: str, nfs: list, alerts: list) -> str:
nf = next((n for n in nfs if n["name"] == nf_name), None)
nf_alerts = [a for a in alerts
if nf_name in a.get("name", "") or nf_name.lower() in a.get("instance", "").lower()]
if not nf or nf["state"] == "unknown":
return (f" No Prometheus data found for **{nf_name}**.\n"
f"Check: `{CONTAINER_RUNTIME} ps | grep {nf_name.lower()}`")
icon = "" if nf["state"] == "up" else "🔴"
placements = nf.get("nodes", [])
lines = [f"{icon} **{nf_name}** is **{nf['state'].upper()}**"]
if placements:
node_text = ", ".join(
f"{node['hostname']} ({'/'.join(node.get('roles', []))})"
for node in placements
)
lines.append(f"Nodes: {node_text}")
lines.append(f"Instance: `{nf.get('instance', 'n/a')}`")
if nf_alerts:
lines.append(f"\n⚠️ {len(nf_alerts)} alert(s) for {nf_name}:")
for a in nf_alerts:
lines.append(f"{a['name']}: {a.get('summary', '')}")
else:
lines.append("No active alerts for this function.")
return "\n".join(lines)
def _alerts_summary(alerts: list) -> str:
if not alerts:
return "✅ **No active alerts.** Network is running cleanly."
crit = [a for a in alerts if a.get("severity") == "critical"]
warn = [a for a in alerts if a.get("severity") != "critical"]
lines = [f"⚠️ **{len(alerts)} active alert(s)** — {len(crit)} critical, {len(warn)} warning\n"]
for a in alerts:
icon = "🔴" if a.get("severity") == "critical" else "🟡"
lines.append(f"{icon} **{a['name']}**")
if a.get("summary"):
lines.append(f" {a['summary']}")
if a.get("instance"):
lines.append(f" `{a['instance']}`")
return "\n".join(lines)
def _subscriber_analysis(nfs: list, alerts: list, cluster: dict) -> str:
    """Report UE registration readiness from the AMF and SMF states."""
    amf = next((entry for entry in nfs if entry["name"] == "AMF"), None)
    smf = next((entry for entry in nfs if entry["name"] == "SMF"), None)
    keywords = ("ue", "subscriber", "session", "attach", "registration")
    related = [
        a for a in alerts
        if any(key in a.get("name", "").lower() for key in keywords)
    ]
    report = ["**Subscriber & Registration Analysis**\n"]
    report.append(f"AMF (registration/mobility): {_nf_sentence(amf, 'subscribers cannot register')}")
    report.append(f"SMF (session management): {_nf_sentence(smf, 'no new data sessions')}")
    if related:
        report.append(f"\n⚠️ {len(related)} subscriber-related alert(s) active.")
    else:
        report.append("\nNo subscriber-related alerts detected.")
    report.append(_cluster_scope(cluster))
    return "\n".join(report)
def _session_analysis(nfs: list, alerts: list, cluster: dict) -> str:
    """Report PDU-session/data-plane readiness from the SMF and UPF states."""
    smf = next((entry for entry in nfs if entry["name"] == "SMF"), None)
    upf = next((entry for entry in nfs if entry["name"] == "UPF"), None)
    report = ["**PDU Session & Data Plane Analysis**\n"]
    report.append(f"SMF: {_nf_sentence(smf, 'session setup is blocked')}")
    report.append(f"UPF: {_nf_sentence(upf, 'user-plane forwarding is blocked')}")
    smf_up = smf is not None and smf["state"] == "up"
    upf_up = upf is not None and upf["state"] == "up"
    if smf_up and upf_up:
        report.append("\nBoth SMF and UPF operational — sessions should be establishing normally.")
    else:
        # Sessions need both: SMF for control-plane setup, UPF for forwarding.
        report.append("\n⚡ **Impact**: PDU sessions will fail until both SMF and UPF are operational.")
    report.append(_cluster_scope(cluster))
    return "\n".join(report)
def _nf_label(nf: dict) -> str:
placements = nf.get("nodes", [])
if not placements:
return nf["name"]
return f"{nf['name']} on {', '.join(node['hostname'] for node in placements)}"
def _nf_sentence(nf: dict | None, impact: str) -> str:
if not nf:
return "○ N/A"
if nf.get("state") == "up":
nodes = ", ".join(node["hostname"] for node in nf.get("nodes", [])) or nf.get("instance", "unknown host")
return f"✅ UP on {nodes}"
return f"🔴 DOWN — {impact}"
def _cluster_scope(cluster: dict) -> str:
nodes = cluster.get("nodes", [])
if not nodes:
return "\nCluster discovery is not available."
details = ", ".join(f"{node['hostname']} ({node.get('role', 'AP')})" for node in nodes)
return f"\nCluster scope checked: {details}"
# ── LLM backends ──────────────────────────────────────────────────────────
def _build_context(network_state: dict, alerts: list) -> str:
nfs = network_state.get("nfs", [])
up = [n["name"] for n in nfs if n["state"] == "up"]
down = [n["name"] for n in nfs if n["state"] == "down"]
nodes = network_state.get("cluster", {}).get("nodes", [])
node_summary = ", ".join(f"{node['hostname']} ({node.get('role', 'AP')})" for node in nodes) or "none"
return (
f"NFs UP: {', '.join(up) or 'none'}\n"
f"NFs DOWN: {', '.join(down) or 'none'}\n"
f"Cluster nodes: {node_summary}\n"
f"Active alerts: {', '.join(a.get('name','') for a in alerts[:5]) or 'none'}"
)
async def _call_openai(query: str, network_state: dict, alerts: list) -> str:
    """Answer via an OpenAI-compatible chat-completions endpoint.

    Builds a system prompt from the live network state and POSTs to
    ``{OPENAI_BASE_URL}/v1/chat/completions``. On any failure (network,
    HTTP error, malformed response) it degrades gracefully: the error is
    reported and the rule-based answer is appended.
    """
    try:
        import httpx
        ctx = _build_context(network_state, alerts)
        messages = [
            {"role": "system", "content":
                f"You are P5G Marvis, an AI network assistant for HPE Private 5G.\n"
                f"Current network state:\n{ctx}\n\nRespond concisely, use markdown."},
            {"role": "user", "content": query},
        ]
        base = OPENAI_BASE_URL.rstrip("/")
        headers = {"Content-Type": "application/json"}
        if OPENAI_API_KEY:
            headers["Authorization"] = f"Bearer {OPENAI_API_KEY}"
        # SECURITY NOTE: cert verification is deliberately disabled for any
        # host other than api.openai.com to tolerate self-signed local LLM
        # servers. This permits MITM against non-official endpoints — confirm
        # the trade-off is acceptable for the deployment.
        verify = base.startswith("https://api.openai.com")
        async with httpx.AsyncClient(timeout=120, verify=verify) as client:
            resp = await client.post(
                f"{base}/v1/chat/completions",
                headers=headers,
                json={"model": OPENAI_MODEL, "messages": messages, "max_tokens": 1024},
            )
            # Fix: fail loudly on non-2xx so the fallback message reports the
            # HTTP status instead of an opaque KeyError on "choices".
            resp.raise_for_status()
            msg = resp.json()["choices"][0]["message"]
        # some reasoning models put the answer in content, others in reasoning_content
        return msg.get("content") or msg.get("reasoning_content") or "(empty response)"
    except Exception as e:
        return f"LLM error: {e}\n\n" + _rule_based(query, network_state, alerts)
async def _call_ollama(query: str, network_state: dict, alerts: list) -> str:
    """Answer via a local Ollama server's ``/api/generate`` endpoint.

    Sends a single non-streaming prompt built from the live network state.
    On any failure (network, HTTP error) the error is reported and the
    rule-based answer is appended as a fallback.
    """
    try:
        import httpx
        ctx = _build_context(network_state, alerts)
        prompt = (f"You are P5G Marvis, an AI network assistant.\n"
                  f"Network state:\n{ctx}\n\nUser: {query}\nAssistant:")
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
            )
            # Fix: surface HTTP errors (e.g. unknown model → 404) instead of
            # silently returning "No response." from an error body.
            resp.raise_for_status()
            return resp.json().get("response", "No response.")
    except Exception as e:
        return f"Ollama error: {e}\n\n" + _rule_based(query, network_state, alerts)