159 lines
5.6 KiB
Python
159 lines
5.6 KiB
Python
"""Helpers for streaming journalctl logs over SSH."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import queue
|
|
import shlex
|
|
import threading
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterable, List
|
|
|
|
import paramiko
|
|
|
|
# Private key handed to paramiko for every connection. The location can be
# overridden through the SSH_KEY_PATH environment variable.
SSH_KEY_PATH = Path(os.environ.get("SSH_KEY_PATH", "keys/5G-SSH-Key.pem"))

# jq program applied to each JSON journal entry on the remote side: joins the
# timestamp, syslog identifier, SUPI (empty string when the field is absent)
# and message into one space-separated text line.
JQ_FILTER = (
    '.TIMESTAMP + " " + .SYSLOG_IDENTIFIER + " " '
    '+ (.SUPI // "") + " " + .MESSAGE'
)
|
|
|
|
@dataclass
class LogTarget:
    """A remote host together with the syslog identifiers to follow on it."""

    # Address handed to the SSH client when connecting.
    host: str
    # Syslog identifiers, each turned into a ``journalctl -t`` filter;
    # blank or ``None`` entries are skipped when the command is built.
    processes: List[str]
    # Optional display name; emitted events fall back to ``host`` when unset.
    hostname: str | None = None
|
|
|
|
class JournalctlStream:
    """Manage concurrent journalctl streams for multiple hosts.

    One daemon thread per target opens an SSH connection, runs
    ``journalctl ... -f | jq`` on the remote host and pushes one event dict
    per output line onto a shared queue; ``iter_events`` consumes that queue.

    Event dicts produced by the workers always carry ``type``, ``host`` and
    ``hostname``. ``type`` is one of:

    * ``"log"``      - adds ``line`` and ``timestamp`` (UTC, ISO 8601).
    * ``"error"``    - adds ``message``.
    * ``"complete"`` - the worker for that host has finished.

    ``iter_events`` additionally yields ``{"type": "heartbeat", ...}`` events
    while the queue is idle.

    Callers are responsible for invoking ``stop()`` when they are done;
    abandoning the ``iter_events`` generator does not stop the workers.
    """

    def __init__(self, targets: Iterable[LogTarget]) -> None:
        """Validate the targets and prepare the shared state.

        Args:
            targets: Hosts to stream from. Targets whose ``processes`` list
                is empty are dropped.

        Raises:
            ValueError: If no target with at least one process remains.
            FileNotFoundError: If ``SSH_KEY_PATH`` does not exist.
        """
        self._logger = logging.getLogger(__name__)
        self.targets = [t for t in targets if t.processes]
        if not self.targets:
            raise ValueError("No valid log targets provided")
        if not SSH_KEY_PATH.exists():
            raise FileNotFoundError(f"SSH key not found at {SSH_KEY_PATH}")
        self._queue: "queue.Queue[dict]" = queue.Queue()
        self._stop_event = threading.Event()
        self._threads: list[threading.Thread] = []
        # Most recent SSH client per host, so stop() can close them.
        self._clients: dict[str, paramiko.SSHClient] = {}

    def start(self) -> None:
        """Spawn one daemon worker thread per target.

        Calling ``start`` while workers from an earlier call are still alive
        is a no-op; otherwise ``start()`` followed by ``iter_events()`` would
        stream every host twice. Once all previous workers have finished, a
        new call starts a fresh set.
        """
        if any(worker.is_alive() for worker in self._threads):
            return
        # Forget workers from a previous, already finished run.
        self._threads = []
        for target in self.targets:
            worker = threading.Thread(
                target=self._stream_host, args=(target,), daemon=True
            )
            worker.start()
            self._threads.append(worker)

    def iter_events(self) -> Iterable[dict]:
        """Start the workers and yield their events as they arrive.

        Yields a ``heartbeat`` event whenever no worker event arrives within
        0.5 seconds. Iteration ends once every target has reported
        ``complete`` or ``stop()`` has been called; events still queued at
        that point are yielded before the generator returns.
        """
        self.start()
        finished = 0
        total = len(self.targets)
        while finished < total and not self._stop_event.is_set():
            try:
                event = self._queue.get(timeout=0.5)
            except queue.Empty:
                yield {
                    "type": "heartbeat",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
                continue
            if event.get("type") == "complete":
                finished += 1
            yield event

        # Drain remaining events if any. get_nowait() never blocks, unlike an
        # empty() check followed by a blocking get().
        while True:
            try:
                leftover = self._queue.get_nowait()
            except queue.Empty:
                break
            yield leftover

    def stop(self) -> None:
        """Signal all workers to stop, close SSH clients and join threads.

        Closing is best-effort: errors raised by ``close()`` are logged at
        debug level and otherwise ignored. Each thread is joined for at most
        one second.
        """
        self._stop_event.set()
        # Iterate over a snapshot: workers may still be registering clients,
        # and mutating a dict during iteration raises RuntimeError.
        for client in list(self._clients.values()):
            try:
                client.close()
            except Exception:
                self._logger.debug(
                    "Ignoring error while closing SSH client", exc_info=True
                )
        for worker in list(self._threads):
            worker.join(timeout=1)

    # --- internal helpers ---

    @staticmethod
    def _event(kind: str, target: LogTarget, **extra) -> dict:
        """Build an event dict of type *kind* for *target* plus *extra* keys."""
        return {
            "type": kind,
            "host": target.host,
            "hostname": target.hostname or target.host,
            **extra,
        }

    def _stream_host(self, target: LogTarget) -> None:
        """Worker body: stream journal lines from one host into the queue.

        Always finishes by queueing a ``complete`` event for the target, on
        success, on error and when stopped. Exceptions are reported as
        ``error`` events rather than propagated.
        """
        # One "-t <identifier>" filter per non-blank process name; names are
        # shell-quoted because the command runs through a remote shell.
        cleaned = ((proc or "").strip() for proc in target.processes)
        filter_args = [f"-t {shlex.quote(name)}" for name in cleaned if name]

        safe_filters = " ".join(filter_args)
        if not safe_filters:
            message = "No processes selected"
            self._logger.error("Log stream aborted for %s: %s", target.host, message)
            self._queue.put(self._event("error", target, message=message))
            self._queue.put(self._event("complete", target))
            return

        # Last 50 entries, then follow; jq flattens each JSON entry to text.
        pipeline = (
            f"journalctl {safe_filters} -o json -n 50 -f "
            f"| jq -r --unbuffered '{JQ_FILTER}'"
        )
        command = f"bash -lc {shlex.quote(pipeline)}"
        self._logger.debug("Executing log command on %s: %s", target.host, pipeline)

        client = paramiko.SSHClient()
        # SECURITY NOTE(review): AutoAddPolicy accepts unknown host keys
        # without verification, so connections are open to man-in-the-middle
        # attacks. Left unchanged to avoid breaking existing deployments;
        # consider loading known hosts and using RejectPolicy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(
                target.host,
                username="root",
                key_filename=str(SSH_KEY_PATH),
                look_for_keys=False,
                timeout=15,
            )
            # Register first, then check the flag: whichever of stop() and
            # this worker runs second is guaranteed to see the other's write,
            # so a connection finished during stop() is not left streaming.
            self._clients[target.host] = client
            if self._stop_event.is_set():
                return

            # NOTE(review): with a PTY the remote stderr is presumably merged
            # into stdout, so `stderr` below is likely always empty - confirm.
            _, stdout, stderr = client.exec_command(command, get_pty=True)
            for line in iter(stdout.readline, ""):
                if self._stop_event.is_set():
                    break
                payload = line.rstrip("\r\n")
                if not payload:
                    continue
                self._queue.put(
                    self._event(
                        "log",
                        target,
                        line=payload,
                        timestamp=datetime.now(timezone.utc).isoformat(),
                    )
                )

            err = stderr.read().decode(errors="ignore").strip()
            exit_status = stdout.channel.recv_exit_status()
            # -1 is what paramiko reports when the server supplied no exit
            # status (e.g. the channel was closed); not treated as a failure.
            if exit_status not in (0, -1) or err:
                message = err or f"journalctl exited with status {exit_status}"
                self._logger.error("Log stream error on %s: %s", target.host, message)
                self._queue.put(self._event("error", target, message=message))
        except Exception as exc:
            # Thread boundary: report the failure to the consumer instead of
            # letting the worker die silently.
            message = str(exc)
            self._logger.exception(
                "Log stream exception for %s: %s", target.host, message
            )
            self._queue.put(self._event("error", target, message=message))
        finally:
            self._queue.put(self._event("complete", target))
            try:
                client.close()
            except Exception:
                self._logger.debug(
                    "Ignoring error while closing SSH client for %s",
                    target.host,
                    exc_info=True,
                )
|