Initial commit from Martin's GitHub

This commit is contained in:
Jake Kasper
2026-04-23 13:50:31 -05:00
parent 488a0d01ef
commit 3228db3097
30 changed files with 4377 additions and 1 deletions

0
app/services/__init__.py Normal file
View File

207
app/services/ai.py Normal file
View File

@@ -0,0 +1,207 @@
"""
AI engine for P5G Marvis.
Phase 1: rule-based with real network data.
Phase 2: swap MARVIS_AI_MODE=openai or MARVIS_AI_MODE=ollama to route through LLM.
"""
from datetime import datetime
from app.config import AI_MODE, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL, OLLAMA_URL, OLLAMA_MODEL
async def answer(query: str, network_state: dict, alerts: list) -> str:
    """Route a user query to the configured AI backend.

    Dispatches to an LLM backend when MARVIS_AI_MODE selects one
    ("openai" or "ollama"); otherwise falls back to the built-in
    rule-based engine.
    """
    llm_backends = {
        "openai": _call_openai,
        "ollama": _call_ollama,
    }
    backend = llm_backends.get(AI_MODE)
    if backend is not None:
        return await backend(query, network_state, alerts)
    return _rule_based(query, network_state, alerts)
# ── Rule-based engine ──────────────────────────────────────────────────────
def _rule_based(query: str, network_state: dict, alerts: list) -> str:
q = query.lower()
nfs = network_state.get("nfs", [])
up = [n for n in nfs if n["state"] == "up"]
down = [n for n in nfs if n["state"] == "down"]
if any(w in q for w in ["hello", "hi ", "hey", "howdy"]):
return ("Hello! I'm **P5G Marvis**, your AI network assistant for HPE Private 5G.\n"
"Ask me about network health, specific functions, alerts, or performance.")
if any(w in q for w in ["help", "what can", "capabilities", "commands", "features"]):
return (
"Here's what I can help with:\n\n"
"• **Network health** — overall P5G status overview\n"
"• **Network functions** — ask about AMF, SMF, UPF, UDM, NRF, etc.\n"
"• **Alerts** — active alarms and their severity\n"
"• **Subscribers** — UE registration and session analysis\n"
"• **Sessions** — PDU session and data plane health\n\n"
"_Tip: Connect an LLM by setting `MARVIS_AI_MODE=openai` or `=ollama`._"
)
# Specific NF query
from app.config import ALL_NFS
for nf_name in ALL_NFS:
if nf_name.lower() in q:
return _nf_detail(nf_name, nfs, alerts)
if any(w in q for w in ["alert", "alarm", "warning", "critical", "incident", "problem", "issue"]):
return _alerts_summary(alerts)
if any(w in q for w in ["subscriber", "ue ", "device", "phone", "handset", "registration", "attach"]):
return _subscriber_analysis(nfs, alerts)
if any(w in q for w in ["session", "pdu", "bearer", "user plane", "traffic", "throughput"]):
return _session_analysis(nfs, alerts)
# Default → health summary
return _health_summary(up, down, alerts)
def _health_summary(up: list, down: list, alerts: list) -> str:
ts = datetime.now().strftime("%H:%M:%S")
crit = [a for a in alerts if a.get("severity") == "critical"]
warn = [a for a in alerts if a.get("severity") != "critical"]
lines = [f"**P5G Network Health — {ts}**\n"]
if up:
lines.append(f"✅ **{len(up)} UP**: {', '.join(n['name'] for n in up)}")
if down:
lines.append(f"🔴 **{len(down)} DOWN**: {', '.join(n['name'] for n in down)}")
lines.append(f" ⚡ Action: check `podman logs <nf>` on the VM")
if alerts:
lines.append(f"\n⚠️ **{len(alerts)} alert(s)** — {len(crit)} critical, {len(warn)} warning")
for a in alerts[:4]:
icon = "🔴" if a.get("severity") == "critical" else "🟡"
lines.append(f" {icon} {a['name']}: {a.get('summary', a.get('instance', ''))}")
else:
lines.append("\n✅ **No active alerts**")
if not down and not alerts:
lines.append("\n🟢 All systems nominal.")
return "\n".join(lines)
def _nf_detail(nf_name: str, nfs: list, alerts: list) -> str:
nf = next((n for n in nfs if n["name"] == nf_name), None)
nf_alerts = [a for a in alerts
if nf_name in a.get("name", "") or nf_name.lower() in a.get("instance", "").lower()]
if not nf or nf["state"] == "unknown":
return (f" No Prometheus data found for **{nf_name}**.\n"
f"Check: `podman ps | grep {nf_name.lower()}`")
icon = "" if nf["state"] == "up" else "🔴"
lines = [f"{icon} **{nf_name}** is **{nf['state'].upper()}**",
f"Instance: `{nf.get('instance', 'n/a')}`"]
if nf_alerts:
lines.append(f"\n⚠️ {len(nf_alerts)} alert(s) for {nf_name}:")
for a in nf_alerts:
lines.append(f"{a['name']}: {a.get('summary', '')}")
else:
lines.append("No active alerts for this function.")
return "\n".join(lines)
def _alerts_summary(alerts: list) -> str:
if not alerts:
return "✅ **No active alerts.** Network is running cleanly."
crit = [a for a in alerts if a.get("severity") == "critical"]
warn = [a for a in alerts if a.get("severity") != "critical"]
lines = [f"⚠️ **{len(alerts)} active alert(s)** — {len(crit)} critical, {len(warn)} warning\n"]
for a in alerts:
icon = "🔴" if a.get("severity") == "critical" else "🟡"
lines.append(f"{icon} **{a['name']}**")
if a.get("summary"):
lines.append(f" {a['summary']}")
if a.get("instance"):
lines.append(f" `{a['instance']}`")
return "\n".join(lines)
def _subscriber_analysis(nfs: list, alerts: list) -> str:
amf = next((n for n in nfs if n["name"] == "AMF"), None)
smf = next((n for n in nfs if n["name"] == "SMF"), None)
lines = ["**Subscriber & Registration Analysis**\n"]
lines.append(f"AMF (registration/mobility): {'✅ UP' if amf and amf['state'] == 'up' else '🔴 DOWN — subscribers cannot register'}")
lines.append(f"SMF (session management): {'✅ UP' if smf and smf['state'] == 'up' else '🔴 DOWN — no new data sessions'}")
sub_alerts = [a for a in alerts if any(k in a.get("name", "").lower()
for k in ["ue", "subscriber", "session", "attach", "registration"])]
if sub_alerts:
lines.append(f"\n⚠️ {len(sub_alerts)} subscriber-related alert(s) active.")
else:
lines.append("\nNo subscriber-related alerts detected.")
return "\n".join(lines)
def _session_analysis(nfs: list, alerts: list) -> str:
smf = next((n for n in nfs if n["name"] == "SMF"), None)
upf = next((n for n in nfs if n["name"] == "UPF"), None)
lines = ["**PDU Session & Data Plane Analysis**\n"]
lines.append(f"SMF: {'✅ UP' if smf and smf['state'] == 'up' else '🔴 DOWN'}")
lines.append(f"UPF: {'✅ UP' if upf and upf['state'] == 'up' else '🔴 DOWN'}")
if (not smf or smf["state"] != "up") or (not upf or upf["state"] != "up"):
lines.append("\n⚡ **Impact**: PDU sessions will fail until both SMF and UPF are operational.")
else:
lines.append("\nBoth SMF and UPF operational — sessions should be establishing normally.")
return "\n".join(lines)
# ── LLM backends ──────────────────────────────────────────────────────────
def _build_context(network_state: dict, alerts: list) -> str:
nfs = network_state.get("nfs", [])
up = [n["name"] for n in nfs if n["state"] == "up"]
down = [n["name"] for n in nfs if n["state"] == "down"]
return (
f"NFs UP: {', '.join(up) or 'none'}\n"
f"NFs DOWN: {', '.join(down) or 'none'}\n"
f"Active alerts: {', '.join(a.get('name','') for a in alerts[:5]) or 'none'}"
)
async def _call_openai(query: str, network_state: dict, alerts: list) -> str:
    """Answer via an OpenAI-compatible chat-completions endpoint.

    Any failure (network, HTTP, JSON shape) is caught and the rule-based
    engine's answer is returned with the error message prepended, so the
    assistant always produces a reply.
    """
    try:
        import httpx
        ctx = _build_context(network_state, alerts)
        messages = [
            {"role": "system", "content":
                f"You are P5G Marvis, an AI network assistant for HPE Private 5G.\n"
                f"Current network state:\n{ctx}\n\nRespond concisely, use markdown."},
            {"role": "user", "content": query},
        ]
        base = OPENAI_BASE_URL.rstrip("/")
        headers = {"Content-Type": "application/json"}
        if OPENAI_API_KEY:
            headers["Authorization"] = f"Bearer {OPENAI_API_KEY}"
        # disable cert verification for self-signed local LLM servers
        # NOTE(review): this disables TLS verification for EVERY endpoint
        # except api.openai.com — confirm acceptable for remote base URLs.
        verify = base.startswith("https://api.openai.com")
        async with httpx.AsyncClient(timeout=120, verify=verify) as client:
            resp = await client.post(
                f"{base}/v1/chat/completions",
                headers=headers,
                json={"model": OPENAI_MODEL, "messages": messages, "max_tokens": 1024},
            )
            # No raise_for_status(): an HTTP error body surfaces as a
            # KeyError below and is reported through the generic fallback.
            msg = resp.json()["choices"][0]["message"]
            # some reasoning models put the answer in content, others in reasoning_content
            return msg.get("content") or msg.get("reasoning_content") or "(empty response)"
    except Exception as e:
        return f"LLM error: {e}\n\n" + _rule_based(query, network_state, alerts)
