# services/remote_admin.py import requests, ipaddress REQUEST_KW = dict(timeout=10, verify=False) def validate_ipv4(ip: str) -> str: try: ipaddress.IPv4Address(ip) return ip except Exception: raise ValueError(f"Invalid IPv4 address: {ip}") def authenticate(ip: str, username: str, password: str) -> str: url = f"https://{ip}/core/pls/api/1/auth/login" r = requests.post(url, json={"username": username, "password": password}, **REQUEST_KW) r.raise_for_status() data = r.json() token = data.get("access_token") or data.get("token") or data.get("data", {}).get("access_token") if not token: raise RuntimeError("No access token in auth response") return token ALLOWED_SERVICES = {"ssh", "webconsole"} ALLOWED_ACTIONS = {"enable", "enable-autostart", "start"} def service_action(ip: str, service: str, action: str, username: str, password: str) -> dict: if service not in ALLOWED_SERVICES: raise ValueError(f"Unsupported service: {service}") if action not in ALLOWED_ACTIONS: raise ValueError(f"Unsupported action: {action}") ip = validate_ipv4(ip) token = authenticate(ip, username, password) url = f"https://{ip}/core/pls/api/1/services/{service}/{action}" r = requests.post(url, headers={"Authorization": f"Bearer {token}"}, **REQUEST_KW) r.raise_for_status() return r.json() if r.content else {"ok": True} def perform_service_sequence(ip: str, service: str, username: str, password: str) -> None: for action in ("enable", "enable-autostart", "start"): service_action(ip, service, action, username, password) # Backwards-compatible wrappers (optional) def ssh_action(ip, action, username, password): return service_action(ip, "ssh", action, username, password) def perform_ssh_sequence(ip, username, password): return perform_service_sequence(ip, "ssh", username, password)