tg-bot/backend/app/services/config_repo.py

164 lines
6.2 KiB
Python
Raw Permalink Normal View History

2025-12-04 09:52:39 +08:00
import json
from pathlib import Path
from typing import List, Optional
from uuid import uuid4
from .. import config
from ..schemas.config import (
CredentialCreate,
CredentialRecord,
CredentialUpdate,
)
class ConfigRepository:
    """File-based storage for credentials, keywords, and channels.

    Credentials are kept in a single JSON document shaped like
    ``{"active": <credential id or None>, "items": [<credential dict>, ...]}``.
    Keywords and channels are each kept in a text file of comma-separated
    entries.
    """

    # Fields copied verbatim when migrating the legacy single-credential file.
    _LEGACY_FIELDS = (
        "api_id",
        "api_hash",
        "phone",
        "username",
        "bot_token",
        "channel_id",
    )

    def __init__(
        self,
        credentials_file: Path = config.CREDENTIALS_FILE,
        keywords_file: Path = config.KEYWORDS_FILE,
        channels_file: Path = config.CHANNELS_FILE,
    ) -> None:
        """Store the three file paths and create the credentials file if absent."""
        self._credentials_file = credentials_file
        self._keywords_file = keywords_file
        self._channels_file = channels_file
        self._ensure_credentials_file()

    # Credential helpers -------------------------------------------------
    def list_credentials(self) -> List[CredentialRecord]:
        """Return every stored credential, each flagged with whether it is active."""
        data = self._load_credentials_data()
        active_id = data.get("active")
        return [self._build_record(item, active_id) for item in data["items"]]

    def get_credential(self, credential_id: str) -> Optional[CredentialRecord]:
        """Return the credential with ``credential_id``, or ``None`` if unknown."""
        data = self._load_credentials_data()
        item = self._find_item(data["items"], credential_id)
        if item is None:
            return None
        return self._build_record(item, data.get("active"))

    def get_active_credentials(self) -> Optional[CredentialRecord]:
        """Return the active credential, or ``None`` when none is active.

        ``None`` is also returned when the stored active id no longer matches
        any item.
        """
        data = self._load_credentials_data()
        active_id = data.get("active")
        if not active_id:
            return None
        item = self._find_item(data["items"], active_id)
        if item is None:
            return None
        return self._build_record(item, active_id)

    def create_credentials(self, payload: CredentialCreate) -> CredentialRecord:
        """Persist a new credential under a freshly generated id.

        The new credential becomes the active one when no credential is
        currently active.
        """
        data = self._load_credentials_data()
        record = payload.model_dump()
        record["id"] = uuid4().hex
        data["items"].append(record)
        if not data.get("active"):
            data["active"] = record["id"]
        self._save_credentials_data(data)
        return self._build_record(record, data["active"])

    def update_credentials(
        self, credential_id: str, payload: CredentialUpdate
    ) -> CredentialRecord:
        """Merge the non-``None`` fields of ``payload`` into a stored credential.

        Raises:
            ValueError: if no credential has ``credential_id``.
        """
        data = self._load_credentials_data()
        item = self._find_item(data["items"], credential_id)
        if item is None:
            raise ValueError("Credential not found")
        changes = payload.model_dump(exclude_none=True)
        # The id is the lookup key (and what "active" points at); an update
        # must never be able to rewrite it.
        changes.pop("id", None)
        item.update(changes)
        self._save_credentials_data(data)
        return self._build_record(item, data.get("active"))

    def delete_credentials(self, credential_id: str) -> None:
        """Remove a credential from the store.

        When the removed credential was the active one, the first remaining
        credential becomes active (or none, if the store is now empty).

        Raises:
            ValueError: if no credential has ``credential_id``.
        """
        data = self._load_credentials_data()
        remaining = [
            item for item in data["items"] if item["id"] != credential_id
        ]
        if len(remaining) == len(data["items"]):
            raise ValueError("Credential not found")
        data["items"] = remaining
        if data.get("active") == credential_id:
            data["active"] = remaining[0]["id"] if remaining else None
        self._save_credentials_data(data)

    def activate_credentials(self, credential_id: str) -> CredentialRecord:
        """Mark the credential with ``credential_id`` as the active one.

        Raises:
            ValueError: if no credential has ``credential_id``.
        """
        data = self._load_credentials_data()
        item = self._find_item(data["items"], credential_id)
        if item is None:
            raise ValueError("Credential not found")
        data["active"] = credential_id
        self._save_credentials_data(data)
        return self._build_record(item, credential_id)

    # Legacy compatibility
    def read_credentials(self) -> Optional[CredentialRecord]:
        """Alias of :meth:`get_active_credentials`."""
        return self.get_active_credentials()

    def read_keywords(self) -> List[str]:
        """Return the entries stored in the keywords file."""
        return self._read_entries(self._keywords_file)

    def write_keywords(self, entries: List[str]) -> List[str]:
        """Overwrite the keywords file and return the entries as stored.

        The returned list is stripped of surrounding whitespace and of blank
        entries, i.e. it reflects what was written rather than the raw input.
        """
        return self._write_entries(self._keywords_file, entries)

    def read_channels(self) -> List[str]:
        """Return the entries stored in the channels file."""
        return self._read_entries(self._channels_file)

    def write_channels(self, entries: List[str]) -> List[str]:
        """Overwrite the channels file and return the entries as stored.

        The returned list is stripped of surrounding whitespace and of blank
        entries, i.e. it reflects what was written rather than the raw input.
        """
        return self._write_entries(self._channels_file, entries)

    def _read_entries(self, file_path: Path) -> List[str]:
        """Parse ``file_path`` into a list of non-empty, stripped entries.

        A missing file is created empty and yields ``[]``. Commas and line
        breaks both separate entries, so a hand-edited file with one entry
        per line is read correctly instead of having its lines fused.
        """
        if not file_path.exists():
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_text("", encoding="utf-8")
            return []
        content = file_path.read_text(encoding="utf-8")
        pieces = content.replace("\n", ",").split(",")
        # strip() also discards a stray "\r" left over from CRLF line endings.
        return [piece.strip() for piece in pieces if piece.strip()]

    def _write_entries(self, file_path: Path, entries: List[str]) -> List[str]:
        """Write ``entries`` to ``file_path`` joined by ``", "``.

        Entries are stripped and blank ones dropped before writing; the
        cleaned list is returned.
        """
        cleaned = [entry.strip() for entry in entries if entry.strip()]
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(", ".join(cleaned), encoding="utf-8")
        return cleaned

    def _ensure_credentials_file(self) -> None:
        """Create the credentials file with an empty store if it does not exist."""
        if not self._credentials_file.exists():
            self._save_credentials_data({"active": None, "items": []})

    def _load_credentials_data(self) -> dict:
        """Load the credentials document, migrating the legacy format if needed.

        An existing but empty file is treated as an empty store. A document
        without ``"items"`` that carries legacy single-credential fields is
        converted to the current layout and written back immediately.

        Returns:
            A dict that always has the keys ``"active"`` and ``"items"``.

        Raises:
            json.JSONDecodeError: if the file holds non-empty, invalid JSON.
        """
        self._ensure_credentials_file()
        text = self._credentials_file.read_text(encoding="utf-8")
        # A zero-byte file (e.g. created with `touch`) is not valid JSON, but
        # it unambiguously means "nothing stored yet".
        raw = json.loads(text) if text.strip() else {}
        if "items" not in raw:
            has_legacy_values = any(
                raw.get(key) is not None
                for key in ("title", *self._LEGACY_FIELDS)
            )
            # Only migrate when there is something to migrate; otherwise an
            # empty document would turn into a credential full of None values.
            if has_legacy_values:
                raw = self._migrate_legacy(raw)
                self._save_credentials_data(raw)
        raw.setdefault("active", None)
        raw.setdefault("items", [])
        return raw

    def _migrate_legacy(self, raw: dict) -> dict:
        """Wrap a legacy single-credential dict in the current store layout."""
        record_id = uuid4().hex
        legacy_record = {
            "id": record_id,
            "title": raw.get("title") or "Default Credential",
        }
        for key in self._LEGACY_FIELDS:
            legacy_record[key] = raw.get(key)
        return {"active": record_id, "items": [legacy_record]}

    def _save_credentials_data(self, data: dict) -> None:
        """Serialize ``data`` as indented JSON into the credentials file."""
        # Create the directory on demand so a fresh install does not fail
        # with FileNotFoundError on the very first write.
        self._credentials_file.parent.mkdir(parents=True, exist_ok=True)
        self._credentials_file.write_text(
            json.dumps(data, indent=2), encoding="utf-8"
        )

    @staticmethod
    def _find_item(items: List[dict], credential_id: str) -> Optional[dict]:
        """Return the first stored item whose ``"id"`` matches, else ``None``."""
        return next(
            (item for item in items if item["id"] == credential_id), None
        )

    def _build_record(self, item: dict, active_id: Optional[str]) -> CredentialRecord:
        """Build a ``CredentialRecord`` from a stored item.

        The ``active`` flag is computed on a copy, so it is never persisted
        into the stored item.
        """
        payload = item.copy()
        payload["active"] = payload["id"] == active_id
        return CredentialRecord(**payload)