95 lines
3.5 KiB
Python
95 lines
3.5 KiB
Python
|
|
import logging
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
from apscheduler.triggers.cron import CronTrigger
|
||
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
||
|
|
from flask import current_app
|
||
|
|
from flask_apscheduler import APScheduler
|
||
|
|
|
||
|
|
from app.models import ApiConfig
|
||
|
|
from flask import current_app
|
||
|
|
|
||
|
|
from app.services.api_executor import execute_api_by_id
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class SchedulerService:
    """Thin wrapper around APScheduler for adding/removing API jobs.

    Each ``ApiConfig`` is mapped to at most one scheduler job that calls
    ``execute_api_by_id``; the trigger is derived from the config's
    ``schedule_type`` and ``schedule_expression``.
    """

    def __init__(self, scheduler: APScheduler) -> None:
        """Keep a reference to the scheduler all job operations go through."""
        self.scheduler = scheduler

    def load_enabled_jobs(self) -> None:
        """Register a job for every ``ApiConfig`` row with ``enabled=True``.

        A failing query is logged (with traceback) and treated as "nothing
        to load". A failure while registering one config is logged and does
        not prevent the remaining configs from being processed.
        """
        try:
            enabled_configs = ApiConfig.query.filter_by(enabled=True).all()
        except Exception:
            # Best-effort: keep the root cause in the log instead of hiding it.
            logger.warning(
                "Could not load jobs; database might not be initialized yet.",
                exc_info=True,
            )
            return

        loaded = 0
        for config in enabled_configs:
            try:
                self.add_job_for_api(config)
            except Exception:
                # One broken config must not block every job after it.
                logger.exception(
                    "Failed to register job for api_id=%s",
                    getattr(config, "id", None),
                )
                continue
            loaded += 1
        # Counts configs processed without an exception; a config whose
        # trigger could not be built is still counted here.
        logger.info("Scheduler loaded %s enabled jobs", loaded)

    def add_job_for_api(self, api_config: ApiConfig) -> None:
        """Create (or replace) the scheduler job for ``api_config``.

        Nothing is registered, and an error is logged, when no trigger can
        be built from the config (unknown schedule type or an invalid
        schedule expression).
        """
        job_id = api_config.job_id()
        trigger = self._build_trigger(api_config)
        if trigger is None:
            logger.error("Unsupported schedule type for api_id=%s", api_config.id)
            return

        # NOTE(review): remove_job_for_api derives the id as "api_job_<id>";
        # presumably ApiConfig.job_id() yields the same string -- confirm.
        self.remove_job_for_api(api_config.id)

        # Unwrap the ``current_app`` proxy so the job receives the concrete
        # application object as its second argument.
        app_obj = current_app._get_current_object()
        self.scheduler.add_job(
            func=execute_api_by_id,
            trigger=trigger,
            args=[api_config.id, app_obj],
            id=job_id,
            replace_existing=True,
            max_instances=1,
            misfire_grace_time=current_app.config.get("SCHEDULER_MISFIRE_GRACE_TIME", 60),
        )
        logger.info("Registered job %s for api_id=%s", job_id, api_config.id)

    def remove_job_for_api(self, api_id: int) -> None:
        """Remove the job registered for ``api_id``; never raises.

        A missing job is ignored silently. Any other scheduler failure is
        logged as a warning so it is not mistaken for "job did not exist".
        """
        # Function-scope import keeps this block self-contained.
        from apscheduler.jobstores.base import JobLookupError

        job_id = f"api_job_{api_id}"
        try:
            self.scheduler.remove_job(job_id)
        except JobLookupError:
            # Silently ignore a job that does not exist.
            return
        except Exception:
            logger.warning("Could not remove job %s", job_id, exc_info=True)
            return
        logger.info("Removed job %s", job_id)

    def reschedule_job_for_api(self, api_config: ApiConfig) -> None:
        """Re-register the job for ``api_config`` by delegating to ``add_job_for_api``."""
        self.add_job_for_api(api_config)

    def _build_trigger(self, api_config: ApiConfig):
        """Translate the config's schedule settings into an APScheduler trigger.

        Supported ``schedule_type`` values:

        * ``"cron"``     -- whitespace-separated five-field cron expression.
        * ``"interval"`` -- positive whole number of seconds.
        * ``"daily"``    -- ``"HH:MM"``.

        Returns the trigger, or ``None`` when the type is unknown or the
        expression is invalid (the latter is logged as an error).
        """
        schedule_type = api_config.schedule_type
        expr = api_config.schedule_expression

        if schedule_type == "cron":
            try:
                fields = expr.split()
                if len(fields) == 5:
                    minute, hour, day, month, day_of_week = fields
                    return CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week)
                # Any other field count is left to from_crontab; if it
                # rejects the expression the handler below reports it.
                return CronTrigger.from_crontab(expr)
            except Exception as exc:
                logger.error("Invalid cron expression for api_id=%s error=%s", api_config.id, exc)
                return None

        if schedule_type == "interval":
            try:
                # int(None) raises TypeError, not ValueError, so both are caught.
                seconds = int(expr)
                if seconds <= 0:
                    raise ValueError("interval must be a positive number of seconds")
                return IntervalTrigger(seconds=seconds)
            except (TypeError, ValueError):
                logger.error("Invalid interval seconds for api_id=%s", api_config.id)
                return None

        if schedule_type == "daily":
            # Expected format is "HH:MM".
            try:
                hour, minute = expr.split(":")
                return CronTrigger(hour=int(hour), minute=int(minute))
            except Exception as exc:
                logger.error("Invalid daily expression for api_id=%s error=%s", api_config.id, exc)
                return None

        return None
|