# MIT License # Copyright (c) 2024 """Multimodal recorder for Windows desktop sessions.""" from __future__ import annotations import json import threading import time import uuid from pathlib import Path from typing import List, Optional, Tuple import cv2 # type: ignore import numpy as np # type: ignore import psutil # type: ignore import uiautomation as auto # type: ignore from pynput import keyboard, mouse import mss # type: ignore from .schema import ( EventRecord, FramePaths, MouseInfo, Rect, SessionManifest, UISnapshot, UITreeNode, UISelector, WindowInfo, ) from .screen_recorder import ScreenRecorder class Recorder: """Capture UI events, UIA context, screenshots, and screen video.""" def __init__(self, output_dir: Path, hotkey: str = "F9", fps: int = 12, screen: int = 0) -> None: self.output_dir = output_dir self.hotkey = hotkey self.fps = fps self.screen = screen self.session_id = str(uuid.uuid4()) self.session_dir = self.output_dir / self.session_id self.events_path = self.session_dir / "events.jsonl" self.video_path = self.session_dir / "video.mp4" self.frames_dir = self.session_dir / "frames" self.frames_crops_dir = self.session_dir / "frames_crops" self.ui_snapshots_dir = self.session_dir / "ui_snapshots" self.events: List[EventRecord] = [] self._stop_event = threading.Event() self._lock = threading.Lock() self._text_buffer: List[str] = [] self._flush_timer: Optional[threading.Timer] = None self._start_perf = 0.0 self._start_ts = 0.0 self._last_hwnd: Optional[int] = None self._mouse_controller = mouse.Controller() self._screen_recorder: Optional[ScreenRecorder] = None self._window_thread: Optional[threading.Thread] = None self._mouse_listener: Optional[mouse.Listener] = None self._keyboard_listener: Optional[keyboard.Listener] = None self._monitor: Optional[dict] = None self._event_index = 0 self._uia_local = threading.local() self._ensure_uia_initialized() # Public API --------------------------------------------------------- def start(self) -> Path: """Start recording until the hotkey is pressed.""" self.session_dir.mkdir(parents=True, exist_ok=True) self.frames_dir.mkdir(parents=True, exist_ok=True) self.frames_crops_dir.mkdir(parents=True, exist_ok=True) self.ui_snapshots_dir.mkdir(parents=True, exist_ok=True) self._start_perf = time.perf_counter() self._start_ts = time.time() with mss.mss() as sct: monitors = sct.monitors if 0 <= self.screen < len(monitors): self._monitor = monitors[self.screen] else: self._monitor = monitors[0] self._screen_recorder = ScreenRecorder(self.video_path, fps=self.fps, screen=self.screen) self._screen_recorder.start() self._window_thread = threading.Thread(target=self._watch_window, daemon=True) self._window_thread.start() self._mouse_listener = mouse.Listener(on_click=self._on_click) self._keyboard_listener = keyboard.Listener(on_press=self._on_key_press) self._mouse_listener.start() self._keyboard_listener.start() self._stop_event.wait() self._flush_text_buffer() self._shutdown() return self.session_dir # Event handlers ----------------------------------------------------- def _on_click(self, x: int, y: int, button: mouse.Button, pressed: bool) -> None: if not pressed or self._stop_event.is_set(): return window_info = self._get_window_info() selector = self._hit_test(x, y) mouse_info = MouseInfo(x=int(x), y=int(y), button=str(button).split(".")[-1], action="down") self._record_event( event_type="mouse_click", mouse_info=mouse_info, text=None, uia_selector=selector, window=window_info, ) def _on_key_press(self, key: keyboard.Key | keyboard.KeyCode) -> Optional[bool]: if self._is_hotkey(key): self._stop_event.set() return False if self._stop_event.is_set(): return False ch = self._key_to_char(key) if ch is None: return None self._text_buffer.append(ch) self._schedule_flush() return None # Background watchers ------------------------------------------------ def _watch_window(self, interval: float = 0.5) -> None: while not self._stop_event.is_set(): info = self._get_window_info() hwnd = info.hwnd if info else None if hwnd and hwnd != self._last_hwnd: self._last_hwnd = hwnd selector = self._hit_test(*self._current_mouse_position()) self._record_event( event_type="window_change", mouse_info=self._current_mouse_info(), text=None, uia_selector=selector, window=info, ) time.sleep(interval) # Recording helpers -------------------------------------------------- def _shutdown(self) -> None: if self._flush_timer and self._flush_timer.is_alive(): self._flush_timer.cancel() if self._mouse_listener: self._mouse_listener.stop() if self._keyboard_listener: self._keyboard_listener.stop() if self._window_thread and self._window_thread.is_alive(): self._window_thread.join(timeout=1.0) if self._screen_recorder: self._screen_recorder.stop() self._write_events() self._write_manifest() def _schedule_flush(self) -> None: if self._flush_timer and self._flush_timer.is_alive(): self._flush_timer.cancel() self._flush_timer = threading.Timer(0.8, self._flush_text_buffer) self._flush_timer.daemon = True self._flush_timer.start() def _flush_text_buffer(self) -> None: if not self._text_buffer: return text = "".join(self._text_buffer) self._text_buffer = [] mouse_info = self._current_mouse_info() selector = None if mouse_info: selector = self._hit_test(mouse_info.x, mouse_info.y) window_info = self._get_window_info() self._record_event( event_type="text_input", mouse_info=mouse_info, text=text, uia_selector=selector, window=window_info, ) def _record_event( self, event_type: str, mouse_info: Optional[MouseInfo], text: Optional[str], uia_selector: Optional[UISelector], window: Optional[WindowInfo], ) -> None: self._event_index += 1 ts = time.time() offset_ms = int((time.perf_counter() - self._start_perf) * 1000) frame_paths = self._capture_frame(event_type, self._event_index, mouse_info, uia_selector, window) ui_snapshot_path = self._save_ui_snapshot(self._event_index, uia_selector) record = EventRecord( ts=ts, event_type=event_type, window=window, mouse=mouse_info, text=text, uia=uia_selector, frame_paths=frame_paths, video_time_offset_ms=offset_ms, ui_snapshot=ui_snapshot_path, ) with self._lock: self.events.append(record) def _capture_frame( self, tag: str, event_index: int, mouse_info: Optional[MouseInfo], uia_selector: Optional[UISelector], window: Optional[WindowInfo], ) -> Optional[FramePaths]: if not self._monitor: return None region = self._monitor_region(window) with mss.mss() as sct: shot = np.array(sct.grab(region)) frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR) full_path = self.frames_dir / f"frame_{event_index:05d}_{tag}.png" cv2.imwrite(str(full_path), frame) crop_mouse_path = None crop_element_path = None if mouse_info: crop_mouse_path = self._save_mouse_crop(frame, region, mouse_info, event_index) if uia_selector and uia_selector.bounding_rect: crop_element_path = self._save_element_crop(frame, region, uia_selector.bounding_rect, event_index) return FramePaths( full=str(full_path), crop_mouse=str(crop_mouse_path) if crop_mouse_path else None, crop_element=str(crop_element_path) if crop_element_path else None, ) def _save_mouse_crop(self, frame: np.ndarray, region: dict, mouse_info: MouseInfo, event_index: int) -> Optional[Path]: width, height = frame.shape[1], frame.shape[0] center_x = int(mouse_info.x - region["left"]) center_y = int(mouse_info.y - region["top"]) crop_w, crop_h = 400, 300 x0 = max(0, center_x - crop_w // 2) y0 = max(0, center_y - crop_h // 2) x1 = min(width, x0 + crop_w) y1 = min(height, y0 + crop_h) if x1 <= x0 or y1 <= y0: return None crop = frame[y0:y1, x0:x1] path = self.frames_crops_dir / f"frame_{event_index:05d}_mouse.png" cv2.imwrite(str(path), crop) return path def _save_element_crop(self, frame: np.ndarray, region: dict, rect: Rect, event_index: int) -> Optional[Path]: width, height = frame.shape[1], frame.shape[0] x0 = max(0, int(rect.left - region["left"])) y0 = max(0, int(rect.top - region["top"])) x1 = min(width, int(rect.right - region["left"])) y1 = min(height, int(rect.bottom - region["top"])) if x1 <= x0 or y1 <= y0: return None crop = frame[y0:y1, x0:x1] path = self.frames_crops_dir / f"frame_{event_index:05d}_element.png" cv2.imwrite(str(path), crop) return path def _monitor_region(self, window: Optional[WindowInfo]) -> dict: if window and window.rect and window.rect.width > 0 and window.rect.height > 0: return { "left": int(window.rect.left), "top": int(window.rect.top), "width": int(window.rect.width), "height": int(window.rect.height), } return { "left": int(self._monitor["left"]), "top": int(self._monitor["top"]), "width": int(self._monitor["width"]), "height": int(self._monitor["height"]), } def _save_ui_snapshot(self, event_index: int, selector: Optional[UISelector]) -> Optional[str]: tree = self._capture_tree(max_depth=3) if not tree and selector is None: return None path = self.ui_snapshots_dir / f"ui_{event_index:05d}.json" snapshot = UISnapshot(selector=selector, tree=tree) with path.open("w", encoding="utf-8") as f: json.dump(snapshot.dict(exclude_none=True), f, ensure_ascii=False) return str(path) # UI helpers --------------------------------------------------------- def _capture_tree(self, max_depth: int = 3) -> List[UITreeNode]: self._ensure_uia_initialized() root = auto.GetForegroundControl() if root is None: return [] nodes: List[UITreeNode] = [] queue: List[Tuple[auto.Control, int]] = [(root, 0)] # type: ignore while queue: node, depth = queue.pop(0) if depth > max_depth: continue nodes.append( UITreeNode( name=node.Name, automation_id=node.AutomationId, class_name=node.ClassName, control_type=node.ControlTypeName, depth=depth, ) ) try: children = list(node.GetChildren()) except Exception: children = [] for child in children: queue.append((child, depth + 1)) return nodes def _hit_test(self, x: int, y: int) -> Optional[UISelector]: try: self._ensure_uia_initialized() ctrl = auto.ControlFromPoint((int(x), int(y))) except Exception: ctrl = None if not ctrl: return None return self._build_selector(ctrl) def _get_window_info(self) -> Optional[WindowInfo]: self._ensure_uia_initialized() ctrl = auto.GetForegroundControl() if ctrl is None: return None rect = getattr(ctrl, "BoundingRectangle", None) self._ensure_uia_initialized() rect_model = None if rect: rect_model = Rect(left=int(rect.left), top=int(rect.top), right=int(rect.right), bottom=int(rect.bottom)) process_name = None try: process_name = psutil.Process(ctrl.ProcessId).name() except Exception: process_name = None hwnd = getattr(ctrl, "NativeWindowHandle", None) or getattr(ctrl, "Handle", None) return WindowInfo( hwnd=int(hwnd) if hwnd else None, title=ctrl.Name, process_name=process_name, rect=rect_model, ) def _build_selector(self, ctrl: auto.Control) -> UISelector: # type: ignore rect = getattr(ctrl, "BoundingRectangle", None) rect_model = None if rect: rect_model = Rect(left=int(rect.left), top=int(rect.top), right=int(rect.right), bottom=int(rect.bottom)) return UISelector( automation_id=getattr(ctrl, "AutomationId", None), name=getattr(ctrl, "Name", None), class_name=getattr(ctrl, "ClassName", None), control_type=getattr(ctrl, "ControlTypeName", None), bounding_rect=rect_model, ) # Utility ------------------------------------------------------------ def _key_to_char(self, key: keyboard.Key | keyboard.KeyCode) -> Optional[str]: if isinstance(key, keyboard.KeyCode) and key.char: return key.char if key == keyboard.Key.space: return " " if key == keyboard.Key.enter: return "\n" if key == keyboard.Key.backspace: if self._text_buffer: self._text_buffer.pop() return None return None def _is_hotkey(self, key: keyboard.Key | keyboard.KeyCode) -> bool: target = self.hotkey.lower() name = None if isinstance(key, keyboard.Key): name = (key.name or "").lower() elif isinstance(key, keyboard.KeyCode): name = (key.char or "").lower() return name == target def _current_mouse_position(self) -> Tuple[int, int]: pos = self._mouse_controller.position return int(pos[0]), int(pos[1]) def _current_mouse_info(self) -> Optional[MouseInfo]: x, y = self._current_mouse_position() return MouseInfo(x=int(x), y=int(y), button=None, action=None) def _ensure_uia_initialized(self) -> None: if getattr(self._uia_local, "token", None) is None: self._uia_local.token = auto.UIAutomationInitializerInThread() # Persistence -------------------------------------------------------- def _write_events(self) -> None: with self.events_path.open("w", encoding="utf-8") as f: for event in self.events: f.write(json.dumps(event.dict(exclude_none=True), ensure_ascii=False)) f.write("\n") def _write_manifest(self) -> None: resolution = self._resolution() manifest = SessionManifest( session_id=self.session_id, start_time=self._start_ts, end_time=time.time(), resolution=resolution, fps=self.fps, screen=self.screen, video_path=str(self.video_path), events_path=str(self.events_path), frames_dir=str(self.frames_dir), frames_crops_dir=str(self.frames_crops_dir), ui_snapshots_dir=str(self.ui_snapshots_dir), ) path = self.session_dir / "manifest.json" with path.open("w", encoding="utf-8") as f: json.dump(manifest.dict(exclude_none=True), f, ensure_ascii=False, indent=2) def _resolution(self) -> str: if self._monitor: return f"{self._monitor['width']}x{self._monitor['height']}" try: width, height = auto.GetScreenSize() return f"{width}x{height}" except Exception: return "unknown"