# app/services/authentication.py
from app.db.errors import EntityDoesNotExist
from app.db.repositories.users import UsersRepository


async def check_username_is_taken(repo: UsersRepository, username: str) -> bool:
    """Report whether ``username`` is already in use.

    Returns True when ``repo.get_user_by_username`` finds a user, and False
    when it raises ``EntityDoesNotExist``.
    """
    try:
        await repo.get_user_by_username(username=username)
    except EntityDoesNotExist:
        is_taken = False
    else:
        is_taken = True
    return is_taken


async def check_email_is_taken(repo: UsersRepository, email: str) -> bool:
    """Report whether ``email`` is already in use.

    Returns True when ``repo.get_user_by_email`` finds a user, and False
    when it raises ``EntityDoesNotExist``.
    """
    try:
        await repo.get_user_by_email(email=email)
    except EntityDoesNotExist:
        is_taken = False
    else:
        is_taken = True
    return is_taken


def assert_passwords_match(password: str, confirm_password: str) -> None:
    """Check that the password and its confirmation are identical.

    Raises:
        ValueError: if the two values differ.
    """
    if password == confirm_password:
        return
    raise ValueError("Passwords do not match")


async def make_unique_username(repo: UsersRepository, email: str) -> str:
    """Derive an unused username from the part of ``email`` before the "@".

    The prefix is stripped and lower-cased; "user" is used when nothing is
    left. If that name is taken, the numeric suffixes 1, 2, 3, ... are
    appended in turn (e.g. ``foo``, ``foo1``, ``foo2``) and the first name
    not reported as taken is returned.
    """
    local_part = email.split("@", 1)[0]
    # Fallback keeps the stem non-empty for blank or whitespace-only prefixes.
    stem = local_part.strip().lower() or "user"

    username = stem
    counter = 0
    while True:
        if not await check_username_is_taken(repo, username):
            return username
        counter += 1
        username = f"{stem}{counter}"