diff --git a/app/__pycache__/config.cpython-314.pyc b/app/__pycache__/config.cpython-314.pyc index 3a854e4..e51ccbf 100644 Binary files a/app/__pycache__/config.cpython-314.pyc and b/app/__pycache__/config.cpython-314.pyc differ diff --git a/app/config.py b/app/config.py index 7b06802..acdf55f 100644 --- a/app/config.py +++ b/app/config.py @@ -47,12 +47,19 @@ LOG_RECEIVER_HOST = os.getenv("MARVIS_LOG_RECEIVER_HOST", "") LOG_RECEIVER_PORT = _env_int("MARVIS_LOG_RECEIVER_PORT", 5514) LOG_RECEIVER_FORMAT = os.getenv("MARVIS_LOG_RECEIVER_FORMAT", "json_lines") LOG_BUFFER_LINES = _env_int("MARVIS_LOG_BUFFER_LINES", 1000) +LOG_PROCESS_BUFFER_LINES = _env_int("MARVIS_LOG_PROCESS_BUFFER_LINES", 500) +LOG_SUBSCRIBER_BUFFER_LINES = _env_int("MARVIS_LOG_SUBSCRIBER_BUFFER_LINES", 500) LOG_ALERT_CONTEXT_BEFORE = _env_int("MARVIS_LOG_ALERT_CONTEXT_BEFORE", 5) LOG_ALERT_CONTEXT_AFTER = _env_int("MARVIS_LOG_ALERT_CONTEXT_AFTER", 5) LOG_ALERT_CONTEXT_DB_PATH = os.getenv("MARVIS_LOG_ALERT_CONTEXT_DB_PATH", "/app/data/marvis-alert-context.db") LOG_ALERT_CONTEXT_DB_MAX_ROWS = _env_int("MARVIS_LOG_ALERT_CONTEXT_DB_MAX_ROWS", 500) LOG_TRACE_BUFFER_LINES = _env_int("MARVIS_LOG_TRACE_BUFFER_LINES", 5000) LOG_FLUENTBIT_MATCH = os.getenv("MARVIS_LOG_FLUENTBIT_MATCH", "*") +LOG_TRACE_DEBUG_LEVEL = os.getenv("MARVIS_LOG_TRACE_DEBUG_LEVEL", "debug") +LOG_TRACE_TARGET_SERVICES = [item.lower() for item in _env_list( + "MARVIS_LOG_TRACE_TARGET_SERVICES", + ["amf", "smf", "mme", "upf", "sgwc"], +)] LOG_ALLOWED_NFS = [item.upper() for item in _env_list( "MARVIS_LOG_ALLOWED_NFS", ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM", "AAA", "BMSC", "CHF", "SMSF", "EIR"], diff --git a/app/routers/__pycache__/logs.cpython-314.pyc b/app/routers/__pycache__/logs.cpython-314.pyc index 2a3982e..6025886 100644 Binary files a/app/routers/__pycache__/logs.cpython-314.pyc and b/app/routers/__pycache__/logs.cpython-314.pyc differ diff --git a/app/routers/logs.py b/app/routers/logs.py index 79ca3e5..d27414f 100644 --- a/app/routers/logs.py +++ b/app/routers/logs.py @@ -34,3 +34,19 @@ async def configure_log_shipping(): return await log_ingest.configure_site_output() except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc + + +@router.post("/logs/traces/start") +async def start_trace(supi: str = Query(min_length=3)): + try: + return await log_ingest.start_subscriber_trace(supi) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) from exc + + +@router.post("/logs/traces/stop") +async def stop_trace(): + try: + return await log_ingest.stop_subscriber_trace() + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/app/services/__pycache__/ai.cpython-314.pyc b/app/services/__pycache__/ai.cpython-314.pyc index 76e80f4..62fe178 100644 Binary files a/app/services/__pycache__/ai.cpython-314.pyc and b/app/services/__pycache__/ai.cpython-314.pyc differ diff --git a/app/services/__pycache__/alertmanager.cpython-314.pyc b/app/services/__pycache__/alertmanager.cpython-314.pyc index 67eb625..437ed99 100644 Binary files a/app/services/__pycache__/alertmanager.cpython-314.pyc and b/app/services/__pycache__/alertmanager.cpython-314.pyc differ diff --git a/app/services/__pycache__/log_analyzer.cpython-314.pyc b/app/services/__pycache__/log_analyzer.cpython-314.pyc index aa8de51..29b72b2 100644 Binary files a/app/services/__pycache__/log_analyzer.cpython-314.pyc and b/app/services/__pycache__/log_analyzer.cpython-314.pyc differ diff --git a/app/services/__pycache__/log_ingest.cpython-314.pyc b/app/services/__pycache__/log_ingest.cpython-314.pyc index ce39e8a..acf8f65 100644 Binary files a/app/services/__pycache__/log_ingest.cpython-314.pyc and b/app/services/__pycache__/log_ingest.cpython-314.pyc differ diff --git a/app/services/__pycache__/pls.cpython-314.pyc b/app/services/__pycache__/pls.cpython-314.pyc index 58b8eed..9ddcc90 100644 Binary files a/app/services/__pycache__/pls.cpython-314.pyc and b/app/services/__pycache__/pls.cpython-314.pyc differ diff --git a/app/services/ai.py b/app/services/ai.py index ce4d3da..b91c9f7 100644 --- a/app/services/ai.py +++ b/app/services/ai.py @@ -9,6 +9,7 @@ from datetime import datetime import re from app.config import ( AI_MODE, + ALL_NFS, CONTAINER_RUNTIME, OPENAI_API_KEY, OPENAI_MODEL, @@ -19,6 +20,9 @@ from app.config import ( async def answer(query: str, network_state: dict, alerts: list, logs: list[dict] | None = None) -> str: + special = await _handle_log_queries(query, network_state, alerts, logs or []) + if special: + return special if AI_MODE == "openai": return await _call_openai(query, network_state, alerts, logs or []) if AI_MODE == "ollama": @@ -53,7 +57,6 @@ def _rule_based(query: str, network_state: dict, alerts: list, logs: list[dict]) ) # Specific NF query - from app.config import ALL_NFS for nf_name in ALL_NFS: if nf_name.lower() in q: return _nf_detail(nf_name, nfs, alerts, log_hits) @@ -74,6 +77,60 @@ def _rule_based(query: str, network_state: dict, alerts: list, logs: list[dict]) return _health_summary(up, down, alerts, cluster, log_hits) +async def _handle_log_queries(query: str, network_state: dict, alerts: list, logs: list[dict]) -> str | None: + from app.services import log_analyzer, log_ingest + + q = query.strip() + lowered = q.lower() + + if "trace" in lowered and any(word in lowered for word in ["stop", "end", "disable", "finish"]): + summary = await log_ingest.stop_subscriber_trace() + if not summary.get("started_at"): + return "βΉοΈ No subscriber trace is currently active." + return ( + f"π **Subscriber trace stopped** for `{summary.get('filter')}`\n\n" + f"Started: {summary.get('started_at')}\n" + f"Matched events: **{summary.get('matched_events', 0)}**\n" + f"Restored nodes: {', '.join(summary.get('restored_nodes', [])) or 'none'}" + ) + + trace_target = _extract_trace_target(q) + if trace_target: + state = await log_ingest.start_subscriber_trace(trace_target) + events = log_ingest.get_subscriber_events(trace_target, limit=20) + findings = log_analyzer.summarize_event_slice(events) + return _format_trace_response(trace_target, state, events, findings) + + supi_query = _extract_supi_query(q) + asks_logs = any( + phrase in lowered + for phrase in ["show me the logs", "show logs", "logs for", "what do the logs show", "trace output", "recent logs"] + ) + nf_query = _extract_nf_query(q) + + if supi_query and (_is_bare_supi(q) or "subscriber" in lowered or "supi" in lowered or "imsi" in lowered or asks_logs): + events = log_ingest.get_subscriber_events(supi_query, limit=500) + findings = log_analyzer.summarize_event_slice(events) + return _format_log_slice( + title=f"Subscriber logs for `{supi_query}`", + events=events, + findings=findings, + empty_message=f"βΉοΈ No recent logs matched subscriber `{supi_query}`.", + ) + + if nf_query and ("process" in lowered or asks_logs or "show me" in lowered): + events = log_ingest.get_process_events(nf_query, limit=500) + findings = log_analyzer.summarize_event_slice(events) + return _format_log_slice( + title=f"Process logs for `{nf_query}`", + events=events, + findings=findings, + empty_message=f"βΉοΈ No recent logs are buffered for process `{nf_query}`.", + ) + + return None + + def _health_summary(up: list, down: list, alerts: list, cluster: dict, log_hits: list[dict]) -> str: ts = datetime.now().strftime("%H:%M:%S") crit = [a for a in alerts if a.get("severity") == "critical"] @@ -237,6 +294,90 @@ def _log_summary(log_hits: list[dict], logs: list[dict]) -> str: return "\n".join(lines) +def _extract_supi_query(query: str) -> str: + lowered = query.lower() + match = re.search(r"(imsi-\d{6,20}|\b\d{6,20}\b)", lowered) + if not match: + return "" + token = match.group(1) + if token.startswith("imsi-"): + return token + return f"imsi-{token}" + + +def _is_bare_supi(query: str) -> bool: + cleaned = query.strip().lower() + return bool(re.fullmatch(r"(imsi-\d{6,20}|\d{6,20})", cleaned)) + + +def _extract_nf_query(query: str) -> str: + text = query.upper() + for nf_name in ALL_NFS: + if nf_name in text: + return nf_name + return "" + + +def _extract_trace_target(query: str) -> str: + lowered = query.lower() + if "trace" not in lowered: + return "" + if not any(word in lowered for word in ["start", "run", "begin", "trace"]): + return "" + return _extract_supi_query(query) + + +def _format_log_slice(*, title: str, events: list[dict], findings: list[dict], empty_message: str) -> str: + if not events: + return empty_message + lines = [f"π§Ύ **{title}**", f"Buffered lines: **{len(events)}**\n"] + if findings: + lines.append("Rule hits:") + for finding in findings[:6]: + lines.append( + f"β’ **{finding['severity'].upper()}** {finding['nf']} on {finding.get('node','unknown')}: " + f"{finding['description']}" + ) + lines.append(f" Fix: {finding['remediation']}") + lines.append("") + lines.append("Recent log lines:") + for event in events[-12:]: + lines.append( + f"β’ {event.get('timestamp','')} β {event.get('node','unknown')} {event.get('nf','SYSTEM')}: " + f"{_trim_message(event.get('message',''), 220)}" + ) + return "\n".join(lines) + + +def _format_trace_response(target: str, state: dict, events: list[dict], findings: list[dict]) -> str: + lines = [ + f"π **Subscriber trace active** for `{target}`", + f"Level override: **{state.get('level', 'debug')}**", + f"Nodes updated: {', '.join(state.get('nodes', [])) or 'none'}", + f"Matched events so far: **{state.get('matched_events', 0)}**\n", + ] + if findings: + lines.append("Current rule-based diagnosis:") + for finding in findings[:5]: + lines.append( + f"β’ **{finding['severity'].upper()}** {finding['nf']} on {finding.get('node','unknown')}: " + f"{finding['description']}" + ) + lines.append(f" Fix: {finding['remediation']}") + lines.append("") + if events: + lines.append("Current trace lines:") + for event in events[-10:]: + lines.append( + f"β’ {event.get('timestamp','')} β {event.get('node','unknown')} {event.get('nf','SYSTEM')}: " + f"{_trim_message(event.get('message',''), 220)}" + ) + else: + lines.append("No matching subscriber logs have arrived yet.") + lines.append("\nUse `stop trace` when the attach/session test is complete.") + return "\n".join(lines) + + def _nf_label(nf: dict) -> str: placements = nf.get("nodes", []) if not placements: diff --git a/app/services/alertmanager.py b/app/services/alertmanager.py index bb2aa0e..1d7f12f 100644 --- a/app/services/alertmanager.py +++ b/app/services/alertmanager.py @@ -41,7 +41,7 @@ async def _get_alertmanager_alerts(cluster: dict) -> list: name = labels.get("alertname", "Unknown") summary = annotations.get("summary", annotations.get("description", "")) nf_name = _infer_nf(name, summary, labels.get("instance", "")) - nodes = cluster_inventory.find_nf_nodes(cluster, nf_name) if nf_name else [] + nodes = _resolve_nodes(cluster, labels, name, summary, nf_name) alerts.append({ "name": name, "severity": labels.get("severity", "warning"), @@ -107,7 +107,70 @@ def _severity_rank(severity: str | None) -> int: def _infer_nf(name: str, summary: str, instance: str) -> str: text = f"{name} {summary} {instance}".upper() - for nf_name in ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM"]: + for nf_name in ["AMF", "SMF", "UPF", "UDM", "UDR", "NRF", "AUSF", "PCF", "MME", "SGWC", "DRA", "DSM", "AAA", "BMSC", "CHF", "SMSF", "EIR"]: if nf_name in text: return nf_name return "" + + +def _resolve_nodes(cluster: dict, labels: dict, name: str, summary: str, nf_name: str) -> list: + nodes = cluster.get("nodes", []) + if not nodes: + return [] + + search_parts = [name, summary] + for key in ("instance", "job", "service", "container", "hostname", "node", "pod"): + value = labels.get(key) + if value: + search_parts.append(value) + search_text = " ".join(search_parts).upper() + + explicit = [] + for node in nodes: + hostname = str(node.get("hostname", "")) + address = str(node.get("address", "")) + node_name = str(node.get("name", "")) + if any(token and token.upper() in search_text for token in (hostname, address, node_name)): + explicit.append(_node_ref(node)) + if explicit: + return _dedupe_nodes(explicit) + + if nf_name: + nf_nodes = cluster_inventory.find_nf_nodes(cluster, nf_name) + if nf_nodes: + return nf_nodes + + service_matches = [] + for node in nodes: + started = {str(service).upper() for service in node.get("started_services", [])} + if any(service and service in search_text for service in started): + service_matches.append(_node_ref(node)) + if service_matches: + return _dedupe_nodes(service_matches) + + current = next((node for node in nodes if node.get("current")), None) + if current and labels.get("instance", "").startswith(("127.0.0.1", "localhost")): + return [_node_ref(current)] + + return [] + + +def _node_ref(node: dict) -> dict: + return { + "hostname": node.get("hostname", ""), + "address": node.get("address", ""), + "role": node.get("role", "AP"), + "current": node.get("current", False), + } + + +def _dedupe_nodes(nodes: list[dict]) -> list[dict]: + seen = set() + result = [] + for node in nodes: + key = (node.get("hostname"), node.get("address")) + if key in seen: + continue + seen.add(key) + result.append(node) + return result diff --git a/app/services/log_analyzer.py b/app/services/log_analyzer.py index 4920e2b..9b513e6 100644 --- a/app/services/log_analyzer.py +++ b/app/services/log_analyzer.py @@ -109,6 +109,39 @@ def _rule_matches(message: str, pattern: str) -> bool: return False +def summarize_event_slice(events: list[dict]) -> list[dict]: + findings: list[dict] = [] + seen: set[tuple[str, str, str, str]] = set() + for event in sorted(events or [], key=lambda item: item.get("epoch", 0.0)): + message = event.get("message", "") + event_nf = str(event.get("nf", "")).upper() + event_node = event.get("node", "") + for category, patterns in load_category_patterns().items(): + for rule in patterns: + rule_nf = str(rule["nf"]).upper() + if rule_nf != event_nf: + continue + if not _rule_matches(message, rule["pattern"]): + continue + key = (category, rule_nf, event_node, rule["description"]) + if key in seen: + continue + seen.add(key) + findings.append( + { + "category": category, + "nf": rule_nf, + "node": event_node, + "severity": rule["severity"], + "description": rule["description"], + "remediation": rule["remediation"], + "message": message, + "timestamp": event.get("timestamp", ""), + } + ) + return findings + + # ββ Category/NF mapping for Alertmanager alerts ββββββββββββββββββββββββββββββ def _alert_category(alert: dict) -> str: diff --git a/app/services/log_ingest.py b/app/services/log_ingest.py index b925bff..931a6c6 100644 --- a/app/services/log_ingest.py +++ b/app/services/log_ingest.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import json +import re import sqlite3 from collections import deque from datetime import UTC, datetime @@ -22,22 +23,43 @@ from app.config import ( LOG_AUTO_CONFIGURE, LOG_FLUENTBIT_MATCH, LOG_INGEST_ENABLED, + LOG_PROCESS_BUFFER_LINES, LOG_RECEIVER_BIND_HOST, LOG_RECEIVER_FORMAT, LOG_RECEIVER_HOST, LOG_RECEIVER_PORT, + LOG_SUBSCRIBER_BUFFER_LINES, + LOG_TRACE_DEBUG_LEVEL, LOG_TRACE_BUFFER_LINES, + LOG_TRACE_TARGET_SERVICES, ) from app.services import pls _server: asyncio.base_events.Server | None = None +_allowed_nfs = {nf.upper() for nf in LOG_ALLOWED_NFS} _events: deque[dict[str, Any]] = deque(maxlen=max(LOG_BUFFER_LINES, 1)) _trace_events: deque[dict[str, Any]] = deque(maxlen=max(LOG_TRACE_BUFFER_LINES, LOG_BUFFER_LINES, 1)) +_process_events: dict[str, deque[dict[str, Any]]] = { + nf.upper(): deque(maxlen=max(LOG_PROCESS_BUFFER_LINES, 1)) + for nf in _allowed_nfs if nf != "SYSTEM" +} +_subscriber_events: dict[str, deque[dict[str, Any]]] = {} _ingested_total = 0 _parse_errors = 0 _last_event_at: str | None = None _db_initialized = False -_allowed_nfs = {nf.upper() for nf in LOG_ALLOWED_NFS} +_supi_pattern = re.compile(r"(imsi-\d{6,20}|\b\d{6,20}\b)", re.IGNORECASE) +_trace_state: dict[str, Any] = { + "active": False, + "filter": "", + "normalized": "", + "started_at": None, + "matched_events": 0, + "nodes": [], + "services": list(LOG_TRACE_TARGET_SERVICES), + "level": LOG_TRACE_DEBUG_LEVEL, + "original_levels": {}, +} def _db_path() -> Path: @@ -181,6 +203,43 @@ def _infer_nf(payload: dict[str, Any], message: str) -> str: return "SYSTEM" +def _normalize_supi(value: str | None) -> str: + if not value: + return "" + text = str(value).strip().lower() + if not text: + return "" + if text.startswith("imsi-"): + digits = "".join(ch for ch in text[5:] if ch.isdigit()) + return f"imsi-{digits}" if digits else text + digits = "".join(ch for ch in text if ch.isdigit()) + if digits: + return f"imsi-{digits}" + return text + + +def _extract_supis(message: str) -> list[str]: + matches = [] + for raw in _supi_pattern.findall(message or ""): + normalized = _normalize_supi(raw) + if normalized and normalized not in matches: + matches.append(normalized) + return matches + + +def _matches_trace(event: dict[str, Any]) -> bool: + if not _trace_state.get("active"): + return False + normalized = _trace_state.get("normalized", "") + if not normalized: + return False + message = str(event.get("message", "")).lower() + if normalized in message: + return True + digits = normalized.removeprefix("imsi-") + return bool(digits and digits in message) + + def _normalize_event(payload: dict[str, Any], remote_host: str) -> dict[str, Any]: ts_value = ( payload.get("timestamp") @@ -215,6 +274,7 @@ def _normalize_event(payload: dict[str, Any], remote_host: str) -> dict[str, Any or "" ) message = str(message).strip() + supis = _extract_supis(message) tag = str(payload.get("tag", "")) nf = _infer_nf(payload, message) fingerprint = sha1(f"{ts_iso}|{node}|{nf}|{source}|{message}".encode("utf-8")).hexdigest() @@ -227,6 +287,7 @@ def _normalize_event(payload: dict[str, Any], remote_host: str) -> dict[str, Any "source": str(source), "tag": tag, "message": message, + "supis": supis, "raw": payload, } @@ -238,6 +299,16 @@ async def _ingest_payload(payload: dict[str, Any], remote_host: str) -> None: return _events.append(event) _trace_events.append(event) + nf_key = event.get("nf", "").upper() + if nf_key: + _process_events.setdefault(nf_key, deque(maxlen=max(LOG_PROCESS_BUFFER_LINES, 1))).append(event) + for supi in event.get("supis", []): + _subscriber_events.setdefault( + supi, + deque(maxlen=max(LOG_SUBSCRIBER_BUFFER_LINES, 1)), + ).append(event) + if _matches_trace(event): + _trace_state["matched_events"] = int(_trace_state.get("matched_events", 0)) + 1 _ingested_total += 1 _last_event_at = event["timestamp"] @@ -300,6 +371,8 @@ def receiver_status() -> dict[str, Any]: "format": LOG_RECEIVER_FORMAT, "allowed_nfs": sorted(_allowed_nfs), "buffer_lines": LOG_BUFFER_LINES, + "process_buffer_lines": LOG_PROCESS_BUFFER_LINES, + "subscriber_buffer_lines": LOG_SUBSCRIBER_BUFFER_LINES, "trace_buffer_lines": LOG_TRACE_BUFFER_LINES, "context_before": LOG_ALERT_CONTEXT_BEFORE, "context_after": LOG_ALERT_CONTEXT_AFTER, @@ -308,6 +381,17 @@ def receiver_status() -> dict[str, Any]: "parse_errors": _parse_errors, "last_event_at": _last_event_at, "current_buffer_size": len(_events), + "process_buffers": sorted(_process_events.keys()), + "subscriber_buffers": len(_subscriber_events), + "trace": { + "active": bool(_trace_state.get("active")), + "filter": _trace_state.get("filter", ""), + "started_at": _trace_state.get("started_at"), + "matched_events": _trace_state.get("matched_events", 0), + "nodes": list(_trace_state.get("nodes", [])), + "services": list(_trace_state.get("services", [])), + "level": _trace_state.get("level", LOG_TRACE_DEBUG_LEVEL), + }, } @@ -396,8 +480,135 @@ async def configure_site_output() -> dict[str, Any]: } +def _sort_and_limit(events: list[dict[str, Any]], limit: int | None = None) -> list[dict[str, Any]]: + deduped: dict[str, dict[str, Any]] = {} + for event in events: + deduped[event.get("id", str(id(event)))] = event + ordered = sorted(deduped.values(), key=lambda event: event.get("epoch", 0.0)) + if limit is not None: + return ordered[-limit:] + return ordered + + +def get_process_events(nf: str, limit: int | None = None) -> list[dict[str, Any]]: + nf_key = str(nf or "").upper() + events = list(_process_events.get(nf_key, [])) + return _sort_and_limit(events, limit) + + +def get_subscriber_events(supi_or_fragment: str, limit: int | None = None) -> list[dict[str, Any]]: + normalized = _normalize_supi(supi_or_fragment) + fragment = str(supi_or_fragment or "").strip().lower() + if not normalized and not fragment: + return [] + matches: list[dict[str, Any]] = [] + for supi, events in _subscriber_events.items(): + digits = supi.removeprefix("imsi-") + if normalized and (supi == normalized or normalized in supi or normalized.removeprefix("imsi-") in digits): + matches.extend(events) + continue + if fragment and (fragment in supi.lower() or fragment in digits): + matches.extend(events) + return _sort_and_limit(matches, limit) + + +async def _trace_target_nodes() -> list[dict[str, Any]]: + cluster = await pls.get_cluster_status() + nodes = [] + if isinstance(cluster, dict): + for node in cluster.get("nodes", []): + host = pls.node_host(node.get("name", "")) + if host: + nodes.append({"name": node.get("name", ""), "host": host}) + if not nodes: + system = await pls.get_system_info() + host = str(system.get("hostname", "") if isinstance(system, dict) else "") or "127.0.0.1" + nodes.append({"name": host, "host": host}) + deduped = {} + for node in nodes: + deduped[node["host"]] = node + return list(deduped.values()) + + +async def start_subscriber_trace(supi_or_fragment: str) -> dict[str, Any]: + normalized = _normalize_supi(supi_or_fragment) + fragment = str(supi_or_fragment or "").strip() + if not normalized and not fragment: + raise RuntimeError("A SUPI or SUPI fragment is required to start a trace") + + if _trace_state.get("active"): + await stop_subscriber_trace() + + target_nodes = await _trace_target_nodes() + original_levels: dict[str, dict[str, Any]] = {} + applied_nodes: list[str] = [] + for node in target_nodes: + host = node["host"] + current = await pls.get_log_config(host=host) + if not isinstance(current, dict): + continue + original_levels[host] = current + updated = dict(current) + updated["level"] = LOG_TRACE_DEBUG_LEVEL + await pls.put_log_config(updated, host=host) + applied_nodes.append(host) + + _trace_state.update( + { + "active": True, + "filter": fragment, + "normalized": normalized or fragment.lower(), + "started_at": datetime.now(UTC).isoformat(), + "matched_events": 0, + "nodes": applied_nodes, + "services": list(LOG_TRACE_TARGET_SERVICES), + "level": LOG_TRACE_DEBUG_LEVEL, + "original_levels": original_levels, + } + ) + return receiver_status()["trace"] + + +async def stop_subscriber_trace() -> dict[str, Any]: + original_levels = dict(_trace_state.get("original_levels", {})) + restored_nodes: list[str] = [] + for host, config in original_levels.items(): + try: + if isinstance(config, dict): + await pls.put_log_config(config, host=host) + restored_nodes.append(host) + except Exception: + continue + + summary = { + "filter": _trace_state.get("filter", ""), + "started_at": _trace_state.get("started_at"), + "matched_events": _trace_state.get("matched_events", 0), + "restored_nodes": restored_nodes, + } + _trace_state.update( + { + "active": False, + "filter": "", + "normalized": "", + "started_at": None, + "matched_events": 0, + "nodes": [], + "services": list(LOG_TRACE_TARGET_SERVICES), + "level": LOG_TRACE_DEBUG_LEVEL, + "original_levels": {}, + } + ) + return summary + + def get_events(limit: int | None = None, node: str | None = None, nf: str | None = None, imsi: str | None = None) -> list[dict[str, Any]]: - events = list(_trace_events if imsi else _events) + if imsi: + events = get_subscriber_events(imsi, limit=None) + elif nf: + events = get_process_events(nf, limit=None) + else: + events = list(_events) if node: node_l = node.lower() events = [event for event in events if event.get("node", "").lower() == node_l] @@ -405,12 +616,20 @@ def get_events(limit: int | None = None, node: str | None = None, nf: str | None nf_u = nf.upper() events = [event for event in events if event.get("nf", "").upper() == nf_u] if imsi: - needle = imsi.strip() - events = [event for event in events if needle and needle in event.get("message", "")] - events.sort(key=lambda event: event.get("epoch", 0.0)) - if limit is not None: - return events[-limit:] - return events + needle = str(imsi).strip().lower() + normalized = _normalize_supi(imsi) + digits = normalized.removeprefix("imsi-") if normalized else "" + events = [ + event for event in events + if needle and ( + needle in event.get("message", "").lower() + or any( + needle in supi.lower() or (digits and digits in supi.removeprefix("imsi-")) + for supi in event.get("supis", []) + ) + ) + ] + return _sort_and_limit(events, limit) def record_alert_context( diff --git a/app/services/pls.py b/app/services/pls.py index 5005af8..42e27d2 100644 --- a/app/services/pls.py +++ b/app/services/pls.py @@ -118,3 +118,13 @@ async def get_fluentbit_config() -> dict | None: async def put_fluentbit_config(config: dict) -> dict | None: data = await _put("fluent-bit/config", config) return data if isinstance(data, dict) else None + + +async def get_log_config(host: str | None = None) -> dict | None: + data = await _get("mgmt/config/logs", host=host) + return data if isinstance(data, dict) else None + + +async def put_log_config(config: dict, host: str | None = None) -> dict | None: + data = await _put("mgmt/config/logs", config, host=host) + return data if isinstance(data, dict) else None diff --git a/app/ui/index.html b/app/ui/index.html index 942d91c..243824b 100644 --- a/app/ui/index.html +++ b/app/ui/index.html @@ -70,11 +70,20 @@ header h1 span { color: var(--muted); font-weight: 400; } letter-spacing: .1em; color: var(--muted); margin-bottom: 12px; display: flex; align-items: center; justify-content: space-between; } +.section-head { + display: flex; align-items: center; justify-content: space-between; gap: 10px; +} .refresh-btn { background: none; border: none; color: var(--muted); cursor: pointer; font-size: 13px; padding: 1px 4px; border-radius: 4px; transition: color .15s; } .refresh-btn:hover { color: var(--text); } +.collapse-btn { + background: none; border: none; color: var(--muted); cursor: pointer; + font-size: 12px; border-radius: 4px; transition: color .15s, transform .15s; +} +.collapse-btn:hover { color: var(--text); } +.collapsed .collapse-btn { transform: rotate(-90deg); } /* NF grid */ .nf-grid { display: grid; grid-template-columns: repeat(3,1fr); gap: 7px; } @@ -192,8 +201,16 @@ header h1 span { color: var(--muted); font-weight: 400; } border-radius: 6px; padding: 7px 8px; white-space: pre-wrap; } +.cluster-flyout { + margin-top: 12px; + border-top: 1px solid rgba(255,255,255,.05); + padding-top: 12px; +} +.cluster-flyout.hidden, +.cluster-flyout[hidden] { display: none !important; } + /* ββ Chat panel βββββββββββββββββββββββββββββββββββββββββββββββββββ */ -.chat { display: grid; grid-template-rows: auto auto minmax(0,1fr) auto; overflow: hidden; } +.chat { display: grid; grid-template-rows: auto minmax(0,1fr) auto auto; overflow: hidden; } .messages { min-height: 0; overflow-y: auto; padding: 20px; display: flex; flex-direction: column; gap: 14px; } @@ -225,11 +242,15 @@ header h1 span { color: var(--muted); font-weight: 400; } @keyframes bounce{0%,100%{transform:translateY(0)}50%{transform:translateY(-7px)}} /* Suggestions */ -.chips { display: flex; gap: 6px; padding: 0 20px 10px; overflow-x: auto; flex-shrink: 0; } +.chips { + display: flex; gap: 6px; padding: 8px 20px; overflow-x: auto; flex-shrink: 0; + border-top: 1px solid rgba(255,255,255,.04); + background: rgba(15,17,23,.75); +} .chips::-webkit-scrollbar { display: none; } .chip { background: var(--card); border: 1px solid var(--border); border-radius: 20px; - color: var(--text); padding: 5px 13px; font-size: 12px; cursor: pointer; + color: var(--text); padding: 4px 10px; font-size: 11px; line-height: 1.2; cursor: pointer; white-space: nowrap; transition: border-color .15s, background .15s; } .chip:hover { border-color: var(--purple); background: var(--purple-dim); } @@ -261,10 +282,13 @@ header h1 span { color: var(--muted); font-weight: 400; } flex-shrink: 0; display: flex; flex-direction: column; - min-height: 220px; - max-height: 280px; + min-height: 72px; + max-height: 320px; border-bottom: 1px solid var(--border); } +.trace-panel.collapsed { max-height: 72px; } +.trace-panel.collapsed .trace-controls, +.trace-panel.collapsed .trace-log { display: none; } .trace-header { padding: 12px 20px 10px; display: flex; align-items: center; justify-content: space-between; gap: 10px; @@ -274,6 +298,12 @@ header h1 span { color: var(--muted); font-weight: 400; } font-size: 11px; font-weight: 700; text-transform: uppercase; letter-spacing: .1em; color: var(--muted); } .trace-status { font-size: 11px; color: var(--muted); } +.trace-head-left { display: flex; align-items: center; gap: 10px; } +.trace-head-right { display: flex; align-items: center; gap: 12px; } +.refresh-mode { + background: var(--card); color: var(--text); border: 1px solid var(--border); + border-radius: 8px; padding: 5px 8px; font: inherit; +} .trace-controls { padding: 10px 20px; display: grid; grid-template-columns: repeat(4, minmax(0, 1fr)); gap: 8px; @@ -311,6 +341,7 @@ header h1 span { color: var(--muted); font-weight: 400; } .left { max-height: 260px; } .trace-controls { grid-template-columns: 1fr 1fr; } .trace-panel { max-height: 320px; } + .trace-head-right { flex-wrap: wrap; justify-content: flex-end; } } @@ -326,19 +357,22 @@ header h1 span { color: var(--muted); font-weight: 400; }