2025-07-10 10:02:51 +08:00

179 lines
5.1 KiB
Python

from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from models import Script, Server, ExecuteHistory, ExecuteResult
from database import db
import paramiko
import time
import threading
# Blueprint named 'execute'; the script-execution and history route handlers
# defined in this module are registered on it.
# NOTE(review): the URL prefix is set wherever the blueprint is registered,
# which is not visible in this file.
execute_bp = Blueprint('execute', __name__)
def execute_script_on_server(server, script_content, history_id):
    """Run a script on a single server over SSH and persist the outcome.

    The script is sent with ``exec_command``; stdout and stderr are captured,
    and the run is classified by the remote exit status. Any exception
    (connection failure, authentication error, ...) is reported as an
    ``'error'`` result instead of being raised. An ``ExecuteResult`` row is
    stored for every run, successful or not.

    Args:
        server: Object exposing ``ip``, ``port``, ``username``, ``id`` and
            ``name``.
        script_content: Command text passed to ``exec_command``.
        history_id: Id of the execution-history record this run belongs to.

    Returns:
        dict with keys ``serverId``, ``serverName``, ``status``
        (``'success'`` or ``'error'``), ``output``, ``error`` and
        ``duration`` (elapsed time in milliseconds).
    """
    start_time = time.time()
    ssh = None
    try:
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys on first use,
        # which leaves the connection open to man-in-the-middle attacks.
        # Consider loading known hosts and rejecting unknown keys instead.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(server.ip, port=server.port, username=server.username, timeout=30)
        _stdin, stdout, stderr = ssh.exec_command(script_content)
        output = stdout.read().decode('utf-8', errors='ignore')
        error = stderr.read().decode('utf-8', errors='ignore')
        # Ask for the exit status only after both streams are drained, so a
        # large output cannot stall the remote command.
        exit_code = stdout.channel.recv_exit_status()
        if exit_code == 0:
            # Text on stderr alone (warnings, progress output) is not a failure.
            status = 'success'
        else:
            status = 'error'
            error = error or f'exit status {exit_code}'
    except Exception as e:
        status = 'error'
        output = ''
        error = str(e)
    finally:
        # Always release the connection, including when the command or a
        # stream read fails part-way through.
        if ssh is not None:
            ssh.close()

    duration = int((time.time() - start_time) * 1000)

    # Persist the execution result.
    result = ExecuteResult(
        history_id=history_id,
        server_id=server.id,
        server_name=server.name,
        status=status,
        output=output,
        error=error,
        duration=duration
    )
    db.session.add(result)
    db.session.commit()

    return {
        'serverId': server.id,
        'serverName': server.name,
        'status': status,
        'output': output,
        'error': error,
        'duration': duration
    }
@execute_bp.route('', methods=['POST'])
@jwt_required()
def execute_script():
    """Run a stored script on the requested servers and record the outcome.

    Expects a JSON object with ``scriptId`` and ``serverIds`` (a list of
    server ids). An ``ExecuteHistory`` row is created, the script is run on
    every matching server in its own thread, and the history row is then
    updated with the success/failure counts.

    Returns:
        A JSON response with the history id, the per-server results and the
        success/failure counts. Responds with 400 when the body is malformed
        or none of the requested servers exist, and with 404 (through
        ``get_or_404``) when the script does not exist.
    """
    # Imported here so this handler does not depend on a change to the
    # module-level import list.
    from flask import current_app

    user_id = get_jwt_identity()
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'message': '请求参数无效'}), 400
    script_id = data.get('scriptId')
    server_ids = data.get('serverIds')
    if script_id is None or not isinstance(server_ids, list):
        return jsonify({'message': '缺少 scriptId 或 serverIds 参数'}), 400

    # Load the script and the target servers.
    script = Script.query.get_or_404(script_id)
    servers = Server.query.filter(Server.id.in_(server_ids)).all()
    if not servers:
        return jsonify({'message': '未找到有效的服务器'}), 400

    # Create the execution-history record.
    history = ExecuteHistory(
        script_id=script.id,
        script_name=script.name,
        server_count=len(servers),
        executed_by=user_id
    )
    db.session.add(history)
    db.session.commit()

    # Snapshot plain values in the request thread. The workers must not read
    # attributes of ORM objects owned by this thread's session: the commit
    # above may have expired them, and a lazy reload from another thread
    # would use this session concurrently.
    history_id = history.id
    script_content = script.content
    targets = [(server.id, server.name) for server in servers]
    app = current_app._get_current_object()

    # Run the script on all servers concurrently.
    results = []
    threads = []

    def execute_on_server(server_id, server_name):
        # Each worker pushes its own application context so that db.session
        # is usable off the request thread.
        with app.app_context():
            try:
                server = Server.query.get(server_id)
                if server is None:
                    raise LookupError(f'server {server_id} not found')
                result = execute_script_on_server(server, script_content, history_id)
            except Exception as exc:
                # A crashed worker must still be reported and counted as a
                # failure rather than silently missing from the response.
                result = {
                    'serverId': server_id,
                    'serverName': server_name,
                    'status': 'error',
                    'output': '',
                    'error': str(exc),
                    'duration': 0
                }
        results.append(result)

    # Start one thread per target server.
    for server_id, server_name in targets:
        thread = threading.Thread(target=execute_on_server, args=(server_id, server_name))
        threads.append(thread)
        thread.start()

    # Wait for every worker to finish.
    for thread in threads:
        thread.join()

    # Tally the results.
    success_count = sum(1 for r in results if r['status'] == 'success')
    fail_count = len(results) - success_count

    # Update the history record.
    history.success_count = success_count
    history.fail_count = fail_count
    history.status = 'completed'
    db.session.commit()

    return jsonify({
        'message': '脚本执行完成',
        'historyId': history.id,
        'results': results,
        'successCount': success_count,
        'failCount': fail_count
    })
@execute_bp.route('/history', methods=['GET'])
@jwt_required()
def get_execute_history():
    """Return the 50 most recent execution-history records as a JSON array.

    Records are ordered by ``execute_time``, newest first; each entry carries
    the script name, the server/success/failure counts, the status and the
    execution time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    recent_runs = (
        ExecuteHistory.query
        .order_by(ExecuteHistory.execute_time.desc())
        .limit(50)
        .all()
    )
    payload = [
        {
            'id': run.id,
            'scriptName': run.script_name,
            'serverCount': run.server_count,
            'successCount': run.success_count,
            'failCount': run.fail_count,
            'status': run.status,
            'executeTime': run.execute_time.strftime('%Y-%m-%d %H:%M:%S')
        }
        for run in recent_runs
    ]
    return jsonify(payload)
@execute_bp.route('/history/<int:history_id>/results', methods=['GET'])
@jwt_required()
def get_execute_results(history_id):
    """Return the per-server results of one execution as a JSON array.

    Args:
        history_id: Id of the execution-history record whose results are
            listed.

    Each entry carries the server id and name, the status, the captured
    output and error text, the duration and the execution time formatted as
    ``YYYY-MM-DD HH:MM:SS``.
    """
    rows = ExecuteResult.query.filter_by(history_id=history_id).all()
    payload = [
        {
            'id': row.id,
            'serverId': row.server_id,
            'serverName': row.server_name,
            'status': row.status,
            'output': row.output,
            'error': row.error,
            'duration': row.duration,
            'executeTime': row.execute_time.strftime('%Y-%m-%d %H:%M:%S')
        }
        for row in rows
    ]
    return jsonify(payload)