Files
SupervisorAI/api/routes/algorithm_router.py
2026-01-09 16:33:17 +08:00

1092 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
人脸特征计算算法路由
提供人脸特征计算的HTTP接口
"""
import os
import logging
from datetime import datetime, timedelta
from fastapi import APIRouter, HTTPException, BackgroundTasks
from config import settings
from database.connection import db_manager
from models.face_feature import FeatureStatus
from models.video_check_task import SurVideoCheckTask
from models.sur_config import SurConfigBase, SurConfig
from models.sur_person import SurPersonBlacklist, SurFaceFeature
from repositories.face_feature_repository import FaceFeatureRepository
from algorithm.face_recognition_algorithm import FaceRecognitionAlgorithm
from biz.base_face_biz import BaseFaceBiz
from biz.video_check_biz import VideoCheckBiz
from biz.video_face_biz import VideoFaceBiz
from biz.video_face_prison_biz import VideoFacePrisonBiz
from repositories.video_check_repository import VideoCheckTaskRepository
# 创建路由器
router = APIRouter(prefix="/algorithm", tags=["algorithm"])
# 初始化人脸识别算法
face_algorithm = FaceRecognitionAlgorithm(use_gpu=settings.FACE_USE_GPU, use_npu=settings.FACE_USE_NPU)
# 初始化RTSP专用人脸识别算法
face_algorithm_for_rtsp = FaceRecognitionAlgorithm(use_gpu=settings.FACE_USE_GPU, use_npu=settings.FACE_USE_NPU)
# 初始化RTSP专用VideoFaceBiz实例
video_face_biz = VideoFaceBiz(face_algorithm_for_rtsp.get_app())
# 初始化RTSP专用VideoFacePrisonBiz实例
video_face_prison_biz = VideoFacePrisonBiz(face_algorithm_for_rtsp.get_app())
logger = logging.getLogger(__name__)
def process_feature_calculation(feature_id: int) -> bool:
"""
处理单个人脸特征计算
参数:
feature_id: 特征记录ID
返回:
是否成功处理
"""
try:
with db_manager.get_session() as session:
repository = FaceFeatureRepository(session)
# 获取特征记录
feature = repository.get_by_id(feature_id)
if not feature:
logger.error(f"特征记录不存在: {feature_id}")
return False
# 检查是否已经处理完成
if feature.status in [FeatureStatus.SUCCESS, FeatureStatus.FAILED]:
logger.info(f"特征记录已处理完成: {feature_id}, 状态: {feature.status_name}")
return True
# 检查是否超时
if feature.status == FeatureStatus.PROCESSING:
if feature.start_time:
timeout_duration = timedelta(hours=settings.FACE_CAL_FEATURE_TIMEOUT_HOURS)
if datetime.now() - feature.start_time > timeout_duration:
# 超时处理
feature.status = FeatureStatus.FAILED
feature.finish_time = datetime.now()
session.commit()
logger.warning(f"特征计算超时: {feature_id}")
return False
else:
# 没有开始时间,重置状态
feature.status = FeatureStatus.NOT_STARTED
session.commit()
# 处理未开始的计算
if feature.status == FeatureStatus.NOT_STARTED:
# 设置状态为计算中
feature.status = FeatureStatus.PROCESSING
feature.start_time = datetime.now()
session.commit()
logger.info(f"开始计算特征: {feature_id}")
# 构建图片路径
if not feature.pic_id:
logger.error(f"特征记录缺少图片ID: {feature_id}")
feature.status = FeatureStatus.FAILED
feature.finish_time = datetime.now()
session.commit()
return False
image_path = os.path.join(settings.FACE_REGISTER_IMAGE_RESOURCE_DIR, feature.pic_id)
# 检查图片文件是否存在
if not os.path.exists(image_path):
logger.error(f"图片文件不存在: {image_path}")
feature.status = FeatureStatus.FAILED
feature.finish_time = datetime.now()
session.commit()
return False
# 提取人脸特征
try:
# 直接创建BaseFaceBiz实例
face_biz = BaseFaceBiz(face_algorithm.get_app())
feature_vector = face_biz.extract_face_feature(str(image_path))
if feature_vector is not None:
# 转换为二进制数据
feature_bytes = feature_vector.tobytes()
feature.feature_data = feature_bytes
feature.status = FeatureStatus.SUCCESS
feature.finish_time = datetime.now()
session.commit()
logger.info(f"特征计算成功: {feature_id}, 特征向量长度: {len(feature_vector)}")
return True
else:
logger.error(f"特征提取失败: {feature_id}")
feature.status = FeatureStatus.FAILED
feature.finish_time = datetime.now()
session.commit()
return False
except Exception as e:
logger.error(f"特征计算过程中出错: {feature_id}, 错误: {str(e)}")
feature.status = FeatureStatus.FAILED
feature.finish_time = datetime.now()
session.commit()
return False
return True
except Exception as e:
logger.error(f"处理特征计算时发生异常: {feature_id}, 错误: {str(e)}")
return False
async def process_pending_features():
"""
异步处理所有待处理的人脸特征计算
"""
try:
with db_manager.get_session() as session:
repository = FaceFeatureRepository(session)
# 查找需要处理的记录
# 条件: feature_type = FACE_MODEL_VERSION 且 status = 0 (未开始)
pending_features = repository.get_features_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION,
status=FeatureStatus.NOT_STARTED
)
# 查找可能超时的记录 (status = 1 且超时)
timeout_features = []
processing_features = repository.get_features_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION,
status=FeatureStatus.PROCESSING
)
for feature in processing_features:
if feature.start_time:
timeout_duration = timedelta(hours=settings.FACE_CAL_FEATURE_TIMEOUT_HOURS)
if datetime.now() - feature.start_time > timeout_duration:
timeout_features.append(feature)
total_pending = len(pending_features)
total_timeout = len(timeout_features)
logger.info(f"发现待处理特征: {total_pending}个, 超时特征: {total_timeout}")
# 处理超时记录
for feature in timeout_features:
feature.status = FeatureStatus.FAILED
feature.finish_time = datetime.now()
if timeout_features:
session.commit()
# 处理待处理记录
processed_count = 0
for feature in pending_features:
processed_count += 1
process_feature_calculation(feature.id)
# 每处理10个记录输出一次进度
if processed_count % 10 == 0:
logger.info(f"处理进度: {processed_count}/{total_pending}")
logger.info(f"特征计算处理完成: 共处理 {processed_count} 个特征")
except Exception as e:
logger.error(f"批量处理特征计算时发生异常: {str(e)}")
@router.post("/start-feature-calculation", summary="开始人脸特征计算")
async def start_feature_calculation(background_tasks: BackgroundTasks):
"""
开始处理人脸特征计算
此接口会:
1. 查找所有feature_type为当前模型版本且status为0的记录
2. 将状态改为1设置开始时间
3. 提取人脸特征值
4. 对于status为1且超时的记录标记为失败
返回处理结果统计
"""
try:
# 在后台任务中异步处理,避免阻塞请求
background_tasks.add_task(process_pending_features)
return {
"success": True,
"message": "收到特征值计算请求"
}
except Exception as e:
logger.error(f"启动特征计算失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"启动特征计算失败: {str(e)}")
@router.get("/feature-calculation-status", summary="获取特征计算状态")
async def get_feature_calculation_status():
"""
获取当前特征计算的状态统计
"""
try:
with db_manager.get_session() as session:
repository = FaceFeatureRepository(session)
# 获取统计信息
stats = repository.get_statistics()
# 获取当前模型版本的特定统计
current_model_stats = {
"total": repository.count_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION
),
"not_started": repository.count_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION,
status=FeatureStatus.NOT_STARTED
),
"processing": repository.count_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION,
status=FeatureStatus.PROCESSING
),
"success": repository.count_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION,
status=FeatureStatus.SUCCESS
),
"failed": repository.count_by_type_and_status(
feature_type=settings.FACE_MODEL_VERSION,
status=FeatureStatus.FAILED
)
}
return {
"success": True,
"data": {
"overall_stats": stats,
"current_model_stats": current_model_stats,
"model_version": settings.FACE_MODEL_VERSION,
"timeout_hours": settings.FACE_CAL_FEATURE_TIMEOUT_HOURS
}
}
except Exception as e:
logger.error(f"获取特征计算状态失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取特征计算状态失败: {str(e)}")
@router.post("/calculate-single-feature/{feature_id}", summary="计算单个特征")
async def calculate_single_feature(feature_id: int):
"""
计算单个特征记录的人脸特征
参数:
feature_id: 特征记录ID
"""
try:
success = process_feature_calculation(feature_id)
if success:
return {
"success": True,
"message": f"特征计算完成: {feature_id}"
}
else:
return {
"success": False,
"message": f"特征计算失败: {feature_id}"
}
except Exception as e:
logger.error(f"计算单个特征失败: {feature_id}, 错误: {str(e)}")
raise HTTPException(status_code=500, detail=f"计算单个特征失败: {str(e)}")
def process_video_check_task(task_id: int) -> bool:
"""
处理单个视频检查任务
参数:
task_id: 任务ID
返回:
是否成功处理
"""
try:
with db_manager.get_session() as session:
repository = VideoCheckTaskRepository(session)
# 获取任务记录
task = repository.get_by_id(task_id)
if not task:
logger.error(f"视频检查任务不存在: {task_id}")
return False
# 检查是否已经处理完成
if task.status in [2, 3, 5]: # 完成、取消、失败
logger.info(f"视频检查任务已处理完成: {task_id}, 状态: {task.status}")
return True
# 检查是否超时
if task.status == 1: # 处理中
if task.start_time:
timeout_duration = timedelta(hours=settings.FACE_CAL_FEATURE_TIMEOUT_HOURS)
if datetime.now() - task.start_time > timeout_duration:
# 超时处理
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "任务超时"}
)
logger.warning(f"视频检查任务超时: {task_id}")
return False
else:
# 没有开始时间,重置状态
repository.update_task_status(task_id, 0)
# 处理未开始的任务
if task.status == 0: # 等待
# 设置状态为处理中
repository.update_task_status(task_id, 1, start_time=datetime.now())
logger.info(f"开始处理视频检查任务: {task_id}")
# 创建VideoCheckBiz实例
video_biz = VideoCheckBiz(face_algorithm.get_app())
# 获取配置参数
config_dict = repository.get_config_by_group_id(task.config_id)
# 设置VideoCheckBiz参数
if config_dict:
param_mapping = {
"face.list_mode": "list_mode",
"face.clarity_threshold": "clarity_threshold",
"face.min_face_size": "min_face_size",
"face.pitch_threshold": "pitch_threshold",
"face.yaw_threshold": "yaw_threshold",
"face.similarity_threshold": "similarity_threshold",
"face.skip_frame": "frame_skip"
}
for config_key, biz_param in param_mapping.items():
if config_key in config_dict:
try:
value = config_dict[config_key]
if biz_param == "list_mode":
video_biz.set_list_mode(value)
elif biz_param == "clarity_threshold":
video_biz.set_clarity_threshold(float(value))
elif biz_param == "min_face_size":
video_biz.set_min_face_size(int(value))
elif biz_param == "pitch_threshold":
video_biz.set_pitch_threshold(float(value))
elif biz_param == "yaw_threshold":
video_biz.set_yaw_threshold(float(value))
elif biz_param == "similarity_threshold":
video_biz.set_similarity_threshold(float(value))
elif biz_param == "frame_skip":
# frame_skip作为参数传递给方法不设置到实例
pass
except (ValueError, TypeError) as e:
logger.warning(f"参数设置失败 {config_key}: {value}, 错误: {str(e)}")
# 获取目标视频路径
target_video = repository.get_video_by_id(int(task.target_video_id))
if not target_video:
logger.error(f"目标视频不存在: {task.target_video_id}")
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "目标视频不存在"}
)
return False
target_video_path = os.path.join(settings.VIDEO_RESOURCE_DIR, target_video.video_name_on_server)
if not os.path.exists(target_video_path):
logger.error(f"目标视频文件不存在: {target_video_path}")
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "目标视频文件不存在"}
)
return False
# 提取最佳人脸特征
frame_skip = int(config_dict.get("face.skip_frame", 10))
best_feature = video_biz.extract_best_face_from_video(target_video_path, frame_skip)
if best_feature is None:
logger.error(f"无法从目标视频中提取人脸特征: {task_id}")
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "无法从目标视频中提取人脸特征"}
)
return False
# 将特征值保存到数据库
feature_bytes = best_feature.tobytes()
# 设置黑名单(使用提取的特征)
video_biz.set_registered_faces({"target_person": best_feature})
# 获取待检查的视频列表
video_ids = [int(vid.strip()) for vid in task.video_id_list.split(",") if vid.strip()]
video_list = repository.get_videos_by_ids(video_ids)
if not video_list:
logger.error(f"待检查视频列表为空: {task_id}")
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "待检查视频列表为空"}
)
return False
# 构建视频信息列表
video_infos = []
for video in video_list:
video_path = os.path.join(settings.VIDEO_RESOURCE_DIR, video.video_name_on_server)
if os.path.exists(video_path):
video_infos.append({
'video_id': video.id,
'video_name': video.video_name,
'video_path': video_path
})
else:
logger.warning(f"视频文件不存在,跳过: {video_path}")
if not video_infos:
logger.error(f"所有待检查视频文件都不存在: {task_id}")
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "所有待检查视频文件都不存在"}
)
return False
# 批量处理视频进行黑名单检测
results = video_biz.batch_process_videos_with_blacklist_detection(
video_infos, frame_skip, "_checked"
)
# 分析结果
has_blacklist_match = any(result.get('has_blacklist_match', False) for result in results)
total_detections = sum(result.get('detection_count', 0) for result in results)
# 更新任务状态
result_status = 1 if has_blacklist_match else 2 # 1=找到2=未找到
repository.update_task_status(
task_id, 2, finish_time=datetime.now(),
result=result_status, result_data={
"has_blacklist_match": has_blacklist_match,
"total_detections": total_detections,
"video_results": results,
"target_video": target_video_path,
"target_video_name": target_video.video_name,
"checked_videos": len(video_infos)
},
feature_data=feature_bytes
)
logger.info(f"视频检查任务完成: {task_id}, 结果: {'找到' if has_blacklist_match else '未找到'}")
return True
return True
except Exception as e:
logger.error(f"处理视频检查任务时发生异常: {task_id}, 错误: {str(e)}")
try:
with db_manager.get_session() as session:
repository = VideoCheckTaskRepository(session)
repository.update_task_status(
task_id, 5, finish_time=datetime.now(),
result=0, result_data={"error": str(e)}
)
except Exception:
pass
return False
async def process_pending_video_checks():
"""
异步处理所有待处理的视频检查任务
"""
try:
with db_manager.get_session() as session:
repository = VideoCheckTaskRepository(session)
# 查找需要处理的任务status=0
pending_tasks = repository.get_pending_tasks()
# 查找可能超时的任务status=1且超时
timeout_tasks = repository.get_timeout_tasks(settings.FACE_CAL_FEATURE_TIMEOUT_HOURS)
total_pending = len(pending_tasks)
total_timeout = len(timeout_tasks)
logger.info(f"发现待处理视频检查任务: {total_pending}个, 超时任务: {total_timeout}")
# 处理超时任务
for task in timeout_tasks:
repository.update_task_status(
task.id, 5, finish_time=datetime.now(),
result=0, result_data={"error": "任务超时"}
)
if timeout_tasks:
session.commit()
# 处理待处理任务
processed_count = 0
for task in pending_tasks:
processed_count += 1
process_video_check_task(task.id)
# 每处理5个任务输出一次进度
if processed_count % 5 == 0:
logger.info(f"视频检查处理进度: {processed_count}/{total_pending}")
logger.info(f"视频检查任务处理完成: 共处理 {processed_count} 个任务")
except Exception as e:
logger.error(f"批量处理视频检查任务时发生异常: {str(e)}")
@router.post("/start-video-check", summary="开始视频检查")
async def start_video_check(background_tasks: BackgroundTasks):
"""
开始处理视频检查任务
此接口会:
1. 查找所有status为0的视频检查任务
2. 将状态改为1设置开始时间
3. 提取目标视频中最佳人脸特征
4. 进行黑名单检测
5. 对于status为1且超时的任务标记为失败
返回处理结果统计
"""
try:
# 在后台任务中异步处理,避免阻塞请求
background_tasks.add_task(process_pending_video_checks)
return {
"success": True,
"message": "收到视频检查请求"
}
except Exception as e:
logger.error(f"启动视频检查失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"启动视频检查失败: {str(e)}")
@router.get("/video-check-status", summary="获取视频检查状态")
async def get_video_check_status():
"""
获取当前视频检查任务的状态统计
"""
try:
with db_manager.get_session() as session:
repository = VideoCheckTaskRepository(session)
# 获取统计信息
total_tasks = len(repository.session.query(SurVideoCheckTask).all())
pending_tasks = len(repository.get_pending_tasks())
processing_tasks = len(repository.get_processing_tasks())
# 获取已完成任务的统计
completed_tasks = repository.session.query(SurVideoCheckTask).filter(
SurVideoCheckTask.status == 2
).all()
found_count = sum(1 for task in completed_tasks if task.result == 1)
not_found_count = sum(1 for task in completed_tasks if task.result == 2)
failed_count = len(repository.session.query(SurVideoCheckTask).filter(
SurVideoCheckTask.status == 5
).all())
return {
"success": True,
"data": {
"total_tasks": total_tasks,
"pending_tasks": pending_tasks,
"processing_tasks": processing_tasks,
"completed_tasks": len(completed_tasks),
"found_count": found_count,
"not_found_count": not_found_count,
"failed_count": failed_count,
"timeout_hours": settings.FACE_CAL_FEATURE_TIMEOUT_HOURS
}
}
except Exception as e:
logger.error(f"获取视频检查状态失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取视频检查状态失败: {str(e)}")
def sync_videofacebiz_params():
"""
同步VideoFaceBiz的参数
"""
try:
with db_manager.get_session() as session:
# 查询人脸识别配置(根据实际表结构)
config_records = session.query(SurConfigBase).filter(
SurConfigBase.config_type == settings.SUR_CONFIG_TYPE_FACE
).all()
# 构建配置参数字典
config_params = {}
for record in config_records:
if record.config_key and record.config_value:
config_params[record.config_key] = record.config_value
# 配置键映射关系
config_mapping = {
"face.list_mode": "list_mode",
"face.clarity_threshold": "clarity_threshold",
"face.min_face_size": "min_face_size",
"face.pitch_threshold": "pitch_threshold",
"face.yaw_threshold": "yaw_threshold",
"face.similarity_threshold": "similarity_threshold"
}
updated_count = 0
for config_key, param_name in config_mapping.items():
if config_key in config_params:
config_value = config_params[config_key]
# 根据参数类型进行转换和设置
if param_name == "list_mode":
if config_value in ["0", "1"]:
video_face_biz.set_list_mode(config_value)
updated_count += 1
elif param_name == "clarity_threshold":
try:
threshold = float(config_value)
video_face_biz.set_clarity_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的清晰度阈值: {config_value}")
elif param_name == "min_face_size":
try:
size = int(config_value)
video_face_biz.set_min_face_size(size)
updated_count += 1
except ValueError:
logger.error(f"无效的最小人脸尺寸: {config_value}")
elif param_name == "pitch_threshold":
try:
threshold = float(config_value)
video_face_biz.set_pitch_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的俯仰角阈值: {config_value}")
elif param_name == "yaw_threshold":
try:
threshold = float(config_value)
video_face_biz.set_yaw_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的偏航角阈值: {config_value}")
elif param_name == "similarity_threshold":
try:
threshold = float(config_value)
video_face_biz.set_similarity_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的相似度阈值: {config_value}")
logger.info(f"✅ 同步VideoFaceBiz参数完成更新了 {updated_count} 个参数")
return updated_count
except Exception as e:
logger.error(f"❌ 同步VideoFaceBiz参数失败: {e}")
return 0
def sync_videofacebiz_blacklist():
"""
同步VideoFaceBiz的黑名单
"""
try:
with db_manager.get_session() as session:
# 查询启用的黑名单人员
blacklist_persons = session.query(SurPersonBlacklist).filter(
SurPersonBlacklist.status == 1
).all()
if not blacklist_persons:
logger.info("⚠️ 黑名单为空,清空当前黑名单")
video_face_biz.set_registered_faces({})
return 0
person_ids = [person.person_id for person in blacklist_persons]
# 查询对应的人脸特征
face_features = session.query(SurFaceFeature).filter(
SurFaceFeature.person_id.in_(person_ids),
SurFaceFeature.feature_type == settings.FACE_MODEL_VERSION,
SurFaceFeature.status == 2 # 计算成功的特征
).all()
# 构建特征字典
registered_faces = {}
loaded_count = 0
for feature in face_features:
if feature.feature_data:
try:
# 将bytea转换为numpy数组
import numpy as np
feature_array = np.frombuffer(feature.feature_data, dtype=np.float32)
# 使用person_id作为标识符
person_name = f"{feature.person_id}"
registered_faces[person_name] = feature_array
loaded_count += 1
except Exception as e:
logger.error(f"❌ 解析黑名单人员 {feature.person_id} 的特征数据失败: {e}")
continue
# 设置黑名单
success = video_face_biz.set_registered_faces(registered_faces)
if success:
logger.info(f"✅ 同步黑名单完成,加载了 {loaded_count} 个黑名单人员")
else:
logger.error("❌ 设置黑名单失败")
return loaded_count
except Exception as e:
logger.error(f"❌ 同步黑名单失败: {e}")
return 0
@router.post("/sync-videofacebiz-params", summary="同步VideoFaceBiz参数")
async def sync_videofacebiz_params_endpoint():
"""
同步VideoFaceBiz的参数
从sur_config表同步参数到VideoFaceBiz实例
"""
try:
updated_count = sync_videofacebiz_params()
return {
"success": True,
"message": f"同步参数完成,更新了 {updated_count} 个参数",
"updated_count": updated_count
}
except Exception as e:
logger.error(f"同步VideoFaceBiz参数失败: {e}")
raise HTTPException(status_code=500, detail=f"同步参数失败: {str(e)}")
@router.post("/sync-videofacebiz-blacklist", summary="同步VideoFaceBiz黑名单")
async def sync_videofacebiz_blacklist_endpoint():
"""
同步VideoFaceBiz的黑名单
从sur_person_blacklist表同步黑名单到VideoFaceBiz实例
"""
try:
loaded_count = sync_videofacebiz_blacklist()
return {
"success": True,
"message": f"同步黑名单完成,加载了 {loaded_count} 个黑名单人员",
"loaded_count": loaded_count
}
except Exception as e:
logger.error(f"同步VideoFaceBiz黑名单失败: {e}")
raise HTTPException(status_code=500, detail=f"同步黑名单失败: {str(e)}")
@router.get("/videofacebiz-status", summary="获取VideoFaceBiz状态")
async def get_videofacebiz_status():
"""
获取VideoFaceBiz的当前状态
"""
try:
status = {
"list_mode": video_face_biz.get_list_mode(),
"clarity_threshold": video_face_biz.get_clarity_threshold(),
"min_face_size": video_face_biz.get_min_face_size(),
"pitch_threshold": video_face_biz.get_pitch_threshold(),
"yaw_threshold": video_face_biz.get_yaw_threshold(),
"similarity_threshold": video_face_biz.get_similarity_threshold(),
"blacklist_count": video_face_biz.get_registered_face_count()
}
return {
"success": True,
"data": status
}
except Exception as e:
logger.error(f"获取VideoFaceBiz状态失败: {e}")
raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}")
def sync_videofaceprisonbiz_params():
"""
同步VideoFacePrisonBiz的参数
"""
try:
with db_manager.get_session() as session:
# 查询sur_config表条件为scope=1且target_id=3
prison_config = session.query(SurConfig).filter(
SurConfig.scope == 1, # 房间作用域
SurConfig.target_id == 3, # 目标ID为3
SurConfig.config_type == settings.SUR_CONFIG_TYPE_FACE # 人脸识别配置类型
).first()
if not prison_config:
logger.warning("未找到监狱场景的配置记录scope=1, target_id=3")
return 0
if not prison_config.config_group_id:
logger.warning("监狱配置记录缺少config_group_id")
return 0
# 使用联表查询
config_records = session.query(SurConfigBase).filter(
SurConfigBase.group_id == prison_config.config_group_id,
SurConfigBase.config_type == settings.SUR_CONFIG_TYPE_FACE
).all()
if not config_records:
logger.warning(f"未找到对应的配置基础记录group_id={prison_config.config_group_id}")
return 0
# 构建配置参数字典
config_params = {}
for record in config_records:
if record.config_key and record.config_value:
config_params[record.config_key] = record.config_value
# 配置键映射关系
config_mapping = {
"face.list_mode": "list_mode",
"face.clarity_threshold": "clarity_threshold",
"face.min_face_size": "min_face_size",
"face.pitch_threshold": "pitch_threshold",
"face.yaw_threshold": "yaw_threshold",
"face.similarity_threshold": "similarity_threshold",
"face.detection_window_seconds": "detection_window_seconds",
"face.min_match_count": "min_match_count",
"face.cooldown_seconds": "cooldown_seconds",
"face.prison_escort_window_hours": "escort_window_hours"
}
updated_count = 0
for config_key, param_name in config_mapping.items():
if config_key in config_params:
config_value = config_params[config_key]
# 根据参数类型进行转换和设置
if param_name == "list_mode":
if config_value in ["0", "1"]:
video_face_prison_biz.set_list_mode(config_value)
updated_count += 1
elif param_name == "clarity_threshold":
try:
threshold = float(config_value)
video_face_prison_biz.set_clarity_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的清晰度阈值: {config_value}")
elif param_name == "min_face_size":
try:
size = int(config_value)
video_face_prison_biz.set_min_face_size(size)
updated_count += 1
except ValueError:
logger.error(f"无效的最小人脸尺寸: {config_value}")
elif param_name == "pitch_threshold":
try:
threshold = float(config_value)
video_face_prison_biz.set_pitch_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的俯仰角阈值: {config_value}")
elif param_name == "yaw_threshold":
try:
threshold = float(config_value)
video_face_prison_biz.set_yaw_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的偏航角阈值: {config_value}")
elif param_name == "similarity_threshold":
try:
threshold = float(config_value)
video_face_prison_biz.set_similarity_threshold(threshold)
updated_count += 1
except ValueError:
logger.error(f"无效的相似度阈值: {config_value}")
elif param_name == "detection_window_seconds":
try:
window_seconds = float(config_value)
video_face_prison_biz.set_detection_window_seconds(window_seconds)
updated_count += 1
except ValueError:
logger.error(f"无效的检测窗口时间: {config_value}")
elif param_name == "min_match_count":
try:
min_matches = int(config_value)
video_face_prison_biz.set_min_match_count(min_matches)
updated_count += 1
except ValueError:
logger.error(f"无效的最小匹配次数: {config_value}")
elif param_name == "cooldown_seconds":
try:
cooldown_seconds = int(config_value)
video_face_prison_biz.set_cooldown_seconds(cooldown_seconds)
updated_count += 1
except ValueError:
logger.error(f"无效的冷却时间: {config_value}")
elif param_name == "escort_window_hours":
try:
escort_hours = int(config_value)
video_face_prison_biz.set_escort_window_hours(escort_hours)
updated_count += 1
except ValueError:
logger.error(f"无效的犯人带出窗口时间: {config_value}")
logger.info(f"✅ 同步VideoFacePrisonBiz参数完成更新了 {updated_count} 个参数配置组ID: {prison_config.config_group_id}")
return updated_count
except Exception as e:
logger.error(f"❌ 同步VideoFacePrisonBiz参数失败: {e}")
return 0
def sync_videofaceprisonbiz_blacklist():
"""
同步VideoFacePrisonBiz的黑名单
"""
try:
with db_manager.get_session() as session:
# 查询启用的黑名单人员
blacklist_persons = session.query(SurPersonBlacklist).filter(
SurPersonBlacklist.status == 1
).all()
if not blacklist_persons:
logger.info("⚠️ 黑名单为空,清空当前黑名单")
video_face_prison_biz.set_registered_faces({})
return 0
person_ids = [person.person_id for person in blacklist_persons]
# 查询对应的人脸特征
face_features = session.query(SurFaceFeature).filter(
SurFaceFeature.person_id.in_(person_ids),
SurFaceFeature.feature_type == settings.FACE_MODEL_VERSION,
SurFaceFeature.status == 2 # 计算成功的特征
).all()
# 构建特征字典
registered_faces = {}
loaded_count = 0
for feature in face_features:
if feature.feature_data:
try:
# 将bytea转换为numpy数组
import numpy as np
feature_array = np.frombuffer(feature.feature_data, dtype=np.float32)
# 使用person_id作为标识符
person_name = f"{feature.person_id}"
registered_faces[person_name] = feature_array
loaded_count += 1
except Exception as e:
logger.error(f"❌ 解析黑名单人员 {feature.person_id} 的特征数据失败: {e}")
continue
# 设置黑名单
success = video_face_prison_biz.set_registered_faces(registered_faces)
if success:
logger.info(f"✅ 同步黑名单完成,加载了 {loaded_count} 个黑名单人员")
else:
logger.error("❌ 设置黑名单失败")
return loaded_count
except Exception as e:
logger.error(f"❌ 同步黑名单失败: {e}")
return 0
@router.post("/sync-videofaceprisonbiz-params", summary="同步VideoFacePrisonBiz参数")
async def sync_videofaceprisonbiz_params_endpoint():
"""
同步VideoFacePrisonBiz的参数
从sur_config表同步参数到VideoFacePrisonBiz实例
"""
try:
updated_count = sync_videofaceprisonbiz_params()
return {
"success": True,
"message": f"同步参数完成,更新了 {updated_count} 个参数",
"updated_count": updated_count
}
except Exception as e:
logger.error(f"同步VideoFacePrisonBiz参数失败: {e}")
raise HTTPException(status_code=500, detail=f"同步参数失败: {str(e)}")
@router.post("/sync-videofaceprisonbiz-blacklist", summary="同步VideoFacePrisonBiz黑名单")
async def sync_videofaceprisonbiz_blacklist_endpoint():
"""
同步VideoFacePrisonBiz的黑名单
从sur_person_blacklist表同步黑名单到VideoFacePrisonBiz实例
"""
try:
loaded_count = sync_videofaceprisonbiz_blacklist()
return {
"success": True,
"message": f"同步黑名单完成,加载了 {loaded_count} 个黑名单人员",
"loaded_count": loaded_count
}
except Exception as e:
logger.error(f"同步VideoFacePrisonBiz黑名单失败: {e}")
raise HTTPException(status_code=500, detail=f"同步黑名单失败: {str(e)}")
@router.get("/videofaceprisonbiz-status", summary="获取VideoFacePrisonBiz状态")
async def get_videofaceprisonbiz_status():
"""
获取VideoFacePrisonBiz的当前状态
"""
try:
status = {
"list_mode": video_face_prison_biz.get_list_mode(),
"clarity_threshold": video_face_prison_biz.get_clarity_threshold(),
"min_face_size": video_face_prison_biz.get_min_face_size(),
"pitch_threshold": video_face_prison_biz.get_pitch_threshold(),
"yaw_threshold": video_face_prison_biz.get_yaw_threshold(),
"similarity_threshold": video_face_prison_biz.get_similarity_threshold(),
"blacklist_count": video_face_prison_biz.get_registered_face_count()
}
return {
"success": True,
"data": status
}
except Exception as e:
logger.error(f"获取VideoFacePrisonBiz状态失败: {e}")
raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}")
# 导出路由器
__all__ = ["router", "sync_videofacebiz_params", "sync_videofacebiz_blacklist", "sync_videofaceprisonbiz_params", "sync_videofaceprisonbiz_blacklist"]