164 lines
6.2 KiB
Python
164 lines
6.2 KiB
Python
import json
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
from uuid import uuid4
|
|
|
|
from .. import config
|
|
from ..schemas.config import (
|
|
CredentialCreate,
|
|
CredentialRecord,
|
|
CredentialUpdate,
|
|
)
|
|
|
|
|
|
class ConfigRepository:
    """File-based storage for credentials, keywords, and channels.

    Credentials are kept in a single JSON document shaped like
    ``{"active": <credential id or None>, "items": [<credential dict>, ...]}``.
    Keywords and channels are each kept in their own comma-separated
    UTF-8 text file.
    """

    # Field names of the older file format, which stored one credential as a
    # flat JSON object. Their presence is what triggers a migration.
    _LEGACY_FIELDS = (
        "title",
        "api_id",
        "api_hash",
        "phone",
        "username",
        "bot_token",
        "channel_id",
    )

    def __init__(
        self,
        credentials_file: Path = config.CREDENTIALS_FILE,
        keywords_file: Path = config.KEYWORDS_FILE,
        channels_file: Path = config.CHANNELS_FILE,
    ) -> None:
        """Store the three file paths and create the credentials file if absent."""
        self._credentials_file = credentials_file
        self._keywords_file = keywords_file
        self._channels_file = channels_file
        self._ensure_credentials_file()

    # Credential helpers -------------------------------------------------
    def list_credentials(self) -> List[CredentialRecord]:
        """Return every stored credential, each flagged with whether it is active."""
        data = self._load_credentials_data()
        active_id = data.get("active")
        return [self._build_record(item, active_id) for item in data["items"]]

    def get_credential(self, credential_id: str) -> Optional[CredentialRecord]:
        """Return the credential with the given id, or ``None`` if it is not stored."""
        data = self._load_credentials_data()
        item = self._find_item(data, credential_id)
        if item is None:
            return None
        return self._build_record(item, data.get("active"))

    def get_active_credentials(self) -> Optional[CredentialRecord]:
        """Return the active credential.

        Returns ``None`` when no credential is marked active or when the
        stored active id does not match any stored item.
        """
        data = self._load_credentials_data()
        active_id = data.get("active")
        if not active_id:
            return None
        item = self._find_item(data, active_id)
        if item is None:
            return None
        return self._build_record(item, active_id)

    def create_credentials(self, payload: CredentialCreate) -> CredentialRecord:
        """Persist a new credential under a freshly generated id and return it.

        The new credential becomes the active one only when no credential is
        currently marked active.
        """
        data = self._load_credentials_data()
        record = payload.model_dump()
        record["id"] = uuid4().hex
        data["items"].append(record)
        if not data.get("active"):
            data["active"] = record["id"]
        self._save_credentials_data(data)
        return self._build_record(record, data["active"])

    def update_credentials(
        self, credential_id: str, payload: CredentialUpdate
    ) -> CredentialRecord:
        """Overwrite the fields of a stored credential that ``payload`` sets.

        Fields whose value in ``payload`` is ``None`` are left untouched.

        Raises:
            ValueError: If no credential with ``credential_id`` exists.
        """
        data = self._load_credentials_data()
        item = self._find_item(data, credential_id)
        if item is None:
            raise ValueError("Credential not found")
        item.update(payload.model_dump(exclude_none=True))
        self._save_credentials_data(data)
        return self._build_record(item, data.get("active"))

    def delete_credentials(self, credential_id: str) -> None:
        """Remove a stored credential.

        When the removed credential was the active one, the first remaining
        credential becomes active, or none if the store is now empty.

        Raises:
            ValueError: If no credential with ``credential_id`` exists.
        """
        data = self._load_credentials_data()
        remaining = [item for item in data["items"] if item["id"] != credential_id]
        if len(remaining) == len(data["items"]):
            raise ValueError("Credential not found")
        if data.get("active") == credential_id:
            data["active"] = remaining[0]["id"] if remaining else None
        data["items"] = remaining
        self._save_credentials_data(data)

    def activate_credentials(self, credential_id: str) -> CredentialRecord:
        """Mark a stored credential as the active one and return it.

        Raises:
            ValueError: If no credential with ``credential_id`` exists.
        """
        data = self._load_credentials_data()
        item = self._find_item(data, credential_id)
        if item is None:
            raise ValueError("Credential not found")
        data["active"] = credential_id
        self._save_credentials_data(data)
        return self._build_record(item, credential_id)

    # Legacy compatibility
    def read_credentials(self) -> Optional[CredentialRecord]:
        """Alias of :meth:`get_active_credentials`."""
        return self.get_active_credentials()

    # Keyword / channel lists ---------------------------------------------
    def read_keywords(self) -> List[str]:
        """Return the entries stored in the keywords file."""
        return self._read_entries(self._keywords_file)

    def write_keywords(self, entries: List[str]) -> List[str]:
        """Replace the keywords file content.

        Returns the stripped, non-empty entries that were written, rather than
        the raw input, so the result matches what is now on disk.
        """
        cleaned = self._normalize_entries(entries)
        self._write_entries(self._keywords_file, cleaned)
        return cleaned

    def read_channels(self) -> List[str]:
        """Return the entries stored in the channels file."""
        return self._read_entries(self._channels_file)

    def write_channels(self, entries: List[str]) -> List[str]:
        """Replace the channels file content.

        Returns the stripped, non-empty entries that were written, rather than
        the raw input, so the result matches what is now on disk.
        """
        cleaned = self._normalize_entries(entries)
        self._write_entries(self._channels_file, cleaned)
        return cleaned

    # Internal helpers ------------------------------------------------------
    @staticmethod
    def _find_item(data: dict, credential_id: str) -> Optional[dict]:
        """Return the stored credential dict whose ``id`` matches, else ``None``."""
        for item in data["items"]:
            if item["id"] == credential_id:
                return item
        return None

    @staticmethod
    def _normalize_entries(entries: List[str]) -> List[str]:
        """Strip surrounding whitespace from each entry and drop blank ones."""
        return [entry.strip() for entry in entries if entry.strip()]

    @staticmethod
    def _write_text(file_path: Path, text: str) -> None:
        """Write ``text`` as UTF-8, creating the parent directory when missing."""
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(text, encoding="utf-8")

    def _read_entries(self, file_path: Path) -> List[str]:
        """Read a comma-separated entry file, creating it empty when missing."""
        if not file_path.exists():
            self._write_text(file_path, "")
            return []
        content = file_path.read_text(encoding="utf-8")
        # A line break separates entries just like a comma does. Simply
        # deleting it would fuse the last entry of one line with the first
        # entry of the next in a hand-edited file.
        return [
            entry.strip()
            for entry in content.replace("\n", ",").split(",")
            if entry.strip()
        ]

    def _write_entries(self, file_path: Path, entries: List[str]) -> None:
        """Write the stripped, non-empty entries joined by ``", "``."""
        self._write_text(file_path, ", ".join(self._normalize_entries(entries)))

    def _ensure_credentials_file(self) -> None:
        """Create the credentials file with an empty store if it does not exist."""
        if not self._credentials_file.exists():
            self._save_credentials_data({"active": None, "items": []})

    def _load_credentials_data(self) -> dict:
        """Load the credentials document, migrating the legacy format if found.

        The returned dict always has an ``"active"`` key and an ``"items"``
        list. A legacy single-credential document is rewritten on disk in
        the current format the first time it is loaded.
        """
        self._ensure_credentials_file()
        text = self._credentials_file.read_text(encoding="utf-8")
        # json.loads rejects an empty document, so a blank file is treated as
        # an empty store instead of crashing every repository call.
        raw = json.loads(text) if text.strip() else {}
        # Migrate only when legacy fields are actually present. Otherwise a
        # bare ``{}`` would be turned into a phantom all-None credential.
        if "items" not in raw and any(key in raw for key in self._LEGACY_FIELDS):
            record_id = uuid4().hex
            legacy_record = {
                "id": record_id,
                **{key: raw.get(key) for key in self._LEGACY_FIELDS},
            }
            legacy_record["title"] = legacy_record["title"] or "Default Credential"
            raw = {"active": record_id, "items": [legacy_record]}
            self._save_credentials_data(raw)
        raw.setdefault("active", None)
        raw.setdefault("items", [])
        return raw

    def _save_credentials_data(self, data: dict) -> None:
        """Serialize ``data`` as indented JSON into the credentials file."""
        self._write_text(self._credentials_file, json.dumps(data, indent=2))

    def _build_record(self, item: dict, active_id: Optional[str]) -> CredentialRecord:
        """Build a ``CredentialRecord`` from a stored dict plus its active flag."""
        payload = item.copy()
        payload["active"] = payload["id"] == active_id
        return CredentialRecord(**payload)
|