99 lines
3.6 KiB
Python
Raw Normal View History

2025-12-04 10:09:04 +08:00
import json
from aliyunsdkram.request.v20150501.CreateUserRequest import CreateUserRequest
from aliyunsdkram.request.v20150501.CreateLoginProfileRequest import CreateLoginProfileRequest
from aliyunsdkram.request.v20150501.GetUserRequest import GetUserRequest
from utils.generators import generate_password
from utils.logger import log_action
# Path of the local JSON cache that create_ram_user() reads and rewrites.
# It maps each RAM user name to its generated login password, stored in
# plaintext. The path is relative, so it resolves against the process's
# current working directory.
CREDENTIALS_FILE = "ram_credentials.json"
def create_ram_user(resource_name: str, region: str, client):
    """Create a RAM user with a login password and cache the password locally.

    Steps:

    1. Load the local credential cache (``CREDENTIALS_FILE``). If
       ``resource_name`` is already cached, return the cached password
       without issuing any API request.
    2. Generate a password, then send a ``CreateUserRequest`` followed by a
       ``CreateLoginProfileRequest`` (password reset and MFA binding both
       disabled) through ``client``.
    3. Store the new password in the cache file and return it.

    Args:
        resource_name: Used as the RAM user name, its display name, and the
            key in the local credential cache.
        region: Not referenced by this function; kept so existing callers
            keep working.
        client: Object exposing ``do_action_with_exception(request)``
            (presumably an Aliyun ``AcsClient`` -- confirm against callers).

    Returns:
        ``{"user": resource_name, "password": <password>}``.

    Raises:
        RuntimeError: If the create-user call fails with
            ``EntityAlreadyExists.User`` (the user exists remotely but no
            password is cached locally), or if creating the login profile
            fails. The underlying exception is chained as ``__cause__``.
        Exception: Any other create-user failure is re-raised unchanged.
    """
    # Function-scope import: the visible module header does not import it.
    import traceback

    log_action(f"🧪 创建 RAM 用户流程启动: {resource_name}")
    print(f"🧪 [RAM] 创建用户流程启动: {resource_name}")

    # Best-effort cache load. Only I/O and decode/parse errors are treated as
    # "no cache"; a bare ``except`` would also swallow KeyboardInterrupt and
    # SystemExit.
    try:
        with open(CREDENTIALS_FILE, "r", encoding="utf-8") as f:
            credentials = json.load(f)
        # A non-object payload (e.g. a JSON list) cannot serve as the cache.
        # Reject it here, before any remote call, instead of failing on the
        # final write after the user has already been created.
        if not isinstance(credentials, dict):
            raise ValueError("credential cache is not a JSON object")
        print("📖 成功读取本地凭证")
    except (OSError, ValueError):
        credentials = {}
        print("⚠️ 未读取到本地凭证文件,初始化为空")

    if resource_name in credentials:
        log_action(f"⚠️ RAM 用户已存在: {resource_name},从本地读取密码")
        print(f"⚠️ [RAM] 用户已存在于本地缓存: {resource_name}")
        return {"user": resource_name, "password": credentials[resource_name]}

    password = generate_password()
    # Deliberately do not echo the secret: stdout is routinely captured by
    # terminals, CI logs and log collectors.
    print("🔐 生成密码成功")

    try:
        print("📨 创建 RAM 用户请求发送中...")
        req = CreateUserRequest()
        req.set_UserName(resource_name)
        req.set_DisplayName(resource_name)
        client.do_action_with_exception(req)
        log_action(f"✅ 创建 RAM 用户成功: {resource_name}")
        print(f"✅ [RAM] 创建成功: {resource_name}")
    except Exception as e:
        print("❌ 创建 RAM 用户失败")
        traceback.print_exc()
        # The user exists remotely but we hold no password for it, so we
        # cannot return usable credentials.
        if "EntityAlreadyExists.User" in str(e):
            raise RuntimeError(f"用户已存在但本地无密码记录:{resource_name}") from e
        raise

    try:
        print("🔧 设置 RAM 登录配置中...")
        login_req = CreateLoginProfileRequest()
        login_req.set_UserName(resource_name)
        login_req.set_Password(password)
        login_req.set_PasswordResetRequired(False)
        login_req.set_MFABindRequired(False)
        client.do_action_with_exception(login_req)
        log_action("✅ 设置 RAM 登录密码成功")
        print("✅ [RAM] 登录密码配置成功")
    except Exception as e:
        print("❌ 设置 RAM 密码失败")
        traceback.print_exc()
        # NOTE(review): at this point the user has been created remotely but
        # nothing is cached, so a retry will hit EntityAlreadyExists.User.
        raise RuntimeError(f"❌ 设置 RAM 密码失败: {e}") from e

    # Persist only after both remote calls succeeded.
    credentials[resource_name] = password
    with open(CREDENTIALS_FILE, "w", encoding="utf-8") as f:
        json.dump(credentials, f, indent=2, ensure_ascii=False)
    print("💾 已将密码写入本地凭证文件")

    return {"user": resource_name, "password": password}
def get_account_uid_by_user_name(client, user_name: str):
    """Fetch a RAM user via ``GetUserRequest`` and return an account identifier.

    The identifier is the fifth colon-separated field of ``User.Arn`` when
    that field exists and is non-empty. Otherwise ``User.UserId`` is returned
    as a fallback.

    NOTE(review): ``UserId`` identifies the RAM user rather than the owning
    account -- confirm that callers accept it as a substitute.

    Args:
        client: Object exposing ``do_action_with_exception(request)`` whose
            return value is a JSON document (str/bytes).
        user_name: RAM user name to look up.

    Returns:
        The extracted identifier string.

    Raises:
        RuntimeError: On any failure (API error, unparsable response, or
            neither ``Arn`` nor ``UserId`` usable). The underlying exception
            is chained as ``__cause__``.
    """
    # Function-scope import: the visible module header does not import it.
    import traceback

    try:
        req = GetUserRequest()
        req.set_UserName(user_name)
        res = json.loads(client.do_action_with_exception(req))
        log_action(f"📎 GetUser 返回: {res}")

        # Tolerate both a missing "User" key and an explicit null value.
        user = res.get("User") or {}

        # Prefer the ARN. Check the segment count explicitly: merely
        # containing ":" does not guarantee a fifth field, and an IndexError
        # here would skip the UserId fallback below.
        arn = user.get("Arn")
        if isinstance(arn, str):
            segments = arn.split(":")
            if len(segments) > 4 and segments[4]:
                uid = segments[4]
                log_action(f"✅ UID 来自 ARN: {uid}")
                return uid

        # Fall back to UserId.
        user_id = user.get("UserId")
        if user_id:
            log_action(f"✅ UID 来自 UserId: {user_id}")
            return user_id

        raise ValueError("❌ 无法从 GetUser 返回中提取 UID")
    except Exception as e:
        traceback.print_exc()
        raise RuntimeError(f"❌ 获取账号 UID 失败: {e}") from e