AWS-Panel/backend/routers/aws_credentials.py

160 lines
5.6 KiB
Python
Raw Permalink Normal View History

2025-12-10 12:02:17 +08:00
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from ..db import get_session
from ..dependencies import AuthUser, require_admin, require_roles
from ..models import AuditAction, AuditResourceType, AWSCredential, CustomerCredential, RoleName
from ..schemas import (
AWSCredentialCreate,
AWSCredentialOut,
AWSCredentialUpdate,
CustomerCredentialCreate,
CustomerCredentialOut,
)
from ..utils.audit import create_audit_log
# Router for the AWS credential endpoints; its routes are served under /api/v1/aws_credentials
# and are tagged "aws_credentials".
router = APIRouter(prefix="/api/v1/aws_credentials", tags=["aws_credentials"])
@router.get("", response_model=List[AWSCredentialOut])
async def list_credentials(
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(
        require_roles([RoleName.ADMIN, RoleName.CUSTOMER_ADMIN, RoleName.CUSTOMER_USER])
    ),
) -> List[AWSCredentialOut]:
    """List the AWS credentials visible to the caller.

    A caller whose role is ADMIN receives every credential. Any other
    permitted role only receives credentials linked to the caller's customer
    through a ``CustomerCredential`` row whose ``is_allowed`` equals 1.

    Returns:
        The matching credentials serialized as ``AWSCredentialOut``.
    """
    stmt = select(AWSCredential)
    if auth_user.role_name != RoleName.ADMIN.value:
        # Restrict to credentials explicitly allowed for the caller's customer.
        stmt = stmt.join(
            CustomerCredential,
            CustomerCredential.credential_id == AWSCredential.id,
        ).where(
            CustomerCredential.customer_id == auth_user.customer_id,
            CustomerCredential.is_allowed == 1,
        )
    rows = (await session.scalars(stmt)).all()
    return list(map(AWSCredentialOut.model_validate, rows))
@router.post("", response_model=AWSCredentialOut, status_code=status.HTTP_201_CREATED)
async def create_credential(
    payload: AWSCredentialCreate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_admin),
) -> AWSCredentialOut:
    """Create an AWS credential and record the action in the audit log.

    The credential row and its audit entry are committed in a single
    transaction, so a failure while writing the audit entry does not leave a
    persisted credential without an audit record.

    Args:
        payload: Fields for the new credential; passed to ``AWSCredential``
            as keyword arguments.
        request: Incoming request, forwarded to the audit logger.
        session: Database session for this request.
        auth_user: Authenticated caller, resolved by ``require_admin``.

    Returns:
        The newly created credential serialized as ``AWSCredentialOut``.
    """
    cred = AWSCredential(**payload.model_dump())
    session.add(cred)
    # Flush (not commit) so the INSERT is emitted and cred.id is available for
    # the audit entry while the transaction is still open.
    await session.flush()
    # NOTE(review): assumes create_audit_log only stages the entry on `session`
    # and does not commit by itself - confirm against utils.audit.
    await create_audit_log(
        session,
        user_id=auth_user.user.id,
        customer_id=None,
        action=AuditAction.CREDENTIAL_CREATE,
        resource_type=AuditResourceType.AWS_CREDENTIAL,
        resource_id=cred.id,
        description=f"Create credential {cred.name}",
        request=request,
    )
    await session.commit()
    # Reload so the response is built from the committed row.
    await session.refresh(cred)
    return AWSCredentialOut.model_validate(cred)
