"""Helpers for managing 5G core hosts, M2000 routers and OpenVPN connections.

The functions in this module talk to three kinds of endpoints:

* per-host REST APIs (``https://<host>/core/...``) authenticated with a bearer
  token,
* a portal REST API authenticated with a ``horus-token`` header on an existing
  ``requests`` session,
* remote hosts over SSH (paramiko) and local OpenVPN systemd units (via
  ``sudo systemctl``).

NOTE(review): every HTTPS request below is sent with ``verify=False`` and every
SSH connection uses ``AutoAddPolicy``; both disable peer verification. This is
kept as-is for compatibility but should be confirmed as intentional.
"""
import hashlib
import ipaddress
import json
import os
import re
import subprocess
import time
from datetime import datetime

import paramiko
import requests
from requests.exceptions import HTTPError

import auth_utils

# Certificate verification is disabled on every request in this module, so the
# resulting urllib3 warnings are silenced globally.
requests.packages.urllib3.disable_warnings()

VPN_CONFIG_NAMES = ["Triton", "Star", "Bluebonnet", "Lonestar", "Production", "US-Support", "EU-Support"]

# NOTE(review): credentials are hard-coded in source. Consider moving them to a
# secrets store; values are left untouched here because callers depend on them.
SERIAL_PASSWORDS = {
    "3M1D2211Z3": "EP5G!f15878b4af20",
    "3M1D10146B": "EP5G!076689528baf",
    "3M1D10146G": "EP5G!c3b0072cabf5",
    "3M1D2211Z1": "EP5G!65b22ae8617a",
    "3M1D19125H": "EP5G!da3c04fde559",
    "3M1D19125G": "EP5G!b73f98633108",
    "3M1D19125F": "EP5G!e61201fb9234",
    "3M1D1R16M4": "EP5G!ca439b544329",
}

# Known VPN concentrator addresses, keyed by region label.
VPN_ENDPOINTS = {
    "US": "128.136.82.165",
    "EU": "156.54.30.27",
}

# Private key used for root SSH access to hosts (expanded at call time).
_SSH_KEY_PATH = "~/.ssh/5G-SSH-Key.pem"
# OpenVPN client configuration file on the remote hosts.
_VPN_CLIENT_CONF = "/etc/openvpn/client/athonet.conf"

# Line prefixes in the OpenVPN status file that carry no client/route data.
# The two trailing entries are the column-header rows of the v1 status format,
# which would otherwise be parsed as a client / route entry.
_STATUS_SKIP_PREFIXES = (
    "TITLE", "TIME", "HEADER", "GLOBAL", "OpenVPN", "Updated", "END",
    "Common Name,", "Virtual Address,",
)
_CUSTOMER_ID_RE = re.compile(r'(\d{3})z')


# ------- Shared helpers ---------

def _bearer_headers(token):
    """Return the Authorization header dict for a host API bearer token."""
    return {"Authorization": f"Bearer {token}"}


def _portal_headers(session, token):
    """Return a copy of the session headers with the portal ``horus-token`` set."""
    headers = session.headers.copy()
    headers["horus-token"] = token
    return headers


def _portal_get_json(url, token, session):
    """GET *url* through *session* with the portal token and return the JSON body.

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    response = session.get(url, headers=_portal_headers(session, token), verify=False)
    response.raise_for_status()
    return response.json()


def _make_host_api_get_request(host_ip, token, endpoint):
    """GET ``https://<host_ip>/<endpoint>`` with a bearer token and return the JSON body.

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    url = f"https://{host_ip}/{endpoint}"
    response = requests.get(url, headers=_bearer_headers(token), verify=False)
    response.raise_for_status()
    return response.json()


def _connect_host_ssh(host_ip):
    """Open a key-authenticated root SSH session to *host_ip*.

    The caller owns the returned client and must close it. If the connection
    attempt fails the client is closed here before the error propagates.
    """
    key_path = os.path.expanduser(_SSH_KEY_PATH)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(host_ip, username='root', key_filename=key_path, timeout=10)
    except Exception:
        ssh.close()
        raise
    return ssh


