import json from pathlib import Path from typing import List, Optional from uuid import uuid4 from .. import config from ..schemas.config import ( CredentialCreate, CredentialRecord, CredentialUpdate, ) class ConfigRepository: """File-based storage for credentials, keywords, and channels.""" def __init__( self, credentials_file: Path = config.CREDENTIALS_FILE, keywords_file: Path = config.KEYWORDS_FILE, channels_file: Path = config.CHANNELS_FILE, ) -> None: self._credentials_file = credentials_file self._keywords_file = keywords_file self._channels_file = channels_file self._ensure_credentials_file() # Credential helpers ------------------------------------------------- def list_credentials(self) -> List[CredentialRecord]: data = self._load_credentials_data() active_id = data.get("active") return [self._build_record(item, active_id) for item in data["items"]] def get_credential(self, credential_id: str) -> Optional[CredentialRecord]: data = self._load_credentials_data() active_id = data.get("active") for item in data["items"]: if item["id"] == credential_id: return self._build_record(item, active_id) return None def get_active_credentials(self) -> Optional[CredentialRecord]: data = self._load_credentials_data() active_id = data.get("active") if not active_id: return None for item in data["items"]: if item["id"] == active_id: return self._build_record(item, active_id) return None def create_credentials(self, payload: CredentialCreate) -> CredentialRecord: data = self._load_credentials_data() record = payload.model_dump() record["id"] = uuid4().hex data["items"].append(record) if not data.get("active"): data["active"] = record["id"] self._save_credentials_data(data) return self._build_record(record, data["active"]) def update_credentials( self, credential_id: str, payload: CredentialUpdate ) -> CredentialRecord: data = self._load_credentials_data() active_id = data.get("active") body = payload.model_dump(exclude_none=True) for item in data["items"]: if item["id"] == credential_id: item.update(body) self._save_credentials_data(data) return self._build_record(item, active_id) raise ValueError("Credential not found") def delete_credentials(self, credential_id: str) -> None: data = self._load_credentials_data() active_id = data.get("active") new_items = [item for item in data["items"] if item["id"] != credential_id] if len(new_items) == len(data["items"]): raise ValueError("Credential not found") data["items"] = new_items if active_id == credential_id: data["active"] = new_items[0]["id"] if new_items else None self._save_credentials_data(data) def activate_credentials(self, credential_id: str) -> CredentialRecord: data = self._load_credentials_data() for item in data["items"]: if item["id"] == credential_id: data["active"] = credential_id self._save_credentials_data(data) return self._build_record(item, credential_id) raise ValueError("Credential not found") # Legacy compatibility def read_credentials(self) -> Optional[CredentialRecord]: return self.get_active_credentials() def read_keywords(self) -> List[str]: return self._read_entries(self._keywords_file) def write_keywords(self, entries: List[str]) -> List[str]: self._write_entries(self._keywords_file, entries) return entries def read_channels(self) -> List[str]: return self._read_entries(self._channels_file) def write_channels(self, entries: List[str]) -> List[str]: self._write_entries(self._channels_file, entries) return entries def _read_entries(self, file_path: Path) -> List[str]: if not file_path.exists(): file_path.write_text("", encoding="utf-8") return [] content = file_path.read_text(encoding="utf-8") entries = [x.strip() for x in content.replace("\n", "").split(",") if x.strip()] return entries def _write_entries(self, file_path: Path, entries: List[str]) -> None: normalized = ", ".join(entry.strip() for entry in entries if entry.strip()) file_path.write_text(normalized, encoding="utf-8") def _ensure_credentials_file(self) -> None: if not self._credentials_file.exists(): payload = {"active": None, "items": []} self._credentials_file.write_text( json.dumps(payload, indent=2), encoding="utf-8" ) def _load_credentials_data(self) -> dict: self._ensure_credentials_file() raw = json.loads(self._credentials_file.read_text(encoding="utf-8")) if "items" not in raw: # migrate legacy single credential format record_id = uuid4().hex legacy_title = raw.get("title") or "Default Credential" legacy_record = { "id": record_id, "title": legacy_title, "api_id": raw.get("api_id"), "api_hash": raw.get("api_hash"), "phone": raw.get("phone"), "username": raw.get("username"), "bot_token": raw.get("bot_token"), "channel_id": raw.get("channel_id"), } raw = {"active": record_id, "items": [legacy_record]} self._save_credentials_data(raw) raw.setdefault("active", None) raw.setdefault("items", []) return raw def _save_credentials_data(self, data: dict) -> None: self._credentials_file.write_text( json.dumps(data, indent=2), encoding="utf-8" ) def _build_record(self, item: dict, active_id: Optional[str]) -> CredentialRecord: payload = item.copy() payload["active"] = payload["id"] == active_id return CredentialRecord(**payload)