jdeip/app/eip_client.py

198 lines
6.8 KiB
Python
Raw Normal View History

2025-10-21 18:41:07 +08:00
"""
EIP客户端模块
提供与EIPEdge IP服务交互的客户端功能包括认证设备管理网关配置等操作
支持自动重试token自动刷新等机制
"""
import time
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
from .config import settings
class AuthResponse(BaseModel):
2025-10-21 18:41:07 +08:00
"""认证响应模型"""
token: str # 认证令牌
exp: Optional[int] = None # 过期时间epoch秒
class EipClient:
2025-10-21 18:41:07 +08:00
"""EIP客户端类用于与EIP服务进行交互"""
def __init__(self, base_url: str, username: str, password: str) -> None:
2025-10-21 18:41:07 +08:00
"""
初始化EIP客户端
Args:
base_url: EIP服务的基础URL
username: 用户名
password: 密码
"""
self.base_url = base_url.rstrip("/") # 移除末尾的斜杠
self.username = username
self.password = password
2025-10-21 18:41:07 +08:00
self._token: Optional[str] = None # 当前认证令牌
self._token_exp: Optional[int] = None # 令牌过期时间
self._client = httpx.Client(timeout=15.0) # HTTP客户端设置15秒超时
def _is_token_valid(self) -> bool:
2025-10-21 18:41:07 +08:00
"""
检查当前令牌是否有效
Returns:
bool: 令牌是否有效提前60秒刷新
"""
if not self._token or not self._token_exp:
return False
2025-10-21 18:41:07 +08:00
# 提前60秒刷新令牌避免在请求时过期
return time.time() < (self._token_exp - 60)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def authenticate(self) -> AuthResponse:
2025-10-21 18:41:07 +08:00
"""
进行用户认证获取访问令牌
支持多种令牌字段格式的兼容性处理包括
- token
- X-Token
- access_token
- 直接返回字符串token的情况
Returns:
AuthResponse: 包含令牌和过期时间的认证响应
Raises:
ValueError: 当响应中缺少令牌时
"""
url = f"{self.base_url}/client/auth"
resp = self._client.post(url, json={"username": self.username, "password": self.password})
resp.raise_for_status()
data = resp.json()
2025-10-21 18:41:07 +08:00
data_token = data.get("data")
# 尝试从响应中提取过期时间如果没有设置1小时后过期
exp = data.get("exp") or int(time.time()) + 3600
2025-10-21 18:41:07 +08:00
# 尝试多种可能的令牌字段名
token = data.get("token") or data.get("X-Token") or data.get("access_token")
if not token:
2025-10-21 18:41:07 +08:00
# 兼容后端直接返回字符串token的情况
if isinstance(data_token, str) and data:
token = data_token
else:
raise ValueError("Auth response missing token")
2025-10-21 18:41:07 +08:00
# 保存令牌和过期时间
self._token = token
self._token_exp = exp
return AuthResponse(token=token, exp=exp)
def _get_headers(self) -> Dict[str, str]:
2025-10-21 18:41:07 +08:00
"""
获取包含认证令牌的请求头
如果令牌无效或即将过期会自动重新认证
Returns:
Dict[str, str]: 包含认证令牌和Accept头的字典
"""
if not self._is_token_valid():
self.authenticate()
assert self._token
return {"X-Token": self._token, "Accept": "application/json"}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def list_cities(self) -> List[Dict[str, Any]]:
2025-10-21 18:41:07 +08:00
"""
获取可用城市列表
Returns:
List[Dict[str, Any]]: 城市信息列表
"""
url = f"{self.base_url}/edge/city"
resp = self._client.get(url, headers=self._get_headers())
resp.raise_for_status()
data = resp.json()
2025-10-21 18:41:07 +08:00
# 兼容不同的响应格式直接返回列表或包含data字段的对象
return data if isinstance(data, list) else data.get("data", [])
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def list_devices(self, geo: str, offset: int, num: int) -> List[Dict[str, Any]]:
2025-10-21 18:41:07 +08:00
"""
获取指定地理位置的设备列表
Args:
geo: 地理位置标识
offset: 偏移量分页
num: 返回数量
Returns:
List[Dict[str, Any]]: 设备信息列表
"""
url = f"{self.base_url}/edge/device"
payload = {"geo": geo, "offset": offset, "num": num}
resp = self._client.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
data = resp.json()
2025-10-21 18:41:07 +08:00
# 兼容不同的响应格式直接返回列表或包含data字段的对象
return data if isinstance(data, list) else data.get("data", [])
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def gateway_config_get(self, macaddr: str) -> Dict[str, Any]:
2025-10-21 18:41:07 +08:00
"""
获取网关配置信息
Args:
macaddr: 网关MAC地址
Returns:
Dict[str, Any]: 网关配置信息
"""
url = f"{self.base_url}/gateway/config/get"
resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
resp.raise_for_status()
return resp.json()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def gateway_config_set(self, macaddr: str, config: Dict[str, Any]) -> Dict[str, Any]:
2025-10-21 18:41:07 +08:00
"""
设置网关配置信息
Args:
macaddr: 网关MAC地址
config: 配置信息字典
Returns:
Dict[str, Any]: 设置结果
"""
url = f"{self.base_url}/gateway/config/set"
payload = {"macaddr": macaddr, "config": config}
resp = self._client.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
return resp.json()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def gateway_status(self, macaddr: str) -> Dict[str, Any]:
2025-10-21 18:41:07 +08:00
"""
获取网关状态信息
Args:
macaddr: 网关MAC地址
Returns:
Dict[str, Any]: 网关状态信息
"""
url = f"{self.base_url}/gateway/status"
resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
resp.raise_for_status()
return resp.json()
2025-10-21 18:41:07 +08:00
# 创建全局单例客户端实例
client_singleton = EipClient(settings.eip_base_url, settings.eip_username, settings.eip_password)