def _resolve_vpn_endpoint(config_line):
    """Map a ``remote <ip> ...`` config line to ``{"region": ..., "ip": ...}``.

    The region is looked up in ``VPN_ENDPOINTS``; an address that is not listed
    there yields region ``"Unknown"``.

    Raises:
        Exception: if the line is empty or has no address after ``remote``.
    """
    fields = config_line.split()
    # A bare "remote" with no address used to surface as an opaque IndexError.
    if len(fields) < 2:
        raise Exception("Could not read VPN configuration from host.")
    current_ip = fields[1]
    for region, ip in VPN_ENDPOINTS.items():
        if ip == current_ip:
            return {"region": region, "ip": current_ip}
    return {"region": "Unknown", "ip": current_ip}


# ------- Home Network Keys ---------

def list_home_network_keys(host_ip, token):
    """Return the ``data`` list of home network keys from the host's UDM API.

    Returns an empty list when the response has no ``data`` field.
    """
    body = _make_host_api_get_request(
        host_ip, token, "core/udm/api/1/provisioning/home_network_keys"
    )
    return body.get("data", [])


def get_home_network_key(host_ip, token, key_id):
    """Return the JSON description of a single home network key."""
    return _make_host_api_get_request(
        host_ip, token, f"core/udm/api/1/provisioning/home_network_keys/{key_id}"
    )


def create_home_network_key(host_ip, token, key_id, home_network_identifier, private_key, profile, description=None):
    """Create a home network key on the host and return a success message dict.

    ``description`` is only included in the request payload when it is truthy.

    Raises:
        requests.exceptions.HTTPError: if the host rejects the request.
    """
    url = f"https://{host_ip}/core/udm/api/1/provisioning/home_network_keys"
    headers = _bearer_headers(token)
    headers["Content-Type"] = "application/json"
    payload = {
        "key_id": key_id,
        "home_network_identifier": home_network_identifier,
        "private_key": private_key,
        "profile": profile,
    }
    if description:
        payload["description"] = description
    response = requests.post(url, headers=headers, json=payload, verify=False)
    response.raise_for_status()
    return {"status": "success", "message": f"Home Network Key with ID {key_id} created successfully."}


def delete_home_network_key(host_ip, token, key_id):
    """Deletes a Home Network Key and returns a success message.

    Raises:
        requests.exceptions.HTTPError: if the host rejects the request.
    """
    url = f"https://{host_ip}/core/udm/api/1/provisioning/home_network_keys/{key_id}"
    response = requests.delete(url, headers=_bearer_headers(token), verify=False)
    response.raise_for_status()
    return {"status": "success", "message": f"Home Network Key with ID {key_id} deleted successfully."}


# ------- M2000 VPN devices ---------

def list_m2000_vpns(base_url, token, session):
    """Lists all network devices from the Aruba dashboard.

    Returns one dict per hardware entry of each network item, with the keys
    ``id``, ``name``, ``status`` (from the item) and ``serial``, ``subnet``
    (from the hardware entry). Items without hardware produce no rows.
    """
    network = get_full_network_config(base_url, token, session)
    return [
        {
            "id": item.get("id"),
            "name": item.get("name"),
            "status": item.get("status"),
            "serial": hw.get("serial"),
            "subnet": hw.get("subnet_delegation"),
        }
        for item in network.get("items", [])
        for hw in item.get("info", {}).get("hardware", [])
    ]


