from flask import Blueprint, request, jsonify from flask_jwt_extended import jwt_required, get_jwt_identity from models import Script, Server, ExecuteHistory, ExecuteResult from app import db import paramiko import time import threading execute_bp = Blueprint('execute', __name__) def execute_script_on_server(server, script_content, history_id): """在单个服务器上执行脚本""" start_time = time.time() try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(server.ip, port=server.port, username=server.username, timeout=30) stdin, stdout, stderr = ssh.exec_command(script_content) output = stdout.read().decode('utf-8', errors='ignore') error = stderr.read().decode('utf-8', errors='ignore') ssh.close() duration = int((time.time() - start_time) * 1000) status = 'success' if not error else 'error' # 保存执行结果 result = ExecuteResult( history_id=history_id, server_id=server.id, server_name=server.name, status=status, output=output, error=error, duration=duration ) db.session.add(result) db.session.commit() return { 'serverId': server.id, 'serverName': server.name, 'status': status, 'output': output, 'error': error, 'duration': duration } except Exception as e: duration = int((time.time() - start_time) * 1000) error_msg = str(e) result = ExecuteResult( history_id=history_id, server_id=server.id, server_name=server.name, status='error', output='', error=error_msg, duration=duration ) db.session.add(result) db.session.commit() return { 'serverId': server.id, 'serverName': server.name, 'status': 'error', 'output': '', 'error': error_msg, 'duration': duration } @execute_bp.route('', methods=['POST']) @jwt_required() def execute_script(): user_id = get_jwt_identity() data = request.get_json() script_id = data['scriptId'] server_ids = data['serverIds'] # 获取脚本和服务器信息 script = Script.query.get_or_404(script_id) servers = Server.query.filter(Server.id.in_(server_ids)).all() if not servers: return jsonify({'message': '未找到有效的服务器'}), 400 # 创建执行历史记录 history = ExecuteHistory( script_id=script.id, script_name=script.name, server_count=len(servers), executed_by=user_id ) db.session.add(history) db.session.commit() # 并发执行脚本 results = [] threads = [] def execute_on_server(server): result = execute_script_on_server(server, script.content, history.id) results.append(result) # 创建线程池 for server in servers: thread = threading.Thread(target=execute_on_server, args=(server,)) threads.append(thread) thread.start() # 等待所有线程完成 for thread in threads: thread.join() # 统计结果 success_count = sum(1 for r in results if r['status'] == 'success') fail_count = len(results) - success_count # 更新历史记录 history.success_count = success_count history.fail_count = fail_count history.status = 'completed' db.session.commit() return jsonify({ 'message': '脚本执行完成', 'historyId': history.id, 'results': results, 'successCount': success_count, 'failCount': fail_count }) @execute_bp.route('/history', methods=['GET']) @jwt_required() def get_execute_history(): history = ExecuteHistory.query.order_by(ExecuteHistory.execute_time.desc()).limit(50).all() result = [] for h in history: result.append({ 'id': h.id, 'scriptName': h.script_name, 'serverCount': h.server_count, 'successCount': h.success_count, 'failCount': h.fail_count, 'status': h.status, 'executeTime': h.execute_time.strftime('%Y-%m-%d %H:%M:%S') }) return jsonify(result) @execute_bp.route('/history//results', methods=['GET']) @jwt_required() def get_execute_results(history_id): results = ExecuteResult.query.filter_by(history_id=history_id).all() result = [] for r in results: result.append({ 'id': r.id, 'serverId': r.server_id, 'serverName': r.server_name, 'status': r.status, 'output': r.output, 'error': r.error, 'duration': r.duration, 'executeTime': r.execute_time.strftime('%Y-%m-%d %H:%M:%S') }) return jsonify(result)