Files
p5g-marvis/app/services/alertmanager.py
2026-04-24 12:33:52 -04:00

46 lines
1.6 KiB
Python

"""Alertmanager client."""
import logging

import httpx

from app.config import ALERTMANAGER_URL
from app.services import cluster_inventory
# Alertmanager base URL with trailing slashes removed, so that request paths
# can be appended as f"{_BASE}/..." without producing a double "/".
_BASE = ALERTMANAGER_URL.rstrip("/")
async def get_alerts() -> list:
    """Return a normalised list of active, unsilenced alerts from Alertmanager.

    Queries ``GET {_BASE}/api/v2/alerts`` (5 second timeout) with
    ``active=true`` and ``silenced=false``, then flattens each alert into a
    plain dict and enriches it with the network function inferred from the
    alert text and the cluster nodes associated with that function.

    Returns:
        A list of dicts with the keys ``name``, ``severity``, ``instance``,
        ``summary``, ``nf`` and ``nodes``. The list is empty when the request
        fails, Alertmanager answers with an error status, the body is not
        valid JSON, or the decoded payload is not a list.
    """
    log = logging.getLogger(__name__)

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            r = await client.get(
                f"{_BASE}/api/v2/alerts",
                params={"active": "true", "silenced": "false"},
            )
            r.raise_for_status()
            raw = r.json()
    except Exception:
        # Deliberately best-effort: callers get an empty list rather than an
        # exception, but the failure is logged so that an unreachable
        # Alertmanager is distinguishable from "no alerts firing".
        log.warning("Failed to fetch alerts from Alertmanager", exc_info=True)
        return []

    if not isinstance(raw, list):
        # Iterating a dict or scalar here would fail on ``a.get`` below.
        log.warning(
            "Unexpected Alertmanager payload type: %s", type(raw).__name__
        )
        return []

    cluster = await cluster_inventory.get_cluster_inventory()
    alerts = []
    for a in raw:
        if not isinstance(a, dict):
            # Skip malformed entries instead of aborting the whole listing.
            continue
        # ``or {}`` also covers an explicit JSON null for these objects.
        labels = a.get("labels") or {}
        annotations = a.get("annotations") or {}
        name = labels.get("alertname", "Unknown")
        # Fall back to the description when the summary is missing OR empty.
        summary = (
            annotations.get("summary") or annotations.get("description") or ""
        )
        instance = labels.get("instance", "")
        nf_name = _infer_nf(name, summary, instance)
        # Only look up nodes when a network function was recognised.
        nodes = (
            cluster_inventory.find_nf_nodes(cluster, nf_name) if nf_name else []
        )
        alerts.append({
            "name": name,
            "severity": labels.get("severity", "warning"),
            "instance": instance,
            "summary": summary,
            "nf": nf_name,
            "nodes": nodes,
        })
    return alerts
def _infer_nf(name: str, summary: str, instance: str) -> str:
text = f"{name} {summary} {instance}".upper()
for nf_name in ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM"]:
if nf_name in text:
return nf_name
return ""