# MIT License # Copyright (c) 2024 """Screen recording helper with ffmpeg primary and mss+cv2 fallback.""" from __future__ import annotations import shutil import subprocess import threading import time from pathlib import Path from typing import Dict, Optional import cv2 # type: ignore import mss # type: ignore import numpy as np # type: ignore class ScreenRecorder: """Record the screen to an MP4 file.""" def __init__(self, output_path: Path, fps: int = 12, screen: int = 0) -> None: self.output_path = output_path self.fps = fps self.screen = screen self._proc: Optional[subprocess.Popen] = None self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._monitor: Optional[Dict[str, int]] = None self._writer: Optional[cv2.VideoWriter] = None @property def monitor(self) -> Optional[Dict[str, int]]: return self._monitor def start(self) -> None: """Start recording using ffmpeg if available, otherwise mss+cv2.""" self.output_path.parent.mkdir(parents=True, exist_ok=True) if self._start_ffmpeg(): return self._start_mss_fallback() def stop(self) -> None: """Stop recording gracefully.""" self._stop_event.set() if self._proc: try: if self._proc.stdin: self._proc.stdin.write(b"q") self._proc.stdin.flush() except Exception: pass try: self._proc.wait(timeout=5) except Exception: self._proc.kill() self._proc = None if self._thread and self._thread.is_alive(): self._thread.join(timeout=5) self._thread = None if self._writer: self._writer.release() self._writer = None def _start_ffmpeg(self) -> bool: if shutil.which("ffmpeg") is None: return False with mss.mss() as sct: monitors = sct.monitors if 0 <= self.screen < len(monitors): self._monitor = monitors[self.screen] else: self._monitor = monitors[0] width = int(self._monitor["width"]) height = int(self._monitor["height"]) offset_x = int(self._monitor["left"]) offset_y = int(self._monitor["top"]) cmd = [ "ffmpeg", "-y", "-f", "gdigrab", "-framerate", str(self.fps), "-offset_x", str(offset_x), "-offset_y", str(offset_y), "-video_size", f"{width}x{height}", "-draw_mouse", "1", "-i", "desktop", "-pix_fmt", "yuv420p", "-vcodec", "libx264", "-preset", "ultrafast", str(self.output_path), ] creation_flags = subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0 try: self._proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creation_flags, ) return True except Exception: self._proc = None return False def _start_mss_fallback(self) -> None: self._stop_event.clear() self._thread = threading.Thread(target=self._capture_loop, daemon=True) self._thread.start() def _capture_loop(self) -> None: with mss.mss() as sct: monitors = sct.monitors if 0 <= self.screen < len(monitors): self._monitor = monitors[self.screen] else: self._monitor = monitors[0] width = int(self._monitor["width"]) height = int(self._monitor["height"]) fourcc = cv2.VideoWriter_fourcc(*"mp4v") self._writer = cv2.VideoWriter(str(self.output_path), fourcc, self.fps, (width, height)) frame_interval = 1.0 / max(self.fps, 1) next_ts = time.perf_counter() while not self._stop_event.is_set(): shot = np.array(sct.grab(self._monitor)) frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR) self._writer.write(frame) next_ts += frame_interval sleep_for = max(0.0, next_ts - time.perf_counter()) if sleep_for: time.sleep(sleep_for) if self._writer: self._writer.release() self._writer = None