# MIT License
# Copyright (c) 2024
"""Execution layer: DSL-driven UI automation with optional visual checks and structured logs."""

from __future__ import annotations

import json
import re
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2  # type: ignore
import mss  # type: ignore
import numpy as np  # type: ignore
import uiautomation as auto  # type: ignore

from .schema import DSLSpec


@dataclass
class ExecContext:
    """Execution context shared by every step of a run."""

    # Title pattern the target window must match (see _match_window).
    allow_title: str
    # When True, actions are printed instead of being performed.
    dry_run: bool = False
    # Directory receiving screenshots, UI-tree dumps and the JSONL log.
    artifacts_dir: Path = Path("artifacts")


def _match_window(allow_title: str) -> Optional[auto.Control]:
    """Return the window whose title matches the whitelist, or None.

    A title matches when it contains ``allow_title``; if ``allow_title``
    contains ``" - "``, the part before the first ``" - "`` is accepted as an
    alternative pattern. The foreground control is tried first, then a
    breadth-first scan of the desktop root down to depth 2.
    """
    patterns = [allow_title]
    if " - " in allow_title:
        patterns.append(allow_title.split(" - ", 1)[0])

    def _title_match(name: Optional[str]) -> bool:
        if not name:
            return False
        return any(pat and pat in name for pat in patterns)

    def _ascend_to_top(node: auto.Control) -> auto.Control:
        """Walk up the parents and return the highest likely top-level window.

        An ancestor is preferred when its class name is ``Chrome_WidgetWin_1``
        or its control type is ``WindowControl``; if none qualifies, ``node``
        itself is returned.
        """
        best = node
        cur = node
        while True:
            try:
                parent = cur.GetParent()
            except Exception:
                parent = None
            if not parent:
                return best
            try:
                cls = getattr(parent, "ClassName", None)
                ctype = getattr(parent, "ControlTypeName", None)
                if cls == "Chrome_WidgetWin_1" or ctype == "WindowControl":
                    best = parent
            except Exception:
                pass
            cur = parent

    fg = auto.GetForegroundControl()
    if fg and _title_match(getattr(fg, "Name", None)):
        return _ascend_to_top(fg)

    root = auto.GetRootControl()
    queue = deque([(root, 0)] if root else [])
    while queue:
        node, depth = queue.popleft()
        if depth > 2:
            continue
        try:
            name = node.Name
        except Exception:
            name = None
        if _title_match(name):
            return _ascend_to_top(node)
        try:
            children = list(node.GetChildren())
        except Exception:
            children = []
        for child in children:
            queue.append((child, depth + 1))
    return None


