# EC2 EIP Rotator — Auto-Region + Loading Overlay (Optimized) # ---------------------------------------------------------------------------- # 只输入“当前 EIP 公网IP”→ 自动定位其所在区域与实例 → 分配新EIP → 绑定 → 可选释放旧EIP # 启动: # pip install -r requirements.txt # python -m uvicorn app:app --host 0.0.0.0 --port 9099 # # .env(仅作为获取区域清单的起始区;真正操作区由自动探测结果决定): # AWS_REGION=ap-northeast-1 # AWS_ACCESS_KEY_ID=AKIA... # AWS_SECRET_ACCESS_KEY=xxxx... from __future__ import annotations import os import re import time import signal import sys from datetime import datetime, timezone from typing import List, Dict, Any, Optional import boto3 from botocore.exceptions import ClientError from fastapi import FastAPI, HTTPException, Body, Response from fastapi.responses import HTMLResponse from fastapi.middleware.cors import CORSMiddleware from jinja2 import Template from dotenv import load_dotenv load_dotenv() # 仅用作“起始区域”去拿区域清单;真正的操作区域由自动探测结果决定 SEED_REGION = os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "us-east-1")) # 轻量重试参数(关联/释放/重启时用) RETRY_ON = { "IncorrectInstanceState", "RequestLimitExceeded", "Throttling", "ThrottlingException", "DependencyViolation", "InvalidInstanceID.NotFound", "InvalidInstanceState", } MAX_RETRY = 3 RETRY_BASE_SLEEP = 0.8 # s # IPv4 简单校验(不含端口/掩码) IPv4_RE = re.compile( r"^(25[0-5]|2[0-4]\d|[01]?\d?\d)\." r"(25[0-5]|2[0-4]\d|[01]?\d?\d)\." r"(25[0-5]|2[0-4]\d|[01]?\d?\d)\." r"(25[0-5]|2[0-4]\d|[01]?\d?\d)$" ) app = FastAPI(title="EC2 EIP Rotator (Auto-Region, Optimized)") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ---------------- helpers ---------------- def ec2_in(region: str): return boto3.client("ec2", region_name=region) def sts_in(region: str): return boto3.client("sts", region_name=region) def now_ts() -> str: return datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") def account_id(seed_region: str) -> str: try: return sts_in(seed_region).get_caller_identity()["Account"] except Exception: return "-" def list_regions(seed_region: str) -> List[str]: """从 seed 区域取区域清单;失败则回退 us-east-1。""" try: regions = ec2_in(seed_region).describe_regions(AllRegions=False)["Regions"] except Exception: regions = ec2_in("us-east-1").describe_regions(AllRegions=False)["Regions"] return [r["RegionName"] for r in regions] def json_error(message: str, status: int = 400): raise HTTPException(status_code=status, detail=message) def is_ipv4(ip: str) -> bool: return bool(IPv4_RE.match(ip or "")) def with_retry(fn, *args, **kwargs): """对偶发状态/限流做轻量重试""" for i in range(MAX_RETRY): try: return fn(*args, **kwargs) except ClientError as e: code = e.response.get("Error", {}).get("Code") if code in RETRY_ON and i < MAX_RETRY - 1: time.sleep(RETRY_BASE_SLEEP * (2 ** i)) continue raise # 简单内存日志 LOGS: List[Dict[str, Any]] = [] def append_log(row: Dict[str, Any]): LOGS.append(row) if len(LOGS) > 5000: del LOGS[:1000] # ---------------- API ---------------- @app.get("/api/logs") def get_logs(): return {"logs": LOGS} @app.post("/api/logs/clear") def clear_logs(): LOGS.clear() return {"ok": True} @app.post("/api/restart") def restart_ec2_instance(body: Dict[str, Any] = Body(...)): """ 重启AWS EC2实例功能 根据IP地址找到对应的EC2实例并重启 """ current_ip = (body.get("current_ip") or "").strip() if not current_ip: json_error("current_ip required", 422) if not is_ipv4(current_ip): json_error(f"invalid IPv4: {current_ip}", 422) acct = account_id(SEED_REGION) regions = list_regions(SEED_REGION) home_region: Optional[str] = None addr: Optional[Dict[str, Any]] = None # 1) 自动查找 IP 归属区域 for region in regions: try: out = ec2_in(region).describe_addresses(PublicIps=[current_ip]) arr = out.get("Addresses", []) if arr: home_region = region addr = arr[0] break except ClientError as e: code = e.response.get("Error", {}).get("Code") # 不是该区会抛 InvalidAddress.NotFound / InvalidParameterValue,忽略 if code in ("InvalidAddress.NotFound", "InvalidParameterValue"): continue json_error(str(e), 400) if not home_region or not addr: remark = "EIP 不属于当前账号的任一区域,或不是 Elastic IP" append_log({ "time": now_ts(), "region": "-", "instance_name": "-", "instance_id": "-", "arn": "-", "old_ip": current_ip, "new_ip": "-", "retry": 0, "status": "FAIL", "remark": remark, }) json_error(f"EIP {current_ip} not found in any region of this account", 404) # 2) 获取实例信息 ec2 = ec2_in(home_region) inst_id = addr.get("InstanceId") if not inst_id: remark = "该 EIP 未绑定到任何实例" append_log({ "time": now_ts(), "region": home_region, "instance_name": "-", "instance_id": "-", "arn": "-", "old_ip": current_ip, "new_ip": "-", "retry": 0, "status": "FAIL", "remark": remark, }) json_error(f"EIP {current_ip} is not attached to any instance", 400) # 实例名(日志显示) inst_name = inst_id try: di = ec2.describe_instances(InstanceIds=[inst_id]) for r in di.get("Reservations", []): for ins in r.get("Instances", []): for t in ins.get("Tags", []): if t.get("Key") == "Name": inst_name = t.get("Value") except Exception: pass try: # 重启EC2实例 with_retry(ec2.reboot_instances, InstanceIds=[inst_id]) append_log({ "time": now_ts(), "region": home_region, "instance_name": inst_name, "instance_id": inst_id, "arn": f"arn:aws:ec2:{home_region}:{acct}:instance/{inst_id}", "old_ip": current_ip, "new_ip": current_ip, # IP不变 "retry": 0, "status": "OK", "remark": "EC2实例重启成功", }) return { "ok": True, "region": home_region, "instance_id": inst_id, "instance_name": inst_name, "message": "EC2实例重启成功" } except ClientError as e: append_log({ "time": now_ts(), "region": home_region, "instance_name": inst_name, "instance_id": inst_id, "arn": f"arn:aws:ec2:{home_region}:{acct}:instance/{inst_id}", "old_ip": current_ip, "new_ip": "-", "retry": 0, "status": "FAIL", "remark": f"重启失败: {str(e)}", }) json_error(f"重启EC2实例失败: {str(e)}", 400) @app.get("/healthz") def healthz(): return {"ok": True, "seed_region": SEED_REGION} # 静音 favicon,避免控制台 404 噪音 @app.get("/favicon.ico", include_in_schema=False) def favicon(): return Response(status_code=204) @app.post("/api/rotate_by_ip") def rotate_by_ip(body: Dict[str, Any] = Body(...)): """ 入参: { "current_ip": "1.2.3.4", "release_old": true } 步骤: 1) 枚举区域, 用 describe_addresses(PublicIps=[ip]) 找到归属区 2) 取 InstanceId -> allocate_address(Domain='vpc') -> associate -> (可选) release 旧EIP """ current_ip = (body.get("current_ip") or "").strip() release_old = bool(body.get("release_old", True)) if not current_ip: json_error("current_ip required", 422) if not is_ipv4(current_ip): json_error(f"invalid IPv4: {current_ip}", 422) acct = account_id(SEED_REGION) regions = list_regions(SEED_REGION) home_region: Optional[str] = None addr: Optional[Dict[str, Any]] = None # 1) 自动查找 IP 归属区域 for region in regions: try: out = ec2_in(region).describe_addresses(PublicIps=[current_ip]) arr = out.get("Addresses", []) if arr: home_region = region addr = arr[0] break except ClientError as e: code = e.response.get("Error", {}).get("Code") # 不是该区会抛 InvalidAddress.NotFound / InvalidParameterValue,忽略 if code in ("InvalidAddress.NotFound", "InvalidParameterValue"): continue json_error(str(e), 400) if not home_region or not addr: remark = "EIP 不属于当前账号的任一区域,或不是 Elastic IP" append_log({ "time": now_ts(), "region": "-", "instance_name": "-", "instance_id": "-", "arn": "-", "old_ip": current_ip, "new_ip": "-", "retry": 0, "status": "FAIL", "remark": remark, }) json_error(f"EIP {current_ip} not found in any region of this account", 404) # 2) 在归属区执行更换 ec2 = ec2_in(home_region) inst_id = addr.get("InstanceId") old_alloc = addr.get("AllocationId") if not inst_id: remark = "该 EIP 未绑定到任何实例" append_log({ "time": now_ts(), "region": home_region, "instance_name": "-", "instance_id": "-", "arn": "-", "old_ip": current_ip, "new_ip": "-", "retry": 0, "status": "FAIL", "remark": remark, }) json_error(f"EIP {current_ip} is not attached to any instance", 400) # 实例名(日志显示) inst_name = inst_id try: di = ec2.describe_instances(InstanceIds=[inst_id]) for r in di.get("Reservations", []): for ins in r.get("Instances", []): for t in ins.get("Tags", []): if t.get("Key") == "Name": inst_name = t.get("Value") except Exception: pass try: # allocate new new_addr = with_retry(ec2.allocate_address, Domain="vpc") new_alloc = new_addr["AllocationId"] new_public_ip = new_addr["PublicIp"] # 给新 EIP 打标签(便于审计/回溯) try: ec2.create_tags( Resources=[new_alloc], Tags=[ {"Key": "RotatedFrom", "Value": current_ip}, {"Key": "RotatedAt", "Value": now_ts()}, {"Key": "Rotator", "Value": "auto-region-ui"}, ], ) except ClientError: # 标签失败不影响主流程 pass # associate to instance (force) with_retry( ec2.associate_address, AllocationId=new_alloc, InstanceId=inst_id, AllowReassociation=True, ) # optional release old remark = "success" released = False if release_old and old_alloc and old_alloc != new_alloc: try: with_retry(ec2.release_address, AllocationId=old_alloc) released = True except ClientError as re: remark = f"old_release_error: {re}" append_log({ "time": now_ts(), "region": home_region, "instance_name": inst_name, "instance_id": inst_id, "arn": f"arn:aws:ec2:{home_region}:{acct}:instance/{inst_id}", "old_ip": current_ip, "new_ip": new_public_ip, "retry": 0, "status": "OK", "remark": remark, }) return { "ok": True, "region": home_region, "instance_id": inst_id, "old_ip": current_ip, "new_ip": new_public_ip, "old_released": released } except ClientError as e: append_log({ "time": now_ts(), "region": home_region, "instance_name": inst_name, "instance_id": inst_id, "arn": f"arn:aws:ec2:{home_region}:{acct}:instance/{inst_id}", "old_ip": current_ip, "new_ip": "-", "retry": 0, "status": "FAIL", "remark": str(e), }) json_error(str(e), 400) # ---------------- UI ---------------- INDEX_HTML = Template(r"""
| 时间 | 区域 | 实例名 | 实例ID(ARN) | 旧 IP | 新 IP | 重试 | 状态 | 备注 |
|---|