audoWin/autodemo/executor.py
2025-12-19 16:24:04 +08:00

126 lines
4.2 KiB
Python

# MIT License
# Copyright (c) 2024
"""执行层:根据 DSL 进行 UI 自动化。"""
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import uiautomation as auto # type: ignore
from .schema import DSLSpec
@dataclass
class ExecContext:
    """Settings that apply to one execution run."""

    # Pattern searched (via ``re.search``) in the foreground window title;
    # execution is refused when it does not match.
    allow_title: str
    # When true, each action is printed instead of being performed.
    dry_run: bool = False
def _match_window(allow_title: str) -> Optional[auto.Control]:
    """Return the foreground window only when its title is allow-listed.

    Args:
        allow_title: Regular-expression pattern that is searched for in the
            foreground control's ``Name``.

    Returns:
        The foreground control if its name matches the pattern; ``None`` if
        there is no foreground control, it has no name, or the name does not
        match.
    """
    window = auto.GetForegroundControl()
    if window is None or window.Name is None:
        return None
    # ``re.search`` lets the pattern match anywhere in the title.
    return window if re.search(allow_title, window.Name) else None
def _find_control(root: auto.Control, locator: Dict[str, Any], timeout: float) -> Optional[auto.Control]:
    """Find the control described by ``locator`` underneath ``root``.

    The locator keys ``AutomationId``, ``Name`` and ``ClassName`` are passed
    to ``root.Control`` as search properties; ``ControlType`` is matched
    against the candidate's ``ControlTypeName``. Other keys are ignored.
    The search is polled until a control exists or ``timeout`` elapses, and
    at least one lookup is always attempted, even when ``timeout`` is zero
    or negative.

    Args:
        root: Control to search from (searched 4 levels deep).
        locator: Mapping of locator keys to the values to match.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The matching control, ``root`` itself when the locator contains no
        recognised key, or ``None`` if nothing matched before the deadline.
    """
    # Search criteria are given to uiautomation as keyword search properties.
    # NOTE(review): this replaces comparisons against class attributes and a
    # call to ``auto.AndCondition``, which does not appear to exist in the
    # uiautomation package -- confirm against the installed version.
    search_props: Dict[str, Any] = {
        key: locator[key]
        for key in ("AutomationId", "Name", "ClassName")
        if key in locator
    }
    if "ControlType" in locator:
        wanted_type = locator["ControlType"]
        # The locator value is compared with ControlTypeName, so it is
        # matched through a custom comparer rather than the numeric
        # ``ControlType`` search property.
        search_props["Compare"] = (
            lambda candidate, _depth: candidate.ControlTypeName == wanted_type
        )

    if not search_props:
        # Nothing to search for: the step targets the root control itself.
        return root

    # Monotonic clock: immune to wall-clock adjustments while waiting.
    deadline = time.monotonic() + timeout
    while True:
        try:
            candidate = root.Control(searchDepth=4, **search_props)
            # Control objects are resolved lazily; Exists(0, 0) performs a
            # single lookup without the library's own waiting.
            if candidate.Exists(0, 0):
                return candidate
        except Exception:  # noqa: BLE001
            # Best-effort polling: a failed lookup is retried until the
            # deadline instead of aborting the step.
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(0.5, remaining))
def _do_action(ctrl: auto.Control, step: Dict[str, Any], dry_run: bool) -> None:
    """Perform the single action described by ``step`` on ``ctrl``.

    Supported values of ``step["action"]``:

    * ``click``: click the control.
    * ``type``: focus the control, then send ``step["text"]`` as keystrokes.
    * ``set_value``: set ``step["text"]`` through the value pattern, falling
      back to sending keystrokes to the control if that fails.
    * ``assert_exists``: fail if ``ctrl`` is ``None``.
    * ``wait_for``: sleep for ``step["waits"]["appear"]`` seconds
      (default 1.0).

    Any other action is ignored.

    Args:
        ctrl: Target control of the action.
        step: Step mapping; reads ``action``, ``text``, ``target`` and
            ``waits``.
        dry_run: When true, only print what would be done and return.

    Raises:
        AssertionError: for ``assert_exists`` when ``ctrl`` is ``None``.
    """
    action = step.get("action")
    text = step.get("text", "")
    if dry_run:
        print(f"[dry-run] {action} -> target={step.get('target')} text={text}")
        return
    if action == "click":
        ctrl.Click()
    elif action == "type":
        ctrl.SetFocus()
        auto.SendKeys(text)
    elif action == "set_value":
        try:
            ctrl.GetValuePattern().SetValue(text)
        except Exception:  # noqa: BLE001
            # Fall back to keystrokes when the value cannot be set directly.
            ctrl.SendKeys(text)
    elif action == "assert_exists":
        # Explicit raise instead of an ``assert`` statement, which would be
        # stripped (and the check silently skipped) under ``python -O``.
        if ctrl is None:
            raise AssertionError("控件未找到")
    elif action == "wait_for":
        # wait_for only pauses; it does not re-check the control.
        time.sleep(float(step.get("waits", {}).get("appear", 1.0)))
    # NOTE(review): unknown actions fall through silently -- confirm whether
    # the DSL relies on that or whether they should be rejected.
def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None:
    """Execute every step of ``spec`` against the allow-listed foreground window.

    Steps are processed in order. A step containing ``for_each`` runs its
    nested ``steps`` once per element of the named entry in ``spec.params``;
    a step containing ``if_condition`` runs ``steps`` when the named entry of
    ``spec.params`` is truthy and ``else_steps`` otherwise; any other step is
    an action that is located and performed with retries.

    Args:
        spec: DSL specification; reads ``steps``, ``params``, ``waits`` and
            ``retry_policy``.
        ctx: Execution context supplying the title allow-list and the
            dry-run flag.

    Raises:
        RuntimeError: if the foreground window title does not match
            ``ctx.allow_title``, or a step's control is still not found on
            its last attempt.
        Exception: whatever the last attempt of a step raised.
    """
    root = _match_window(ctx.allow_title)
    if root is None:
        raise RuntimeError(f"前台窗口标题未匹配白名单: {ctx.allow_title}")

    def run_action(step: Dict[str, Any]) -> None:
        """Locate the step's target and perform its action, with retries."""
        target = step.get("target", {})
        timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 5.0)))
        retry = step.get("retry_policy", spec.retry_policy)
        # Always try at least once: a max_attempts below 1 would otherwise
        # skip the step silently.
        attempts = max(1, int(retry.get("max_attempts", 1)))
        interval = float(retry.get("interval", 1.0))
        for attempt in range(1, attempts + 1):
            ctrl = _find_control(root, target, timeout)
            try:
                if ctrl is None:
                    raise RuntimeError("控件未找到")
                _do_action(ctrl, step, ctx.dry_run)
                return
            except Exception:  # noqa: BLE001
                if attempt == attempts:
                    # Retries exhausted: surface the last error unchanged.
                    raise
                # Pause only between attempts, not before giving up.
                time.sleep(interval)

    def run_steps(steps: List[Any]) -> None:
        """Run a list of steps, recursing into loops and conditionals."""
        for step in steps:
            if "for_each" in step:
                # Repeat the nested steps once per element of the parameter.
                # NOTE(review): the current element is not made available to
                # the nested steps -- confirm whether that is intended.
                nested = step.get("steps", [])
                for _ in spec.params.get(step["for_each"], []):
                    run_steps(nested)
            elif "if_condition" in step:
                if spec.params.get(step["if_condition"]):
                    run_steps(step.get("steps", []))
                else:
                    run_steps(step.get("else_steps", []))
            else:
                run_action(step)

    run_steps(spec.steps)