64 lines
2.5 KiB
Python
Raw Permalink Normal View History

2025-12-10 12:02:17 +08:00
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from ..db import get_session
from ..dependencies import AuthUser, get_current_user
from ..models import AuditAction, AuditLog, AuditResourceType, User
from ..schemas import LoginRequest, TokenResponse, UserOut
from .jwt_utils import create_access_token, verify_password
# Every route declared in this module is served under /api/v1/auth and is
# grouped under the "auth" tag.
router = APIRouter(
    prefix="/api/v1/auth",
    tags=["auth"],
)
@router.post("/login", response_model=TokenResponse)
async def login(payload: LoginRequest, request: Request, session: AsyncSession = Depends(get_session)) -> TokenResponse:
    """Authenticate a user by username and password and issue an access token.

    On success the user's ``last_login_at`` is set to the current UTC time and
    a ``LOGIN`` audit record (including client IP and User-Agent, when
    available) is stored. Both writes are committed in a single transaction so
    a login can never leave a timestamp update without its audit entry.

    Args:
        payload: Submitted credentials (``username`` and ``password``).
        request: Incoming request; used only for audit metadata.
        session: Database session supplied by ``get_session``.

    Returns:
        A ``TokenResponse`` carrying the new access token and the user.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify; 403 when the account is not active.
    """
    # Role and customer are loaded together with the user; the role name is
    # needed for the token payload below.
    query = (
        select(User)
        .where(User.username == payload.username)
        .options(selectinload(User.role), selectinload(User.customer))
    )
    user = await session.scalar(query)

    # Unknown user and wrong password deliberately share one response.
    # NOTE(review): verify_password is skipped when no user matches, so the
    # response time may reveal whether a username exists; consider verifying
    # against a dummy hash in that case.
    if not user or not verify_password(payload.password, user.password_hash):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
    if not user.is_active:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User disabled")

    # Build the token before touching the database: the attributes read here
    # were just loaded by the query, and a failure to create the token then
    # leaves no "login success" trace behind.
    token_payload = {
        "sub": str(user.id),
        "role": user.role.name if user.role else "",
        "customer_id": user.customer_id,
    }
    access_token = create_access_token(token_payload)

    # Stage the timestamp update and the audit row, then commit them together.
    user.last_login_at = datetime.now(timezone.utc)
    session.add(
        AuditLog(
            user_id=user.id,
            customer_id=user.customer_id,
            action=AuditAction.LOGIN,
            resource_type=AuditResourceType.USER,
            resource_id=user.id,
            description="User login success",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    )
    await session.commit()
    # Reload the row so the response reflects what was actually persisted.
    await session.refresh(user)

    return TokenResponse(access_token=access_token, user=UserOut.model_validate(user))
@router.get("/me", response_model=UserOut)
async def read_me(auth_user: AuthUser = Depends(get_current_user)) -> UserOut:
    """Return the user resolved by the ``get_current_user`` dependency.

    Args:
        auth_user: Authenticated caller context injected by FastAPI.

    Returns:
        The caller's user record validated into a ``UserOut`` schema.
    """
    current_user = auth_user.user
    return UserOut.model_validate(current_user)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(auth_user: AuthUser = Depends(get_current_user)) -> TokenResponse:
    """Issue a fresh access token for the already-authenticated caller.

    The new token carries the same claim keys as the one produced at login:
    ``sub`` (user id as a string), ``role`` and ``customer_id``.

    Args:
        auth_user: Authenticated caller context injected by FastAPI.

    Returns:
        A ``TokenResponse`` with the new access token and the caller's user.
    """
    claims = dict(
        sub=str(auth_user.user.id),
        role=auth_user.role_name,
        customer_id=auth_user.customer_id,
    )
    new_token = create_access_token(claims)
    user_out = UserOut.model_validate(auth_user.user)
    return TokenResponse(access_token=new_token, user=user_out)