async def _call_ollama(query: str, network_state: dict, alerts: list) -> str:
    """Answer via a local Ollama /api/generate endpoint.

    Any failure is caught and the rule-based engine's answer is returned
    with the error message prepended.
    """
    try:
        import httpx
        ctx = _build_context(network_state, alerts)
        prompt = (f"You are P5G Marvis, an AI network assistant.\n"
                  f"Network state:\n{ctx}\n\nUser: {query}\nAssistant:")
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
            )
            # With stream=False the full generation is in the "response" field.
            return resp.json().get("response", "No response.")
    except Exception as e:
        return f"Ollama error: {e}\n\n" + _rule_based(query, network_state, alerts)

View File

@@ -0,0 +1,29 @@
"""Alertmanager client."""
import httpx
from app.config import ALERTMANAGER_URL
# Alertmanager base URL with any trailing slash stripped.
_BASE = ALERTMANAGER_URL.rstrip("/")
async def get_alerts() -> list:
    """Return normalised list of active alerts from Alertmanager.

    Best-effort: any transport or parsing error yields an empty list so
    callers never have to handle Alertmanager downtime themselves.
    """
    params = {"active": "true", "silenced": "false"}
    try:
        async with httpx.AsyncClient(timeout=5) as client:
            response = await client.get(f"{_BASE}/api/v2/alerts", params=params)
            response.raise_for_status()
            raw = response.json()
    except Exception:
        return []

    def _normalise(entry: dict) -> dict:
        labels = entry.get("labels", {})
        annotations = entry.get("annotations", {})
        return {
            "name": labels.get("alertname", "Unknown"),
            "severity": labels.get("severity", "warning"),
            "instance": labels.get("instance", ""),
            "summary": annotations.get("summary", annotations.get("description", "")),
        }

    return [_normalise(entry) for entry in raw]