def restart_m2000_vpn(serial, subnet):
    """Send an OpenVPN restart command to the router owning *subnet*.

    The router address is taken to be the first host address of *subnet* and
    the root password is looked up in ``SERIAL_PASSWORDS`` by *serial*.

    Returns:
        A success message dict. The command is only dispatched; its outcome is
        not awaited.

    Raises:
        ValueError: if no password is known for *serial*, *subnet* is not a
            valid network, or the network has no host addresses.
    """
    password = SERIAL_PASSWORDS.get(serial)
    if not password:
        raise ValueError(f"No password found for serial {serial}")

    subnet_obj = ipaddress.ip_network(subnet, strict=False)
    # Take only the first host; materialising every host of a large (e.g. IPv6)
    # network would exhaust memory.
    first_host = next(iter(subnet_obj.hosts()), None)
    if first_host is None:
        raise ValueError(f"Subnet {subnet} has no usable host addresses")
    router_ip = str(first_host)

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(router_ip, username='root', password=password, timeout=10)
        # Fire-and-forget: the output is deliberately not read.
        # NOTE(review): presumably because restarting the VPN can drop this very
        # connection - confirm before adding an exit-status wait here.
        ssh.exec_command('systemctl restart openvpn@openvpn.service')
    finally:
        ssh.close()
    return {"status": "success", "message": f"Restart command sent to {serial}. Please refresh the list in a moment to see the updated status."}


# ------- Host VPN configuration (over SSH) ---------

def get_vpn_config_and_details(host_ip):
    """
    Connects to a host via SSH, gets VPN config, and fetches system details.

    Returns:
        A dict with ``vpn_config`` (first five lines of the client config),
        ``vpn_endpoint`` (``{"region", "ip"}``) and ``details`` (the result of
        ``get_host_details``).

    Raises:
        Exception: if the ``remote`` line cannot be read from the config.
    """
    # --- 1. Get VPN Config and Current Endpoint via SSH ---
    ssh = _connect_host_ssh(host_ip)
    try:
        _, stdout, _ = ssh.exec_command(f'head -n 5 {_VPN_CLIENT_CONF}')
        vpn_config_output = stdout.read().decode().strip()

        # Also get the current endpoint IP from the config
        _, stdout, _ = ssh.exec_command(f"grep '^remote' {_VPN_CLIENT_CONF}")
        config_line = stdout.read().decode().strip()
    finally:
        ssh.close()

    endpoint = _resolve_vpn_endpoint(config_line)

    # --- 2. Get Host Details via API ---
    host_details = get_host_details(host_ip)

    # --- 3. Combine all the results ---
    return {
        "vpn_config": vpn_config_output,
        "vpn_endpoint": endpoint,
        "details": host_details,
    }


def get_current_vpn_endpoint(host_ip):
    """Connects to a host and reads the current VPN endpoint from the config file.

    Returns:
        ``{"region": <key of VPN_ENDPOINTS or "Unknown">, "ip": <address>}``.

    Raises:
        Exception: if the ``remote`` line cannot be read from the config.
    """
    ssh = _connect_host_ssh(host_ip)
    try:
        # Read the 'remote' line from the config file
        _, stdout, _ = ssh.exec_command(f"grep '^remote' {_VPN_CLIENT_CONF}")
        config_line = stdout.read().decode().strip()
    finally:
        ssh.close()
    return _resolve_vpn_endpoint(config_line)


def set_vpn_endpoint(host_ip, region):
    """Connects to a host, updates the VPN endpoint, and restarts the service.

    Args:
        host_ip: Host to reconfigure.
        region: Key of ``VPN_ENDPOINTS`` selecting the new endpoint address.

    Returns:
        A success message dict.

    Raises:
        ValueError: if *region* is not a key of ``VPN_ENDPOINTS``.
        Exception: if the remote command writes to stderr or exits non-zero.
    """
    if region not in VPN_ENDPOINTS:
        raise ValueError("Invalid region specified.")
    new_ip = VPN_ENDPOINTS[region]

    # Use sed to replace the IP in the config file and then restart the service
    command = (
        f"sed -i 's/^remote .*/remote {new_ip}/' {_VPN_CLIENT_CONF} && "
        "systemctl restart openvpn-client@athonet.service"
    )

    ssh = _connect_host_ssh(host_ip)
    try:
        _, stdout, stderr = ssh.exec_command(command)
        # Reading stderr to EOF also waits for the command to finish.
        error = stderr.read().decode().strip()
        # A command can fail without printing anything, so check the status too.
        exit_status = stdout.channel.recv_exit_status()
    finally:
        ssh.close()

    if error or exit_status != 0:
        detail = error or f"command exited with status {exit_status}"
        raise Exception(f"Failed to switch VPN: {detail}")

    return {"status": "success", "message": f"VPN endpoint switched to {region} ({new_ip})."}


