From ada666fa6b1a8605974ec72b2e8dc0004c00fa21 Mon Sep 17 00:00:00 2001 From: wangqifan Date: Wed, 22 Oct 2025 11:03:20 +0800 Subject: [PATCH] first commit --- app.py | 477 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 477 insertions(+) create mode 100644 app.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..22076d2 --- /dev/null +++ b/app.py @@ -0,0 +1,477 @@ +# EC2 EIP Rotator — Auto-Region + Loading Overlay (Optimized) +# ---------------------------------------------------------------------------- +# 只输入“当前 EIP 公网IP”→ 自动定位其所在区域与实例 → 分配新EIP → 绑定 → 可选释放旧EIP +# 启动: +# pip install -r requirements.txt +# python -m uvicorn app:app --host 0.0.0.0 --port 9099 +# +# .env(仅作为获取区域清单的起始区;真正操作区由自动探测结果决定): +# AWS_REGION=ap-northeast-1 +# AWS_ACCESS_KEY_ID=AKIA... +# AWS_SECRET_ACCESS_KEY=xxxx... + +from __future__ import annotations + +import os +import re +import time +from datetime import datetime, timezone +from typing import List, Dict, Any, Optional + +import boto3 +from botocore.exceptions import ClientError +from fastapi import FastAPI, HTTPException, Body, Response +from fastapi.responses import HTMLResponse +from fastapi.middleware.cors import CORSMiddleware +from jinja2 import Template +from dotenv import load_dotenv + +load_dotenv() + +# 仅用作“起始区域”去拿区域清单;真正的操作区域由自动探测结果决定 +SEED_REGION = os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "us-east-1")) + +# 轻量重试参数(关联/释放时用) +RETRY_ON = { + "IncorrectInstanceState", + "RequestLimitExceeded", + "Throttling", + "ThrottlingException", + "DependencyViolation", +} +MAX_RETRY = 3 +RETRY_BASE_SLEEP = 0.8 # s + +# IPv4 简单校验(不含端口/掩码) +IPv4_RE = re.compile( + r"^(25[0-5]|2[0-4]\d|[01]?\d?\d)\." + r"(25[0-5]|2[0-4]\d|[01]?\d?\d)\." + r"(25[0-5]|2[0-4]\d|[01]?\d?\d)\." + r"(25[0-5]|2[0-4]\d|[01]?\d?\d)$" +) + +app = FastAPI(title="EC2 EIP Rotator (Auto-Region, Optimized)") +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# ---------------- helpers ---------------- +def ec2_in(region: str): + return boto3.client("ec2", region_name=region) + +def sts_in(region: str): + return boto3.client("sts", region_name=region) + +def now_ts() -> str: + return datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") + +def account_id(seed_region: str) -> str: + try: + return sts_in(seed_region).get_caller_identity()["Account"] + except Exception: + return "-" + +def list_regions(seed_region: str) -> List[str]: + """从 seed 区域取区域清单;失败则回退 us-east-1。""" + try: + regions = ec2_in(seed_region).describe_regions(AllRegions=False)["Regions"] + except Exception: + regions = ec2_in("us-east-1").describe_regions(AllRegions=False)["Regions"] + return [r["RegionName"] for r in regions] + +def json_error(message: str, status: int = 400): + raise HTTPException(status_code=status, detail=message) + +def is_ipv4(ip: str) -> bool: + return bool(IPv4_RE.match(ip or "")) + +def with_retry(fn, *args, **kwargs): + """对偶发状态/限流做轻量重试""" + for i in range(MAX_RETRY): + try: + return fn(*args, **kwargs) + except ClientError as e: + code = e.response.get("Error", {}).get("Code") + if code in RETRY_ON and i < MAX_RETRY - 1: + time.sleep(RETRY_BASE_SLEEP * (2 ** i)) + continue + raise + +# 简单内存日志 +LOGS: List[Dict[str, Any]] = [] +def append_log(row: Dict[str, Any]): + LOGS.append(row) + if len(LOGS) > 5000: + del LOGS[:1000] + +# ---------------- API ---------------- +@app.get("/api/logs") +def get_logs(): + return {"logs": LOGS} + +@app.post("/api/logs/clear") +def clear_logs(): + LOGS.clear() + return {"ok": True} + +@app.get("/healthz") +def healthz(): + return {"ok": True, "seed_region": SEED_REGION} + +# 静音 favicon,避免控制台 404 噪音 +@app.get("/favicon.ico", include_in_schema=False) +def favicon(): + return Response(status_code=204) + +@app.post("/api/rotate_by_ip") +def rotate_by_ip(body: Dict[str, Any] = Body(...)): + """ + 入参: + { + "current_ip": "1.2.3.4", + "release_old": true + } + 步骤: + 1) 枚举区域, 用 describe_addresses(PublicIps=[ip]) 找到归属区 + 2) 取 InstanceId -> allocate_address(Domain='vpc') -> associate -> (可选) release 旧EIP + """ + current_ip = (body.get("current_ip") or "").strip() + release_old = bool(body.get("release_old", True)) + + if not current_ip: + json_error("current_ip required", 422) + if not is_ipv4(current_ip): + json_error(f"invalid IPv4: {current_ip}", 422) + + acct = account_id(SEED_REGION) + regions = list_regions(SEED_REGION) + + home_region: Optional[str] = None + addr: Optional[Dict[str, Any]] = None + + # 1) 自动查找 IP 归属区域 + for region in regions: + try: + out = ec2_in(region).describe_addresses(PublicIps=[current_ip]) + arr = out.get("Addresses", []) + if arr: + home_region = region + addr = arr[0] + break + except ClientError as e: + code = e.response.get("Error", {}).get("Code") + # 不是该区会抛 InvalidAddress.NotFound / InvalidParameterValue,忽略 + if code in ("InvalidAddress.NotFound", "InvalidParameterValue"): + continue + json_error(str(e), 400) + + if not home_region or not addr: + remark = "EIP 不属于当前账号的任一区域,或不是 Elastic IP" + append_log({ + "time": now_ts(), "region": "-", "instance_name": "-", + "instance_id": "-", "arn": "-", "old_ip": current_ip, + "new_ip": "-", "retry": 0, "status": "FAIL", "remark": remark, + }) + json_error(f"EIP {current_ip} not found in any region of this account", 404) + + # 2) 在归属区执行更换 + ec2 = ec2_in(home_region) + inst_id = addr.get("InstanceId") + old_alloc = addr.get("AllocationId") + + if not inst_id: + remark = "该 EIP 未绑定到任何实例" + append_log({ + "time": now_ts(), "region": home_region, "instance_name": "-", + "instance_id": "-", "arn": "-", "old_ip": current_ip, + "new_ip": "-", "retry": 0, "status": "FAIL", "remark": remark, + }) + json_error(f"EIP {current_ip} is not attached to any instance", 400) + + # 实例名(日志显示) + inst_name = inst_id + try: + di = ec2.describe_instances(InstanceIds=[inst_id]) + for r in di.get("Reservations", []): + for ins in r.get("Instances", []): + for t in ins.get("Tags", []): + if t.get("Key") == "Name": + inst_name = t.get("Value") + except Exception: + pass + + try: + # allocate new + new_addr = with_retry(ec2.allocate_address, Domain="vpc") + new_alloc = new_addr["AllocationId"] + new_public_ip = new_addr["PublicIp"] + + # 给新 EIP 打标签(便于审计/回溯) + try: + ec2.create_tags( + Resources=[new_alloc], + Tags=[ + {"Key": "RotatedFrom", "Value": current_ip}, + {"Key": "RotatedAt", "Value": now_ts()}, + {"Key": "Rotator", "Value": "auto-region-ui"}, + ], + ) + except ClientError: + # 标签失败不影响主流程 + pass + + # associate to instance (force) + with_retry( + ec2.associate_address, + AllocationId=new_alloc, + InstanceId=inst_id, + AllowReassociation=True, + ) + + # optional release old + remark = "success" + released = False + if release_old and old_alloc and old_alloc != new_alloc: + try: + with_retry(ec2.release_address, AllocationId=old_alloc) + released = True + except ClientError as re: + remark = f"old_release_error: {re}" + + append_log({ + "time": now_ts(), + "region": home_region, + "instance_name": inst_name, + "instance_id": inst_id, + "arn": f"arn:aws:ec2:{home_region}:{acct}:instance/{inst_id}", + "old_ip": current_ip, + "new_ip": new_public_ip, + "retry": 0, + "status": "OK", + "remark": remark, + }) + + return { + "ok": True, + "region": home_region, + "instance_id": inst_id, + "old_ip": current_ip, + "new_ip": new_public_ip, + "old_released": released + } + + except ClientError as e: + append_log({ + "time": now_ts(), + "region": home_region, + "instance_name": inst_name, + "instance_id": inst_id, + "arn": f"arn:aws:ec2:{home_region}:{acct}:instance/{inst_id}", + "old_ip": current_ip, + "new_ip": "-", + "retry": 0, + "status": "FAIL", + "remark": str(e), + }) + json_error(str(e), 400) + +# ---------------- UI ---------------- + +INDEX_HTML = Template(r""" + + + + + +Lightsail IP 更换面板(EC2 自动区域) + + + +
+

Lightsail IP 更换面板(EC2 自动区域)

+ +
+ • 请输入当前绑定在实例上的 EIP 公网IP,系统会自动定位所属区域与实例,分配EIP并绑定,然后(可选)释放旧EIP。
+ • 无需选择区域/实例;需权限:DescribeAddresses / AllocateAddress / AssociateAddress / ReleaseAddress。
+
+ +
+
+ 当前 EIP 公网IP + + + +
+
+ +
+ +
+
+
更换记录
+
+ + +
+
+ + + + + + + +
时间区域实例名实例ID(ARN)旧 IP新 IP重试状态备注
+
+
+ + + + + + + +""") + +@app.get("/", response_class=HTMLResponse) +def index(): + return HTMLResponse(INDEX_HTML.render())