View File

@@ -0,0 +1,338 @@
"""
log_analyzer.py — Reads P5G NF container logs and active Prometheus/Alertmanager
data to produce a structured list of recommended remediation actions, grouped
by category. This is the data backend powering the /api/actions endpoint.
"""
import asyncio
import re
import time
from collections import deque
from datetime import datetime
# ── In-memory history (up to 96 snapshots ≈ 48 min at 30 s refresh) ────────
# Ring buffer of per-run summaries appended by analyze_logs(); the oldest
# entry is dropped automatically once maxlen is reached.
_history: deque = deque(maxlen=96)
# ── Category colour palette ──────────────────────────────────────────────────
# Hex colours used by the UI when rendering each issue category.
CATEGORY_COLORS: dict[str, str] = {
    "Registration": "#3b82f6",
    "Sessions": "#7c3aed",
    "Authentication": "#f59e0b",
    "Connectivity": "#06b6d4",
    "Policy": "#10b981",
    "Security": "#ef4444",
}
# All categories in canonical display order (left side, right side)
ALL_CATEGORIES = ["Registration", "Authentication", "Security",
                  "Sessions", "Connectivity", "Policy"]
# ── Log-pattern definitions ──────────────────────────────────────────────────
# Each entry: (regex, affected_nf, severity, short_description, remediation)
# The regexes are applied by _match_count() with IGNORECASE | MULTILINE, so
# the mixed-case character classes below are belt-and-braces, not required.
CATEGORY_PATTERNS: dict[str, list[tuple]] = {
    "Registration": [
        (r"RegistrationFailure|UeRegistrationFailed|N1.*[Rr]egistration.*[Ff]ail",
         "AMF", "critical",
         "UE registration failure",
         "Check AMF logs for NGAP errors; verify UE credentials and NRF registration."),
        (r"N2SetupFail|NgapSetupFail|N2.*[Tt]imeout|NgapProcedure.*failed",
         "AMF", "critical",
         "N2 interface setup failure",
         "Verify gNB connectivity to AMF; check SCTP transport and NGAP PLMN config."),
        (r"InitialContextSetupFail|UeContextRelease.*[Aa]bnormal",
         "AMF", "warning",
         "UE context setup failure",
         "Review AMF-SMF N11 interface; check subscriber profile in UDM/UDR."),
        (r"PagingFail|UeUnreachable|UeNotFound",
         "AMF", "warning",
         "UE paging failure",
         "Verify UE is registered; check AMF tracking area configuration."),
    ],
    "Sessions": [
        (r"PduSessionEstablishmentReject|PduSession.*[Ff]ail|CreateSessionResponse.*[Ff]ail",
         "SMF", "critical",
         "PDU session establishment failure",
         "Check SMF-UPF N4 path; verify DNN/APN config and UPF N3/N9 interfaces."),
        (r"N4Session.*[Ff]ail|PfcpSession.*[Ee]rror|N4.*[Tt]imeout|PfcpAssociation.*[Ff]ail",
         "UPF", "critical",
         "N4/PFCP session error",
         "Restart PFCP association between SMF and UPF; check N4 IP reachability."),
        (r"IpAllocationFail|AddressPoolExhausted|NoIpAvailable",
         "SMF", "critical",
         "IP address pool exhausted",
         "Expand UE IP address pool in SMF config; review active session count."),
        (r"SessionModification.*[Ff]ail|BearerModification.*[Ee]rror",
         "SMF", "warning",
         "Session modification failure",
         "Check PCF policy consistency; verify QoS parameters match UPF capabilities."),
    ],
    "Authentication": [
        (r"AuthenticationFailure|AuthReject|EapFailure|5g-aka.*[Ff]ail|EapAkaFailure",
         "AUSF", "critical",
         "UE authentication failure",
         "Verify USIM credentials match UDM subscriber data; check AUSF-UDM N12 link."),
        (r"UdmAuthReq.*[Ee]rror|SuciDeconceal.*[Ff]ail|UdmUeAuth.*[Ee]rror",
         "UDM", "critical",
         "UDM authentication error",
         "Check UDM-UDR N35 connectivity; verify Home Network Public Key configuration."),
        (r"AuthVectorFetch.*[Ff]ail|AusfUeAuth.*[Rr]eject|HssAuth.*[Ff]ail",
         "AUSF", "warning",
         "Auth vector fetch failure",
         "Review UDR data integrity for affected SUPI; check AUSF-UDM TLS certificates."),
    ],
    "Connectivity": [
        (r"NfDiscovery.*[Ff]ail|NrfRegistration.*[Ff]ail|NfDeregistration.*unexpect",
         "NRF", "warning",
         "NF service discovery failure",
         "Verify NRF is reachable from all NFs; check NRF registration TTL and heartbeat."),
        (r"ServiceUnavailable.*NF|HTTP.*503.*NF|NfProfile.*expired",
         "NRF", "warning",
         "NF service unavailable",
         "Check NF pod health and SBI listen port; review NRF subscription notifications."),
        (r"SbiRequest.*[Tt]imeout|SbiConn.*[Ff]ail|Http2.*[Ee]rror",
         "NRF", "warning",
         "SBI interface timeout",
         "Inspect inter-NF network MTU and TLS handshake; check load balancer config."),
    ],
    "Policy": [
        (r"PcfSmPolicy.*[Ee]rror|PolicyDecision.*[Ff]ail|SmPolicy.*[Rr]eject",
         "PCF", "warning",
         "Policy decision failure",
         "Review PCF policy rules and subscriber group config; check PCF-UDR N36 link."),
        (r"QosEnforce.*[Ff]ail|ChargingRule.*[Ee]rror|PccRule.*[Rr]eject",
         "PCF", "warning",
         "QoS policy enforcement failure",
         "Verify QoS profiles match UPF capabilities; check PCF-CHF N40 charging path."),
    ],
    "Security": [
        (r"SecurityMode.*[Ff]ail|IntegrityCheck.*[Ff]ail|NasIntegrity.*[Ee]rror",
         "AMF", "critical",
         "NAS security mode failure",
         "Check AMF cipher/integrity algorithm priority list matches UE capabilities."),
        (r"TlsHandshake.*[Ff]ail|Certificate.*[Ee]xpir|x509.*[Ee]rror|CertVerify.*[Ff]ail",
         "AMF", "critical",
         "TLS/certificate error",
         "Renew expired certificates; verify trust chain between NFs; check SBI TLS config."),
        (r"SuciProtection.*[Ff]ail|PrivacyProtection.*[Ee]rror|HomeNetworkKey.*[Ee]rror",
         "UDM", "warning",
         "SUCI privacy protection error",
         "Verify Home Network Public Key provisioning on UDM; check SUPI revealing config."),
    ],
}
# ── NF → possible container name fragments (tried in order) ─────────────────
# A fragment matches when it appears anywhere in a running container's name.
NF_CONTAINER_HINTS: dict[str, list[str]] = {
    "AMF": ["amf"],
    "SMF": ["smf"],
    "UPF": ["upf"],
    "NRF": ["nrf"],
    "UDM": ["udm"],
    "AUSF": ["ausf"],
    "PCF": ["pcf"],
}
# ── Container discovery cache ────────────────────────────────────────────────
# Cached NF→container-name mapping plus the monotonic timestamp it was taken;
# refreshed by _discover_containers() at most once per 60 s.
_container_cache: dict[str, str] = {}
_container_cache_ts: float = 0.0
async def _discover_containers() -> dict[str, str]:
    """Run `podman ps` and map NF names to actual container names.

    Results are cached for 60 seconds. On any failure the (possibly empty)
    mapping is still cached so a broken podman is not hammered every call.

    Fix: a timed-out `podman ps` subprocess was previously leaked; it is
    now killed and reaped before falling back to the empty name list.
    """
    global _container_cache, _container_cache_ts
    now = time.monotonic()
    if _container_cache and now - _container_cache_ts < 60:
        return _container_cache
    try:
        proc = await asyncio.create_subprocess_exec(
            "podman", "ps", "--format", "{{.Names}}",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            raise
        names = [n.strip() for n in stdout.decode().splitlines() if n.strip()]
    except Exception:
        names = []
    mapping: dict[str, str] = {}
    for nf, hints in NF_CONTAINER_HINTS.items():
        # First hint fragment that matches a running container wins.
        for hint in hints:
            match = next((n for n in names if hint in n.lower()), None)
            if match:
                mapping[nf] = match
                break
    _container_cache = mapping
    _container_cache_ts = now
    return mapping
async def _read_logs(container: str, tail: int = 400) -> str:
    """Read recent logs from a podman container (stdout + stderr).

    Best-effort: returns "" on any failure (missing binary/container,
    timeout, decode issues never occur thanks to errors="replace").

    Fix: a timed-out `podman logs` subprocess was previously leaked; it is
    now killed and reaped before returning the empty string.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "podman", "logs", "--tail", str(tail), container,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=8)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            raise
        return (stdout.decode("utf-8", errors="replace") +
                stderr.decode("utf-8", errors="replace"))
    except Exception:
        return ""
def _match_count(text: str, pattern: str) -> int:
if not text:
return 0
try:
return len(re.findall(pattern, text, re.IGNORECASE | re.MULTILINE))
except re.error:
return 0
# ── Category/NF mapping for Alertmanager alerts ──────────────────────────────
def _alert_category(alert: dict) -> str:
name = (alert.get("name", "") + " " + alert.get("summary", "")).lower()
if any(k in name for k in ["register", "attach", "ngap", "n2"]):
return "Registration"
if any(k in name for k in ["session", "pdu", "bearer", "smf_", "upf_", "n4", "pfcp"]):
return "Sessions"
if any(k in name for k in ["auth", "ausf", "udm_", "supi", "aka", "eap"]):
return "Authentication"
if any(k in name for k in ["nrf", "discovery", "unavailable", "sbi", "connect"]):
return "Connectivity"
if any(k in name for k in ["pcf", "policy", "qos", "pcc", "charge"]):
return "Policy"
if any(k in name for k in ["tls", "cert", "security", "cipher", "integ", "suci"]):
return "Security"
return "Connectivity"
def _alert_nf(alert: dict) -> str:
    """Best-effort guess of which NF an alert concerns; "System" if none match."""
    from app.config import ALL_NFS
    haystack = (alert.get("name", "") + alert.get("instance", "")).lower()
    return next((nf for nf in ALL_NFS if nf.lower() in haystack), "System")
# ── Main analysis entry point ────────────────────────────────────────────────
async def analyze_logs() -> dict:
    """
    Gather log-pattern issues + Prometheus NF status + Alertmanager alerts.
    Returns a fully structured dict ready for JSON serialisation with keys
    total, categories, timestamp, log_sources; also appends a compact
    snapshot to the in-memory `_history` ring buffer.

    Minor cleanup: the pass-through list comprehension over cats.values()
    is now a plain list() copy.
    """
    from app.services import alertmanager, prometheus
    # Kick off all I/O in parallel
    containers_f = asyncio.create_task(_discover_containers())
    alerts_f = asyncio.create_task(alertmanager.get_alerts())
    nf_status_f = asyncio.create_task(prometheus.get_nf_status())
    containers = await containers_f
    alerts, nf_statuses = await asyncio.gather(alerts_f, nf_status_f,
                                               return_exceptions=True)
    # Degrade gracefully: a failed backend just contributes no issues.
    if isinstance(alerts, Exception):
        alerts = []
    if isinstance(nf_statuses, Exception):
        nf_statuses = []
    # Read all container logs concurrently
    log_tasks = {nf: asyncio.create_task(_read_logs(cname))
                 for nf, cname in containers.items()}
    log_texts: dict[str, str] = {}
    if log_tasks:
        log_results = await asyncio.gather(*log_tasks.values(), return_exceptions=True)
        for nf, result in zip(log_tasks.keys(), log_results):
            log_texts[nf] = result if isinstance(result, str) else ""
    issues: list[dict] = []
    # 1. Log-pattern analysis
    for category, patterns in CATEGORY_PATTERNS.items():
        for (pat_re, nf, severity, description, remediation) in patterns:
            count = _match_count(log_texts.get(nf, ""), pat_re)
            if count:
                issues.append({
                    "id": f"log-{nf}-{len(issues)}",
                    "category": category,
                    "nf": nf,
                    "severity": severity,
                    "count": count,
                    "description": description,
                    "remediation": remediation,
                    "source": "log",
                })
    # 2. NF-down events from Prometheus
    for nf_st in nf_statuses:
        if isinstance(nf_st, dict) and nf_st.get("state") == "down":
            issues.append({
                "id": f"nf-down-{nf_st['name']}",
                "category": "Connectivity",
                "nf": nf_st["name"],
                "severity": "critical",
                "count": 1,
                "description": f"{nf_st['name']} is unreachable",
                "remediation": (f"Run `podman ps` on the VM and check if {nf_st['name']} "
                                f"container is running; inspect its logs."),
                "source": "prometheus",
            })
    # 3. Active Alertmanager alerts
    for alert in alerts:
        if isinstance(alert, dict):
            issues.append({
                "id": f"alert-{alert.get('name', '')}-{len(issues)}",
                "category": _alert_category(alert),
                "nf": _alert_nf(alert),
                "severity": alert.get("severity", "warning"),
                "count": 1,
                "description": alert.get("summary") or alert.get("name", "Unknown alert"),
                "remediation": "Investigate the active Alertmanager alert and follow runbook.",
                "source": "alertmanager",
            })
    # Group by category, preserving canonical order
    cats: dict[str, dict] = {}
    for cat_name in ALL_CATEGORIES:
        cats[cat_name] = {
            "name": cat_name,
            "color": CATEGORY_COLORS[cat_name],
            "count": 0,
            "issues": [],
        }
    for issue in issues:
        cat = issue["category"]
        if cat not in cats:
            # Defensive: unknown category gets a neutral grey bucket.
            cats[cat] = {"name": cat, "color": "#7a8499", "count": 0, "issues": []}
        cats[cat]["count"] += issue["count"]
        cats[cat]["issues"].append(issue)
    total = sum(c["count"] for c in cats.values())
    categories = list(cats.values())
    result = {
        "total": total,
        "categories": categories,
        "timestamp": datetime.now().isoformat(),
        "log_sources": list(containers.keys()),
    }
    # Persist to history ring-buffer
    _history.append({
        "time": datetime.now().isoformat(),
        "total": total,
        "by_category": {name: cats[name]["count"] for name in ALL_CATEGORIES},
    })
    return result
def get_history() -> list:
    """Return a snapshot copy of the accumulated history ring-buffer."""
    return [*_history]

View File

@@ -0,0 +1,41 @@
"""Prometheus client — queries the HPE P5G Prometheus instance."""
import httpx
from app.config import PROMETHEUS_URL, PROMETHEUS_PREFIX, TARGET_TYPE_MAP, ALL_NFS
# Prometheus API root: base URL (trailing slash stripped) plus path prefix.
_BASE = PROMETHEUS_URL.rstrip("/") + PROMETHEUS_PREFIX
async def query(promql: str) -> list:
    """Execute an instant PromQL query and return the raw result vector."""
    async with httpx.AsyncClient(timeout=5) as client:
        response = await client.get(f"{_BASE}/api/v1/query", params={"query": promql})
        response.raise_for_status()
        payload = response.json()
    return payload["data"]["result"]
async def get_nf_status() -> list:
    """Return a list of {name, state, instance} for every known NF.

    NFs with no Prometheus data are reported as "unknown"; if Prometheus
    itself is unreachable, every NF is "unknown".
    """
    try:
        samples = await query("up")
    except Exception:
        return [{"name": n, "state": "unknown", "instance": ""} for n in ALL_NFS]
    status_by_name: dict[str, dict] = {}
    for sample in samples:
        labels = sample["metric"]
        target = labels.get("target_type", labels.get("job", "")).lower()
        nf_name = TARGET_TYPE_MAP.get(target)
        if not nf_name:
            continue
        state = "up" if sample["value"][1] == "1" else "down"
        # Keep worst state if multiple instances report for the same NF.
        if status_by_name.get(nf_name, {}).get("state") != "down":
            status_by_name[nf_name] = {"name": nf_name, "state": state,
                                       "instance": labels.get("instance", "")}
    # Fill in NFs with no Prometheus data
    for nf_name in ALL_NFS:
        status_by_name.setdefault(nf_name, {"name": nf_name, "state": "unknown", "instance": ""})
    return list(status_by_name.values())

92
app/services/ueransim.py Normal file
View File

@@ -0,0 +1,92 @@
import asyncio
import uuid
import time
from typing import Dict, Optional
# In-memory task registry: task_id → {id, status, logs, created}.
# NOTE(review): unbounded — entries are never evicted; confirm acceptable.
_tasks: Dict[str, dict] = {}
def create_task() -> str:
    """Register a new pending test task and return its id."""
    new_id = str(uuid.uuid4())
    record = {
        "id": new_id,
        "status": "pending",
        "logs": [],
        "created": time.time(),
    }
    _tasks[new_id] = record
    return new_id
def get_task(task_id: str) -> Optional[dict]:
    """Look up a task record by id; None when unknown."""
    try:
        return _tasks[task_id]
    except KeyError:
        return None
async def run_test(task_id: str) -> None:
    """Execute the UERANSIM emulated-UE test inside a Docker container.

    Mutates the task record in place: status goes running → done on
    success, or → error on any failure; human-readable progress lines
    are appended to the task's logs.

    Fix: the initial `docker images` check was outside any try block, so
    a missing docker binary raised FileNotFoundError out of the coroutine
    and left the task stuck in "running" forever.
    """
    task = _tasks[task_id]
    task["status"] = "running"

    def log(msg: str, type: str = "info") -> None:
        # Timestamped log entry consumed by the UI (type: info/run/ok/warn/err).
        task["logs"].append({"msg": msg, "type": type, "ts": time.strftime("%H:%M:%S")})

    log("▸ Checking UERANSIM Docker image…", "run")
    try:
        check = await asyncio.create_subprocess_exec(
            "docker", "images", "-q", "ueransim",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, _ = await check.communicate()
    except Exception as exc:
        log(f"✗ Unable to invoke docker: {exc}", "err")
        task["status"] = "error"
        return
    if not out.strip():
        log("✗ UERANSIM image not found.", "err")
        log(" SSH to host and run: bash /opt/p5g-marvis/build-ueransim.sh", "err")
        task["status"] = "error"
        return
    log(" UERANSIM image ready", "ok")
    log("▸ Starting test container — allow up to 60s…", "run")
    env_file = "/opt/p5g-marvis/config/ueransim.env"
    try:
        proc = await asyncio.create_subprocess_exec(
            "docker", "run", "--rm",
            "--network=host",
            "--privileged",
            "--env-file", env_file,
            "ueransim",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=90)
        except asyncio.TimeoutError:
            proc.kill()
            log("✗ Test timed out after 90s — container killed", "err")
            task["status"] = "error"
            return
        # Classify each output line for colour-coded display in the UI.
        for line in stdout.decode(errors="replace").splitlines():
            line = line.strip()
            if not line:
                continue
            if "ERROR" in line:
                log(line, "err")
            elif "PASSED" in line or "established" in line or "successful" in line:
                log(line, "ok")
            elif "WARNING" in line:
                log(line, "warn")
            else:
                log(line, "info")
        if proc.returncode == 0:
            log("✓ Emulated data session completed successfully", "ok")
            task["status"] = "done"
        elif proc.returncode == 2:
            # Exit code 2 signals missing credentials (see message below).
            log("⚠ Credentials not configured — edit /opt/p5g-marvis/config/ueransim.env", "warn")
            task["status"] = "error"
        else:
            log(f"✗ Test exited with code {proc.returncode}", "err")
            task["status"] = "error"
    except Exception as exc:
        log(f"✗ Unexpected error: {exc}", "err")
        task["status"] = "error"