jdeip/app/eip_client.py

99 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
from .config import settings
class AuthResponse(BaseModel):
    """Outcome of a successful call to the authentication endpoint."""

    # Token string handed back by the backend.
    token: str
    # Expiry as epoch seconds; None when no expiry is known.
    exp: Optional[int] = None
class EipClient:
    """Synchronous HTTP client for the EIP backend with cached token auth.

    A token is fetched from ``/client/auth`` on first use, cached together
    with its expiry time, and sent as the ``X-Token`` header on every other
    request. Each public API method is attempted up to three times with
    exponential backoff (tenacity); since ``reraise`` is not set, a failure
    on the last attempt surfaces as ``tenacity.RetryError``.
    """

    # Refresh the token this many seconds before its recorded expiry.
    _EXPIRY_MARGIN_SECONDS = 60
    # Lifetime assumed when the auth response carries no "exp" value.
    _DEFAULT_TOKEN_TTL_SECONDS = 3600
    _HTTP_UNAUTHORIZED = 401

    def __init__(self, base_url: str, username: str, password: str) -> None:
        """Store the credentials and create the underlying ``httpx.Client``.

        Args:
            base_url: Root URL of the backend; trailing slashes are stripped.
            username: Account name sent to the auth endpoint.
            password: Password sent to the auth endpoint.
        """
        self.base_url = base_url.rstrip("/")
        self.username = username
        self.password = password
        self._token: Optional[str] = None
        self._token_exp: Optional[int] = None
        self._client = httpx.Client(timeout=15.0)

    def close(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        self._client.close()

    def __enter__(self) -> "EipClient":
        """Allow ``with EipClient(...) as client:`` usage."""
        return self

    def __exit__(self, *exc_info: Any) -> None:
        """Close the HTTP client when leaving the ``with`` block."""
        self.close()

    def _is_token_valid(self) -> bool:
        """Return True while a cached token exists and is not about to expire."""
        if not self._token or not self._token_exp:
            return False
        # Treat the token as expired slightly early so it is refreshed before
        # the backend starts rejecting it.
        return time.time() < (self._token_exp - self._EXPIRY_MARGIN_SECONDS)

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
    def authenticate(self) -> AuthResponse:
        """Log in with the stored credentials and cache the resulting token.

        The response may be a JSON object carrying the token under ``token``,
        ``X-Token`` or ``access_token`` (plus an optional ``exp`` in epoch
        seconds), or a bare JSON string that is the token itself.

        Returns:
            The token and the expiry that were cached on this client.

        Raises:
            ValueError: If no token can be found in the response (raised
                inside the retry wrapper).
        """
        url = f"{self.base_url}/client/auth"
        resp = self._client.post(url, json={"username": self.username, "password": self.password})
        resp.raise_for_status()
        data = resp.json()

        token = None
        exp = None
        if isinstance(data, dict):
            token = data.get("token") or data.get("X-Token") or data.get("access_token")
            exp = data.get("exp")
        elif isinstance(data, str):
            # Compatibility: some backends return the token directly as a
            # string. Inspect the type before calling .get(), otherwise this
            # case fails with AttributeError instead of being handled.
            token = data
        if not token:
            raise ValueError("Auth response missing token")
        if not exp:
            # No expiry supplied by the backend: assume one hour from now.
            exp = int(time.time()) + self._DEFAULT_TOKEN_TTL_SECONDS

        self._token = token
        self._token_exp = exp
        return AuthResponse(token=token, exp=exp)

    def _get_headers(self) -> Dict[str, str]:
        """Return request headers, authenticating first if the token is stale."""
        if not self._is_token_valid():
            self.authenticate()
        # Internal invariant: authenticate() either sets the token or raises.
        assert self._token
        return {"X-Token": self._token, "Accept": "application/json"}

    def _parse_response(self, resp: httpx.Response) -> Any:
        """Raise for error statuses and return the decoded JSON body.

        A 401 on an authenticated call suggests the cached token was rejected,
        so the cache is cleared before raising. The tenacity retry of the
        calling method then re-authenticates instead of resending the same
        token.
        """
        if resp.status_code == self._HTTP_UNAUTHORIZED:
            self._token = None
            self._token_exp = None
        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _unwrap_list(data: Any) -> List[Dict[str, Any]]:
        """Return ``data`` if it is a list, else the list under its "data" key.

        A missing or null "data" entry yields an empty list rather than None.
        """
        if isinstance(data, list):
            return data
        return data.get("data") or []

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
    def list_cities(self) -> List[Dict[str, Any]]:
        """GET ``/edge/city`` and return the resulting list of entries."""
        url = f"{self.base_url}/edge/city"
        resp = self._client.get(url, headers=self._get_headers())
        return self._unwrap_list(self._parse_response(resp))

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
    def list_devices(self, geo: str, offset: int, num: int) -> List[Dict[str, Any]]:
        """POST ``/edge/device`` and return the resulting list of entries.

        Args:
            geo: Sent as the ``geo`` field of the JSON payload.
            offset: Sent as ``offset`` (presumably a paging start index;
                confirm against the backend API).
            num: Sent as ``num`` (presumably the page size; confirm against
                the backend API).
        """
        url = f"{self.base_url}/edge/device"
        payload = {"geo": geo, "offset": offset, "num": num}
        resp = self._client.post(url, headers=self._get_headers(), json=payload)
        return self._unwrap_list(self._parse_response(resp))

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
    def gateway_config_get(self, macaddr: str) -> Dict[str, Any]:
        """POST ``/gateway/config/get`` for ``macaddr`` and return the JSON body."""
        url = f"{self.base_url}/gateway/config/get"
        resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
        return self._parse_response(resp)

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
    def gateway_config_set(self, macaddr: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``/gateway/config/set`` with ``macaddr`` and ``config``; return the JSON body."""
        url = f"{self.base_url}/gateway/config/set"
        payload = {"macaddr": macaddr, "config": config}
        resp = self._client.post(url, headers=self._get_headers(), json=payload)
        return self._parse_response(resp)

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
    def gateway_status(self, macaddr: str) -> Dict[str, Any]:
        """POST ``/gateway/status`` for ``macaddr`` and return the JSON body."""
        url = f"{self.base_url}/gateway/status"
        resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
        return self._parse_response(resp)
# Shared module-level client, constructed once at import time from the
# application settings.
client_singleton = EipClient(
    base_url=settings.eip_base_url,
    username=settings.eip_username,
    password=settings.eip_password,
)