"""Helpers for streaming journalctl logs over SSH.""" from __future__ import annotations import json import logging import os import queue import shlex import threading from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Iterable, List import paramiko SSH_KEY_PATH = Path(os.getenv("SSH_KEY_PATH", "keys/5G-SSH-Key.pem")) JQ_FILTER = '.TIMESTAMP + " " + .SYSLOG_IDENTIFIER + " " + (.SUPI // "") + " " + .MESSAGE' @dataclass class LogTarget: host: str processes: List[str] hostname: str | None = None class JournalctlStream: """Manage concurrent journalctl streams for multiple hosts.""" def __init__(self, targets: Iterable[LogTarget]) -> None: self._logger = logging.getLogger(__name__) self.targets = [t for t in targets if t.processes] if not self.targets: raise ValueError("No valid log targets provided") if not SSH_KEY_PATH.exists(): raise FileNotFoundError(f"SSH key not found at {SSH_KEY_PATH}") self._queue: "queue.Queue[dict]" = queue.Queue() self._stop_event = threading.Event() self._threads: list[threading.Thread] = [] self._clients: dict[str, paramiko.SSHClient] = {} def start(self) -> None: for target in self.targets: thread = threading.Thread(target=self._stream_host, args=(target,), daemon=True) thread.start() self._threads.append(thread) def iter_events(self): self.start() finished = 0 total = len(self.targets) while finished < total and not self._stop_event.is_set(): try: event = self._queue.get(timeout=0.5) except queue.Empty: yield {"type": "heartbeat", "timestamp": datetime.now(timezone.utc).isoformat()} continue if event.get("type") == "complete": finished += 1 yield event # Drain remaining events if any while not self._queue.empty(): yield self._queue.get() def stop(self) -> None: self._stop_event.set() for client in self._clients.values(): try: client.close() except Exception: pass for thread in self._threads: thread.join(timeout=1) # --- internal helpers --- def _stream_host(self, target: LogTarget) -> None: filter_args = [] for proc in target.processes: proc = (proc or "").strip() if not proc: continue filter_args.append(f"-t {shlex.quote(proc)}") safe_filters = " ".join(filter_args) if not safe_filters: message = "No processes selected" self._logger.error("Log stream aborted for %s: %s", target.host, message) self._queue.put({ "type": "error", "host": target.host, "hostname": target.hostname or target.host, "message": message }) self._queue.put({"type": "complete", "host": target.host, "hostname": target.hostname or target.host}) return pipeline = ( f"journalctl {safe_filters} -o json -n 50 -f " f"| jq -r --unbuffered '{JQ_FILTER}'" ) command = f"bash -lc {shlex.quote(pipeline)}" self._logger.debug("Executing log command on %s: %s", target.host, pipeline) client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect( target.host, username="root", key_filename=str(SSH_KEY_PATH), look_for_keys=False, timeout=15, ) self._clients[target.host] = client _, stdout, stderr = client.exec_command(command, get_pty=True) for line in iter(lambda: stdout.readline(), ""): if self._stop_event.is_set(): break payload = line.rstrip("\r\n") if not payload: continue self._queue.put({ "type": "log", "host": target.host, "hostname": target.hostname or target.host, "line": payload, "timestamp": datetime.now(timezone.utc).isoformat(), }) err = stderr.read().decode(errors="ignore").strip() exit_status = stdout.channel.recv_exit_status() if exit_status not in (0, -1) or err: message = err or f"journalctl exited with status {exit_status}" self._logger.error("Log stream error on %s: %s", target.host, message) self._queue.put({ "type": "error", "host": target.host, "hostname": target.hostname or target.host, "message": message, }) except Exception as exc: message = str(exc) self._logger.exception("Log stream exception for %s: %s", target.host, message) self._queue.put({ "type": "error", "host": target.host, "hostname": target.hostname or target.host, "message": message, }) finally: self._queue.put({ "type": "complete", "host": target.host, "hostname": target.hostname or target.host, }) try: client.close() except Exception: pass