99 lines
3.6 KiB
Python
99 lines
3.6 KiB
Python
import json
|
|
from aliyunsdkram.request.v20150501.CreateUserRequest import CreateUserRequest
|
|
from aliyunsdkram.request.v20150501.CreateLoginProfileRequest import CreateLoginProfileRequest
|
|
from aliyunsdkram.request.v20150501.GetUserRequest import GetUserRequest
|
|
|
|
from utils.generators import generate_password
|
|
from utils.logger import log_action
|
|
|
|
CREDENTIALS_FILE = "ram_credentials.json"
|
|
|
|
|
|
def create_ram_user(resource_name: str, region: str, client):
|
|
log_action(f"🧪 创建 RAM 用户流程启动: {resource_name}")
|
|
print(f"🧪 [RAM] 创建用户流程启动: {resource_name}")
|
|
|
|
try:
|
|
with open(CREDENTIALS_FILE, "r", encoding="utf-8") as f:
|
|
credentials = json.load(f)
|
|
print("📖 成功读取本地凭证")
|
|
except:
|
|
credentials = {}
|
|
print("⚠️ 未读取到本地凭证文件,初始化为空")
|
|
|
|
if resource_name in credentials:
|
|
log_action(f"⚠️ RAM 用户已存在: {resource_name},从本地读取密码")
|
|
print(f"⚠️ [RAM] 用户已存在于本地缓存: {resource_name}")
|
|
return {"user": resource_name, "password": credentials[resource_name]}
|
|
|
|
password = generate_password()
|
|
print(f"🔐 生成密码成功: {password}")
|
|
|
|
try:
|
|
print("📨 创建 RAM 用户请求发送中...")
|
|
req = CreateUserRequest()
|
|
req.set_UserName(resource_name)
|
|
req.set_DisplayName(resource_name)
|
|
client.do_action_with_exception(req)
|
|
log_action(f"✅ 创建 RAM 用户成功: {resource_name}")
|
|
print(f"✅ [RAM] 创建成功: {resource_name}")
|
|
except Exception as e:
|
|
print("❌ 创建 RAM 用户失败")
|
|
import traceback
|
|
traceback.print_exc()
|
|
if "EntityAlreadyExists.User" in str(e):
|
|
raise RuntimeError(f"用户已存在但本地无密码记录:{resource_name}")
|
|
raise
|
|
|
|
try:
|
|
print("🔧 设置 RAM 登录配置中...")
|
|
login_req = CreateLoginProfileRequest()
|
|
login_req.set_UserName(resource_name)
|
|
login_req.set_Password(password)
|
|
login_req.set_PasswordResetRequired(False)
|
|
login_req.set_MFABindRequired(False)
|
|
client.do_action_with_exception(login_req)
|
|
log_action("✅ 设置 RAM 登录密码成功")
|
|
print("✅ [RAM] 登录密码配置成功")
|
|
except Exception as e:
|
|
print("❌ 设置 RAM 密码失败")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise RuntimeError(f"❌ 设置 RAM 密码失败: {e}")
|
|
|
|
credentials[resource_name] = password
|
|
with open(CREDENTIALS_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(credentials, f, indent=2, ensure_ascii=False)
|
|
print("💾 已将密码写入本地凭证文件")
|
|
|
|
return {"user": resource_name, "password": password}
|
|
|
|
|
|
def get_account_uid_by_user_name(client, user_name: str):
|
|
from utils.logger import log_action
|
|
try:
|
|
req = GetUserRequest()
|
|
req.set_UserName(user_name)
|
|
res = json.loads(client.do_action_with_exception(req))
|
|
log_action(f"📎 GetUser 返回: {res}")
|
|
|
|
# 优先尝试 Arn
|
|
arn = res.get("User", {}).get("Arn")
|
|
if arn and ":" in arn:
|
|
uid = arn.split(":")[4]
|
|
log_action(f"✅ UID 来自 ARN: {uid}")
|
|
return uid
|
|
|
|
# 使用 UserId 作为替代
|
|
user_id = res.get("User", {}).get("UserId")
|
|
if user_id:
|
|
log_action(f"✅ UID 来自 UserId: {user_id}")
|
|
return user_id
|
|
|
|
raise ValueError("❌ 无法从 GetUser 返回中提取 UID")
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise RuntimeError(f"❌ 获取账号 UID 失败: {e}")
|