jdeip/app/rotation_service.py

245 lines
7.6 KiB
Python
Raw Normal View History

2025-10-21 18:41:07 +08:00
"""
IP轮换服务模块
提供IP地址轮换功能包括
- 从可用设备中选择未使用的IP
- 配置网关路由规则
- 执行IP轮换操作
- 查询当前状态和统计信息
使用Redis存储使用记录支持按天统计IP使用情况
"""
import random
from typing import Any, Dict, List, Optional, Tuple
2025-11-06 15:36:43 +08:00
from .config import settings, city_dict, all_client_infos, port_mapping
from .eip_client import client_singleton as eip
from .redis_store import store_singleton as kv
2025-10-22 14:46:29 +08:00
from .line_status_manager import status_manager
def _extract_device_ip_and_id(device: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
2025-10-21 18:41:07 +08:00
"""
从设备信息中提取IP地址和边缘设备ID
由于后端返回结构可能不一致使用多种字段名进行容错提取
- IP字段ip, public_ip, eip, addr, address
- ID字段id, edge, mac, device_id
Args:
device: 设备信息字典
Returns:
Tuple[Optional[str], Optional[str]]: (IP地址, 边缘设备ID)
"""
# 尝试多种可能的IP字段名
ip = (
2025-10-21 18:41:07 +08:00
device.get("public")
)
2025-10-21 18:41:07 +08:00
macaddr = device.get("macaddr")
return (str(ip), str(macaddr))
2025-10-29 18:27:28 +08:00
def select_unused_ip(devices: List[Dict[str, Any]], limit: int = 4) -> List[Tuple[str, str]]:
2025-10-21 18:41:07 +08:00
"""
2025-10-29 18:27:28 +08:00
从设备列表中选择今天未使用的最多 N IP 地址
2025-10-21 18:41:07 +08:00
2025-10-29 18:27:28 +08:00
遍历设备列表收集今天未使用且在线数量未达上限的设备 IP 与对应的边缘设备 ID
最多返回 `limit` 个候选
2025-10-21 18:41:07 +08:00
Args:
devices: 设备信息列表
2025-10-29 18:27:28 +08:00
limit: 返回的候选数量上限默认 4
2025-10-21 18:41:07 +08:00
Returns:
2025-10-29 18:27:28 +08:00
List[Tuple[str, str]]: 列表元素为 (IP地址, 边缘设备ID)
2025-10-21 18:41:07 +08:00
"""
2025-10-29 18:27:28 +08:00
candidates: List[Tuple[str, str]] = []
2025-10-21 18:41:07 +08:00
for d in devices.get('edges'):
2025-10-24 14:47:24 +08:00
if d['online'] < settings.max_online:
ip, edge_id = _extract_device_ip_and_id(d)
if not ip:
continue
if not kv.is_ip_used_today(ip):
2025-10-29 18:27:28 +08:00
candidates.append((ip, edge_id))
if len(candidates) >= limit:
break
return candidates
2025-10-29 18:27:28 +08:00
def apply_gateway_route(edge_ids: Optional[str], ips: str, geo: str, client_id:str) -> Dict[str, Any]:
2025-10-21 18:41:07 +08:00
"""
将选中的边缘设备配置到网关路由规则中
创建路由规则并应用到指定的网关设备实现IP轮换
Args:
edge_id: 边缘设备ID
ip: IP地址用于日志记录实际路由基于edge_id
Returns:
Dict[str, Any]: 网关配置设置结果
"""
2025-10-22 14:46:29 +08:00
lines = status_manager.get_all_lines_status()
rule = []
for line in lines:
if line['id'] == client_id:
rule.append({
"table": client_id, # 路由表ID
"enable": True, # 启用规则
2025-10-29 18:27:28 +08:00
"edge": edge_ids, # 边缘设备列表
2025-11-06 15:36:43 +08:00
"network": [all_client_infos[str(client_id)]], # 网络配置(当前为空)
2025-10-22 14:46:29 +08:00
"cityhash": geo or "", # 城市哈希值
})
else:
rule.append({
"table": line['id'], # 路由表ID
"enable": True, # 启用规则
2025-10-30 12:25:42 +08:00
"edge": line['edge_device'].split(','), # 边缘设备列表
2025-11-06 15:36:43 +08:00
"network": [all_client_infos[str(line['id'])]], # 网络配置(当前为空)
2025-10-22 14:46:29 +08:00
"cityhash": line['geo_location'], # 城市哈希值
})
config = {"id": 1, "rules": rule} # 配置ID和规则列表
return eip.gateway_config_set(settings.eip_gateway_mac, config)
2025-10-21 18:41:07 +08:00
def rotate(client_id,cities) -> Dict[str, Any]:
2025-10-21 18:41:07 +08:00
"""
执行IP轮换操作
从指定地理位置获取设备列表选择未使用的IP进行轮换
Args:
cityhash: 城市哈希值用于指定地理位置
num: 获取设备数量默认为配置值或10
Returns:
Dict[str, Any]: 轮换结果
- changed: bool - 是否成功轮换
- reason: str - 失败原因当changed=False时
- ip: str - 新的IP地址当changed=True时
- edge: str - 边缘设备ID当changed=True时
- status: dict - 网关状态信息当changed=True时
Raises:
ValueError: 当cityhash为空时
"""
# 确定地理位置参数
geo = get_random_cityhash(cities)
2025-10-21 18:41:07 +08:00
# 确定获取设备数量
n = 1000
2025-10-21 18:41:07 +08:00
# 获取设备列表
devices = eip.list_devices(geo=geo, offset=0, num=n)
2025-10-21 18:41:07 +08:00
2025-10-29 18:27:28 +08:00
# 选择未使用的最多 4 个 IP
2025-10-30 12:25:42 +08:00
candidates = select_unused_ip(devices, limit=settings.ip_limit)
2025-10-29 18:27:28 +08:00
if not candidates:
return {"changed": False, "reason": "没有可用且今天未使用的 IP"}
2025-10-29 18:27:28 +08:00
# 取第一个用于生效,其余作为候选缓存
# ip, edge_id = candidates[0]
ips = [ip for ip, _ in candidates]
edge_ids = [edge_id for _, edge_id in candidates]
2025-10-21 18:41:07 +08:00
# 应用网关路由配置
2025-10-29 18:27:28 +08:00
_ = apply_gateway_route(edge_ids=edge_ids, ips=ips, geo= geo,client_id=client_id)
2025-10-21 18:41:07 +08:00
2025-10-29 18:27:28 +08:00
# 记录使用情况(仅首个生效 IP 计入已使用;其余作为候选缓存)
for num,ip in enumerate(ips):
kv.add_used_ip_today(ip)
kv.set_current(ip=ip, edge_id=edge_ids[num])
try:
kv.cache_candidates([ip])
except Exception:
pass
2025-10-21 18:41:07 +08:00
# 获取网关状态
2025-10-30 12:25:42 +08:00
# status = eip.gateway_status(settings.eip_gateway_mac)
2025-10-21 18:41:07 +08:00
2025-10-30 12:25:42 +08:00
return {"changed": True, "ip": ','.join(ips), "edges": ','.join(edge_ids), "status": "activate", "geo": geo, "candidates": [c[0] for c in candidates]}
def citie_list():
return {"data":list(city_dict.keys())}
def status() -> Dict[str, Any]:
2025-10-21 18:41:07 +08:00
"""
获取当前轮换服务状态
返回当前使用的IP今日使用统计和网关状态信息
Returns:
Dict[str, Any]: 状态信息
- current: dict - 当前使用的IP和设备信息
- used_today: int - 今日已使用的IP数量
- gateway: dict - 网关状态信息如果获取失败则为空字典
"""
# 获取当前使用的IP和设备信息
cur = kv.get_current()
2025-10-21 18:41:07 +08:00
# 获取今日使用统计
used_count = kv.get_used_count_today()
2025-10-21 18:41:07 +08:00
# 获取网关状态(容错处理)
gw = {}
try:
gw = eip.gateway_status(settings.eip_gateway_mac)
except Exception:
2025-10-21 18:41:07 +08:00
# 网关状态获取失败时使用空字典
pass
2025-10-21 18:41:07 +08:00
return {"current": cur, "used_today": used_count, "gateway": gw}
def get_random_city(city_list) -> Dict[str, str]:
2025-10-21 18:41:07 +08:00
"""
随机获取一个城市编码
从所有可用的城市中随机选择一个返回城市名称和对应的编码
Returns:
Dict[str, str]: 包含城市信息的字典
- province: 省份名称
- city: 城市名称
- cityhash: 城市编码
"""
# 收集所有城市信息
all_cities = []
for province, cities in city_dict.items():
if province in city_list:
for city, cityhash in cities.items():
all_cities.append({
"province": province,
"city": city,
"cityhash": cityhash
})
2025-10-21 18:41:07 +08:00
# 随机选择一个城市
if not all_cities:
raise ValueError("没有可用的城市数据")
selected_city = random.choice(all_cities)
return selected_city
def get_random_cityhash(cities) -> str:
2025-10-21 18:41:07 +08:00
"""
随机获取一个城市编码简化版本
Returns:
str: 随机城市编码
"""
city_info = get_random_city(cities)
2025-10-21 18:41:07 +08:00
return city_info["cityhash"]
2025-10-22 14:46:29 +08:00
def get_port(line_id):
return port_mapping[line_id]