"""
log_analyzer.py — Reads P5G NF container logs and active Prometheus/Alertmanager
data to produce a structured list of recommended remediation actions, grouped
by category.

This is the data backend powering the /api/actions endpoint.
"""

import asyncio
import re
import time
from collections import deque
from datetime import datetime

from app.config import CONTAINER_HOST, CONTAINER_RUNTIME

# ── In-memory history (up to 96 snapshots ≈ 48 min at 30 s refresh) ────────
_history: deque = deque(maxlen=96)

# ── Category colour palette ──────────────────────────────────────────────────
CATEGORY_COLORS: dict[str, str] = {
    "Registration": "#3b82f6",
    "Sessions": "#7c3aed",
    "Authentication": "#f59e0b",
    "Connectivity": "#06b6d4",
    "Policy": "#10b981",
    "Security": "#ef4444",
}

# Colour for any category that is not in CATEGORY_COLORS.
_FALLBACK_CATEGORY_COLOR = "#7a8499"

# All categories in canonical display order (left side, right side)
ALL_CATEGORIES = ["Registration", "Authentication", "Security",
                  "Sessions", "Connectivity", "Policy"]

# ── Log-pattern definitions ──────────────────────────────────────────────────
# Each entry: (regex, affected_nf, severity, short_description, remediation).
# Patterns are matched case-insensitively. Without re.DOTALL, `.*` never
# crosses a newline, so each match is confined to a single log line.
CATEGORY_PATTERNS: dict[str, list[tuple]] = {
    "Registration": [
        (r"RegistrationFailure|UeRegistrationFailed|N1.*[Rr]egistration.*[Ff]ail",
         "AMF", "critical", "UE registration failure",
         "Check AMF logs for NGAP errors; verify UE credentials and NRF registration."),
        (r"N2SetupFail|NgapSetupFail|N2.*[Tt]imeout|NgapProcedure.*failed",
         "AMF", "critical", "N2 interface setup failure",
         "Verify gNB connectivity to AMF; check SCTP transport and NGAP PLMN config."),
        (r"InitialContextSetupFail|UeContextRelease.*[Aa]bnormal",
         "AMF", "warning", "UE context setup failure",
         "Review AMF-SMF N11 interface; check subscriber profile in UDM/UDR."),
        (r"PagingFail|UeUnreachable|UeNotFound",
         "AMF", "warning", "UE paging failure",
         "Verify UE is registered; check AMF tracking area configuration."),
    ],
    "Sessions": [
        (r"PduSessionEstablishmentReject|PduSession.*[Ff]ail|CreateSessionResponse.*[Ff]ail",
         "SMF", "critical", "PDU session establishment failure",
         "Check SMF-UPF N4 path; verify DNN/APN config and UPF N3/N9 interfaces."),
        (r"N4Session.*[Ff]ail|PfcpSession.*[Ee]rror|N4.*[Tt]imeout|PfcpAssociation.*[Ff]ail",
         "UPF", "critical", "N4/PFCP session error",
         "Restart PFCP association between SMF and UPF; check N4 IP reachability."),
        (r"IpAllocationFail|AddressPoolExhausted|NoIpAvailable",
         "SMF", "critical", "IP address pool exhausted",
         "Expand UE IP address pool in SMF config; review active session count."),
        (r"SessionModification.*[Ff]ail|BearerModification.*[Ee]rror",
         "SMF", "warning", "Session modification failure",
         "Check PCF policy consistency; verify QoS parameters match UPF capabilities."),
    ],
    "Authentication": [
        (r"AuthenticationFailure|AuthReject|EapFailure|5g-aka.*[Ff]ail|EapAkaFailure",
         "AUSF", "critical", "UE authentication failure",
         "Verify USIM credentials match UDM subscriber data; check AUSF-UDM N12 link."),
        (r"UdmAuthReq.*[Ee]rror|SuciDeconceal.*[Ff]ail|UdmUeAuth.*[Ee]rror",
         "UDM", "critical", "UDM authentication error",
         "Check UDM-UDR N35 connectivity; verify Home Network Public Key configuration."),
        (r"AuthVectorFetch.*[Ff]ail|AusfUeAuth.*[Rr]eject|HssAuth.*[Ff]ail",
         "AUSF", "warning", "Auth vector fetch failure",
         "Review UDR data integrity for affected SUPI; check AUSF-UDM TLS certificates."),
    ],
    "Connectivity": [
        (r"NfDiscovery.*[Ff]ail|NrfRegistration.*[Ff]ail|NfDeregistration.*unexpect",
         "NRF", "warning", "NF service discovery failure",
         "Verify NRF is reachable from all NFs; check NRF registration TTL and heartbeat."),
        (r"ServiceUnavailable.*NF|HTTP.*503.*NF|NfProfile.*expired",
         "NRF", "warning", "NF service unavailable",
         "Check NF pod health and SBI listen port; review NRF subscription notifications."),
        (r"SbiRequest.*[Tt]imeout|SbiConn.*[Ff]ail|Http2.*[Ee]rror",
         "NRF", "warning", "SBI interface timeout",
         "Inspect inter-NF network MTU and TLS handshake; check load balancer config."),
    ],
    "Policy": [
        (r"PcfSmPolicy.*[Ee]rror|PolicyDecision.*[Ff]ail|SmPolicy.*[Rr]eject",
         "PCF", "warning", "Policy decision failure",
         "Review PCF policy rules and subscriber group config; check PCF-UDR N36 link."),
        (r"QosEnforce.*[Ff]ail|ChargingRule.*[Ee]rror|PccRule.*[Rr]eject",
         "PCF", "warning", "QoS policy enforcement failure",
         "Verify QoS profiles match UPF capabilities; check PCF-CHF N40 charging path."),
    ],
    "Security": [
        (r"SecurityMode.*[Ff]ail|IntegrityCheck.*[Ff]ail|NasIntegrity.*[Ee]rror",
         "AMF", "critical", "NAS security mode failure",
         "Check AMF cipher/integrity algorithm priority list matches UE capabilities."),
        (r"TlsHandshake.*[Ff]ail|Certificate.*[Ee]xpir|x509.*[Ee]rror|CertVerify.*[Ff]ail",
         "AMF", "critical", "TLS/certificate error",
         "Renew expired certificates; verify trust chain between NFs; check SBI TLS config."),
        (r"SuciProtection.*[Ff]ail|PrivacyProtection.*[Ee]rror|HomeNetworkKey.*[Ee]rror",
         "UDM", "warning", "SUCI privacy protection error",
         "Verify Home Network Public Key provisioning on UDM; check SUPI revealing config."),
    ],
}

