jdeip/app/redis_store.py

50 lines
1.4 KiB
Python
Raw Normal View History

import datetime
from typing import Iterable, Optional
import redis
from .config import settings
class RedisStore:
    """Redis-backed bookkeeping for EIP state.

    Keys written by this class:

    * ``eip:used:<YYYYMMDD>`` -- set of IPs recorded for the current UTC day.
    * ``eip:candidates:<YYYYMMDD>`` -- list of candidate values cached for
      the current UTC day.
    * ``eip:current`` -- hash with fields ``ip`` and ``edge``.
    """

    # Per-day keys are kept for two days so they comfortably outlive the
    # UTC day boundary regardless of the caller's local time zone.
    _TTL_SECONDS = 2 * 24 * 60 * 60  # 172800

    def __init__(self, url: str) -> None:
        """Create a client from the Redis connection *url*.

        The client is created with ``decode_responses=True``.
        """
        self._r = redis.Redis.from_url(url, decode_responses=True)

    @staticmethod
    def _today_key(prefix: str) -> str:
        """Return the per-day key ``eip:<prefix>:<YYYYMMDD>`` for today (UTC)."""
        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
        # "now" yields the same calendar date string.
        today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")
        return f"eip:{prefix}:{today}"

    def add_used_ip_today(self, ip: str) -> None:
        """Add *ip* to today's "used" set and refresh the set's expiry."""
        key = self._today_key("used")
        self._r.sadd(key, ip)
        # Two-day expiry: redundancy across time zones.
        self._r.expire(key, self._TTL_SECONDS)

    def is_ip_used_today(self, ip: str) -> bool:
        """Return True if *ip* is a member of today's "used" set."""
        key = self._today_key("used")
        return bool(self._r.sismember(key, ip))

    def get_used_count_today(self) -> int:
        """Return the number of members in today's "used" set."""
        key = self._today_key("used")
        return int(self._r.scard(key))

    def set_current(self, ip: str, edge_id: Optional[str]) -> None:
        """Store *ip* and *edge_id* in the ``eip:current`` hash.

        A falsy *edge_id* (``None`` or ``""``) is stored as an empty string.
        """
        self._r.hset("eip:current", mapping={"ip": ip, "edge": edge_id or ""})

    def get_current(self) -> dict:
        """Return the ``eip:current`` hash, or an empty dict if it has no fields."""
        data = self._r.hgetall("eip:current")
        return data or {}

    def cache_candidates(self, items: Iterable[str]) -> None:
        """Replace today's cached candidate list with *items*.

        *items* may be any iterable, including a one-shot generator. When it
        yields nothing the call is a no-op and the existing cache is left
        untouched.
        """
        # Materialize once: a generator is always truthy, so testing the raw
        # iterable would let an empty one through to RPUSH with no values
        # (an error) after the old list had already been deleted.
        values = list(items)
        if not values:
            return
        key = self._today_key("candidates")
        self._r.delete(key)
        self._r.rpush(key, *values)
        self._r.expire(key, self._TTL_SECONDS)
# Module-level shared RedisStore instance, constructed from settings.redis_url
# when this module is imported.
store_singleton = RedisStore(settings.redis_url)