""" 主程序示例 演示如何使用各个模块 """ import asyncio from datetime import datetime, timedelta from src.config import settings from src.database.connection import db_manager, init_database from src.models.face_feature import SurFaceFeature from src.repositories.face_feature_repository import FaceFeatureRepository from src.services.face_feature_service import FaceFeatureService from src.schemas.face_feature import ( FaceFeatureCreate, FaceFeatureUpdate, FaceFeatureQuery, FeatureStatus, BatchFaceFeatureCreate ) def demo_sync_operations(): """演示同步操作""" print("=== 同步操作演示 ===") # 初始化数据库 init_database() # 创建仓库和服务 with db_manager.get_session() as session: repository = FaceFeatureRepository(session) service = FaceFeatureService(repository) try: # 1. 创建特征记录(先检查是否存在) print("\n1. 创建特征记录") # 固定的测试ID test_person_id = 10 test_feature_type = 1 # 检查是否已存在 existing = service.get_feature_by_person_and_type(test_person_id, test_feature_type) if existing: print(f"记录已存在: ID={existing.id}, 状态={existing.status_name}") feature_id = existing.id else: feature_data = FaceFeatureCreate( person_id=test_person_id, feature_type=test_feature_type, pic_id="test_image_001.jpg", status=FeatureStatus.NOT_STARTED ) feature_response = service.create_feature(feature_data) print(f"创建成功: ID={feature_response.id}, 人员ID={feature_response.person_id}") feature_id = feature_response.id # 2. 开始处理(如果未开始) print("\n2. 检查并开始处理特征计算") feature = service.get_feature(feature_id) if feature and feature.status == FeatureStatus.NOT_STARTED: if service.start_processing(feature_id): print(f"已开始处理: ID={feature_id}") else: print(f"特征计算已开始或已完成: 状态={feature.status_name if feature else '未知'}") # 3. 完成处理(如果还在处理中) print("\n3. 检查并完成处理特征计算") feature = service.get_feature(feature_id) if feature and feature.status == FeatureStatus.PROCESSING: if service.finish_processing(feature_id, success=True): print(f"已完成处理: ID={feature_id}") else: print(f"特征计算已完成或未开始: 状态={feature.status_name if feature else '未知'}") # 4. 查询特征 print("\n4. 查询特征记录") retrieved = service.get_feature(feature_id) if retrieved: print(f"查询成功: ID={retrieved.id}, 状态={retrieved.status_name}") print(f"处理时间: {retrieved.processing_time}秒") print(f"是否有特征数据: {retrieved.has_feature_data}") # 5. 批量创建 - 跳过已存在的 print("\n5. 批量创建特征记录") batch_items = [] for i in range(3): person_id = 2000 + i # 检查是否已存在 existing = service.get_feature_by_person_and_type(person_id, 1) if not existing: batch_items.append(FaceFeatureCreate(person_id=person_id, feature_type=1)) if batch_items: batch_data = BatchFaceFeatureCreate(items=batch_items) try: batch_result = service.create_features_batch(batch_data) print(f"批量创建成功: {len(batch_result)}条记录") except ValueError as e: print(f"批量创建失败: {e}") else: print("所有记录已存在,跳过批量创建") # 6. 查询列表 print("\n6. 查询特征记录列表") query = FaceFeatureQuery( feature_type=1, start_date=datetime.now() - timedelta(days=1) ) result = service.query_features(query, page=1, page_size=10) print(f"查询结果: 共{result.total}条记录,本页{len(result.items)}条") if result.items: print(f"第一笔记录: ID={result.items[0].id}, 人员ID={result.items[0].person_id}") # 7. 获取统计信息 print("\n7. 获取统计信息") stats = service.get_statistics() print(f"总记录数: {stats.total_count}") print(f"状态分布: {stats.by_status}") print(f"特征类型分布: {stats.by_feature_type}") # 8. 更新特征数据 print("\n8. 更新特征数据") test_feature_data = b"test_feature_data_12345" success = service.update_feature_data(feature_id, test_feature_data) print(f"更新特征数据: {'成功' if success else '失败'}") # 重新查询查看更新后的数据 updated_feature = service.get_feature(feature_id) if updated_feature: print(f"更新后是否有特征数据: {updated_feature.has_feature_data}") # 9. 演示删除操作 print("\n9. 演示删除操作") # 先创建一个要删除的记录 delete_person_id = 9999 # 使用一个不存在的ID delete_feature_data = FaceFeatureCreate( person_id=delete_person_id, feature_type=3, pic_id="to_delete.jpg" ) delete_feature = service.create_feature(delete_feature_data) print(f"创建待删除记录: ID={delete_feature.id}") # 删除记录 delete_success = service.delete_feature(delete_feature.id) print(f"删除记录: {'成功' if delete_success else '失败'}") # 验证删除 deleted_check = service.get_feature(delete_feature.id) print(f"验证删除: {'记录不存在' if deleted_check is None else '记录还存在'}") except Exception as e: print(f"操作失败: {e}") import traceback traceback.print_exc() # 关闭数据库连接 db_manager.close() print("\n数据库连接已关闭") async def demo_async_operations(): """演示异步操作""" print("\n=== 异步操作演示 ===") # 注意:异步操作需要异步数据库管理器 # 这里仅展示结构,实际使用时需要配置异步数据库 print("异步操作示例代码已准备,需要配置异步数据库连接") print("请使用 async_db_manager 和异步版本的repository") def main(): """主函数""" print("人脸特征管理系统演示") print("=" * 50) # 显示配置信息 print(f"应用名称: {settings.APP_NAME}") print(f"数据库: {settings.DATABASE_NAME}") print(f"连接池大小: {settings.DATABASE_POOL_SIZE}") try: # 执行同步演示 demo_sync_operations() # 如果需要异步演示,可以取消下面的注释 # asyncio.run(demo_async_operations()) except KeyboardInterrupt: print("\n程序被用户中断") except Exception as e: print(f"程序执行出错: {e}") import traceback traceback.print_exc() print("\n演示完成!") if __name__ == "__main__": main()