2025-10-22 11:38:04 +08:00

467 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Lightsail Static IP Rotator — Auto-Region + Loading Overlay (Optimized)
# ----------------------------------------------------------------------------
# 只输入“当前静态IP公网IP”→ 自动定位其所在区域与实例 → 分配新静态IP → 绑定 → 可选释放旧静态IP
# 启动:
# pip install -r requirements.txt
# python -m uvicorn app:app --host 0.0.0.0 --port 9099
#
# .env(AWS_REGION 仅作为获取区域清单的起始区;真正的操作区由自动探测结果决定):
# AWS_REGION=ap-northeast-1
# AWS_ACCESS_KEY_ID=AKIA...
# AWS_SECRET_ACCESS_KEY=xxxx...
from __future__ import annotations
import os
import re
import time
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
import boto3
from botocore.exceptions import ClientError
from fastapi import FastAPI, HTTPException, Body, Response
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from jinja2 import Template
from dotenv import load_dotenv
# NOTE(review): presumably loads a local .env file into the process
# environment (python-dotenv) so the os.getenv() calls below can see it —
# it must therefore run before SEED_REGION is computed.
load_dotenv()
# Used only as the "seed" region for fetching the region list; the region that
# is actually operated on is decided by the auto-detection in rotate_by_ip().
# Falls back to AWS_DEFAULT_REGION, then to "us-east-1".
SEED_REGION = os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "us-east-1"))
# Lightweight retry parameters, consumed by with_retry() (which wraps the
# allocate/attach/detach/release calls).
# AWS error codes that with_retry() treats as transient and worth retrying.
RETRY_ON = {
"IncorrectInstanceState",
"RequestLimitExceeded",
"Throttling",
"ThrottlingException",
"DependencyViolation",
}
# Total number of attempts per call, including the first one.
MAX_RETRY = 3
RETRY_BASE_SLEEP = 0.8 # seconds; with_retry() doubles it after each failed attempt
# Minimal IPv4 check: dotted quad only, no port and no netmask.
# One octet in the range 0-255 (leading zeros are tolerated).
_IPV4_OCTET = r"(25[0-5]|2[0-4]\d|[01]?\d?\d)"
# Four octets joined by literal dots, anchored at both ends.
IPv4_RE = re.compile("^" + r"\.".join([_IPV4_OCTET] * 4) + "$")
# The ASGI application object; the route decorators further down in this file
# register their handlers on it.
app = FastAPI(title="Lightsail Static IP Rotator (Auto-Region, Optimized)")
# CORS is configured fully open: every origin, method and header is accepted,
# and credentials are allowed.
# NOTE(review): no authentication is visible on the endpoints in this file, so
# any web page able to reach this service could call them from a browser —
# confirm the wildcard-origin + credentials combination is intended.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------- helpers ----------------
def lightsail_in(region: str):
    """Create a boto3 Lightsail client bound to *region*."""
    client = boto3.client("lightsail", region_name=region)
    return client
def sts_in(region: str):
    """Create a boto3 STS client bound to *region*."""
    client = boto3.client("sts", region_name=region)
    return client
def now_ts() -> str:
    """Return the current time in the local timezone as ``YYYY-MM-DD HH:MM:SS``."""
    utc_now = datetime.now(timezone.utc)
    # astimezone() without arguments converts to the system's local timezone.
    local_now = utc_now.astimezone()
    return local_now.strftime("%Y-%m-%d %H:%M:%S")
def account_id(seed_region: str) -> str:
    """Return the caller's AWS account id, or ``"-"`` when it cannot be fetched.

    The lookup goes through STS ``get_caller_identity`` in *seed_region*.
    """
    try:
        identity = sts_in(seed_region).get_caller_identity()
        return identity["Account"]
    except Exception:
        # Best effort: any failure degrades to a placeholder instead of
        # aborting the caller.
        return "-"
def list_regions(seed_region: str) -> List[str]:
    """Return the Lightsail region names, queried via *seed_region*.

    If the query against *seed_region* fails for any reason, the same query
    is retried once against ``us-east-1``; a failure there propagates.
    """
    try:
        response = lightsail_in(seed_region).get_regions()
        region_items = response["regions"]
    except Exception:
        fallback = lightsail_in("us-east-1").get_regions()
        region_items = fallback["regions"]
    names = []
    for item in region_items:
        names.append(item["name"])
    return names
def json_error(message: str, status: int = 400):
    """Abort the current request by raising an ``HTTPException``.

    This function never returns: *message* becomes the exception's ``detail``
    and *status* its HTTP status code.
    """
    error = HTTPException(status_code=status, detail=message)
    raise error
