Files
algorithm/backend/app/services/service_orchestrator.py
2026-02-18 09:36:18 +08:00

1607 lines
55 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""服务编排服务,用于管理算法服务的生命周期"""
import os
import json
import time
import docker
import uuid
import subprocess
import signal
import psutil
from typing import Dict, Any, Optional
from docker.errors import DockerException, NotFound
# 数据库相关导入
try:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
DATABASE_AVAILABLE = True
except ImportError:
DATABASE_AVAILABLE = False
class ServiceOrchestrator:
    """Manage the lifecycle of algorithm services.

    Services can be deployed, started, stopped, restarted, deleted, queried for
    status and asked for logs. They run either as Docker containers
    (``deployment_mode="docker"``) or as local subprocesses (any other mode).
    """
def __init__(self, deployment_mode="local", db_url=None):
    """Create an orchestrator.

    Args:
        deployment_mode: ``"docker"`` runs services as containers; any other
            value falls back to local subprocesses.
        db_url: optional SQLAlchemy URL. When given (and SQLAlchemy could be
            imported) an engine and session are created so service records
            can be reloaded from the database later.
    """
    self.deployment_mode = deployment_mode
    # service_id -> bookkeeping dict for locally spawned processes
    self.processes = {}
    self.db_url = db_url
    self.db_engine, self.db_session = None, None

    if db_url and DATABASE_AVAILABLE:
        try:
            engine = create_engine(db_url)
            session = sessionmaker(bind=engine)()
        except Exception as e:
            print(f"数据库连接失败: {e}")
        else:
            self.db_engine, self.db_session = engine, session
            print("数据库连接成功")

    self.client = None
    if deployment_mode != "docker":
        print("使用本地进程部署模式")
        return

    try:
        client = docker.from_env()
        client.ping()  # fail fast if the daemon is unreachable
    except DockerException as e:
        print(f"Docker连接失败: {e}")
    else:
        self.client = client
        print("Docker连接成功")
def deploy_service(self, service_id: str, service_config: Dict[str, Any], project_info: Dict[str, Any], repo_path: str = None) -> Dict[str, Any]:
    """Deploy a service and start it.

    Docker mode builds an image, runs a container and checks that it is
    running. Local mode creates the service directory, writes the wrapper
    (copying algorithm files from ``repo_path`` when given), spawns the
    process and probes its health endpoint.

    Args:
        service_id: service identifier.
        service_config: service settings (``host``, ``port``, ``timeout``, ...).
        project_info: project metadata; ``project_type`` selects the wrapper.
        repo_path: repository checkout to copy the real algorithm files from.

    Returns:
        Dict with ``success``, ``service_id``, ``container_id``, ``status``,
        ``api_url`` and ``error``. Exceptions are reported in the dict, not raised.
    """
    def failure(error, container_id=None):
        return {
            "success": False,
            "error": error,
            "service_id": service_id,
            "container_id": container_id,
            "status": "error",
            "api_url": None,
        }

    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return failure("Docker连接失败")
            image_name = self._build_docker_image(service_id, project_info, service_config)
            container_id = self._start_service_container(service_id, image_name, service_config)
            if not self._verify_service_startup(container_id, service_config):
                # Return the ID so the caller can still inspect/delete the container.
                return failure("服务启动验证失败", container_id)
        else:
            service_dir = self._create_service_directory(service_id)
            self._generate_local_service_wrapper(service_dir, project_info, service_config, repo_path)
            self._start_local_service_process(service_id, service_dir, project_info, service_config)
            if not self._verify_local_service_startup(service_id, service_config):
                # The failure result carries no handle the caller could use for
                # cleanup, so stop the spawned process here rather than leaving
                # it orphaned (possibly still holding the port).
                self.stop_service(service_id, service_id)
                return failure("服务启动验证失败")
            container_id = service_id  # local mode uses the service ID as the container ID

        api_url = f"http://{service_config.get('host', 'localhost')}:{service_config.get('port', 8000)}"
        return {
            "success": True,
            "service_id": service_id,
            "container_id": container_id,
            "status": "running",
            "api_url": api_url,
            "error": None,
        }
    except Exception as e:
        return failure(str(e))
def start_service(self, service_id: str, container_id: str) -> Dict[str, Any]:
    """Start a previously deployed service.

    Docker mode starts the existing container and checks that it is running.
    Local mode returns immediately if the tracked process is alive. Otherwise
    it spawns the wrapper again. If this orchestrator has no entry for the
    service, the record is first reloaded from the database. If the service
    directory is missing, the code is cloned from Gitea first.

    Args:
        service_id: service identifier.
        container_id: Docker container ID (not used in local mode).

    Returns:
        Dict with ``success``, ``service_id``, ``status`` and ``error``.
    """
    def failure(error):
        return {"success": False, "error": error, "service_id": service_id, "status": "error"}

    running = {"success": True, "service_id": service_id, "status": "running", "error": None}

    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return failure("Docker连接失败")
            self.client.containers.get(container_id).start()
            if not self._verify_service_health(container_id):
                return failure("服务健康检查失败")
            return running

        if service_id not in self.processes:
            # Probably lost because the orchestrator restarted: try the database.
            print(f"服务 {service_id} 不在进程列表中,尝试从数据库重新加载")
            if not self.reload_service_from_db(service_id):
                return failure("服务不存在,请重新注册服务")

        process_info = self.processes.get(service_id)
        if process_info is None:
            # reload succeeded but did not register a local entry
            return failure("服务不存在,请重新注册服务")

        pid = process_info.get("pid")
        if pid:
            try:
                if psutil.Process(pid).is_running():
                    return running
            except psutil.Error:
                pass  # stale PID: fall through and spawn a new process

        service_dir = process_info["service_dir"]
        project_info = process_info["project_info"]
        service_config = process_info["service_config"]
        print(f"准备启动服务 {service_id}")
        print(f"服务目录: {service_dir}")
        print(f"服务配置: {service_config}")

        if not os.path.exists(service_dir):
            print(f"服务目录不存在: {service_dir}尝试从Gitea克隆代码")
            repository_id = service_config.get("repository_id")
            if not repository_id:
                return failure("无法获取仓库ID无法克隆代码")
            try:
                from app.models.database import SessionLocal
                from app.models.models import AlgorithmRepository

                db = SessionLocal()
                try:
                    repository = db.query(AlgorithmRepository).filter(AlgorithmRepository.id == repository_id).first()
                finally:
                    # Close even when the query raises so the connection is released.
                    db.close()
                if not repository:
                    return failure(f"仓库不存在: {repository_id}")

                from app.gitea.service import GiteaService

                clone_success = GiteaService().clone_repository(
                    repository.repo_url,
                    service_id,
                    repository.branch or "main",
                )
                if not clone_success:
                    return failure(f"克隆仓库失败: {repository.repo_url}")
                print(f"成功从Gitea克隆仓库到: {service_dir}")
            except Exception as e:
                print(f"克隆仓库时出错: {str(e)}")
                return failure(f"克隆仓库时出错: {str(e)}")

        print("开始启动服务进程...")
        new_process_info = self._start_local_service_process(service_id, service_dir, project_info, service_config)
        print(f"服务进程启动完成: {new_process_info}")
        print("开始验证服务启动...")
        if not self._verify_local_service_startup(service_id, service_config):
            print("服务启动验证失败")
            return failure("服务启动验证失败")
        print("服务启动成功!")
        return running
    except NotFound:
        return failure("容器不存在")
    except Exception as e:
        return failure(str(e))
def stop_service(self, service_id: str, container_id: str) -> Dict[str, Any]:
    """Stop a running service.

    Docker mode stops the container with a 30-second timeout. Local mode calls
    ``terminate()`` on the tracked process. If it has not exited after 30
    seconds, it escalates to ``kill()``. The stored PID is then cleared. A
    service this orchestrator does not track is reported as already stopped.

    Args:
        service_id: service identifier.
        container_id: Docker container ID (not used in local mode).

    Returns:
        Dict with ``success``, ``service_id``, ``status`` and ``error``.
    """
    stopped = {"success": True, "service_id": service_id, "status": "stopped", "error": None}
    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return {"success": False, "error": "Docker连接失败", "service_id": service_id, "status": "error"}
            self.client.containers.get(container_id).stop(timeout=30)
            return stopped

        process_info = self.processes.get(service_id)
        if process_info is None:
            # Untracked (e.g. after an orchestrator restart). Finding the process
            # by port would need the service config, which is not available here;
            # assume it is already stopped.
            print(f"服务 {service_id} 不在进程列表中,尝试通过端口查找进程")
            return stopped

        pid = process_info.get("pid")
        if pid:
            try:
                process = psutil.Process(pid)
                if process.is_running():
                    process.terminate()
                    try:
                        process.wait(timeout=30)
                    except psutil.TimeoutExpired:
                        # Ignored the polite stop: force it, otherwise the process
                        # would be orphaned (PID cleared below) and keep its port.
                        process.kill()
                        process.wait(timeout=10)
            except psutil.Error:
                pass  # already gone or not accessible

        process_info["pid"] = None
        return stopped
    except NotFound:
        return {"success": False, "error": "容器不存在", "service_id": service_id, "status": "error"}
    except Exception as e:
        return {"success": False, "error": str(e), "service_id": service_id, "status": "error"}
def restart_service(self, service_id: str, container_id: str) -> Dict[str, Any]:
    """Restart a service.

    Docker mode restarts the container (30-second stop timeout) and checks that
    it is running. Local mode stops the tracked process (``terminate()``, then
    ``kill()`` if it is still alive after 30 seconds). It then spawns a fresh
    one from the stored directory and configuration. If the service is not
    tracked, its record is reloaded from the database first, as
    ``start_service`` does.

    Args:
        service_id: service identifier.
        container_id: Docker container ID (not used in local mode).

    Returns:
        Dict with ``success``, ``service_id``, ``status`` and ``error``.
    """
    def failure(error):
        return {"success": False, "error": error, "service_id": service_id, "status": "error"}

    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return failure("Docker连接失败")
            self.client.containers.get(container_id).restart(timeout=30)
            if not self._verify_service_health(container_id):
                return failure("服务健康检查失败")
            return {"success": True, "service_id": service_id, "status": "running", "error": None}

        if service_id not in self.processes:
            self.reload_service_from_db(service_id)
        process_info = self.processes.get(service_id)
        if process_info is None:
            return failure("服务不存在")

        service_dir = process_info["service_dir"]
        project_info = process_info["project_info"]
        service_config = process_info["service_config"]

        pid = process_info.get("pid")
        if pid:
            try:
                process = psutil.Process(pid)
                if process.is_running():
                    process.terminate()
                    try:
                        process.wait(timeout=30)
                    except psutil.TimeoutExpired:
                        # The replacement needs the same port: force the old one out.
                        process.kill()
                        process.wait(timeout=10)
            except psutil.Error:
                pass  # already gone or not accessible

        self._start_local_service_process(service_id, service_dir, project_info, service_config)
        if not self._verify_local_service_startup(service_id, service_config):
            return failure("服务启动验证失败")
        return {"success": True, "service_id": service_id, "status": "running", "error": None}
    except NotFound:
        return failure("容器不存在")
    except Exception as e:
        return failure(str(e))
def delete_service(self, service_id: str, container_id: str, image_name: str) -> Dict[str, Any]:
    """Delete a service and its artifacts.

    Docker mode stops and force-removes the container (a missing container is
    ignored), then removes the image on a best-effort basis. Local mode stops
    the tracked process (``terminate()``, escalating to ``kill()`` after 30
    seconds), removes the service directory on a best-effort basis and forgets
    the service.

    Args:
        service_id: service identifier.
        container_id: Docker container ID (may be empty).
        image_name: Docker image to remove (may be empty).

    Returns:
        Dict with ``success``, ``service_id`` and ``error``.
    """
    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return {"success": False, "error": "Docker连接失败", "service_id": service_id}
            if container_id:
                try:
                    container = self.client.containers.get(container_id)
                    container.stop(timeout=10)
                    container.remove(force=True)
                except NotFound:
                    pass  # already removed
            if image_name:
                try:
                    self.client.images.remove(image_name, force=True)
                except Exception as e:
                    # Best effort: a leftover image must not fail the deletion.
                    print(f"删除镜像失败: {e}")
            return {"success": True, "service_id": service_id, "error": None}

        process_info = self.processes.get(service_id)
        if process_info is None:
            return {"success": False, "error": "服务不存在", "service_id": service_id}

        pid = process_info.get("pid")
        if pid:
            try:
                process = psutil.Process(pid)
                if process.is_running():
                    process.terminate()
                    try:
                        process.wait(timeout=30)
                    except psutil.TimeoutExpired:
                        # Don't leave an untracked process behind once the entry is deleted.
                        process.kill()
                        process.wait(timeout=10)
            except psutil.Error:
                pass  # already gone or not accessible

        import shutil
        shutil.rmtree(process_info["service_dir"], ignore_errors=True)
        del self.processes[service_id]
        return {"success": True, "service_id": service_id, "error": None}
    except Exception as e:
        return {"success": False, "error": str(e), "service_id": service_id}
def get_service_status(self, container_id: str) -> Dict[str, Any]:
    """Report the run state and health of a service.

    Docker mode: ``status`` is the container state. For a running container,
    ``health`` is ``healthy``/``unhealthy`` according to
    ``_verify_service_health``. Local mode: ``container_id`` is taken to be the
    service ID. A tracked process that is alive and not a zombie is
    ``running``, with health from the HTTP ``/health`` probe. Otherwise the
    service is ``stopped``.

    Args:
        container_id: Docker container ID, or the service ID in local mode.

    Returns:
        Dict with ``success``, ``status``, ``health`` and ``error``.
    """
    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return {"success": False, "error": "Docker连接失败", "status": "unknown", "health": "unknown"}
            container = self.client.containers.get(container_id)
            status = container.status
            health = "unknown"
            if status == "running":
                health = "healthy" if self._verify_service_health(container_id) else "unhealthy"
            return {"success": True, "status": status, "health": health, "error": None}

        service_id = container_id
        process_info = self.processes.get(service_id)
        if process_info is None:
            return {"success": False, "error": "服务不存在", "status": "not_found", "health": "unknown"}

        alive = False
        pid = process_info.get("pid")
        if pid:
            try:
                process = psutil.Process(pid)
                # is_running() is still True for an exited-but-unreaped child
                # (zombie), e.g. a wrapper that crashed after start.
                alive = process.is_running() and process.status() != psutil.STATUS_ZOMBIE
            except psutil.Error:
                alive = False

        if not alive:
            return {"success": True, "status": "stopped", "health": "unknown", "error": None}

        healthy = self._verify_local_service_health(service_id, process_info["service_config"])
        return {
            "success": True,
            "status": "running",
            "health": "healthy" if healthy else "unhealthy",
            "error": None,
        }
    except NotFound:
        return {"success": False, "error": "容器不存在", "status": "not_found", "health": "unknown"}
    except Exception as e:
        return {"success": False, "error": str(e), "status": "unknown", "health": "unknown"}
def reload_service_from_db(self, service_id: str) -> Optional[Dict[str, Any]]:
    """Rebuild a service record from the ``algorithm_services`` table.

    This is used when the in-memory process table no longer knows the service
    (e.g. after this orchestrator restarted). In local mode the record is also
    registered in ``self.processes`` with no PID, so a later start can spawn it.

    Args:
        service_id: service identifier.

    Returns:
        ``{"service_id", "api_url", "status", "config"}`` (plus
        ``container_id`` when the config has one), where ``config`` also holds
        the row's ``host``/``port``. Returns None if there is no database
        session, no matching row, or the query fails.
    """
    if not self.db_session:
        print("数据库连接不可用,无法重新加载服务信息")
        return None
    try:
        query = text(
            "SELECT service_id, api_url, status, config, host, port "
            "FROM algorithm_services "
            "WHERE service_id = :service_id"
        )
        row = self.db_session.execute(query, {"service_id": service_id}).fetchone()
        if not row:
            print(f"数据库中未找到服务 {service_id}")
            return None

        raw_config = row[3]
        if isinstance(raw_config, (str, bytes)):
            # Depending on the DB driver, a JSON column may come back as text.
            raw_config = json.loads(raw_config)
        # Copy so the row's own value is not mutated by the host/port additions.
        config_data = dict(raw_config) if raw_config else {}
        config_data["host"] = row[4] if row[4] else "localhost"
        config_data["port"] = row[5] if row[5] else 8000

        service_info = {
            "service_id": row[0],
            "api_url": row[1],
            "status": row[2],
            "config": config_data,
        }
        container_id = config_data.get("container_id")
        if container_id:
            service_info["container_id"] = container_id

        if self.deployment_mode == "local":
            # Prefer the directory that _create_service_directory gives a deployed
            # service, which still holds its wrapper and algorithm. Fall back to
            # the previous location, which start_service re-clones into when
            # missing (presumably the Gitea clone target; verify).
            deploy_dir = os.path.join("/tmp", f"algorithm-service-{service_id}")
            service_dir = deploy_dir if os.path.isdir(deploy_dir) else f"/tmp/algorithms/{service_id}"
            project_type = config_data.get("project_type", "python")
            self.processes[service_id] = {
                "service_dir": service_dir,
                # same naming as _start_local_service_process, so logs stay readable
                "log_file": os.path.join(service_dir, f"service_{service_id}.log"),
                "project_info": {
                    "project_type": project_type,
                    "name": config_data.get("name", ""),
                    "version": config_data.get("version", "1.0.0"),
                    "description": config_data.get("description", ""),
                },
                "service_config": config_data,
                "pid": None,  # unknown until the service is (re)started
            }

        print(f"成功从数据库重新加载服务 {service_id} 的信息")
        return service_info
    except Exception as e:
        # A failed statement can leave the long-lived session's transaction
        # unusable; roll back so later reloads still work (best effort).
        try:
            self.db_session.rollback()
        except Exception:
            pass
        print(f"从数据库重新加载服务信息失败: {e}")
        return None
def get_service_logs(self, container_id: str, lines: int = 100) -> Dict[str, Any]:
    """Return the last ``lines`` log lines of a service.

    Docker mode reads the container log tail. Local mode reads the process log
    file recorded for the service (``container_id`` is taken to be the service
    ID). Lines are returned without trailing newlines.

    Args:
        container_id: Docker container ID, or the service ID in local mode.
        lines: number of trailing lines to return (0 or less returns none).

    Returns:
        Dict with ``success``, ``logs`` (list of str) and ``error``.
    """
    try:
        if self.deployment_mode == "docker":
            if not self.client:
                return {"success": False, "error": "Docker连接失败", "logs": []}
            container = self.client.containers.get(container_id)
            # Replace undecodable bytes instead of failing the whole request.
            text_output = container.logs(tail=lines).decode("utf-8", errors="replace")
            logs = text_output.split("\n")
            if logs and logs[-1] == "":
                logs.pop()  # the output ends with a newline; don't report an empty line
            return {"success": True, "logs": logs, "error": None}

        service_id = container_id
        process_info = self.processes.get(service_id)
        if process_info is None:
            return {"success": False, "error": "服务不存在", "logs": []}

        log_file = process_info.get("log_file")
        if not log_file or not os.path.exists(log_file):
            return {"success": True, "logs": [], "error": None}

        from collections import deque

        try:
            with open(log_file, "r", encoding="utf-8", errors="replace") as f:
                # Keep only the tail in memory. Note that logs[-0:] would have
                # returned the whole file for lines=0.
                tail = deque(f, maxlen=max(lines, 0))
            logs = [line.rstrip("\n") for line in tail]
        except OSError:
            logs = []
        return {"success": True, "logs": logs, "error": None}
    except NotFound:
        return {"success": False, "error": "容器不存在", "logs": []}
    except Exception as e:
        return {"success": False, "error": str(e), "logs": []}
def _build_docker_image(self, service_id: str, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> str:
    """Build the Docker image for a service.

    The build context is a fresh temporary directory holding the generated
    Dockerfile, service wrapper and dependency file. It is removed afterwards.
    The algorithm project's own files are not copied into it yet.

    Args:
        service_id: service identifier (used in the image name).
        project_info: project metadata; ``project_type`` selects wrapper/deps.
        service_config: service settings; ``version`` becomes the image tag.

    Returns:
        The image name, ``algorithm-service-<service_id>:<version>``.
    """
    import shutil
    import tempfile

    image_name = f"algorithm-service-{service_id}:{service_config.get('version', '1.0.0')}"
    # A unique directory per build: a fixed path would let two concurrent builds
    # of the same service overwrite each other's context.
    build_context = tempfile.mkdtemp(prefix=f"service-build-{service_id}-")
    try:
        # The generated files contain non-ASCII text; don't depend on the locale encoding.
        with open(os.path.join(build_context, "Dockerfile"), "w", encoding="utf-8") as f:
            f.write(self._generate_dockerfile(project_info, service_config))

        wrapper_extension = ".py" if project_info["project_type"] == "python" else ".js"
        with open(os.path.join(build_context, f"service_wrapper{wrapper_extension}"), "w", encoding="utf-8") as f:
            f.write(self._generate_service_wrapper(project_info, service_config))

        self._create_dependency_file(build_context, project_info)

        print(f"开始构建Docker镜像: {image_name}")
        _image, logs = self.client.images.build(
            path=build_context,
            tag=image_name,
            rm=True,
            pull=False,
        )
        for log in logs:
            if "stream" in log:
                print(log["stream"], end="")
        print(f"Docker镜像构建成功: {image_name}")
        return image_name
    finally:
        shutil.rmtree(build_context, ignore_errors=True)
def _start_service_container(self, service_id: str, image_name: str, service_config: Dict[str, Any]) -> str:
    """Run the service image as a detached container.

    The container is named ``algorithm-service-<service_id>``, publishes the
    configured port on the same host port, and uses the ``unless-stopped``
    restart policy.

    Args:
        service_id: service identifier.
        image_name: image to run.
        service_config: supplies ``port`` (default 8000) and ``timeout`` (default 30).

    Returns:
        The new container's ID.
    """
    port = service_config.get("port", 8000)
    container = self.client.containers.run(
        image_name,
        name=f"algorithm-service-{service_id}",
        ports={f"{port}/tcp": port},
        environment={
            # Inside the container the wrapper must listen on all interfaces. A
            # configured host such as "localhost" (the api_url default) would
            # bind only the container's own loopback, and the published port
            # would be unreachable.
            "HOST": "0.0.0.0",
            "PORT": str(port),
            "TIMEOUT": str(service_config.get("timeout", 30)),
        },
        detach=True,
        restart_policy={"Name": "unless-stopped"},
    )
    print(f"容器启动成功: {container.id}")
    return container.id
def _verify_service_startup(self, container_id: str, service_config: Dict[str, Any]) -> bool:
    """Report whether a just-started container came up.

    Waits a fixed 5-second grace period, then runs a single container health
    check.

    Args:
        container_id: container to check.
        service_config: accepted for symmetry with the local variant; not read.

    Returns:
        True if the container is considered healthy after the grace period.
    """
    grace_period_seconds = 5
    time.sleep(grace_period_seconds)
    healthy = self._verify_service_health(container_id)
    return healthy
def _verify_service_health(self, container_id: str) -> bool:
    """Return True if the container exists and its state is ``running``.

    Only the container state is checked. An HTTP probe of the wrapper's
    ``/health`` endpoint could be added here.

    Args:
        container_id: container to check.

    Returns:
        False on any error (no client, unknown container, Docker API failure).
    """
    try:
        container = self.client.containers.get(container_id)
        return container.status == "running"
    except Exception:
        # Narrowed from a bare except: don't swallow KeyboardInterrupt/SystemExit.
        return False
def _generate_dockerfile(self, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> str:
    """Render the Dockerfile for a service.

    ``project_type`` ``"nodejs"`` selects the Node template. Every other value
    uses the Python template. ``{{host}}``/``{{port}}``/``{{timeout}}`` are
    filled from ``service_config`` (defaults ``0.0.0.0``, 8000, 30).

    Args:
        project_info: project metadata; only ``project_type`` is read.
        service_config: service settings.

    Returns:
        The Dockerfile text.
    """
    dockerfile_templates = {
        "python": """
