"""Service orchestration: manages the lifecycle of algorithm services.

Two deployment modes are supported:

* ``docker`` - build an image per service and run it as a container.
* ``local``  - generate an HTTP wrapper in a temp directory and run it as a
  local child process (any mode other than ``"docker"`` takes this path).
"""
import os
import json
import time
import docker
import uuid
import subprocess
import shutil
import signal
import psutil
from collections import deque
from typing import Dict, Any, Optional
from docker.errors import DockerException, NotFound

# Database support is optional: without SQLAlchemy the orchestrator still works,
# it just cannot reload service metadata (e.g. after the orchestrator restarts).
try:
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker
    DATABASE_AVAILABLE = True
except ImportError:
    DATABASE_AVAILABLE = False


class ServiceOrchestrator:
    """Deploys, starts, stops, restarts, deletes and inspects algorithm services."""

    def __init__(self, deployment_mode="local", db_url=None):
        """Initialize the orchestrator.

        Args:
            deployment_mode: Deployment mode, "docker" or "local".
            db_url: Optional database URL, used to reload service metadata.
        """
        self.deployment_mode = deployment_mode
        # service_id -> {"pid", "service_dir", "project_info", "service_config",
        # and "log_file" once a process has been started}
        self.processes = {}
        self.db_url = db_url
        self.db_engine = None
        self.db_session = None

        # Connect to the database (best effort).
        if db_url and DATABASE_AVAILABLE:
            try:
                self.db_engine = create_engine(db_url)
                self.db_session = sessionmaker(bind=self.db_engine)()
                print("数据库连接成功")
            except Exception as e:
                print(f"数据库连接失败: {e}")
                self.db_engine = None
                self.db_session = None

        if deployment_mode == "docker":
            try:
                # Connect to the Docker daemon and check that it responds.
                self.client = docker.from_env()
                self.client.ping()
                print("Docker连接成功")
            except DockerException as e:
                print(f"Docker连接失败: {e}")
                self.client = None
        else:
            self.client = None
            print("使用本地进程部署模式")

    def deploy_service(self, service_id: str, service_config: Dict[str, Any], project_info: Dict[str, Any], repo_path: str = None) -> Dict[str, Any]:
        """Deploy a service.

        Args:
            service_id: Service ID.
            service_config: Service configuration (host, port, timeout, ...).
            project_info: Project information; ``project_type`` selects the runtime.
            repo_path: Repository path used to copy the real algorithm files
                (local mode only).

        Returns:
            Dict with keys success, service_id, container_id, status, api_url, error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "service_id": service_id,
                        "container_id": None,
                        "status": "error",
                        "api_url": None
                    }

                # 1. Build the Docker image.
                image_name = self._build_docker_image(service_id, project_info, service_config)

                # 2. Start the service container.
                container_id = self._start_service_container(service_id, image_name, service_config)

                # 3. Verify that the service came up.
                if not self._verify_service_startup(container_id, service_config):
                    return {
                        "success": False,
                        "error": "服务启动验证失败",
                        "service_id": service_id,
                        "container_id": container_id,
                        "status": "error",
                        "api_url": None
                    }

                # 4. Build the API URL.
                api_url = f"http://{service_config.get('host', 'localhost')}:{service_config.get('port', 8000)}"

                return {
                    "success": True,
                    "service_id": service_id,
                    "container_id": container_id,
                    "status": "running",
                    "api_url": api_url,
                    "error": None
                }
            else:
                # Local process deployment.
                # 1. Create the service directory.
                service_dir = self._create_service_directory(service_id)

                # 2. Generate the service wrapper (and copy/mock the algorithm).
                self._generate_local_service_wrapper(service_dir, project_info, service_config, repo_path)

                # 3. Start the service process.
                self._start_local_service_process(service_id, service_dir, project_info, service_config)

                # 4. Verify that the service came up.
                if not self._verify_local_service_startup(service_id, service_config):
                    return {
                        "success": False,
                        "error": "服务启动验证失败",
                        "service_id": service_id,
                        "container_id": None,
                        "status": "error",
                        "api_url": None
                    }

                # 5. Build the API URL.
                api_url = f"http://{service_config.get('host', 'localhost')}:{service_config.get('port', 8000)}"

                return {
                    "success": True,
                    "service_id": service_id,
                    "container_id": service_id,  # the service ID doubles as the container ID
                    "status": "running",
                    "api_url": api_url,
                    "error": None
                }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "service_id": service_id,
                "container_id": None,
                "status": "error",
                "api_url": None
            }

    def start_service(self, service_id: str, container_id: str) -> Dict[str, Any]:
        """Start a service.

        In local mode, a service unknown to this process is first reloaded
        from the database; if its directory is missing, the code is cloned
        from Gitea before the process is started.

        Args:
            service_id: Service ID.
            container_id: Container ID (docker mode).

        Returns:
            Dict with keys success, service_id, status, error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                container = self.client.containers.get(container_id)
                container.start()

                if not self._verify_service_health(container_id):
                    return {
                        "success": False,
                        "error": "服务健康检查失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                return {
                    "success": True,
                    "service_id": service_id,
                    "status": "running",
                    "error": None
                }
            else:
                # Local process start.
                if service_id not in self.processes:
                    # Not tracked in memory (e.g. the orchestrator restarted):
                    # try to rebuild the entry from the database.
                    print(f"服务 {service_id} 不在进程列表中,尝试从数据库重新加载")
                    service_info = self.reload_service_from_db(service_id)
                    if not service_info:
                        return {
                            "success": False,
                            "error": "服务不存在,请重新注册服务",
                            "service_id": service_id,
                            "status": "error"
                        }

                process_info = self.processes[service_id]

                # Nothing to do if the process is already alive.
                if self._is_local_process_alive(process_info.get("pid")):
                    return {
                        "success": True,
                        "service_id": service_id,
                        "status": "running",
                        "error": None
                    }

                service_dir = process_info["service_dir"]
                project_info = process_info["project_info"]
                service_config = process_info["service_config"]

                print(f"准备启动服务 {service_id}")
                print(f"服务目录: {service_dir}")
                print(f"服务配置: {service_config}")

                # If the service directory is gone, clone the code from Gitea.
                if not os.path.exists(service_dir):
                    print(f"服务目录不存在: {service_dir},尝试从Gitea克隆代码")
                    repository_id = service_config.get("repository_id")
                    if not repository_id:
                        return {
                            "success": False,
                            "error": "无法获取仓库ID,无法克隆代码",
                            "service_id": service_id,
                            "status": "error"
                        }

                    try:
                        from app.models.database import SessionLocal
                        from app.models.models import AlgorithmRepository

                        db = SessionLocal()
                        try:
                            repository = db.query(AlgorithmRepository).filter(
                                AlgorithmRepository.id == repository_id
                            ).first()
                        finally:
                            # Close the session even if the query fails.
                            db.close()

                        if not repository:
                            return {
                                "success": False,
                                "error": f"仓库不存在: {repository_id}",
                                "service_id": service_id,
                                "status": "error"
                            }

                        from app.gitea.service import GiteaService
                        gitea_service = GiteaService()
                        clone_success = gitea_service.clone_repository(
                            repository.repo_url,
                            service_id,
                            repository.branch or "main"
                        )

                        if not clone_success:
                            return {
                                "success": False,
                                "error": f"克隆仓库失败: {repository.repo_url}",
                                "service_id": service_id,
                                "status": "error"
                            }

                        print(f"成功从Gitea克隆仓库到: {service_dir}")
                    except Exception as e:
                        print(f"克隆仓库时出错: {str(e)}")
                        return {
                            "success": False,
                            "error": f"克隆仓库时出错: {str(e)}",
                            "service_id": service_id,
                            "status": "error"
                        }

                print(f"开始启动服务进程...")
                new_process_info = self._start_local_service_process(service_id, service_dir, project_info, service_config)
                print(f"服务进程启动完成: {new_process_info}")

                print(f"开始验证服务启动...")
                if not self._verify_local_service_startup(service_id, service_config):
                    print(f"服务启动验证失败")
                    return {
                        "success": False,
                        "error": "服务启动验证失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                print(f"服务启动成功!")
                return {
                    "success": True,
                    "service_id": service_id,
                    "status": "running",
                    "error": None
                }

        except NotFound:
            return {
                "success": False,
                "error": "容器不存在",
                "service_id": service_id,
                "status": "error"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "service_id": service_id,
                "status": "error"
            }

    def stop_service(self, service_id: str, container_id: str) -> Dict[str, Any]:
        """Stop a service.

        Args:
            service_id: Service ID.
            container_id: Container ID (docker mode).

        Returns:
            Dict with keys success, service_id, status, error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                container = self.client.containers.get(container_id)
                container.stop(timeout=30)

                return {
                    "success": True,
                    "service_id": service_id,
                    "status": "stopped",
                    "error": None
                }
            else:
                if service_id not in self.processes:
                    # Not tracked in memory (e.g. after an orchestrator restart).
                    # Finding the process by port would need the service config
                    # from the caller or the database; for now report success,
                    # since the service has most likely already stopped.
                    print(f"服务 {service_id} 不在进程列表中,尝试通过端口查找进程")
                    return {
                        "success": True,
                        "service_id": service_id,
                        "status": "stopped",
                        "error": None
                    }

                process_info = self.processes[service_id]
                self._terminate_local_process(process_info.get("pid"))
                self.processes[service_id]["pid"] = None

                return {
                    "success": True,
                    "service_id": service_id,
                    "status": "stopped",
                    "error": None
                }

        except NotFound:
            return {
                "success": False,
                "error": "容器不存在",
                "service_id": service_id,
                "status": "error"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "service_id": service_id,
                "status": "error"
            }

    def restart_service(self, service_id: str, container_id: str) -> Dict[str, Any]:
        """Restart a service.

        Args:
            service_id: Service ID.
            container_id: Container ID (docker mode).

        Returns:
            Dict with keys success, service_id, status, error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                container = self.client.containers.get(container_id)
                container.restart(timeout=30)

                if not self._verify_service_health(container_id):
                    return {
                        "success": False,
                        "error": "服务健康检查失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                return {
                    "success": True,
                    "service_id": service_id,
                    "status": "running",
                    "error": None
                }
            else:
                if service_id not in self.processes:
                    # Same recovery path as start_service: rebuild the entry
                    # from the database before giving up.
                    print(f"服务 {service_id} 不在进程列表中,尝试从数据库重新加载")
                    if not self.reload_service_from_db(service_id):
                        return {
                            "success": False,
                            "error": "服务不存在",
                            "service_id": service_id,
                            "status": "error"
                        }

                process_info = self.processes[service_id]

                # Stop the current process (escalates to kill if needed so the
                # port is free for the new process).
                self._terminate_local_process(process_info.get("pid"))

                service_dir = process_info["service_dir"]
                project_info = process_info["project_info"]
                service_config = process_info["service_config"]

                self._start_local_service_process(service_id, service_dir, project_info, service_config)

                if not self._verify_local_service_startup(service_id, service_config):
                    return {
                        "success": False,
                        "error": "服务启动验证失败",
                        "service_id": service_id,
                        "status": "error"
                    }

                return {
                    "success": True,
                    "service_id": service_id,
                    "status": "running",
                    "error": None
                }

        except NotFound:
            return {
                "success": False,
                "error": "容器不存在",
                "service_id": service_id,
                "status": "error"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "service_id": service_id,
                "status": "error"
            }

    def delete_service(self, service_id: str, container_id: str, image_name: str) -> Dict[str, Any]:
        """Delete a service and its artifacts.

        Args:
            service_id: Service ID.
            container_id: Container ID (docker mode).
            image_name: Image name (docker mode).

        Returns:
            Dict with keys success, service_id, error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "service_id": service_id
                    }

                # Stop and remove the container.
                if container_id:
                    try:
                        container = self.client.containers.get(container_id)
                        container.stop(timeout=10)
                        container.remove(force=True)
                    except NotFound:
                        pass

                # Remove the image (best effort).
                if image_name:
                    try:
                        self.client.images.remove(image_name, force=True)
                    except Exception as e:
                        print(f"删除镜像失败: {e}")

                return {
                    "success": True,
                    "service_id": service_id,
                    "error": None
                }
            else:
                if service_id not in self.processes:
                    return {
                        "success": False,
                        "error": "服务不存在",
                        "service_id": service_id
                    }

                process_info = self.processes[service_id]
                self._terminate_local_process(process_info.get("pid"))

                # Remove the service directory (best effort).
                service_dir = process_info["service_dir"]
                shutil.rmtree(service_dir, ignore_errors=True)

                del self.processes[service_id]

                return {
                    "success": True,
                    "service_id": service_id,
                    "error": None
                }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "service_id": service_id
            }

    def get_service_status(self, container_id: str) -> Dict[str, Any]:
        """Get the status of a service.

        Args:
            container_id: Container ID; in local mode this is the service ID.

        Returns:
            Dict with keys success, status, health, error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "status": "unknown",
                        "health": "unknown"
                    }

                container = self.client.containers.get(container_id)
                status = container.status

                health = "unknown"
                if status == "running":
                    if self._verify_service_health(container_id):
                        health = "healthy"
                    else:
                        health = "unhealthy"

                return {
                    "success": True,
                    "status": status,
                    "health": health,
                    "error": None
                }
            else:
                service_id = container_id

                if service_id not in self.processes:
                    return {
                        "success": False,
                        "error": "服务不存在",
                        "status": "not_found",
                        "health": "unknown"
                    }

                process_info = self.processes[service_id]

                health = "unknown"
                # A zombie (crashed but unreaped) process counts as stopped.
                if self._is_local_process_alive(process_info.get("pid")):
                    status = "running"
                    service_config = process_info["service_config"]
                    if self._verify_local_service_health(service_id, service_config):
                        health = "healthy"
                    else:
                        health = "unhealthy"
                else:
                    status = "stopped"

                return {
                    "success": True,
                    "status": status,
                    "health": health,
                    "error": None
                }

        except NotFound:
            return {
                "success": False,
                "error": "容器不存在",
                "status": "not_found",
                "health": "unknown"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "status": "unknown",
                "health": "unknown"
            }

    def reload_service_from_db(self, service_id: str) -> Optional[Dict[str, Any]]:
        """Reload service information from the database.

        Outside docker mode this also rebuilds the ``self.processes`` entry
        (with ``pid`` set to None) so the service can be started again.

        Args:
            service_id: Service ID.

        Returns:
            Service information dict, or None if unavailable or not found.
        """
        if not self.db_session:
            print("数据库连接不可用,无法重新加载服务信息")
            return None

        try:
            query = text("""
                SELECT service_id, api_url, status, config, host, port
                FROM algorithm_services
                WHERE service_id = :service_id
            """)

            result = self.db_session.execute(query, {"service_id": service_id}).fetchone()

            if not result:
                print(f"数据库中未找到服务 {service_id}")
                return None

            raw_config = result[3]
            if isinstance(raw_config, (str, bytes)):
                # The config column may come back as JSON text rather than a
                # decoded object (depends on the column type/backend).
                raw_config = json.loads(raw_config) if raw_config else None
            # Copy so the value held by the result row is not mutated below.
            config_data = dict(raw_config) if raw_config else {}

            # Fold host/port columns into the config.
            config_data["host"] = result[4] if result[4] else "localhost"
            config_data["port"] = result[5] if result[5] else 8000

            service_info = {
                "service_id": result[0],
                "api_url": result[1],
                "status": result[2],
                "config": config_data
            }

            container_id = config_data.get("container_id")
            if container_id:
                service_info["container_id"] = container_id

            # Rebuild the local process cache. Every non-docker mode uses the
            # local-process code paths, so match that condition here.
            if self.deployment_mode != "docker":
                project_type = config_data.get("project_type", "python")
                self.processes[service_id] = {
                    "service_dir": self._resolve_reloaded_service_dir(service_id),
                    "project_info": {
                        "project_type": project_type,
                        "name": config_data.get("name", ""),
                        "version": config_data.get("version", "1.0.0"),
                        "description": config_data.get("description", "")
                    },
                    "service_config": config_data,
                    "pid": None  # the PID is unknown until the process is started again
                }

            print(f"成功从数据库重新加载服务 {service_id} 的信息")
            return service_info

        except Exception as e:
            print(f"从数据库重新加载服务信息失败: {e}")
            # A failed statement leaves the session's transaction unusable until
            # it is rolled back; reset it so later reloads can still succeed.
            try:
                self.db_session.rollback()
            except Exception:
                pass
            return None

    def get_service_logs(self, container_id: str, lines: int = 100) -> Dict[str, Any]:
        """Get the most recent log lines of a service.

        Args:
            container_id: Container ID; in local mode this is the service ID.
            lines: Number of trailing lines to return; values <= 0 return none.

        Returns:
            Dict with keys success, logs (list of str), error.
        """
        try:
            if self.deployment_mode == "docker":
                if not self.client:
                    return {
                        "success": False,
                        "error": "Docker连接失败",
                        "logs": []
                    }

                container = self.client.containers.get(container_id)
                raw_logs = container.logs(tail=lines)
                # Replace undecodable bytes instead of failing the whole call.
                logs = raw_logs.decode('utf-8', errors='replace').split('\n')

                return {
                    "success": True,
                    "logs": logs,
                    "error": None
                }
            else:
                service_id = container_id

                if service_id not in self.processes:
                    return {
                        "success": False,
                        "error": "服务不存在",
                        "logs": []
                    }

                process_info = self.processes[service_id]
                log_file = process_info.get("log_file")
                if not log_file or not os.path.exists(log_file):
                    return {
                        "success": True,
                        "logs": [],
                        "error": None
                    }

                try:
                    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                        # A bounded deque keeps only the last `lines` lines; with
                        # maxlen=0 it keeps none (slicing with [-0:] would
                        # return the whole file).
                        tail = deque(f, maxlen=max(lines, 0))
                    logs = [line.rstrip('\n') for line in tail]
                except OSError:
                    logs = []

                return {
                    "success": True,
                    "logs": logs,
                    "error": None
                }

        except NotFound:
            return {
                "success": False,
                "error": "容器不存在",
                "logs": []
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "logs": []
            }

    def _build_docker_image(self, service_id: str, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> str:
        """Build the Docker image for a service.

        Args:
            service_id: Service ID.
            project_info: Project information.
            service_config: Service configuration.

        Returns:
            Image name (tag).
        """
        image_name = f"algorithm-service-{service_id}:{service_config.get('version', '1.0.0')}"

        build_context = os.path.join("/tmp", f"service-build-{service_id}")
        os.makedirs(build_context, exist_ok=True)

        try:
            dockerfile_content = self._generate_dockerfile(project_info, service_config)
            with open(os.path.join(build_context, "Dockerfile"), "w", encoding="utf-8") as f:
                f.write(dockerfile_content)

            # NOTE: project files are not copied into the build context yet; a
            # complete implementation should copy them from the algorithm repo.

            service_wrapper_content = self._generate_service_wrapper(project_info, service_config)
            wrapper_extension = ".py" if project_info["project_type"] == "python" else ".js"
            with open(os.path.join(build_context, f"service_wrapper{wrapper_extension}"), "w", encoding="utf-8") as f:
                f.write(service_wrapper_content)

            self._create_dependency_file(build_context, project_info)

            print(f"开始构建Docker镜像: {image_name}")
            _image, build_logs = self.client.images.build(
                path=build_context,
                tag=image_name,
                rm=True,
                pull=False
            )

            for log in build_logs:
                if 'stream' in log:
                    print(log['stream'], end='')

            print(f"Docker镜像构建成功: {image_name}")
            return image_name

        finally:
            # Clean up the build context (best effort).
            shutil.rmtree(build_context, ignore_errors=True)

    def _start_service_container(self, service_id: str, image_name: str, service_config: Dict[str, Any]) -> str:
        """Run the service container.

        Args:
            service_id: Service ID.
            image_name: Image name.
            service_config: Service configuration.

        Returns:
            Container ID.
        """
        container_name = f"algorithm-service-{service_id}"

        # Publish the service port on the same host port.
        ports = {
            f"{service_config.get('port', 8000)}/tcp": service_config.get('port', 8000)
        }

        environment = {
            "HOST": service_config.get('host', '0.0.0.0'),
            "PORT": str(service_config.get('port', 8000)),
            "TIMEOUT": str(service_config.get('timeout', 30))
        }

        container = self.client.containers.run(
            image_name,
            name=container_name,
            ports=ports,
            environment=environment,
            detach=True,
            restart_policy={"Name": "unless-stopped"}
        )

        print(f"容器启动成功: {container.id}")
        return container.id

    def _verify_service_startup(self, container_id: str, service_config: Dict[str, Any]) -> bool:
        """Wait a fixed 5 seconds, then check container health.

        Args:
            container_id: Container ID.
            service_config: Service configuration (currently unused).

        Returns:
            True if the container is running.
        """
        time.sleep(5)
        return self._verify_service_health(container_id)

    def _verify_service_health(self, container_id: str) -> bool:
        """Check container health.

        Only the container status is checked; an HTTP probe of /health could
        be added here.

        Args:
            container_id: Container ID.

        Returns:
            True if the container status is "running"; False otherwise or on error.
        """
        try:
            container = self.client.containers.get(container_id)
            return container.status == "running"
        except Exception:
            return False

    def _generate_dockerfile(self, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> str:
        """Render the Dockerfile for the project type.

        Unknown project types fall back to the Python template.

        Args:
            project_info: Project information.
            service_config: Service configuration (host, port, timeout).

        Returns:
            Dockerfile content.
        """
        dockerfile_templates = {
            "python": """
FROM python:3.9-slim

WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制项目文件
COPY . .

# 复制生成的服务包装器
COPY service_wrapper.py .

# 设置环境变量
ENV HOST={{host}}
ENV PORT={{port}}
ENV TIMEOUT={{timeout}}
ENV PYTHONUNBUFFERED=1

# 暴露端口
EXPOSE {{port}}

# 启动服务
CMD ["python", "service_wrapper.py"]
""",
            # With several COPY sources the destination must end with "/".
            "nodejs": """
FROM node:16-alpine

WORKDIR /app

# 复制依赖文件
COPY package.json package-lock.json* ./

# 安装依赖
RUN npm install --production

# 复制项目文件
COPY . .

# 复制生成的服务包装器
COPY service_wrapper.js .

# 设置环境变量
ENV HOST={{host}}
ENV PORT={{port}}
ENV TIMEOUT={{timeout}}

# 暴露端口
EXPOSE {{port}}

# 启动服务
CMD ["node", "service_wrapper.js"]
"""
        }

        template = dockerfile_templates.get(project_info["project_type"], dockerfile_templates["python"])

        template = template.replace("{{host}}", service_config.get("host", "0.0.0.0"))
        template = template.replace("{{port}}", str(service_config.get("port", 8000)))
        template = template.replace("{{timeout}}", str(service_config.get("timeout", 30)))

        return template

    def _generate_service_wrapper(self, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> str:
        """Generate the HTTP wrapper source that exposes the algorithm.

        Python projects get a http.server-based wrapper; any other project
        type gets a Node.js wrapper. Both serve GET /health and GET /info and
        dispatch POST bodies to the algorithm's predict/run/main function.

        Args:
            project_info: Project information.
            service_config: Service configuration (name, version).

        Returns:
            Wrapper source code.
        """
        if project_info["project_type"] == "python":
            template = '''
# Python HTTP服务包装器
import os
import sys
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler

# 添加项目路径到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# 尝试导入算法模块
try:
    # 尝试导入主要模块
    import algorithm
    algorithm_module = algorithm
    print("算法模块导入成功")
except Exception as e:
    print(f"算法模块导入失败: {e}")
    algorithm_module = None

# 服务配置
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", "8000"))
TIMEOUT = int(os.environ.get("TIMEOUT", "30"))


class AlgorithmRequestHandler(BaseHTTPRequestHandler):
    """算法请求处理器"""

    def do_POST(self):
        """处理POST请求"""
        try:
            # 读取请求体
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)

            # 解析请求数据
            request_data = json.loads(post_data.decode('utf-8'))

            # 记录请求开始时间
            start_time = time.time()

            # 调用算法
            result = self._call_algorithm(request_data)

            # 计算响应时间
            response_time = time.time() - start_time

            # 构建响应
            response = {
                "success": True,
                "result": result,
                "response_time": round(response_time, 4),
                "message": "算法执行成功"
            }

            # 发送响应
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(response).encode('utf-8'))

        except Exception as e:
            # 构建错误响应
            error_response = {
                "success": False,
                "error": str(e),
                "message": "算法执行失败"
            }

            # 发送错误响应
            self.send_response(400)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(error_response).encode('utf-8'))

    def do_GET(self):
        """处理GET请求"""
        if self.path == "/health":
            # 健康检查
            self._handle_health_check()
        elif self.path == "/info":
            # 服务信息
            self._handle_info()
        else:
            # 404响应
            self.send_response(404)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({"error": "Not Found"}).encode('utf-8'))

    def _call_algorithm(self, request_data):
        """调用算法

        Args:
            request_data: 请求数据

        Returns:
            算法执行结果
        """
        if algorithm_module is None:
            raise Exception("算法模块未加载")

        # 尝试调用算法的主要函数
        try:
            # 检查是否有predict函数
            if hasattr(algorithm_module, 'predict'):
                return algorithm_module.predict(request_data)
            # 检查是否有run函数
            elif hasattr(algorithm_module, 'run'):
                return algorithm_module.run(request_data)
            # 检查是否有main函数
            elif hasattr(algorithm_module, 'main'):
                return algorithm_module.main(request_data)
            else:
                raise Exception("未找到算法执行函数")
        except Exception as e:
            raise Exception(f"算法执行失败: {e}")

    def _handle_health_check(self):
        """处理健康检查"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({"status": "healthy", "service": {{service_name}}}).encode('utf-8'))

    def _handle_info(self):
        """处理服务信息请求"""
        info = {
            "service": {{service_name}},
            "version": {{service_version}},
            "host": HOST,
            "port": PORT,
            "timeout": TIMEOUT,
            "algorithm_loaded": algorithm_module is not None
        }

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(info).encode('utf-8'))


def run_server():
    """启动服务"""
    server = HTTPServer((HOST, PORT), AlgorithmRequestHandler)
    print(f"服务启动成功,监听地址: {HOST}:{PORT}")
    print(f"健康检查地址: http://{HOST}:{PORT}/health")
    print(f"服务信息地址: http://{HOST}:{PORT}/info")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("服务停止")
        server.shutdown()


if __name__ == "__main__":
    run_server()
'''
        else:
            template = '''
// Node.js HTTP服务包装器
const http = require('http');
const url = require('url');
const fs = require('fs');
const path = require('path');

// 服务配置
const HOST = process.env.HOST || '0.0.0.0';
const PORT = process.env.PORT || 8000;
const TIMEOUT = process.env.TIMEOUT || 30;

// 尝试导入算法模块
let algorithmModule = null;
try {
    // 尝试导入主要模块
    algorithmModule = require('./algorithm');
    console.log('算法模块导入成功');
} catch (e) {
    console.error('算法模块导入失败:', e);
    algorithmModule = null;
}

/**
 * 调用算法
 * @param {Object} requestData 请求数据
 * @returns {Object} 算法执行结果
 */
function callAlgorithm(requestData) {
    if (!algorithmModule) {
        throw new Error('算法模块未加载');
    }

    // 尝试调用算法的主要函数
    try {
        if (algorithmModule.predict) {
            return algorithmModule.predict(requestData);
        } else if (algorithmModule.run) {
            return algorithmModule.run(requestData);
        } else if (algorithmModule.main) {
            return algorithmModule.main(requestData);
        } else {
            throw new Error('未找到算法执行函数');
        }
    } catch (e) {
        throw new Error(`算法执行失败: ${e.message}`);
    }
}

/**
 * 处理请求
 * @param {http.IncomingMessage} req 请求对象
 * @param {http.ServerResponse} res 响应对象
 */
function handleRequest(req, res) {
    const parsedUrl = url.parse(req.url, true);
    const pathname = parsedUrl.pathname;

    // 设置响应头
    res.setHeader('Content-Type', 'application/json');

    if (req.method === 'GET') {
        if (pathname === '/health') {
            // 健康检查
            res.writeHead(200);
            res.end(JSON.stringify({ status: 'healthy', service: {{service_name}} }));
        } else if (pathname === '/info') {
            // 服务信息
            const info = {
                service: {{service_name}},
                version: {{service_version}},
                host: HOST,
                port: PORT,
                timeout: TIMEOUT,
                algorithm_loaded: algorithmModule !== null
            };
            res.writeHead(200);
            res.end(JSON.stringify(info));
        } else {
            // 404响应
            res.writeHead(404);
            res.end(JSON.stringify({ error: 'Not Found' }));
        }
    } else if (req.method === 'POST') {
        let body = '';

        // 读取请求体
        req.on('data', chunk => {
            body += chunk.toString();
        });

        req.on('end', () => {
            try {
                // 解析请求数据
                const requestData = JSON.parse(body);

                // 记录请求开始时间
                const startTime = Date.now();

                // 调用算法
                const result = callAlgorithm(requestData);

                // 计算响应时间
                const responseTime = (Date.now() - startTime) / 1000;

                // 构建响应
                const response = {
                    success: true,
                    result: result,
                    response_time: responseTime.toFixed(4),
                    message: '算法执行成功'
                };

                // 发送响应
                res.writeHead(200);
                res.end(JSON.stringify(response));
            } catch (e) {
                // 构建错误响应
                const errorResponse = {
                    success: false,
                    error: e.message,
                    message: '算法执行失败'
                };

                // 发送错误响应
                res.writeHead(400);
                res.end(JSON.stringify(errorResponse));
            }
        });
    } else {
        // 不支持的方法
        res.writeHead(405);
        res.end(JSON.stringify({ error: 'Method Not Allowed' }));
    }
}

/**
 * 启动服务
 */
function startServer() {
    const server = http.createServer(handleRequest);

    server.listen(PORT, HOST, () => {
        console.log(`服务启动成功,监听地址: ${HOST}:${PORT}`);
        console.log(`健康检查地址: http://${HOST}:${PORT}/health`);
        console.log(`服务信息地址: http://${HOST}:${PORT}/info`);
    });

    server.on('error', (error) => {
        console.error('服务启动失败:', error);
    });
}

// 启动服务
startServer();
'''

        # Substitute name/version as complete JSON string literals (quotes
        # included). An ASCII-escaped JSON string is also a valid Python and
        # JavaScript string literal, so quotes or backslashes in the configured
        # values cannot break (or inject code into) the generated wrapper.
        name_literal = json.dumps(str(service_config.get("name", "algorithm-service")))
        version_literal = json.dumps(str(service_config.get("version", "1.0.0")))
        return (template
                .replace("{{service_name}}", name_literal)
                .replace("{{service_version}}", version_literal))

    def _create_dependency_file(self, build_context: str, project_info: Dict[str, Any]):
        """Write the dependency manifest into the Docker build context.

        Args:
            build_context: Build context directory.
            project_info: Project information.
        """
        if project_info["project_type"] == "python":
            # The wrapper only uses the standard library. Listing stdlib modules
            # such as http.server/json here would make `pip install -r` fail
            # (or pull unrelated PyPI packages), so none are listed.
            with open(os.path.join(build_context, "requirements.txt"), "w", encoding="utf-8") as f:
                f.write("""
# 基础依赖
# 服务包装器仅使用Python标准库(http.server, json),无需通过pip安装

# 算法依赖
# 注意:在实际实现中,应该从项目的requirements.txt复制依赖
""")
        elif project_info["project_type"] == "nodejs":
            package_data = {
                "name": "algorithm-service",
                "version": "1.0.0",
                "description": "Algorithm service wrapper",
                "main": "service_wrapper.js",
                "scripts": {
                    "start": "node service_wrapper.js"
                },
                "dependencies": {
                    # The wrapper needs only Node.js built-in modules.
                }
            }

            with open(os.path.join(build_context, "package.json"), "w", encoding="utf-8") as f:
                json.dump(package_data, f, indent=2)

    @staticmethod
    def _service_directory_path(service_id: str) -> str:
        """Return the working directory used for a locally deployed service."""
        return os.path.join("/tmp", f"algorithm-service-{service_id}")

    def _create_service_directory(self, service_id: str) -> str:
        """Create (if needed) and return the local service directory.

        Args:
            service_id: Service ID.

        Returns:
            Service directory path.
        """
        service_dir = self._service_directory_path(service_id)
        os.makedirs(service_dir, exist_ok=True)
        return service_dir

    def _resolve_reloaded_service_dir(self, service_id: str) -> str:
        """Pick the working directory for a service rebuilt from the database.

        deploy_service() writes the generated wrapper into the directory from
        _service_directory_path(); if that directory still exists, reuse it so
        service_wrapper.py is available. Otherwise fall back to
        /tmp/algorithms/<service_id>; start_service() attempts a Gitea clone
        when the chosen directory is missing.
        """
        deployed_dir = self._service_directory_path(service_id)
        if os.path.isdir(deployed_dir):
            return deployed_dir
        return f"/tmp/algorithms/{service_id}"

    def _generate_local_service_wrapper(self, service_dir: str, project_info: Dict[str, Any], service_config: Dict[str, Any], repo_path: str = None):
        """Write the service wrapper and the algorithm module into service_dir.

        For Python projects with a repo_path, the configured ``entry_point``
        is copied as algorithm.py; without one, every top-level-named .py file
        in the repo (excluding names starting with "_") is copied flat. If no
        algorithm module results, a mock one is created.

        Args:
            service_dir: Service directory.
            project_info: Project information.
            service_config: Service configuration.
            repo_path: Repository path to copy the real algorithm files from.
        """
        project_type = project_info["project_type"]
        wrapper_name = "service_wrapper.py" if project_type == "python" else "service_wrapper.js"

        service_wrapper_content = self._generate_service_wrapper(project_info, service_config)
        with open(os.path.join(service_dir, wrapper_name), "w", encoding="utf-8") as f:
            f.write(service_wrapper_content)

        if repo_path and project_type == "python":
            entry_point = project_info.get("entry_point")
            if entry_point:
                source_file = os.path.join(repo_path, entry_point)
                if os.path.exists(source_file):
                    target_file = os.path.join(service_dir, "algorithm.py")
                    shutil.copy2(source_file, target_file)
                    print(f"已复制算法文件: {source_file} -> {target_file}")
                    return

            # No usable entry point: copy all Python files instead.
            if os.path.exists(repo_path):
                for root, _dirs, files in os.walk(repo_path):
                    for file in files:
                        # Never let a repo file overwrite the generated wrapper.
                        if file == wrapper_name:
                            continue
                        if file.endswith(".py") and not file.startswith("_"):
                            source_file = os.path.join(root, file)
                            dest_file = os.path.join(service_dir, file)
                            shutil.copy2(source_file, dest_file)
                            print(f"已复制Python文件: {source_file} -> {dest_file}")

            if not os.path.exists(os.path.join(service_dir, "algorithm.py")):
                print("未找到algorithm.py,创建模拟算法文件")
                self._create_mock_algorithm(service_dir)
        else:
            self._create_mock_algorithm(service_dir, project_type)

    def _create_mock_algorithm(self, service_dir: str, project_type: str = "python"):
        """Create a mock algorithm module exposing predict/run/main.

        Args:
            service_dir: Service directory.
            project_type: "python" writes algorithm.py; any other type writes
                algorithm.js, which is what the Node.js wrapper requires.
        """
        if project_type == "python":
            file_name = "algorithm.py"
            algorithm_content = """
def predict(data):
    return {"result": "Prediction result", "input": data}

def run(data):
    return {"result": "Run result", "input": data}

def main(data):
    return {"result": "Main result", "input": data}
"""
        else:
            file_name = "algorithm.js"
            algorithm_content = """
module.exports = {
    predict: (data) => ({ result: 'Prediction result', input: data }),
    run: (data) => ({ result: 'Run result', input: data }),
    main: (data) => ({ result: 'Main result', input: data }),
};
"""

        with open(os.path.join(service_dir, file_name), "w", encoding="utf-8") as f:
            f.write(algorithm_content)

    def _start_local_service_process(self, service_id: str, service_dir: str, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> Dict[str, Any]:
        """Start the local service process and record it in self.processes.

        Output (stdout and stderr) is appended to service_<id>.log in service_dir.

        Args:
            service_id: Service ID.
            service_dir: Service directory (used as the working directory).
            project_info: Project information.
            service_config: Service configuration.

        Returns:
            The recorded process info dict.
        """
        log_file = os.path.join(service_dir, f"service_{service_id}.log")

        if project_info["project_type"] == "python":
            cmd = ["python", "service_wrapper.py"]
        else:
            cmd = ["node", "service_wrapper.js"]

        env = os.environ.copy()
        env["HOST"] = service_config.get("host", "0.0.0.0")
        env["PORT"] = str(service_config.get("port", 8000))
        env["TIMEOUT"] = str(service_config.get("timeout", 30))
        # Stdout is redirected to a file, where Python would block-buffer it;
        # unbuffered UTF-8 output makes logs visible promptly and keeps the
        # wrapper's non-ASCII prints from failing under a non-UTF-8 locale.
        env.setdefault("PYTHONUNBUFFERED", "1")
        env.setdefault("PYTHONIOENCODING", "utf-8")

        # The child gets its own copy of the log descriptor, so the parent's
        # handle is closed right after spawning instead of leaking.
        with open(log_file, "a") as log_handle:
            process = subprocess.Popen(
                cmd,
                cwd=service_dir,
                env=env,
                stdout=log_handle,
                stderr=subprocess.STDOUT,
                start_new_session=True
            )

        process_info = {
            "pid": process.pid,
            "service_dir": service_dir,
            "log_file": log_file,
            "project_info": project_info,
            "service_config": service_config
        }

        self.processes[service_id] = process_info
        return process_info

    @staticmethod
    def _is_local_process_alive(pid) -> bool:
        """Return True if ``pid`` refers to a live, non-zombie process.

        psutil's is_running() is also True for zombies (exited but not yet
        reaped), which would make a crashed service look "running".
        """
        if not pid:
            return False
        try:
            process = psutil.Process(pid)
            return process.is_running() and process.status() != psutil.STATUS_ZOMBIE
        except psutil.Error:
            return False

    @staticmethod
    def _terminate_local_process(pid, timeout: int = 30) -> None:
        """Stop a local process, best effort.

        Calls terminate() and waits up to ``timeout`` seconds; if the process
        is still alive it is killed, so the port is actually released. Errors
        are logged rather than raised.
        """
        if not pid:
            return
        try:
            process = psutil.Process(pid)
            if not process.is_running():
                return
            process.terminate()
            try:
                process.wait(timeout=timeout)
            except psutil.TimeoutExpired:
                print(f"进程 {pid} 未在 {timeout} 秒内退出,强制终止")
                process.kill()
                process.wait(timeout=5)
        except psutil.NoSuchProcess:
            pass
        except psutil.Error as e:
            print(f"停止进程 {pid} 失败: {e}")

    def _verify_local_service_startup(self, service_id: str, service_config: Dict[str, Any]) -> bool:
        """Wait a fixed 5 seconds, then run the local health check.

        Args:
            service_id: Service ID.
            service_config: Service configuration.

        Returns:
            True if the health check passed.
        """
        time.sleep(5)
        return self._verify_local_service_health(service_id, service_config)

    def _verify_local_service_health(self, service_id: str, service_config: Dict[str, Any]) -> bool:
        """Probe GET /health on the local service.

        Args:
            service_id: Service ID.
            service_config: Service configuration (host, port).

        Returns:
            True if /health answered HTTP 200 within 10 seconds.
        """
        try:
            import requests

            # The check runs on this machine, so a wildcard bind address is
            # probed via localhost.
            host = service_config.get("host", "localhost")
            if host == "0.0.0.0":
                host = "localhost"
            port = service_config.get("port", 8000)
            health_check_url = f"http://{host}:{port}/health"

            response = requests.get(health_check_url, timeout=10)
            return response.status_code == 200
        except Exception as e:
            print(f"健康检查失败: {e}")
            return False