53 lines
1.4 KiB
Python
53 lines
1.4 KiB
Python
|
|
# app/services/authentication.py
|
|||
|
|
from app.db.errors import EntityDoesNotExist
|
|||
|
|
from app.db.repositories.users import UsersRepository
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def check_username_is_taken(repo: UsersRepository, username: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
返回 True 表示用户名已被占用
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
await repo.get_user_by_username(username=username)
|
|||
|
|
except EntityDoesNotExist:
|
|||
|
|
return False
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def check_email_is_taken(repo: UsersRepository, email: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
返回 True 表示邮箱已被占用
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
await repo.get_user_by_email(email=email)
|
|||
|
|
except EntityDoesNotExist:
|
|||
|
|
return False
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
|
|||
|
|
def assert_passwords_match(password: str, confirm_password: str) -> None:
|
|||
|
|
"""
|
|||
|
|
两次密码一致性校验,不一致抛 ValueError
|
|||
|
|
"""
|
|||
|
|
if password != confirm_password:
|
|||
|
|
raise ValueError("Passwords do not match")
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def make_unique_username(repo: UsersRepository, email: str) -> str:
|
|||
|
|
"""
|
|||
|
|
由邮箱前缀自动生成唯一 username,例如:
|
|||
|
|
- 先用 local-part: foo
|
|||
|
|
- 若被占用,则 foo1、foo2…直到找到可用
|
|||
|
|
"""
|
|||
|
|
base = (email.split("@", 1)[0] or "user").strip().lower()
|
|||
|
|
# 兜底:避免空前缀
|
|||
|
|
if not base:
|
|||
|
|
base = "user"
|
|||
|
|
|
|||
|
|
candidate = base
|
|||
|
|
suffix = 0
|
|||
|
|
while await check_username_is_taken(repo, candidate):
|
|||
|
|
suffix += 1
|
|||
|
|
candidate = f"{base}{suffix}"
|
|||
|
|
return candidate
|