207 lines
7.6 KiB
Python
207 lines
7.6 KiB
Python
"""
|
|
主程序示例
|
|
演示如何使用各个模块
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
|
|
from src.config import settings
|
|
from src.database.connection import db_manager, init_database
|
|
from src.models.face_feature import SurFaceFeature
|
|
from src.repositories.face_feature_repository import FaceFeatureRepository
|
|
from src.services.face_feature_service import FaceFeatureService
|
|
from src.schemas.face_feature import (
|
|
FaceFeatureCreate,
|
|
FaceFeatureUpdate,
|
|
FaceFeatureQuery,
|
|
FeatureStatus,
|
|
BatchFaceFeatureCreate
|
|
)
|
|
|
|
|
|
def demo_sync_operations():
|
|
"""演示同步操作"""
|
|
print("=== 同步操作演示 ===")
|
|
|
|
# 初始化数据库
|
|
init_database()
|
|
|
|
# 创建仓库和服务
|
|
with db_manager.get_session() as session:
|
|
repository = FaceFeatureRepository(session)
|
|
service = FaceFeatureService(repository)
|
|
|
|
try:
|
|
# 1. 创建特征记录(先检查是否存在)
|
|
print("\n1. 创建特征记录")
|
|
|
|
# 固定的测试ID
|
|
test_person_id = 1001
|
|
test_feature_type = 1
|
|
|
|
# 检查是否已存在
|
|
existing = service.get_feature_by_person_and_type(test_person_id, test_feature_type)
|
|
|
|
if existing:
|
|
print(f"记录已存在: ID={existing.id}, 状态={existing.status_name}")
|
|
feature_id = existing.id
|
|
else:
|
|
feature_data = FaceFeatureCreate(
|
|
person_id=test_person_id,
|
|
feature_type=test_feature_type,
|
|
pic_id="test_image_001.jpg",
|
|
status=FeatureStatus.NOT_STARTED
|
|
)
|
|
|
|
feature_response = service.create_feature(feature_data)
|
|
print(f"创建成功: ID={feature_response.id}, 人员ID={feature_response.person_id}")
|
|
feature_id = feature_response.id
|
|
|
|
# 2. 开始处理(如果未开始)
|
|
print("\n2. 检查并开始处理特征计算")
|
|
feature = service.get_feature(feature_id)
|
|
if feature and feature.status == FeatureStatus.NOT_STARTED:
|
|
if service.start_processing(feature_id):
|
|
print(f"已开始处理: ID={feature_id}")
|
|
else:
|
|
print(f"特征计算已开始或已完成: 状态={feature.status_name if feature else '未知'}")
|
|
|
|
# 3. 完成处理(如果还在处理中)
|
|
print("\n3. 检查并完成处理特征计算")
|
|
feature = service.get_feature(feature_id)
|
|
if feature and feature.status == FeatureStatus.PROCESSING:
|
|
if service.finish_processing(feature_id, success=True):
|
|
print(f"已完成处理: ID={feature_id}")
|
|
else:
|
|
print(f"特征计算已完成或未开始: 状态={feature.status_name if feature else '未知'}")
|
|
|
|
# 4. 查询特征
|
|
print("\n4. 查询特征记录")
|
|
retrieved = service.get_feature(feature_id)
|
|
if retrieved:
|
|
print(f"查询成功: ID={retrieved.id}, 状态={retrieved.status_name}")
|
|
print(f"处理时间: {retrieved.processing_time}秒")
|
|
print(f"是否有特征数据: {retrieved.has_feature_data}")
|
|
|
|
# 5. 批量创建 - 跳过已存在的
|
|
print("\n5. 批量创建特征记录")
|
|
batch_items = []
|
|
for i in range(3):
|
|
person_id = 2000 + i
|
|
# 检查是否已存在
|
|
existing = service.get_feature_by_person_and_type(person_id, 1)
|
|
if not existing:
|
|
batch_items.append(FaceFeatureCreate(person_id=person_id, feature_type=1))
|
|
|
|
if batch_items:
|
|
batch_data = BatchFaceFeatureCreate(items=batch_items)
|
|
try:
|
|
batch_result = service.create_features_batch(batch_data)
|
|
print(f"批量创建成功: {len(batch_result)}条记录")
|
|
except ValueError as e:
|
|
print(f"批量创建失败: {e}")
|
|
else:
|
|
print("所有记录已存在,跳过批量创建")
|
|
|
|
# 6. 查询列表
|
|
print("\n6. 查询特征记录列表")
|
|
query = FaceFeatureQuery(
|
|
feature_type=1,
|
|
start_date=datetime.now() - timedelta(days=1)
|
|
)
|
|
|
|
result = service.query_features(query, page=1, page_size=10)
|
|
print(f"查询结果: 共{result.total}条记录,本页{len(result.items)}条")
|
|
|
|
if result.items:
|
|
print(f"第一笔记录: ID={result.items[0].id}, 人员ID={result.items[0].person_id}")
|
|
|
|
# 7. 获取统计信息
|
|
print("\n7. 获取统计信息")
|
|
stats = service.get_statistics()
|
|
print(f"总记录数: {stats.total_count}")
|
|
print(f"状态分布: {stats.by_status}")
|
|
print(f"特征类型分布: {stats.by_feature_type}")
|
|
|
|
# 8. 更新特征数据
|
|
print("\n8. 更新特征数据")
|
|
test_feature_data = b"test_feature_data_12345"
|
|
success = service.update_feature_data(feature_id, test_feature_data)
|
|
print(f"更新特征数据: {'成功' if success else '失败'}")
|
|
|
|
# 重新查询查看更新后的数据
|
|
updated_feature = service.get_feature(feature_id)
|
|
if updated_feature:
|
|
print(f"更新后是否有特征数据: {updated_feature.has_feature_data}")
|
|
|
|
# 9. 演示删除操作
|
|
print("\n9. 演示删除操作")
|
|
# 先创建一个要删除的记录
|
|
delete_person_id = 9999 # 使用一个不存在的ID
|
|
delete_feature_data = FaceFeatureCreate(
|
|
person_id=delete_person_id,
|
|
feature_type=3,
|
|
pic_id="to_delete.jpg"
|
|
)
|
|
delete_feature = service.create_feature(delete_feature_data)
|
|
print(f"创建待删除记录: ID={delete_feature.id}")
|
|
|
|
# 删除记录
|
|
delete_success = service.delete_feature(delete_feature.id)
|
|
print(f"删除记录: {'成功' if delete_success else '失败'}")
|
|
|
|
# 验证删除
|
|
deleted_check = service.get_feature(delete_feature.id)
|
|
print(f"验证删除: {'记录不存在' if deleted_check is None else '记录还存在'}")
|
|
|
|
except Exception as e:
|
|
print(f"操作失败: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
# 关闭数据库连接
|
|
db_manager.close()
|
|
print("\n数据库连接已关闭")
|
|
|
|
|
|
async def demo_async_operations():
|
|
"""演示异步操作"""
|
|
print("\n=== 异步操作演示 ===")
|
|
|
|
# 注意:异步操作需要异步数据库管理器
|
|
# 这里仅展示结构,实际使用时需要配置异步数据库
|
|
|
|
print("异步操作示例代码已准备,需要配置异步数据库连接")
|
|
print("请使用 async_db_manager 和异步版本的repository")
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
print("人脸特征管理系统演示")
|
|
print("=" * 50)
|
|
|
|
# 显示配置信息
|
|
print(f"应用名称: {settings.APP_NAME}")
|
|
print(f"数据库: {settings.DATABASE_NAME}")
|
|
print(f"连接池大小: {settings.DATABASE_POOL_SIZE}")
|
|
|
|
try:
|
|
# 执行同步演示
|
|
demo_sync_operations()
|
|
|
|
# 如果需要异步演示,可以取消下面的注释
|
|
# asyncio.run(demo_async_operations())
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n程序被用户中断")
|
|
except Exception as e:
|
|
print(f"程序执行出错: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
print("\n演示完成!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |