"""Alert sources: Alertmanager plus log-derived alerts.""" import asyncio import json import httpx from app.config import ALERTMANAGER_URL from app.services import cluster_inventory, log_ingest _BASE = ALERTMANAGER_URL.rstrip("/") async def get_alerts() -> list: """Return normalised list of active alerts from Alertmanager and log analysis.""" cluster = await cluster_inventory.get_cluster_inventory() alertmanager_task = asyncio.create_task(_get_alertmanager_alerts(cluster)) log_task = asyncio.to_thread(_get_log_alerts, cluster) am_alerts, log_alerts = await asyncio.gather(alertmanager_task, log_task, return_exceptions=True) if isinstance(am_alerts, Exception): am_alerts = [] if isinstance(log_alerts, Exception): log_alerts = [] return sorted( [*am_alerts, *log_alerts], key=lambda alert: (_severity_rank(alert.get("severity")), alert.get("timestamp", "")), ) async def _get_alertmanager_alerts(cluster: dict) -> list: try: async with httpx.AsyncClient(timeout=5) as client: r = await client.get(f"{_BASE}/api/v2/alerts", params={"active": "true", "silenced": "false"}) r.raise_for_status() raw = r.json() except Exception: return [] alerts = [] for a in raw: labels = a.get("labels", {}) annotations = a.get("annotations", {}) name = labels.get("alertname", "Unknown") summary = annotations.get("summary", annotations.get("description", "")) nf_name = _infer_nf(name, summary, labels.get("instance", "")) nodes = _resolve_nodes(cluster, labels, name, summary, nf_name) alerts.append({ "name": name, "severity": labels.get("severity", "warning"), "instance": labels.get("instance", ""), "summary": summary, "nf": nf_name, "nodes": nodes, "source": "alertmanager", "timestamp": a.get("startsAt", ""), }) return alerts def _get_log_alerts(cluster: dict) -> list: node_map = {} for node in cluster.get("nodes", []): if node.get("hostname"): node_map[node["hostname"]] = node if node.get("address"): node_map[node["address"]] = node alerts = [] for ctx in log_ingest.recent_alert_context(limit=50): before = _decode_context(ctx.get("before_context")) after = _decode_context(ctx.get("after_context")) node_name = ctx.get("node", "") nodes = [] if node_name and node_name in node_map: nodes = [node_map[node_name]] alerts.append({ "name": f"{ctx.get('nf') or 'System'} log anomaly", "severity": ctx.get("severity", "warning"), "instance": ctx.get("source", ""), "summary": ctx.get("description", "Log-derived alert"), "nf": ctx.get("nf", ""), "nodes": nodes, "source": "logs", "timestamp": ctx.get("event_ts", ""), "context_id": ctx.get("id"), "node": node_name, "match_message": ctx.get("match_message", ""), "context_preview": { "before": before[-3:], "after": after[:3], }, }) return alerts def _decode_context(value: str | None) -> list[dict]: if not value: return [] try: data = json.loads(value) return data if isinstance(data, list) else [] except Exception: return [] def _severity_rank(severity: str | None) -> int: return {"critical": 0, "warning": 1, "info": 2}.get((severity or "warning").lower(), 3) def _infer_nf(name: str, summary: str, instance: str) -> str: text = f"{name} {summary} {instance}".upper() for nf_name in ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM", "AAA", "BMSC", "CHF", "SMSF", "EIR"]: if nf_name in text: return nf_name return "" def _resolve_nodes(cluster: dict, labels: dict, name: str, summary: str, nf_name: str) -> list: nodes = cluster.get("nodes", []) if not nodes: return [] search_parts = [name, summary] for key in ("instance", "job", "service", "container", "hostname", "node", "pod"): value = labels.get(key) if value: search_parts.append(value) search_text = " ".join(search_parts).upper() explicit = [] for node in nodes: hostname = str(node.get("hostname", "")) address = str(node.get("address", "")) node_name = str(node.get("name", "")) if any(token and token.upper() in search_text for token in (hostname, address, node_name)): explicit.append(_node_ref(node)) if explicit: return _dedupe_nodes(explicit) if nf_name: nf_nodes = cluster_inventory.find_nf_nodes(cluster, nf_name) if nf_nodes: return nf_nodes service_matches = [] for node in nodes: started = {str(service).upper() for service in node.get("started_services", [])} if any(service and service in search_text for service in started): service_matches.append(_node_ref(node)) if service_matches: return _dedupe_nodes(service_matches) current = next((node for node in nodes if node.get("current")), None) if current and labels.get("instance", "").startswith(("127.0.0.1", "localhost")): return [_node_ref(current)] return [] def _node_ref(node: dict) -> dict: return { "hostname": node.get("hostname", ""), "address": node.get("address", ""), "role": node.get("role", "AP"), "current": node.get("current", False), } def _dedupe_nodes(nodes: list[dict]) -> list[dict]: seen = set() result = [] for node in nodes: key = (node.get("hostname"), node.get("address")) if key in seen: continue seen.add(key) result.append(node) return result