# ── NF → possible container name fragments (tried in order) ─────────────────
NF_CONTAINER_HINTS: dict[str, list[str]] = {
    "AMF": ["amf"],
    "SMF": ["smf"],
    "UPF": ["upf"],
    "NRF": ["nrf"],
    "UDM": ["udm"],
    "AUSF": ["ausf"],
    "PCF": ["pcf"],
}

# ── Container discovery cache ────────────────────────────────────────────────
# How long (seconds) a non-empty discovery result is reused. An empty result
# is never reused, so discovery is retried on every call until something is found.
_CONTAINER_CACHE_TTL = 60
_container_cache: dict[str, str] = {}
_container_cache_ts: float = 0.0


def _runtime_cmd(*args: str) -> list[str]:
    """Build a container-runtime argv, adding ``--host`` when CONTAINER_HOST is set."""
    cmd = [CONTAINER_RUNTIME]
    if CONTAINER_HOST:
        cmd.extend(["--host", CONTAINER_HOST])
    cmd.extend(args)
    return cmd


async def _run_runtime(*args: str, timeout: float) -> tuple[bytes, bytes]:
    """Run the container runtime with *args*; return its raw (stdout, stderr).

    Raises whatever ``asyncio.create_subprocess_exec`` raises (e.g. the runtime
    binary is missing), or ``asyncio.TimeoutError`` after *timeout* seconds.
    On timeout or cancellation the child is killed and reaped. Otherwise a hung
    runtime would leave one orphaned process behind per refresh.
    """
    proc = await asyncio.create_subprocess_exec(
        *_runtime_cmd(*args),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        return await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except BaseException:
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the returncode check and kill()
            await proc.wait()
        raise


async def _discover_containers() -> dict[str, str]:
    """Map NF names (keys of NF_CONTAINER_HINTS) to running container names.

    For each NF, the hint fragments are tried in order, and the first running
    container whose lower-cased name contains the fragment wins. A non-empty
    mapping is cached for _CONTAINER_CACHE_TTL seconds. Any runtime failure
    is treated as "no containers running".
    """
    global _container_cache, _container_cache_ts
    now = time.monotonic()
    if _container_cache and now - _container_cache_ts < _CONTAINER_CACHE_TTL:
        return _container_cache

    try:
        stdout, _ = await _run_runtime("ps", "--format", "{{.Names}}", timeout=5)
        names = [n.strip()
                 for n in stdout.decode("utf-8", errors="replace").splitlines()
                 if n.strip()]
    except Exception:
        # Best effort: runtime missing, unreachable, or timed out.
        names = []

    mapping: dict[str, str] = {}
    for nf, hints in NF_CONTAINER_HINTS.items():
        for hint in hints:
            match = next((n for n in names if hint in n.lower()), None)
            if match:
                mapping[nf] = match
                break

    _container_cache = mapping
    _container_cache_ts = now
    return mapping


async def _read_logs(container: str, tail: int = 400) -> str:
    """Return the last *tail* log lines of *container* (stdout then stderr).

    Returns "" if the runtime cannot be run or does not answer within 8 s.
    """
    try:
        stdout, stderr = await _run_runtime(
            "logs", "--tail", str(tail), container, timeout=8)
    except Exception:
        return ""
    return (stdout.decode("utf-8", errors="replace")
            + stderr.decode("utf-8", errors="replace"))


def _match_count(text: str, pattern: str) -> int:
    """Count case-insensitive matches of *pattern* in *text*.

    Empty text or an invalid regex yields 0.
    """
    if not text:
        return 0
    try:
        return len(re.findall(pattern, text, re.IGNORECASE | re.MULTILINE))
    except re.error:
        return 0


# ── Category/NF mapping for Alertmanager alerts ──────────────────────────────
# Checked in order; the first category with a keyword contained in the
# lower-cased "name summary" text wins. The fallback is "Connectivity".
# NOTE(review): matching is by plain substring. "register" does not match
# "registration", "charge" does not match "charging", and "eap" matches
# "heap". Confirm against real alert names before changing the keywords.
_ALERT_CATEGORY_KEYWORDS: tuple[tuple[str, tuple[str, ...]], ...] = (
    ("Registration", ("register", "attach", "ngap", "n2")),
    ("Sessions", ("session", "pdu", "bearer", "smf_", "upf_", "n4", "pfcp")),
    ("Authentication", ("auth", "ausf", "udm_", "supi", "aka", "eap")),
    ("Connectivity", ("nrf", "discovery", "unavailable", "sbi", "connect")),
    ("Policy", ("pcf", "policy", "qos", "pcc", "charge")),
    ("Security", ("tls", "cert", "security", "cipher", "integ", "suci")),
)


def _alert_field(alert: dict, key: str) -> str:
    """Return ``alert[key]`` as a string; a missing key or a None value gives ""."""
    value = alert.get(key)
    return "" if value is None else str(value)


def _alert_category(alert: dict) -> str:
    """Classify an Alertmanager alert into one of ALL_CATEGORIES by keyword."""
    text = f"{_alert_field(alert, 'name')} {_alert_field(alert, 'summary')}".lower()
    for category, keywords in _ALERT_CATEGORY_KEYWORDS:
        if any(k in text for k in keywords):
            return category
    return "Connectivity"


def _alert_nf(alert: dict) -> str:
    """Return the first NF from ALL_NFS named in the alert's name/instance, else "System"."""
    from app.config import ALL_NFS

    # The space separator stops a match from spanning the name/instance join.
    text = f"{_alert_field(alert, 'name')} {_alert_field(alert, 'instance')}".lower()
    return next((nf for nf in ALL_NFS if nf.lower() in text), "System")


def _gathered_list(value: object):
    """Normalise one ``asyncio.gather(return_exceptions=True)`` result.

    Any exception (including cancellation) or ``None`` becomes ``[]``, so a
    failing backend degrades to "no data" instead of breaking the endpoint.
    Other values are returned unchanged. They are presumed to be iterables of
    dicts, and non-dict entries are skipped by the caller.
    """
    if value is None or isinstance(value, BaseException):
        return []
    return value


def _group_by_category(issues: list[dict]) -> dict[str, dict]:
    """Bucket *issues* by category and sum their counts.

    Every category in ALL_CATEGORIES is present (possibly empty) and comes
    first, in canonical order. Any other category is appended afterwards
    with a neutral colour.
    """
    cats: dict[str, dict] = {
        name: {"name": name, "color": CATEGORY_COLORS[name], "count": 0, "issues": []}
        for name in ALL_CATEGORIES
    }
    for issue in issues:
        name = issue["category"]
        if name not in cats:
            cats[name] = {"name": name, "color": _FALLBACK_CATEGORY_COLOR,
                          "count": 0, "issues": []}
        cats[name]["count"] += issue["count"]
        cats[name]["issues"].append(issue)
    return cats


# ── Main analysis entry point ────────────────────────────────────────────────
async def analyze_logs() -> dict:
    """
    Gather log-pattern issues + Prometheus NF status + Alertmanager alerts.

    Returns a JSON-serialisable dict with these keys:
    ``total`` (sum of issue counts), ``categories`` (list of category
    buckets, canonical order first), ``timestamp`` (local ISO-8601), and
    ``log_sources`` (NFs whose container logs were read). Also appends a
    summary snapshot to the in-memory history ring buffer.
    """
    from app.services import alertmanager, prometheus

    # Run container discovery and both backend queries concurrently.
    containers, alerts, nf_statuses = await asyncio.gather(
        _discover_containers(),
        alertmanager.get_alerts(),
        prometheus.get_nf_status(),
        return_exceptions=True,
    )
    if not isinstance(containers, dict):
        containers = {}
    alerts = _gathered_list(alerts)
    nf_statuses = _gathered_list(nf_statuses)

    # Read all container logs concurrently (gather() with no awaitables -> []).
    log_results = await asyncio.gather(
        *(_read_logs(cname) for cname in containers.values()),
        return_exceptions=True,
    )
    log_texts: dict[str, str] = {
        nf: result if isinstance(result, str) else ""
        for nf, result in zip(containers, log_results)
    }

    issues: list[dict] = []

    # 1. Log-pattern analysis: a pattern only scans its own NF's logs.
    for category, patterns in CATEGORY_PATTERNS.items():
        for (pat_re, nf, severity, description, remediation) in patterns:
            count = _match_count(log_texts.get(nf, ""), pat_re)
            if count:
                issues.append({
                    "id": f"log-{nf}-{len(issues)}",
                    "category": category,
                    "nf": nf,
                    "severity": severity,
                    "count": count,
                    "description": description,
                    "remediation": remediation,
                    "source": "log",
                })

    # 2. NF-down events from Prometheus (entries read as dicts with
    #    "name"/"state" keys).
    for nf_st in nf_statuses:
        if not isinstance(nf_st, dict) or nf_st.get("state") != "down":
            continue
        nf_name = nf_st.get("name", "unknown")
        issues.append({
            "id": f"nf-down-{nf_name}",
            "category": "Connectivity",
            "nf": nf_name,
            "severity": "critical",
            "count": 1,
            "description": f"{nf_name} is unreachable",
            "remediation": (f"Run `{CONTAINER_RUNTIME} ps` and check if {nf_name} "
                            "container is running; inspect its logs."),
            "source": "prometheus",
        })

    # 3. Active Alertmanager alerts.
    for alert in alerts:
        if not isinstance(alert, dict):
            continue
        issues.append({
            "id": f"alert-{alert.get('name', '')}-{len(issues)}",
            "category": _alert_category(alert),
            "nf": _alert_nf(alert),
            "severity": alert.get("severity", "warning"),
            "count": 1,
            "description": (alert.get("summary") or alert.get("name")
                            or "Unknown alert"),
            "remediation": "Investigate the active Alertmanager alert and follow runbook.",
            "source": "alertmanager",
        })

    cats = _group_by_category(issues)
    total = sum(c["count"] for c in cats.values())
    # One timestamp for both the response and the history snapshot.
    timestamp = datetime.now().isoformat()

    # Persist to history ring-buffer
    _history.append({
        "time": timestamp,
        "total": total,
        "by_category": {name: cats[name]["count"] for name in ALL_CATEGORIES},
    })

    return {
        "total": total,
        "categories": list(cats.values()),
        "timestamp": timestamp,
        "log_sources": list(containers),
    }


def get_history() -> list:
    """Return the accumulated history snapshots as a plain list (oldest first)."""
    return list(_history)