Files
algorithm/backend/app/services/data_manager.py
2026-02-18 09:36:18 +08:00

310 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""数据管理服务,负责管理输入数据、输出结果和元数据"""
from typing import Dict, Any, Optional, List
from datetime import datetime
import json
import logging
from pathlib import Path
import uuid
from sqlalchemy.orm import Session
from app.models.models import AlgorithmCall
from app.utils.file import file_storage
logger = logging.getLogger(__name__)
class DataManager:
"""数据管理器,处理输入数据、输出结果和元数据的存储与检索"""
def __init__(self):
self.storage_path = "data_storage" # 数据存储路径
def save_input_data(self, user_id: str, algorithm_id: str, input_data: Dict[str, Any]) -> str:
"""保存输入数据"""
try:
# 生成唯一ID
data_id = f"input_{uuid.uuid4().hex[:12]}"
# 准备存储数据
storage_data = {
"data_id": data_id,
"user_id": user_id,
"algorithm_id": algorithm_id,
"input_data": input_data,
"created_at": datetime.utcnow().isoformat()
}
# 存储到数据库或文件系统
# 这里我们使用MinIO存储大文件数据库存储引用
file_path = f"inputs/{user_id}/{algorithm_id}/{data_id}.json"
# 转换为JSON字符串
json_str = json.dumps(storage_data, ensure_ascii=False, default=str)
# 上传到MinIO
success = file_storage.upload_from_bytes(json_str.encode('utf-8'), file_path)
if success:
logger.info(f"Input data saved: {data_id}")
return data_id
else:
logger.error(f"Failed to save input data: {data_id}")
return None
except Exception as e:
logger.error(f"Error saving input data: {str(e)}")
return None
def save_output_data(self, user_id: str, algorithm_id: str, call_id: str, output_data: Dict[str, Any]) -> str:
"""保存输出结果数据"""
try:
# 生成唯一ID
data_id = f"output_{uuid.uuid4().hex[:12]}"
# 准备存储数据
storage_data = {
"data_id": data_id,
"user_id": user_id,
"algorithm_id": algorithm_id,
"call_id": call_id,
"output_data": output_data,
"created_at": datetime.utcnow().isoformat()
}
# 存储到MinIO
file_path = f"outputs/{user_id}/{algorithm_id}/{call_id}_{data_id}.json"
# 转换为JSON字符串
json_str = json.dumps(storage_data, ensure_ascii=False, default=str)
# 上传到MinIO
success = file_storage.upload_from_bytes(json_str.encode('utf-8'), file_path)
if success:
logger.info(f"Output data saved: {data_id}")
return data_id
else:
logger.error(f"Failed to save output data: {data_id}")
return None
except Exception as e:
logger.error(f"Error saving output data: {str(e)}")
return None
def get_input_data(self, data_id: str) -> Optional[Dict[str, Any]]:
"""获取输入数据"""
try:
# 从MinIO获取数据
file_path = f"inputs/{data_id}.json"
# 下载文件内容
content = file_storage.download_file(file_path)
if content:
data = json.loads(content.decode('utf-8'))
return data
else:
logger.warning(f"Input data not found: {data_id}")
return None
except Exception as e:
logger.error(f"Error getting input data: {str(e)}")
return None
def get_output_data(self, data_id: str) -> Optional[Dict[str, Any]]:
"""获取输出结果数据"""
try:
# 从MinIO获取数据
file_path = f"outputs/{data_id}.json"
# 下载文件内容
content = file_storage.download_file(file_path)
if content:
data = json.loads(content.decode('utf-8'))
return data
else:
logger.warning(f"Output data not found: {data_id}")
return None
except Exception as e:
logger.error(f"Error getting output data: {str(e)}")
return None
def get_user_inputs(self, user_id: str, algorithm_id: str = None, limit: int = 100) -> List[Dict[str, Any]]:
"""获取用户的历史输入数据"""
try:
# 构建搜索路径
if algorithm_id:
search_prefix = f"inputs/{user_id}/{algorithm_id}/"
else:
search_prefix = f"inputs/{user_id}/"
# 列出MinIO中的文件
files = file_storage.list_files(search_prefix)
inputs = []
for file_info in files[:limit]: # 限制返回数量
try:
content = file_storage.download_file(file_info.object_name)
if content:
data = json.loads(content.decode('utf-8'))
inputs.append(data)
except Exception as e:
logger.error(f"Error processing input file {file_info.object_name}: {str(e)}")
continue
return inputs
except Exception as e:
logger.error(f"Error getting user inputs: {str(e)}")
return []
def get_user_outputs(self, user_id: str, algorithm_id: str = None, limit: int = 100) -> List[Dict[str, Any]]:
"""获取用户的历史输出数据"""
try:
# 构建搜索路径
if algorithm_id:
search_prefix = f"outputs/{user_id}/{algorithm_id}/"
else:
search_prefix = f"outputs/{user_id}/"
# 列出MinIO中的文件
files = file_storage.list_files(search_prefix)
outputs = []
for file_info in files[:limit]: # 限制返回数量
try:
content = file_storage.download_file(file_info.object_name)
if content:
data = json.loads(content.decode('utf-8'))
outputs.append(data)
except Exception as e:
logger.error(f"Error processing output file {file_info.object_name}: {str(e)}")
continue
return outputs
except Exception as e:
logger.error(f"Error getting user outputs: {str(e)}")
return []
def delete_user_data(self, user_id: str) -> bool:
"""删除用户的所有数据"""
try:
# 删除用户相关的所有输入数据
input_prefix = f"inputs/{user_id}/"
input_files = file_storage.list_files(input_prefix)
for file_info in input_files:
file_storage.remove_file(file_info.object_name)
# 删除用户相关的所有输出数据
output_prefix = f"outputs/{user_id}/"
output_files = file_storage.list_files(output_prefix)
for file_info in output_files:
file_storage.remove_file(file_info.object_name)
logger.info(f"User data deleted: {user_id}")
return True
except Exception as e:
logger.error(f"Error deleting user data: {str(e)}")
return False
def save_media_file(self, user_id: str, algorithm_id: str, file_content: bytes, file_name: str) -> Optional[str]:
"""保存媒体文件(如图片、视频等)"""
try:
# 生成唯一文件名
unique_name = f"{uuid.uuid4().hex[:12]}_{file_name}"
file_path = f"media/{user_id}/{algorithm_id}/{unique_name}"
# 上传到MinIO
success = file_storage.upload_from_bytes(file_content, file_path)
if success:
logger.info(f"Media file saved: {file_path}")
return file_path
else:
logger.error(f"Failed to save media file: {file_path}")
return None
except Exception as e:
logger.error(f"Error saving media file: {str(e)}")
return None
def get_media_file(self, file_path: str) -> Optional[bytes]:
"""获取媒体文件内容"""
try:
content = file_storage.get_object(file_path)
return content
except Exception as e:
logger.error(f"Error getting media file: {str(e)}")
return None
def create_data_snapshot(self, call_record: AlgorithmCall) -> Dict[str, Any]:
"""创建数据快照,保存调用时的完整数据状态"""
try:
snapshot = {
"snapshot_id": f"snapshot_{uuid.uuid4().hex[:12]}",
"call_id": call_record.id,
"user_id": call_record.user_id,
"algorithm_id": call_record.algorithm_id,
"version_id": call_record.version_id,
"input_data": call_record.input_data,
"output_data": call_record.output_data,
"params": call_record.params,
"status": call_record.status,
"response_time": call_record.response_time,
"created_at": call_record.created_at.isoformat() if call_record.created_at else None,
"updated_at": call_record.updated_at.isoformat() if call_record.updated_at else None
}
# 保存快照到MinIO
file_path = f"snapshots/{call_record.user_id}/{call_record.algorithm_id}/{call_record.id}.json"
json_str = json.dumps(snapshot, ensure_ascii=False, default=str)
success = file_storage.upload_from_bytes(json_str.encode('utf-8'), file_path)
if success:
logger.info(f"Data snapshot created: {snapshot['snapshot_id']}")
return snapshot
else:
logger.error(f"Failed to create data snapshot: {snapshot['snapshot_id']}")
return None
except Exception as e:
logger.error(f"Error creating data snapshot: {str(e)}")
return None
def search_data_by_metadata(self, filters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""根据元数据搜索数据"""
try:
# 从数据库中获取匹配的调用记录
# 注意:这里仅作示例,实际实现可能需要更复杂的索引和搜索机制
results = []
# 如果需要按用户搜索
if filters.get('user_id'):
# 这里应该查询数据库中的相关记录
pass
# 如果需要按算法搜索
if filters.get('algorithm_id'):
# 这里应该查询数据库中的相关记录
pass
# 如果需要按日期范围搜索
if filters.get('date_range'):
# 这里应该查询数据库中的相关记录
pass
# 返回匹配的结果
return results
except Exception as e:
logger.error(f"Error searching data by metadata: {str(e)}")
return []
# 全局数据管理器实例
data_manager = DataManager()