diff --git a/Dockerfile b/Dockerfile index 3e5facf..48631d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,16 +10,30 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && rm -rf /var/lib/apt/lists/* \ && pip install --no-cache-dir -r requirements.txt +RUN mkdir -p /app/data + COPY app/ ./app/ COPY config/ ./config/ -EXPOSE 8100 +EXPOSE 8100 5514 # Appliance-style defaults for host-network deployment. Override via env or # systemd EnvironmentFile when needed. ENV MARVIS_PROMETHEUS_URL=http://127.0.0.1:9090 ENV MARVIS_PROMETHEUS_PREFIX=/prometheus ENV MARVIS_ALERTMANAGER_URL=http://127.0.0.1:9093 +ENV MARVIS_LOG_INGEST_ENABLED=true +ENV MARVIS_LOG_AUTO_CONFIGURE=true +ENV MARVIS_LOG_RECEIVER_BIND_HOST=0.0.0.0 +ENV MARVIS_LOG_RECEIVER_PORT=5514 +ENV MARVIS_LOG_RECEIVER_FORMAT=json_lines +ENV MARVIS_LOG_BUFFER_LINES=1000 +ENV MARVIS_LOG_TRACE_BUFFER_LINES=5000 +ENV MARVIS_LOG_ALERT_CONTEXT_BEFORE=5 +ENV MARVIS_LOG_ALERT_CONTEXT_AFTER=5 +ENV MARVIS_LOG_ALERT_CONTEXT_DB_PATH=/app/data/marvis-alert-context.db +ENV MARVIS_LOG_ALERT_CONTEXT_DB_MAX_ROWS=500 +ENV MARVIS_LOG_FLUENTBIT_MATCH=* ENV MARVIS_AI_MODE=rule ENV MARVIS_CONTAINER_RUNTIME=docker ENV MARVIS_UERANSIM_ENV_FILE=/app/config/ueransim.env diff --git a/app/__pycache__/config.cpython-314.pyc b/app/__pycache__/config.cpython-314.pyc index 7ac8b77..3a854e4 100644 Binary files a/app/__pycache__/config.cpython-314.pyc and b/app/__pycache__/config.cpython-314.pyc differ diff --git a/app/__pycache__/main.cpython-314.pyc b/app/__pycache__/main.cpython-314.pyc index 618f756..cb6bec5 100644 Binary files a/app/__pycache__/main.cpython-314.pyc and b/app/__pycache__/main.cpython-314.pyc differ diff --git a/app/config.py b/app/config.py index 26a89fe..7b06802 100644 --- a/app/config.py +++ b/app/config.py @@ -8,6 +8,24 @@ def _env_bool(name: str, default: bool) -> bool: return value.lower() in {"1", "true", "yes", "on"} +def _env_int(name: str, default: int) -> int: + value = os.getenv(name) + if value is 
None: + return default + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _env_list(name: str, default: list[str]) -> list[str]: + value = os.getenv(name) + if value is None: + return default + parsed = [item.strip() for item in value.split(",") if item.strip()] + return parsed or default + + # Defaults assume the appliance-style deployment model where Marvis runs with # host networking and talks to sibling services over host loopback. PROMETHEUS_URL = os.getenv("MARVIS_PROMETHEUS_URL", "http://127.0.0.1:9090") @@ -21,6 +39,25 @@ PLS_PASSWORD = os.getenv("MARVIS_PLS_PASSWORD", "") PLS_AUTH_BACKEND = os.getenv("MARVIS_PLS_AUTH_BACKEND", "local") PLS_VERIFY_TLS = _env_bool("MARVIS_PLS_VERIFY_TLS", False) +# Fluent Bit ingestion and retention. +LOG_INGEST_ENABLED = _env_bool("MARVIS_LOG_INGEST_ENABLED", True) +LOG_AUTO_CONFIGURE = _env_bool("MARVIS_LOG_AUTO_CONFIGURE", True) +LOG_RECEIVER_BIND_HOST = os.getenv("MARVIS_LOG_RECEIVER_BIND_HOST", "0.0.0.0") +LOG_RECEIVER_HOST = os.getenv("MARVIS_LOG_RECEIVER_HOST", "") +LOG_RECEIVER_PORT = _env_int("MARVIS_LOG_RECEIVER_PORT", 5514) +LOG_RECEIVER_FORMAT = os.getenv("MARVIS_LOG_RECEIVER_FORMAT", "json_lines") +LOG_BUFFER_LINES = _env_int("MARVIS_LOG_BUFFER_LINES", 1000) +LOG_ALERT_CONTEXT_BEFORE = _env_int("MARVIS_LOG_ALERT_CONTEXT_BEFORE", 5) +LOG_ALERT_CONTEXT_AFTER = _env_int("MARVIS_LOG_ALERT_CONTEXT_AFTER", 5) +LOG_ALERT_CONTEXT_DB_PATH = os.getenv("MARVIS_LOG_ALERT_CONTEXT_DB_PATH", "/app/data/marvis-alert-context.db") +LOG_ALERT_CONTEXT_DB_MAX_ROWS = _env_int("MARVIS_LOG_ALERT_CONTEXT_DB_MAX_ROWS", 500) +LOG_TRACE_BUFFER_LINES = _env_int("MARVIS_LOG_TRACE_BUFFER_LINES", 5000) +LOG_FLUENTBIT_MATCH = os.getenv("MARVIS_LOG_FLUENTBIT_MATCH", "*") +LOG_ALLOWED_NFS = [item.upper() for item in _env_list( + "MARVIS_LOG_ALLOWED_NFS", + ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM", "AAA", "BMSC", "CHF", "SMSF", "EIR"], +)] + # AI backend: "rule" | 
"openai" | "ollama" AI_MODE = os.getenv("MARVIS_AI_MODE", "rule") OPENAI_API_KEY = os.getenv("MARVIS_OPENAI_API_KEY", "") diff --git a/app/main.py b/app/main.py index a2208c6..5471437 100644 --- a/app/main.py +++ b/app/main.py @@ -3,7 +3,15 @@ from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from pathlib import Path -from app.routers import network, alerts, query as query_router, actions as actions_router, emulated_session as emulated_session_router +from app.routers import ( + actions as actions_router, + alerts, + emulated_session as emulated_session_router, + logs as logs_router, + network, + query as query_router, +) +from app.services import log_ingest app = FastAPI(title="P5G Marvis", version="1.0.0", docs_url="/api/docs") @@ -18,6 +26,7 @@ app.include_router(network.router, prefix="/api") app.include_router(alerts.router, prefix="/api") app.include_router(query_router.router, prefix="/api") app.include_router(actions_router.router, prefix="/api") +app.include_router(logs_router.router, prefix="/api") app.include_router(emulated_session_router.router, prefix="/api") UI = Path(__file__).parent / "ui" / "index.html" @@ -51,6 +60,16 @@ async def actions_page(): return FileResponse(str(ACTIONS_UI)) +@app.on_event("startup") +async def _startup() -> None: + await log_ingest.startup() + + +@app.on_event("shutdown") +async def _shutdown() -> None: + await log_ingest.shutdown() + + # Catch-all: serve the SPA for any unmatched path (supports deep-linking) @app.get("/{full_path:path}") async def spa(full_path: str): diff --git a/app/routers/__pycache__/logs.cpython-314.pyc b/app/routers/__pycache__/logs.cpython-314.pyc new file mode 100644 index 0000000..2a3982e Binary files /dev/null and b/app/routers/__pycache__/logs.cpython-314.pyc differ diff --git a/app/routers/__pycache__/query.cpython-314.pyc b/app/routers/__pycache__/query.cpython-314.pyc index 61598e7..7a6179a 100644 Binary files 
a/app/routers/__pycache__/query.cpython-314.pyc and b/app/routers/__pycache__/query.cpython-314.pyc differ diff --git a/app/routers/logs.py b/app/routers/logs.py new file mode 100644 index 0000000..79ca3e5 --- /dev/null +++ b/app/routers/logs.py @@ -0,0 +1,36 @@ +from fastapi import APIRouter, HTTPException, Query + +from app.services import log_ingest + +router = APIRouter() + + +@router.get("/logs/status") +async def get_log_status(): + return log_ingest.receiver_status() + + +@router.get("/logs/events") +async def get_log_events( + limit: int = Query(default=200, ge=1, le=5000), + node: str | None = None, + nf: str | None = None, + imsi: str | None = None, +): + return { + "events": log_ingest.get_events(limit=limit, node=node, nf=nf, imsi=imsi), + "status": log_ingest.receiver_status(), + } + + +@router.get("/logs/contexts") +async def get_log_contexts(limit: int = Query(default=20, ge=1, le=200)): + return {"contexts": log_ingest.recent_alert_context(limit=limit)} + + +@router.post("/logs/configure") +async def configure_log_shipping(): + try: + return await log_ingest.configure_site_output() + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/app/routers/query.py b/app/routers/query.py index d022105..e7e4cdd 100644 --- a/app/routers/query.py +++ b/app/routers/query.py @@ -1,6 +1,6 @@ from fastapi import APIRouter from pydantic import BaseModel -from app.services import cluster_inventory, alertmanager, ai +from app.services import cluster_inventory, alertmanager, ai, log_ingest router = APIRouter() @@ -11,14 +11,15 @@ class QueryRequest(BaseModel): @router.post("/query") async def query(req: QueryRequest): - network_state, alerts = await _gather(req.query) - response = await ai.answer(req.query, network_state, alerts) - return {"response": response, "network_state": network_state, "alerts": alerts} + network_state, alerts, logs = await _gather(req.query) + response = await ai.answer(req.query, network_state, 
alerts, logs) + return {"response": response, "network_state": network_state, "alerts": alerts, "logs": logs} async def _gather(query_text: str): import asyncio nfs_task = asyncio.create_task(cluster_inventory.get_network_status()) alerts_task = asyncio.create_task(alertmanager.get_alerts()) - network_state, alerts = await asyncio.gather(nfs_task, alerts_task) - return network_state, alerts + logs_task = asyncio.to_thread(log_ingest.get_events, 200) + network_state, alerts, logs = await asyncio.gather(nfs_task, alerts_task, logs_task) + return network_state, alerts, logs diff --git a/app/services/__pycache__/ai.cpython-314.pyc b/app/services/__pycache__/ai.cpython-314.pyc index 3f84970..76e80f4 100644 Binary files a/app/services/__pycache__/ai.cpython-314.pyc and b/app/services/__pycache__/ai.cpython-314.pyc differ diff --git a/app/services/__pycache__/alertmanager.cpython-314.pyc b/app/services/__pycache__/alertmanager.cpython-314.pyc index 8294fd0..67eb625 100644 Binary files a/app/services/__pycache__/alertmanager.cpython-314.pyc and b/app/services/__pycache__/alertmanager.cpython-314.pyc differ diff --git a/app/services/__pycache__/cluster_inventory.cpython-314.pyc b/app/services/__pycache__/cluster_inventory.cpython-314.pyc index d810518..1524df5 100644 Binary files a/app/services/__pycache__/cluster_inventory.cpython-314.pyc and b/app/services/__pycache__/cluster_inventory.cpython-314.pyc differ diff --git a/app/services/__pycache__/log_analyzer.cpython-314.pyc b/app/services/__pycache__/log_analyzer.cpython-314.pyc index a5ac562..aa8de51 100644 Binary files a/app/services/__pycache__/log_analyzer.cpython-314.pyc and b/app/services/__pycache__/log_analyzer.cpython-314.pyc differ diff --git a/app/services/__pycache__/log_ingest.cpython-314.pyc b/app/services/__pycache__/log_ingest.cpython-314.pyc new file mode 100644 index 0000000..ce39e8a Binary files /dev/null and b/app/services/__pycache__/log_ingest.cpython-314.pyc differ diff --git 
a/app/services/__pycache__/log_rules.cpython-314.pyc b/app/services/__pycache__/log_rules.cpython-314.pyc new file mode 100644 index 0000000..e89bbcd Binary files /dev/null and b/app/services/__pycache__/log_rules.cpython-314.pyc differ diff --git a/app/services/__pycache__/pls.cpython-314.pyc b/app/services/__pycache__/pls.cpython-314.pyc index f14e2dc..58b8eed 100644 Binary files a/app/services/__pycache__/pls.cpython-314.pyc and b/app/services/__pycache__/pls.cpython-314.pyc differ diff --git a/app/services/ai.py b/app/services/ai.py index ca1b9d5..ce4d3da 100644 --- a/app/services/ai.py +++ b/app/services/ai.py @@ -6,6 +6,7 @@ Phase 2: swap MARVIS_AI_MODE=openai or MARVIS_AI_MODE=ollama to route through LL """ from datetime import datetime +import re from app.config import ( AI_MODE, CONTAINER_RUNTIME, @@ -17,23 +18,25 @@ from app.config import ( ) -async def answer(query: str, network_state: dict, alerts: list) -> str: +async def answer(query: str, network_state: dict, alerts: list, logs: list[dict] | None = None) -> str: if AI_MODE == "openai": - return await _call_openai(query, network_state, alerts) + return await _call_openai(query, network_state, alerts, logs or []) if AI_MODE == "ollama": - return await _call_ollama(query, network_state, alerts) - return _rule_based(query, network_state, alerts) + return await _call_ollama(query, network_state, alerts, logs or []) + return _rule_based(query, network_state, alerts, logs or []) # ── Rule-based engine ────────────────────────────────────────────────────── -def _rule_based(query: str, network_state: dict, alerts: list) -> str: +def _rule_based(query: str, network_state: dict, alerts: list, logs: list[dict]) -> str: q = query.lower() nfs = network_state.get("nfs", []) cluster = network_state.get("cluster", {}) up = [n for n in nfs if n["state"] == "up"] down = [n for n in nfs if n["state"] == "down"] + log_hits = _find_log_hits(q, logs) + if any(w in q for w in ["hello", "hi ", "hey", "howdy"]): return 
("Hello! I'm **P5G Marvis**, your AI network assistant for HPE Private 5G.\n" "Ask me about network health, specific functions, alerts, or performance.") @@ -53,22 +56,25 @@ def _rule_based(query: str, network_state: dict, alerts: list) -> str: from app.config import ALL_NFS for nf_name in ALL_NFS: if nf_name.lower() in q: - return _nf_detail(nf_name, nfs, alerts) + return _nf_detail(nf_name, nfs, alerts, log_hits) if any(w in q for w in ["alert", "alarm", "warning", "critical", "incident", "problem", "issue"]): return _alerts_summary(alerts) + if any(w in q for w in ["log", "trace", "journal", "message", "error"]): + return _log_summary(log_hits, logs) + if any(w in q for w in ["subscriber", "ue ", "device", "phone", "handset", "registration", "attach"]): - return _subscriber_analysis(nfs, alerts, cluster) + return _subscriber_analysis(nfs, alerts, cluster, log_hits) if any(w in q for w in ["session", "pdu", "bearer", "user plane", "traffic", "throughput"]): - return _session_analysis(nfs, alerts, cluster) + return _session_analysis(nfs, alerts, cluster, log_hits) # Default → health summary - return _health_summary(up, down, alerts, cluster) + return _health_summary(up, down, alerts, cluster, log_hits) -def _health_summary(up: list, down: list, alerts: list, cluster: dict) -> str: +def _health_summary(up: list, down: list, alerts: list, cluster: dict, log_hits: list[dict]) -> str: ts = datetime.now().strftime("%H:%M:%S") crit = [a for a in alerts if a.get("severity") == "critical"] warn = [a for a in alerts if a.get("severity") != "critical"] @@ -104,13 +110,21 @@ def _health_summary(up: list, down: list, alerts: list, cluster: dict) -> str: if not down and not alerts: lines.append("\n🟢 All systems nominal.") + if log_hits: + lines.append(f"\n🧾 **Relevant log hits ({len(log_hits)})**") + for hit in log_hits[:4]: + lines.append( + f" • {hit.get('timestamp','')} — {hit.get('node','unknown')} {hit.get('nf','SYSTEM')}: " + f"{_trim_message(hit.get('message',''))}" + ) 
return "\n".join(lines) -def _nf_detail(nf_name: str, nfs: list, alerts: list) -> str: +def _nf_detail(nf_name: str, nfs: list, alerts: list, log_hits: list[dict]) -> str: nf = next((n for n in nfs if n["name"] == nf_name), None) nf_alerts = [a for a in alerts if nf_name in a.get("name", "") or nf_name.lower() in a.get("instance", "").lower()] + nf_logs = [hit for hit in log_hits if hit.get("nf") == nf_name] if not nf or nf["state"] == "unknown": return (f"ℹ️ No Prometheus data found for **{nf_name}**.\n" @@ -132,6 +146,13 @@ def _nf_detail(nf_name: str, nfs: list, alerts: list) -> str: lines.append(f" → {a['name']}: {a.get('summary', '')}") else: lines.append("No active alerts for this function.") + if nf_logs: + lines.append(f"\n🧾 Recent {nf_name} log evidence:") + for hit in nf_logs[:4]: + lines.append( + f" • {hit.get('timestamp','')} on {hit.get('node','unknown')}: " + f"{_trim_message(hit.get('message',''))}" + ) return "\n".join(lines) @@ -151,7 +172,7 @@ def _alerts_summary(alerts: list) -> str: return "\n".join(lines) -def _subscriber_analysis(nfs: list, alerts: list, cluster: dict) -> str: +def _subscriber_analysis(nfs: list, alerts: list, cluster: dict, log_hits: list[dict]) -> str: amf = next((n for n in nfs if n["name"] == "AMF"), None) smf = next((n for n in nfs if n["name"] == "SMF"), None) lines = ["**Subscriber & Registration Analysis**\n"] @@ -163,11 +184,18 @@ def _subscriber_analysis(nfs: list, alerts: list, cluster: dict) -> str: lines.append(f"\n⚠️ {len(sub_alerts)} subscriber-related alert(s) active.") else: lines.append("\nNo subscriber-related alerts detected.") + sub_logs = [hit for hit in log_hits if any(key in hit.get("message", "").lower() for key in ["imsi", "supi", "registration", "attach", "subscriber"])] + if sub_logs: + lines.append("\nRecent subscriber-related log evidence:") + for hit in sub_logs[:4]: + lines.append( + f"• {hit.get('nf','SYSTEM')} on {hit.get('node','unknown')}: {_trim_message(hit.get('message',''))}" + ) 
lines.append(_cluster_scope(cluster)) return "\n".join(lines) -def _session_analysis(nfs: list, alerts: list, cluster: dict) -> str: +def _session_analysis(nfs: list, alerts: list, cluster: dict, log_hits: list[dict]) -> str: smf = next((n for n in nfs if n["name"] == "SMF"), None) upf = next((n for n in nfs if n["name"] == "UPF"), None) lines = ["**PDU Session & Data Plane Analysis**\n"] @@ -177,10 +205,38 @@ def _session_analysis(nfs: list, alerts: list, cluster: dict) -> str: lines.append("\n⚡ **Impact**: PDU sessions will fail until both SMF and UPF are operational.") else: lines.append("\nBoth SMF and UPF operational — sessions should be establishing normally.") + session_logs = [hit for hit in log_hits if hit.get("nf") in {"SMF", "UPF"}] + if session_logs: + lines.append("\nRecent session/data-plane log evidence:") + for hit in session_logs[:4]: + lines.append( + f"• {hit.get('nf','SYSTEM')} on {hit.get('node','unknown')}: {_trim_message(hit.get('message',''))}" + ) lines.append(_cluster_scope(cluster)) return "\n".join(lines) +def _log_summary(log_hits: list[dict], logs: list[dict]) -> str: + if not logs: + return "ℹ️ No ingested logs are currently available." + if not log_hits: + latest = max(logs, key=lambda event: event.get("epoch", 0.0), default=None) + if latest: + return ( + "ℹ️ I do not see direct log matches for that question.\n\n" + f"Latest ingested log: {latest.get('timestamp','')} on {latest.get('node','unknown')} " + f"{latest.get('nf','SYSTEM')} — {_trim_message(latest.get('message',''))}" + ) + return "ℹ️ No relevant log matches were found." 
+ lines = [f"🧾 **Relevant log matches ({len(log_hits)})**\n"] + for hit in log_hits[:8]: + lines.append( + f"• {hit.get('timestamp','')} — {hit.get('node','unknown')} {hit.get('nf','SYSTEM')}: " + f"{_trim_message(hit.get('message',''))}" + ) + return "\n".join(lines) + + def _nf_label(nf: dict) -> str: placements = nf.get("nodes", []) if not placements: @@ -207,24 +263,30 @@ def _cluster_scope(cluster: dict) -> str: # ── LLM backends ────────────────────────────────────────────────────────── -def _build_context(network_state: dict, alerts: list) -> str: +def _build_context(network_state: dict, alerts: list, logs: list[dict]) -> str: nfs = network_state.get("nfs", []) up = [n["name"] for n in nfs if n["state"] == "up"] down = [n["name"] for n in nfs if n["state"] == "down"] nodes = network_state.get("cluster", {}).get("nodes", []) node_summary = ", ".join(f"{node['hostname']} ({node.get('role', 'AP')})" for node in nodes) or "none" + recent_logs = logs[-10:] if logs else [] + log_summary = "; ".join( + f"{entry.get('timestamp','')} {entry.get('node','unknown')} {entry.get('nf','SYSTEM')}: {_trim_message(entry.get('message',''), 120)}" + for entry in recent_logs + ) or "none" return ( f"NFs UP: {', '.join(up) or 'none'}\n" f"NFs DOWN: {', '.join(down) or 'none'}\n" f"Cluster nodes: {node_summary}\n" - f"Active alerts: {', '.join(a.get('name','') for a in alerts[:5]) or 'none'}" + f"Active alerts: {', '.join(a.get('name','') for a in alerts[:5]) or 'none'}\n" + f"Recent logs: {log_summary}" ) -async def _call_openai(query: str, network_state: dict, alerts: list) -> str: +async def _call_openai(query: str, network_state: dict, alerts: list, logs: list[dict]) -> str: try: import httpx - ctx = _build_context(network_state, alerts) + ctx = _build_context(network_state, alerts, logs) messages = [ {"role": "system", "content": f"You are P5G Marvis, an AI network assistant for HPE Private 5G.\n" @@ -247,13 +309,13 @@ async def _call_openai(query: str, network_state: dict, 
alerts: list) -> str: # some reasoning models put the answer in content, others in reasoning_content return msg.get("content") or msg.get("reasoning_content") or "(empty response)" except Exception as e: - return f"LLM error: {e}\n\n" + _rule_based(query, network_state, alerts) + return f"LLM error: {e}\n\n" + _rule_based(query, network_state, alerts, logs) -async def _call_ollama(query: str, network_state: dict, alerts: list) -> str: +async def _call_ollama(query: str, network_state: dict, alerts: list, logs: list[dict]) -> str: try: import httpx - ctx = _build_context(network_state, alerts) + ctx = _build_context(network_state, alerts, logs) prompt = (f"You are P5G Marvis, an AI network assistant.\n" f"Network state:\n{ctx}\n\nUser: {query}\nAssistant:") async with httpx.AsyncClient(timeout=60) as client: @@ -263,4 +325,34 @@ async def _call_ollama(query: str, network_state: dict, alerts: list) -> str: ) return resp.json().get("response", "No response.") except Exception as e: - return f"Ollama error: {e}\n\n" + _rule_based(query, network_state, alerts) + return f"Ollama error: {e}\n\n" + _rule_based(query, network_state, alerts, logs) + + +def _find_log_hits(query: str, logs: list[dict]) -> list[dict]: + terms = [term for term in re.findall(r"[a-z0-9_-]+", query.lower()) if len(term) >= 3] + if not logs or not terms: + return [] + hits = [] + for event in logs: + haystack = " ".join( + [ + str(event.get("nf", "")).lower(), + str(event.get("node", "")).lower(), + str(event.get("source", "")).lower(), + str(event.get("message", "")).lower(), + ] + ) + score = sum(1 for term in terms if term in haystack) + if score: + event_copy = dict(event) + event_copy["_score"] = score + hits.append(event_copy) + hits.sort(key=lambda event: (event.get("_score", 0), event.get("epoch", 0.0)), reverse=True) + return hits + + +def _trim_message(message: str, limit: int = 160) -> str: + message = " ".join(str(message).split()) + if len(message) <= limit: + return message + return 
message[: limit - 3] + "..." diff --git a/app/services/alertmanager.py b/app/services/alertmanager.py index c3c02ed..bb2aa0e 100644 --- a/app/services/alertmanager.py +++ b/app/services/alertmanager.py @@ -1,14 +1,31 @@ -"""Alertmanager client.""" +"""Alert sources: Alertmanager plus log-derived alerts.""" +import asyncio +import json import httpx from app.config import ALERTMANAGER_URL -from app.services import cluster_inventory +from app.services import cluster_inventory, log_ingest _BASE = ALERTMANAGER_URL.rstrip("/") async def get_alerts() -> list: - """Return normalised list of active alerts from Alertmanager.""" + """Return normalised list of active alerts from Alertmanager and log analysis.""" + cluster = await cluster_inventory.get_cluster_inventory() + alertmanager_task = asyncio.create_task(_get_alertmanager_alerts(cluster)) + log_task = asyncio.to_thread(_get_log_alerts, cluster) + am_alerts, log_alerts = await asyncio.gather(alertmanager_task, log_task, return_exceptions=True) + if isinstance(am_alerts, Exception): + am_alerts = [] + if isinstance(log_alerts, Exception): + log_alerts = [] + return sorted( + [*am_alerts, *log_alerts], + key=lambda alert: (_severity_rank(alert.get("severity")), alert.get("timestamp", "")), + ) + + +async def _get_alertmanager_alerts(cluster: dict) -> list: try: async with httpx.AsyncClient(timeout=5) as client: r = await client.get(f"{_BASE}/api/v2/alerts", params={"active": "true", "silenced": "false"}) @@ -17,7 +34,6 @@ async def get_alerts() -> list: except Exception: return [] - cluster = await cluster_inventory.get_cluster_inventory() alerts = [] for a in raw: labels = a.get("labels", {}) @@ -33,10 +49,62 @@ async def get_alerts() -> list: "summary": summary, "nf": nf_name, "nodes": nodes, + "source": "alertmanager", + "timestamp": a.get("startsAt", ""), }) return alerts +def _get_log_alerts(cluster: dict) -> list: + node_map = {} + for node in cluster.get("nodes", []): + if node.get("hostname"): + 
node_map[node["hostname"]] = node + if node.get("address"): + node_map[node["address"]] = node + + alerts = [] + for ctx in log_ingest.recent_alert_context(limit=50): + before = _decode_context(ctx.get("before_context")) + after = _decode_context(ctx.get("after_context")) + node_name = ctx.get("node", "") + nodes = [] + if node_name and node_name in node_map: + nodes = [node_map[node_name]] + alerts.append({ + "name": f"{ctx.get('nf') or 'System'} log anomaly", + "severity": ctx.get("severity", "warning"), + "instance": ctx.get("source", ""), + "summary": ctx.get("description", "Log-derived alert"), + "nf": ctx.get("nf", ""), + "nodes": nodes, + "source": "logs", + "timestamp": ctx.get("event_ts", ""), + "context_id": ctx.get("id"), + "node": node_name, + "match_message": ctx.get("match_message", ""), + "context_preview": { + "before": before[-3:], + "after": after[:3], + }, + }) + return alerts + + +def _decode_context(value: str | None) -> list[dict]: + if not value: + return [] + try: + data = json.loads(value) + return data if isinstance(data, list) else [] + except Exception: + return [] + + +def _severity_rank(severity: str | None) -> int: + return {"critical": 0, "warning": 1, "info": 2}.get((severity or "warning").lower(), 3) + + def _infer_nf(name: str, summary: str, instance: str) -> str: text = f"{name} {summary} {instance}".upper() for nf_name in ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM"]: diff --git a/app/services/cluster_inventory.py b/app/services/cluster_inventory.py index 7451cca..4febc8d 100644 --- a/app/services/cluster_inventory.py +++ b/app/services/cluster_inventory.py @@ -8,6 +8,8 @@ import re from app.config import ALL_NFS from app.services import pls, prometheus +_last_inventory: dict | None = None + ROLE_NF_MAP = { "5GALL": {"amf", "smf", "pcf", "udr", "udm", "nrf", "eir", "ausf", "dra", "upf", "chf", "smsf", "aaa", "bmsc"}, "CP": {"amf", "smf", "pcf", "udr", "udm", "nrf", "eir", "ausf", "dra", 
"chf", "smsf", "aaa", "bmsc"}, @@ -41,9 +43,10 @@ def _infer_role(hostname: str) -> str: async def get_cluster_inventory() -> dict: + global _last_inventory cluster = await pls.get_cluster_status() if not cluster: - return { + return _last_inventory or { "enabled": False, "current_node": None, "fully_established": False, @@ -78,12 +81,14 @@ async def get_cluster_inventory() -> dict: } ) - return { + inventory = { "enabled": True, "current_node": cluster.get("current_node"), "fully_established": bool(cluster.get("fully_established")), "nodes": nodes, } + _last_inventory = inventory + return inventory def _aggregate_nf_state(nf_name: str, nodes: list[dict], prom_states: dict[str, dict]) -> dict: @@ -137,8 +142,14 @@ def _attach_node_nf_status(nodes: list[dict]) -> list[dict]: enriched = [] for node in nodes: node_copy = dict(node) - expected_nfs = node_copy.get("expected_nfs", []) - node_copy["nfs"] = [_node_nf_state(node_copy, nf_name.upper()) for nf_name in expected_nfs] + expected_nfs = {nf.upper() for nf in node_copy.get("expected_nfs", [])} + started_nf_services = { + svc.get("name", "").upper() + for svc in node_copy.get("services", []) + if svc.get("type") == "nf" and svc.get("name") + } + visible_nfs = sorted(expected_nfs | started_nf_services) + node_copy["nfs"] = [_node_nf_state(node_copy, nf_name.upper()) for nf_name in visible_nfs] enriched.append(node_copy) return enriched diff --git a/app/services/log_analyzer.py b/app/services/log_analyzer.py index 6b4a95c..4920e2b 100644 --- a/app/services/log_analyzer.py +++ b/app/services/log_analyzer.py @@ -1,8 +1,4 @@ -""" -log_analyzer.py — Reads P5G NF container logs and active Prometheus/Alertmanager -data to produce a structured list of recommended remediation actions, grouped -by category. This is the data backend powering the /api/actions endpoint. 
-""" +"""Structured issue generation from ingested cross-node log events and state.""" import asyncio import re @@ -10,7 +6,13 @@ import time from collections import deque from datetime import datetime -from app.config import CONTAINER_HOST, CONTAINER_RUNTIME +from app.config import ( + CONTAINER_HOST, + CONTAINER_RUNTIME, + LOG_ALERT_CONTEXT_AFTER, + LOG_ALERT_CONTEXT_BEFORE, +) +from app.services.log_rules import load_category_patterns # ── In-memory history (up to 96 snapshots ≈ 48 min at 30 s refresh) ──────── _history: deque = deque(maxlen=96) @@ -29,99 +31,6 @@ CATEGORY_COLORS: dict[str, str] = { ALL_CATEGORIES = ["Registration", "Authentication", "Security", "Sessions", "Connectivity", "Policy"] -# ── Log-pattern definitions ────────────────────────────────────────────────── -# Each entry: (regex, affected_nf, severity, short_description, remediation) -CATEGORY_PATTERNS: dict[str, list[tuple]] = { - "Registration": [ - (r"RegistrationFailure|UeRegistrationFailed|N1.*[Rr]egistration.*[Ff]ail", - "AMF", "critical", - "UE registration failure", - "Check AMF logs for NGAP errors; verify UE credentials and NRF registration."), - (r"N2SetupFail|NgapSetupFail|N2.*[Tt]imeout|NgapProcedure.*failed", - "AMF", "critical", - "N2 interface setup failure", - "Verify gNB connectivity to AMF; check SCTP transport and NGAP PLMN config."), - (r"InitialContextSetupFail|UeContextRelease.*[Aa]bnormal", - "AMF", "warning", - "UE context setup failure", - "Review AMF-SMF N11 interface; check subscriber profile in UDM/UDR."), - (r"PagingFail|UeUnreachable|UeNotFound", - "AMF", "warning", - "UE paging failure", - "Verify UE is registered; check AMF tracking area configuration."), - ], - "Sessions": [ - (r"PduSessionEstablishmentReject|PduSession.*[Ff]ail|CreateSessionResponse.*[Ff]ail", - "SMF", "critical", - "PDU session establishment failure", - "Check SMF-UPF N4 path; verify DNN/APN config and UPF N3/N9 interfaces."), - 
(r"N4Session.*[Ff]ail|PfcpSession.*[Ee]rror|N4.*[Tt]imeout|PfcpAssociation.*[Ff]ail", - "UPF", "critical", - "N4/PFCP session error", - "Restart PFCP association between SMF and UPF; check N4 IP reachability."), - (r"IpAllocationFail|AddressPoolExhausted|NoIpAvailable", - "SMF", "critical", - "IP address pool exhausted", - "Expand UE IP address pool in SMF config; review active session count."), - (r"SessionModification.*[Ff]ail|BearerModification.*[Ee]rror", - "SMF", "warning", - "Session modification failure", - "Check PCF policy consistency; verify QoS parameters match UPF capabilities."), - ], - "Authentication": [ - (r"AuthenticationFailure|AuthReject|EapFailure|5g-aka.*[Ff]ail|EapAkaFailure", - "AUSF", "critical", - "UE authentication failure", - "Verify USIM credentials match UDM subscriber data; check AUSF-UDM N12 link."), - (r"UdmAuthReq.*[Ee]rror|SuciDeconceal.*[Ff]ail|UdmUeAuth.*[Ee]rror", - "UDM", "critical", - "UDM authentication error", - "Check UDM-UDR N35 connectivity; verify Home Network Public Key configuration."), - (r"AuthVectorFetch.*[Ff]ail|AusfUeAuth.*[Rr]eject|HssAuth.*[Ff]ail", - "AUSF", "warning", - "Auth vector fetch failure", - "Review UDR data integrity for affected SUPI; check AUSF-UDM TLS certificates."), - ], - "Connectivity": [ - (r"NfDiscovery.*[Ff]ail|NrfRegistration.*[Ff]ail|NfDeregistration.*unexpect", - "NRF", "warning", - "NF service discovery failure", - "Verify NRF is reachable from all NFs; check NRF registration TTL and heartbeat."), - (r"ServiceUnavailable.*NF|HTTP.*503.*NF|NfProfile.*expired", - "NRF", "warning", - "NF service unavailable", - "Check NF pod health and SBI listen port; review NRF subscription notifications."), - (r"SbiRequest.*[Tt]imeout|SbiConn.*[Ff]ail|Http2.*[Ee]rror", - "NRF", "warning", - "SBI interface timeout", - "Inspect inter-NF network MTU and TLS handshake; check load balancer config."), - ], - "Policy": [ - (r"PcfSmPolicy.*[Ee]rror|PolicyDecision.*[Ff]ail|SmPolicy.*[Rr]eject", - "PCF", 
"warning", - "Policy decision failure", - "Review PCF policy rules and subscriber group config; check PCF-UDR N36 link."), - (r"QosEnforce.*[Ff]ail|ChargingRule.*[Ee]rror|PccRule.*[Rr]eject", - "PCF", "warning", - "QoS policy enforcement failure", - "Verify QoS profiles match UPF capabilities; check PCF-CHF N40 charging path."), - ], - "Security": [ - (r"SecurityMode.*[Ff]ail|IntegrityCheck.*[Ff]ail|NasIntegrity.*[Ee]rror", - "AMF", "critical", - "NAS security mode failure", - "Check AMF cipher/integrity algorithm priority list matches UE capabilities."), - (r"TlsHandshake.*[Ff]ail|Certificate.*[Ee]xpir|x509.*[Ee]rror|CertVerify.*[Ff]ail", - "AMF", "critical", - "TLS/certificate error", - "Renew expired certificates; verify trust chain between NFs; check SBI TLS config."), - (r"SuciProtection.*[Ff]ail|PrivacyProtection.*[Ee]rror|HomeNetworkKey.*[Ee]rror", - "UDM", "warning", - "SUCI privacy protection error", - "Verify Home Network Public Key provisioning on UDM; check SUPI revealing config."), - ], -} - # ── NF → possible container name fragments (tried in order) ───────────────── NF_CONTAINER_HINTS: dict[str, list[str]] = { "AMF": ["amf"], @@ -191,13 +100,13 @@ async def _read_logs(container: str, tail: int = 400) -> str: return "" -def _match_count(text: str, pattern: str) -> int: - if not text: - return 0 +def _rule_matches(message: str, pattern: str) -> bool: + if not message: + return False try: - return len(re.findall(pattern, text, re.IGNORECASE | re.MULTILINE)) + return bool(re.search(pattern, message, re.IGNORECASE | re.MULTILINE)) except re.error: - return 0 + return False # ── Category/NF mapping for Alertmanager alerts ────────────────────────────── @@ -235,50 +144,104 @@ async def analyze_logs() -> dict: Gather log-pattern issues + Prometheus NF status + Alertmanager alerts. Returns a fully structured dict ready for JSON serialisation. 
""" - from app.services import alertmanager, prometheus, cluster_inventory + from app.services import alertmanager, cluster_inventory, log_ingest, prometheus # Kick off all I/O in parallel containers_f = asyncio.create_task(_discover_containers()) alerts_f = asyncio.create_task(alertmanager.get_alerts()) nf_status_f = asyncio.create_task(prometheus.get_nf_status()) cluster_f = asyncio.create_task(cluster_inventory.get_cluster_inventory()) + events_f = asyncio.to_thread(log_ingest.get_events) containers = await containers_f - alerts, nf_statuses, cluster = await asyncio.gather(alerts_f, nf_status_f, cluster_f, - return_exceptions=True) + alerts, nf_statuses, cluster, events = await asyncio.gather( + alerts_f, nf_status_f, cluster_f, events_f, return_exceptions=True + ) if isinstance(alerts, Exception): alerts = [] if isinstance(nf_statuses, Exception): nf_statuses = [] if isinstance(cluster, Exception): cluster = {"enabled": False, "nodes": []} + if isinstance(events, Exception): + events = [] - # Read all container logs concurrently - log_tasks = {nf: asyncio.create_task(_read_logs(cname)) - for nf, cname in containers.items()} - log_texts: dict[str, str] = {} - if log_tasks: - log_results = await asyncio.gather(*log_tasks.values(), return_exceptions=True) - for nf, result in zip(log_tasks.keys(), log_results): - log_texts[nf] = result if isinstance(result, str) else "" - + events = sorted( + [event for event in events if isinstance(event, dict)], + key=lambda event: event.get("epoch", 0.0), + ) issues: list[dict] = [] + grouped_log_issues: dict[tuple[str, str, str, str], dict] = {} - # 1. 
Log-pattern analysis - for category, patterns in CATEGORY_PATTERNS.items(): - for (pat_re, nf, severity, description, remediation) in patterns: - count = _match_count(log_texts.get(nf, ""), pat_re) - if count: - issues.append({ - "id": f"log-{nf}-{len(issues)}", - "category": category, - "nf": nf, - "severity": severity, - "count": count, - "description": description, - "remediation": remediation, - "source": "log", - }) + # 1. Time-ordered log-pattern analysis across all nodes. + for idx, event in enumerate(events): + message = event.get("message", "") + event_nf = str(event.get("nf", "")).upper() + event_node = event.get("node", "") + for category, patterns in load_category_patterns().items(): + for rule in patterns: + rule_nf = str(rule["nf"]).upper() + if event_nf != rule_nf: + continue + if not _rule_matches(message, rule["pattern"]): + continue + + before_context = events[max(0, idx - LOG_ALERT_CONTEXT_BEFORE):idx] + after_context = events[idx + 1:idx + 1 + LOG_ALERT_CONTEXT_AFTER] + context_id = log_ingest.record_alert_context( + category=category, + nf=rule_nf, + node=event_node, + severity=rule["severity"], + description=rule["description"], + remediation=rule["remediation"], + source="fluentbit", + event=event, + before_context=before_context, + after_context=after_context, + ) + + issue_key = (category, rule_nf, event_node, rule["description"]) + if issue_key not in grouped_log_issues: + grouped_log_issues[issue_key] = { + "id": f"log-{rule_nf}-{len(grouped_log_issues)}", + "category": category, + "nf": rule_nf, + "node": event_node, + "severity": rule["severity"], + "count": 0, + "description": rule["description"], + "remediation": rule["remediation"], + "source": "fluentbit", + "context_id": context_id, + } + grouped_log_issues[issue_key]["count"] += 1 + + issues.extend(grouped_log_issues.values()) + + # Fallback to local container logs until Fluent Bit has populated the buffer. 
+ if not issues and not events: + log_tasks = {nf: asyncio.create_task(_read_logs(cname)) for nf, cname in containers.items()} + if log_tasks: + log_results = await asyncio.gather(*log_tasks.values(), return_exceptions=True) + log_texts = { + nf: result if isinstance(result, str) else "" + for nf, result in zip(log_tasks.keys(), log_results) + } + for category, patterns in load_category_patterns().items(): + for rule in patterns: + nf = rule["nf"] + if _rule_matches(log_texts.get(nf, ""), rule["pattern"]): + issues.append({ + "id": f"log-{nf}-{len(issues)}", + "category": category, + "nf": nf, + "severity": rule["severity"], + "count": 1, + "description": rule["description"], + "remediation": rule["remediation"], + "source": "local-log-fallback", + }) # 2. NF-down events from Prometheus for nf_st in nf_statuses: @@ -337,7 +300,8 @@ async def analyze_logs() -> dict: "total": total, "categories": categories, "timestamp": datetime.now().isoformat(), - "log_sources": list(containers.keys()), + "log_sources": sorted({f"{event.get('node', 'unknown')}:{event.get('nf', 'SYSTEM')}" for event in events}) or list(containers.keys()), + "log_ingest": log_ingest.receiver_status(), "cluster": cluster, } diff --git a/app/services/log_ingest.py b/app/services/log_ingest.py new file mode 100644 index 0000000..b925bff --- /dev/null +++ b/app/services/log_ingest.py @@ -0,0 +1,499 @@ +"""Fluent Bit log ingestion, buffering, and alert-context persistence.""" + +from __future__ import annotations + +import asyncio +import json +import sqlite3 +from collections import deque +from datetime import UTC, datetime +from hashlib import sha1 +from pathlib import Path +from typing import Any + +from app.config import ( + ALL_NFS, + LOG_ALERT_CONTEXT_AFTER, + LOG_ALERT_CONTEXT_BEFORE, + LOG_ALLOWED_NFS, + LOG_ALERT_CONTEXT_DB_MAX_ROWS, + LOG_ALERT_CONTEXT_DB_PATH, + LOG_BUFFER_LINES, + LOG_AUTO_CONFIGURE, + LOG_FLUENTBIT_MATCH, + LOG_INGEST_ENABLED, + LOG_RECEIVER_BIND_HOST, + LOG_RECEIVER_FORMAT, 
+ LOG_RECEIVER_HOST, + LOG_RECEIVER_PORT, + LOG_TRACE_BUFFER_LINES, +) +from app.services import pls + +_server: asyncio.base_events.Server | None = None +_events: deque[dict[str, Any]] = deque(maxlen=max(LOG_BUFFER_LINES, 1)) +_trace_events: deque[dict[str, Any]] = deque(maxlen=max(LOG_TRACE_BUFFER_LINES, LOG_BUFFER_LINES, 1)) +_ingested_total = 0 +_parse_errors = 0 +_last_event_at: str | None = None +_db_initialized = False +_allowed_nfs = {nf.upper() for nf in LOG_ALLOWED_NFS} + + +def _db_path() -> Path: + return Path(LOG_ALERT_CONTEXT_DB_PATH) + + +def _ensure_db() -> None: + global _db_initialized + if _db_initialized: + return + path = _db_path() + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(path) + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS alert_context ( + id TEXT PRIMARY KEY, + fingerprint TEXT UNIQUE, + created_at TEXT NOT NULL, + event_ts TEXT NOT NULL, + category TEXT NOT NULL, + nf TEXT, + node TEXT, + severity TEXT, + description TEXT, + remediation TEXT, + source TEXT, + match_message TEXT, + before_context TEXT, + after_context TEXT + ) + """ + ) + conn.commit() + finally: + conn.close() + _db_initialized = True + + +def _trim_db(conn: sqlite3.Connection) -> None: + conn.execute( + """ + DELETE FROM alert_context + WHERE id NOT IN ( + SELECT id + FROM alert_context + ORDER BY event_ts DESC, created_at DESC + LIMIT ? 
+ ) + """, + (max(LOG_ALERT_CONTEXT_DB_MAX_ROWS, 1),), + ) + + +def _parse_timestamp(value: Any) -> tuple[float, str]: + if value is None: + now = datetime.now(UTC) + return now.timestamp(), now.isoformat() + + if isinstance(value, (int, float)): + raw = float(value) + if raw > 1_000_000_000_000: + raw = raw / 1_000_000.0 + elif raw > 10_000_000_000: + raw = raw / 1000.0 + dt = datetime.fromtimestamp(raw, UTC) + return raw, dt.isoformat() + + text = str(value).strip() + if text.isdigit(): + return _parse_timestamp(int(text)) + + normalized = text.replace("Z", "+00:00") + for candidate in (normalized, normalized.replace(" ", "T")): + try: + dt = datetime.fromisoformat(candidate) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + else: + dt = dt.astimezone(UTC) + return dt.timestamp(), dt.isoformat() + except ValueError: + continue + + now = datetime.now(UTC) + return now.timestamp(), now.isoformat() + + +def _candidate_fields(payload: dict[str, Any]) -> list[str]: + candidates = [] + for key in ( + "message", + "MESSAGE", + "log", + "msg", + "systemd_unit", + "_SYSTEMD_UNIT", + "syslog_identifier", + "SYSLOG_IDENTIFIER", + "_COMM", + "comm", + "_EXE", + "container_name", + "tag", + ): + value = payload.get(key) + if value not in (None, ""): + candidates.append(str(value)) + return candidates + + +def _infer_nf(payload: dict[str, Any], message: str) -> str: + haystack = " ".join(_candidate_fields(payload) + [message]).lower() + aliases = { + "upf": "UPF", + "amf": "AMF", + "smf": "SMF", + "udm": "UDM", + "udr": "UDR", + "nrf": "NRF", + "ausf": "AUSF", + "pcf": "PCF", + "mme": "MME", + "sgwc": "SGWC", + "dra": "DRA", + "dsm": "DSM", + "aaa": "AAA", + "bmsc": "BMSC", + "chf": "CHF", + "smsf": "SMSF", + "eir": "EIR", + "licensed": "LICENSED", + "prometheus": "PROMETHEUS", + "alertmanager": "ALERTMANAGER", + "fluent-bit": "FLUENT-BIT", + } + for needle, label in aliases.items(): + if needle in haystack: + return label + return "SYSTEM" + + +def 
_normalize_event(payload: dict[str, Any], remote_host: str) -> dict[str, Any]: + ts_value = ( + payload.get("timestamp") + or payload.get("@timestamp") + or payload.get("time") + or payload.get("date") + or payload.get("_SOURCE_REALTIME_TIMESTAMP") + ) + epoch, ts_iso = _parse_timestamp(ts_value) + + node = ( + payload.get("hostname") + or payload.get("host") + or payload.get("_HOSTNAME") + or payload.get("syslog_hostname") + or remote_host + ) + source = ( + payload.get("systemd_unit") + or payload.get("_SYSTEMD_UNIT") + or payload.get("syslog_identifier") + or payload.get("SYSLOG_IDENTIFIER") + or payload.get("_COMM") + or payload.get("tag") + or "unknown" + ) + message = ( + payload.get("message") + or payload.get("MESSAGE") + or payload.get("log") + or payload.get("msg") + or "" + ) + message = str(message).strip() + tag = str(payload.get("tag", "")) + nf = _infer_nf(payload, message) + fingerprint = sha1(f"{ts_iso}|{node}|{nf}|{source}|{message}".encode("utf-8")).hexdigest() + return { + "id": fingerprint, + "timestamp": ts_iso, + "epoch": epoch, + "node": str(node), + "nf": nf, + "source": str(source), + "tag": tag, + "message": message, + "raw": payload, + } + + +async def _ingest_payload(payload: dict[str, Any], remote_host: str) -> None: + global _ingested_total, _last_event_at + event = _normalize_event(payload, remote_host) + if event.get("nf", "").upper() not in _allowed_nfs: + return + _events.append(event) + _trace_events.append(event) + _ingested_total += 1 + _last_event_at = event["timestamp"] + + +async def _handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + global _parse_errors + peer = writer.get_extra_info("peername") + remote_host = peer[0] if isinstance(peer, tuple) and peer else "unknown" + try: + while not reader.at_eof(): + line = await reader.readline() + if not line: + break + text = line.decode("utf-8", errors="replace").strip() + if not text: + continue + try: + payload = json.loads(text) + if 
isinstance(payload, dict): + await _ingest_payload(payload, remote_host) + elif isinstance(payload, list): + for item in payload: + if isinstance(item, dict): + await _ingest_payload(item, remote_host) + except Exception: + _parse_errors += 1 + finally: + writer.close() + await writer.wait_closed() + + +async def startup() -> None: + global _server + _ensure_db() + if not LOG_INGEST_ENABLED or _server is not None: + return + _server = await asyncio.start_server(_handle_client, LOG_RECEIVER_BIND_HOST, LOG_RECEIVER_PORT) + if LOG_AUTO_CONFIGURE: + try: + await configure_site_output() + except Exception: + pass + + +async def shutdown() -> None: + global _server + if _server is None: + return + _server.close() + await _server.wait_closed() + _server = None + + +def receiver_status() -> dict[str, Any]: + return { + "enabled": LOG_INGEST_ENABLED, + "bind_host": LOG_RECEIVER_BIND_HOST, + "receiver_host": LOG_RECEIVER_HOST, + "port": LOG_RECEIVER_PORT, + "format": LOG_RECEIVER_FORMAT, + "allowed_nfs": sorted(_allowed_nfs), + "buffer_lines": LOG_BUFFER_LINES, + "trace_buffer_lines": LOG_TRACE_BUFFER_LINES, + "context_before": LOG_ALERT_CONTEXT_BEFORE, + "context_after": LOG_ALERT_CONTEXT_AFTER, + "db_path": str(_db_path()), + "ingested_total": _ingested_total, + "parse_errors": _parse_errors, + "last_event_at": _last_event_at, + "current_buffer_size": len(_events), + } + + +def current_output_config(receiver_host: str) -> dict[str, Any]: + return { + "name": "tcp", + "match": LOG_FLUENTBIT_MATCH, + "host": receiver_host, + "port": LOG_RECEIVER_PORT, + "format": LOG_RECEIVER_FORMAT, + } + + +def default_input_config() -> dict[str, Any]: + return { + "name": "systemd", + "path": "/var/log/journal", + "tag": "marvis.systemd", + "read_from_tail": "on", + "strip_underscores": "off", + } + + +async def _resolve_receiver_host() -> str: + if LOG_RECEIVER_HOST: + return LOG_RECEIVER_HOST + + cluster = await pls.get_cluster_status() + if isinstance(cluster, dict): + current_node = 
cluster.get("current_node") + if isinstance(current_node, str) and current_node: + return pls.node_host(current_node) + + system = await pls.get_system_info() + if isinstance(system, dict) and system.get("hostname"): + return str(system["hostname"]) + + return "127.0.0.1" + + +def _merged_fluentbit_config(config: dict[str, Any], receiver_host: str) -> dict[str, Any]: + merged = dict(config or {}) + pipeline = dict(merged.get("pipeline") or {}) + inputs = list(pipeline.get("inputs") or []) + outputs = list(pipeline.get("outputs") or []) + desired = current_output_config(receiver_host) + + if not inputs: + inputs = [default_input_config()] + + filtered = [] + for output in outputs: + if not isinstance(output, dict): + continue + is_existing_marvis = ( + output.get("name") == "tcp" + and output.get("port") == LOG_RECEIVER_PORT + and output.get("format") == LOG_RECEIVER_FORMAT + ) + if not is_existing_marvis: + filtered.append(output) + + filtered.append(desired) + pipeline["inputs"] = inputs + pipeline["outputs"] = filtered + merged["pipeline"] = pipeline + if "parsers" not in merged: + merged["parsers"] = list(config.get("parsers") or []) if isinstance(config, dict) else [] + return merged + + +async def configure_site_output() -> dict[str, Any]: + current = await pls.get_fluentbit_config() + if not isinstance(current, dict): + raise RuntimeError("Could not read current Fluent Bit config from PLS") + receiver_host = await _resolve_receiver_host() + desired = _merged_fluentbit_config(current, receiver_host) + updated = await pls.put_fluentbit_config(desired) + if not isinstance(updated, dict): + raise RuntimeError("PLS rejected Fluent Bit config update") + return { + "receiver_host": receiver_host, + "receiver_port": LOG_RECEIVER_PORT, + "match": LOG_FLUENTBIT_MATCH, + "config": updated, + } + + +def get_events(limit: int | None = None, node: str | None = None, nf: str | None = None, imsi: str | None = None) -> list[dict[str, Any]]: + events = list(_trace_events if 
imsi else _events) + if node: + node_l = node.lower() + events = [event for event in events if event.get("node", "").lower() == node_l] + if nf: + nf_u = nf.upper() + events = [event for event in events if event.get("nf", "").upper() == nf_u] + if imsi: + needle = imsi.strip() + events = [event for event in events if needle and needle in event.get("message", "")] + events.sort(key=lambda event: event.get("epoch", 0.0)) + if limit is not None: + return events[-limit:] + return events + + +def record_alert_context( + *, + category: str, + nf: str, + node: str, + severity: str, + description: str, + remediation: str, + source: str, + event: dict[str, Any], + before_context: list[dict[str, Any]], + after_context: list[dict[str, Any]], +) -> str: + _ensure_db() + fingerprint = sha1( + "|".join( + [ + category, + nf, + node, + severity, + description, + remediation, + event.get("timestamp", ""), + event.get("message", ""), + ] + ).encode("utf-8") + ).hexdigest() + alert_id = sha1(f"{fingerprint}|{source}".encode("utf-8")).hexdigest() + conn = sqlite3.connect(_db_path()) + try: + conn.execute( + """ + INSERT OR REPLACE INTO alert_context ( + id, fingerprint, created_at, event_ts, category, nf, node, severity, + description, remediation, source, match_message, before_context, after_context + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + alert_id, + fingerprint, + datetime.now(UTC).isoformat(), + event.get("timestamp", ""), + category, + nf, + node, + severity, + description, + remediation, + source, + event.get("message", ""), + json.dumps(before_context), + json.dumps(after_context), + ), + ) + _trim_db(conn) + conn.commit() + finally: + conn.close() + return alert_id + + +def recent_alert_context(limit: int = 20) -> list[dict[str, Any]]: + _ensure_db() + conn = sqlite3.connect(_db_path()) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """ + SELECT id, created_at, event_ts, category, nf, node, severity, description, + remediation, source, match_message, before_context, after_context + FROM alert_context + ORDER BY event_ts DESC, created_at DESC + LIMIT ? + """, + (limit,), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() + + +def known_nfs() -> list[str]: + return list(ALL_NFS) diff --git a/app/services/log_rules.py b/app/services/log_rules.py new file mode 100644 index 0000000..fbf2f63 --- /dev/null +++ b/app/services/log_rules.py @@ -0,0 +1,37 @@ +"""JSON-backed log rule loading for runtime-editable log analysis.""" + +from __future__ import annotations + +import json +from pathlib import Path + +RULES_PATH = Path(__file__).resolve().parents[2] / "config" / "log_rules.json" +_rules_cache: dict[str, list[dict]] | None = None +_rules_cache_mtime: float | None = None + + +def load_category_patterns() -> dict[str, list[dict]]: + global _rules_cache, _rules_cache_mtime + try: + stat = RULES_PATH.stat() + if _rules_cache is not None and _rules_cache_mtime == stat.st_mtime: + return _rules_cache + + data = json.loads(RULES_PATH.read_text()) + categories = data.get("categories", {}) + loaded: dict[str, list[dict]] = {} + for category, rules in categories.items(): + loaded[category] = [] + for rule in rules: + if not all( + key in rule + for key in ("pattern", "nf", "severity", "description", "remediation") + ): + continue + 
loaded[category].append(rule) + + _rules_cache = loaded + _rules_cache_mtime = stat.st_mtime + return loaded + except Exception: + return {} diff --git a/app/services/pls.py b/app/services/pls.py index 8e62242..5005af8 100644 --- a/app/services/pls.py +++ b/app/services/pls.py @@ -1,4 +1,4 @@ -"""PLS API client for cluster and per-node discovery.""" +"""PLS API client for cluster, per-node discovery, and site-wide config.""" from __future__ import annotations @@ -11,6 +11,10 @@ from app.config import PLS_AUTH_BACKEND, PLS_BASE_URL, PLS_PASSWORD, PLS_USERNAM _token: str | None = None +class PlsRequestError(RuntimeError): + pass + + def _base_url_for_host(host: str | None = None) -> str: if not host: return PLS_BASE_URL.rstrip("/") @@ -18,9 +22,9 @@ def _base_url_for_host(host: str | None = None) -> str: return urlunsplit((parts.scheme, host, parts.path.rstrip("/"), "", "")) -async def _login() -> str | None: +async def _login(force: bool = False) -> str | None: global _token - if _token: + if _token and not force: return _token if not PLS_USERNAME or not PLS_PASSWORD: return None @@ -48,17 +52,45 @@ async def _get(path: str, host: str | None = None) -> dict | list | None: if not token: return None - headers = {"Authorization": f"Bearer {token}"} url = f"{_base_url_for_host(host)}/{path.lstrip('/')}" try: async with httpx.AsyncClient(timeout=5, verify=PLS_VERIFY_TLS) as client: - response = await client.get(url, headers=headers) + response = await client.get(url, headers={"Authorization": f"Bearer {token}"}) + if response.status_code in {401, 403}: + refreshed = await _login(force=True) + if not refreshed: + return None + response = await client.get(url, headers={"Authorization": f"Bearer {refreshed}"}) response.raise_for_status() return response.json() except Exception: return None +async def _put(path: str, payload: dict, host: str | None = None) -> dict | list | None: + token = await _login() + if not token: + raise PlsRequestError("PLS authentication is not 
configured or login failed") + + url = f"{_base_url_for_host(host)}/{path.lstrip('/')}" + try: + async with httpx.AsyncClient(timeout=8, verify=PLS_VERIFY_TLS) as client: + response = await client.put(url, headers={"Authorization": f"Bearer {token}"}, json=payload) + if response.status_code in {401, 403}: + refreshed = await _login(force=True) + if not refreshed: + raise PlsRequestError("PLS token expired and re-login failed") + response = await client.put(url, headers={"Authorization": f"Bearer {refreshed}"}, json=payload) + if response.is_error: + detail = response.text.strip() + raise PlsRequestError(f"HTTP {response.status_code}: {detail or 'unknown PLS validation error'}") + return response.json() + except PlsRequestError: + raise + except Exception as exc: + raise PlsRequestError(str(exc)) from exc + + def node_host(node_name: str) -> str: return node_name.split("@", 1)[1] if "@" in node_name else node_name @@ -76,3 +108,13 @@ async def get_system_info(host: str | None = None) -> dict | None: async def get_services(host: str | None = None) -> list[dict]: data = await _get("services", host=host) return data if isinstance(data, list) else [] + + +async def get_fluentbit_config() -> dict | None: + data = await _get("fluent-bit/config") + return data if isinstance(data, dict) else None + + +async def put_fluentbit_config(config: dict) -> dict | None: + data = await _put("fluent-bit/config", config) + return data if isinstance(data, dict) else None diff --git a/app/ui/index.html b/app/ui/index.html index e7d149e..942d91c 100644 --- a/app/ui/index.html +++ b/app/ui/index.html @@ -174,14 +174,28 @@ header h1 span { color: var(--muted); font-weight: 400; } padding: 9px 12px; margin-bottom: 7px; border-left: 3px solid var(--yellow); } .alert-row.critical { border-left-color: var(--red); } +.alert-row.logs { border-left-color: var(--blue); } .alert-row-name { font-size: 12px; font-weight: 600; } .alert-row-desc { font-size: 11px; color: var(--muted); margin-top: 2px; } 
.alert-row-node { font-size: 10px; color: var(--blue); margin-top: 5px; } +.alert-row-meta { display: flex; gap: 6px; align-items: center; margin-top: 6px; flex-wrap: wrap; } +.alert-badge { + font-size: 9px; text-transform: uppercase; letter-spacing: .08em; + border-radius: 999px; padding: 2px 6px; border: 1px solid var(--border); color: var(--muted); +} +.alert-badge.logs { color: var(--blue); border-color: rgba(59,130,246,.4); background: rgba(59,130,246,.12); } +.alert-badge.alertmanager { color: var(--yellow); border-color: rgba(245,158,11,.4); background: rgba(245,158,11,.12); } +.alert-context { + margin-top: 7px; font-family: ui-monospace,SFMono-Regular,Menlo,monospace; + font-size: 10px; line-height: 1.45; color: #c8d1e3; + background: rgba(0,0,0,.18); border: 1px solid rgba(255,255,255,.05); + border-radius: 6px; padding: 7px 8px; white-space: pre-wrap; +} /* ── Chat panel ─────────────────────────────────────────────────── */ -.chat { display: flex; flex-direction: column; overflow: hidden; } +.chat { display: grid; grid-template-rows: auto auto minmax(0,1fr) auto; overflow: hidden; } .messages { - flex: 1; overflow-y: auto; padding: 20px; display: flex; flex-direction: column; gap: 14px; + min-height: 0; overflow-y: auto; padding: 20px; display: flex; flex-direction: column; gap: 14px; } .messages::-webkit-scrollbar { width: 4px; } .messages::-webkit-scrollbar-thumb { background: var(--border); border-radius: 4px; } @@ -241,9 +255,62 @@ header h1 span { color: var(--muted); font-weight: 400; } .send:hover { opacity: .85; } .send:disabled { opacity: .35; cursor: default; } +/* Trace panel */ +.trace-panel { + background: linear-gradient(180deg, rgba(30,37,53,.65), rgba(15,17,23,.95)); + flex-shrink: 0; + display: flex; + flex-direction: column; + min-height: 220px; + max-height: 280px; + border-bottom: 1px solid var(--border); +} +.trace-header { + padding: 12px 20px 10px; + display: flex; align-items: center; justify-content: space-between; gap: 10px; + 
border-bottom: 1px solid rgba(255,255,255,.04); +} +.trace-title { + font-size: 11px; font-weight: 700; text-transform: uppercase; letter-spacing: .1em; color: var(--muted); +} +.trace-status { font-size: 11px; color: var(--muted); } +.trace-controls { + padding: 10px 20px; + display: grid; grid-template-columns: repeat(4, minmax(0, 1fr)); gap: 8px; +} +.trace-controls select, +.trace-controls input, +.trace-controls button { + background: var(--card); color: var(--text); border: 1px solid var(--border); + border-radius: 8px; padding: 8px 10px; font: inherit; min-width: 0; +} +.trace-controls button { cursor: pointer; } +.trace-controls button:hover { border-color: var(--purple); } +.trace-log { + flex: 1; overflow: auto; padding: 0 20px 16px; +} +.trace-log::-webkit-scrollbar { width: 4px; height: 4px; } +.trace-log::-webkit-scrollbar-thumb { background: var(--border); border-radius: 4px; } +.trace-empty { + color: var(--muted); font-size: 12px; padding-top: 16px; +} +.trace-pre { + font-family: ui-monospace,SFMono-Regular,Menlo,monospace; + font-size: 11px; line-height: 1.55; color: #dbe5f5; white-space: pre-wrap; +} +.trace-line { + display: block; padding: 2px 0; +} +.trace-line .t-ts { color: var(--muted); } +.trace-line .t-node { color: var(--blue); } +.trace-line .t-nf { color: var(--green); } +.trace-line .t-src { color: var(--yellow); } + @media (max-width: 680px) { .layout { grid-template-columns: 1fr; } .left { max-height: 260px; } + .trace-controls { grid-template-columns: 1fr 1fr; } + .trace-panel { max-height: 320px; } } @@ -282,7 +349,25 @@ header h1 span { color: var(--muted); font-weight: 400; }
-
+
+
+
Live Log Trace
+
Waiting for log stream…
+
+
+ + + + +
+
+
No trace data yet.
+
+
@@ -291,6 +376,7 @@ header h1 span { color: var(--muted); font-weight: 400; }
+
@@ -312,6 +398,9 @@ const ROLE_LABELS = { 'COMBOCP': 'Combo CP', 'COMBODCP': 'Combo DCP', }; +let latestCluster = { nodes: [] }; +let allowedTraceNfs = []; +let tracePollHandle = null; function md(text) { // minimal markdown: **bold**, `code`, newlines @@ -342,6 +431,7 @@ function addMsg(role, html, isTyping=false) { async function loadNFs() { try { const d = await (await fetch('./api/network/status')).json(); + latestCluster = d.cluster || { nodes: [] }; const grid = $('nfGrid'); grid.innerHTML = ''; (d.nfs||[]).forEach(nf => { @@ -353,6 +443,7 @@ async function loadNFs() { grid.appendChild(c); }); renderNodes(d.cluster); + populateTraceFilters(d.cluster); $('dot').className = 'dot'; $('connLabel').textContent = 'Live'; } catch { @@ -363,6 +454,25 @@ async function loadNFs() { } } +function populateTraceFilters(cluster) { + const nodes = cluster?.nodes || []; + const nodeSel = $('traceNode'); + const nfSel = $('traceNf'); + const currentNode = nodeSel.value; + const currentNf = nfSel.value; + + const nodeOptions = [''] + .concat(nodes.map(node => ``)); + nodeSel.innerHTML = nodeOptions.join(''); + nodeSel.value = nodes.some(node => node.hostname === currentNode) ? currentNode : ''; + + const nfs = new Set(allowedTraceNfs); + nfSel.innerHTML = [''] + .concat([...nfs].sort().map(nf => ``)) + .join(''); + nfSel.value = nfs.has(currentNf) ? currentNf : ''; +} + function toggleNodeCard(button) { button.closest('.node-card')?.classList.toggle('open'); } @@ -424,10 +534,15 @@ async function loadAlerts() { el.innerHTML = '
No active alerts
'; } else { el.innerHTML = d.alerts.slice(0,10).map(a => - `
+ `
${a.name}
${a.summary||a.instance||''}
${(a.nodes||[]).length ? 'Node: ' + a.nodes.map(n => n.hostname).join(', ') : 'Node: unresolved'}
+
+ ${a.source || 'alertmanager'} + ${a.severity || 'warning'} +
+ ${a.source === 'logs' && a.match_message ? `
${escapeHtml(a.match_message)}
` : ''}
` ).join(''); } @@ -436,7 +551,66 @@ async function loadAlerts() { } } -async function refresh() { await Promise.all([loadNFs(), loadAlerts()]); } +async function loadTraces() { + try { + const limit = Math.max(10, Math.min(1000, parseInt($('traceLines').value || '80', 10) || 80)); + const params = new URLSearchParams({ limit: String(limit) }); + if ($('traceNode').value) params.set('node', $('traceNode').value); + if ($('traceNf').value) params.set('nf', $('traceNf').value); + const d = await (await fetch(`./api/logs/events?${params.toString()}`)).json(); + allowedTraceNfs = (d.status?.allowed_nfs || []).map(nf => String(nf).toUpperCase()); + populateTraceFilters(latestCluster); + const events = d.events || []; + $('traceStatus').textContent = d.status?.last_event_at + ? `Last event ${formatFullDateTime(d.status.last_event_at)}` + : 'Waiting for log stream…'; + if (!events.length) { + $('traceLog').innerHTML = '
No log events match the selected filters.
'; + return; + } + $('traceLog').innerHTML = `
${ + events.map(evt => `${escapeHtml(shortTs(evt.timestamp))} ${escapeHtml(evt.node || 'unknown')} ${escapeHtml(evt.nf || 'SYSTEM')} ${escapeHtml(evt.source || 'unknown')} ${escapeHtml(evt.message || '')}`).join('') + }
`; + $('traceLog').scrollTop = $('traceLog').scrollHeight; + } catch { + $('traceStatus').textContent = 'Trace unavailable'; + $('traceLog').innerHTML = '
Cannot reach trace API.
'; + } +} + +function shortTs(value) { + if (!value) return '--:--:--'; + const dt = new Date(value); + return Number.isNaN(dt.getTime()) + ? value + : dt.toLocaleTimeString([], {hour:'2-digit', minute:'2-digit', second:'2-digit'}); +} + +function formatFullDateTime(value) { + if (!value) return 'unknown'; + const dt = new Date(value); + return Number.isNaN(dt.getTime()) + ? value + : dt.toLocaleString([], { + year: 'numeric', + month: '2-digit', + day: '2-digit', + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + }); +} + +function escapeHtml(value) { + return String(value ?? '') + .replace(/&/g, '&') + .replace(//g, '>') + .replace(/"/g, '"') + .replace(/'/g, '''); +} + +async function refresh() { await Promise.all([loadNFs(), loadAlerts(), loadTraces()]); } // ── Chat ────────────────────────────────────────────────────────────────── async function send() { @@ -478,6 +652,7 @@ function ask(btn) { $('inp').value = btn.textContent; send(); } )); await refresh(); setInterval(refresh, 30000); + tracePollHandle = setInterval(loadTraces, 5000); })(); diff --git a/config/log_rules.json b/config/log_rules.json new file mode 100644 index 0000000..d6c4dfb --- /dev/null +++ b/config/log_rules.json @@ -0,0 +1,149 @@ +{ + "categories": { + "Registration": [ + { + "pattern": "RegistrationFailure|UeRegistrationFailed|N1.*[Rr]egistration.*[Ff]ail", + "nf": "AMF", + "severity": "critical", + "description": "UE registration failure", + "remediation": "Check AMF logs for NGAP errors; verify UE credentials and NRF registration." + }, + { + "pattern": "N2SetupFail|NgapSetupFail|N2.*[Tt]imeout|NgapProcedure.*failed", + "nf": "AMF", + "severity": "critical", + "description": "N2 interface setup failure", + "remediation": "Verify gNB connectivity to AMF; check SCTP transport and NGAP PLMN config." 
+ }, + { + "pattern": "InitialContextSetupFail|UeContextRelease.*[Aa]bnormal", + "nf": "AMF", + "severity": "warning", + "description": "UE context setup failure", + "remediation": "Review AMF-SMF N11 interface; check subscriber profile in UDM/UDR." + }, + { + "pattern": "PagingFail|UeUnreachable|UeNotFound", + "nf": "AMF", + "severity": "warning", + "description": "UE paging failure", + "remediation": "Verify UE is registered; check AMF tracking area configuration." + } + ], + "Sessions": [ + { + "pattern": "PduSessionEstablishmentReject|PduSession.*[Ff]ail|CreateSessionResponse.*[Ff]ail", + "nf": "SMF", + "severity": "critical", + "description": "PDU session establishment failure", + "remediation": "Check SMF-UPF N4 path; verify DNN/APN config and UPF N3/N9 interfaces." + }, + { + "pattern": "N4Session.*[Ff]ail|PfcpSession.*[Ee]rror|N4.*[Tt]imeout|PfcpAssociation.*[Ff]ail", + "nf": "UPF", + "severity": "critical", + "description": "N4/PFCP session error", + "remediation": "Restart PFCP association between SMF and UPF; check N4 IP reachability." + }, + { + "pattern": "IpAllocationFail|AddressPoolExhausted|NoIpAvailable", + "nf": "SMF", + "severity": "critical", + "description": "IP address pool exhausted", + "remediation": "Expand UE IP address pool in SMF config; review active session count." + }, + { + "pattern": "SessionModification.*[Ff]ail|BearerModification.*[Ee]rror", + "nf": "SMF", + "severity": "warning", + "description": "Session modification failure", + "remediation": "Check PCF policy consistency; verify QoS parameters match UPF capabilities." + } + ], + "Authentication": [ + { + "pattern": "AuthenticationFailure|AuthReject|EapFailure|5g-aka.*[Ff]ail|EapAkaFailure", + "nf": "AUSF", + "severity": "critical", + "description": "UE authentication failure", + "remediation": "Verify USIM credentials match UDM subscriber data; check AUSF-UDM N12 link." 
+ }, + { + "pattern": "UdmAuthReq.*[Ee]rror|SuciDeconceal.*[Ff]ail|UdmUeAuth.*[Ee]rror", + "nf": "UDM", + "severity": "critical", + "description": "UDM authentication error", + "remediation": "Check UDM-UDR N35 connectivity; verify Home Network Public Key configuration." + }, + { + "pattern": "AuthVectorFetch.*[Ff]ail|AusfUeAuth.*[Rr]eject|HssAuth.*[Ff]ail", + "nf": "AUSF", + "severity": "warning", + "description": "Auth vector fetch failure", + "remediation": "Review UDR data integrity for affected SUPI; check AUSF-UDM TLS certificates." + } + ], + "Connectivity": [ + { + "pattern": "NfDiscovery.*[Ff]ail|NrfRegistration.*[Ff]ail|NfDeregistration.*unexpect", + "nf": "NRF", + "severity": "warning", + "description": "NF service discovery failure", + "remediation": "Verify NRF is reachable from all NFs; check NRF registration TTL and heartbeat." + }, + { + "pattern": "ServiceUnavailable.*NF|HTTP.*503.*NF|NfProfile.*expired", + "nf": "NRF", + "severity": "warning", + "description": "NF service unavailable", + "remediation": "Check NF pod health and SBI listen port; review NRF subscription notifications." + }, + { + "pattern": "SbiRequest.*[Tt]imeout|SbiConn.*[Ff]ail|Http2.*[Ee]rror", + "nf": "NRF", + "severity": "warning", + "description": "SBI interface timeout", + "remediation": "Inspect inter-NF network MTU and TLS handshake; check load balancer config." + } + ], + "Policy": [ + { + "pattern": "PcfSmPolicy.*[Ee]rror|PolicyDecision.*[Ff]ail|SmPolicy.*[Rr]eject", + "nf": "PCF", + "severity": "warning", + "description": "Policy decision failure", + "remediation": "Review PCF policy rules and subscriber group config; check PCF-UDR N36 link." + }, + { + "pattern": "QosEnforce.*[Ff]ail|ChargingRule.*[Ee]rror|PccRule.*[Rr]eject", + "nf": "PCF", + "severity": "warning", + "description": "QoS policy enforcement failure", + "remediation": "Verify QoS profiles match UPF capabilities; check PCF-CHF N28 charging path."
+ } + ], + "Security": [ + { + "pattern": "SecurityMode.*[Ff]ail|IntegrityCheck.*[Ff]ail|NasIntegrity.*[Ee]rror", + "nf": "AMF", + "severity": "critical", + "description": "NAS security mode failure", + "remediation": "Check AMF cipher/integrity algorithm priority list matches UE capabilities." + }, + { + "pattern": "TlsHandshake.*[Ff]ail|Certificate.*[Ee]xpir|x509.*[Ee]rror|CertVerify.*[Ff]ail", + "nf": "AMF", + "severity": "critical", + "description": "TLS/certificate error", + "remediation": "Renew expired certificates; verify trust chain between NFs; check SBI TLS config." + }, + { + "pattern": "SuciProtection.*[Ff]ail|PrivacyProtection.*[Ee]rror|HomeNetworkKey.*[Ee]rror", + "nf": "UDM", + "severity": "warning", + "description": "SUCI privacy protection error", + "remediation": "Verify Home Network Public Key provisioning on UDM; check SUPI revealing config." + } + ] + } +} diff --git a/config/marvis.env.example b/config/marvis.env.example index d809e6b..bfbd102 100644 --- a/config/marvis.env.example +++ b/config/marvis.env.example @@ -10,6 +10,22 @@ MARVIS_PLS_PASSWORD= MARVIS_PLS_AUTH_BACKEND=local MARVIS_PLS_VERIFY_TLS=false +# Fluent Bit log ingestion. +MARVIS_LOG_INGEST_ENABLED=true +MARVIS_LOG_AUTO_CONFIGURE=true +MARVIS_LOG_RECEIVER_BIND_HOST=0.0.0.0 +MARVIS_LOG_RECEIVER_HOST= +MARVIS_LOG_RECEIVER_PORT=5514 +MARVIS_LOG_RECEIVER_FORMAT=json_lines +MARVIS_LOG_BUFFER_LINES=1000 +MARVIS_LOG_TRACE_BUFFER_LINES=5000 +MARVIS_LOG_ALERT_CONTEXT_BEFORE=5 +MARVIS_LOG_ALERT_CONTEXT_AFTER=5 +MARVIS_LOG_ALERT_CONTEXT_DB_PATH=/app/data/marvis-alert-context.db +MARVIS_LOG_ALERT_CONTEXT_DB_MAX_ROWS=500 +MARVIS_LOG_FLUENTBIT_MATCH=* +MARVIS_LOG_ALLOWED_NFS=AMF,SMF,UPF,UDM,UDR,NRF,AUSF,PCF,MME,SGWC,DRA,DSM,AAA,BMSC,CHF,SMSF,EIR + # AI backend configuration. 
MARVIS_AI_MODE=rule MARVIS_OPENAI_API_KEY= diff --git a/config/p5g-marvis.service b/config/p5g-marvis.service index f65d368..d964277 100644 --- a/config/p5g-marvis.service +++ b/config/p5g-marvis.service @@ -16,6 +16,20 @@ Environment=MARVIS_PLS_USERNAME= Environment=MARVIS_PLS_PASSWORD= Environment=MARVIS_PLS_AUTH_BACKEND=local Environment=MARVIS_PLS_VERIFY_TLS=false +Environment=MARVIS_LOG_INGEST_ENABLED=true +Environment=MARVIS_LOG_AUTO_CONFIGURE=true +Environment=MARVIS_LOG_RECEIVER_BIND_HOST=0.0.0.0 +Environment=MARVIS_LOG_RECEIVER_HOST= +Environment=MARVIS_LOG_RECEIVER_PORT=5514 +Environment=MARVIS_LOG_RECEIVER_FORMAT=json_lines +Environment=MARVIS_LOG_BUFFER_LINES=1000 +Environment=MARVIS_LOG_TRACE_BUFFER_LINES=5000 +Environment=MARVIS_LOG_ALERT_CONTEXT_BEFORE=5 +Environment=MARVIS_LOG_ALERT_CONTEXT_AFTER=5 +Environment=MARVIS_LOG_ALERT_CONTEXT_DB_PATH=/app/data/marvis-alert-context.db +Environment=MARVIS_LOG_ALERT_CONTEXT_DB_MAX_ROWS=500 +Environment=MARVIS_LOG_FLUENTBIT_MATCH=* +Environment=MARVIS_LOG_ALLOWED_NFS=AMF,SMF,UPF,UDM,UDR,NRF,AUSF,PCF,MME,SGWC,DRA,DSM,AAA,BMSC,CHF,SMSF,EIR Environment=MARVIS_AI_MODE=rule Environment=MARVIS_OPENAI_API_KEY= Environment=MARVIS_OPENAI_BASE_URL=https://api.openai.com @@ -28,6 +42,7 @@ ExecStartPre=-/usr/bin/docker rm -f p5g-marvis ExecStart=/usr/bin/docker run \ --name p5g-marvis \ --network host \ + --volume /var/lib/p5g-marvis:/app/data \ --env MARVIS_PROMETHEUS_URL \ --env MARVIS_PROMETHEUS_PREFIX \ --env MARVIS_ALERTMANAGER_URL \ @@ -36,6 +51,20 @@ ExecStart=/usr/bin/docker run \ --env MARVIS_PLS_PASSWORD \ --env MARVIS_PLS_AUTH_BACKEND \ --env MARVIS_PLS_VERIFY_TLS \ + --env MARVIS_LOG_INGEST_ENABLED \ + --env MARVIS_LOG_AUTO_CONFIGURE \ + --env MARVIS_LOG_RECEIVER_BIND_HOST \ + --env MARVIS_LOG_RECEIVER_HOST \ + --env MARVIS_LOG_RECEIVER_PORT \ + --env MARVIS_LOG_RECEIVER_FORMAT \ + --env MARVIS_LOG_BUFFER_LINES \ + --env MARVIS_LOG_TRACE_BUFFER_LINES \ + --env MARVIS_LOG_ALERT_CONTEXT_BEFORE \ + --env 
MARVIS_LOG_ALERT_CONTEXT_AFTER \ + --env MARVIS_LOG_ALERT_CONTEXT_DB_PATH \ + --env MARVIS_LOG_ALERT_CONTEXT_DB_MAX_ROWS \ + --env MARVIS_LOG_FLUENTBIT_MATCH \ + --env MARVIS_LOG_ALLOWED_NFS \ --env MARVIS_AI_MODE \ --env MARVIS_OPENAI_API_KEY \ --env MARVIS_OPENAI_BASE_URL \