156 lines
4.7 KiB
Python
156 lines
4.7 KiB
Python
|
|
# MIT License
|
||
|
|
# Copyright (c) 2024
|
||
|
|
"""Screen recording helper with ffmpeg primary and mss+cv2 fallback."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import shutil
|
||
|
|
import subprocess
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, Optional
|
||
|
|
|
||
|
|
import cv2 # type: ignore
|
||
|
|
import mss # type: ignore
|
||
|
|
import numpy as np # type: ignore
|
||
|
|
|
||
|
|
|
||
|
|
class ScreenRecorder:
|
||
|
|
"""Record the screen to an MP4 file."""
|
||
|
|
|
||
|
|
def __init__(self, output_path: Path, fps: int = 12, screen: int = 0) -> None:
|
||
|
|
self.output_path = output_path
|
||
|
|
self.fps = fps
|
||
|
|
self.screen = screen
|
||
|
|
|
||
|
|
self._proc: Optional[subprocess.Popen] = None
|
||
|
|
self._thread: Optional[threading.Thread] = None
|
||
|
|
self._stop_event = threading.Event()
|
||
|
|
self._monitor: Optional[Dict[str, int]] = None
|
||
|
|
self._writer: Optional[cv2.VideoWriter] = None
|
||
|
|
|
||
|
|
@property
|
||
|
|
def monitor(self) -> Optional[Dict[str, int]]:
|
||
|
|
return self._monitor
|
||
|
|
|
||
|
|
def start(self) -> None:
|
||
|
|
"""Start recording using ffmpeg if available, otherwise mss+cv2."""
|
||
|
|
self.output_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
if self._start_ffmpeg():
|
||
|
|
return
|
||
|
|
self._start_mss_fallback()
|
||
|
|
|
||
|
|
def stop(self) -> None:
|
||
|
|
"""Stop recording gracefully."""
|
||
|
|
self._stop_event.set()
|
||
|
|
if self._proc:
|
||
|
|
try:
|
||
|
|
if self._proc.stdin:
|
||
|
|
self._proc.stdin.write(b"q")
|
||
|
|
self._proc.stdin.flush()
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
try:
|
||
|
|
self._proc.wait(timeout=5)
|
||
|
|
except Exception:
|
||
|
|
self._proc.kill()
|
||
|
|
self._proc = None
|
||
|
|
if self._thread and self._thread.is_alive():
|
||
|
|
self._thread.join(timeout=5)
|
||
|
|
self._thread = None
|
||
|
|
if self._writer:
|
||
|
|
self._writer.release()
|
||
|
|
self._writer = None
|
||
|
|
|
||
|
|
def _start_ffmpeg(self) -> bool:
|
||
|
|
if shutil.which("ffmpeg") is None:
|
||
|
|
return False
|
||
|
|
|
||
|
|
with mss.mss() as sct:
|
||
|
|
monitors = sct.monitors
|
||
|
|
if 0 <= self.screen < len(monitors):
|
||
|
|
self._monitor = monitors[self.screen]
|
||
|
|
else:
|
||
|
|
self._monitor = monitors[0]
|
||
|
|
|
||
|
|
width = int(self._monitor["width"])
|
||
|
|
height = int(self._monitor["height"])
|
||
|
|
offset_x = int(self._monitor["left"])
|
||
|
|
offset_y = int(self._monitor["top"])
|
||
|
|
|
||
|
|
cmd = [
|
||
|
|
"ffmpeg",
|
||
|
|
"-y",
|
||
|
|
"-f",
|
||
|
|
"gdigrab",
|
||
|
|
"-framerate",
|
||
|
|
str(self.fps),
|
||
|
|
"-offset_x",
|
||
|
|
str(offset_x),
|
||
|
|
"-offset_y",
|
||
|
|
str(offset_y),
|
||
|
|
"-video_size",
|
||
|
|
f"{width}x{height}",
|
||
|
|
"-draw_mouse",
|
||
|
|
"1",
|
||
|
|
"-i",
|
||
|
|
"desktop",
|
||
|
|
"-pix_fmt",
|
||
|
|
"yuv420p",
|
||
|
|
"-vcodec",
|
||
|
|
"libx264",
|
||
|
|
"-preset",
|
||
|
|
"ultrafast",
|
||
|
|
str(self.output_path),
|
||
|
|
]
|
||
|
|
|
||
|
|
creation_flags = subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0
|
||
|
|
try:
|
||
|
|
self._proc = subprocess.Popen(
|
||
|
|
cmd,
|
||
|
|
stdin=subprocess.PIPE,
|
||
|
|
stdout=subprocess.DEVNULL,
|
||
|
|
stderr=subprocess.DEVNULL,
|
||
|
|
creationflags=creation_flags,
|
||
|
|
)
|
||
|
|
return True
|
||
|
|
except Exception:
|
||
|
|
self._proc = None
|
||
|
|
return False
|
||
|
|
|
||
|
|
def _start_mss_fallback(self) -> None:
|
||
|
|
self._stop_event.clear()
|
||
|
|
self._thread = threading.Thread(target=self._capture_loop, daemon=True)
|
||
|
|
self._thread.start()
|
||
|
|
|
||
|
|
def _capture_loop(self) -> None:
|
||
|
|
with mss.mss() as sct:
|
||
|
|
monitors = sct.monitors
|
||
|
|
if 0 <= self.screen < len(monitors):
|
||
|
|
self._monitor = monitors[self.screen]
|
||
|
|
else:
|
||
|
|
self._monitor = monitors[0]
|
||
|
|
|
||
|
|
width = int(self._monitor["width"])
|
||
|
|
height = int(self._monitor["height"])
|
||
|
|
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
||
|
|
self._writer = cv2.VideoWriter(str(self.output_path), fourcc, self.fps, (width, height))
|
||
|
|
|
||
|
|
frame_interval = 1.0 / max(self.fps, 1)
|
||
|
|
next_ts = time.perf_counter()
|
||
|
|
|
||
|
|
while not self._stop_event.is_set():
|
||
|
|
shot = np.array(sct.grab(self._monitor))
|
||
|
|
frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR)
|
||
|
|
self._writer.write(frame)
|
||
|
|
|
||
|
|
next_ts += frame_interval
|
||
|
|
sleep_for = max(0.0, next_ts - time.perf_counter())
|
||
|
|
if sleep_for:
|
||
|
|
time.sleep(sleep_for)
|
||
|
|
|
||
|
|
if self._writer:
|
||
|
|
self._writer.release()
|
||
|
|
self._writer = None
|