# MIT License
# Copyright (c) 2024
"""Execution layer: DSL-driven UI automation with optional visual checks and structured logs."""

from __future__ import annotations

import json
import re
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2  # type: ignore
import mss  # type: ignore
import numpy as np  # type: ignore
import uiautomation as auto  # type: ignore

from .schema import DSLSpec


@dataclass
class ExecContext:
    """Execution context shared by every step of one run."""

    # Regular expression the foreground window title must match (see _match_window).
    allow_title: str
    # When True, actions are printed by _do_action instead of being performed.
    dry_run: bool = False
    # Base directory for screenshots, UI-tree dumps and the JSONL log.
    artifacts_dir: Path = Path("artifacts")


def _match_window(allow_title: str) -> Optional[auto.Control]:
    """Return the foreground window only if its title matches the allow-list.

    Args:
        allow_title: Pattern passed to ``re.search`` against the window name.

    Returns:
        The foreground control, or ``None`` when there is no foreground
        control, it has no name, or the name does not match.
    """
    ctrl = auto.GetForegroundControl()
    if ctrl is None or ctrl.Name is None:
        return None
    if not re.search(allow_title, ctrl.Name):
        return None
    return ctrl


def _find_control(root: auto.Control, locator: Dict[str, Any], timeout: float) -> Optional[auto.Control]:
    """Look up a control under ``root`` that matches ``locator``, polling until ``timeout``.

    Args:
        root: Control the search starts from; it is itself a candidate.
        locator: Mapping that may carry ``Name``, ``ClassName``,
            ``ControlType`` and ``AutomationId``; falsy values are ignored.
        timeout: Seconds to keep polling. At least one attempt is made when
            ``timeout`` is non-negative.

    Returns:
        The matching control (``root`` when the locator is empty or matches
        ``root``), or ``None`` when nothing was found in time.
    """
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    start = time.monotonic()
    while time.monotonic() - start <= timeout:
        try:
            conds = []
            name_val = locator.get("Name")
            class_val = locator.get("ClassName")
            ctrl_type_val = locator.get("ControlType")
            auto_id_val = locator.get("AutomationId")
            if auto_id_val:
                conds.append(auto.Control.AutomationId == auto_id_val)
            if name_val:
                conds.append(auto.Control.Name == name_val)
            if class_val:
                conds.append(auto.Control.ClassName == class_val)
            if ctrl_type_val:
                conds.append(auto.Control.ControlTypeName == ctrl_type_val)

            # Check whether root itself satisfies the locator first.
            try:
                if (
                    (not name_val or root.Name == name_val)
                    and (not class_val or root.ClassName == class_val)
                    and (not ctrl_type_val or root.ControlTypeName == ctrl_type_val)
                    and (not auto_id_val or root.AutomationId == auto_id_val)
                ):
                    return root
            except Exception:
                # Best effort: a failing property read on root just means
                # "root does not match"; fall through to the descendant search.
                pass

            if conds:
                # NOTE(review): confirm that `auto.AndCondition` and the
                # `condition=` keyword exist in the installed uiautomation
                # version. If they do not, the exception is caught below and
                # only `root` itself can ever match.
                ctrl = root.Control(searchDepth=4, condition=auto.AndCondition(*conds))
            else:
                ctrl = root
            if ctrl:
                return ctrl
        except Exception as exc:
            print(f"[warn] 查找控件异常: {exc}")
        time.sleep(0.5)
    return None


def _capture_screenshot(ctrl: Optional[auto.Control], out_path: Path) -> Optional[Path]:
    """Save a screenshot of the control's bounding rectangle, or of a monitor.

    Args:
        ctrl: Control whose ``BoundingRectangle`` defines the region. When it
            is falsy or has no rectangle, ``sct.monitors[1]`` is captured
            (``sct.monitors[0]`` if that is the only entry).
        out_path: Destination image path; parent directories are created.

    Returns:
        ``out_path`` on success, ``None`` if anything raised (best effort).
    """
    try:
        with mss.mss() as sct:
            if ctrl and getattr(ctrl, "BoundingRectangle", None):
                rect = ctrl.BoundingRectangle
                region = {
                    "left": int(rect.left),
                    "top": int(rect.top),
                    "width": int(rect.right - rect.left),
                    "height": int(rect.bottom - rect.top),
                }
            else:
                monitor = sct.monitors[1] if len(sct.monitors) > 1 else sct.monitors[0]
                region = {
                    "left": monitor["left"],
                    "top": monitor["top"],
                    "width": monitor["width"],
                    "height": monitor["height"],
                }
            shot = np.array(sct.grab(region))
            # The grab is converted from BGRA to BGR before writing.
            frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR)
            out_path.parent.mkdir(parents=True, exist_ok=True)
            cv2.imwrite(str(out_path), frame)
            return out_path
    except Exception:
        # Screenshots are diagnostic artifacts; failure must not abort a step.
        return None


