2025-12-10 12:02:17 +08:00

118 lines
4.0 KiB
Python

from typing import List
from fastapi import HTTPException, status
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from backend.modules.audit.models import AuditAction, AuditLog, AuditResourceType
from backend.modules.aws_accounts.models import AWSCredential, CustomerCredential
from backend.modules.users.models import User
async def list_credentials(session: AsyncSession, customer_id: int | None = None) -> List[AWSCredential]:
    """Return AWS credentials, optionally restricted to one customer.

    Args:
        session: Async database session used to run the query.
        customer_id: When not ``None``, only credentials linked to this
            customer through a ``CustomerCredential`` row with
            ``is_allowed == 1`` are returned. When ``None``, every
            credential is returned.

    Returns:
        The matching ``AWSCredential`` rows as a list.
    """
    query = select(AWSCredential)
    # Compare against None explicitly: a falsy id such as 0 must still apply
    # the customer filter instead of silently returning every credential.
    if customer_id is not None:
        query = (
            query.join(CustomerCredential, CustomerCredential.credential_id == AWSCredential.id)
            .where(CustomerCredential.customer_id == customer_id)
            .where(CustomerCredential.is_allowed == 1)
        )
    result = await session.scalars(query)
    # Materialize as a real list so the value matches the declared return type.
    return list(result.all())
async def create_credential(session: AsyncSession, data: dict, actor: User) -> AWSCredential:
    """Create an AWS credential and record the creation in the audit log.

    The credential row and its audit entry are committed together, so a
    failure while writing the audit entry does not leave an unaudited
    credential behind.

    Args:
        session: Async database session used for the writes.
        data: Keyword arguments passed straight to the ``AWSCredential``
            constructor.
        actor: User performing the action; its ``id`` and ``customer_id``
            are copied onto the audit entry.

    Returns:
        The newly created credential.
    """
    cred = AWSCredential(**data)
    session.add(cred)
    # flush() emits the INSERT inside the open transaction (without
    # committing) so that cred.id is available for the audit entry.
    await session.flush()
    # Reload from the database so the audit description uses stored values.
    await session.refresh(cred)
    session.add(
        AuditLog(
            user_id=actor.id,
            customer_id=actor.customer_id,
            action=AuditAction.CREDENTIAL_CREATE,
            resource_type=AuditResourceType.AWS_CREDENTIAL,
            resource_id=cred.id,
            description=f"Create credential {cred.name}",
        )
    )
    # Single commit: the credential and the audit entry succeed or fail together.
    await session.commit()
    return cred
async def get_credential(session: AsyncSession, credential_id: int) -> AWSCredential:
    """Fetch a single AWS credential by primary key.

    Args:
        session: Async database session used for the lookup.
        credential_id: Primary key of the credential to load.

    Returns:
        The credential returned by ``session.get``.

    Raises:
        HTTPException: With status 404 and detail "Credential not found"
            when the lookup yields nothing.
    """
    found = await session.get(AWSCredential, credential_id)
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Credential not found",
    )
async def update_credential(session: AsyncSession, credential_id: int, data: dict, actor: User) -> AWSCredential:
    """Update fields of an AWS credential and record the change in the audit log.

    The field changes and the audit entry are committed together, so the
    credential cannot be modified without a matching audit record.

    Args:
        session: Async database session used for the writes.
        credential_id: Primary key of the credential to modify.
        data: Mapping of attribute name to new value; every entry is set on
            the credential with ``setattr``.
        actor: User performing the action; its ``id`` and ``customer_id``
            are copied onto the audit entry.

    Returns:
        The updated credential.

    Raises:
        HTTPException: 404 (raised by ``get_credential``) when no credential
            has the given id.
    """
    cred = await get_credential(session, credential_id)
    for field, value in data.items():
        setattr(cred, field, value)
    # flush() sends the UPDATE inside the open transaction without committing,
    # so the audit entry below is part of the same unit of work.
    await session.flush()
    # Reload from the database so the audit description uses stored values.
    await session.refresh(cred)
    session.add(
        AuditLog(
            user_id=actor.id,
            customer_id=actor.customer_id,
            action=AuditAction.CREDENTIAL_UPDATE,
            resource_type=AuditResourceType.AWS_CREDENTIAL,
            resource_id=cred.id,
            description=f"Update credential {cred.name}",
            # NOTE(review): `data` is stored verbatim; if it can contain secret
            # key material it will end up in the audit log — confirm that the
            # caller strips or masks such fields.
            payload=data,
        )
    )
    # Single commit: the update and the audit entry succeed or fail together.
    await session.commit()
    return cred
async def delete_credential(session: AsyncSession, credential_id: int, actor: User) -> None:
    """Delete an AWS credential and record the deletion in the audit log.

    The delete and the audit entry are committed in one transaction.

    Args:
        session: Async database session used for the writes.
        credential_id: Primary key of the credential to delete.
        actor: User performing the action; its ``id`` and ``customer_id``
            are copied onto the audit entry.

    Raises:
        HTTPException: 404 (raised by ``get_credential``) when no credential
            has the given id.
    """
    # Reuse the shared lookup so the 404 behavior stays identical to
    # get_credential / update_credential.
    cred = await get_credential(session, credential_id)
    await session.delete(cred)
    session.add(
        AuditLog(
            user_id=actor.id,
            customer_id=actor.customer_id,
            action=AuditAction.CREDENTIAL_DELETE,
            resource_type=AuditResourceType.AWS_CREDENTIAL,
            resource_id=credential_id,
            description=f"Delete credential {credential_id}",
        )
    )
    await session.commit()
async def authorize_customer(
    session: AsyncSession, customer_id: int, credential_id: int, is_allowed: int, actor: User
) -> CustomerCredential:
    """Create or update a customer-to-credential mapping and audit the change.

    If a ``CustomerCredential`` row already exists for the pair, only its
    ``is_allowed`` value is overwritten; otherwise a new row is inserted.
    The mapping change and the audit entry are committed together.

    Args:
        session: Async database session used for the lookup and writes.
        customer_id: Customer side of the mapping; also stored as the audit
            entry's ``customer_id``.
        credential_id: Credential side of the mapping.
        is_allowed: Value stored on the mapping's ``is_allowed`` column as-is.
        actor: User performing the action; its ``id`` is copied onto the
            audit entry.

    Returns:
        The created or updated mapping row.
    """
    mapping = await session.scalar(
        select(CustomerCredential).where(
            and_(
                CustomerCredential.customer_id == customer_id,
                CustomerCredential.credential_id == credential_id,
            )
        )
    )
    if mapping:
        mapping.is_allowed = is_allowed
    else:
        mapping = CustomerCredential(customer_id=customer_id, credential_id=credential_id, is_allowed=is_allowed)
        session.add(mapping)
    # flush() writes the mapping inside the open transaction without
    # committing, so the audit entry below is part of the same unit of work.
    await session.flush()
    await session.refresh(mapping)
    session.add(
        AuditLog(
            user_id=actor.id,
            customer_id=customer_id,
            action=AuditAction.CREDENTIAL_UPDATE,
            resource_type=AuditResourceType.AWS_CREDENTIAL,
            resource_id=credential_id,
            description=f"Authorize credential {credential_id} to customer {customer_id}",
        )
    )
    # Single commit: the mapping and the audit entry succeed or fail together.
    await session.commit()
    return mapping