FROM python:3.9-slim
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制项目文件
COPY . .
# 复制生成的服务包装器
COPY service_wrapper.py .
# 设置环境变量
ENV HOST={{host}}
ENV PORT={{port}}
ENV TIMEOUT={{timeout}}
# 暴露端口
EXPOSE {{port}}
# 启动服务
CMD ["python", "service_wrapper.py"]
""",
        # With several sources (a wildcard counts), COPY requires the
        # destination to be a directory ending in "/", hence "./".
        "nodejs": """
FROM node:16-alpine
WORKDIR /app
# 复制依赖文件
COPY package.json package-lock.json* ./
# 安装依赖
RUN npm install --production
# 复制项目文件
COPY . .
# 复制生成的服务包装器
COPY service_wrapper.js .
# 设置环境变量
ENV HOST={{host}}
ENV PORT={{port}}
ENV TIMEOUT={{timeout}}
# 暴露端口
EXPOSE {{port}}
# 启动服务
CMD ["node", "service_wrapper.js"]
""",
    }
    template = dockerfile_templates.get(project_info["project_type"], dockerfile_templates["python"])
    substitutions = {
        "{{host}}": str(service_config.get("host", "0.0.0.0")),
        "{{port}}": str(service_config.get("port", 8000)),
        "{{timeout}}": str(service_config.get("timeout", 30)),
    }
    for placeholder, value in substitutions.items():
        template = template.replace(placeholder, value)
    return template
def _generate_service_wrapper(self, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> str:
    """Render the HTTP wrapper source that exposes the algorithm as a service.

    The wrapper (Python when ``project_type == "python"``, Node.js otherwise)
    imports ``algorithm`` from its own directory. On each POST it calls the
    first of ``predict``/``run``/``main`` that exists, passing the JSON body.
    It also serves ``GET /health`` and ``GET /info``. Listen address, port and
    timeout are read from the ``HOST``/``PORT``/``TIMEOUT`` environment
    variables.

    Args:
        project_info: project metadata; only ``project_type`` is read.
        service_config: ``name`` and ``version`` are embedded in /health and /info.

    Returns:
        The wrapper source code.
    """
    if project_info["project_type"] == "python":
        template = '''
# Python HTTP服务包装器
import os
import sys
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler

# 添加项目路径到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# 尝试导入算法模块
try:
    # 尝试导入主要模块
    import algorithm
    algorithm_module = algorithm
    print("算法模块导入成功")
except Exception as e:
    print(f"算法模块导入失败: {e}")
    algorithm_module = None

# 服务配置
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", "8000"))
TIMEOUT = int(os.environ.get("TIMEOUT", "30"))


class AlgorithmRequestHandler(BaseHTTPRequestHandler):
    """算法请求处理器"""

    def do_POST(self):
        """处理POST请求"""
        try:
            # 读取请求体
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            # 解析请求数据
            request_data = json.loads(post_data.decode('utf-8'))
            # 记录请求开始时间
            start_time = time.time()
            # 调用算法
            result = self._call_algorithm(request_data)
            # 计算响应时间
            response_time = time.time() - start_time
            # 构建响应
            response = {
                "success": True,
                "result": result,
                "response_time": round(response_time, 4),
                "message": "算法执行成功"
            }
            # 发送响应
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(response).encode('utf-8'))
        except Exception as e:
            # 构建错误响应
            error_response = {
                "success": False,
                "error": str(e),
                "message": "算法执行失败"
            }
            # 发送错误响应
            self.send_response(400)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(error_response).encode('utf-8'))

    def do_GET(self):
        """处理GET请求"""
        if self.path == "/health":
            # 健康检查
            self._handle_health_check()
        elif self.path == "/info":
            # 服务信息
            self._handle_info()
        else:
            # 404响应
            self.send_response(404)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({"error": "Not Found"}).encode('utf-8'))

    def _call_algorithm(self, request_data):
        """调用算法
        Args:
            request_data: 请求数据
        Returns:
            算法执行结果
        """
        if algorithm_module is None:
            raise Exception("算法模块未加载")
        # 尝试调用算法的主要函数
        try:
            # 检查是否有predict函数
            if hasattr(algorithm_module, 'predict'):
                return algorithm_module.predict(request_data)
            # 检查是否有run函数
            elif hasattr(algorithm_module, 'run'):
                return algorithm_module.run(request_data)
            # 检查是否有main函数
            elif hasattr(algorithm_module, 'main'):
                return algorithm_module.main(request_data)
            else:
                raise Exception("未找到算法执行函数")
        except Exception as e:
            raise Exception(f"算法执行失败: {e}")

    def _handle_health_check(self):
        """处理健康检查"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({"status": "healthy", "service": {{service_name}}}).encode('utf-8'))

    def _handle_info(self):
        """处理服务信息请求"""
        info = {
            "service": {{service_name}},
            "version": {{service_version}},
            "host": HOST,
            "port": PORT,
            "timeout": TIMEOUT,
            "algorithm_loaded": algorithm_module is not None
        }
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(info).encode('utf-8'))


