from collections import deque from typing import Optional import numpy as np class BaseDetector: """ 检测器基类 提供通用的帧回溯缓存功能,子类可按需使用 """ def __init__(self): # 帧回溯缓存(子类需要时调用 init_frame_buffer 初始化) self._frame_buffer: Optional[deque] = None def init_frame_buffer(self, buffer_seconds: float, fps: float): """ 初始化帧回溯缓存队列 Args: buffer_seconds: 需要缓存的时间长度(秒) fps: 视频帧率 """ maxlen = int(buffer_seconds * fps) self._frame_buffer = deque(maxlen=maxlen) def append_frame(self, frame: np.ndarray, timestamp: float): """ 将当前帧入队缓存 Args: frame: 当前帧图像 timestamp: 当前帧的时间戳 """ if self._frame_buffer is not None: self._frame_buffer.append({ 'timestamp': timestamp, 'frame': frame.copy(), }) def find_target_frame(self, target_time_sec: float) -> Optional[np.ndarray]: """ 在帧缓存中找到最接近目标时间的帧 Args: target_time_sec: 目标时间戳 Returns: 最接近目标时间的帧图像,缓存为空则返回 None """ if self._frame_buffer is None or len(self._frame_buffer) == 0: return None target_frame = None min_time_diff = float('inf') for buffered in self._frame_buffer: time_diff = abs(buffered['timestamp'] - target_time_sec) if time_diff < min_time_diff: min_time_diff = time_diff target_frame = buffered['frame'] return target_frame