Files
algorithm/backend/app/services/history_manager.py
2026-02-08 14:42:58 +08:00

257 lines
8.8 KiB
Python

"""历史记录管理服务,负责管理算法调用历史和用户操作历史"""
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import json
import logging
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc
from app.models.models import AlgorithmCall, User, Algorithm
from app.utils.file import file_storage
logger = logging.getLogger(__name__)
class HistoryManager:
"""历史记录管理器,处理算法调用历史和其他用户操作历史"""
def __init__(self):
pass
def get_user_call_history(
self,
db: Session,
user_id: str,
algorithm_id: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
status: Optional[str] = None,
skip: int = 0,
limit: int = 100
) -> List[AlgorithmCall]:
"""获取用户的调用历史"""
query = db.query(AlgorithmCall).filter(AlgorithmCall.user_id == user_id)
# 添加过滤条件
if algorithm_id:
query = query.filter(AlgorithmCall.algorithm_id == algorithm_id)
if start_date:
query = query.filter(AlgorithmCall.created_at >= start_date)
if end_date:
query = query.filter(AlgorithmCall.created_at <= end_date)
if status:
query = query.filter(AlgorithmCall.status == status)
# 按创建时间倒序排列
query = query.order_by(desc(AlgorithmCall.created_at))
# 分页
history = query.offset(skip).limit(limit).all()
return history
def get_algorithm_call_history(
self,
db: Session,
algorithm_id: str,
user_id: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
status: Optional[str] = None,
skip: int = 0,
limit: int = 100
) -> List[AlgorithmCall]:
"""获取特定算法的调用历史"""
query = db.query(AlgorithmCall).filter(AlgorithmCall.algorithm_id == algorithm_id)
# 添加过滤条件
if user_id:
query = query.filter(AlgorithmCall.user_id == user_id)
if start_date:
query = query.filter(AlgorithmCall.created_at >= start_date)
if end_date:
query = query.filter(AlgorithmCall.created_at <= end_date)
if status:
query = query.filter(AlgorithmCall.status == status)
# 按创建时间倒序排列
query = query.order_by(desc(AlgorithmCall.created_at))
# 分页
history = query.offset(skip).limit(limit).all()
return history
def get_call_statistics(
self,
db: Session,
user_id: Optional[str] = None,
algorithm_id: Optional[str] = None
) -> Dict[str, Any]:
"""获取调用统计信息"""
query = db.query(AlgorithmCall)
if user_id:
query = query.filter(AlgorithmCall.user_id == user_id)
if algorithm_id:
query = query.filter(AlgorithmCall.algorithm_id == algorithm_id)
# 总调用次数
total_calls = query.count()
# 按状态统计
status_counts = db.query(
AlgorithmCall.status,
db.func.count(AlgorithmCall.id)
).filter(
AlgorithmCall.user_id == user_id if user_id else AlgorithmCall.algorithm_id == algorithm_id
).group_by(AlgorithmCall.status).all()
status_dict = {status: count for status, count in status_counts}
# 成功率
success_count = status_dict.get('success', 0)
success_rate = (success_count / total_calls * 100) if total_calls > 0 else 0
# 平均响应时间
avg_response_time = db.query(
db.func.avg(AlgorithmCall.response_time)
).filter(
AlgorithmCall.response_time.isnot(None),
AlgorithmCall.user_id == user_id if user_id else AlgorithmCall.algorithm_id == algorithm_id
).scalar()
# 今日调用次数
today_start = datetime.combine(datetime.today().date(), datetime.min.time())
today_calls = query.filter(AlgorithmCall.created_at >= today_start).count()
return {
"total_calls": total_calls,
"status_counts": status_dict,
"success_rate": round(success_rate, 2),
"avg_response_time": round(avg_response_time, 3) if avg_response_time else None,
"today_calls": today_calls
}
def get_comparison_data(
self,
db: Session,
call_ids: List[str]
) -> List[Dict[str, Any]]:
"""获取用于对比的历史数据"""
calls = db.query(AlgorithmCall).filter(AlgorithmCall.id.in_(call_ids)).all()
comparison_data = []
for call in calls:
comparison_data.append({
"id": call.id,
"algorithm_id": call.algorithm_id,
"algorithm_name": getattr(call.algorithm, 'name', 'Unknown'),
"version_id": call.version_id,
"input_data": call.input_data,
"output_data": call.output_data,
"params": call.params,
"status": call.status,
"response_time": call.response_time,
"created_at": call.created_at.isoformat() if call.created_at else None,
"error_message": call.error_message
})
return comparison_data
def export_history(
self,
db: Session,
user_id: str,
algorithm_id: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
format_type: str = "json"
) -> Optional[str]:
"""导出历史记录"""
history = self.get_user_call_history(
db=db,
user_id=user_id,
algorithm_id=algorithm_id,
start_date=start_date,
end_date=end_date,
skip=0,
limit=10000 # 限制导出数量
)
# 转换为可序列化的格式
export_data = []
for call in history:
export_data.append({
"id": call.id,
"user_id": call.user_id,
"algorithm_id": call.algorithm_id,
"algorithm_name": getattr(call.algorithm, 'name', 'Unknown'),
"version_id": call.version_id,
"version_number": getattr(call.version, 'version', 'Unknown'),
"input_data": call.input_data,
"output_data": call.output_data,
"params": call.params,
"status": call.status,
"response_time": call.response_time,
"created_at": call.created_at.isoformat() if call.created_at else None,
"updated_at": call.updated_at.isoformat() if call.updated_at else None,
"error_message": call.error_message
})
try:
if format_type.lower() == "json":
json_str = json.dumps(export_data, ensure_ascii=False, default=str)
file_path = f"exports/history_{user_id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
# 上传到存储
success = file_storage.upload_from_bytes(json_str.encode('utf-8'), file_path)
if success:
return file_path
else:
logger.error("Failed to upload exported history")
return None
else:
logger.error(f"Unsupported export format: {format_type}")
return None
except Exception as e:
logger.error(f"Error exporting history: {str(e)}")
return None
def delete_old_history(
self,
db: Session,
days_old: int,
user_id: Optional[str] = None,
algorithm_id: Optional[str] = None
) -> int:
"""删除旧的历史记录"""
cutoff_date = datetime.utcnow() - timedelta(days=days_old)
query = db.query(AlgorithmCall).filter(AlgorithmCall.created_at < cutoff_date)
if user_id:
query = query.filter(AlgorithmCall.user_id == user_id)
if algorithm_id:
query = query.filter(AlgorithmCall.algorithm_id == algorithm_id)
deleted_count = query.delete()
db.commit()
logger.info(f"Deleted {deleted_count} old history records")
return deleted_count
# 全局历史记录管理器实例
history_manager = HistoryManager()