def is_ipv4(ip: str) -> bool:
    """Return True only when *ip* is exactly a dotted-quad IPv4 address.

    Non-string values (including ``None``) yield False rather than raising.
    ``fullmatch`` is used instead of ``match`` because the pattern's trailing
    ``$`` also matches just before a final newline, which would otherwise let
    ``"1.2.3.4\\n"`` through.
    """
    if not isinstance(ip, str):
        return False
    return IPv4_RE.fullmatch(ip) is not None
def with_retry(fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying lightly on transient AWS errors.

    A ``ClientError`` whose code is listed in ``RETRY_ON`` is retried up to
    ``MAX_RETRY`` attempts in total, sleeping ``RETRY_BASE_SLEEP * 2**attempt``
    seconds in between. Any other ``ClientError``, or a retryable one on the
    final attempt, is re-raised unchanged.
    """
    final_attempt = MAX_RETRY - 1
    for attempt in range(MAX_RETRY):
        try:
            return fn(*args, **kwargs)
        except ClientError as err:
            error_code = err.response.get("Error", {}).get("Code")
            retryable = error_code in RETRY_ON and attempt < final_attempt
            if not retryable:
                raise
            # Exponential back-off before the next attempt.
            time.sleep(RETRY_BASE_SLEEP * (2 ** attempt))
# Simple in-memory log of rotation attempts (a plain module-level list).
LOGS: List[Dict[str, Any]] = []


def append_log(row: Dict[str, Any]):
    """Append *row* to ``LOGS`` and trim the log when it grows too large.

    Once the list holds more than 5000 rows, the 1000 oldest rows are dropped
    in one go.
    """
    LOGS.append(row)
    overflowed = len(LOGS) > 5000
    if overflowed:
        # In-place removal keeps the same list object for every reader.
        LOGS[:1000] = []
# ---------------- API ----------------
@app.get("/api/logs")
def get_logs():
    """Return every in-memory log row under the ``logs`` key."""
    payload = {"logs": LOGS}
    return payload
@app.post("/api/logs/clear")
def clear_logs():
    """Empty the in-memory log in place and acknowledge with ``{"ok": True}``."""
    del LOGS[:]
    return {"ok": True}
@app.get("/healthz")
def healthz():
    """Liveness probe; also reports the configured seed region."""
    status = {"ok": True}
    status["seed_region"] = SEED_REGION
    return status
# Answer favicon requests with an empty response so the browser console does
# not fill up with 404 noise.
@app.get("/favicon.ico", include_in_schema=False)
def favicon():
    """Return an empty ``204 No Content`` response."""
    empty = Response(status_code=204)
    return empty
def _rotation_log_row(
    *,
    region: str,
    instance_name: str,
    arn: str,
    old_ip: str,
    new_ip: str,
    status: str,
    remark: str,
) -> Dict[str, Any]:
    """Build one rotation log row with the keys the UI table reads."""
    return {
        "time": now_ts(),
        "region": region,
        "instance_name": instance_name,
        "instance_id": instance_name,  # Lightsail uses the instance name as its id
        "arn": arn,
        "old_ip": old_ip,
        "new_ip": new_ip,
        "retry": 0,
        "status": status,
        "remark": remark,
    }


def _find_static_ip(regions: List[str], ip: str) -> tuple:
    """Find the Lightsail static IP whose address equals *ip*.

    Every region is scanned in order, following ``nextPageToken`` so static
    IPs beyond the first result page are not missed. Returns
    ``(region, static_ip_dict)`` on success and ``(None, None)`` otherwise.
    A ``NotFoundException``/``InvalidParameterValue`` error skips that region;
    any other ``ClientError`` aborts the request with HTTP 400.
    """
    for region in regions:
        try:
            client = lightsail_in(region)
            page_token = None
            while True:
                page_kwargs = {"pageToken": page_token} if page_token else {}
                page = client.get_static_ips(**page_kwargs)
                for static_ip in page.get("staticIps", []):
                    if static_ip.get("ipAddress") == ip:
                        return region, static_ip
                page_token = page.get("nextPageToken")
                if not page_token:
                    break
        except ClientError as err:
            code = err.response.get("Error", {}).get("Code")
            # These codes mean "nothing to see in this region"; keep scanning.
            if code in ("NotFoundException", "InvalidParameterValue"):
                continue
            json_error(str(err), 400)
    return None, None


@app.post("/api/rotate_by_ip")
def rotate_by_ip(body: Dict[str, Any] = Body(...)):
    """Replace the static IP currently attached to a Lightsail instance.

    Request body::

        {
            "current_ip": "1.2.3.4",
            "release_old": true
        }

    Steps:
      1) Enumerate regions and use ``get_static_ips()`` to find the region
         that owns ``current_ip``.
      2) Read the attached instance name, ``allocate_static_ip()``,
         ``attach_static_ip()``, then (optionally) detach and release the old
         static IP.

    Returns a dict with ``ok``, ``region``, ``instance_id``, ``old_ip``,
    ``new_ip`` and ``old_released``. Every outcome after validation is also
    recorded through ``append_log``.

    Raises:
        HTTPException: 422 for a missing/invalid ``current_ip``; 404 when the
            IP is not a static IP of this account; 400 when it is not attached
            to an instance or when an AWS call fails.
    """
    raw_ip = body.get("current_ip")
    # A non-string value (number, list, ...) is treated like a missing one
    # instead of crashing on .strip().
    current_ip = raw_ip.strip() if isinstance(raw_ip, str) else ""
    release_old = bool(body.get("release_old", True))
    if not current_ip:
        json_error("current_ip required", 422)
    if not is_ipv4(current_ip):
        json_error(f"invalid IPv4: {current_ip}", 422)

    acct = account_id(SEED_REGION)
    regions = list_regions(SEED_REGION)

    # 1) Auto-detect the region that owns the IP.
    home_region, addr = _find_static_ip(regions, current_ip)
    if not home_region or not addr:
        append_log(_rotation_log_row(
            region="-", instance_name="-", arn="-",
            old_ip=current_ip, new_ip="-", status="FAIL",
            remark="静态IP 不属于当前账号的任一区域,或不是 Lightsail 静态IP",
        ))
        json_error(f"静态IP {current_ip} not found in any region of this account", 404)

    # 2) Perform the replacement in the owning region.
    lightsail = lightsail_in(home_region)
    inst_name = addr.get("attachedTo")
    old_static_ip_name = addr.get("name")
    if not inst_name:
        append_log(_rotation_log_row(
            region=home_region, instance_name="-", arn="-",
            old_ip=current_ip, new_ip="-", status="FAIL",
            remark="该静态IP 未绑定到任何实例",
        ))
        json_error(f"静态IP {current_ip} is not attached to any instance", 400)

    # Instance name as reported by Lightsail (for log display only).
    try:
        di = lightsail.get_instance(instanceName=inst_name)
        inst_name = di.get("instance", {}).get("name", inst_name)
    except Exception:
        # Best effort: keep the name taken from the static IP record.
        pass

    arn = f"arn:aws:lightsail:{home_region}:{acct}:instance/{inst_name}"
    new_static_ip_name = f"rotated-{int(time.time())}"
    allocated = False  # the new static IP exists in the account
    attached = False   # the new static IP is bound to the instance
    try:
        # Allocate the new static IP.
        with_retry(lightsail.allocate_static_ip, staticIpName=new_static_ip_name)
        allocated = True
        # Look up the address that was assigned to it.
        new_static_ip_info = lightsail.get_static_ip(staticIpName=new_static_ip_name)
        new_public_ip = new_static_ip_info["staticIp"]["ipAddress"]
        # Attach it to the instance.
        with_retry(
            lightsail.attach_static_ip,
            staticIpName=new_static_ip_name,
            instanceName=inst_name,
        )
        attached = True

        # Optionally release the old static IP.
        remark = "success"
        released = False
        if release_old and old_static_ip_name and old_static_ip_name != new_static_ip_name:
            try:
                # Detach first, then release.
                with_retry(lightsail.detach_static_ip, staticIpName=old_static_ip_name)
                with_retry(lightsail.release_static_ip, staticIpName=old_static_ip_name)
                released = True
            except ClientError as release_err:
                # The rotation itself succeeded; only record the cleanup issue.
                remark = f"old_release_error: {release_err}"

        append_log(_rotation_log_row(
            region=home_region, instance_name=inst_name, arn=arn,
            old_ip=current_ip, new_ip=new_public_ip, status="OK",
            remark=remark,
        ))
        return {
            "ok": True,
            "region": home_region,
            "instance_id": inst_name,
            "old_ip": current_ip,
            "new_ip": new_public_ip,
            "old_released": released,
        }
    except ClientError as e:
        fail_remark = str(e)
        if allocated and not attached:
            # Do not leak the freshly allocated, never-attached static IP.
            try:
                with_retry(lightsail.release_static_ip, staticIpName=new_static_ip_name)
            except ClientError as cleanup_err:
                fail_remark = f"{fail_remark}; new_static_ip_cleanup_error: {cleanup_err}"
        append_log(_rotation_log_row(
            region=home_region, instance_name=inst_name, arn=arn,
            old_ip=current_ip, new_ip="-", status="FAIL",
            remark=fail_remark,
        ))
        json_error(str(e), 400)
# ---------------- UI ----------------
# Single-page UI: an input for the current static IP, a rotate button, and a
# table showing the in-memory rotation log. The template uses no Jinja
# variables. Log values are written into the table with textContent (never
# innerHTML), so text coming back from the API cannot inject markup.
INDEX_HTML = Template(r"""
<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8"/>
  <meta name="viewport" content="width=device-width, initial-scale=1"/>
  <title>Lightsail 静态IP 更换面板(自动区域)</title>
  <style>
    :root { color-scheme: dark; }
    body { margin:0; background:#0b1020; color:#e2e8f0; font-family: system-ui,-apple-system,Segoe UI,Roboto,Helvetica,Arial,"PingFang SC","Microsoft YaHei",sans-serif; }
    .wrap { max-width: 1100px; margin: 24px auto; padding: 0 12px; }
    h1 { margin: 0 0 14px; font-size: 22px; }
    .card { background:#111827; border:1px solid #1f2937; border-radius:12px; padding:14px; }
    .row { display:flex; flex-wrap:wrap; align-items:center; gap:10px; }
    .hint { font-size:12px; opacity:.85; line-height:1.7; }
    input, button { background:#0b1225; color:#e2e8f0; border:1px solid #27324f; border-radius:10px; padding:10px 12px; }
    input[type=text]{ min-width:320px; }
    button.primary { background:#2563eb; border-color:#2563eb; cursor: pointer; }
    button.danger { background:#b91c1c; border-color:#b91c1c; cursor: pointer; }
    .mono { font-family: ui-monospace,Menlo,Consolas,monospace; }
    table { width:100%; border-collapse: collapse; margin-top:10px; }
    th,td { text-align:left; border-bottom:1px solid #1f2937; padding:8px; font-size:13px; vertical-align: top; }
    .space { height: 12px; }
    /* Loading Overlay */
    .overlay {
      position: fixed; inset: 0; background: rgba(2,6,23,.72);
      display: none; align-items: center; justify-content: center; z-index: 9999;
    }
    .overlay.show { display: flex; }
    .loader {
      display:flex; flex-direction:column; align-items:center; gap:12px;
      background:#0f172a; border:1px solid #1f2937; padding:24px 28px; border-radius:14px;
      box-shadow:0 12px 36px rgba(0,0,0,.45);
    }
    .spinner {
      width: 38px; height: 38px; border-radius: 50%;
      border: 3px solid #334155; border-top-color: #60a5fa;
      animation: spin 0.9s linear infinite;
    }
    @keyframes spin { to { transform: rotate(360deg); } }
    .muted { opacity:.75; }
  </style>
</head>
<body>
  <div class="wrap">
    <h1>Lightsail 静态IP 更换面板(自动区域)</h1>
    <div class="card hint" style="margin-bottom:12px">
      • 请输入<b>当前绑定在实例上的静态IP公网IP</b>,系统会<b>自动定位所属区域与实例</b>,分配<b>新</b>静态IP并绑定然后可选释放旧静态IP。<br/>
      • 无需选择区域/实例需权限GetStaticIps / AllocateStaticIp / AttachStaticIp / DetachStaticIp / ReleaseStaticIp。<br/>
    </div>
    <div class="card">
      <div class="row">
        <span>当前静态IP 公网IP</span>
        <input id="inpIp" type="text" placeholder="例如 18.183.203.98" inputmode="numeric"
               pattern="^(25[0-5]|2[0-4]\d|[01]?\d?\d)\.(25[0-5]|2[0-4]\d|[01]?\d?\d)\.(25[0-5]|2[0-4]\d|[01]?\d?\d)\.(25[0-5]|2[0-4]\d|[01]?\d?\d)$" />
        <label class="muted" style="display:none;align-items:center;gap:6px">
          <input id="chkRelease" type="checkbox" checked/> 更换后释放旧静态IP
        </label>
        <button id="btnRotate" class="primary">换新静态IP自动区域</button>
      </div>
    </div>
    <div class="space"></div>
    <div class="card">
      <div class="row" style="justify-content:space-between">
        <div><b>更换记录</b></div>
        <div>
          <button id="btnReload">刷新记录</button>
          <button id="btnClear" class="danger">清空记录</button>
        </div>
      </div>
      <table id="tblLogs">
        <thead>
          <tr>
            <th>时间</th><th>区域</th><th>实例名</th><th>实例ID(ARN)</th><th>旧 IP</th><th>新 IP</th><th>重试</th><th>状态</th><th>备注</th>
          </tr>
        </thead>
        <tbody></tbody>
      </table>
    </div>
  </div>
  <!-- Loading Overlay -->
  <div id="overlay" class="overlay" role="alert" aria-live="assertive" aria-busy="true">
    <div class="loader">
      <div class="spinner" aria-hidden="true"></div>
      <div id="loaderText" class="mono">正在执行,请稍候…</div>
    </div>
  </div>
  <script>
  (function(){
    const $ = s => document.querySelector(s);
    const inpIp = $('#inpIp');
    const chkRelease= $('#chkRelease');
    const btnRotate = $('#btnRotate');
    const btnReload = $('#btnReload');
    const btnClear = $('#btnClear');
    const tbody = $('#tblLogs tbody');
    const overlay = $('#overlay');
    const loaderTxt = $('#loaderText');
    const IPv4RE = /^(25[0-5]|2[0-4]\d|[01]?\d?\d)\.(25[0-5]|2[0-4]\d|[01]?\d?\d)\.(25[0-5]|2[0-4]\d|[01]?\d?\d)\.(25[0-5]|2[0-4]\d|[01]?\d?\d)$/;
    const REMARK_STYLE = 'max-width:460px;white-space:pre-wrap;word-break:break-all';
    function showLoading(msg){
      loaderTxt.textContent = msg || '正在执行,请稍候…';
      overlay.classList.add('show');
      btnRotate.disabled = true;
      inpIp.disabled = true;
    }
    function hideLoading(){
      overlay.classList.remove('show');
      btnRotate.disabled = false;
      inpIp.disabled = false;
    }
    async function api(path, opts={}){
      const res = await fetch(path, { ...opts, headers: { 'Content-Type':'application/json', ...(opts.headers||{}) }});
      if(!res.ok){
        // 统一解析 FastAPI 的 detail
        let txt = await res.text();
        try {
          const j = JSON.parse(txt);
          if (j && j.detail) txt = (typeof j.detail === 'string') ? j.detail : JSON.stringify(j.detail);
        } catch {}
        throw new Error(txt);
      }
      return await res.json();
    }
    // Create one table cell; the value is assigned as plain text so it is never parsed as HTML.
    function makeCell(value, cls, cssText){
      const td = document.createElement('td');
      if (cls) td.className = cls;
      if (cssText) td.style.cssText = cssText;
      td.textContent = String(value);
      return td;
    }
    async function reloadLogs(){
      const data = await api('/api/logs');
      const rows = data.logs || [];
      tbody.innerHTML = '';
      rows.slice().reverse().forEach(r=>{
        const tr = document.createElement('tr');
        tr.appendChild(makeCell(r.time||''));
        tr.appendChild(makeCell(r.region||''));
        tr.appendChild(makeCell(r.instance_name||''));
        tr.appendChild(makeCell(r.arn||r.instance_id||'', 'mono'));
        tr.appendChild(makeCell(r.old_ip||'', 'mono'));
        tr.appendChild(makeCell(r.new_ip||'', 'mono'));
        tr.appendChild(makeCell(r.retry||0));
        tr.appendChild(makeCell(r.status||''));
        tr.appendChild(makeCell(r.remark||'', '', REMARK_STYLE));
        tbody.appendChild(tr);
      });
    }
    btnRotate.onclick = async ()=>{
      const ip = (inpIp.value||'').trim();
      if(!ip) return alert('请输入当前静态IP 公网IP');
      if(!IPv4RE.test(ip)) return alert('请输入有效的 IPv4 地址');
      showLoading(`正在为 ${ip} 更换静态IP …`);
      try{
        await api('/api/rotate_by_ip', {
          method: 'POST',
          body: JSON.stringify({ current_ip: ip, release_old: true })
        });
        hideLoading();
        alert('换新静态IP成功');
        await reloadLogs();
      }catch(e){
        hideLoading();
        alert('失败:' + e.message);
      }
    };
    btnReload.onclick = reloadLogs;
    btnClear.onclick = async ()=>{
      if(!confirm('确定清空所有记录?')) return;
      await api('/api/logs/clear', { method:'POST', body:'{}' });
      await reloadLogs();
    };
    reloadLogs();
  })();
  </script>
</body>
</html>
""")
@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the single-page UI rendered from ``INDEX_HTML``."""
    page = INDEX_HTML.render()
    return HTMLResponse(page)