def run_server():
    """启动服务"""
    server = HTTPServer((HOST, PORT), AlgorithmRequestHandler)
    print(f"服务启动成功,监听地址: {HOST}:{PORT}")
    print(f"健康检查地址: http://{HOST}:{PORT}/health")
    print(f"服务信息地址: http://{HOST}:{PORT}/info")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("服务停止")
        server.shutdown()


if __name__ == "__main__":
    run_server()
'''
    else:
        template = '''
// Node.js HTTP服务包装器
const http = require('http');
const url = require('url');
const fs = require('fs');
const path = require('path');

// 服务配置
const HOST = process.env.HOST || '0.0.0.0';
const PORT = process.env.PORT || 8000;
const TIMEOUT = process.env.TIMEOUT || 30;

// 尝试导入算法模块
let algorithmModule = null;
try {
    // 尝试导入主要模块
    algorithmModule = require('./algorithm');
    console.log('算法模块导入成功');
} catch (e) {
    console.error('算法模块导入失败:', e);
    algorithmModule = null;
}

/**
 * 调用算法
 * @param {Object} requestData 请求数据
 * @returns {Object} 算法执行结果
 */
function callAlgorithm(requestData) {
    if (!algorithmModule) {
        throw new Error('算法模块未加载');
    }
    // 尝试调用算法的主要函数
    try {
        if (algorithmModule.predict) {
            return algorithmModule.predict(requestData);
        } else if (algorithmModule.run) {
            return algorithmModule.run(requestData);
        } else if (algorithmModule.main) {
            return algorithmModule.main(requestData);
        } else {
            throw new Error('未找到算法执行函数');
        }
    } catch (e) {
        throw new Error(`算法执行失败: ${e.message}`);
    }
}

