Files
algorithm/backend/app/services/service_manager.py
2026-02-08 14:42:58 +08:00

213 lines
7.9 KiB
Python

"""服务管理模块,负责管理算法服务的状态和配置"""
from typing import Dict, List, Optional, Any
import asyncio
import aiohttp
import logging
from datetime import datetime
import json
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ServiceStatus:
    """String constants naming the states a service can be in.

    This is a plain namespace of ``str`` values (not an ``enum.Enum``), so
    the members compare equal to, and serialize as, ordinary strings.
    """

    # Service is considered up.
    HEALTHY = "healthy"

    # Service is considered down or unreachable.
    UNHEALTHY = "unhealthy"

    # State has not been determined yet.
    UNKNOWN = "unknown"

    # Presumably set by operators for planned downtime -- TODO confirm usage.
    MAINTENANCE = "maintenance"
class ServiceInfo:
    """Record describing a single service: identity, URL, status and timestamps."""

    def __init__(
        self,
        service_id: str,
        name: str,
        url: str,
        status: str = ServiceStatus.UNKNOWN,
        last_heartbeat: Optional[datetime] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Create a service record.

        Args:
            service_id: Identifier of the service.
            name: Name of the service.
            url: URL of the service.
            status: Initial status string; defaults to ``ServiceStatus.UNKNOWN``.
            last_heartbeat: Time of the last heartbeat; defaults to the
                creation time when omitted.
            metadata: Arbitrary extra data; defaults to an empty dict.
        """
        # Read the clock once so created_at, updated_at and the default
        # last_heartbeat are exactly equal on a fresh record.
        # NOTE(review): utcnow() returns a naive datetime and is deprecated
        # since Python 3.12. It is kept because switching to an aware datetime
        # would change the isoformat() output produced by to_dict().
        now = datetime.utcnow()

        self.service_id = service_id
        self.name = name
        self.url = url
        self.status = status
        self.last_heartbeat = last_heartbeat or now
        self.metadata = metadata or {}
        self.created_at = now
        self.updated_at = now

    def __repr__(self) -> str:
        """Return a short debugging representation of the record."""
        return (
            f"{type(self).__name__}(service_id={self.service_id!r}, "
            f"name={self.name!r}, url={self.url!r}, status={self.status!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert the record to a dict, with datetimes as ISO-8601 strings.

        Returns:
            A dict with the keys ``service_id``, ``name``, ``url``, ``status``,
            ``last_heartbeat``, ``metadata``, ``created_at`` and ``updated_at``.
            ``last_heartbeat`` is ``None`` when the attribute is unset. The
            ``metadata`` value is the record's own dict, not a copy.
        """
        heartbeat = self.last_heartbeat.isoformat() if self.last_heartbeat else None
        return {
            "service_id": self.service_id,
            "name": self.name,
            "url": self.url,
            "status": self.status,
            "last_heartbeat": heartbeat,
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
class ServiceManager:
    """In-memory registry of services with an optional periodic health monitor.

    Services are stored in a dict keyed by ``service_id``. Health checks issue
    an HTTP GET to ``<service url>/health`` and record the resulting status.
    """

    def __init__(self):
        """Create an empty registry; monitoring is not started."""
        self._services: Dict[str, ServiceInfo] = {}
        self._health_check_interval = 30  # seconds to sleep between health-check rounds
        self._monitoring_task = None  # asyncio task running _monitor_loop, once started

    def register_service(
        self,
        service_id: str,
        name: str,
        url: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Register a service, replacing any existing entry with the same id.

        Args:
            service_id: Key under which the service is stored.
            name: Name of the service.
            url: URL of the service.
            metadata: Optional extra data stored on the record.

        Returns:
            True on success, False if building or storing the record raised.
        """
        try:
            service_info = ServiceInfo(
                service_id=service_id,
                name=name,
                url=url,
                metadata=metadata or {}
            )
            self._services[service_id] = service_info
            logger.info("Service registered: %s at %s", service_id, url)
            return True
        except Exception as e:
            logger.error("Failed to register service %s: %s", service_id, e)
            return False

    def unregister_service(self, service_id: str) -> bool:
        """Remove a service from the registry.

        Returns:
            True if the service was registered and has been removed,
            False if no such service exists.
        """
        if service_id not in self._services:
            return False
        del self._services[service_id]
        logger.info("Service unregistered: %s", service_id)
        return True

    def get_service(self, service_id: str) -> Optional[ServiceInfo]:
        """Return the record for ``service_id``, or None if it is not registered."""
        return self._services.get(service_id)

    def get_all_services(self) -> List[ServiceInfo]:
        """Return a new list containing every registered service record."""
        return list(self._services.values())

    def update_service_status(self, service_id: str, status: str) -> bool:
        """Set a service's status and refresh its timestamps.

        Both ``updated_at`` and ``last_heartbeat`` are set to the same instant.

        Returns:
            True if the service exists and was updated, False otherwise.
        """
        service = self._services.get(service_id)
        if service is None:
            return False
        # One clock read so the two timestamps are identical.
        now = datetime.utcnow()
        service.status = status
        service.updated_at = now
        service.last_heartbeat = now
        return True

    def update_service_metadata(self, service_id: str, metadata: Dict[str, Any]) -> bool:
        """Merge ``metadata`` into a service's metadata and bump ``updated_at``.

        Returns:
            True if the service exists and was updated, False otherwise.
        """
        service = self._services.get(service_id)
        if service is None:
            return False
        service.metadata.update(metadata)
        service.updated_at = datetime.utcnow()
        return True

    async def health_check_single(self, service_info: ServiceInfo) -> str:
        """Run one health check against ``<service url>/health``.

        Args:
            service_info: Record of the service to probe.

        Returns:
            ``ServiceStatus.HEALTHY`` when the endpoint answers HTTP 200 with a
            body that parses as JSON; ``ServiceStatus.UNHEALTHY`` for any other
            status code, a timeout (10 seconds total), or any other error.
        """
        try:
            async with aiohttp.ClientSession() as session:
                health_url = f"{service_info.url.rstrip('/')}/health"
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(health_url, timeout=timeout) as response:
                    if response.status != 200:
                        return ServiceStatus.UNHEALTHY
                    # The body is still parsed: a 200 whose body is not valid
                    # JSON raises here and is reported UNHEALTHY below.
                    # NOTE(review): the original compared the payload's
                    # "status" field to "healthy" but returned HEALTHY on both
                    # branches, so the payload content never affected the
                    # result. That behavior is preserved; decide whether a
                    # non-"healthy" payload should map to UNHEALTHY.
                    await response.json()
                    return ServiceStatus.HEALTHY
        except asyncio.TimeoutError:
            logger.warning("Health check timeout for service %s", service_info.service_id)
            return ServiceStatus.UNHEALTHY
        except Exception as e:
            logger.warning(
                "Health check failed for service %s: %s", service_info.service_id, e
            )
            return ServiceStatus.UNHEALTHY

    async def health_check_all(self):
        """Health-check every registered service concurrently and record changes.

        A service's status (and timestamps) are only updated when the newly
        observed status differs from the stored one.
        """
        # Snapshot the registry: it may be mutated while the checks are
        # awaited, and results must be paired with the services they were
        # started for (gather returns results in argument order).
        services = list(self._services.values())
        if not services:
            return

        results = await asyncio.gather(
            *(self.health_check_single(service_info) for service_info in services),
            return_exceptions=True,
        )

        for service_info, result in zip(services, results):
            # Skip services that were unregistered or replaced during the round.
            if self._services.get(service_info.service_id) is not service_info:
                continue

            # BaseException rather than Exception: asyncio.CancelledError is a
            # BaseException since Python 3.8 and must not be stored as a status.
            if isinstance(result, BaseException):
                logger.error(
                    "Health check error for %s: %s", service_info.service_id, result
                )
                new_status = ServiceStatus.UNHEALTHY
            else:
                new_status = result

            # Only write when the status actually changed.
            if service_info.status != new_status:
                logger.info(
                    "Service %s status changed from %s to %s",
                    service_info.service_id,
                    service_info.status,
                    new_status,
                )
                self.update_service_status(service_info.service_id, new_status)

    async def start_monitoring(self):
        """Start the background monitor loop unless one is already running."""
        task = self._monitoring_task
        if task is not None and not task.done():
            logger.warning("Monitoring task already running")
            return
        self._monitoring_task = asyncio.create_task(self._monitor_loop())
        logger.info("Service monitoring started")

    async def stop_monitoring(self):
        """Cancel the background monitor loop, if running, and wait for it to end."""
        task = self._monitoring_task
        if task is None or task.done():
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        self._monitoring_task = None
        logger.info("Service monitoring stopped")

    async def _monitor_loop(self):
        """Run health-check rounds forever, sleeping between rounds.

        Exits when cancelled; any other error is logged and the loop continues
        after the usual sleep interval.
        """
        while True:
            try:
                await self.health_check_all()
                await asyncio.sleep(self._health_check_interval)
            except asyncio.CancelledError:
                logger.info("Monitor loop cancelled")
                break
            except Exception as e:
                logger.error("Error in monitor loop: %s", e)
                # Back off for one interval instead of retrying immediately.
                await asyncio.sleep(self._health_check_interval)

    def get_statistics(self) -> Dict[str, Any]:
        """Summarize how many services are in each status.

        Returns:
            A dict with ``total_services``, ``healthy_services``,
            ``unhealthy_services``, ``unknown_services``,
            ``maintenance_services`` and an ISO-8601 ``timestamp``.
        """
        # Tally every status in a single pass over the registry.
        counts: Dict[str, int] = {}
        for service in self._services.values():
            counts[service.status] = counts.get(service.status, 0) + 1

        return {
            "total_services": len(self._services),
            "healthy_services": counts.get(ServiceStatus.HEALTHY, 0),
            "unhealthy_services": counts.get(ServiceStatus.UNHEALTHY, 0),
            "unknown_services": counts.get(ServiceStatus.UNKNOWN, 0),
            "maintenance_services": counts.get(ServiceStatus.MAINTENANCE, 0),
            "timestamp": datetime.utcnow().isoformat(),
        }
# Global service manager instance, created when this module is imported.
service_manager = ServiceManager()