新增base_detector,将帧回溯放入其中
This commit is contained in:
62
biz/base_detector.py
Normal file
62
biz/base_detector.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from collections import deque
|
||||
from typing import Optional
|
||||
import numpy as np
|
||||
|
||||
|
||||
class BaseDetector:
|
||||
"""
|
||||
检测器基类
|
||||
提供通用的帧回溯缓存功能,子类可按需使用
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 帧回溯缓存(子类需要时调用 init_frame_buffer 初始化)
|
||||
self._frame_buffer: Optional[deque] = None
|
||||
|
||||
def init_frame_buffer(self, buffer_seconds: float, fps: float):
|
||||
"""
|
||||
初始化帧回溯缓存队列
|
||||
|
||||
Args:
|
||||
buffer_seconds: 需要缓存的时间长度(秒)
|
||||
fps: 视频帧率
|
||||
"""
|
||||
maxlen = int(buffer_seconds * fps)
|
||||
self._frame_buffer = deque(maxlen=maxlen)
|
||||
|
||||
def append_frame(self, frame: np.ndarray, timestamp: float):
|
||||
"""
|
||||
将当前帧入队缓存
|
||||
|
||||
Args:
|
||||
frame: 当前帧图像
|
||||
timestamp: 当前帧的时间戳
|
||||
"""
|
||||
if self._frame_buffer is not None:
|
||||
self._frame_buffer.append({
|
||||
'timestamp': timestamp,
|
||||
'frame': frame.copy(),
|
||||
})
|
||||
|
||||
def find_target_frame(self, target_time_sec: float) -> Optional[np.ndarray]:
|
||||
"""
|
||||
在帧缓存中找到最接近目标时间的帧
|
||||
|
||||
Args:
|
||||
target_time_sec: 目标时间戳
|
||||
|
||||
Returns:
|
||||
最接近目标时间的帧图像,缓存为空则返回 None
|
||||
"""
|
||||
if self._frame_buffer is None or len(self._frame_buffer) == 0:
|
||||
return None
|
||||
|
||||
target_frame = None
|
||||
min_time_diff = float('inf')
|
||||
for buffered in self._frame_buffer:
|
||||
time_diff = abs(buffered['timestamp'] - target_time_sec)
|
||||
if time_diff < min_time_diff:
|
||||
min_time_diff = time_diff
|
||||
target_frame = buffered['frame']
|
||||
|
||||
return target_frame
|
||||
Reference in New Issue
Block a user