# ------- Local OpenVPN services ---------

def get_active_vpn():
    """Return the first name in ``VPN_CONFIG_NAMES`` whose systemd unit is active.

    Both ``openvpn-client@<name>.service`` and ``openvpn@<name>.service`` are
    queried with ``sudo systemctl is-active``. Returns ``None`` when none is
    active (or when the sudo binary is missing).
    """
    unit_patterns = ("openvpn-client@{name}.service", "openvpn@{name}.service")
    for name in VPN_CONFIG_NAMES:
        for pattern in unit_patterns:
            cmd = ["/usr/bin/sudo", "/usr/bin/systemctl", "is-active", pattern.format(name=name)]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True)
            except FileNotFoundError:
                continue
            if result.stdout.strip() == "active":
                return name
    return None


def toggle_vpn_connection(vpn_name, turn_on):
    """Stop the currently active VPN and optionally start *vpn_name*.

    Any active VPN is stopped first (both unit name variants). When *turn_on*
    is true, ``openvpn-client@<vpn_name>`` is started, falling back to
    ``openvpn@<vpn_name>`` if that fails, and the result is verified after a
    short delay.

    Returns:
        The name reported by ``get_active_vpn()`` afterwards (``None`` if no
        VPN is active).

    Raises:
        subprocess.CalledProcessError: if both start attempts fail.
        Exception: if *vpn_name* is not active after starting it.
    """
    systemctl = ["/usr/bin/sudo", "/usr/bin/systemctl"]

    active_vpn = get_active_vpn()
    if active_vpn:
        # The active unit may be registered under either template name.
        subprocess.run(systemctl + ["stop", f"openvpn-client@{active_vpn}.service"])
        subprocess.run(systemctl + ["stop", f"openvpn@{active_vpn}.service"])

    if turn_on:
        try:
            subprocess.run(systemctl + ["start", f"openvpn-client@{vpn_name}.service"], check=True)
        except subprocess.CalledProcessError:
            subprocess.run(systemctl + ["start", f"openvpn@{vpn_name}.service"], check=True)

        # Give the tunnel a moment to come up before checking its state.
        time.sleep(4)
        if get_active_vpn() != vpn_name:
            raise Exception(f"Connection failed for {vpn_name}. Check OpenVPN logs for details.")

    return get_active_vpn()


# ------- Portal API ---------

def get_full_network_config(base_url, token, session):
    """Return the full JSON body of the portal ``/portal/api/1/network`` endpoint."""
    return _portal_get_json(f"{base_url}/portal/api/1/network", token, session)


def list_tenants(base_url, token, session):
    """Return the ``tenants`` list from the portal (empty list if absent)."""
    body = _portal_get_json(f"{base_url}/portal/api/tenants/", token, session)
    return body.get("tenants", [])


def list_plmns(base_url, token, session, tenant_id):
    """Return the ``items`` list of PLMNs for a tenant (empty list if absent)."""
    body = _portal_get_json(f"{base_url}/portal/api/tenants/{tenant_id}/plmns", token, session)
    return body.get("items", [])


def list_plmn_hnks(base_url, token, session, tenant_id, plmn_id):
    """Return the ``items`` list of home network keys for a tenant's PLMN."""
    url = f"{base_url}/portal/api/tenants/{tenant_id}/plmns/{plmn_id}/home-network-keys"
    return _portal_get_json(url, token, session).get("items", [])


