apiScheduler/app/services/scheduler.py

95 lines
3.5 KiB
Python
Raw Normal View History

2025-11-28 17:39:54 +08:00
import logging
from typing import Optional
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from flask import current_app
from flask_apscheduler import APScheduler
from app.models import ApiConfig
from flask import current_app
from app.services.api_executor import execute_api_by_id
logger = logging.getLogger(__name__)
class SchedulerService:
    """
    Thin wrapper around APScheduler for adding/removing API jobs.

    Each ``ApiConfig`` is turned into one scheduler job that calls
    ``execute_api_by_id`` on a trigger derived from the config's
    ``schedule_type`` and ``schedule_expression``.
    """

    def __init__(self, scheduler: APScheduler) -> None:
        """Keep a reference to the scheduler all job operations go through."""
        self.scheduler = scheduler

    def load_enabled_jobs(self) -> None:
        """Register a job for every ``ApiConfig`` row with ``enabled=True``.

        If the query itself fails, a warning is logged and nothing is
        registered. A failure while registering a single config is logged
        and does not prevent the remaining configs from being registered.
        """
        try:
            enabled_configs = ApiConfig.query.filter_by(enabled=True).all()
        except Exception:
            # Keep the traceback: "not initialized yet" is only a guess and
            # the real cause should stay visible in the logs.
            logger.warning(
                "Could not load jobs; database might not be initialized yet.",
                exc_info=True,
            )
            return

        failed = 0
        for config in enabled_configs:
            try:
                self.add_job_for_api(config)
            except Exception:
                # One broken config must not leave every later job unscheduled.
                failed += 1
                logger.exception(
                    "Failed to register job for api_id=%s",
                    getattr(config, "id", None),
                )

        if failed:
            logger.warning(
                "%s of %s enabled jobs failed to register",
                failed,
                len(enabled_configs),
            )
        logger.info("Scheduler loaded %s enabled jobs", len(enabled_configs))

    def add_job_for_api(self, api_config: ApiConfig) -> None:
        """Create (or replace) the scheduler job for ``api_config``.

        When no trigger can be built from the config, an error is logged and
        the method returns before touching the scheduler, so a previously
        registered job for this API (if any) is left as it was.

        Reads ``current_app``, so this is expected to run where a Flask
        application is active.
        """
        job_id = api_config.job_id()
        trigger = self._build_trigger(api_config)
        if trigger is None:
            # _build_trigger returns None both for an unknown schedule type
            # and for an expression it could not parse.
            logger.error(
                "Unsupported schedule type or invalid expression for api_id=%s",
                api_config.id,
            )
            return

        self.remove_job_for_api(api_config.id)

        # Hand the job the concrete app object rather than the context-local
        # proxy, since the job function receives it as a plain argument.
        app_obj = current_app._get_current_object()
        self.scheduler.add_job(
            func=execute_api_by_id,
            trigger=trigger,
            args=[api_config.id, app_obj],
            id=job_id,
            replace_existing=True,
            max_instances=1,
            misfire_grace_time=app_obj.config.get("SCHEDULER_MISFIRE_GRACE_TIME", 60),
        )
        logger.info("Registered job %s for api_id=%s", job_id, api_config.id)

    def remove_job_for_api(self, api_id: int) -> None:
        """Remove the job for ``api_id``; a failed removal is logged, not raised."""
        # NOTE(review): the id is rebuilt by hand here while add_job_for_api
        # uses api_config.job_id(); presumably both yield "api_job_<id>" -
        # confirm against ApiConfig.job_id.
        job_id = f"api_job_{api_id}"
        try:
            self.scheduler.remove_job(job_id)
        except Exception as exc:
            # Most likely the job simply does not exist. Stay best-effort,
            # but leave a trace instead of discarding the error entirely.
            logger.debug("Job %s was not removed: %s", job_id, exc)
        else:
            logger.info("Removed job %s", job_id)

    def reschedule_job_for_api(self, api_config: ApiConfig) -> None:
        """Re-register the job for ``api_config``; same as ``add_job_for_api``."""
        self.add_job_for_api(api_config)

    def _build_trigger(self, api_config: ApiConfig):
        """Translate the schedule fields of ``api_config`` into a trigger.

        Supported ``schedule_type`` values:

        * ``"cron"``     - ``schedule_expression`` is a crontab string.
        * ``"interval"`` - ``schedule_expression`` is a positive whole number
          of seconds.
        * ``"daily"``    - ``schedule_expression`` is ``"HH:MM"``.

        Returns:
            A ``CronTrigger`` or ``IntervalTrigger``, or ``None`` when the
            type is unknown or the expression is rejected (the reason is
            logged for rejected expressions).
        """
        schedule_type = api_config.schedule_type
        expr = api_config.schedule_expression

        if schedule_type == "cron":
            try:
                fields = expr.split()
                if len(fields) == 5:
                    minute, hour, day, month, day_of_week = fields
                    return CronTrigger(
                        minute=minute,
                        hour=hour,
                        day=day,
                        month=month,
                        day_of_week=day_of_week,
                    )
                # Any other field count is left to from_crontab to accept
                # or reject; a rejection is reported by the handler below.
                return CronTrigger.from_crontab(expr)
            except Exception as exc:
                logger.error(
                    "Invalid cron expression for api_id=%s error=%s",
                    api_config.id,
                    exc,
                )
                return None

        if schedule_type == "interval":
            try:
                seconds = int(expr)
                if seconds <= 0:
                    raise ValueError(f"interval must be positive, got {seconds}")
                return IntervalTrigger(seconds=seconds)
            except (TypeError, ValueError, OverflowError) as exc:
                # TypeError covers a missing (None) expression, which int()
                # does not report as ValueError.
                logger.error(
                    "Invalid interval seconds for api_id=%s error=%s",
                    api_config.id,
                    exc,
                )
                return None

        if schedule_type == "daily":
            # Expected format is "HH:MM".
            try:
                hour, minute = expr.split(":")
                return CronTrigger(hour=int(hour), minute=int(minute))
            except Exception as exc:
                logger.error(
                    "Invalid daily expression for api_id=%s error=%s",
                    api_config.id,
                    exc,
                )
                return None

        return None