def _find_control(root: auto.Control, locator: Dict[str, Any], timeout: float) -> Optional[auto.Control]:
    """Find a control under ``root`` that satisfies ``locator``.

    Search order:

    1. Until ``timeout`` seconds have elapsed, repeatedly test ``root`` itself
       and then breadth-first search its descendants (depth < 15), sleeping
       0.2 s between passes.
    2. One breadth-first pass from the desktop root (depth < 20).
    3. If the locator carried ``ClassName``/``ControlType``, one more search
       with those keys dropped, provided a name key remains.

    Args:
        root: Control to search under.
        locator: Property constraints (``Name``, ``Name__contains``,
            ``ClassName``, ``ControlType``, ``AutomationId``). An empty
            locator resolves to ``root``.
        timeout: Seconds to keep retrying step 1.

    Returns:
        The first matching control, or None.
    """
    start = time.time()
    try:
        print(
            f"[debug] 查找控件 locator={locator} root=({getattr(root, 'Name', None)}, {getattr(root, 'ClassName', None)}, {getattr(root, 'ControlTypeName', None)})"
        )
    except Exception:
        pass

    def _matches(ctrl: auto.Control) -> bool:
        """Simple property match without relying on uiautomation AndCondition."""
        try:
            name_val = locator.get("Name")
            name_contains = locator.get("Name__contains")
            class_val = locator.get("ClassName")
            ctrl_type_val = locator.get("ControlType")
            auto_id_val = locator.get("AutomationId")
            if name_val and ctrl.Name != name_val:
                return False
            if name_contains:
                cur = "" if ctrl.Name is None else str(ctrl.Name)
                pat = str(name_contains)
                # Containment is accepted in either direction.
                # NOTE(review): an empty control name is "contained" in every
                # pattern, so nameless controls pass this test - confirm
                # whether that is intended before tightening it.
                if pat not in cur and cur not in pat:
                    return False
            if class_val and ctrl.ClassName != class_val:
                if not name_contains:
                    return False
            if ctrl_type_val and ctrl.ControlTypeName != ctrl_type_val:
                # With title-contains matching, tolerate control-type
                # differences (some Chrome versions expose a PaneControl).
                if not name_contains:
                    return False
            if auto_id_val and ctrl.AutomationId != auto_id_val:
                return False
            return True
        except Exception:
            return False

    while time.time() - start <= timeout:
        try:
            if not locator:
                print("10001")  # NOTE(review): looks like a leftover debug marker
                return root
            # Check root itself first
            if _matches(root):
                print("10002")  # NOTE(review): looks like a leftover debug marker
                return root
            # Simple BFS when AndCondition is unavailable
            queue = deque([(root, 0)])
            while queue:
                node, depth = queue.popleft()
                if depth >= 15:  # Chrome trees are deep, so the limit is generous
                    continue
                try:
                    children = list(node.GetChildren())
                except Exception:
                    children = []
                for child in children:
                    if _matches(child):
                        return child
                    queue.append((child, depth + 1))
        except Exception as exc:
            print(f"[warn] find control error: {exc}")
        time.sleep(0.2)

    # Extra fallback: search once more from the desktop root (only reached
    # after the timeout expired).
    # NOTE(review): this can return a control that lives outside ``root``.
    try:
        sys_root = auto.GetRootControl()
        queue = deque([(sys_root, 0)] if sys_root else [])
        while queue:
            node, depth = queue.popleft()
            if depth >= 20:
                continue
            if _matches(node):
                return node
            try:
                children = list(node.GetChildren())
            except Exception:
                children = []
            for ch in children:
                queue.append((ch, depth + 1))
    except Exception:
        pass

    # Last fallback: drop ClassName/ControlType and search by name only.
    # Recurse only when relaxing actually removed a key; otherwise the
    # recursive call would receive an identical locator and never terminate.
    if isinstance(locator, dict):
        relaxed = {k: v for k, v in locator.items() if k not in ("ClassName", "ControlType")}
        if relaxed != locator and (relaxed.get("Name") or relaxed.get("Name__contains")):
            print("[debug] 放宽匹配,仅按名称再次查找")
            return _find_control(root, relaxed, max(timeout, 0.5))
    return None


def _capture_screenshot(ctrl: Optional[auto.Control], out_path: Path) -> Optional[Path]:
    """Save a screenshot of the control's bounding rectangle, or of a monitor.

    When ``ctrl`` is None or has no ``BoundingRectangle``, monitor 1 is
    captured if mss reports more than one entry, otherwise entry 0.

    Returns:
        ``out_path`` on success, None if anything failed (best-effort).
    """
    try:
        with mss.mss() as sct:
            rect = getattr(ctrl, "BoundingRectangle", None) if ctrl else None
            if rect:
                region = {
                    "left": int(rect.left),
                    "top": int(rect.top),
                    "width": int(rect.right - rect.left),
                    "height": int(rect.bottom - rect.top),
                }
            else:
                monitor = sct.monitors[1] if len(sct.monitors) > 1 else sct.monitors[0]
                region = {
                    "left": monitor["left"],
                    "top": monitor["top"],
                    "width": monitor["width"],
                    "height": monitor["height"],
                }
            shot = np.array(sct.grab(region))
            frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR)
            out_path.parent.mkdir(parents=True, exist_ok=True)
            cv2.imwrite(str(out_path), frame)
            return out_path
    except Exception:
        return None


