import codecs
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional

import requests
from requests import Response
from flask import current_app

from app.extensions import db
from app.models import ApiCallLog, ApiConfig

logger = logging.getLogger(__name__)

# Upper bound on the number of response characters stored per log row,
# so a large or endless stream cannot bloat the log table.
_MAX_LOGGED_BODY_CHARS = 20000
# Minimum number of seconds between intermediate commits while streaming:
# a trade-off between near-real-time visibility and database write load.
_STREAM_FLUSH_INTERVAL_SECONDS = 1
# Number of bytes requested from the response stream per iteration.
_STREAM_CHUNK_BYTES = 1024


def _parse_json_field(raw: Optional[str]) -> Optional[Dict[str, Any]]:
    """Parse a JSON-encoded configuration field.

    Args:
        raw: JSON text, or a falsy value (``None`` / empty string).

    Returns:
        The decoded JSON value, or ``None`` when ``raw`` is falsy or is not
        valid JSON.
    """
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None


def _new_text_decoder(encoding: Optional[str]) -> codecs.IncrementalDecoder:
    """Return an incremental decoder for ``encoding`` that ignores bad bytes.

    An incremental decoder keeps the trailing bytes of an incomplete
    multi-byte character between calls, so characters split across chunk
    boundaries are not lost. Falls back to UTF-8 when ``encoding`` is missing
    or is not a codec Python knows.
    """
    try:
        factory = codecs.getincrementaldecoder(encoding or "utf-8")
    except LookupError:
        factory = codecs.getincrementaldecoder("utf-8")
    return factory(errors="ignore")


def _decode_chunk(
    decoder: codecs.IncrementalDecoder, chunk: bytes, final: bool = False
) -> str:
    """Decode one chunk, returning ``""`` if decoding is impossible."""
    try:
        text = decoder.decode(chunk, final)
    except Exception:
        # Logging the body is best-effort: a decoding problem must never
        # fail the API call itself.
        return ""
    # Non-text codecs (e.g. "base64") yield bytes rather than str; skip them.
    return text if isinstance(text, str) else ""


def _append_to_logged_body(log_entry: ApiCallLog, text: str) -> None:
    """Append ``text`` to ``log_entry.response_body``, capped in length."""
    current_body = log_entry.response_body or ""
    if len(current_body) < _MAX_LOGGED_BODY_CHARS:
        log_entry.response_body = (current_body + text)[:_MAX_LOGGED_BODY_CHARS]


def _stream_body_into_log(response: Response, log_entry: ApiCallLog) -> None:
    """Consume ``response`` chunk by chunk, mirroring the text into the log.

    The accumulated body is committed at most once per
    ``_STREAM_FLUSH_INTERVAL_SECONDS`` while the stream is open; the caller is
    responsible for the final commit.
    """
    decoder = _new_text_decoder(response.encoding)
    last_flush = time.monotonic()

    for chunk in response.iter_content(chunk_size=_STREAM_CHUNK_BYTES):
        if not chunk:
            continue
        text_part = _decode_chunk(decoder, chunk)
        if not text_part:
            continue
        _append_to_logged_body(log_entry, text_part)

        now = time.monotonic()
        if now - last_flush >= _STREAM_FLUSH_INTERVAL_SECONDS:
            db.session.commit()
            last_flush = now

    # Flush whatever the decoder may still be holding at end of stream.
    tail = _decode_chunk(decoder, b"", final=True)
    if tail:
        _append_to_logged_body(log_entry, tail)


