# audoWin/autodemo/recorder.py
# 2025-12-19 16:24:04 +08:00
#
# 446 lines
# 16 KiB
# Python

# MIT License
# Copyright (c) 2024
"""Multimodal recorder for Windows desktop sessions."""
from __future__ import annotations
import json
import threading
import time
import uuid
from pathlib import Path
from typing import List, Optional, Tuple
import cv2 # type: ignore
import numpy as np # type: ignore
import psutil # type: ignore
import uiautomation as auto # type: ignore
from pynput import keyboard, mouse
import mss # type: ignore
from .schema import (
EventRecord,
FramePaths,
MouseInfo,
Rect,
SessionManifest,
UISnapshot,
UITreeNode,
UISelector,
WindowInfo,
)
from .screen_recorder import ScreenRecorder
class Recorder:
    """Capture UI events, UIA context, screenshots, and screen video.

    Every session gets a fresh UUID and writes its artifacts below
    ``output_dir/<session_id>/``: ``events.jsonl``, ``video.mp4``,
    ``manifest.json`` and the ``frames``, ``frames_crops`` and
    ``ui_snapshots`` directories.

    Events are produced from several threads (the input listener callbacks,
    the foreground-window watcher and the text-flush timer), so the event
    list/counter and the typed-text buffer are each guarded by a lock.
    """

    # Seconds of keyboard inactivity before buffered characters become one
    # ``text_input`` event.
    _FLUSH_DELAY_S = 0.8
    # (width, height) in pixels of the crop saved around the mouse pointer.
    _MOUSE_CROP_SIZE = (400, 300)

    def __init__(self, output_dir: Path, hotkey: str = "F9", fps: int = 12, screen: int = 0) -> None:
        """Prepare a new session; nothing is created on disk until ``start``.

        Args:
            output_dir: Directory under which the session folder is created.
                A plain string is accepted and converted to ``Path``.
            hotkey: Key name (compared case-insensitively) that stops recording.
            fps: Frame rate handed to the screen recorder and the manifest.
            screen: Index into the ``mss`` monitor list; an out-of-range value
                falls back to entry 0 when recording starts.
        """
        self.output_dir = Path(output_dir)
        self.hotkey = hotkey
        self.fps = fps
        self.screen = screen
        self.session_id = str(uuid.uuid4())
        self.session_dir = self.output_dir / self.session_id
        self.events_path = self.session_dir / "events.jsonl"
        self.video_path = self.session_dir / "video.mp4"
        self.frames_dir = self.session_dir / "frames"
        self.frames_crops_dir = self.session_dir / "frames_crops"
        self.ui_snapshots_dir = self.session_dir / "ui_snapshots"
        self.events: List[EventRecord] = []
        self._stop_event = threading.Event()
        # Guards ``events`` and ``_event_index``.
        self._lock = threading.Lock()
        # Guards ``_text_buffer``; never held while recording an event.
        self._text_lock = threading.Lock()
        self._text_buffer: List[str] = []
        self._flush_timer: Optional[threading.Timer] = None
        self._start_perf = 0.0
        self._start_ts = 0.0
        self._last_hwnd: Optional[int] = None
        self._mouse_controller = mouse.Controller()
        self._screen_recorder: Optional[ScreenRecorder] = None
        self._window_thread: Optional[threading.Thread] = None
        self._mouse_listener: Optional[mouse.Listener] = None
        self._keyboard_listener: Optional[keyboard.Listener] = None
        self._monitor: Optional[dict] = None
        self._event_index = 0
        # UI Automation is initialised once per thread; the token lives here.
        self._uia_local = threading.local()
        self._ensure_uia_initialized()

    # Public API ---------------------------------------------------------
    def start(self) -> Path:
        """Record until the hotkey is pressed, then persist the session.

        Blocks the calling thread. Shutdown (which writes ``events.jsonl``
        and ``manifest.json``) also runs when recording is interrupted by an
        exception, so already captured events are not lost.

        Returns:
            The session directory.
        """
        for directory in (
            self.session_dir,
            self.frames_dir,
            self.frames_crops_dir,
            self.ui_snapshots_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)
        self._start_perf = time.perf_counter()
        self._start_ts = time.time()
        with mss.mss() as sct:
            monitors = sct.monitors
            index = self.screen if 0 <= self.screen < len(monitors) else 0
            self._monitor = monitors[index]
        try:
            self._screen_recorder = ScreenRecorder(self.video_path, fps=self.fps, screen=self.screen)
            self._screen_recorder.start()
            self._window_thread = threading.Thread(target=self._watch_window, daemon=True)
            self._window_thread.start()
            self._mouse_listener = mouse.Listener(on_click=self._on_click)
            self._keyboard_listener = keyboard.Listener(on_press=self._on_key_press)
            self._mouse_listener.start()
            self._keyboard_listener.start()
            self._stop_event.wait()
        finally:
            # Make sure the watcher and the handlers see the stop request even
            # when we got here through an exception rather than the hotkey.
            self._stop_event.set()
            try:
                self._flush_text_buffer()
            finally:
                self._shutdown()
        return self.session_dir

    # Event handlers -----------------------------------------------------
    def _on_click(self, x: int, y: int, button: mouse.Button, pressed: bool) -> None:
        """Record a ``mouse_click`` event for every button press (not release)."""
        if not pressed or self._stop_event.is_set():
            return
        window_info = self._get_window_info()
        selector = self._hit_test(x, y)
        mouse_info = MouseInfo(x=int(x), y=int(y), button=str(button).split(".")[-1], action="down")
        self._record_event(
            event_type="mouse_click",
            mouse_info=mouse_info,
            text=None,
            uia_selector=selector,
            window=window_info,
        )

    def _on_key_press(self, key: keyboard.Key | keyboard.KeyCode) -> Optional[bool]:
        """Buffer typed characters, or request a stop when the hotkey is hit.

        Returns:
            ``False`` once recording is stopping, otherwise ``None``.
        """
        if self._is_hotkey(key):
            self._stop_event.set()
            return False
        if self._stop_event.is_set():
            return False
        # The flush timer drains the buffer from another thread, so the
        # backspace pop and the append must happen under the buffer lock.
        with self._text_lock:
            ch = self._key_to_char(key)
            if ch is not None:
                self._text_buffer.append(ch)
        if ch is not None:
            self._schedule_flush()
        return None

    # Background watchers ------------------------------------------------
    def _watch_window(self, interval: float = 0.5) -> None:
        """Poll the foreground window and record a ``window_change`` on switch.

        Args:
            interval: Seconds between polls.
        """
        while not self._stop_event.is_set():
            info = self._get_window_info()
            hwnd = info.hwnd if info else None
            if hwnd and hwnd != self._last_hwnd:
                self._last_hwnd = hwnd
                selector = self._hit_test(*self._current_mouse_position())
                self._record_event(
                    event_type="window_change",
                    mouse_info=self._current_mouse_info(),
                    text=None,
                    uia_selector=selector,
                    window=info,
                )
            # Unlike ``time.sleep`` this wakes up as soon as a stop is requested.
            self._stop_event.wait(interval)

    # Recording helpers --------------------------------------------------
    def _shutdown(self) -> None:
        """Stop timers, listeners and recorders, then write events and manifest."""
        try:
            if self._flush_timer and self._flush_timer.is_alive():
                self._flush_timer.cancel()
            if self._mouse_listener:
                self._mouse_listener.stop()
            if self._keyboard_listener:
                self._keyboard_listener.stop()
            if self._window_thread and self._window_thread.is_alive():
                self._window_thread.join(timeout=1.0)
            if self._screen_recorder:
                self._screen_recorder.stop()
        finally:
            # Persist whatever was captured even if stopping a component failed.
            self._write_events()
            self._write_manifest()

    def _schedule_flush(self) -> None:
        """(Re)start the debounce timer that turns buffered keys into an event."""
        if self._flush_timer and self._flush_timer.is_alive():
            self._flush_timer.cancel()
        self._flush_timer = threading.Timer(self._FLUSH_DELAY_S, self._flush_text_buffer)
        self._flush_timer.daemon = True
        self._flush_timer.start()

    def _flush_text_buffer(self) -> None:
        """Emit the buffered characters as one ``text_input`` event, if any."""
        # Take the text and reset the buffer atomically so keys typed while
        # this runs land in the next event instead of being dropped.
        with self._text_lock:
            if not self._text_buffer:
                return
            text = "".join(self._text_buffer)
            self._text_buffer = []
        mouse_info = self._current_mouse_info()
        selector = None
        if mouse_info:
            selector = self._hit_test(mouse_info.x, mouse_info.y)
        window_info = self._get_window_info()
        self._record_event(
            event_type="text_input",
            mouse_info=mouse_info,
            text=text,
            uia_selector=selector,
            window=window_info,
        )

    def _record_event(
        self,
        event_type: str,
        mouse_info: Optional[MouseInfo],
        text: Optional[str],
        uia_selector: Optional[UISelector],
        window: Optional[WindowInfo],
    ) -> None:
        """Capture frame and UI snapshot for an event and append its record.

        Args:
            event_type: Event label; also embedded in the frame file name.
            mouse_info: Pointer state at the time of the event, if known.
            text: Typed text for ``text_input`` events, otherwise ``None``.
            uia_selector: Element under the pointer, if one was resolved.
            window: Foreground window description, if available.
        """
        # Reserve the index under the lock: this method runs on several
        # threads and the index names the files written below.
        with self._lock:
            self._event_index += 1
            event_index = self._event_index
        ts = time.time()
        offset_ms = int((time.perf_counter() - self._start_perf) * 1000)
        frame_paths = self._capture_frame(event_type, event_index, mouse_info, uia_selector, window)
        ui_snapshot_path = self._save_ui_snapshot(event_index, uia_selector)
        record = EventRecord(
            ts=ts,
            event_type=event_type,
            window=window,
            mouse=mouse_info,
            text=text,
            uia=uia_selector,
            frame_paths=frame_paths,
            video_time_offset_ms=offset_ms,
            ui_snapshot=ui_snapshot_path,
        )
        with self._lock:
            self.events.append(record)

    def _capture_frame(
        self,
        tag: str,
        event_index: int,
        mouse_info: Optional[MouseInfo],
        uia_selector: Optional[UISelector],
        window: Optional[WindowInfo],
    ) -> Optional[FramePaths]:
        """Save a screenshot for the event plus optional mouse/element crops.

        Returns:
            The written paths, or ``None`` when no monitor has been selected
            yet (i.e. before ``start``).
        """
        if not self._monitor:
            return None
        region = self._monitor_region(window)
        with mss.mss() as sct:
            shot = np.array(sct.grab(region))
        frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR)
        full_path = self.frames_dir / f"frame_{event_index:05d}_{tag}.png"
        cv2.imwrite(str(full_path), frame)
        crop_mouse_path = None
        crop_element_path = None
        if mouse_info:
            crop_mouse_path = self._save_mouse_crop(frame, region, mouse_info, event_index)
        if uia_selector and uia_selector.bounding_rect:
            crop_element_path = self._save_element_crop(frame, region, uia_selector.bounding_rect, event_index)
        return FramePaths(
            full=str(full_path),
            crop_mouse=str(crop_mouse_path) if crop_mouse_path else None,
            crop_element=str(crop_element_path) if crop_element_path else None,
        )

    @staticmethod
    def _clip_box(
        x0: int, y0: int, x1: int, y1: int, width: int, height: int
    ) -> Optional[Tuple[int, int, int, int]]:
        """Clamp a box to ``[0, width] x [0, height]``; ``None`` if it is empty."""
        x0, y0 = max(0, x0), max(0, y0)
        x1, y1 = min(width, x1), min(height, y1)
        if x1 <= x0 or y1 <= y0:
            return None
        return x0, y0, x1, y1

    def _save_mouse_crop(self, frame: np.ndarray, region: dict, mouse_info: MouseInfo, event_index: int) -> Optional[Path]:
        """Write the area around the pointer to ``frames_crops``.

        Args:
            frame: Captured image; only ``shape`` and slicing are used.
            region: Screen region of ``frame`` (``left``/``top`` are read).
            mouse_info: Pointer position in screen coordinates.
            event_index: Index used in the file name.

        Returns:
            The crop path, or ``None`` when the pointer is not inside the
            frame (a crop there would show an unrelated part of the image).
        """
        width, height = frame.shape[1], frame.shape[0]
        center_x = int(mouse_info.x - region["left"])
        center_y = int(mouse_info.y - region["top"])
        if not (0 <= center_x < width and 0 <= center_y < height):
            return None
        crop_w, crop_h = self._MOUSE_CROP_SIZE
        x0 = max(0, center_x - crop_w // 2)
        y0 = max(0, center_y - crop_h // 2)
        box = self._clip_box(x0, y0, x0 + crop_w, y0 + crop_h, width, height)
        if box is None:
            return None
        x0, y0, x1, y1 = box
        path = self.frames_crops_dir / f"frame_{event_index:05d}_mouse.png"
        cv2.imwrite(str(path), frame[y0:y1, x0:x1])
        return path

    def _save_element_crop(self, frame: np.ndarray, region: dict, rect: Rect, event_index: int) -> Optional[Path]:
        """Write the part of ``frame`` covered by ``rect`` to ``frames_crops``.

        Returns:
            The crop path, or ``None`` when the rectangle does not overlap
            the frame.
        """
        width, height = frame.shape[1], frame.shape[0]
        box = self._clip_box(
            int(rect.left - region["left"]),
            int(rect.top - region["top"]),
            int(rect.right - region["left"]),
            int(rect.bottom - region["top"]),
            width,
            height,
        )
        if box is None:
            return None
        x0, y0, x1, y1 = box
        path = self.frames_crops_dir / f"frame_{event_index:05d}_element.png"
        cv2.imwrite(str(path), frame[y0:y1, x0:x1])
        return path

    def _monitor_region(self, window: Optional[WindowInfo]) -> dict:
        """Return the capture region: the window rect if usable, else the monitor."""
        if window and window.rect and window.rect.width > 0 and window.rect.height > 0:
            source = {
                "left": window.rect.left,
                "top": window.rect.top,
                "width": window.rect.width,
                "height": window.rect.height,
            }
        else:
            source = self._monitor
        return {key: int(source[key]) for key in ("left", "top", "width", "height")}

    def _save_ui_snapshot(self, event_index: int, selector: Optional[UISelector]) -> Optional[str]:
        """Dump the selector and a shallow UI tree to ``ui_snapshots`` as JSON.

        Returns:
            The snapshot path as a string, or ``None`` when there is neither
            a tree nor a selector to store.
        """
        tree = self._capture_tree(max_depth=3)
        if not tree and selector is None:
            return None
        path = self.ui_snapshots_dir / f"ui_{event_index:05d}.json"
        snapshot = UISnapshot(selector=selector, tree=tree)
        with path.open("w", encoding="utf-8") as f:
            json.dump(snapshot.dict(exclude_none=True), f, ensure_ascii=False)
        return str(path)

    # UI helpers ---------------------------------------------------------
    def _capture_tree(self, max_depth: int = 3) -> List[UITreeNode]:
        """Walk the foreground control breadth-first down to ``max_depth``.

        Returns:
            Nodes in level order (root at depth 0); empty when no foreground
            control can be obtained.
        """
        try:
            self._ensure_uia_initialized()
            root = auto.GetForegroundControl()
        except Exception:
            root = None
        if root is None:
            return []
        nodes: List[UITreeNode] = []
        level = [root]
        depth = 0
        while level and depth <= max_depth:
            next_level = []
            for node in level:
                nodes.append(
                    UITreeNode(
                        name=node.Name,
                        automation_id=node.AutomationId,
                        class_name=node.ClassName,
                        control_type=node.ControlTypeName,
                        depth=depth,
                    )
                )
                if depth >= max_depth:
                    # Children would be beyond the limit; skip the lookup.
                    continue
                try:
                    children = list(node.GetChildren())
                except Exception:
                    children = []
                next_level.extend(children)
            level = next_level
            depth += 1
        return nodes

    def _hit_test(self, x: int, y: int) -> Optional[UISelector]:
        """Return a selector for the control at screen point ``(x, y)``, if any."""
        try:
            self._ensure_uia_initialized()
            ctrl = auto.ControlFromPoint((int(x), int(y)))
        except Exception:
            ctrl = None
        if not ctrl:
            return None
        return self._build_selector(ctrl)

    def _get_window_info(self) -> Optional[WindowInfo]:
        """Describe the foreground window, or ``None`` if it cannot be obtained."""
        try:
            self._ensure_uia_initialized()
            ctrl = auto.GetForegroundControl()
        except Exception:
            # Same best-effort policy as ``_hit_test``: a failed lookup must
            # not take down the thread that asked for it.
            ctrl = None
        if ctrl is None:
            return None
        try:
            process_name = psutil.Process(ctrl.ProcessId).name()
        except Exception:
            process_name = None
        hwnd = getattr(ctrl, "NativeWindowHandle", None) or getattr(ctrl, "Handle", None)
        return WindowInfo(
            hwnd=int(hwnd) if hwnd else None,
            title=ctrl.Name,
            process_name=process_name,
            rect=self._rect_of(ctrl),
        )

    @staticmethod
    def _rect_of(ctrl: auto.Control) -> Optional[Rect]:  # type: ignore
        """Convert the control's ``BoundingRectangle`` to ``Rect``, if it has one."""
        rect = getattr(ctrl, "BoundingRectangle", None)
        if not rect:
            return None
        return Rect(left=int(rect.left), top=int(rect.top), right=int(rect.right), bottom=int(rect.bottom))

    def _build_selector(self, ctrl: auto.Control) -> UISelector:  # type: ignore
        """Build a ``UISelector`` from the control's identifying attributes."""
        return UISelector(
            automation_id=getattr(ctrl, "AutomationId", None),
            name=getattr(ctrl, "Name", None),
            class_name=getattr(ctrl, "ClassName", None),
            control_type=getattr(ctrl, "ControlTypeName", None),
            bounding_rect=self._rect_of(ctrl),
        )

    # Utility ------------------------------------------------------------
    def _key_to_char(self, key: keyboard.Key | keyboard.KeyCode) -> Optional[str]:
        """Map a key to the text it contributes, or ``None`` for no text.

        Backspace has a side effect: it removes the last buffered character.
        ``_on_key_press`` calls this while holding the text-buffer lock.
        """
        if isinstance(key, keyboard.KeyCode) and key.char:
            return key.char
        if key == keyboard.Key.space:
            return " "
        if key == keyboard.Key.enter:
            return "\n"
        if key == keyboard.Key.backspace and self._text_buffer:
            self._text_buffer.pop()
        return None

    def _is_hotkey(self, key: keyboard.Key | keyboard.KeyCode) -> bool:
        """Return whether ``key`` matches the configured hotkey, ignoring case."""
        name = None
        if isinstance(key, keyboard.Key):
            name = (key.name or "").lower()
        elif isinstance(key, keyboard.KeyCode):
            name = (key.char or "").lower()
        return name == self.hotkey.lower()

    def _current_mouse_position(self) -> Tuple[int, int]:
        """Return the pointer position as integer ``(x, y)``."""
        pos = self._mouse_controller.position
        return int(pos[0]), int(pos[1])

    def _current_mouse_info(self) -> Optional[MouseInfo]:
        """Return the pointer position as a ``MouseInfo`` without button/action."""
        x, y = self._current_mouse_position()
        return MouseInfo(x=int(x), y=int(y), button=None, action=None)

    def _ensure_uia_initialized(self) -> None:
        """Create the UI Automation initializer once for the calling thread."""
        if getattr(self._uia_local, "token", None) is None:
            self._uia_local.token = auto.UIAutomationInitializerInThread()

    # Persistence --------------------------------------------------------
    def _write_events(self) -> None:
        """Write all recorded events to ``events.jsonl``, one JSON object per line."""
        # Copy under the lock: a handler that was already running when the
        # listeners were stopped may still append.
        with self._lock:
            events = list(self.events)
        with self.events_path.open("w", encoding="utf-8") as f:
            for event in events:
                f.write(json.dumps(event.dict(exclude_none=True), ensure_ascii=False))
                f.write("\n")

    def _write_manifest(self) -> None:
        """Write ``manifest.json`` describing the session and its artifact paths."""
        manifest = SessionManifest(
            session_id=self.session_id,
            start_time=self._start_ts,
            end_time=time.time(),
            resolution=self._resolution(),
            fps=self.fps,
            screen=self.screen,
            video_path=str(self.video_path),
            events_path=str(self.events_path),
            frames_dir=str(self.frames_dir),
            frames_crops_dir=str(self.frames_crops_dir),
            ui_snapshots_dir=str(self.ui_snapshots_dir),
        )
        path = self.session_dir / "manifest.json"
        with path.open("w", encoding="utf-8") as f:
            json.dump(manifest.dict(exclude_none=True), f, ensure_ascii=False, indent=2)

    def _resolution(self) -> str:
        """Return ``"<width>x<height>"`` for the monitor, or ``"unknown"``."""
        if self._monitor:
            return f"{self._monitor['width']}x{self._monitor['height']}"
        try:
            width, height = auto.GetScreenSize()
            return f"{width}x{height}"
        except Exception:
            return "unknown"