2025-11-29 23:16:56 +08:00
|
|
|
|
import json
|
2025-11-28 17:39:54 +08:00
|
|
|
|
import logging
|
|
|
|
|
|
import time
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from typing import Any, Dict, Optional
|
|
|
|
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
from requests import Response
|
|
|
|
|
|
from flask import current_app
|
|
|
|
|
|
|
2025-11-29 23:16:56 +08:00
|
|
|
|
from app.extensions import db
|
2025-11-28 17:39:54 +08:00
|
|
|
|
from app.models import ApiCallLog, ApiConfig
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_json_field(raw: Optional[str]) -> Optional[Dict[str, Any]]:
|
|
|
|
|
|
if not raw:
|
|
|
|
|
|
return None
|
|
|
|
|
|
try:
|
|
|
|
|
|
return json.loads(raw)
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _stream_body_into_log(
    response: Response,
    log_entry: ApiCallLog,
    max_body_length: int = 20000,
    flush_interval_seconds: float = 1,
) -> None:
    """
    Append the streamed response body to ``log_entry.response_body``.

    The body is read in 1024-byte chunks and decoded incrementally, so a
    multi-byte character that straddles two chunks is decoded correctly
    instead of being dropped. The stored text is capped at
    ``max_body_length`` characters, and the session is committed at most once
    per ``flush_interval_seconds`` so the partial body becomes visible while
    the response is still arriving.

    Args:
        response: A response obtained with ``stream=True``.
        log_entry: The in-progress log row to update.
        max_body_length: Maximum number of characters kept in the log.
        flush_interval_seconds: Minimum time between intermediate commits.
    """
    import codecs  # local import: only this helper needs it

    try:
        decoder_factory = codecs.getincrementaldecoder(response.encoding or "utf-8")
    except LookupError:
        # The advertised charset is unknown to Python; fall back to UTF-8
        # rather than losing the whole body.
        decoder_factory = codecs.getincrementaldecoder("utf-8")
    decoder = decoder_factory(errors="ignore")

    last_flush = time.time()
    for chunk in response.iter_content(chunk_size=1024):
        if not chunk:
            continue
        text_part = decoder.decode(chunk)
        if not text_part:
            # Nothing complete to append yet (e.g. only part of a character).
            continue

        current_body = log_entry.response_body or ""
        if len(current_body) < max_body_length:
            log_entry.response_body = (current_body + text_part)[:max_body_length]

        now = time.time()
        # Throttle commits: a trade-off between live visibility and write load.
        if now - last_flush >= flush_interval_seconds:
            db.session.commit()
            last_flush = now


def execute_api(api_config: ApiConfig) -> None:
    """
    Call the API described by ``api_config`` with retries, logging every attempt.

    Each attempt is stored as its own ``ApiCallLog`` row, so the history of
    retries is kept. The response is consumed as a stream and written to
    ``response_body`` while it arrives, so the log can be inspected while the
    call is still running. A 2xx status counts as success and stops the retry
    loop; any other status or any exception counts as a failure.

    Args:
        api_config: Configuration row. The fields read here are ``id``,
            ``url``, ``http_method``, ``headers``, ``query_params``, ``body``,
            ``retry_times``, ``retry_interval_seconds`` and
            ``timeout_seconds``.

    Returns:
        None. The outcome is persisted in ``ApiCallLog``; if the final attempt
        failed, a warning is logged as well.
    """
    headers = _parse_json_field(api_config.headers) or {}
    params = _parse_json_field(api_config.query_params) or {}
    data = api_config.body
    try:
        json_body = json.loads(api_config.body) if api_config.body else None
    except json.JSONDecodeError:
        # Body is not JSON: it is sent verbatim through ``data`` instead.
        json_body = None

    # Unset (None) values fall back to the minimum instead of raising in max().
    retries = max(api_config.retry_times or 0, 0)
    delay = max(api_config.retry_interval_seconds or 1, 1)
    timeout = max(api_config.timeout_seconds or 1, 1)
    total_attempts = retries + 1
    last_error: Optional[str] = None

    for attempt in range(1, total_attempts + 1):
        last_response: Optional[Response] = None
        log_entry: Optional[ApiCallLog] = None
        # Timed per attempt, so each log row reports its own start time and
        # duration rather than the time elapsed since the first attempt.
        request_time = datetime.utcnow()
        try:
            # stream=True lets the body be logged while it arrives; it also
            # works for ordinary non-streaming responses.
            last_response = requests.request(
                method=api_config.http_method,
                url=api_config.url,
                headers=headers,
                params=params,
                data=None if json_body is not None else data,
                json=json_body,
                timeout=timeout,
                stream=True,
            )

            # Create an "in progress" row right away so it is visible
            # immediately.
            log_entry = ApiCallLog(
                api_id=api_config.id,
                request_time=request_time,
                success=None,  # in progress
                http_status_code=last_response.status_code,
                response_body="",
            )
            db.session.add(log_entry)
            db.session.commit()

            _stream_body_into_log(last_response, log_entry)

            # Once the stream has ended, the status code decides success.
            if 200 <= last_response.status_code < 300:
                last_error = None
            else:
                body_snippet = (log_entry.response_body or "")[:200]
                last_error = f"Non-2xx status: {last_response.status_code}, body: {body_snippet}"
        except requests.RequestException as exc:
            # repr() fallback guarantees a failed attempt has a message.
            last_error = str(exc) or repr(exc)
        except Exception as exc:
            logger.exception("Unexpected error while calling api_id=%s", api_config.id)
            last_error = str(exc) or repr(exc)
        finally:
            # A streamed response holds its connection until it is fully read
            # or closed; close it explicitly so a failure mid-stream does not
            # leak the connection.
            if last_response is not None:
                last_response.close()

        end_time = datetime.utcnow()
        duration_ms = int((end_time - request_time).total_seconds() * 1000)
        success = last_error is None

        # Finalize the row; if none exists yet (the request failed before a
        # response was received), create one now.
        if log_entry is None:
            log_entry = ApiCallLog(
                api_id=api_config.id,
                request_time=request_time,
            )
            db.session.add(log_entry)

        log_entry.response_time = end_time
        log_entry.duration_ms = duration_ms
        log_entry.success = success
        # Compare with None explicitly: a Response is falsy for 4xx/5xx.
        if last_response is not None:
            log_entry.http_status_code = last_response.status_code
        if last_error:
            log_entry.error_message = f"[第{attempt}次尝试/{total_attempts}] {last_error}"

        try:
            db.session.commit()
        except Exception:
            logger.exception("Failed to persist ApiCallLog for api_id=%s", api_config.id)
            db.session.rollback()

        if success:
            break
        if attempt < total_attempts:
            time.sleep(delay)

    if last_error:
        logger.warning("API call failed for api_id=%s error=%s", api_config.id, last_error)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def execute_api_by_id(api_id: int, app=None) -> None:
    """
    Fetch the ApiConfig with the given id and run it inside an app context.

    Meant for callers such as scheduler threads. When ``app`` is not supplied,
    the application behind ``current_app`` is used. If no ApiConfig matches
    ``api_id``, an error is logged and nothing is executed.
    """
    flask_app = app if app else current_app._get_current_object()
    with flask_app.app_context():
        api_config = ApiConfig.query.get(api_id)
        if api_config:
            execute_api(api_config)
            return
        logger.error("ApiConfig not found for id=%s", api_id)
|