63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
from collections import deque
|
|
from typing import Optional
|
|
import numpy as np
|
|
|
|
|
|
class BaseDetector:
|
|
"""
|
|
检测器基类
|
|
提供通用的帧回溯缓存功能,子类可按需使用
|
|
"""
|
|
|
|
def __init__(self):
|
|
# 帧回溯缓存(子类需要时调用 init_frame_buffer 初始化)
|
|
self._frame_buffer: Optional[deque] = None
|
|
|
|
def init_frame_buffer(self, buffer_seconds: float, fps: float):
|
|
"""
|
|
初始化帧回溯缓存队列
|
|
|
|
Args:
|
|
buffer_seconds: 需要缓存的时间长度(秒)
|
|
fps: 视频帧率
|
|
"""
|
|
maxlen = int(buffer_seconds * fps)
|
|
self._frame_buffer = deque(maxlen=maxlen)
|
|
|
|
def append_frame(self, frame: np.ndarray, timestamp: float):
|
|
"""
|
|
将当前帧入队缓存
|
|
|
|
Args:
|
|
frame: 当前帧图像
|
|
timestamp: 当前帧的时间戳
|
|
"""
|
|
if self._frame_buffer is not None:
|
|
self._frame_buffer.append({
|
|
'timestamp': timestamp,
|
|
'frame': frame.copy(),
|
|
})
|
|
|
|
def find_target_frame(self, target_time_sec: float) -> Optional[np.ndarray]:
|
|
"""
|
|
在帧缓存中找到最接近目标时间的帧
|
|
|
|
Args:
|
|
target_time_sec: 目标时间戳
|
|
|
|
Returns:
|
|
最接近目标时间的帧图像,缓存为空则返回 None
|
|
"""
|
|
if self._frame_buffer is None or len(self._frame_buffer) == 0:
|
|
return None
|
|
|
|
target_frame = None
|
|
min_time_diff = float('inf')
|
|
for buffered in self._frame_buffer:
|
|
time_diff = abs(buffered['timestamp'] - target_time_sec)
|
|
if time_diff < min_time_diff:
|
|
min_time_diff = time_diff
|
|
target_frame = buffered['frame']
|
|
|
|
return target_frame
|