def update_radio_count(base_url, token, session, network_id, new_count, operation):
    """PATCH the radio count of a network and return the response JSON.

    Args:
        operation: ``'replace'`` updates ``/info/radio_pool/0/num_of_radios``;
            any other value adds a new ``/info/radio_pool`` list with one entry.
        new_count: Sent to the API as a string.

    Raises:
        requests.exceptions.HTTPError: if the portal rejects the patch.
    """
    url = f"{base_url}/portal/api/1/network/{network_id}"
    # Unlike the GET helpers, only these two headers are sent explicitly here.
    headers = {
        "horus-token": token,
        "Content-Type": "application/json-patch+json",
    }
    if operation == 'replace':
        op_details = {
            "op": "replace",
            "path": "/info/radio_pool/0/num_of_radios",
            "value": str(new_count),
        }
    else:
        op_details = {
            "op": "add",
            "path": "/info/radio_pool",
            "value": [{"num_of_radios": str(new_count)}],
        }
    payload = {"ops": [op_details]}
    response = session.patch(url, headers=headers, json=payload, verify=False)
    response.raise_for_status()
    return response.json()


# ------- System browser ---------

def _load_customers():
    """Read ``customers.txt`` (``id,name`` per line) into a dict keyed by id.

    Lines that are blank or contain no comma are ignored.
    """
    customers = {}
    try:
        with open('customers.txt', 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                parts = line.split(',', 1)
                if len(parts) == 2:
                    customers[parts[0]] = parts[1]
    except FileNotFoundError as err:
        raise Exception("Error: customers.txt file not found on the server.") from err
    return customers


def _parse_vpn_status(text, customers, vpn_clients, routing_table):
    """Parse one OpenVPN status document into *vpn_clients* / *routing_table*.

    Handles both comma-separated layouts seen in the code paths below: rows
    prefixed with ``CLIENT_LIST`` and plain ``name,address,...`` rows. Clients
    already present in *vpn_clients* are not overwritten.
    """
    in_routing_table = False
    # splitlines() also drops a trailing '\r' that would otherwise end up in
    # the last field of each row.
    for line in text.splitlines():
        if "ROUTING TABLE" in line or "ROUTING_TABLE" in line:
            in_routing_table = True
            continue
        if not line.strip():
            continue
        if line.startswith("GLOBAL"):
            # The statistics section follows the routing table; stop treating
            # rows as routes.
            in_routing_table = False
            continue
        if line.startswith(_STATUS_SKIP_PREFIXES):
            continue

        parts = line.split(',')
        common_name = ""
        real_address = ""
        virtual_ip = "N/A"
        connected_since = ""

        if in_routing_table:
            # Route rows are "<virtual address>,<common name>,...".
            if len(parts) >= 2:
                routing_table[parts[1]] = parts[0]
        elif line.startswith("CLIENT_LIST"):
            if len(parts) > 7:
                common_name = parts[1]
                real_address = parts[2]
                virtual_ip = parts[3] if parts[3] else "N/A"
                connected_since = parts[7]
        elif len(parts) >= 5:
            common_name, real_address, connected_since = parts[0], parts[1], parts[4]

        if not common_name or common_name in vpn_clients:
            continue

        id_match = _CUSTOMER_ID_RE.search(common_name)
        customer_id = id_match.group(1) if id_match else "N/A"
        vpn_clients[common_name] = {
            "customer_id": customer_id,
            "customer_name": customers.get(customer_id, "Unknown"),
            "common_name": common_name,
            "virtual_ip": virtual_ip,
            # Clean the public IP to remove the port
            "public_ip": real_address.split(':')[0],
            "connected_since": connected_since,
        }


def get_system_browser_data():
    """Collect the connected VPN clients from the VPN status pages.

    Customer names are resolved through ``customers.txt``; the customer id is
    the three digits preceding a ``z`` in the client's common name. Status
    pages that cannot be fetched are skipped with a printed warning.

    Returns:
        A list of client dicts with the keys ``customer_id``,
        ``customer_name``, ``common_name``, ``virtual_ip``, ``public_ip`` and
        ``connected_since``.

    Raises:
        Exception: if ``customers.txt`` is missing or no client was found on
            any status page.
    """
    customers = _load_customers()

    vpn_clients = {}
    routing_table = {}
    vpn_status_urls = [
        "http://100.127.0.1/_vpn_status/t2-status.txt",
        "http://100.127.0.6/_vpn_status/t2-status.txt",
    ]

    for url in vpn_status_urls:
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Warning: Could not fetch VPN status from {url}. Error: {e}")
            continue
        _parse_vpn_status(response.text, customers, vpn_clients, routing_table)

    # Clients listed without a virtual IP get it from the routing table.
    for name, client_data in vpn_clients.items():
        if client_data["virtual_ip"] == "N/A" and name in routing_table:
            client_data["virtual_ip"] = routing_table[name]

    if not vpn_clients:
        raise Exception("Connection failed. Please ensure you are connected to the correct VPN.")

    return list(vpn_clients.values())


# ------- System ID Data Begin ---------

def get_system_info(host_ip, token):
    """Return the JSON body of the host's ``core/pls/api/1/system/info`` endpoint."""
    return _make_host_api_get_request(host_ip, token, "core/pls/api/1/system/info")


def get_site_info(host_ip, token):
    """Return the JSON body of the host's ``core/pls/api/1/site/info`` endpoint."""
    return _make_host_api_get_request(host_ip, token, "core/pls/api/1/site/info")


def get_frontend_config(host_ip, token):
    """Return the JSON body of the host's ``frontend/config`` endpoint."""
    return _make_host_api_get_request(host_ip, token, "frontend/config")


def get_licensed_host_info(host_ip, token):
    """Retrieves host info from the /mgt/host endpoint."""
    return _make_host_api_get_request(host_ip, token, "core/licensed/api/1/mgt/host")


def get_licenses_info(host_ip, token):
    """Return the JSON body of the host's ``core/licensed/api/1/mgt/licenses`` endpoint."""
    return _make_host_api_get_request(host_ip, token, "core/licensed/api/1/mgt/licenses")


def _add_license_date_strings(license_entry):
    """Add ``start_date_str`` / ``expire_date_str`` to a license's params in place.

    Numeric ``start_date`` / ``expire_date`` epoch values are rendered as
    ``YYYY-MM-DD`` in local time; non-numeric values are left alone.
    """
    params = license_entry.get('license', {}).get('license_params', {})
    for source_key, target_key in (('start_date', 'start_date_str'), ('expire_date', 'expire_date_str')):
        epoch = params.get(source_key)
        if isinstance(epoch, (int, float)):
            params[target_key] = datetime.fromtimestamp(epoch).strftime('%Y-%m-%d')


def get_host_details(host_ip):
    """Authenticate against a host and gather its system, site and license data.

    Each service from the frontend config is matched to the license whose
    ``app_type`` equals the service name; matched licenses are attached to the
    service under ``license`` with human-readable date strings added.

    Returns:
        A dict with the keys ``system``, ``site``, ``services``,
        ``licensed_host`` and ``license`` (the first license, or ``None``).
    """
    token = auth_utils.authenticate(host_ip)
    system_info = get_system_info(host_ip, token)
    site_info = get_site_info(host_ip, token)
    frontend_config = get_frontend_config(host_ip, token)
    licensed_host_info = get_licensed_host_info(host_ip, token)
    licenses_info = get_licenses_info(host_ip, token)

    # Anything other than a list is treated as "no licenses"; indexing a dict
    # payload with [0] previously raised KeyError.
    licenses = licenses_info if isinstance(licenses_info, list) else []

    license_map = {}
    for lic in licenses:
        app_type = lic.get('license', {}).get('app_content', {}).get('app_type')
        if app_type:
            license_map[app_type] = lic

    services_with_licenses = []
    for service in frontend_config.get("services", []):
        matching_license = license_map.get(service.get('name'))
        if matching_license:
            _add_license_date_strings(matching_license)
            service['license'] = matching_license
        services_with_licenses.append(service)

    main_license = licenses[0] if licenses else None
    if main_license:
        _add_license_date_strings(main_license)

    return {
        "system": system_info,
        "site": site_info,
        "services": services_with_licenses,
        "licensed_host": licensed_host_info,
        "license": main_license,
    }

# ------- System ID Data End ---------


def create_backup(host_ip, token):
    """Request a backup of a fixed list of services and return the raw response.

    The request is made with ``stream=True``, so the caller is responsible for
    consuming and closing the returned ``requests.Response``.

    Raises:
        requests.exceptions.HTTPError: if the host rejects the request.
    """
    url = f"https://{host_ip}/core/pls/api/1/backup/create"
    payload = {
        "services": [
            "amf", "ausf", "bmsc", "chf", "dra", "dsm", "eir", "mme", "smsf",
            "licensed", "nrf", "pcf", "aaa", "sgwc", "smf", "udm", "udr",
            "upf", "ncm", "pls",
        ]
    }
    response = requests.post(url, headers=_bearer_headers(token), json=payload, verify=False, stream=True)
    response.raise_for_status()
    return response


def list_users(base_url, token, session):
    """Return the ``users`` list from the portal (empty list if absent)."""
    body = _portal_get_json(f"{base_url}/portal/api/tenants/users", token, session)
    return body.get("users", [])


def generate_m2000_password(serial, seed="ANWEP5G", prefix="EP5G"):
    """Derive a password string from a device serial.

    The SHA-256 hex digest of ``seed + serial`` is computed; its first hex
    digit (0-15) is used as the start offset of a 12-character slice of the
    digest, which is returned as ``"<prefix>!<slice>"``.
    """
    digest = hashlib.sha256((seed + serial).encode('utf-8')).hexdigest()
    offset = int(digest[0], 16)
    return f"{prefix}!{digest[offset:offset + 12]}"


def reset_m2000_configuration(base_ipv6):
    """
    Resets specified services on two hosts derived from a base IPv6 address.

    The last colon-separated group of *base_ipv6* is replaced with ``a`` and
    ``b`` to obtain the two host addresses.

    Returns:
        A list of result dicts (``host``, ``service``, ``status``,
        ``message``). An HTTP error fails only that service; any other error
        (including authentication) is recorded once with service ``"N/A"`` and
        ends processing of that host.

    Raises:
        ValueError: if *base_ipv6* contains no ``:``.
    """
    # List of services to be reset
    services_to_reset = ["amf", "upf", "smf", "sgwc", "mme", "pcf"]

    # Derive the two host IPs
    # This assumes the base address ends with something like ':0' or '::'
    base_parts = base_ipv6.rsplit(':', 1)
    if len(base_parts) < 2:
        raise ValueError("Invalid IPv6 address format for deriving hosts.")
    base_prefix = base_parts[0]
    hosts = [f"{base_prefix}:a", f"{base_prefix}:b"]

    results = []
    for host in hosts:
        try:
            # Authenticate with the current host
            token = auth_utils.authenticate(host)
            headers = _bearer_headers(token)
            for service in services_to_reset:
                # Construct the specific reset URL for each service
                url = f"https://[{host}]/core/{service}/api/1/mgmt/config/factory_reset"
                try:
                    # Make the POST request to trigger the reset
                    response = requests.post(url, headers=headers, json={}, verify=False)
                    response.raise_for_status()
                except HTTPError as http_err:
                    results.append({"host": host, "service": service, "status": "Failed", "message": str(http_err)})
                else:
                    results.append({"host": host, "service": service, "status": "Success", "message": "Reset command sent successfully."})
        except Exception as e:
            results.append({"host": host, "service": "N/A", "status": "Connection Failed", "message": str(e)})

    return results