/**
 * 处理请求
 * @param {http.IncomingMessage} req 请求对象
 * @param {http.ServerResponse} res 响应对象
 */
function handleRequest(req, res) {
    const parsedUrl = url.parse(req.url, true);
    const pathname = parsedUrl.pathname;
    // 设置响应头
    res.setHeader('Content-Type', 'application/json');
    if (req.method === 'GET') {
        if (pathname === '/health') {
            // 健康检查
            res.writeHead(200);
            res.end(JSON.stringify({ status: 'healthy', service: {{service_name}} }));
        } else if (pathname === '/info') {
            // 服务信息
            const info = {
                service: {{service_name}},
                version: {{service_version}},
                host: HOST,
                port: PORT,
                timeout: TIMEOUT,
                algorithm_loaded: algorithmModule !== null
            };
            res.writeHead(200);
            res.end(JSON.stringify(info));
        } else {
            // 404响应
            res.writeHead(404);
            res.end(JSON.stringify({ error: 'Not Found' }));
        }
    } else if (req.method === 'POST') {
        let body = '';
        // 读取请求体
        req.on('data', chunk => {
            body += chunk.toString();
        });
        req.on('end', () => {
            try {
                // 解析请求数据
                const requestData = JSON.parse(body);
                // 记录请求开始时间
                const startTime = Date.now();
                // 调用算法
                const result = callAlgorithm(requestData);
                // 计算响应时间
                const responseTime = (Date.now() - startTime) / 1000;
                // 构建响应
                const response = {
                    success: true,
                    result: result,
                    response_time: Number(responseTime.toFixed(4)),
                    message: '算法执行成功'
                };
                // 发送响应
                res.writeHead(200);
                res.end(JSON.stringify(response));
            } catch (e) {
                // 构建错误响应
                const errorResponse = {
                    success: false,
                    error: e.message,
                    message: '算法执行失败'
                };
                // 发送错误响应
                res.writeHead(400);
                res.end(JSON.stringify(errorResponse));
            }
        });
    } else {
        // 不支持的方法
        res.writeHead(405);
        res.end(JSON.stringify({ error: 'Method Not Allowed' }));
    }
}

