177 lines
5.9 KiB
Python
177 lines
5.9 KiB
Python
"""Alert sources: Alertmanager plus log-derived alerts."""
|
|
|
|
import asyncio
|
|
import json
|
|
import httpx
|
|
from app.config import ALERTMANAGER_URL
|
|
from app.services import cluster_inventory, log_ingest
|
|
|
|
_BASE = ALERTMANAGER_URL.rstrip("/")
|
|
|
|
|
|
async def get_alerts() -> list:
    """Return normalised list of active alerts from Alertmanager and log analysis."""
    cluster = await cluster_inventory.get_cluster_inventory()
    # Query both sources concurrently; a failure in one must not sink the other.
    results = await asyncio.gather(
        _get_alertmanager_alerts(cluster),
        asyncio.to_thread(_get_log_alerts, cluster),
        return_exceptions=True,
    )
    combined = []
    for outcome in results:
        # A source that raised contributes nothing (best-effort aggregation).
        if not isinstance(outcome, Exception):
            combined.extend(outcome)
    # Critical first, then chronological within the same severity.
    combined.sort(key=lambda alert: (_severity_rank(alert.get("severity")), alert.get("timestamp", "")))
    return combined
|
|
|
|
|
|
async def _get_alertmanager_alerts(cluster: dict) -> list:
    """Fetch active, unsilenced alerts from Alertmanager and normalise them.

    Best-effort: any transport, HTTP, or decode failure yields an empty list
    so the caller can still serve log-derived alerts.
    """
    try:
        async with httpx.AsyncClient(timeout=5) as client:
            response = await client.get(
                f"{_BASE}/api/v2/alerts",
                params={"active": "true", "silenced": "false"},
            )
            response.raise_for_status()
            raw = response.json()
    except Exception:
        return []

    normalised = []
    for entry in raw:
        labels = entry.get("labels", {})
        annotations = entry.get("annotations", {})
        name = labels.get("alertname", "Unknown")
        # Prefer the summary annotation, falling back to description.
        summary = annotations.get("summary", annotations.get("description", ""))
        nf_name = _infer_nf(name, summary, labels.get("instance", ""))
        normalised.append({
            "name": name,
            "severity": labels.get("severity", "warning"),
            "instance": labels.get("instance", ""),
            "summary": summary,
            "nf": nf_name,
            "nodes": _resolve_nodes(cluster, labels, name, summary, nf_name),
            "source": "alertmanager",
            "timestamp": entry.get("startsAt", ""),
        })
    return normalised
|
|
|
|
|
|
def _get_log_alerts(cluster: dict) -> list:
    """Build normalised alert dicts from recently recorded log-anomaly contexts."""
    # Index cluster nodes by both hostname and address for O(1) resolution.
    node_map = {}
    for node in cluster.get("nodes", []):
        for key in (node.get("hostname"), node.get("address")):
            if key:
                node_map[key] = node

    alerts = []
    for ctx in log_ingest.recent_alert_context(limit=50):
        node_name = ctx.get("node", "")
        matched = node_map.get(node_name) if node_name else None
        before = _decode_context(ctx.get("before_context"))
        after = _decode_context(ctx.get("after_context"))
        alerts.append({
            "name": f"{ctx.get('nf') or 'System'} log anomaly",
            "severity": ctx.get("severity", "warning"),
            "instance": ctx.get("source", ""),
            "summary": ctx.get("description", "Log-derived alert"),
            "nf": ctx.get("nf", ""),
            "nodes": [matched] if matched else [],
            "source": "logs",
            "timestamp": ctx.get("event_ts", ""),
            "context_id": ctx.get("id"),
            "node": node_name,
            "match_message": ctx.get("match_message", ""),
            # Only a small window of surrounding log lines is exposed.
            "context_preview": {
                "before": before[-3:],
                "after": after[:3],
            },
        })
    return alerts
|
|
|
|
|
|
def _decode_context(value: str | None) -> list[dict]:
|
|
if not value:
|
|
return []
|
|
try:
|
|
data = json.loads(value)
|
|
return data if isinstance(data, list) else []
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def _severity_rank(severity: str | None) -> int:
|
|
return {"critical": 0, "warning": 1, "info": 2}.get((severity or "warning").lower(), 3)
|
|
|
|
|
|
def _infer_nf(name: str, summary: str, instance: str) -> str:
|
|
text = f"{name} {summary} {instance}".upper()
|
|
for nf_name in ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM", "AAA", "BMSC", "CHF", "SMSF", "EIR"]:
|
|
if nf_name in text:
|
|
return nf_name
|
|
return ""
|
|
|
|
|
|
def _resolve_nodes(cluster: dict, labels: dict, name: str, summary: str, nf_name: str) -> list:
    """Resolve which cluster nodes an Alertmanager alert refers to.

    Resolution order: explicit hostname/address/name mentioned in the alert
    text, then the inventory's NF placement, then a started-service match,
    and finally the current node for localhost-scoped instances.
    Returns [] when nothing matches or the inventory is empty.
    """
    nodes = cluster.get("nodes", [])
    if not nodes:
        return []

    # Combine alert text and identifying labels into one searchable string.
    parts = [name, summary]
    parts.extend(
        labels[key]
        for key in ("instance", "job", "service", "container", "hostname", "node", "pod")
        if labels.get(key)
    )
    search_text = " ".join(parts).upper()

    # 1) The alert text names a node directly.
    explicit = [
        _node_ref(node)
        for node in nodes
        if any(
            token and token.upper() in search_text
            for token in (
                str(node.get("hostname", "")),
                str(node.get("address", "")),
                str(node.get("name", "")),
            )
        )
    ]
    if explicit:
        return _dedupe_nodes(explicit)

    # 2) Fall back to where the inventory says this NF runs.
    if nf_name:
        nf_nodes = cluster_inventory.find_nf_nodes(cluster, nf_name)
        if nf_nodes:
            return nf_nodes

    # 3) Match against services started on each node.
    by_service = []
    for node in nodes:
        services = {str(svc).upper() for svc in node.get("started_services", [])}
        if any(svc and svc in search_text for svc in services):
            by_service.append(_node_ref(node))
    if by_service:
        return _dedupe_nodes(by_service)

    # 4) Localhost-scoped instances map to the node we are running on.
    current = next((node for node in nodes if node.get("current")), None)
    if current and labels.get("instance", "").startswith(("127.0.0.1", "localhost")):
        return [_node_ref(current)]

    return []
|
|
|
|
|
|
def _node_ref(node: dict) -> dict:
|
|
return {
|
|
"hostname": node.get("hostname", ""),
|
|
"address": node.get("address", ""),
|
|
"role": node.get("role", "AP"),
|
|
"current": node.get("current", False),
|
|
}
|
|
|
|
|
|
def _dedupe_nodes(nodes: list[dict]) -> list[dict]:
|
|
seen = set()
|
|
result = []
|
|
for node in nodes:
|
|
key = (node.get("hostname"), node.get("address"))
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
result.append(node)
|
|
return result
|