import logging from typing import Optional from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from flask import current_app from flask_apscheduler import APScheduler from app.models import ApiConfig from flask import current_app from app.services.api_executor import execute_api_by_id logger = logging.getLogger(__name__) class SchedulerService: """ Thin wrapper around APScheduler for adding/removing API jobs. """ def __init__(self, scheduler: APScheduler) -> None: self.scheduler = scheduler def load_enabled_jobs(self) -> None: try: enabled_configs = ApiConfig.query.filter_by(enabled=True).all() except Exception: logger.warning("Could not load jobs; database might not be initialized yet.") return for config in enabled_configs: self.add_job_for_api(config) logger.info("Scheduler loaded %s enabled jobs", len(enabled_configs)) def add_job_for_api(self, api_config: ApiConfig) -> None: job_id = api_config.job_id() trigger = self._build_trigger(api_config) if not trigger: logger.error("Unsupported schedule type for api_id=%s", api_config.id) return self.remove_job_for_api(api_config.id) app_obj = current_app._get_current_object() self.scheduler.add_job( func=execute_api_by_id, trigger=trigger, args=[api_config.id, app_obj], id=job_id, replace_existing=True, max_instances=1, misfire_grace_time=current_app.config.get("SCHEDULER_MISFIRE_GRACE_TIME", 60), ) logger.info("Registered job %s for api_id=%s", job_id, api_config.id) def remove_job_for_api(self, api_id: int) -> None: job_id = f"api_job_{api_id}" try: self.scheduler.remove_job(job_id) logger.info("Removed job %s", job_id) except Exception: # 如果任务不存在则静默忽略 pass def reschedule_job_for_api(self, api_config: ApiConfig) -> None: self.add_job_for_api(api_config) def _build_trigger(self, api_config: ApiConfig): schedule_type = api_config.schedule_type expr = api_config.schedule_expression if schedule_type == "cron": try: fields = expr.split() if len(fields) == 5: minute, hour, day, month, day_of_week = fields return CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week) return CronTrigger.from_crontab(expr) except Exception as exc: logger.error("Invalid cron expression for api_id=%s error=%s", api_config.id, exc) return None if schedule_type == "interval": try: seconds = int(expr) return IntervalTrigger(seconds=seconds) except ValueError: logger.error("Invalid interval seconds for api_id=%s", api_config.id) return None if schedule_type == "daily": # 期待格式为 "HH:MM" try: hour, minute = expr.split(":") return CronTrigger(hour=int(hour), minute=int(minute)) except Exception as exc: logger.error("Invalid daily expression for api_id=%s error=%s", api_config.id, exc) return None return None