/**
 * 启动服务
 */
function startServer() {
    const server = http.createServer(handleRequest);
    server.listen(PORT, HOST, () => {
        console.log(`服务启动成功,监听地址: ${HOST}:${PORT}`);
        console.log(`健康检查地址: http://${HOST}:${PORT}/health`);
        console.log(`服务信息地址: http://${HOST}:${PORT}/info`);
    });
    server.on('error', (error) => {
        console.error('服务启动失败:', error);
    });
}

// 启动服务
startServer();
'''
    # Substitute complete JSON string literals (quotes included). With the
    # default ensure_ascii, json.dumps output is also a valid Python and
    # JavaScript string literal, so a name containing quotes or backslashes
    # cannot break the generated source. str() tolerates numeric versions.
    name_literal = json.dumps(str(service_config.get("name", "algorithm-service")))
    version_literal = json.dumps(str(service_config.get("version", "1.0.0")))
    return template.replace("{{service_name}}", name_literal).replace("{{service_version}}", version_literal)
def _create_dependency_file(self, build_context: str, project_info: Dict[str, Any]):
    """Write the dependency manifest used by the generated Dockerfile.

    Python projects get a ``requirements.txt`` and Node.js projects get a
    ``package.json``. Other project types get nothing. The generated wrappers
    use only the standard library / Node built-ins, so no packages are listed.
    The algorithm's own dependencies are not copied yet.

    Args:
        build_context: Docker build context directory.
        project_info: project metadata; only ``project_type`` is read.
    """
    project_type = project_info["project_type"]
    if project_type == "python":
        # "http.server" and "json" used to be listed here. They are stdlib
        # modules, not PyPI distributions, so the Dockerfile's
        # `pip install -r requirements.txt` step could not install them.
        # A comments-only file installs nothing and succeeds.
        requirements = (
            "# 基础依赖\n"
            "# (none: the service wrapper only uses the Python standard library)\n"
            "# 算法依赖\n"
            "# 注意在实际实现中应该从项目的requirements.txt复制依赖\n"
        )
        with open(os.path.join(build_context, "requirements.txt"), "w", encoding="utf-8") as f:
            f.write(requirements)
    elif project_type == "nodejs":
        package_data = {
            "name": "algorithm-service",
            "version": "1.0.0",
            "description": "Algorithm service wrapper",
            "main": "service_wrapper.js",
            "scripts": {
                "start": "node service_wrapper.js"
            },
            # the wrapper only requires Node built-ins (http, url, fs, path)
            "dependencies": {},
        }
        with open(os.path.join(build_context, "package.json"), "w", encoding="utf-8") as f:
            json.dump(package_data, f, indent=2)
def _create_service_directory(self, service_id: str) -> str:
    """Ensure the per-service working directory exists and return its path.

    Args:
        service_id: service identifier.

    Returns:
        ``/tmp/algorithm-service-<service_id>`` (created if missing).
    """
    directory = os.path.join("/tmp", "algorithm-service-{}".format(service_id))
    os.makedirs(directory, exist_ok=True)
    return directory
def _generate_local_service_wrapper(self, service_dir: str, project_info: Dict[str, Any], service_config: Dict[str, Any], repo_path: str = None):
    """Populate ``service_dir`` with the generated wrapper and the algorithm code.

    Python projects with a ``repo_path``:
    - If ``entry_point`` names an existing file in the repository, it is
      copied as ``algorithm.py``.
    - Otherwise every ``.py`` file whose name does not start with ``_`` is
      copied flat into ``service_dir``.
    - If that yields no ``algorithm.py``, a mock one is written.

    Any other case (no repository, or a non-Python project) also gets the mock
    ``algorithm.py``.

    Args:
        service_dir: target directory (must exist).
        project_info: project metadata (``project_type``, optional ``entry_point``).
        service_config: passed through to the wrapper generator.
        repo_path: repository checkout to copy algorithm files from.
    """
    import shutil

    is_python = project_info["project_type"] == "python"
    wrapper_name = "service_wrapper.py" if is_python else "service_wrapper.js"
    # The wrapper contains non-ASCII text; write UTF-8 regardless of the locale.
    with open(os.path.join(service_dir, wrapper_name), "w", encoding="utf-8") as f:
        f.write(self._generate_service_wrapper(project_info, service_config))

    if not (repo_path and is_python):
        self._create_mock_algorithm(service_dir)
        return

    algorithm_file = os.path.join(service_dir, "algorithm.py")
    entry_point = project_info.get("entry_point")
    if entry_point:
        source_file = os.path.join(repo_path, entry_point)
        if os.path.isfile(source_file):  # a directory entry point would crash copy2
            shutil.copy2(source_file, algorithm_file)
            print(f"已复制算法文件: {source_file} -> {algorithm_file}")
            return

    if os.path.exists(repo_path):
        for root, _dirs, files in os.walk(repo_path):
            for file in files:
                if not file.endswith(".py") or file.startswith("_"):
                    continue
                if file == wrapper_name:
                    # A repository file with this name would silently replace
                    # the wrapper generated above.
                    continue
                source_file = os.path.join(root, file)
                dest_file = os.path.join(service_dir, file)
                shutil.copy2(source_file, dest_file)
                print(f"已复制Python文件: {source_file} -> {dest_file}")

    if not os.path.exists(algorithm_file):
        print("未找到algorithm.py创建模拟算法文件")
        self._create_mock_algorithm(service_dir)
def _create_mock_algorithm(self, service_dir: str):
    """Write a placeholder ``algorithm.py`` into ``service_dir``.

    The module defines ``predict``, ``run`` and ``main``. Each returns a fixed
    ``result`` label together with its input.

    Args:
        service_dir: directory that receives ``algorithm.py``.
    """
    target_path = os.path.join(service_dir, "algorithm.py")
    mock_source = """
def predict(data):
    return {"result": "Prediction result", "input": data}

def run(data):
    return {"result": "Run result", "input": data}

def main(data):
    return {"result": "Main result", "input": data}
"""
    with open(target_path, "w") as handle:
        handle.write(mock_source)
def _start_local_service_process(self, service_id: str, service_dir: str, project_info: Dict[str, Any], service_config: Dict[str, Any]) -> Dict[str, Any]:
    """Spawn the service wrapper as a local process and record it.

    The wrapper runs in ``service_dir`` with ``HOST``/``PORT``/``TIMEOUT`` taken
    from ``service_config``. Its stdout and stderr are appended to
    ``service_<service_id>.log`` in that directory.

    Args:
        service_id: service identifier.
        service_dir: directory containing the generated wrapper.
        project_info: project metadata; ``project_type`` selects python/node.
        service_config: service settings.

    Returns:
        The bookkeeping dict (``pid``, ``service_dir``, ``log_file``,
        ``project_info``, ``service_config``), also stored in ``self.processes``.
    """
    log_file = os.path.join(service_dir, f"service_{service_id}.log")
    if project_info["project_type"] == "python":
        cmd = ["python", "service_wrapper.py"]
    else:
        cmd = ["node", "service_wrapper.js"]

    env = os.environ.copy()
    env["HOST"] = service_config.get("host", "0.0.0.0")
    env["PORT"] = str(service_config.get("port", 8000))
    env["TIMEOUT"] = str(service_config.get("timeout", 30))

    # Popen hands the child its own copy of the descriptor, so the parent's
    # handle can be closed as soon as the process exists. Leaving it open leaked
    # one file descriptor per (re)start.
    with open(log_file, "a") as log_handle:
        process = subprocess.Popen(
            cmd,
            cwd=service_dir,
            env=env,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # child runs setsid(): its own session/process group
        )

    process_info = {
        "pid": process.pid,
        "service_dir": service_dir,
        "log_file": log_file,
        "project_info": project_info,
        "service_config": service_config,
    }
    self.processes[service_id] = process_info
    return process_info
def _verify_local_service_startup(self, service_id: str, service_config: Dict[str, Any]) -> bool:
    """Report whether a just-spawned local service came up.

    Waits a fixed 5-second grace period, then performs a single HTTP health
    probe.

    Args:
        service_id: service identifier.
        service_config: supplies the host/port to probe.

    Returns:
        True if the health probe succeeded after the grace period.
    """
    settle_seconds = 5
    time.sleep(settle_seconds)
    is_up = self._verify_local_service_health(service_id, service_config)
    return is_up
def _verify_local_service_health(self, service_id: str, service_config: Dict[str, Any]) -> bool:
    """Probe a local service's ``GET /health`` endpoint.

    The probe runs on the same machine as the service. A wildcard or empty bind
    address (``0.0.0.0``, ``::``, empty) is therefore replaced by
    ``localhost``. IPv6 literals are bracketed so the URL is valid.

    Args:
        service_id: service identifier (not used by the probe).
        service_config: supplies ``host`` (default ``localhost``) and ``port`` (default 8000).

    Returns:
        True only if the endpoint answers HTTP 200 (10-second request timeout).
        Any error, including ``requests`` being unavailable, is printed and
        yields False.
    """
    try:
        import requests

        host = service_config.get("host", "localhost")
        if not host or host in ("0.0.0.0", "::"):
            host = "localhost"
        elif ":" in host and not host.startswith("["):
            host = f"[{host}]"  # IPv6 literal, e.g. "::1" -> "[::1]"
        port = service_config.get("port", 8000)
        response = requests.get(f"http://{host}:{port}/health", timeout=10)
        return response.status_code == 200
    except Exception as e:
        print(f"健康检查失败: {e}")
        return False