@router.put("/{credential_id}", response_model=AWSCredentialOut)
async def update_credential(
    credential_id: int,
    payload: AWSCredentialUpdate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_admin),
) -> AWSCredentialOut:
    """Apply a partial update to an AWS credential and audit the change.

    Only fields explicitly set in the request body are written. The update
    and its audit entry are committed in a single transaction, so a failure
    while writing the audit entry rolls the change back instead of leaving an
    unaudited modification.

    Args:
        credential_id: Primary key of the credential to update.
        payload: Fields to change; unset fields are left untouched.
        request: Incoming request, forwarded to the audit logger.
        session: Database session for this request.
        auth_user: Authenticated caller, resolved by ``require_admin``.

    Returns:
        The updated credential serialized as ``AWSCredentialOut``.

    Raises:
        HTTPException: 404 if no credential with ``credential_id`` exists.
    """
    cred = await session.get(AWSCredential, credential_id)
    if cred is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Credential not found")
    # exclude_unset keeps fields the client did not send from being overwritten.
    update_data = payload.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(cred, field, value)
    # NOTE(review): update_data is handed to the audit log verbatim. If
    # AWSCredentialUpdate can carry secret material, confirm that
    # create_audit_log redacts it before persisting.
    await create_audit_log(
        session,
        user_id=auth_user.user.id,
        customer_id=None,
        action=AuditAction.CREDENTIAL_UPDATE,
        resource_type=AuditResourceType.AWS_CREDENTIAL,
        resource_id=cred.id,
        description=f"Update credential {cred.name}",
        payload=update_data,
        request=request,
    )
    # One commit covers both the field changes and the audit entry.
    await session.commit()
    # Reload so the response is built from the committed row.
    await session.refresh(cred)
    return AWSCredentialOut.model_validate(cred)
@router.delete("/{credential_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_credential(
    credential_id: int,
    request: Request,
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_admin),
) -> None:
    """Delete an AWS credential and write the matching audit entry.

    The deletion and the audit entry are committed together by the single
    commit at the end.

    Raises:
        HTTPException: 404 if no credential with ``credential_id`` exists.
    """
    target = await session.get(AWSCredential, credential_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Credential not found",
        )

    await session.delete(target)

    audit_fields = {
        "user_id": auth_user.user.id,
        "customer_id": None,
        "action": AuditAction.CREDENTIAL_DELETE,
        "resource_type": AuditResourceType.AWS_CREDENTIAL,
        "resource_id": credential_id,
        "description": f"Delete credential {credential_id}",
        "request": request,
    }
    await create_audit_log(session, **audit_fields)
    await session.commit()
@router.post("/{credential_id}/authorize", response_model=CustomerCredentialOut)
async def authorize_credential(
    credential_id: int,
    payload: CustomerCredentialCreate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_admin),
) -> CustomerCredentialOut:
    """Create or update the mapping between a customer and a credential.

    If a ``CustomerCredential`` row already exists for the
    (customer, credential) pair, its ``is_allowed`` value is overwritten with
    the one from the payload; otherwise a new row is inserted. The mapping
    change and its audit entry are committed in a single transaction, so a
    failure while writing the audit entry does not leave an unaudited change.

    Args:
        credential_id: Primary key of the credential being mapped.
        payload: Carries the target ``customer_id`` and the ``is_allowed``
            value to store.
        request: Incoming request, forwarded to the audit logger.
        session: Database session for this request.
        auth_user: Authenticated caller, resolved by ``require_admin``.

    Returns:
        The resulting mapping serialized as ``CustomerCredentialOut``.

    Raises:
        HTTPException: 404 if no credential with ``credential_id`` exists.
    """
    cred = await session.get(AWSCredential, credential_id)
    if cred is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Credential not found")

    # Multiple criteria passed to .where() are combined with AND.
    mapping = await session.scalar(
        select(CustomerCredential).where(
            CustomerCredential.customer_id == payload.customer_id,
            CustomerCredential.credential_id == credential_id,
        )
    )
    if mapping is not None:
        mapping.is_allowed = payload.is_allowed
    else:
        mapping = CustomerCredential(
            customer_id=payload.customer_id,
            credential_id=credential_id,
            is_allowed=payload.is_allowed,
        )
        session.add(mapping)

    # NOTE(review): assumes create_audit_log only stages the entry on `session`
    # and does not commit by itself - confirm against utils.audit.
    await create_audit_log(
        session,
        user_id=auth_user.user.id,
        customer_id=payload.customer_id,
        action=AuditAction.CREDENTIAL_UPDATE,
        resource_type=AuditResourceType.AWS_CREDENTIAL,
        resource_id=credential_id,
        description=f"Authorize credential {credential_id} to customer {payload.customer_id}",
        payload=payload.model_dump(),
        request=request,
    )
    # One commit covers both the mapping change and the audit entry.
    await session.commit()
    # Reload so the response is built from the committed row.
    await session.refresh(mapping)
    return CustomerCredentialOut.model_validate(mapping)