def execute_api(api_config: ApiConfig) -> None:
    """Call the API described by ``api_config``, retrying on failure.

    Every attempt is recorded as its own ``ApiCallLog`` row, so the history of
    retries is preserved. Responses are read in streaming mode and
    ``response_body`` is updated while data arrives, so an in-progress call
    can be observed before it finishes.

    An attempt counts as successful only when the request completes, the body
    is fully read and the status code is 2xx. Each log row carries the request
    time and duration of its own attempt.

    Args:
        api_config: Configuration of the call (method, URL, headers, query
            parameters, body, retry and timeout settings).

    Returns:
        None. Results are persisted as ``ApiCallLog`` rows; a warning is
        logged when the last attempt failed.
    """
    headers = _parse_json_field(api_config.headers) or {}
    params = _parse_json_field(api_config.query_params) or {}
    data = api_config.body
    try:
        json_body = json.loads(api_config.body) if api_config.body else None
    except json.JSONDecodeError:
        json_body = None

    # Treat unset (None) settings like their minimum instead of crashing
    # before any log row has been written.
    retries = max(api_config.retry_times or 0, 0)
    delay = max(api_config.retry_interval_seconds or 1, 1)
    timeout = max(api_config.timeout_seconds or 1, 1)

    total_attempts = retries + 1
    last_error: Optional[str] = None

    for attempt in range(1, total_attempts + 1):
        response: Optional[Response] = None
        log_entry: Optional[ApiCallLog] = None
        # Timestamp each attempt separately so that durations do not include
        # earlier attempts or the sleep between retries.
        request_time = datetime.utcnow()

        try:
            # stream=True lets the log be written while a streaming response
            # arrives; non-streaming responses work the same way.
            response = requests.request(
                method=api_config.http_method,
                url=api_config.url,
                headers=headers,
                params=params,
                data=None if json_body is not None else data,
                json=json_body,
                timeout=timeout,
                stream=True,
            )

            # Create an "in progress" row right away so that observers can
            # see the call is running.
            log_entry = ApiCallLog(
                api_id=api_config.id,
                request_time=request_time,
                success=None,  # in progress
                http_status_code=response.status_code,
                response_body="",
            )
            db.session.add(log_entry)
            db.session.commit()

            _stream_body_into_log(response, log_entry)

            # Once the stream has ended, the status code decides success.
            if 200 <= response.status_code < 300:
                last_error = None
            else:
                body_snippet = (log_entry.response_body or "")[:200]
                last_error = (
                    f"Non-2xx status: {response.status_code}, body: {body_snippet}"
                )
        except requests.RequestException as exc:
            # Some exceptions stringify to "", which must still count as an
            # error, hence the repr() fallback.
            last_error = str(exc) or repr(exc)
        except Exception as exc:
            logger.exception(
                "Unexpected error while calling api_id=%s (attempt %s/%s)",
                api_config.id,
                attempt,
                total_attempts,
            )
            last_error = str(exc) or repr(exc)
        finally:
            # A streamed response holds its connection until it is fully
            # consumed or closed; always release it, even after a failure.
            if response is not None:
                response.close()

        end_time = datetime.utcnow()
        duration_ms = int((end_time - request_time).total_seconds() * 1000)
        success = last_error is None

        # Finalize the row; create one if the attempt failed before the
        # in-progress row could be created (e.g. connection error).
        if log_entry is None:
            log_entry = ApiCallLog(
                api_id=api_config.id,
                request_time=request_time,
            )
            db.session.add(log_entry)

        log_entry.response_time = end_time
        log_entry.duration_ms = duration_ms
        log_entry.success = success
        # Compare with None explicitly: a Response is falsy for 4xx/5xx.
        if response is not None:
            log_entry.http_status_code = response.status_code
        if last_error is not None:
            log_entry.error_message = (
                f"[第{attempt}次尝试/{total_attempts}] {last_error}"
            )

        try:
            db.session.commit()
        except Exception:
            logger.exception(
                "Failed to persist ApiCallLog for api_id=%s", api_config.id
            )
            db.session.rollback()

        if success:
            break

        if attempt < total_attempts:
            time.sleep(delay)

    if last_error is not None:
        logger.warning(
            "API call failed for api_id=%s error=%s", api_config.id, last_error
        )


def execute_api_by_id(api_id: int, app=None) -> None:
    """Load an ``ApiConfig`` by id and execute it inside an app context.

    Intended for callers that run outside a request, such as scheduler
    threads.

    Args:
        api_id: Primary key of the ``ApiConfig`` to execute.
        app: Flask application to use; defaults to the application behind
            ``current_app``.

    Returns:
        None. An error is logged when no configuration with ``api_id`` exists.
    """
    app_obj = app or current_app._get_current_object()
    with app_obj.app_context():
        config = ApiConfig.query.get(api_id)
        if not config:
            logger.error("ApiConfig not found for id=%s", api_id)
            return
        execute_api(config)