""" 人脸特征计算算法路由 提供人脸特征计算的HTTP接口 """ import os import logging from datetime import datetime, timedelta from fastapi import APIRouter, HTTPException, BackgroundTasks from config import settings from database.connection import db_manager from models.face_feature import FeatureStatus from models.video_check_task import SurVideoCheckTask from models.sur_config import SurConfigBase, SurConfig from models.sur_person import SurPersonBlacklist, SurFaceFeature from repositories.face_feature_repository import FaceFeatureRepository from algorithm.face_recognition_algorithm import FaceRecognitionAlgorithm from biz.base_face_biz import BaseFaceBiz from biz.video_check_biz import VideoCheckBiz from biz.video_face_biz import VideoFaceBiz from biz.video_face_prison_biz import VideoFacePrisonBiz from repositories.video_check_repository import VideoCheckTaskRepository # 创建路由器 router = APIRouter(prefix="/algorithm", tags=["algorithm"]) # 初始化人脸识别算法 face_algorithm = FaceRecognitionAlgorithm(use_gpu=settings.FACE_USE_GPU, use_npu=settings.FACE_USE_NPU) # 初始化RTSP专用人脸识别算法 face_algorithm_for_rtsp = FaceRecognitionAlgorithm(use_gpu=settings.FACE_USE_GPU, use_npu=settings.FACE_USE_NPU) # 初始化RTSP专用VideoFaceBiz实例 video_face_biz = VideoFaceBiz(face_algorithm_for_rtsp.get_app()) # 初始化RTSP专用VideoFacePrisonBiz实例 video_face_prison_biz = VideoFacePrisonBiz(face_algorithm_for_rtsp.get_app()) logger = logging.getLogger(__name__) def process_feature_calculation(feature_id: int) -> bool: """ 处理单个人脸特征计算 参数: feature_id: 特征记录ID 返回: 是否成功处理 """ try: with db_manager.get_session() as session: repository = FaceFeatureRepository(session) # 获取特征记录 feature = repository.get_by_id(feature_id) if not feature: logger.error(f"特征记录不存在: {feature_id}") return False # 检查是否已经处理完成 if feature.status in [FeatureStatus.SUCCESS, FeatureStatus.FAILED]: logger.info(f"特征记录已处理完成: {feature_id}, 状态: {feature.status_name}") return True # 检查是否超时 if feature.status == FeatureStatus.PROCESSING: if feature.start_time: timeout_duration = timedelta(hours=settings.FACE_CAL_FEATURE_TIMEOUT_HOURS) if datetime.now() - feature.start_time > timeout_duration: # 超时处理 feature.status = FeatureStatus.FAILED feature.finish_time = datetime.now() session.commit() logger.warning(f"特征计算超时: {feature_id}") return False else: # 没有开始时间,重置状态 feature.status = FeatureStatus.NOT_STARTED session.commit() # 处理未开始的计算 if feature.status == FeatureStatus.NOT_STARTED: # 设置状态为计算中 feature.status = FeatureStatus.PROCESSING feature.start_time = datetime.now() session.commit() logger.info(f"开始计算特征: {feature_id}") # 构建图片路径 if not feature.pic_id: logger.error(f"特征记录缺少图片ID: {feature_id}") feature.status = FeatureStatus.FAILED feature.finish_time = datetime.now() session.commit() return False image_path = os.path.join(settings.FACE_REGISTER_IMAGE_RESOURCE_DIR, feature.pic_id) # 检查图片文件是否存在 if not os.path.exists(image_path): logger.error(f"图片文件不存在: {image_path}") feature.status = FeatureStatus.FAILED feature.finish_time = datetime.now() session.commit() return False # 提取人脸特征 try: # 直接创建BaseFaceBiz实例 face_biz = BaseFaceBiz(face_algorithm.get_app()) feature_vector = face_biz.extract_face_feature(str(image_path)) if feature_vector is not None: # 转换为二进制数据 feature_bytes = feature_vector.tobytes() feature.feature_data = feature_bytes feature.status = FeatureStatus.SUCCESS feature.finish_time = datetime.now() session.commit() logger.info(f"特征计算成功: {feature_id}, 特征向量长度: {len(feature_vector)}") return True else: logger.error(f"特征提取失败: {feature_id}") feature.status = FeatureStatus.FAILED feature.finish_time = datetime.now() session.commit() return False except Exception as e: logger.error(f"特征计算过程中出错: {feature_id}, 错误: {str(e)}") feature.status = FeatureStatus.FAILED feature.finish_time = datetime.now() session.commit() return False return True except Exception as e: logger.error(f"处理特征计算时发生异常: {feature_id}, 错误: {str(e)}") return False async def process_pending_features(): """ 异步处理所有待处理的人脸特征计算 """ try: with db_manager.get_session() as session: repository = FaceFeatureRepository(session) # 查找需要处理的记录 # 条件: feature_type = FACE_MODEL_VERSION 且 status = 0 (未开始) pending_features = repository.get_features_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION, status=FeatureStatus.NOT_STARTED ) # 查找可能超时的记录 (status = 1 且超时) timeout_features = [] processing_features = repository.get_features_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION, status=FeatureStatus.PROCESSING ) for feature in processing_features: if feature.start_time: timeout_duration = timedelta(hours=settings.FACE_CAL_FEATURE_TIMEOUT_HOURS) if datetime.now() - feature.start_time > timeout_duration: timeout_features.append(feature) total_pending = len(pending_features) total_timeout = len(timeout_features) logger.info(f"发现待处理特征: {total_pending}个, 超时特征: {total_timeout}个") # 处理超时记录 for feature in timeout_features: feature.status = FeatureStatus.FAILED feature.finish_time = datetime.now() if timeout_features: session.commit() # 处理待处理记录 processed_count = 0 for feature in pending_features: processed_count += 1 process_feature_calculation(feature.id) # 每处理10个记录输出一次进度 if processed_count % 10 == 0: logger.info(f"处理进度: {processed_count}/{total_pending}") logger.info(f"特征计算处理完成: 共处理 {processed_count} 个特征") except Exception as e: logger.error(f"批量处理特征计算时发生异常: {str(e)}") @router.post("/start-feature-calculation", summary="开始人脸特征计算") async def start_feature_calculation(background_tasks: BackgroundTasks): """ 开始处理人脸特征计算 此接口会: 1. 查找所有feature_type为当前模型版本且status为0的记录 2. 将状态改为1,设置开始时间 3. 提取人脸特征值 4. 对于status为1且超时的记录,标记为失败 返回处理结果统计 """ try: # 在后台任务中异步处理,避免阻塞请求 background_tasks.add_task(process_pending_features) return { "success": True, "message": "收到特征值计算请求" } except Exception as e: logger.error(f"启动特征计算失败: {str(e)}") raise HTTPException(status_code=500, detail=f"启动特征计算失败: {str(e)}") @router.get("/feature-calculation-status", summary="获取特征计算状态") async def get_feature_calculation_status(): """ 获取当前特征计算的状态统计 """ try: with db_manager.get_session() as session: repository = FaceFeatureRepository(session) # 获取统计信息 stats = repository.get_statistics() # 获取当前模型版本的特定统计 current_model_stats = { "total": repository.count_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION ), "not_started": repository.count_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION, status=FeatureStatus.NOT_STARTED ), "processing": repository.count_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION, status=FeatureStatus.PROCESSING ), "success": repository.count_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION, status=FeatureStatus.SUCCESS ), "failed": repository.count_by_type_and_status( feature_type=settings.FACE_MODEL_VERSION, status=FeatureStatus.FAILED ) } return { "success": True, "data": { "overall_stats": stats, "current_model_stats": current_model_stats, "model_version": settings.FACE_MODEL_VERSION, "timeout_hours": settings.FACE_CAL_FEATURE_TIMEOUT_HOURS } } except Exception as e: logger.error(f"获取特征计算状态失败: {str(e)}") raise HTTPException(status_code=500, detail=f"获取特征计算状态失败: {str(e)}") @router.post("/calculate-single-feature/{feature_id}", summary="计算单个特征") async def calculate_single_feature(feature_id: int): """ 计算单个特征记录的人脸特征 参数: feature_id: 特征记录ID """ try: success = process_feature_calculation(feature_id) if success: return { "success": True, "message": f"特征计算完成: {feature_id}" } else: return { "success": False, "message": f"特征计算失败: {feature_id}" } except Exception as e: logger.error(f"计算单个特征失败: {feature_id}, 错误: {str(e)}") raise HTTPException(status_code=500, detail=f"计算单个特征失败: {str(e)}") def process_video_check_task(task_id: int) -> bool: """ 处理单个视频检查任务 参数: task_id: 任务ID 返回: 是否成功处理 """ try: with db_manager.get_session() as session: repository = VideoCheckTaskRepository(session) # 获取任务记录 task = repository.get_by_id(task_id) if not task: logger.error(f"视频检查任务不存在: {task_id}") return False # 检查是否已经处理完成 if task.status in [2, 3, 5]: # 完成、取消、失败 logger.info(f"视频检查任务已处理完成: {task_id}, 状态: {task.status}") return True # 检查是否超时 if task.status == 1: # 处理中 if task.start_time: timeout_duration = timedelta(hours=settings.FACE_CAL_FEATURE_TIMEOUT_HOURS) if datetime.now() - task.start_time > timeout_duration: # 超时处理 repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": "任务超时"} ) logger.warning(f"视频检查任务超时: {task_id}") return False else: # 没有开始时间,重置状态 repository.update_task_status(task_id, 0) # 处理未开始的任务 if task.status == 0: # 等待 # 设置状态为处理中 repository.update_task_status(task_id, 1, start_time=datetime.now()) logger.info(f"开始处理视频检查任务: {task_id}") # 创建VideoCheckBiz实例 video_biz = VideoCheckBiz(face_algorithm.get_app()) # 获取配置参数 config_dict = repository.get_config_by_group_id(task.config_id) # 设置VideoCheckBiz参数 if config_dict: param_mapping = { "face.list_mode": "list_mode", "face.clarity_threshold": "clarity_threshold", "face.min_face_size": "min_face_size", "face.pitch_threshold": "pitch_threshold", "face.yaw_threshold": "yaw_threshold", "face.similarity_threshold": "similarity_threshold", "face.skip_frame": "frame_skip" } for config_key, biz_param in param_mapping.items(): if config_key in config_dict: try: value = config_dict[config_key] if biz_param == "list_mode": video_biz.set_list_mode(value) elif biz_param == "clarity_threshold": video_biz.set_clarity_threshold(float(value)) elif biz_param == "min_face_size": video_biz.set_min_face_size(int(value)) elif biz_param == "pitch_threshold": video_biz.set_pitch_threshold(float(value)) elif biz_param == "yaw_threshold": video_biz.set_yaw_threshold(float(value)) elif biz_param == "similarity_threshold": video_biz.set_similarity_threshold(float(value)) elif biz_param == "frame_skip": # frame_skip作为参数传递给方法,不设置到实例 pass except (ValueError, TypeError) as e: logger.warning(f"参数设置失败 {config_key}: {value}, 错误: {str(e)}") # 获取目标视频路径 target_video = repository.get_video_by_id(int(task.target_video_id)) if not target_video: logger.error(f"目标视频不存在: {task.target_video_id}") repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": "目标视频不存在"} ) return False target_video_path = os.path.join(settings.VIDEO_RESOURCE_DIR, target_video.video_name_on_server) if not os.path.exists(target_video_path): logger.error(f"目标视频文件不存在: {target_video_path}") repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": "目标视频文件不存在"} ) return False # 提取最佳人脸特征 frame_skip = int(config_dict.get("face.skip_frame", 10)) best_feature = video_biz.extract_best_face_from_video(target_video_path, frame_skip) if best_feature is None: logger.error(f"无法从目标视频中提取人脸特征: {task_id}") repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": "无法从目标视频中提取人脸特征"} ) return False # 将特征值保存到数据库 feature_bytes = best_feature.tobytes() # 设置黑名单(使用提取的特征) video_biz.set_registered_faces({"target_person": best_feature}) # 获取待检查的视频列表 video_ids = [int(vid.strip()) for vid in task.video_id_list.split(",") if vid.strip()] video_list = repository.get_videos_by_ids(video_ids) if not video_list: logger.error(f"待检查视频列表为空: {task_id}") repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": "待检查视频列表为空"} ) return False # 构建视频信息列表 video_infos = [] for video in video_list: video_path = os.path.join(settings.VIDEO_RESOURCE_DIR, video.video_name_on_server) if os.path.exists(video_path): video_infos.append({ 'video_id': video.id, 'video_name': video.video_name, 'video_path': video_path }) else: logger.warning(f"视频文件不存在,跳过: {video_path}") if not video_infos: logger.error(f"所有待检查视频文件都不存在: {task_id}") repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": "所有待检查视频文件都不存在"} ) return False # 批量处理视频进行黑名单检测 results = video_biz.batch_process_videos_with_blacklist_detection( video_infos, frame_skip, "_checked" ) # 分析结果 has_blacklist_match = any(result.get('has_blacklist_match', False) for result in results) total_detections = sum(result.get('detection_count', 0) for result in results) # 更新任务状态 result_status = 1 if has_blacklist_match else 2 # 1=找到,2=未找到 repository.update_task_status( task_id, 2, finish_time=datetime.now(), result=result_status, result_data={ "has_blacklist_match": has_blacklist_match, "total_detections": total_detections, "video_results": results, "target_video": target_video_path, "target_video_name": target_video.video_name, "checked_videos": len(video_infos) }, feature_data=feature_bytes ) logger.info(f"视频检查任务完成: {task_id}, 结果: {'找到' if has_blacklist_match else '未找到'}") return True return True except Exception as e: logger.error(f"处理视频检查任务时发生异常: {task_id}, 错误: {str(e)}") try: with db_manager.get_session() as session: repository = VideoCheckTaskRepository(session) repository.update_task_status( task_id, 5, finish_time=datetime.now(), result=0, result_data={"error": str(e)} ) except Exception: pass return False async def process_pending_video_checks(): """ 异步处理所有待处理的视频检查任务 """ try: with db_manager.get_session() as session: repository = VideoCheckTaskRepository(session) # 查找需要处理的任务(status=0) pending_tasks = repository.get_pending_tasks() # 查找可能超时的任务(status=1且超时) timeout_tasks = repository.get_timeout_tasks(settings.FACE_CAL_FEATURE_TIMEOUT_HOURS) total_pending = len(pending_tasks) total_timeout = len(timeout_tasks) logger.info(f"发现待处理视频检查任务: {total_pending}个, 超时任务: {total_timeout}个") # 处理超时任务 for task in timeout_tasks: repository.update_task_status( task.id, 5, finish_time=datetime.now(), result=0, result_data={"error": "任务超时"} ) if timeout_tasks: session.commit() # 处理待处理任务 processed_count = 0 for task in pending_tasks: processed_count += 1 process_video_check_task(task.id) # 每处理5个任务输出一次进度 if processed_count % 5 == 0: logger.info(f"视频检查处理进度: {processed_count}/{total_pending}") logger.info(f"视频检查任务处理完成: 共处理 {processed_count} 个任务") except Exception as e: logger.error(f"批量处理视频检查任务时发生异常: {str(e)}") @router.post("/start-video-check", summary="开始视频检查") async def start_video_check(background_tasks: BackgroundTasks): """ 开始处理视频检查任务 此接口会: 1. 查找所有status为0的视频检查任务 2. 将状态改为1,设置开始时间 3. 提取目标视频中最佳人脸特征 4. 进行黑名单检测 5. 对于status为1且超时的任务,标记为失败 返回处理结果统计 """ try: # 在后台任务中异步处理,避免阻塞请求 background_tasks.add_task(process_pending_video_checks) return { "success": True, "message": "收到视频检查请求" } except Exception as e: logger.error(f"启动视频检查失败: {str(e)}") raise HTTPException(status_code=500, detail=f"启动视频检查失败: {str(e)}") @router.get("/video-check-status", summary="获取视频检查状态") async def get_video_check_status(): """ 获取当前视频检查任务的状态统计 """ try: with db_manager.get_session() as session: repository = VideoCheckTaskRepository(session) # 获取统计信息 total_tasks = len(repository.session.query(SurVideoCheckTask).all()) pending_tasks = len(repository.get_pending_tasks()) processing_tasks = len(repository.get_processing_tasks()) # 获取已完成任务的统计 completed_tasks = repository.session.query(SurVideoCheckTask).filter( SurVideoCheckTask.status == 2 ).all() found_count = sum(1 for task in completed_tasks if task.result == 1) not_found_count = sum(1 for task in completed_tasks if task.result == 2) failed_count = len(repository.session.query(SurVideoCheckTask).filter( SurVideoCheckTask.status == 5 ).all()) return { "success": True, "data": { "total_tasks": total_tasks, "pending_tasks": pending_tasks, "processing_tasks": processing_tasks, "completed_tasks": len(completed_tasks), "found_count": found_count, "not_found_count": not_found_count, "failed_count": failed_count, "timeout_hours": settings.FACE_CAL_FEATURE_TIMEOUT_HOURS } } except Exception as e: logger.error(f"获取视频检查状态失败: {str(e)}") raise HTTPException(status_code=500, detail=f"获取视频检查状态失败: {str(e)}") def sync_videofacebiz_params(): """ 同步VideoFaceBiz的参数 """ try: with db_manager.get_session() as session: # 查询人脸识别配置(根据实际表结构) config_records = session.query(SurConfigBase).filter( SurConfigBase.config_type == settings.SUR_CONFIG_TYPE_FACE ).all() # 构建配置参数字典 config_params = {} for record in config_records: if record.config_key and record.config_value: config_params[record.config_key] = record.config_value # 配置键映射关系 config_mapping = { "face.list_mode": "list_mode", "face.clarity_threshold": "clarity_threshold", "face.min_face_size": "min_face_size", "face.pitch_threshold": "pitch_threshold", "face.yaw_threshold": "yaw_threshold", "face.similarity_threshold": "similarity_threshold" } updated_count = 0 for config_key, param_name in config_mapping.items(): if config_key in config_params: config_value = config_params[config_key] # 根据参数类型进行转换和设置 if param_name == "list_mode": if config_value in ["0", "1"]: video_face_biz.set_list_mode(config_value) updated_count += 1 elif param_name == "clarity_threshold": try: threshold = float(config_value) video_face_biz.set_clarity_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的清晰度阈值: {config_value}") elif param_name == "min_face_size": try: size = int(config_value) video_face_biz.set_min_face_size(size) updated_count += 1 except ValueError: logger.error(f"无效的最小人脸尺寸: {config_value}") elif param_name == "pitch_threshold": try: threshold = float(config_value) video_face_biz.set_pitch_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的俯仰角阈值: {config_value}") elif param_name == "yaw_threshold": try: threshold = float(config_value) video_face_biz.set_yaw_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的偏航角阈值: {config_value}") elif param_name == "similarity_threshold": try: threshold = float(config_value) video_face_biz.set_similarity_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的相似度阈值: {config_value}") logger.info(f"✅ 同步VideoFaceBiz参数完成,更新了 {updated_count} 个参数") return updated_count except Exception as e: logger.error(f"❌ 同步VideoFaceBiz参数失败: {e}") return 0 def sync_videofacebiz_blacklist(): """ 同步VideoFaceBiz的黑名单 """ try: with db_manager.get_session() as session: # 查询启用的黑名单人员 blacklist_persons = session.query(SurPersonBlacklist).filter( SurPersonBlacklist.status == 1 ).all() if not blacklist_persons: logger.info("⚠️ 黑名单为空,清空当前黑名单") video_face_biz.set_registered_faces({}) return 0 person_ids = [person.person_id for person in blacklist_persons] # 查询对应的人脸特征 face_features = session.query(SurFaceFeature).filter( SurFaceFeature.person_id.in_(person_ids), SurFaceFeature.feature_type == settings.FACE_MODEL_VERSION, SurFaceFeature.status == 2 # 计算成功的特征 ).all() # 构建特征字典 registered_faces = {} loaded_count = 0 for feature in face_features: if feature.feature_data: try: # 将bytea转换为numpy数组 import numpy as np feature_array = np.frombuffer(feature.feature_data, dtype=np.float32) # 使用person_id作为标识符 person_name = f"{feature.person_id}" registered_faces[person_name] = feature_array loaded_count += 1 except Exception as e: logger.error(f"❌ 解析黑名单人员 {feature.person_id} 的特征数据失败: {e}") continue # 设置黑名单 success = video_face_biz.set_registered_faces(registered_faces) if success: logger.info(f"✅ 同步黑名单完成,加载了 {loaded_count} 个黑名单人员") else: logger.error("❌ 设置黑名单失败") return loaded_count except Exception as e: logger.error(f"❌ 同步黑名单失败: {e}") return 0 @router.post("/sync-videofacebiz-params", summary="同步VideoFaceBiz参数") async def sync_videofacebiz_params_endpoint(): """ 同步VideoFaceBiz的参数 从sur_config表同步参数到VideoFaceBiz实例 """ try: updated_count = sync_videofacebiz_params() return { "success": True, "message": f"同步参数完成,更新了 {updated_count} 个参数", "updated_count": updated_count } except Exception as e: logger.error(f"同步VideoFaceBiz参数失败: {e}") raise HTTPException(status_code=500, detail=f"同步参数失败: {str(e)}") @router.post("/sync-videofacebiz-blacklist", summary="同步VideoFaceBiz黑名单") async def sync_videofacebiz_blacklist_endpoint(): """ 同步VideoFaceBiz的黑名单 从sur_person_blacklist表同步黑名单到VideoFaceBiz实例 """ try: loaded_count = sync_videofacebiz_blacklist() return { "success": True, "message": f"同步黑名单完成,加载了 {loaded_count} 个黑名单人员", "loaded_count": loaded_count } except Exception as e: logger.error(f"同步VideoFaceBiz黑名单失败: {e}") raise HTTPException(status_code=500, detail=f"同步黑名单失败: {str(e)}") @router.get("/videofacebiz-status", summary="获取VideoFaceBiz状态") async def get_videofacebiz_status(): """ 获取VideoFaceBiz的当前状态 """ try: status = { "list_mode": video_face_biz.get_list_mode(), "clarity_threshold": video_face_biz.get_clarity_threshold(), "min_face_size": video_face_biz.get_min_face_size(), "pitch_threshold": video_face_biz.get_pitch_threshold(), "yaw_threshold": video_face_biz.get_yaw_threshold(), "similarity_threshold": video_face_biz.get_similarity_threshold(), "blacklist_count": video_face_biz.get_registered_face_count() } return { "success": True, "data": status } except Exception as e: logger.error(f"获取VideoFaceBiz状态失败: {e}") raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}") def sync_videofaceprisonbiz_params(): """ 同步VideoFacePrisonBiz的参数 """ try: with db_manager.get_session() as session: # 查询sur_config表,条件为scope=1且target_id=3 prison_config = session.query(SurConfig).filter( SurConfig.scope == 1, # 房间作用域 SurConfig.target_id == 3, # 目标ID为3 SurConfig.config_type == settings.SUR_CONFIG_TYPE_FACE # 人脸识别配置类型 ).first() if not prison_config: logger.warning("未找到监狱场景的配置记录(scope=1, target_id=3)") return 0 if not prison_config.config_group_id: logger.warning("监狱配置记录缺少config_group_id") return 0 # 使用联表查询 config_records = session.query(SurConfigBase).filter( SurConfigBase.group_id == prison_config.config_group_id, SurConfigBase.config_type == settings.SUR_CONFIG_TYPE_FACE ).all() if not config_records: logger.warning(f"未找到对应的配置基础记录(group_id={prison_config.config_group_id})") return 0 # 构建配置参数字典 config_params = {} for record in config_records: if record.config_key and record.config_value: config_params[record.config_key] = record.config_value # 配置键映射关系 config_mapping = { "face.list_mode": "list_mode", "face.clarity_threshold": "clarity_threshold", "face.min_face_size": "min_face_size", "face.pitch_threshold": "pitch_threshold", "face.yaw_threshold": "yaw_threshold", "face.similarity_threshold": "similarity_threshold" } updated_count = 0 for config_key, param_name in config_mapping.items(): if config_key in config_params: config_value = config_params[config_key] # 根据参数类型进行转换和设置 if param_name == "list_mode": if config_value in ["0", "1"]: video_face_prison_biz.set_list_mode(config_value) updated_count += 1 elif param_name == "clarity_threshold": try: threshold = float(config_value) video_face_prison_biz.set_clarity_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的清晰度阈值: {config_value}") elif param_name == "min_face_size": try: size = int(config_value) video_face_prison_biz.set_min_face_size(size) updated_count += 1 except ValueError: logger.error(f"无效的最小人脸尺寸: {config_value}") elif param_name == "pitch_threshold": try: threshold = float(config_value) video_face_prison_biz.set_pitch_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的俯仰角阈值: {config_value}") elif param_name == "yaw_threshold": try: threshold = float(config_value) video_face_prison_biz.set_yaw_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的偏航角阈值: {config_value}") elif param_name == "similarity_threshold": try: threshold = float(config_value) video_face_prison_biz.set_similarity_threshold(threshold) updated_count += 1 except ValueError: logger.error(f"无效的相似度阈值: {config_value}") logger.info(f"✅ 同步VideoFacePrisonBiz参数完成,更新了 {updated_count} 个参数(配置组ID: {prison_config.config_group_id})") return updated_count except Exception as e: logger.error(f"❌ 同步VideoFacePrisonBiz参数失败: {e}") return 0 def sync_videofaceprisonbiz_blacklist(): """ 同步VideoFacePrisonBiz的黑名单 """ try: with db_manager.get_session() as session: # 查询启用的黑名单人员 blacklist_persons = session.query(SurPersonBlacklist).filter( SurPersonBlacklist.status == 1 ).all() if not blacklist_persons: logger.info("⚠️ 黑名单为空,清空当前黑名单") video_face_prison_biz.set_registered_faces({}) return 0 person_ids = [person.person_id for person in blacklist_persons] # 查询对应的人脸特征 face_features = session.query(SurFaceFeature).filter( SurFaceFeature.person_id.in_(person_ids), SurFaceFeature.feature_type == settings.FACE_MODEL_VERSION, SurFaceFeature.status == 2 # 计算成功的特征 ).all() # 构建特征字典 registered_faces = {} loaded_count = 0 for feature in face_features: if feature.feature_data: try: # 将bytea转换为numpy数组 import numpy as np feature_array = np.frombuffer(feature.feature_data, dtype=np.float32) # 使用person_id作为标识符 person_name = f"{feature.person_id}" registered_faces[person_name] = feature_array loaded_count += 1 except Exception as e: logger.error(f"❌ 解析黑名单人员 {feature.person_id} 的特征数据失败: {e}") continue # 设置黑名单 success = video_face_prison_biz.set_registered_faces(registered_faces) if success: logger.info(f"✅ 同步黑名单完成,加载了 {loaded_count} 个黑名单人员") else: logger.error("❌ 设置黑名单失败") return loaded_count except Exception as e: logger.error(f"❌ 同步黑名单失败: {e}") return 0 @router.post("/sync-videofaceprisonbiz-params", summary="同步VideoFacePrisonBiz参数") async def sync_videofaceprisonbiz_params_endpoint(): """ 同步VideoFacePrisonBiz的参数 从sur_config表同步参数到VideoFacePrisonBiz实例 """ try: updated_count = sync_videofaceprisonbiz_params() return { "success": True, "message": f"同步参数完成,更新了 {updated_count} 个参数", "updated_count": updated_count } except Exception as e: logger.error(f"同步VideoFacePrisonBiz参数失败: {e}") raise HTTPException(status_code=500, detail=f"同步参数失败: {str(e)}") @router.post("/sync-videofaceprisonbiz-blacklist", summary="同步VideoFacePrisonBiz黑名单") async def sync_videofaceprisonbiz_blacklist_endpoint(): """ 同步VideoFacePrisonBiz的黑名单 从sur_person_blacklist表同步黑名单到VideoFacePrisonBiz实例 """ try: loaded_count = sync_videofaceprisonbiz_blacklist() return { "success": True, "message": f"同步黑名单完成,加载了 {loaded_count} 个黑名单人员", "loaded_count": loaded_count } except Exception as e: logger.error(f"同步VideoFacePrisonBiz黑名单失败: {e}") raise HTTPException(status_code=500, detail=f"同步黑名单失败: {str(e)}") @router.get("/videofaceprisonbiz-status", summary="获取VideoFacePrisonBiz状态") async def get_videofaceprisonbiz_status(): """ 获取VideoFacePrisonBiz的当前状态 """ try: status = { "list_mode": video_face_prison_biz.get_list_mode(), "clarity_threshold": video_face_prison_biz.get_clarity_threshold(), "min_face_size": video_face_prison_biz.get_min_face_size(), "pitch_threshold": video_face_prison_biz.get_pitch_threshold(), "yaw_threshold": video_face_prison_biz.get_yaw_threshold(), "similarity_threshold": video_face_prison_biz.get_similarity_threshold(), "blacklist_count": video_face_prison_biz.get_registered_face_count() } return { "success": True, "data": status } except Exception as e: logger.error(f"获取VideoFacePrisonBiz状态失败: {e}") raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}") # 导出路由器 __all__ = ["router", "sync_videofacebiz_params", "sync_videofacebiz_blacklist", "sync_videofaceprisonbiz_params", "sync_videofaceprisonbiz_blacklist"]