Files
SupervisorAI/api/routes/face_features.py

417 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
人脸特征API路由
"""
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, Query, BackgroundTasks
from fastapi.responses import JSONResponse
from schemas.face_feature import (
FaceFeatureCreate,
FaceFeatureUpdate,
FaceFeatureQuery,
FaceFeatureResponse,
FaceFeatureListResponse,
FaceFeatureStatsResponse,
BatchFaceFeatureCreate,
FeatureStatus
)
from api.dependencies import (
get_face_feature_service,
get_face_feature_by_id
)
from services.face_feature_service import FaceFeatureService
from api.errors import (
FaceFeatureProcessingError,
FeatureNotFoundError,
DuplicateFeatureError
)
from config import settings
# 创建路由器
router = APIRouter(
prefix="/face-features",
tags=["人脸特征管理"],
responses={
404: {"description": "资源不存在"},
400: {"description": "请求参数错误"},
500: {"description": "服务器内部错误"}
}
)
@router.post(
"/",
response_model=FaceFeatureResponse,
status_code=status.HTTP_201_CREATED,
summary="创建人脸特征记录",
description="创建新的人脸特征记录"
)
async def create_face_feature(
feature_data: FaceFeatureCreate,
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
创建人脸特征记录
- **person_id**: 人员ID (必须大于0)
- **feature_type**: 特征类型 (可选大于等于0)
- **pic_id**: 图片ID (可选)
- **status**: 计算状态 (默认: NOT_STARTED)
- **feature_data**: 特征数据 (可选,二进制)
"""
try:
return service.create_feature(feature_data)
except ValueError as e:
if "already exists" in str(e):
# 解析错误信息中的person_id和feature_type
raise DuplicateFeatureError(
person_id=feature_data.person_id,
feature_type=feature_data.feature_type or 0
)
raise FaceFeatureProcessingError(detail=str(e))
@router.get(
"/{feature_id}",
response_model=FaceFeatureResponse,
summary="获取人脸特征记录",
description="根据ID获取人脸特征记录"
)
async def get_face_feature(
feature: FaceFeatureResponse = Depends(get_face_feature_by_id)
):
"""
根据ID获取人脸特征记录
- **feature_id**: 特征记录ID (路径参数)
"""
return feature
@router.get(
"/",
response_model=FaceFeatureListResponse,
summary="查询人脸特征记录",
description="查询人脸特征记录列表,支持分页和过滤"
)
async def list_face_features(
person_id: Optional[int] = Query(None, description="人员ID", gt=0),
feature_type: Optional[int] = Query(None, description="特征类型", ge=0),
status: Optional[FeatureStatus] = Query(None, description="计算状态"),
start_date: Optional[datetime] = Query(None, description="开始时间"),
end_date: Optional[datetime] = Query(None, description="结束时间"),
has_feature_data: Optional[bool] = Query(None, description="是否有特征数据"),
page: int = Query(1, description="页码", ge=1),
page_size: int = Query(20, description="每页数量", ge=1, le=100),
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
查询人脸特征记录
- **person_id**: 按人员ID过滤 (可选)
- **feature_type**: 按特征类型过滤 (可选)
- **status**: 按计算状态过滤 (可选)
- **start_date**: 开始时间过滤 (可选)
- **end_date**: 结束时间过滤 (可选)
- **has_feature_data**: 是否有特征数据过滤 (可选)
- **page**: 页码 (默认: 1)
- **page_size**: 每页数量 (默认: 20, 最大: 100)
"""
# 构建查询参数
query = FaceFeatureQuery(
person_id=person_id,
feature_type=feature_type,
status=status,
start_date=start_date,
end_date=end_date,
has_feature_data=has_feature_data
)
return service.query_features(
query=query,
page=page,
page_size=page_size,
order_by="created_time",
desc_order=True
)
@router.put(
"/{feature_id}",
response_model=FaceFeatureResponse,
summary="更新人脸特征记录",
description="更新指定ID的人脸特征记录"
)
async def update_face_feature(
feature_id: int,
update_data: FaceFeatureUpdate,
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
更新人脸特征记录
- **feature_id**: 特征记录ID (路径参数)
- **update_data**: 更新数据 (请求体)
"""
try:
result = service.update_feature(feature_id, update_data)
if not result:
raise FeatureNotFoundError(feature_id)
return result
except ValueError as e:
raise FaceFeatureProcessingError(detail=str(e), feature_id=feature_id)
@router.delete(
"/{feature_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="删除人脸特征记录",
description="删除指定ID的人脸特征记录"
)
async def delete_face_feature(
feature_id: int,
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
删除人脸特征记录
- **feature_id**: 特征记录ID (路径参数)
"""
success = service.delete_feature(feature_id)
if not success:
raise FeatureNotFoundError(feature_id)
return JSONResponse(
status_code=status.HTTP_204_NO_CONTENT,
content=None
)
@router.post(
"/{feature_id}/start-processing",
response_model=FaceFeatureResponse,
summary="开始处理人脸特征",
description="开始计算指定ID的人脸特征值"
)
async def start_face_feature_processing(
feature_id: int,
background_tasks: BackgroundTasks,
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
开始处理人脸特征计算
- **feature_id**: 特征记录ID (路径参数)
注意:这是一个异步处理接口,会立即返回开始状态,
实际特征计算可能在后台进行。
"""
try:
# 先获取特征记录
feature = service.get_feature(feature_id)
if not feature:
raise FeatureNotFoundError(feature_id)
# 检查是否可以开始处理
if feature.status != FeatureStatus.NOT_STARTED:
raise FaceFeatureProcessingError(
detail=f"特征记录状态为 {feature.status_name},无法开始处理",
feature_id=feature_id
)
# 开始处理
success = service.start_processing(feature_id)
if not success:
raise FaceFeatureProcessingError(
detail="开始处理失败",
feature_id=feature_id
)
# 异步任务:模拟特征计算过程
# 在实际应用中,这里应该调用实际的特征计算服务
background_tasks.add_task(
simulate_feature_processing,
feature_id=feature_id,
service=service
)
# 返回更新后的特征记录
return service.get_feature(feature_id)
except ValueError as e:
raise FaceFeatureProcessingError(detail=str(e), feature_id=feature_id)
@router.post(
"/{feature_id}/finish-processing",
response_model=FaceFeatureResponse,
summary="完成人脸特征处理",
description="完成指定ID的人脸特征值计算"
)
async def finish_face_feature_processing(
feature_id: int,
success: bool = Query(True, description="是否成功完成"),
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
完成人脸特征计算
- **feature_id**: 特征记录ID (路径参数)
- **success**: 是否成功完成 (查询参数,默认: true)
"""
try:
# 检查特征记录
feature = service.get_feature(feature_id)
if not feature:
raise FeatureNotFoundError(feature_id)
# 检查是否可以完成处理
if feature.status != FeatureStatus.PROCESSING:
raise FaceFeatureProcessingError(
detail=f"特征记录状态为 {feature.status_name},无法完成处理",
feature_id=feature_id
)
# 完成处理
finish_success = service.finish_processing(feature_id, success)
if not finish_success:
raise FaceFeatureProcessingError(
detail="完成处理失败",
feature_id=feature_id
)
return service.get_feature(feature_id)
except ValueError as e:
raise FaceFeatureProcessingError(detail=str(e), feature_id=feature_id)
@router.post(
"/batch",
response_model=List[FaceFeatureResponse],
status_code=status.HTTP_201_CREATED,
summary="批量创建人脸特征记录",
description="批量创建多个人脸特征记录"
)
async def batch_create_face_features(
batch_data: BatchFaceFeatureCreate,
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
批量创建人脸特征记录
- **items**: 特征记录列表 (必须1-1000条)
"""
try:
return service.create_features_batch(batch_data)
except ValueError as e:
raise FaceFeatureProcessingError(detail=str(e))
@router.get(
"/person/{person_id}",
response_model=List[FaceFeatureResponse],
summary="获取人员的人脸特征记录",
description="根据人员ID获取所有相关的人脸特征记录"
)
async def get_face_features_by_person(
person_id: int,
limit: int = Query(100, description="返回数量限制", ge=1, le=1000),
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
获取人员的人脸特征记录
- **person_id**: 人员ID (路径参数)
- **limit**: 返回数量限制 (查询参数,默认: 100, 最大: 1000)
"""
return service.list_features_by_person(person_id, limit)
@router.get(
"/stats/summary",
response_model=FaceFeatureStatsResponse,
summary="获取特征记录统计信息",
description="获取人脸特征记录的统计摘要"
)
async def get_face_features_stats(
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
获取特征记录统计信息
"""
return service.get_statistics()
@router.get(
"/person/{person_id}/stats",
summary="获取人员特征统计信息",
description="获取指定人员的特征记录统计信息"
)
async def get_person_face_features_stats(
person_id: int,
service: FaceFeatureService = Depends(get_face_feature_service)
):
"""
获取人员特征统计信息
- **person_id**: 人员ID (路径参数)
"""
try:
stats = service.get_person_statistics(person_id)
return {
"person_id": person_id,
"total_features": stats["total_features"],
"status_summary": stats["status_summary"],
"feature_types": stats["feature_types"],
"avg_processing_time": stats["avg_processing_time"],
"successful_count": stats["successful_count"]
}
except Exception as e:
raise FaceFeatureProcessingError(detail=str(e))
async def simulate_feature_processing(
feature_id: int,
service: FaceFeatureService
):
"""
模拟人脸特征计算过程
在实际应用中,这里应该调用实际的特征计算算法
例如使用InsightFace、OpenCV等库进行人脸特征提取
Args:
feature_id: 特征记录ID
service: 人脸特征服务
"""
import asyncio
import random
try:
# 模拟计算延迟 (3-10秒)
delay = random.uniform(3, 10)
await asyncio.sleep(delay)
# 模拟成功或失败 (90%成功率)
success = random.random() < 0.9
# 完成处理
service.finish_processing(feature_id, success)
# 如果成功,添加模拟的特征数据
if success:
# 生成模拟的512维特征向量 (float32)
import numpy as np
feature_data = np.random.randn(512).astype(np.float32).tobytes()
service.update_feature_data(feature_id, feature_data)
except Exception as e:
# 如果发生异常,标记为失败
service.finish_processing(feature_id, False)
# 记录日志
import logging
logger = logging.getLogger(__name__)
logger.error(f"特征计算失败 (ID: {feature_id}): {e}")