"""算法服务管理路由,提供服务注册、管理等功能""" from fastapi import APIRouter, HTTPException, status, Depends from typing import List, Dict, Any, Optional from pydantic import BaseModel import uuid import os import logging from app.config.settings import settings from app.models.models import AlgorithmService, AlgorithmRepository, Algorithm, AlgorithmVersion from app.models.database import SessionLocal from app.models.api import ApiEndpoint from app.routes.user import get_current_active_user from app.schemas.user import UserResponse from app.services.project_analyzer import ProjectAnalyzer from app.services.service_generator import ServiceGenerator from app.services.service_orchestrator import ServiceOrchestrator from app.gitea.service import gitea_service router = APIRouter(prefix="/services", tags=["services"]) logger = logging.getLogger(__name__) class RegisterServiceRequest(BaseModel): """注册服务请求""" repository_id: str name: str version: str = "1.0.0" description: Optional[str] = "" tech_category: str = "computer_vision" output_type: str = "image" service_type: str = "http" host: str = "localhost" port: int = 8000 timeout: int = 30 health_check_path: str = "/health" environment: Dict[str, str] = {} class ServiceResponse(BaseModel): """服务响应""" id: str service_id: str name: str algorithm_name: str version: str host: str port: int api_url: str status: str created_at: str updated_at: Optional[str] class ServiceListResponse(BaseModel): """服务列表响应""" success: bool services: List[ServiceResponse] class ServiceDetailResponse(BaseModel): """服务详情响应""" success: bool service: ServiceResponse class ServiceOperationResponse(BaseModel): """服务操作响应""" success: bool message: str service_id: str status: str class ServiceStatusResponse(BaseModel): """服务状态响应""" success: bool status: str health: str class ServiceLogsResponse(BaseModel): """服务日志响应""" success: bool logs: List[str] class RepositoryAlgorithmsResponse(BaseModel): """仓库算法列表响应""" success: bool algorithms: List[Dict[str, Any]] class BatchOperationRequest(BaseModel): """批量操作请求""" service_ids: List[str] class BatchOperationResponse(BaseModel): """批量操作响应""" success: bool message: str results: List[Dict[str, Any]] # 初始化服务组件 project_analyzer = ProjectAnalyzer() service_generator = ServiceGenerator() service_orchestrator = ServiceOrchestrator(deployment_mode=settings.DEPLOYMENT_MODE, db_url=settings.DATABASE_URL) @router.post("/register", status_code=status.HTTP_201_CREATED) async def register_service( request: RegisterServiceRequest, current_user: UserResponse = Depends(get_current_active_user) ): """注册新服务""" # 检查用户权限 print(f"用户角色: {current_user.role_name}") print(f"用户角色对象: {current_user.role}") if not hasattr(current_user, 'role_name') or current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 1. 获取仓库信息 repo = db.query(AlgorithmRepository).filter(AlgorithmRepository.id == request.repository_id).first() if not repo: raise HTTPException(status_code=404, detail="仓库不存在") # 记录仓库信息 print(f"仓库信息: {repo.name}, {repo.description}, {repo.repo_url}") # 2. 从Gitea仓库克隆代码到本地 repo_path = f"/tmp/algorithms/{request.repository_id}" # 使用Gitea服务克隆仓库 clone_success = gitea_service.clone_repository(repo.repo_url, request.repository_id, repo.branch or "main") if not clone_success: raise HTTPException(status_code=400, detail=f"克隆仓库失败: {repo.repo_url}") print(f"仓库克隆成功: {repo_path}") # 3. 分析项目 project_info = project_analyzer.analyze_project(repo_path) if not project_info["success"]: raise HTTPException(status_code=400, detail=f"项目分析失败: {project_info['error']}") print(f"项目分析成功: {project_info}") # 4. 生成服务包装器 service_config = { "name": request.name, "version": request.version, "service_type": request.service_type, "host": request.host, "port": request.port, "timeout": request.timeout, "health_check_path": request.health_check_path, "environment": request.environment } generate_result = service_generator.generate_service(project_info, service_config) if not generate_result["success"]: raise HTTPException(status_code=400, detail=f"服务生成失败: {generate_result['error']}") print(f"服务生成成功: {generate_result}") # 5. 部署服务 service_id = str(uuid.uuid4()) deploy_result = service_orchestrator.deploy_service(service_id, service_config, project_info, repo_path) if not deploy_result["success"]: raise HTTPException(status_code=400, detail=f"服务部署失败: {deploy_result['error']}") print(f"服务部署成功: {deploy_result}") # 6. 保存服务信息到数据库 new_service = AlgorithmService( id=str(uuid.uuid4()), service_id=service_id, name=request.name, algorithm_name=repo.name, # 使用仓库名称作为算法名称 version=request.version, tech_category=request.tech_category, output_type=request.output_type, host=request.host, port=request.port, api_url=deploy_result["api_url"], status=deploy_result["status"], config={ "repository_id": request.repository_id, # 保存仓库ID "service_type": request.service_type, "timeout": request.timeout, "health_check_path": request.health_check_path, "environment": request.environment, "container_id": deploy_result["container_id"] } ) db.add(new_service) db.commit() db.refresh(new_service) # 7. 自动创建API端点 try: algorithm = db.query(Algorithm).filter(Algorithm.name == repo.name).first() if not algorithm: algorithm = Algorithm( id=str(uuid.uuid4()), name=repo.name, description=request.description or f"算法服务: {request.name}", type=request.tech_category, tech_category=request.tech_category, output_type=request.output_type ) db.add(algorithm) db.commit() db.refresh(algorithm) version = db.query(AlgorithmVersion).filter( AlgorithmVersion.algorithm_id == algorithm.id, AlgorithmVersion.version == request.version ).first() if not version: version = AlgorithmVersion( id=str(uuid.uuid4()), algorithm_id=algorithm.id, version=request.version, url=request.service_url if hasattr(request, 'service_url') else "" ) db.add(version) db.commit() db.refresh(version) api_endpoint = ApiEndpoint( id=str(uuid.uuid4()), name=request.name, description=request.description or f"{request.name} API端点", path=f"/api/v1/algorithms/{algorithm.id}/call", method="POST", algorithm_id=algorithm.id, version_id=version.id, service_id=service_id, requires_auth=False, is_public=True, status="active", config={ "service_url": deploy_result["api_url"], "timeout": request.timeout, "health_check_path": request.health_check_path } ) db.add(api_endpoint) db.commit() logger.info(f"API端点创建成功: {api_endpoint.name}, 路径: {api_endpoint.path}") except Exception as e: logger.error(f"创建API端点失败: {str(e)}") # 8. 返回响应 return { "success": True, "message": "服务注册成功", "service": { "id": new_service.id, "service_id": new_service.service_id, "name": new_service.name, "algorithm_name": new_service.algorithm_name, "version": new_service.version, "host": new_service.host, "port": new_service.port, "api_url": new_service.api_url, "status": new_service.status, "created_at": new_service.created_at.isoformat(), "updated_at": new_service.updated_at.isoformat() if new_service.updated_at else None } } finally: db.close() @router.get("", response_model=ServiceListResponse) async def list_services( current_user: UserResponse = Depends(get_current_active_user) ): """获取服务列表""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务列表 services = db.query(AlgorithmService).all() # 转换为响应格式 service_list = [] for service in services: service_list.append(ServiceResponse( id=service.id, service_id=service.service_id, name=service.name, algorithm_name=service.algorithm_name, version=service.version, host=service.host, port=service.port, api_url=service.api_url, status=service.status, created_at=service.created_at.isoformat(), updated_at=service.updated_at.isoformat() if service.updated_at else None )) return ServiceListResponse( success=True, services=service_list ) finally: db.close() @router.get("/{service_id}", response_model=ServiceDetailResponse) async def get_service( service_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """获取服务详情""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 返回响应 return ServiceDetailResponse( success=True, service=ServiceResponse( id=service.id, service_id=service.service_id, name=service.name, algorithm_name=service.algorithm_name, version=service.version, host=service.host, port=service.port, api_url=service.api_url, status=service.status, created_at=service.created_at.isoformat(), updated_at=service.updated_at.isoformat() if service.updated_at else None ) ) finally: db.close() @router.post("/{service_id}/start") async def start_service( service_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """启动服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 获取容器ID container_id = service.config.get("container_id") if not container_id: raise HTTPException(status_code=400, detail="Container ID not found") # 启动服务 start_result = service_orchestrator.start_service(service_id, container_id) # 如果启动失败,尝试从数据库重新注册服务 if not start_result["success"]: print(f"服务启动失败: {start_result['error']},尝试从数据库重新注册服务") # 获取仓库信息 repository_id = service.config.get("repository_id") if not repository_id: raise HTTPException(status_code=400, detail="Repository ID not found in service config") repository = db.query(AlgorithmRepository).filter(AlgorithmRepository.id == repository_id).first() if not repository: raise HTTPException(status_code=404, detail="Repository not found") # 从Gitea克隆仓库 clone_success = gitea_service.clone_repository( repository.repo_url, service_id, repository.branch or "main" ) if not clone_success: raise HTTPException(status_code=400, detail="Failed to clone repository") # 仓库路径 repo_path = f"/tmp/algorithms/{service_id}" # 分析项目 project_info = project_analyzer.analyze_project(repo_path) if not project_info: raise HTTPException(status_code=400, detail="Failed to analyze project") # 生成服务 service_config = { "name": service.name, "version": service.version, "host": service.host, "port": service.port, "timeout": service.config.get("timeout", 30), "health_check_path": service.config.get("health_check_path", "/health"), "environment": service.config.get("environment", {}) } # 部署服务 deploy_result = service_orchestrator.deploy_service(service_id, project_info, service_config, repo_path) if not deploy_result["success"]: raise HTTPException(status_code=400, detail=f"服务部署失败: {deploy_result['error']}") # 更新服务配置 service.config["container_id"] = deploy_result["container_id"] service.api_url = deploy_result["api_url"] db.commit() start_result = { "success": True, "service_id": service_id, "status": "running", "error": None } # 更新服务状态 service.status = start_result["status"] db.commit() # 返回响应 return ServiceOperationResponse( success=True, message="服务启动成功", service_id=service_id, status=start_result["status"] ) finally: db.close() @router.post("/{service_id}/stop") async def stop_service( service_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """停止服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 获取容器ID container_id = service.config.get("container_id") if not container_id: raise HTTPException(status_code=400, detail="Container ID not found") # 停止服务 stop_result = service_orchestrator.stop_service(service_id, container_id) if not stop_result["success"]: raise HTTPException(status_code=400, detail=f"服务停止失败: {stop_result['error']}") # 更新服务状态 service.status = stop_result["status"] db.commit() # 返回响应 return ServiceOperationResponse( success=True, message="服务停止成功", service_id=service_id, status=stop_result["status"] ) finally: db.close() @router.post("/{service_id}/restart") async def restart_service( service_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """重启服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 获取容器ID container_id = service.config.get("container_id") if not container_id: raise HTTPException(status_code=400, detail="Container ID not found") # 重启服务 restart_result = service_orchestrator.restart_service(service_id, container_id) if not restart_result["success"]: raise HTTPException(status_code=400, detail=f"服务重启失败: {restart_result['error']}") # 更新服务状态 service.status = restart_result["status"] db.commit() # 返回响应 return ServiceOperationResponse( success=True, message="服务重启成功", service_id=service_id, status=restart_result["status"] ) finally: db.close() @router.delete("/{service_id}") async def delete_service( service_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """删除服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 先删除关联的API端点 db.query(ApiEndpoint).filter(ApiEndpoint.service_id == service_id).delete() # 获取算法名称,用于后续删除算法记录 algorithm_name = service.algorithm_name # 获取容器ID和镜像名称 container_id = service.config.get("container_id") image_name = f"algorithm-service-{service_id}:{service.version}" # 删除服务 delete_result = service_orchestrator.delete_service(service_id, container_id, image_name) if not delete_result["success"]: # 继续执行,即使Docker操作失败 pass # 删除数据库记录 db.delete(service) # 删除关联的算法记录(通过算法名称匹配) if algorithm_name: algorithm = db.query(Algorithm).filter(Algorithm.name == algorithm_name).first() if algorithm: # 先删除关联的算法版本 db.query(AlgorithmVersion).filter(AlgorithmVersion.algorithm_id == algorithm.id).delete() # 再删除算法记录 db.query(AlgorithmCall).filter(AlgorithmCall.algorithm_id == algorithm.id).delete() db.delete(algorithm) db.commit() # 返回响应 return { "success": True, "message": "服务删除成功", "service_id": service_id } finally: db.close() @router.get("/{service_id}/status") async def get_service_status( service_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """获取服务状态""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 获取容器ID container_id = service.config.get("container_id") if not container_id: raise HTTPException(status_code=400, detail="Container ID not found") # 获取服务状态 status_result = service_orchestrator.get_service_status(container_id) if not status_result["success"]: raise HTTPException(status_code=400, detail=f"获取服务状态失败: {status_result['error']}") # 返回响应 return ServiceStatusResponse( success=True, status=status_result["status"], health=status_result["health"] ) finally: db.close() @router.get("/{service_id}/logs") async def get_service_logs( service_id: str, lines: int = 100, current_user: UserResponse = Depends(get_current_active_user) ): """获取服务日志""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: raise HTTPException(status_code=404, detail="Service not found") # 获取容器ID container_id = service.config.get("container_id") if not container_id: raise HTTPException(status_code=400, detail="Container ID not found") # 获取服务日志 logs_result = service_orchestrator.get_service_logs(container_id, lines) if not logs_result["success"]: raise HTTPException(status_code=400, detail=f"获取服务日志失败: {logs_result['error']}") # 返回响应 return ServiceLogsResponse( success=True, logs=logs_result["logs"] ) finally: db.close() @router.get("/repository/algorithms") async def get_repository_algorithms( repository_id: str, current_user: UserResponse = Depends(get_current_active_user) ): """获取仓库中的算法列表""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") try: # 模拟获取仓库中的算法列表 # 注意:在实际实现中,应该从算法仓库中获取真实的算法列表 algorithms = [ { "id": "alg-001", "name": "图像分类算法", "description": "基于深度学习的图像分类算法", "type": "computer_vision", "entry_point": "algorithm.py" }, { "id": "alg-002", "name": "文本分类算法", "description": "基于BERT的文本分类算法", "type": "nlp", "entry_point": "text_algorithm.py" } ] return RepositoryAlgorithmsResponse( success=True, algorithms=algorithms ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # 批量服务操作API @router.post("/batch/start") async def batch_start_services( request: BatchOperationRequest, current_user: UserResponse = Depends(get_current_active_user) ): """批量启动服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: results = [] success_count = 0 for service_id in request.service_ids: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: results.append({ "service_id": service_id, "success": False, "message": "服务不存在" }) continue # 获取容器ID container_id = service.config.get("container_id") if not container_id: results.append({ "service_id": service_id, "success": False, "message": "容器ID不存在" }) continue # 启动服务 start_result = service_orchestrator.start_service(service_id, container_id) if start_result["success"]: # 更新服务状态 service.status = start_result["status"] db.commit() success_count += 1 results.append({ "service_id": service_id, "success": True, "message": "服务启动成功" }) else: results.append({ "service_id": service_id, "success": False, "message": f"服务启动失败: {start_result['error']}" }) return BatchOperationResponse( success=True, message=f"批量启动完成,成功{success_count}个,失败{len(request.service_ids) - success_count}个", results=results ) finally: db.close() @router.post("/batch/stop") async def batch_stop_services( request: BatchOperationRequest, current_user: UserResponse = Depends(get_current_active_user) ): """批量停止服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: results = [] success_count = 0 for service_id in request.service_ids: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: results.append({ "service_id": service_id, "success": False, "message": "服务不存在" }) continue # 获取容器ID container_id = service.config.get("container_id") if not container_id: results.append({ "service_id": service_id, "success": False, "message": "容器ID不存在" }) continue # 停止服务 stop_result = service_orchestrator.stop_service(service_id, container_id) if stop_result["success"]: # 更新服务状态 service.status = stop_result["status"] db.commit() success_count += 1 results.append({ "service_id": service_id, "success": True, "message": "服务停止成功" }) else: results.append({ "service_id": service_id, "success": False, "message": f"服务停止失败: {stop_result['error']}" }) return BatchOperationResponse( success=True, message=f"批量停止完成,成功{success_count}个,失败{len(request.service_ids) - success_count}个", results=results ) finally: db.close() @router.post("/batch/restart") async def batch_restart_services( request: BatchOperationRequest, current_user: UserResponse = Depends(get_current_active_user) ): """批量重启服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: results = [] success_count = 0 for service_id in request.service_ids: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: results.append({ "service_id": service_id, "success": False, "message": "服务不存在" }) continue # 获取容器ID container_id = service.config.get("container_id") if not container_id: results.append({ "service_id": service_id, "success": False, "message": "容器ID不存在" }) continue # 重启服务 restart_result = service_orchestrator.restart_service(service_id, container_id) if restart_result["success"]: # 更新服务状态 service.status = restart_result["status"] db.commit() success_count += 1 results.append({ "service_id": service_id, "success": True, "message": "服务重启成功" }) else: results.append({ "service_id": service_id, "success": False, "message": f"服务重启失败: {restart_result['error']}" }) return BatchOperationResponse( success=True, message=f"批量重启完成,成功{success_count}个,失败{len(request.service_ids) - success_count}个", results=results ) finally: db.close() @router.post("/batch/delete") async def batch_delete_services( request: BatchOperationRequest, current_user: UserResponse = Depends(get_current_active_user) ): """批量删除服务""" # 检查用户权限 if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="Insufficient permissions") # 创建数据库会话 db = SessionLocal() try: results = [] success_count = 0 for service_id in request.service_ids: # 查询服务 service = db.query(AlgorithmService).filter(AlgorithmService.service_id == service_id).first() if not service: results.append({ "service_id": service_id, "success": False, "message": "服务不存在" }) continue # 获取容器ID和镜像名称 container_id = service.config.get("container_id") image_name = f"algorithm-service-{service_id}:{service.version}" # 删除服务 delete_result = service_orchestrator.delete_service(service_id, container_id, image_name) # 删除数据库记录 db.delete(service) db.commit() success_count += 1 results.append({ "service_id": service_id, "success": True, "message": "服务删除成功" }) return BatchOperationResponse( success=True, message=f"批量删除完成,成功{success_count}个,失败{len(request.service_ids) - success_count}个", results=results ) finally: db.close() class ServiceCallRequest(BaseModel): """服务调用请求""" service_id: str payload: Dict[str, Any] class ServiceCallResponse(BaseModel): """服务调用响应""" success: bool result: Dict[str, Any] service_id: str execution_time: float error: Optional[str] = None @router.post("/call") async def call_service( request: ServiceCallRequest, current_user: UserResponse = Depends(get_current_active_user) ): """直接调用注册的服务""" import time import httpx # 创建数据库会话 db = SessionLocal() try: # 查询服务 service = db.query(AlgorithmService).filter( AlgorithmService.service_id == request.service_id ).first() if not service: raise HTTPException(status_code=404, detail="服务不存在") # 检查服务状态 if service.status != "running": raise HTTPException( status_code=503, detail=f"服务未运行,当前状态: {service.status}" ) # 调用服务 start_time = time.time() try: # 构建服务URL service_url = service.api_url # 如果URL没有路径,添加默认路径 if not service_url.endswith("/"): service_url += "/" # 添加调用端点 call_url = f"{service_url}predict" # 使用httpx调用服务 async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( call_url, json=request.payload, headers={"Content-Type": "application/json"} ) execution_time = time.time() - start_time if response.status_code == 200: return ServiceCallResponse( success=True, result=response.json(), service_id=request.service_id, execution_time=execution_time ) else: return ServiceCallResponse( success=False, result={}, service_id=request.service_id, execution_time=execution_time, error=f"服务返回错误: HTTP {response.status_code} - {response.text}" ) except httpx.RequestError as e: execution_time = time.time() - start_time return ServiceCallResponse( success=False, result={}, service_id=request.service_id, execution_time=execution_time, error=f"无法连接到服务: {str(e)}" ) except Exception as e: execution_time = time.time() - start_time return ServiceCallResponse( success=False, result={}, service_id=request.service_id, execution_time=execution_time, error=f"服务调用异常: {str(e)}" ) finally: db.close() @router.post("/sync-api-endpoints") async def sync_api_endpoints( current_user: UserResponse = Depends(get_current_active_user) ): """同步所有服务到API端点""" if current_user.role_name != "admin": raise HTTPException(status_code=403, detail="权限不足") db = SessionLocal() try: services = db.query(AlgorithmService).all() synced_count = 0 for service in services: existing_endpoint = db.query(ApiEndpoint).filter( (ApiEndpoint.service_id == service.service_id) | (ApiEndpoint.path == f"/api/v1/algorithms/{service.algorithm_name}/call") ).first() if existing_endpoint: continue algorithm = db.query(Algorithm).filter(Algorithm.name == service.algorithm_name).first() if not algorithm: algorithm = Algorithm( id=str(uuid.uuid4()), name=service.algorithm_name, description=f"算法服务: {service.name}", type=service.tech_category or "computer_vision", tech_category=service.tech_category or "computer_vision", output_type=service.output_type or "image" ) db.add(algorithm) db.commit() db.refresh(algorithm) version = db.query(AlgorithmVersion).filter( AlgorithmVersion.algorithm_id == algorithm.id ).first() if not version: version = AlgorithmVersion( id=str(uuid.uuid4()), algorithm_id=algorithm.id, version=service.version or "1.0.0", url=service.api_url ) db.add(version) db.commit() db.refresh(version) api_endpoint = ApiEndpoint( id=str(uuid.uuid4()), name=service.name, description=f"{service.name} API端点", path=f"/api/v1/algorithms/{algorithm.id}/call/{service.service_id[:8]}", method="POST", algorithm_id=algorithm.id, version_id=version.id, service_id=service.service_id, requires_auth=False, is_public=True, status=service.status or "active", config={ "service_url": service.api_url, "timeout": service.config.get("timeout") if service.config else 30 } ) db.add(api_endpoint) synced_count += 1 db.commit() return { "success": True, "message": f"同步完成,共同步 {synced_count} 个API端点" } except Exception as e: logger.error(f"同步API端点失败: {str(e)}") raise HTTPException(status_code=500, detail=f"同步失败: {str(e)}") finally: db.close()