apiScheduler/app/services/api_executor.py
2025-11-29 23:16:56 +08:00

156 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import codecs
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional

import requests
from flask import current_app
from requests import Response

from app.extensions import db
from app.models import ApiCallLog, ApiConfig
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _parse_json_field(raw: Optional[str]) -> Optional[Dict[str, Any]]:
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
def execute_api(api_config: ApiConfig) -> None:
    """
    Call the configured API with retries, recording every attempt as its own log row.

    Each attempt is requested with ``stream=True``; the decoded response text
    is appended to ``ApiCallLog.response_body`` while it arrives (committed at
    most once per second, capped at 20000 characters) so that a streaming
    response can be followed while the call is still running.

    An attempt succeeds when the request completes with a 2xx status. Any
    other status, or any exception, counts as a failure and leads to another
    attempt until ``retry_times`` extra attempts have been used.

    Args:
        api_config: Supplies ``id``, ``http_method``, ``url``, ``headers``,
            ``query_params``, ``body``, ``retry_times``,
            ``retry_interval_seconds`` and ``timeout_seconds``.

    Returns:
        None. Outcomes are persisted as ``ApiCallLog`` rows; if the final
        attempt failed, a warning is logged as well.
    """
    headers = _parse_json_field(api_config.headers) or {}
    params = _parse_json_field(api_config.query_params) or {}
    raw_body = api_config.body
    # A body that parses as JSON is sent through ``json=``; anything else is
    # sent verbatim through ``data=``.
    try:
        json_body = json.loads(raw_body) if raw_body else None
    except json.JSONDecodeError:
        json_body = None

    # ``or 0`` keeps an unset (None) setting from crashing max().
    retries = max(api_config.retry_times or 0, 0)
    delay = max(api_config.retry_interval_seconds or 0, 1)
    timeout = max(api_config.timeout_seconds or 0, 1)
    total_attempts = retries + 1
    max_body_length = 20000  # keep stored response bodies from growing too large

    last_error: Optional[str] = None
    for attempt in range(1, total_attempts + 1):
        last_response: Optional[Response] = None
        log_entry: Optional[ApiCallLog] = None
        last_error = None
        # Timestamps are taken per attempt so that each log row reports its
        # own start time and a duration that excludes earlier attempts and
        # the sleeps between them.
        request_time = datetime.utcnow()
        started = time.monotonic()
        try:
            # stream=True lets the body be logged while it is still arriving;
            # non-streaming responses work the same way.
            last_response = requests.request(
                method=api_config.http_method,
                url=api_config.url,
                headers=headers,
                params=params,
                data=None if json_body is not None else raw_body,
                json=json_body,
                timeout=timeout,
                stream=True,
            )
            # Create an "in progress" row (success=None) right away so it is
            # visible before the response has finished.
            log_entry = ApiCallLog(
                api_id=api_config.id,
                request_time=request_time,
                success=None,
                http_status_code=last_response.status_code,
                response_body="",
            )
            db.session.add(log_entry)
            db.session.commit()

            # An incremental decoder buffers a multi-byte character that is
            # split across two chunks instead of dropping it.
            encoding = last_response.encoding or "utf-8"
            try:
                decoder = codecs.getincrementaldecoder(encoding)(errors="ignore")
            except LookupError:
                # Unknown charset announced by the server: fall back to UTF-8.
                decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")

            last_flush = time.time()
            for chunk in last_response.iter_content(chunk_size=1024):
                if not chunk:
                    continue
                try:
                    text_part = decoder.decode(chunk)
                except Exception:
                    # Best effort: an undecodable chunk must not fail the call.
                    text_part = ""
                if not text_part:
                    continue
                current_body = log_entry.response_body or ""
                if len(current_body) < max_body_length:
                    log_entry.response_body = (current_body + text_part)[:max_body_length]
                now = time.time()
                # Flush to the database at most once per second: a trade-off
                # between freshness and write overhead.
                if now - last_flush >= 1:
                    db.session.commit()
                    last_flush = now

            # The stream has ended; the status code decides success.
            if not 200 <= last_response.status_code < 300:
                body_snippet = (log_entry.response_body or "")[:200]
                last_error = f"Non-2xx status: {last_response.status_code}, body: {body_snippet}"
        except Exception as exc:
            # Job boundary: any failure (network, database, decoding) is
            # recorded on the log row and retried rather than propagated.
            # Fall back to the class name so an empty message still counts
            # as an error.
            last_error = str(exc) or type(exc).__name__
        finally:
            # A streamed response holds its connection until closed.
            if last_response is not None:
                last_response.close()

        end_time = datetime.utcnow()
        duration_ms = int((time.monotonic() - started) * 1000)
        success = last_error is None

        # Finalize the row; create one if the attempt failed before the
        # in-progress row could be created.
        if log_entry is None:
            log_entry = ApiCallLog(
                api_id=api_config.id,
                request_time=request_time,
            )
            db.session.add(log_entry)
        log_entry.response_time = end_time
        log_entry.duration_ms = duration_ms
        log_entry.success = success
        # Explicit None check: a Response is falsy for 4xx/5xx statuses.
        if last_response is not None:
            log_entry.http_status_code = last_response.status_code
        if last_error:
            log_entry.error_message = f"[第{attempt}次尝试/{total_attempts}] {last_error}"
        try:
            db.session.commit()
        except Exception:
            logger.exception("Failed to persist ApiCallLog for api_id=%s", api_config.id)
            db.session.rollback()

        if success:
            break
        if attempt < total_attempts:
            time.sleep(delay)

    if last_error:
        logger.warning("API call failed for api_id=%s error=%s", api_config.id, last_error)
def execute_api_by_id(api_id: int, app=None) -> None:
    """
    Look up an ApiConfig by id and execute it inside an application context.

    Intended for scheduler threads. When ``app`` is not given, the
    application behind ``current_app`` is used instead.

    Args:
        api_id: Primary key of the ApiConfig to run.
        app: Optional application object providing ``app_context()``.

    Returns:
        None. A missing config is logged as an error and nothing is executed.
    """
    flask_app = app or current_app._get_current_object()
    with flask_app.app_context():
        api_config = ApiConfig.query.get(api_id)
        if api_config:
            execute_api(api_config)
        else:
            logger.error("ApiConfig not found for id=%s", api_id)