def _capture_tree(ctrl: Optional[auto.Control], max_depth: int = 3) -> List[Dict[str, Any]]:
    """Collect a shallow breadth-first summary of the UIA tree under ``ctrl``.

    Args:
        ctrl: Root of the dump; None yields an empty list.
        max_depth: Deepest level (root is depth 0) to include.

    Returns:
        One dict per visited node with its name, automation id, class name,
        control type and depth. A property that cannot be read is recorded as
        None so that a single stale node does not discard the whole dump.
    """
    if ctrl is None:
        return []

    def _read(node: Any, attr: str) -> Any:
        # UIA property reads can raise on stale elements; degrade to None.
        try:
            return getattr(node, attr)
        except Exception:
            return None

    nodes: List[Dict[str, Any]] = []
    queue = deque([(ctrl, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth > max_depth:
            continue
        nodes.append(
            {
                "name": _read(node, "Name"),
                "automation_id": _read(node, "AutomationId"),
                "class_name": _read(node, "ClassName"),
                "control_type": _read(node, "ControlTypeName"),
                "depth": depth,
            }
        )
        try:
            children = list(node.GetChildren())
        except Exception:
            children = []
        for child in children:
            queue.append((child, depth + 1))
    return nodes


def _save_tree(ctrl: Optional[auto.Control], out_path: Path) -> Optional[Path]:
    """Write the UIA tree summary of ``ctrl`` to ``out_path`` as JSON.

    Returns:
        ``out_path`` on success, None if anything failed (best-effort).
    """
    try:
        data = _capture_tree(ctrl)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
        return out_path
    except Exception:
        return None


def _image_similarity(full_img_path: Path, template_path: Path, threshold: float = 0.8) -> bool:
    """Template-match ``template_path`` inside ``full_img_path``.

    Returns:
        True when the best ``TM_CCOEFF_NORMED`` score is >= ``threshold``.
        False when either file is missing or unreadable, or when the template
        is larger than the image in either dimension.
    """
    if not full_img_path.exists() or not template_path.exists():
        return False
    full = cv2.imread(str(full_img_path), cv2.IMREAD_COLOR)
    tmpl = cv2.imread(str(template_path), cv2.IMREAD_COLOR)
    if full is None or tmpl is None or full.shape[0] < tmpl.shape[0] or full.shape[1] < tmpl.shape[1]:
        return False
    res = cv2.matchTemplate(full, tmpl, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, _ = cv2.minMaxLoc(res)
    return float(max_val) >= threshold


def _visual_check(
    expected: Dict[str, Any],
    ctrl: Optional[auto.Control],
    artifacts_dir: Path,
    step_idx: int,
    attempt: int,
) -> bool:
    """Run the optional visual check (template matching) for one attempt.

    Args:
        expected: May hold ``template_path`` and ``threshold`` (default 0.8).
        ctrl: Control whose area is captured for the comparison.
        artifacts_dir: Base directory; the capture goes under ``screenshots``.
        step_idx: Step number used in the capture file name.
        attempt: Attempt number used in the capture file name.

    Returns:
        True when no template is configured or the match passes; False when
        the capture failed or the match is below the threshold.
    """
    template_path = expected.get("template_path")
    threshold = float(expected.get("threshold", 0.8))
    if not template_path:
        return True
    snap_path = artifacts_dir / "screenshots" / f"step{step_idx:03d}_attempt{attempt}_visual.png"
    snap = _capture_screenshot(ctrl, snap_path)
    if not snap:
        return False
    return _image_similarity(snap, Path(template_path), threshold)


def _log_event(log_path: Path, record: Dict[str, Any]) -> None:
    """Append ``record`` as one JSON line to ``log_path``, creating parent dirs."""
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False))
        f.write("\n")


def _render_value(val: Any, params: Dict[str, Any]) -> Any:
    """Substitute ``${name}`` placeholders in ``val`` with values from ``params``.

    Strings are substituted; dicts and lists are rebuilt recursively (dict
    keys are left untouched); any other value is returned unchanged.
    """
    if isinstance(val, str):
        out = val
        for k, v in params.items():
            placeholder = f"${{{k}}}"
            if placeholder in out:
                out = out.replace(placeholder, str(v))
        return out
    if isinstance(val, dict):
        return {k: _render_value(v, params) for k, v in val.items()}
    if isinstance(val, list):
        return [_render_value(v, params) for v in val]
    return val


def _do_action(ctrl: auto.Control, step: Dict[str, Any], dry_run: bool) -> None:
    """Perform the action of a single step on ``ctrl``.

    Supported actions: ``click``, ``type``, ``set_value``, ``assert_exists``
    and ``wait_for``. Any other action value is ignored.

    Args:
        ctrl: Target control.
        step: Normalized step; reads ``action``, ``text``, ``send_enter``,
            ``target`` and ``waits``.
        dry_run: When True, only print what would be done.

    Raises:
        RuntimeError: For ``assert_exists`` when ``ctrl`` is None.
    """
    action = step.get("action")
    text = step.get("text", "")
    send_enter = bool(step.get("send_enter"))
    if dry_run:
        extra = " +Enter" if send_enter else ""
        print(f"[dry-run] {action} -> target={step.get('target')} text={text}{extra}")
        return

    # ``text`` may be a number when it came from a non-string ``value`` field;
    # the key-sending APIs and the string concatenation below need a str.
    text_str = "" if text is None else str(text)

    if action == "click":
        ctrl.Click()
    elif action == "type":
        ctrl.SetFocus()
        to_send = text_str + ("{Enter}" if send_enter else "")
        auto.SendKeys(to_send)
    elif action == "set_value":
        try:
            ctrl.GetValuePattern().SetValue(text_str)
        except Exception:
            # Fall back to simulated key input when the value pattern fails.
            ctrl.SendKeys(text_str)
    elif action == "assert_exists":
        if ctrl is None:
            raise RuntimeError("控件未找到")
    elif action == "wait_for":
        time.sleep(float(step.get("waits", {}).get("appear", 1.0)))


def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None:
    """Execute a complete DSL spec.

    Flow:

    1. Resolve the root window from ``ctx.allow_title``.
    2. Normalize every step: field aliases, ``${param}`` substitution and
       wait settings.
    3. For each step: find the target control, run the optional visual check,
       then perform the action (or print it in dry-run mode).
    4. Every attempt writes a screenshot, a UI-tree dump and a log record
       under ``ctx.artifacts_dir``.

    Raises:
        RuntimeError: When no window matches ``ctx.allow_title``.
        Exception: The last error of a step whose attempts all failed.
    """
    # Leave time for the foreground window to switch, so the target window is
    # focused even when the command has only just been launched.
    time.sleep(1.0)
    root = _match_window(ctx.allow_title)
    if root is None:
        raise RuntimeError(f"前台窗口标题未匹配白名单: {ctx.allow_title}")
    if ctx.dry_run:
        try:
            print(f"[debug] root -> name={root.Name} class={root.ClassName} type={root.ControlTypeName}")
        except Exception:
            pass

    artifacts = ctx.artifacts_dir
    screenshots_dir = artifacts / "screenshots"
    trees_dir = artifacts / "ui_trees"
    log_path = artifacts / "executor_log.jsonl"

    def _normalize_target(tgt: Dict[str, Any]) -> Dict[str, Any]:
        """Map the accepted target key spellings onto the locator keys."""
        norm: Dict[str, Any] = {}
        for k, v in tgt.items():
            lk = k.lower()
            if lk in ("name", "window_title"):
                if lk == "window_title" and isinstance(v, str) and " - " in v:
                    # Titles of the form "page - app" are matched by containment only.
                    norm["Name__contains"] = v
                else:
                    norm["Name"] = v
                    norm["Name__contains"] = v
            elif lk in ("window_title_contains", "name_contains"):
                norm["Name__contains"] = v
            elif lk == "window_title_contains_param":
                norm["Name__contains"] = spec.params.get(str(v), v)
            elif lk in ("classname", "class_name"):
                norm["ClassName"] = v
            elif lk in ("controltype", "control_type"):
                norm["ControlType"] = v
            elif lk == "automationid":
                norm["AutomationId"] = v
            else:
                norm[k] = v
        return norm

    def normalize_step(step: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize one step so the different DSL variants look the same."""
        out = _render_value(dict(step), spec.params)
        if "target" not in out and "selector" in out:
            out["target"] = out.get("selector")
            out.pop("selector", None)
        if "value" in out and "text" not in out:
            out["text"] = out.get("value")
            out.pop("value", None)
        if "text" not in out and out.get("text_param"):
            key = str(out.pop("text_param"))
            out["text"] = str(spec.params.get(key, ""))
        tgt = out.get("target")
        if isinstance(tgt, dict):
            out["target"] = _normalize_target(tgt)
        waits_obj = out.get("waits")
        if isinstance(waits_obj, list):
            appear = None
            for w in waits_obj:
                if isinstance(w, dict) and "timeout_ms" in w:
                    appear = float(w.get("timeout_ms", 0)) / 1000.0
                    break
            out["waits"] = {
                "appear": appear or spec.waits.get("appear", 5.0),
                "disappear": spec.waits.get("disappear", 1.0),
            }
        elif isinstance(waits_obj, dict):
            waits_obj = dict(waits_obj)
            if "timeout_ms" in waits_obj and "appear" not in waits_obj:
                waits_obj["appear"] = float(waits_obj.pop("timeout_ms")) / 1000.0
            out["waits"] = waits_obj
        else:
            # Copy: the step-level timeout below writes into this dict and
            # must not leak into the spec-wide defaults used by other steps.
            out["waits"] = dict(spec.waits)
        if "timeout_ms" in out:
            out["waits"]["appear"] = float(out.pop("timeout_ms")) / 1000.0
        return out

    def normalize_steps(steps: List[Any]) -> List[Any]:
        """Normalize a step list, recursing into ``steps`` and ``else_steps``."""
        normed: List[Any] = []
        for st in steps:
            if isinstance(st, dict):
                st = normalize_step(st)
                if "steps" in st and isinstance(st["steps"], list):
                    st["steps"] = normalize_steps(st["steps"])
                if "else_steps" in st and isinstance(st["else_steps"], list):
                    st["else_steps"] = normalize_steps(st["else_steps"])
            normed.append(st)
        return normed

    normalized_steps = normalize_steps(spec.steps)

    def run_steps(steps: List[Any]) -> None:
        """Run steps in order; ``for_each`` and ``if_condition`` may nest."""
        # NOTE(review): ``idx`` restarts at 1 for every nested list, so nested
        # steps reuse the artifact file names of their siblings/parents.
        for idx, step in enumerate(steps, start=1):
            if "for_each" in step:
                # for_each: run the sub-steps once per element of the named
                # parameter.
                # NOTE(review): the element itself is not exposed to the
                # sub-steps - confirm whether that is intended.
                iterable = spec.params.get(step["for_each"], [])
                for _ in iterable:
                    run_steps(step.get("steps", []))
            elif "if_condition" in step:
                # if_condition: pick the branch from the parameter's truthiness.
                cond = step["if_condition"]
                if spec.params.get(cond):
                    run_steps(step.get("steps", []))
                else:
                    run_steps(step.get("else_steps", []))
            else:
                # Plain step: find control -> optional visual check -> action.
                target = step.get("target", {})
                timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 1.0)))
                if ctx.dry_run:
                    # Dry-run should return quickly instead of waiting long.
                    timeout = min(timeout, 1)
                retry = step.get("retry_policy", spec.retry_policy)
                # At least one attempt, so a zero/negative setting cannot
                # silently skip the step without any log record.
                attempts = max(1, int(retry.get("max_attempts", 1)))
                interval = float(retry.get("interval", 1.0))
                expected = step.get("expected_screen") or {}

                last_err: Optional[Exception] = None
                for attempt in range(1, attempts + 1):
                    ctrl = _find_control(root, target, timeout)
                    try:
                        if ctrl is None and ctx.dry_run:
                            ctrl = root
                        if ctrl is None:
                            raise RuntimeError("控件未找到")

                        # Optional visual check
                        if expected:
                            ok = _visual_check(expected, ctrl, artifacts, idx, attempt)
                            if not ok:
                                raise RuntimeError("视觉校验未通过")

                        _do_action(ctrl, step, ctx.dry_run)
                        snap_path = _capture_screenshot(
                            ctrl, screenshots_dir / f"step{idx:03d}_attempt{attempt}_success.png"
                        )
                        tree_path = _save_tree(ctrl, trees_dir / f"step{idx:03d}_attempt{attempt}_tree.json")
                        _log_event(
                            log_path,
                            {
                                "ts": time.time(),
                                "step_index": idx,
                                "action": step.get("action"),
                                "target": target,
                                "attempt": attempt,
                                "result": "success",
                                "screenshot": str(snap_path) if snap_path else None,
                                "tree": str(tree_path) if tree_path else None,
                            },
                        )
                        last_err = None
                        break
                    except Exception as e:  # noqa: BLE001
                        last_err = e
                        snap_path = _capture_screenshot(
                            ctrl, screenshots_dir / f"step{idx:03d}_attempt{attempt}_fail.png"
                        )
                        tree_path = _save_tree(ctrl, trees_dir / f"step{idx:03d}_attempt{attempt}_tree.json")
                        _log_event(
                            log_path,
                            {
                                "ts": time.time(),
                                "step_index": idx,
                                "action": step.get("action"),
                                "target": target,
                                "attempt": attempt,
                                "result": "fail",
                                "error": str(e),
                                "screenshot": str(snap_path) if snap_path else None,
                                "tree": str(tree_path) if tree_path else None,
                            },
                        )
                        if attempt < attempts:
                            time.sleep(interval)
                if last_err:
                    raise last_err

    run_steps(normalized_steps)