def _capture_tree(ctrl: Optional[auto.Control], max_depth: int = 3) -> List[Dict[str, Any]]:
    """Collect a shallow breadth-first summary of the UIA tree below ``ctrl``.

    Args:
        ctrl: Root of the traversal; ``None`` yields an empty list.
        max_depth: Deepest level (root is depth 0) included in the result.

    Returns:
        One dict per visited node with ``name``, ``automation_id``,
        ``class_name``, ``control_type`` and ``depth``, in BFS order.
    """
    if ctrl is None:
        return []
    nodes: List[Dict[str, Any]] = []
    pending: deque = deque([(ctrl, 0)])
    while pending:
        node, depth = pending.popleft()
        if depth > max_depth:
            continue
        nodes.append(
            {
                "name": node.Name,
                "automation_id": node.AutomationId,
                "class_name": node.ClassName,
                "control_type": node.ControlTypeName,
                "depth": depth,
            }
        )
        if depth >= max_depth:
            # Children would exceed max_depth and be dropped; skip enumerating them.
            continue
        try:
            children = list(node.GetChildren())
        except Exception:
            children = []
        pending.extend((child, depth + 1) for child in children)
    return nodes


def _save_tree(ctrl: Optional[auto.Control], out_path: Path) -> Optional[Path]:
    """Write the ``_capture_tree`` summary of ``ctrl`` to ``out_path`` as UTF-8 JSON.

    Returns:
        ``out_path`` on success, ``None`` if capturing or writing raised.
    """
    try:
        data = _capture_tree(ctrl)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
        return out_path
    except Exception:
        # Tree dumps are diagnostic artifacts; failure must not abort a step.
        return None


def _image_similarity(full_img_path: Path, template_path: Path, threshold: float = 0.8) -> bool:
    """Template-match ``template_path`` inside ``full_img_path``.

    Args:
        full_img_path: Image to search in.
        template_path: Template image; must not be larger than the full image.
        threshold: Minimum ``TM_CCOEFF_NORMED`` score considered a pass.

    Returns:
        ``True`` when the best match score is ``>= threshold``; ``False`` when
        either file is missing or unreadable, or the template is larger than
        the image in either dimension.
    """
    if not full_img_path.exists() or not template_path.exists():
        return False
    full = cv2.imread(str(full_img_path), cv2.IMREAD_COLOR)
    tmpl = cv2.imread(str(template_path), cv2.IMREAD_COLOR)
    if full is None or tmpl is None or full.shape[0] < tmpl.shape[0] or full.shape[1] < tmpl.shape[1]:
        return False
    res = cv2.matchTemplate(full, tmpl, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, _ = cv2.minMaxLoc(res)
    return float(max_val) >= threshold


def _visual_check(
    expected: Dict[str, Any],
    ctrl: Optional[auto.Control],
    artifacts_dir: Path,
    step_idx: int,
    attempt: int,
) -> bool:
    """Run the optional visual check (template matching) for one attempt.

    Args:
        expected: Mapping with ``template_path`` and optional ``threshold``
            (default 0.8).
        ctrl: Control to screenshot (see ``_capture_screenshot``).
        artifacts_dir: Base directory; the snapshot goes under ``screenshots``.
        step_idx: Step index used in the snapshot file name.
        attempt: Attempt number used in the snapshot file name.

    Returns:
        ``True`` when no template is configured or the template matches;
        ``False`` when the screenshot failed or the match is below threshold.
    """
    template_path = expected.get("template_path")
    threshold = float(expected.get("threshold", 0.8))
    if not template_path:
        return True
    snap_path = artifacts_dir / "screenshots" / f"step{step_idx:03d}_attempt{attempt}_visual.png"
    snap = _capture_screenshot(ctrl, snap_path)
    if not snap:
        return False
    return _image_similarity(snap, Path(template_path), threshold)


def _log_event(log_path: Path, record: Dict[str, Any]) -> None:
    """Append ``record`` as one JSON line to ``log_path``, creating parent directories."""
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False))
        f.write("\n")


