Initial commit of AthonetTools

This commit is contained in:
2025-08-21 12:59:43 +00:00
commit cd932b8fcb
2483 changed files with 433999 additions and 0 deletions

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
DEFAULT_TIMEOUT = 10.0
class ComboCoreClient:
def __init__(self, host: str, username: str, password: str, verify_ssl: bool = False, timeout: float = DEFAULT_TIMEOUT):
self.base = f"https://{host}"
self.username = username
self.password = password
self.verify = verify_ssl
self.timeout = timeout
self._token = None
self._s = requests.Session()
retry = Retry(total=3, backoff_factor=0.3, status_forcelist=[429,500,502,503,504])
self._s.mount("https://", HTTPAdapter(max_retries=retry))
self._s.mount("http://", HTTPAdapter(max_retries=retry))
# ----- PLS auth -----
def login(self) -> str:
r = self._s.post(
f"{self.base}/core/pls/api/1/auth/login",
json={"username": self.username, "password": self.password},
timeout=self.timeout, verify=self.verify
)
r.raise_for_status()
j = r.json()
self._token = j.get("access_token") or j.get("token")
if not self._token:
raise RuntimeError("No access token in login response.")
return self._token
def _auth_headers(self):
if not self._token:
self.login()
return {"Authorization": f"Bearer {self._token}"}
def get(self, path: str):
r = self._s.get(f"{self.base}{path}", headers=self._auth_headers(), timeout=self.timeout, verify=self.verify)
if r.status_code == 401:
self._token = None
r = self._s.get(f"{self.base}{path}", headers=self._auth_headers(), timeout=self.timeout, verify=self.verify)
r.raise_for_status()
return r.json()
def post(self, path: str, json=None):
r = self._s.post(f"{self.base}{path}", headers=self._auth_headers(), json=json or {}, timeout=self.timeout, verify=self.verify)
if r.status_code == 401:
self._token = None
r = self._s.post(f"{self.base}{path}", headers=self._auth_headers(), json=json or {}, timeout=self.timeout, verify=self.verify)
r.raise_for_status()
return r.json()

29
services/combocore/ncm.py Normal file
View File

@@ -0,0 +1,29 @@
from typing import Tuple, Optional, List, Dict
from .client import ComboCoreClient
def get_routes(host: str, user: str, pwd: str, verify_ssl=False, timeout=10.0) -> List[Dict]:
cli = ComboCoreClient(host, user, pwd, verify_ssl, timeout)
return cli.get("/core/ncm/api/1/status/routes")
def derive_eth0_cidr_gw(routes: List[Dict]) -> Tuple[str, Optional[str]]:
gw = None
ip = None
masklen = None
for rt in routes:
if rt.get("family") == "inet" and rt.get("dst") == "default" and rt.get("dev") == "eth0":
gw = rt.get("gateway")
for rt in routes:
if rt.get("family") == "inet" and rt.get("dev") == "eth0":
if not ip and rt.get("prefsrc"):
ip = rt["prefsrc"]
dst = rt.get("dst", "")
if "/" in dst:
try:
masklen = int(dst.split("/", 1)[1])
except Exception:
pass
if ip and masklen is not None:
break
if not ip or masklen is None:
raise RuntimeError("Could not derive eth0 IP/prefix from routes.")
return f"{ip}/{masklen}", gw

View File

@@ -0,0 +1,9 @@
from .client import ComboCoreClient
# Minimal PLS surface to start
def login(host: str, user: str, pwd: str, verify_ssl=False, timeout=10.0) -> str:
return ComboCoreClient(host, user, pwd, verify_ssl, timeout).login()
def get_me(host: str, user: str, pwd: str, verify_ssl=False, timeout=10.0) -> dict:
cli = ComboCoreClient(host, user, pwd, verify_ssl, timeout)
return cli.get("/core/pls/api/1/auth/me") # adjust if different