aws-mt5/app.py
2026-01-05 11:23:41 +08:00

277 lines
8.5 KiB
Python

import hmac
import os
from functools import wraps
from typing import Dict

from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request, redirect, url_for, session

from aws_service import (
    AWSOperationError,
    ConfigError,
    AccountConfig,
    load_account_configs,
    replace_instance_ip,
)
from db import (
    add_replacement_history,
    get_account_by_ip,
    get_replacement_history,
    get_history_by_ip_or_group,
    get_history_chains,
    get_server_spec,
    init_db,
    load_disallowed_ips,
    list_account_mappings,
    update_ip_account_mapping,
    upsert_server_spec,
    upsert_account_mapping,
    delete_account_mapping,
)
# Pull settings from a local .env file (if any) before the os.getenv() calls below.
load_dotenv()

app = Flask(__name__)

# Flask signs the session cookie with this key. The fallback value is public,
# so anyone could forge an "authed" session when SECRET_KEY is not configured.
# The fallback is kept for backward compatibility, but it is no longer silent.
app.secret_key = os.getenv("SECRET_KEY", "please-change-me")
if app.secret_key == "please-change-me":
    app.logger.warning(
        "SECRET_KEY is not set; using the insecure built-in default. "
        "Set SECRET_KEY in the environment before exposing this app."
    )

# Credentials for the main login form (see the /login route).
APP_USER = os.getenv("APP_USER", "")
APP_PASSWORD = os.getenv("APP_PASSWORD", "")

# Second-factor password guarding the /mapping/* endpoints.
ZHIYUN_PASS = os.getenv("ZHIYUN_PASS", "")
def login_required(fn):
    """Decorate a view so that it only runs for sessions flagged ``authed``.

    Unauthenticated requests are redirected to the ``login`` endpoint with the
    current request path passed along as the ``next`` query parameter.
    """

    @wraps(fn)
    def guarded_view(*args, **kwargs):
        if session.get("authed"):
            return fn(*args, **kwargs)
        login_url = url_for("login", next=request.path)
        return redirect(login_url)

    return guarded_view
def zhiyun_required(fn):
    """Decorate a view so that it only runs once ``zhiyun_authed`` is set.

    Requests whose session lacks the flag receive a JSON error body with
    HTTP status 403 instead of the view's response.
    """

    @wraps(fn)
    def guarded_view(*args, **kwargs):
        if session.get("zhiyun_authed"):
            return fn(*args, **kwargs)
        return jsonify({"error": "未通过校验密码"}), 403

    return guarded_view
def load_configs() -> Dict[str, AccountConfig]:
    """Load the account configurations.

    The file location comes from the ``AWS_CONFIG_PATH`` environment variable
    and defaults to ``config/accounts.yaml``. Parsing is delegated to
    ``load_account_configs``.
    """
    return load_account_configs(os.getenv("AWS_CONFIG_PATH", "config/accounts.yaml"))
# Load the account configuration once at import time. A ConfigError does not
# abort startup; its message is kept so the routes can show it to the user.
try:
    account_configs = load_configs()
    init_error = ""
except ConfigError as exc:
    account_configs = {}
    init_error = str(exc)

# Retry budget handed to replace_instance_ip(). A variable that is present but
# blank (e.g. "IP_RETRY_LIMIT=" in a .env file) is treated as unset rather
# than crashing the import with int("").
retry_limit = int(os.getenv("IP_RETRY_LIMIT", "").strip() or "5")

# Same policy for the database: keep the failure text for the UI.
try:
    init_db()
    db_error = ""
except Exception as exc:  # noqa: BLE001 - surface DB connection issues to UI
    db_error = f"数据库初始化失败: {exc}"
@app.route("/", methods=["GET"])
@login_required
def index():
    """Render the main page.

    When configuration loading or database initialisation failed at startup,
    the page is rendered with no accounts and the stored error message
    (the configuration error takes precedence over the database error).
    """
    startup_error = init_error or db_error
    if startup_error:
        return render_template("index.html", accounts=[], init_error=startup_error)
    return render_template("index.html", accounts=account_configs.values(), init_error="")
def _is_safe_next_url(target: str) -> bool:
    """Return True when *target* is a path on this site, not an external URL."""
    # Must be an absolute path; "//host/..." is a scheme-relative external URL.
    if not target.startswith("/") or target.startswith("//"):
        return False
    # Browsers treat "\" like "/" and drop control characters, so values such
    # as "/\host" or "/\t/host" could still navigate away from this site.
    return not any(ch == "\\" or ord(ch) < 0x20 or ord(ch) == 0x7F for ch in target)


