From 80bc288a888d1926f81489921037ba825e0c4c34 Mon Sep 17 00:00:00 2001 From: zqc <835569504@qq.com> Date: Thu, 5 Mar 2026 14:41:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E8=AD=A6=E5=91=8A=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E6=94=B9=E4=B8=BA=E4=BC=A0=E8=A7=86=E9=A2=91=EF=BC=8C?= =?UTF-8?q?=E5=B7=B2=E6=B5=8B=E8=AF=95=E4=BF=9D=E5=AD=98=E8=A7=86=E9=A2=91?= =?UTF-8?q?=EF=BC=8C=E6=9C=AA=E4=B8=8E=E6=8E=A5=E5=8F=A3=E8=81=94=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biz/base_frame_processor.py | 245 +++++++++++++++++++++++++++++++++++- common/constants.py | 14 +++ config.yaml | 8 +- utils/hls_utils.py | 126 +++++++++++++++++++ 4 files changed, 386 insertions(+), 7 deletions(-) diff --git a/biz/base_frame_processor.py b/biz/base_frame_processor.py index 1fdef68..5e68a6f 100644 --- a/biz/base_frame_processor.py +++ b/biz/base_frame_processor.py @@ -4,6 +4,9 @@ import cv2 import base64 +import json +import os +import subprocess import time import threading import queue @@ -13,6 +16,7 @@ from concurrent.futures import ThreadPoolExecutor from common import constants from utils.logger import get_logger +from utils.hls_utils import get_segments_before_current, parse_segment_info logger = get_logger(__name__) @@ -67,6 +71,12 @@ class BaseFrameProcessorWorker(threading.Thread): max_workers=post_workers, thread_name_prefix="alert_post" ) + + # MP4缓存 {segment_path: mp4_path} + self._mp4_cache: Dict[str, str] = {} + + # 启动视频文件清理线程 + self._start_cleanup_thread() def _encode_image_to_base64(self, img) -> str: """图像编码为 Base64""" @@ -102,7 +112,7 @@ class BaseFrameProcessorWorker(threading.Thread): return result def _post_alert(self, msg: dict): - """异步发送告警 POST 请求(在线程池中执行)""" + """异步发送告警 POST 请求(在线程池中执行)- 旧接口,保留备用""" try: response = requests.post(constants.ALERT_PUSH_URL, json=msg, timeout=5.0) if response.status_code == 200: @@ -112,6 +122,218 @@ class BaseFrameProcessorWorker(threading.Thread): except Exception as e: print(f"[ERROR] POST alert request failed: {e}") + def _post_alert_with_video(self, msg: dict, video_path: str = None): + """ + 异步发送告警 POST 请求(带视频,multipart/form-data) + + Args: + msg: 消息内容 + video_path: 视频文件路径(可选) + """ + try: + if video_path and os.path.exists(video_path): + # 有视频,使用 multipart/form-data 上传 + with open(video_path, 'rb') as f: + files = { + 'video': f, + 'metadata': (None, json.dumps(msg)) + } + response = requests.post(constants.ALERT_PUSH_URL, files=files, timeout=10.0) + else: + # 无视频,也使用 multipart/form-data + files = { + 'metadata': (None, json.dumps(msg)) + } + response = requests.post(constants.ALERT_PUSH_URL, files=files, timeout=5.0) + + if response.status_code == 200: + logger.info(f"[INFO] POST alert sent successfully for actions: {msg.get('result_type')}") + else: + logger.warning(f"[WARN] POST alert failed with status: {response.status_code}") + except Exception as e: + logger.error(f"[ERROR] POST alert request failed: {e}") + + def _start_cleanup_thread(self): + """启动视频文件清理线程""" + def cleanup_loop(): + while not self.stop_event.is_set(): + try: + self._cleanup_expired_files() + except Exception as e: + logger.error(f"[ERROR] Cleanup thread error: {e}") + # 每10分钟检查一次 + self.stop_event.wait(600) + + thread = threading.Thread(target=cleanup_loop, daemon=True, name="video_cleanup") + thread.start() + logger.info("[INFO] Video cleanup thread started") + + def _cleanup_expired_files(self): + """清理过期的视频文件""" + output_dir = constants.VIDEO_CLIP_OUTPUT_DIR + if not output_dir or not os.path.exists(output_dir): + return + + retention_seconds = constants.VIDEO_CLIP_RETENTION_SECONDS + current_time = time.time() + + try: + for filename in os.listdir(output_dir): + if not filename.endswith('.mp4'): + continue + filepath = os.path.join(output_dir, filename) + if os.path.isfile(filepath): + file_mtime = os.path.getmtime(filepath) + if current_time - file_mtime > retention_seconds: + try: + os.remove(filepath) + logger.info(f"[INFO] Cleaned up expired video: {filename}") + except Exception as e: + logger.error(f"[ERROR] Failed to delete {filename}: {e}") + except Exception as e: + logger.error(f"[ERROR] Cleanup expired files error: {e}") + + def _create_or_get_video_clip(self, segment_path: str, segment_duration: float = None) -> str | None: + """ + 创建或获取视频剪辑 + + Args: + segment_path: 当前TS分片路径 + segment_duration: 当前分片时长(秒) + + Returns: + MP4文件路径,失败返回 None + """ + if not segment_path: + return None + + # 检查缓存 + if segment_path in self._mp4_cache: + cached_path = self._mp4_cache[segment_path] + if os.path.exists(cached_path): + return cached_path + else: + # 缓存失效,移除 + del self._mp4_cache[segment_path] + + # 解析分片信息,构建MP4路径 + camera_id, timestamp, seq = parse_segment_info(segment_path) + if not camera_id: + logger.warning(f"[WARN] Failed to parse segment info: {segment_path}") + return None + + output_dir = constants.VIDEO_CLIP_OUTPUT_DIR + if not output_dir: + logger.warning("[WARN] VIDEO_CLIP_OUTPUT_DIR not configured") + return None + + # 确保输出目录存在 + os.makedirs(output_dir, exist_ok=True) + + # MP4文件名 + mp4_filename = f"{camera_id}_{timestamp}_{seq}.mp4" + mp4_path = os.path.join(output_dir, mp4_filename) + + # 检查是否已存在 + if os.path.exists(mp4_path): + self._mp4_cache[segment_path] = mp4_path + return mp4_path + + # 计算需要回溯的分片数量 + clip_duration = constants.VIDEO_CLIP_DURATION_SECONDS + default_segment_duration = constants.VIDEO_CLIP_DEFAULT_SEGMENT_DURATION + + effective_duration = segment_duration if segment_duration else default_segment_duration + if effective_duration <= 0: + effective_duration = default_segment_duration + + n_segments = int(clip_duration / effective_duration) + 1 + + # 获取需要合并的分片 + ts_files = get_segments_before_current(segment_path, n_segments) + if not ts_files: + logger.warning(f"[WARN] No segments found for clip: {segment_path}") + return None + + # 合并TS为MP4 + if self._merge_ts_to_mp4(ts_files, mp4_path): + self._mp4_cache[segment_path] = mp4_path + return mp4_path + + return None + + def _merge_ts_to_mp4(self, ts_files: list, output_path: str) -> bool: + """ + 使用 ffmpeg 合并 TS 分片为 MP4 + + Args: + ts_files: TS文件路径列表(按时间顺序) + output_path: 输出MP4路径 + + Returns: + 是否成功 + """ + if not ts_files: + return False + + try: + # 构建 concat 字符串 + concat_str = "|".join(ts_files) + + # ffmpeg 命令 + cmd = [ + 'ffmpeg', + '-i', f'concat:{concat_str}', + '-c', 'copy', + '-y', # 覆盖输出文件 + output_path + ] + + # 执行命令 + result = subprocess.run( + cmd, + capture_output=True, + timeout=60 # 60秒超时 + ) + + if result.returncode == 0: + logger.info(f"[INFO] Created video clip: {output_path}") + return True + else: + logger.error(f"[ERROR] ffmpeg failed: {result.stderr.decode()}") + return False + + except subprocess.TimeoutExpired: + logger.error(f"[ERROR] ffmpeg timeout for {output_path}") + return False + except FileNotFoundError: + logger.error("[ERROR] ffmpeg not found, please install ffmpeg") + return False + except Exception as e: + logger.error(f"[ERROR] Failed to merge TS files: {e}") + return False + + def _process_alert_with_video(self, msg: dict, segment_path: str, segment_duration: float): + """ + 处理告警(含视频剪辑)- 在线程池中执行 + + Args: + msg: 基础消息 + segment_path: 当前TS分片路径 + segment_duration: 当前分片时长 + """ + # 尝试创建/获取视频剪辑 + mp4_path = None + if segment_path: + mp4_path = self._create_or_get_video_clip(segment_path, segment_duration) + + # 展开 result_type + expanded_msgs = self._expand_msg_by_result_type(msg) + + # 发送每个展开后的消息 + for expanded_msg in expanded_msgs: + self._post_alert_with_video(expanded_msg, mp4_path) + def _create_detector(self, params): """创建检测器实例""" # 使用 type(self) 访问类属性,避免 lambda 被绑定 self 参数 @@ -203,13 +425,24 @@ class BaseFrameProcessorWorker(threading.Thread): try: self.ws_queue.put(msg, timeout=1.0) if push_actions and len(push_actions) > 0: - # 异步发送 POST 请求(提交到线程池) + # 构建消息 post_msg = msg.copy() post_msg['type'] = self.POST_TYPE - # 展开 result_type 为多个独立的 msg - expanded_msgs = self._expand_msg_by_result_type(post_msg) - for expanded_msg in expanded_msgs: - self.post_executor.submit(self._post_alert, expanded_msg) + + #备用backup + #self.post_executor.submit(self._post_alert, post_msg) + + # 获取视频相关信息(仅HLS模式有) + segment_path = item.get("segment_path") + segment_duration = item.get("segment_duration") + + # 提交到线程池执行(包含视频剪辑和POST) + self.post_executor.submit( + self._process_alert_with_video, + post_msg, + segment_path, + segment_duration + ) except queue.Full: logger.warning("[WARN] ws_send_queue full, drop frame message") diff --git a/common/constants.py b/common/constants.py index 6ea183d..d75a3bf 100644 --- a/common/constants.py +++ b/common/constants.py @@ -9,6 +9,12 @@ HLS_ROOT_PATH = "" HLS_SEGMENT_PATTERN = "segment_%09d.ts" # TS文件命名模式 +# 视频剪辑配置 +VIDEO_CLIP_OUTPUT_DIR = "" +VIDEO_CLIP_DURATION_SECONDS = 30 +VIDEO_CLIP_RETENTION_SECONDS = 3600 +VIDEO_CLIP_DEFAULT_SEGMENT_DURATION = 2 + def init_config(config_path: str = "config.yaml"): """ @@ -18,6 +24,7 @@ def init_config(config_path: str = "config.yaml"): config_path: 配置文件路径,默认为 config.yaml """ global ALERT_PUSH_URL, HLS_ROOT_PATH + global VIDEO_CLIP_OUTPUT_DIR, VIDEO_CLIP_DURATION_SECONDS, VIDEO_CLIP_RETENTION_SECONDS, VIDEO_CLIP_DEFAULT_SEGMENT_DURATION try: with open(config_path, "r", encoding="utf-8") as f: @@ -25,6 +32,13 @@ def init_config(config_path: str = "config.yaml"): ALERT_PUSH_URL = cfg.get("alert_push_url", "") # HLS_ROOT_PATH = cfg.get("hls_root_path", "") + + # 视频剪辑配置 + VIDEO_CLIP_OUTPUT_DIR = cfg.get("video_clip_output_dir", "") + VIDEO_CLIP_DURATION_SECONDS = cfg.get("video_clip_duration_seconds", 30) + VIDEO_CLIP_RETENTION_SECONDS = cfg.get("video_clip_retention_seconds", 3600) + VIDEO_CLIP_DEFAULT_SEGMENT_DURATION = cfg.get("video_clip_default_segment_duration", 2) + logger.info(f"[INFO] Config initialized from {config_path}, alert_push_url={ALERT_PUSH_URL}") except Exception as e: diff --git a/config.yaml b/config.yaml index f930980..c87bed1 100644 --- a/config.yaml +++ b/config.yaml @@ -41,6 +41,12 @@ hls_downloader_daily_rotate_hour: 3 # 凌晨轮换时间 hls_downloader_retention_days: 3 # 文件保留天数 hls_downloader_retry_interval_seconds: 10 # 重试等待秒数 +# 视频剪辑配置 +video_clip_output_dir: "D:/ProjectDoc/Police/data/video_clips" # 视频剪辑输出目录 +video_clip_duration_seconds: 30 # 回溯时长(秒) +video_clip_retention_seconds: 3600 # 视频文件保留时长(秒) +video_clip_default_segment_duration: 2 # 默认分片时长fallback(秒) + service_groups: - name: "kadian_group" # 服务组名称 video_source_type: "hls" @@ -49,7 +55,7 @@ service_groups: algorithm: "checkpoint" # 算法类型 cameras: # 该组下的摄像头列表 - id: 8 - index: 12345 + index: "12345" name: Entrance params: roi_points: diff --git a/utils/hls_utils.py b/utils/hls_utils.py index e6b79c3..5f9ae3a 100644 --- a/utils/hls_utils.py +++ b/utils/hls_utils.py @@ -1,5 +1,6 @@ import os import glob +import re from common import constants @@ -67,3 +68,128 @@ def get_latest_n_segments_by_camera_id(camera_id: str, n: int) -> list: """ camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id) return get_latest_n_segments(camera_root_dir, n) + + +def parse_segment_info(segment_path: str) -> tuple: + """ + 从TS分片路径解析出 camera_id, timestamp, sequence + + Args: + segment_path: TS分片路径,格式如: hls_root_path/camera_id/timestamp/segment_xxxxx.ts + + Returns: + (camera_id, timestamp, sequence) 或 (None, None, None) 解析失败时 + """ + try: + # 获取文件名和目录结构 + # 路径格式: .../camera_id/timestamp/segment_00001.ts + abs_path = os.path.abspath(segment_path) + parts = abs_path.split(os.sep) + + # 从后往前找 + # parts[-1] = segment_00001.ts + # parts[-2] = timestamp + # parts[-3] = camera_id + + if len(parts) < 3: + return None, None, None + + # 解析序号 + filename = parts[-1] + match = re.search(r'segment_(\d+)\.ts$', filename) + if not match: + return None, None, None + sequence = match.group(1) + + # 时间戳 + timestamp = parts[-2] + + # camera_id + camera_id = parts[-3] + + return camera_id, timestamp, sequence + + except Exception: + return None, None, None + + +def get_segments_before_current(current_segment_path: str, n: int) -> list: + """ + 获取当前分片之前的n个分片(包括当前分片) + + Args: + current_segment_path: 当前TS分片路径 + n: 需要获取的分片数量 + + Returns: + 分片路径列表(按时间顺序,旧的在前),如果不够n个则返回现有的 + """ + if not current_segment_path or not os.path.exists(current_segment_path): + return [] + + # 解析当前分片信息 + camera_id, timestamp, current_seq = parse_segment_info(current_segment_path) + if not camera_id: + return [] + + # 构建摄像头根目录和时间戳目录 + camera_root_dir = os.path.join(constants.HLS_ROOT_PATH, camera_id) + timestamp_dir = os.path.join(camera_root_dir, timestamp) + + if not os.path.exists(timestamp_dir): + return [] + + # 获取当前时间戳文件夹下的所有分片 + pattern = os.path.join(timestamp_dir, "segment_*.ts") + segment_files = glob.glob(pattern) + + # 按分片序号排序 + segment_files.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0])) + + # 找到当前分片的位置 + try: + current_index = segment_files.index(current_segment_path) + except ValueError: + # 当前分片不在列表中 + return [] + + # 计算起始位置 + start_index = max(0, current_index - n + 1) + + # 返回从起始位置到当前位置的所有分片 + result = segment_files[start_index:current_index + 1] + + # 如果不够n个,需要从之前的时间戳文件夹获取 + if len(result) < n: + # 获取所有时间戳文件夹 + timestamp_folders = [] + for folder_name in os.listdir(camera_root_dir): + folder_path = os.path.join(camera_root_dir, folder_name) + if os.path.isdir(folder_path): + timestamp_folders.append(folder_name) + + # 排序,找到当前时间戳之前的时间戳 + timestamp_folders.sort() + try: + current_ts_index = timestamp_folders.index(timestamp) + except ValueError: + current_ts_index = len(timestamp_folders) + + # 从之前的时间戳文件夹获取分片 + needed_count = n - len(result) + for i in range(current_ts_index - 1, -1, -1): + prev_ts_dir = os.path.join(camera_root_dir, timestamp_folders[i]) + prev_pattern = os.path.join(prev_ts_dir, "segment_*.ts") + prev_segments = glob.glob(prev_pattern) + prev_segments.sort(key=lambda x: int(os.path.basename(x).split('_')[-1].split('.')[0])) + + # 取最后 needed_count 个 + take_count = min(needed_count, len(prev_segments)) + if take_count > 0: + result = prev_segments[-take_count:] + result + needed_count -= take_count + + if needed_count <= 0: + break + + return result