def _render_value(val: Any, params: Dict[str, Any]) -> Any:
    """Replace ``${param}`` placeholders, recursing into dicts and lists.

    Args:
        val: String, dict, list or any other value.
        params: Placeholder names mapped to replacement values (stringified).

    Returns:
        A new string / dict / list with placeholders substituted; any other
        value is returned unchanged.
    """
    if isinstance(val, str):
        out = val
        for k, v in params.items():
            placeholder = f"${{{k}}}"
            if placeholder in out:
                out = out.replace(placeholder, str(v))
        return out
    if isinstance(val, dict):
        return {k: _render_value(v, params) for k, v in val.items()}
    if isinstance(val, list):
        return [_render_value(v, params) for v in val]
    return val


def _do_action(ctrl: auto.Control, step: Dict[str, Any], dry_run: bool) -> None:
    """Perform the action of a single step on ``ctrl``.

    Args:
        ctrl: Target control.
        step: Normalized step; reads ``action``, ``text``, ``target`` and
            ``waits``.
        dry_run: When True, only print what would be done.

    Raises:
        RuntimeError: For ``assert_exists`` when ``ctrl`` is ``None``.
    """
    action = step.get("action")
    text = step.get("text", "")
    if dry_run:
        print(f"[dry-run] {action} -> target={step.get('target')} text={text}")
        return
    if action == "click":
        ctrl.Click()
    elif action == "type":
        ctrl.SetFocus()
        auto.SendKeys(text)
    elif action == "set_value":
        try:
            ctrl.GetValuePattern().SetValue(text)
        except Exception:
            # Fall back to sending keys when setting through the value pattern raises.
            ctrl.SendKeys(text)
    elif action == "assert_exists":
        if ctrl is None:
            raise RuntimeError("控件未找到")
    elif action == "wait_for":
        time.sleep(float(step.get("waits", {}).get("appear", 1.0)))
    # Any other action value falls through without doing anything.