def _credentials_match(supplied: str, expected: str) -> bool:
    """Compare two secrets in constant time; an unset *expected* never matches."""
    if not expected:
        return False
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check the submitted credentials (POST).

    On success the session is flagged ``authed`` and the browser is sent to
    the ``next`` query parameter, or to the index page when it is empty.

    Differences from a plain string comparison:
    * ``next`` is attacker-controlled, so anything that is not a same-site
      path is replaced with "/" to prevent an open redirect.
    * When APP_USER or APP_PASSWORD is not configured (empty), login is
      refused instead of accepting an empty form.
    * Credentials are compared in constant time.
    """
    if session.get("authed"):
        return redirect(url_for("index"))

    error = ""
    next_url = request.args.get("next", "/")
    if next_url and not _is_safe_next_url(next_url):
        next_url = "/"

    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "").strip()
        # Evaluate both checks so the response time does not reveal which
        # of the two fields was wrong.
        user_ok = _credentials_match(username, APP_USER)
        password_ok = _credentials_match(password, APP_PASSWORD)
        if user_ok and password_ok:
            session["authed"] = True
            return redirect(next_url or url_for("index"))
        error = "用户名或密码错误"

    return render_template("login.html", error=error, next_url=next_url)
@app.route("/logout", methods=["POST"])
def logout():
    """Wipe every key from the session and redirect to the login page."""
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
@app.route("/mapping/auth", methods=["POST"])
@login_required
def mapping_auth():
    """Check the mapping password and set or clear ``zhiyun_authed``.

    Returns ``{"ok": true}`` when the submitted ``password`` form field
    matches ZHIYUN_PASS. Otherwise the flag is removed from the session and
    a JSON error with HTTP status 403 is returned. An empty submission or an
    unconfigured ZHIYUN_PASS never matches.
    """
    pwd = request.form.get("password", "")
    # compare_digest keeps the comparison time independent of how many
    # leading characters match; bytes are used so non-ASCII input is accepted.
    if pwd and ZHIYUN_PASS and hmac.compare_digest(
        pwd.encode("utf-8"), ZHIYUN_PASS.encode("utf-8")
    ):
        session["zhiyun_authed"] = True
        return jsonify({"ok": True})
    session.pop("zhiyun_authed", None)
    return jsonify({"error": "密码错误"}), 403
@app.route("/mapping_page", methods=["GET"])
@login_required
def mapping_page():
    """Serve the ``mapping.html`` template."""
    page = render_template("mapping.html")
    return page
@app.route("/mapping/list", methods=["GET"])
@login_required
@zhiyun_required
def mapping_list():
    """Return all account mappings as ``{"items": [...]}``.

    Any exception raised by ``list_account_mappings`` is reported as a JSON
    error with HTTP status 500.
    """
    try:
        mappings = list_account_mappings()
    except Exception as exc:  # noqa: BLE001
        failure = {"error": f"读取失败: {exc}"}
        return jsonify(failure), 500
    else:
        return jsonify({"items": mappings})
@app.route("/mapping/upsert", methods=["POST"])
@login_required
@zhiyun_required
def mapping_upsert():
    """Create or update the mapping for the submitted ``ip`` and ``account``.

    Both form fields are stripped and must be non-empty (400 otherwise).
    A failure inside ``upsert_account_mapping`` is reported as a JSON error
    with HTTP status 500.
    """
    form = request.form
    ip = form.get("ip", "").strip()
    account = form.get("account", "").strip()
    if not (ip and account):
        return jsonify({"error": "IP 和账户名不能为空"}), 400

    try:
        upsert_account_mapping(ip, account)
    except Exception as exc:  # noqa: BLE001
        failure = {"error": f"保存失败: {exc}"}
        return jsonify(failure), 500
    else:
        return jsonify({"ok": True})
@app.route("/mapping/delete", methods=["POST"])
@login_required
@zhiyun_required
def mapping_delete():
    """Delete the mapping for the submitted ``ip`` form field.

    An empty (after stripping) IP yields a 400 response. A failure inside
    ``delete_account_mapping`` is reported as a JSON error with status 500.
    """
    ip = request.form.get("ip", "").strip()
    if ip == "":
        return jsonify({"error": "IP 不能为空"}), 400

    try:
        delete_account_mapping(ip)
    except Exception as exc:  # noqa: BLE001
        failure = {"error": f"删除失败: {exc}"}
        return jsonify(failure), 500
    else:
        return jsonify({"ok": True})
@app.route("/replace_ip", methods=["POST"])
@login_required
def replace_ip():
    """Replace the IP given in the ``ip_to_replace`` form field.

    Steps:
    1. Refuse with 500 when startup configuration or DB initialisation failed.
    2. Look up the account mapped to the IP and make sure it is configured.
    3. Call ``replace_instance_ip`` with the disallowed-IP list, the retry
       limit and the stored server spec as fallback.
    4. Store the spec for both the old and the new IP, move the IP-to-account
       mapping to the new IP, and append a replacement-history record.

    Returns the ``replace_instance_ip`` result as JSON with status 200.
    ``AWSOperationError`` and ``ValueError`` produce a JSON error with 400.
    Any other failure, including database errors, produces a JSON error with
    500 so the response format matches the other routes in this module.
    """
    if init_error or db_error:
        return jsonify({"error": init_error or db_error}), 500

    ip_to_replace = request.form.get("ip_to_replace", "").strip()
    if not ip_to_replace:
        return jsonify({"error": "请输入要替换的IP"}), 400

    # Database reads are guarded the same way as in the other routes, so the
    # client always receives a JSON body.
    try:
        account_name = get_account_by_ip(ip_to_replace)
    except Exception as exc:  # noqa: BLE001
        return jsonify({"error": f"读取失败: {exc}"}), 500
    if not account_name:
        return jsonify({"error": "数据库中未找到该IP对应的账户映射"}), 400
    if account_name not in account_configs:
        return jsonify({"error": f"账户 {account_name} 未在配置文件中定义"}), 400

    try:
        disallowed = load_disallowed_ips()
        fallback_spec = get_server_spec(ip_to_replace)
    except Exception as exc:  # noqa: BLE001
        return jsonify({"error": f"读取失败: {exc}"}), 500

    account = account_configs[account_name]
    # Stays None until the replacement call has returned, which lets the
    # generic handler below tell "replacement failed" from "bookkeeping failed".
    result = None
    try:
        result = replace_instance_ip(
            ip_to_replace, account, disallowed, retry_limit, fallback_spec=fallback_spec
        )
        spec_used = result.get("spec_used", {}) if isinstance(result, dict) else {}
        # Spec columns shared by the old-IP and new-IP rows.
        spec_fields = {
            "account_name": account_name,
            "instance_type": spec_used.get("instance_type"),
            "instance_name": spec_used.get("instance_name"),
            "volume_type": spec_used.get("root_volume_type"),
            "security_group_names": spec_used.get("security_group_names", []),
            "security_group_ids": spec_used.get("security_group_ids", []),
            "region": spec_used.get("region"),
            "subnet_id": spec_used.get("subnet_id"),
            "availability_zone": spec_used.get("availability_zone"),
        }
        # Record the spec of the current IP (the input IP, the database spec,
        # or the spec read from AWS).
        upsert_server_spec(ip_address=ip_to_replace, **spec_fields)
        # Mirror the same spec onto the new IP.
        new_ip = result["new_ip"]
        upsert_server_spec(ip_address=new_ip, **spec_fields)
        update_ip_account_mapping(ip_to_replace, new_ip, account_name)
        add_replacement_history(
            ip_to_replace,
            new_ip,
            account_name,
            None,
            terminated_network_out_mb=result.get("terminated_network_out_mb"),
        )
    except (AWSOperationError, ValueError) as exc:
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:  # noqa: BLE001
        app.logger.exception("replace_ip failed for %s", ip_to_replace)
        assigned_ip = result.get("new_ip") if isinstance(result, dict) else None
        if assigned_ip:
            # The swap already happened; tell the operator the new address
            # even though the bookkeeping failed.
            message = f"IP 已替换为 {assigned_ip},但保存记录失败: {exc}"
        else:
            message = f"替换失败: {exc}"
        return jsonify({"error": message}), 500
    return jsonify(result), 200
@app.route("/history", methods=["GET"])
@login_required
def history():
    """Return up to 100 replacement-history records as ``{"items": [...]}``.

    A failure inside ``get_replacement_history`` is reported as a JSON error
    with HTTP status 500.
    """
    try:
        rows = get_replacement_history(limit=100)
    except Exception as exc:  # noqa: BLE001
        failure = {"error": f"读取历史失败: {exc}"}
        return jsonify(failure), 500
    else:
        return jsonify({"items": rows})
@app.route("/history/search", methods=["GET"])
@login_required
def history_search():
    """Search history by the ``ip`` and/or ``group`` query parameters.

    Blank parameters are passed to ``get_history_by_ip_or_group`` as ``None``.
    At most 200 records are requested. A lookup failure is reported as a JSON
    error with HTTP status 500.
    """
    query = request.args
    ip = query.get("ip", "").strip() or None
    group_id = query.get("group", "").strip() or None

    try:
        rows = get_history_by_ip_or_group(ip, group_id, limit=200)
    except Exception as exc:  # noqa: BLE001
        failure = {"error": f"读取历史失败: {exc}"}
        return jsonify(failure), 500
    else:
        return jsonify({"items": rows})
@app.route("/history_page", methods=["GET"])
@login_required
def history_page():
    """Serve the ``history.html`` template."""
    page = render_template("history.html")
    return page
@app.route("/history/chains", methods=["GET"])
@login_required
def history_chains():
    """Return history chains filtered by the ``ip`` / ``group`` query parameters.

    Blank parameters are passed to ``get_history_chains`` as ``None``. At most
    500 records are requested. A lookup failure is reported as a JSON error
    with HTTP status 500.
    """
    query = request.args
    ip = query.get("ip", "").strip() or None
    group_id = query.get("group", "").strip() or None

    try:
        rows = get_history_chains(ip=ip, group_id=group_id, limit=500)
    except Exception as exc:  # noqa: BLE001
        failure = {"error": f"读取历史失败: {exc}"}
        return jsonify(failure), 500
    else:
        return jsonify({"items": rows})
if __name__ == "__main__":
    # The server listens on every interface, so debug mode must be opt-in:
    # the Werkzeug debugger lets anyone who can reach the port run arbitrary
    # Python. Set FLASK_DEBUG=1 (or true/yes/on) for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)