"""Structured issue generation from ingested cross-node log events and state.""" import asyncio import re import time from collections import deque from datetime import datetime from app.config import ( CONTAINER_HOST, CONTAINER_RUNTIME, LOG_ALERT_CONTEXT_AFTER, LOG_ALERT_CONTEXT_BEFORE, ) from app.services.log_rules import load_category_patterns # ── In-memory history (up to 96 snapshots ≈ 48 min at 30 s refresh) ──────── _history: deque = deque(maxlen=96) # ── Category colour palette ────────────────────────────────────────────────── CATEGORY_COLORS: dict[str, str] = { "Registration": "#3b82f6", "Sessions": "#7c3aed", "Authentication": "#f59e0b", "Connectivity": "#06b6d4", "Policy": "#10b981", "Security": "#ef4444", } # All categories in canonical display order (left side, right side) ALL_CATEGORIES = ["Registration", "Authentication", "Security", "Sessions", "Connectivity", "Policy"] # ── NF → possible container name fragments (tried in order) ───────────────── NF_CONTAINER_HINTS: dict[str, list[str]] = { "AMF": ["amf"], "SMF": ["smf"], "UPF": ["upf"], "NRF": ["nrf"], "UDM": ["udm"], "AUSF": ["ausf"], "PCF": ["pcf"], } # ── Container discovery cache ──────────────────────────────────────────────── _container_cache: dict[str, str] = {} _container_cache_ts: float = 0.0 async def _discover_containers() -> dict[str, str]: """Run the configured container runtime and map NF names to actual container names.""" global _container_cache, _container_cache_ts now = time.monotonic() if _container_cache and now - _container_cache_ts < 60: return _container_cache try: cmd = [CONTAINER_RUNTIME] if CONTAINER_HOST: cmd.extend(["--host", CONTAINER_HOST]) cmd.extend(["ps", "--format", "{{.Names}}"]) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) names = [n.strip() for n in stdout.decode().splitlines() if n.strip()] except Exception: names = [] mapping: dict[str, str] = {} for nf, hints in NF_CONTAINER_HINTS.items(): for hint in hints: match = next((n for n in names if hint in n.lower()), None) if match: mapping[nf] = match break _container_cache = mapping _container_cache_ts = now return mapping async def _read_logs(container: str, tail: int = 400) -> str: """Read recent logs from a container (stdout + stderr).""" try: cmd = [CONTAINER_RUNTIME] if CONTAINER_HOST: cmd.extend(["--host", CONTAINER_HOST]) cmd.extend(["logs", "--tail", str(tail), container]) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=8) return (stdout.decode("utf-8", errors="replace") + stderr.decode("utf-8", errors="replace")) except Exception: return "" def _rule_matches(message: str, pattern: str) -> bool: if not message: return False try: return bool(re.search(pattern, message, re.IGNORECASE | re.MULTILINE)) except re.error: return False def summarize_event_slice(events: list[dict]) -> list[dict]: findings: list[dict] = [] seen: set[tuple[str, str, str, str]] = set() for event in sorted(events or [], key=lambda item: item.get("epoch", 0.0)): message = event.get("message", "") event_nf = str(event.get("nf", "")).upper() event_node = event.get("node", "") for category, patterns in load_category_patterns().items(): for rule in patterns: rule_nf = str(rule["nf"]).upper() if rule_nf != event_nf: continue if not _rule_matches(message, rule["pattern"]): continue key = 
def summarize_event_slice(events: list[dict]) -> list[dict]:
    """Match a slice of ingested events against the rule set, deduplicating findings."""
    findings: list[dict] = []
    seen: set[tuple[str, str, str, str]] = set()
    # Load the rule set once rather than per event.
    category_patterns = load_category_patterns()
    for event in sorted(events or [], key=lambda item: item.get("epoch", 0.0)):
        message = event.get("message", "")
        event_nf = str(event.get("nf", "")).upper()
        event_node = event.get("node", "")
        for category, patterns in category_patterns.items():
            for rule in patterns:
                rule_nf = str(rule["nf"]).upper()
                if rule_nf != event_nf:
                    continue
                if not _rule_matches(message, rule["pattern"]):
                    continue
                key = (category, rule_nf, event_node, rule["description"])
                if key in seen:
                    continue
                seen.add(key)
                findings.append(
                    {
                        "category": category,
                        "nf": rule_nf,
                        "node": event_node,
                        "severity": rule["severity"],
                        "description": rule["description"],
                        "remediation": rule["remediation"],
                        "message": message,
                        "timestamp": event.get("timestamp", ""),
                    }
                )
    return findings

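# Example usage (a sketch: the event shape mirrors what log_ingest emits, and
# whether anything matches depends entirely on the loaded rule set):
#
#     events = [
#         {"epoch": 1700000000.0, "timestamp": "2024-01-01T00:00:00Z",
#          "node": "node-1", "nf": "amf", "message": "SCTP association failed"},
#     ]
#     findings = summarize_event_slice(events)
#     # -> e.g. [{"category": "Connectivity", "nf": "AMF", "node": "node-1", ...}]
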
# ── Category/NF mapping for Alertmanager alerts ──────────────────────────────


def _alert_category(alert: dict) -> str:
    """Bucket an Alertmanager alert into a display category by keyword."""
    name = (alert.get("name", "") + " " + alert.get("summary", "")).lower()
    if any(k in name for k in ["register", "attach", "ngap", "n2"]):
        return "Registration"
    if any(k in name for k in ["session", "pdu", "bearer", "smf_", "upf_", "n4", "pfcp"]):
        return "Sessions"
    if any(k in name for k in ["auth", "ausf", "udm_", "supi", "aka", "eap"]):
        return "Authentication"
    if any(k in name for k in ["nrf", "discovery", "unavailable", "sbi", "connect"]):
        return "Connectivity"
    if any(k in name for k in ["pcf", "policy", "qos", "pcc", "charge"]):
        return "Policy"
    if any(k in name for k in ["tls", "cert", "security", "cipher", "integ", "suci"]):
        return "Security"
    return "Connectivity"


def _alert_nf(alert: dict) -> str:
    """Attribute an alert to an NF by scanning its name and instance labels."""
    from app.config import ALL_NFS

    text = (alert.get("name", "") + alert.get("instance", "")).lower()
    for nf in ALL_NFS:
        if nf.lower() in text:
            return nf
    return "System"

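# Example (a sketch; alert payloads come from the alertmanager service, so
# these fields are illustrative):
#
#     alert = {"name": "AmfNgapSetupFailure", "summary": "NGAP setup failing",
#              "instance": "amf:9090"}
#     _alert_category(alert)  # -> "Registration" (matches the "ngap" keyword)
#     _alert_nf(alert)        # -> "AMF", assuming "AMF" is listed in ALL_NFS
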
# ── Main analysis entry point ─────────────────────────────────────────────────


async def analyze_logs() -> dict:
    """
    Gather log-pattern issues + Prometheus NF status + Alertmanager alerts.

    Returns a fully structured dict ready for JSON serialisation.
    """
    from app.services import alertmanager, cluster_inventory, log_ingest, prometheus

    # Kick off all I/O in parallel.
    containers_f = asyncio.create_task(_discover_containers())
    alerts_f = asyncio.create_task(alertmanager.get_alerts())
    nf_status_f = asyncio.create_task(prometheus.get_nf_status())
    cluster_f = asyncio.create_task(cluster_inventory.get_cluster_inventory())
    events_f = asyncio.to_thread(log_ingest.get_events)

    containers = await containers_f
    alerts, nf_statuses, cluster, events = await asyncio.gather(
        alerts_f, nf_status_f, cluster_f, events_f, return_exceptions=True
    )
    if isinstance(alerts, Exception):
        alerts = []
    if isinstance(nf_statuses, Exception):
        nf_statuses = []
    if isinstance(cluster, Exception):
        cluster = {"enabled": False, "nodes": []}
    if isinstance(events, Exception):
        events = []

    events = sorted(
        [event for event in events if isinstance(event, dict)],
        key=lambda event: event.get("epoch", 0.0),
    )

    issues: list[dict] = []
    grouped_log_issues: dict[tuple[str, str, str, str], dict] = {}

    # 1. Time-ordered log-pattern analysis across all nodes.
    category_patterns = load_category_patterns()  # load once, not per event
    for idx, event in enumerate(events):
        message = event.get("message", "")
        event_nf = str(event.get("nf", "")).upper()
        event_node = event.get("node", "")
        for category, patterns in category_patterns.items():
            for rule in patterns:
                rule_nf = str(rule["nf"]).upper()
                if event_nf != rule_nf:
                    continue
                if not _rule_matches(message, rule["pattern"]):
                    continue
                before_context = events[max(0, idx - LOG_ALERT_CONTEXT_BEFORE):idx]
                after_context = events[idx + 1:idx + 1 + LOG_ALERT_CONTEXT_AFTER]
                context_id = log_ingest.record_alert_context(
                    category=category,
                    nf=rule_nf,
                    node=event_node,
                    severity=rule["severity"],
                    description=rule["description"],
                    remediation=rule["remediation"],
                    source="fluentbit",
                    event=event,
                    before_context=before_context,
                    after_context=after_context,
                )
                issue_key = (category, rule_nf, event_node, rule["description"])
                if issue_key not in grouped_log_issues:
                    grouped_log_issues[issue_key] = {
                        "id": f"log-{rule_nf}-{len(grouped_log_issues)}",
                        "category": category,
                        "nf": rule_nf,
                        "node": event_node,
                        "severity": rule["severity"],
                        "count": 0,
                        "description": rule["description"],
                        "remediation": rule["remediation"],
                        "source": "fluentbit",
                        "context_id": context_id,
                    }
                grouped_log_issues[issue_key]["count"] += 1
    issues.extend(grouped_log_issues.values())

    # Fallback to local container logs until Fluent Bit has populated the buffer.
    if not issues and not events:
        log_tasks = {nf: asyncio.create_task(_read_logs(cname))
                     for nf, cname in containers.items()}
        if log_tasks:
            log_results = await asyncio.gather(*log_tasks.values(), return_exceptions=True)
            log_texts = {
                nf: result if isinstance(result, str) else ""
                for nf, result in zip(log_tasks.keys(), log_results)
            }
            for category, patterns in category_patterns.items():
                for rule in patterns:
                    # Normalise to upper case so the lookup matches the
                    # container-map keys (which come from NF_CONTAINER_HINTS).
                    nf = str(rule["nf"]).upper()
                    if _rule_matches(log_texts.get(nf, ""), rule["pattern"]):
                        issues.append({
                            "id": f"log-{nf}-{len(issues)}",
                            "category": category,
                            "nf": nf,
                            "severity": rule["severity"],
                            "count": 1,
                            "description": rule["description"],
                            "remediation": rule["remediation"],
                            "source": "local-log-fallback",
                        })

    # 2. NF-down events from Prometheus.
    for nf_st in nf_statuses:
        if isinstance(nf_st, dict) and nf_st.get("state") == "down":
            node_text = ", ".join(node["hostname"] for node in nf_st.get("nodes", []))
            issues.append({
                "id": f"nf-down-{nf_st['name']}",
                "category": "Connectivity",
                "nf": nf_st["name"],
                "node": node_text,
                "severity": "critical",
                "count": 1,
                "description": f"{nf_st['name']} is unreachable",
                "remediation": (f"Check {node_text or 'the hosting node'} first, then run "
                                f"`{CONTAINER_RUNTIME} ps` and inspect "
                                f"`{nf_st['name'].lower()}` logs."),
                "source": "prometheus",
            })

    # 3. Active Alertmanager alerts.
    for alert in alerts:
        if isinstance(alert, dict):
            node_text = ", ".join(node["hostname"] for node in alert.get("nodes", []))
            issues.append({
                "id": f"alert-{alert.get('name', '')}-{len(issues)}",
                "category": _alert_category(alert),
                "nf": _alert_nf(alert),
                "node": node_text,
                "severity": alert.get("severity", "warning"),
                "count": 1,
                "description": alert.get("summary") or alert.get("name", "Unknown alert"),
                "remediation": "Investigate the active Alertmanager alert and follow its runbook.",
                "source": "alertmanager",
            })

    # Group by category, preserving canonical order.
    cats: dict[str, dict] = {}
    for cat_name in ALL_CATEGORIES:
        cats[cat_name] = {
            "name": cat_name,
            "color": CATEGORY_COLORS[cat_name],
            "count": 0,
            "issues": [],
        }
    for issue in issues:
        cat = issue["category"]
        if cat not in cats:
            cats[cat] = {"name": cat, "color": "#7a8499", "count": 0, "issues": []}
        cats[cat]["count"] += issue["count"]
        cats[cat]["issues"].append(issue)

    total = sum(c["count"] for c in cats.values())
    result = {
        "total": total,
        "categories": list(cats.values()),
        "timestamp": datetime.now().isoformat(),
        "log_sources": sorted({f"{event.get('node', 'unknown')}:{event.get('nf', 'SYSTEM')}"
                               for event in events}) or list(containers.keys()),
        "log_ingest": log_ingest.receiver_status(),
        "cluster": cluster,
    }

    # Persist to the history ring-buffer.
    _history.append({
        "time": datetime.now().isoformat(),
        "total": total,
        "by_category": {name: cats[name]["count"] for name in ALL_CATEGORIES},
    })
    return result


def get_history() -> list:
    """Return the accumulated history snapshots as a plain list."""
    return list(_history)
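

if __name__ == "__main__":  # pragma: no cover
    # Ad-hoc smoke test (a sketch: assumes the app.* services are importable
    # and their backends reachable; in normal operation this module is driven
    # by the API layer instead).
    import json

    print(json.dumps(asyncio.run(analyze_logs()), indent=2))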