def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None:
    """Execute a complete DSL spec against the current foreground window.

    Steps are normalized once, then run in order. Each leaf step is retried
    according to its retry policy; every attempt writes a screenshot, a UI
    tree dump and a JSONL log record under ``ctx.artifacts_dir``.

    Args:
        spec: Parsed DSL; reads ``steps``, ``params``, ``waits`` and
            ``retry_policy``.
        ctx: Execution context (window allow-list, dry-run flag, artifacts dir).

    Raises:
        RuntimeError: If the foreground window title does not match
            ``ctx.allow_title``.
        Exception: The last error of a step whose attempts all failed.
    """
    # Give the foreground window time to switch; right after the command is
    # launched the target window may not be focused yet.
    time.sleep(1.0)
    root = _match_window(ctx.allow_title)
    if root is None:
        raise RuntimeError(f"前台窗口标题未匹配白名单: {ctx.allow_title}")

    artifacts = ctx.artifacts_dir
    screenshots_dir = artifacts / "screenshots"
    trees_dir = artifacts / "ui_trees"
    log_path = artifacts / "executor_log.jsonl"

    def _normalize_target(tgt: Dict[str, Any]) -> Dict[str, Any]:
        """Map target keys to the casing ``_find_control`` reads; keep unknown keys."""
        norm: Dict[str, Any] = {}
        for k, v in tgt.items():
            lk = k.lower()
            if lk == "name":
                norm["Name"] = v
            elif lk in ("classname", "class_name"):
                norm["ClassName"] = v
            elif lk in ("controltype", "control_type"):
                norm["ControlType"] = v
            elif lk == "automationid":
                norm["AutomationId"] = v
            else:
                norm[k] = v
        return norm

    def normalize_step(step: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize field names of one step so different DSL variants are accepted."""
        out = _render_value(dict(step), spec.params)
        # `selector` is an alias for `target`, `value` an alias for `text`.
        if "target" not in out and "selector" in out:
            out["target"] = out.get("selector")
        out.pop("selector", None)
        if "value" in out and "text" not in out:
            out["text"] = out.get("value")
        out.pop("value", None)

        tgt = out.get("target")
        if isinstance(tgt, dict):
            out["target"] = _normalize_target(tgt)

        waits_obj = out.get("waits")
        if isinstance(waits_obj, list):
            # List form: the first entry carrying `timeout_ms` sets `appear`.
            appear = None
            for w in waits_obj:
                if isinstance(w, dict) and "timeout_ms" in w:
                    appear = float(w.get("timeout_ms", 0)) / 1000.0
                    break
            out["waits"] = {
                "appear": appear or spec.waits.get("appear", 5.0),
                "disappear": spec.waits.get("disappear", 1.0),
            }
        elif isinstance(waits_obj, dict):
            waits_obj = dict(waits_obj)
            if "timeout_ms" in waits_obj and "appear" not in waits_obj:
                waits_obj["appear"] = float(waits_obj.pop("timeout_ms")) / 1000.0
            out["waits"] = waits_obj
        else:
            # Copy the defaults: the step-level `timeout_ms` override below
            # writes into this mapping and must not alter `spec.waits`.
            out["waits"] = dict(spec.waits)

        if "timeout_ms" in out:
            out.setdefault("waits", {})
            out["waits"]["appear"] = float(out.pop("timeout_ms")) / 1000.0
        return out

    def normalize_steps(steps: List[Any]) -> List[Any]:
        """Normalize a list of steps, recursing into ``steps`` / ``else_steps``."""
        normed: List[Any] = []
        for st in steps:
            if isinstance(st, dict):
                st = normalize_step(st)
                if "steps" in st and isinstance(st["steps"], list):
                    st["steps"] = normalize_steps(st["steps"])
                if "else_steps" in st and isinstance(st["else_steps"], list):
                    st["else_steps"] = normalize_steps(st["else_steps"])
            normed.append(st)
        return normed

    normalized_steps = normalize_steps(spec.steps)

    def run_steps(steps: List[Any]) -> None:
        """Run steps in order, handling ``for_each`` / ``if_condition`` blocks and retries."""
        # NOTE(review): idx restarts at 1 for every nested call, so artifact
        # file names of nested steps can collide with outer ones — confirm
        # whether that is intended.
        for idx, step in enumerate(steps, start=1):
            if "for_each" in step:
                # The body runs once per element; the element itself is not
                # bound to anything here.
                iterable = spec.params.get(step["for_each"], [])
                for _ in iterable:
                    run_steps(step.get("steps", []))
            elif "if_condition" in step:
                cond = step["if_condition"]
                if spec.params.get(cond):
                    run_steps(step.get("steps", []))
                else:
                    run_steps(step.get("else_steps", []))
            else:
                target = step.get("target", {})
                timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 1.0)))
                retry = step.get("retry_policy", spec.retry_policy)
                attempts = int(retry.get("max_attempts", 1))
                interval = float(retry.get("interval", 1.0))
                expected = step.get("expected_screen") or {}
                last_err: Optional[Exception] = None
                for attempt in range(1, attempts + 1):
                    ctrl = _find_control(root, target, timeout)
                    try:
                        if ctrl is None:
                            raise RuntimeError("控件未找到")
                        # Visual check (optional).
                        if expected:
                            ok = _visual_check(expected, ctrl, artifacts, idx, attempt)
                            if not ok:
                                raise RuntimeError("视觉校验未通过")
                        _do_action(ctrl, step, ctx.dry_run)
                        snap_path = _capture_screenshot(
                            ctrl, screenshots_dir / f"step{idx:03d}_attempt{attempt}_success.png"
                        )
                        tree_path = _save_tree(ctrl, trees_dir / f"step{idx:03d}_attempt{attempt}_tree.json")
                        _log_event(
                            log_path,
                            {
                                "ts": time.time(),
                                "step_index": idx,
                                "action": step.get("action"),
                                "target": target,
                                "attempt": attempt,
                                "result": "success",
                                "screenshot": str(snap_path) if snap_path else None,
                                "tree": str(tree_path) if tree_path else None,
                            },
                        )
                        last_err = None
                        break
                    except Exception as e:  # noqa: BLE001
                        last_err = e
                        snap_path = _capture_screenshot(
                            ctrl, screenshots_dir / f"step{idx:03d}_attempt{attempt}_fail.png"
                        )
                        tree_path = _save_tree(ctrl, trees_dir / f"step{idx:03d}_attempt{attempt}_tree.json")
                        _log_event(
                            log_path,
                            {
                                "ts": time.time(),
                                "step_index": idx,
                                "action": step.get("action"),
                                "target": target,
                                "attempt": attempt,
                                "result": "fail",
                                "error": str(e),
                                "screenshot": str(snap_path) if snap_path else None,
                                "tree": str(tree_path) if tree_path else None,
                            },
                        )
                        if attempt < attempts:
                            time.sleep(interval)
                if last_err:
                    raise last_err

    run_steps(normalized_steps)