# resource_group_service.py
"""Helpers for creating Alibaba Cloud resource groups and RAM policies scoped to them."""
import json
import time

from aliyunsdkresourcemanager.request.v20200331.CreateResourceGroupRequest import CreateResourceGroupRequest
from aliyunsdkresourcemanager.request.v20200331.ListResourceGroupsRequest import ListResourceGroupsRequest
from aliyunsdkram.request.v20150501.CreatePolicyRequest import CreatePolicyRequest
from aliyunsdkram.request.v20150501.AttachPolicyToUserRequest import AttachPolicyToUserRequest
from aliyunsdkram.request.v20150501.ListPoliciesRequest import ListPoliciesRequest

from utils.logger import log_action
from services.ram_service import get_account_uid_by_user_name

# Polling parameters for looking up a resource group after creation.
_GROUP_LOOKUP_ATTEMPTS = 15
_GROUP_LOOKUP_DELAY_SECONDS = 3
_GROUP_PAGE_SIZE = 50

# Polling parameters for confirming a custom policy is listed after creation.
_POLICY_LOOKUP_ATTEMPTS = 10
_POLICY_LOOKUP_DELAY_SECONDS = 2


def _find_resource_group_id(resource_name, client):
    """Page through ListResourceGroups and return the Id of the group named *resource_name*.

    Returns ``None`` when every page has been read without a match.
    Exceptions raised by the client or by response parsing propagate to the caller.
    """
    page = 1
    while True:
        req = ListResourceGroupsRequest()
        req.set_PageNumber(page)
        req.set_PageSize(_GROUP_PAGE_SIZE)
        res = client.do_action_with_exception(req)
        groups = json.loads(res)['ResourceGroups']['ResourceGroup']
        for group in groups:
            if group['Name'] == resource_name:
                return group['Id']
        # A short page means there are no further pages to read.
        if len(groups) < _GROUP_PAGE_SIZE:
            return None
        page += 1


def _custom_policy_exists(policy_name, client) -> bool:
    """Return True if a custom policy named *policy_name* appears in ListPolicies.

    Follows the ``IsTruncated`` / ``Marker`` fields of the response so that
    policies beyond the first page are also checked.
    Exceptions raised by the client or by response parsing propagate to the caller.
    """
    marker = None
    while True:
        req = ListPoliciesRequest()
        req.set_PolicyType("Custom")
        if marker:
            req.set_Marker(marker)
        body = json.loads(client.do_action_with_exception(req))
        if any(p['PolicyName'] == policy_name for p in body['Policies']['Policy']):
            return True
        marker = body.get('Marker')
        # Stop when the service reports no more pages (or gives no marker to continue with).
        if not body.get('IsTruncated') or not marker:
            return False


def create_resource_group(resource_name, client) -> str:
    """Create a resource group (if it does not exist yet) and return its Id.

    Args:
        resource_name: Used as both the group's Name and its DisplayName.
        client: SDK client exposing ``do_action_with_exception``.

    Returns:
        The ``Id`` of the resource group whose ``Name`` equals *resource_name*.

    Raises:
        RuntimeError: If creation fails for a reason other than the group
            already existing, or if the group's Id cannot be found after
            all lookup attempts.
    """
    try:
        req = CreateResourceGroupRequest()
        req.set_DisplayName(resource_name)
        req.set_Name(resource_name)
        client.do_action_with_exception(req)
        log_action(f"✅ 创建资源组: {resource_name}")
    except Exception as e:
        if "EntityAlreadyExists.ResourceGroup" in str(e):
            log_action(f"⚠️ 资源组已存在: {resource_name},跳过创建")
        else:
            raise RuntimeError(f"❌ 创建资源组失败: {e}") from e

    last_error = None
    for attempt in range(_GROUP_LOOKUP_ATTEMPTS):
        # Wait before every retry, whether the previous attempt raised or
        # simply did not find the group yet; otherwise a "not found" result
        # would burn through all attempts without any delay.
        if attempt:
            time.sleep(_GROUP_LOOKUP_DELAY_SECONDS)
        try:
            group_id = _find_resource_group_id(resource_name, client)
        except Exception as e:
            # Best-effort polling: remember the failure and try again.
            last_error = e
            continue
        if group_id is not None:
            return group_id
    raise RuntimeError("❌ 无法获取资源组 ID") from last_error


def create_custom_ecs_policy_for_group(resource_group_id: str, client) -> str:
    """Create a custom ECS read-only policy restricted to one resource group.

    The policy allows ``ecs:DescribeInstances`` and
    ``ecs:DescribeInstanceAttribute`` under the condition that
    ``acs:ResourceGroupId`` equals *resource_group_id*. Its name is
    ``ECSReadOnlyAccess-`` followed by the last six characters of the id.

    Args:
        resource_group_id: Id of the resource group the policy is scoped to.
        client: SDK client exposing ``do_action_with_exception``.

    Returns:
        The policy name, once it is visible in the custom policy listing.

    Raises:
        RuntimeError: If creation fails for a reason other than the policy
            already existing, or if the policy never shows up in the listing.
    """
    policy_name = f"ECSReadOnlyAccess-{resource_group_id[-6:]}"
    doc = {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["ecs:DescribeInstances", "ecs:DescribeInstanceAttribute"],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {"acs:ResourceGroupId": resource_group_id}
                },
            }
        ],
    }
    try:
        req = CreatePolicyRequest()
        req.set_PolicyName(policy_name)
        req.set_Description("限制 ECS 只读策略")
        req.set_PolicyDocument(json.dumps(doc))
        client.do_action_with_exception(req)
        log_action(f"✅ 创建策略 {policy_name}")
    except Exception as e:
        if "EntityAlreadyExists.Policy" in str(e):
            log_action(f"⚠️ 策略 {policy_name} 已存在")
        else:
            raise RuntimeError(f"❌ 创建策略失败: {e}") from e

    last_error = None
    for _ in range(_POLICY_LOOKUP_ATTEMPTS):
        # Give the service time to make the policy visible before each check.
        time.sleep(_POLICY_LOOKUP_DELAY_SECONDS)
        try:
            if _custom_policy_exists(policy_name, client):
                return policy_name
        except Exception as e:
            # Best-effort polling: remember the failure and try again.
            last_error = e
            continue
    raise RuntimeError("❌ 策略创建后未生效") from last_error


def attach_policy_to_user(username, policy_name, client):
    """Attach the custom policy *policy_name* to the RAM user *username*.

    An "already attached" response from the service is logged and treated
    as success.

    Args:
        username: RAM user name to attach the policy to.
        policy_name: Name of the custom policy.
        client: SDK client exposing ``do_action_with_exception``.

    Raises:
        RuntimeError: If attaching fails for any other reason.
    """
    log_action(f"🔐 正在绑定策略给用户: {username}")
    try:
        req = AttachPolicyToUserRequest()
        req.set_PolicyType("Custom")
        req.set_PolicyName(policy_name)
        req.set_UserName(username)  # bind directly by user name
        client.do_action_with_exception(req)
        log_action(f"✅ 用户 {username} 成功绑定策略: {policy_name}")
    except Exception as e:
        if "EntityAlreadyExists.User.Policy" in str(e):
            log_action(f"⚠️ 用户 {username} 已绑定策略: {policy_name}")
        else:
            raise RuntimeError(f"❌ 策略绑定失败: {e}") from e