Files
algorithm/backend/app/routes/monitoring.py
2026-02-08 14:42:58 +08:00

345 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Monitoring and logging routes: system monitoring, metrics collection, and log queries."""
from fastapi import APIRouter, HTTPException, status, Depends
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import json
from app.services.monitoring import monitoring_service
from app.utils.logger import structured_logger, log_query
from app.models.database import get_db
from app.routes.user import get_current_active_user
# Router for the monitoring/logging endpoints defined in this module; every
# route path declared below is served under the "/monitoring" prefix.
router = APIRouter(prefix="/monitoring", tags=["monitoring"])
@router.get("/health")
async def get_system_health():
    """Return the current system health report.

    Delegates to ``monitoring_service.get_system_health()`` and returns its
    result unchanged. This route declares no authentication dependency.
    """
    return monitoring_service.get_system_health()
@router.get("/dashboard")
async def get_dashboard_data(
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Return the data backing the monitoring dashboard.

    Access is restricted to users whose ``role`` is ``admin`` or ``manager``.

    Raises:
        HTTPException: 403 when the caller has any other role.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return monitoring_service.get_dashboard_data(db)
@router.get("/metrics/system")
async def get_system_metrics(
    current_user: dict = Depends(get_current_active_user),
):
    """Collect and return a fresh snapshot of system metrics.

    Access is restricted to users whose ``role`` is ``admin`` or ``manager``.

    Raises:
        HTTPException: 403 when the caller has any other role.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    # Imported lazily, after the permission check, as in the original code.
    from app.services.monitoring import MetricsCollector

    return MetricsCollector().collect_system_metrics()
@router.get("/metrics/business")
async def get_business_metrics(
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Collect and return business metrics using the request's DB handle.

    Access is restricted to users whose ``role`` is ``admin`` or ``manager``.

    Raises:
        HTTPException: 403 when the caller has any other role.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    # Imported lazily, after the permission check, as in the original code.
    from app.services.monitoring import MetricsCollector

    return MetricsCollector().collect_business_metrics(db)
@router.get("/metrics/history")
async def get_metrics_history(
    metric_type: str = "system",
    limit: int = 100,
    current_user: dict = Depends(get_current_active_user),
):
    """Return recorded metric history for one metric family.

    Args:
        metric_type: Either ``"system"`` or ``"business"``.
        limit: Forwarded unchanged to ``MetricsCollector.get_metric_history``.

    Returns:
        ``{"history": ...}`` wrapping whatever the collector returns.

    Raises:
        HTTPException: 403 when the caller is not admin/manager; 400 when
            ``metric_type`` is not one of the two accepted values.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    allowed_types = ("system", "business")
    if metric_type not in allowed_types:
        raise HTTPException(status_code=400, detail="Invalid metric type. Use 'system' or 'business'")

    from app.services.monitoring import MetricsCollector

    # NOTE(review): a new collector is instantiated per request; whether it
    # sees previously recorded history depends on MetricsCollector - confirm.
    records = MetricsCollector().get_metric_history(metric_type, limit)
    return {"history": records}
@router.get("/alerts/active")
async def get_active_alerts(
    current_user: dict = Depends(get_current_active_user),
):
    """Return the alerts currently reported as active by the alert manager.

    Access is restricted to users whose ``role`` is ``admin`` or ``manager``.

    Raises:
        HTTPException: 403 when the caller has any other role.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    alert_manager = monitoring_service.alert_manager
    return {"active_alerts": alert_manager.get_active_alerts()}
@router.get("/alerts/history")
async def get_alert_history(
    limit: int = 100,
    current_user: dict = Depends(get_current_active_user),
):
    """Return past alerts from the alert manager.

    Args:
        limit: Forwarded unchanged to ``alert_manager.get_alert_history``.

    Raises:
        HTTPException: 403 when the caller is not admin/manager.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    alert_manager = monitoring_service.alert_manager
    return {"alert_history": alert_manager.get_alert_history(limit)}
# NOTE(review): combined with the router's "/monitoring" prefix this route is
# served at "/monitoring/monitoring/start" - confirm that is intended.
@router.post("/monitoring/start")
async def start_monitoring(
    interval: int = 60,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Acknowledge a request to start monitoring.

    This handler does not start any background task. In a real deployment the
    long-running monitoring coroutine would be launched at application
    startup; here only a confirmation payload is returned as an example.

    Args:
        interval: Echoed back in the response; not otherwise used.

    Raises:
        HTTPException: 403 when the caller is not admin/manager.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    now_iso = datetime.utcnow().isoformat()
    return {
        "message": "Monitoring started",
        "interval": interval,
        "timestamp": now_iso,
    }
# NOTE(review): combined with the router's "/monitoring" prefix this route is
# served at "/monitoring/monitoring/stop" - confirm that is intended.
@router.post("/monitoring/stop")
async def stop_monitoring(
    current_user: dict = Depends(get_current_active_user),
):
    """Stop monitoring via ``monitoring_service.stop_monitoring()``.

    Raises:
        HTTPException: 403 when the caller is not admin/manager.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    await monitoring_service.stop_monitoring()

    # Timestamp is taken after the service call has completed.
    response = {"message": "Monitoring stopped"}
    response["timestamp"] = datetime.utcnow().isoformat()
    return response
@router.post("/logs/event")
async def log_custom_event(
    event_type: str,
    user_id: Optional[str] = None,
    algorithm_id: Optional[str] = None,
    extra_data: Optional[Dict[str, Any]] = None,
    current_user: dict = Depends(get_current_active_user),
):
    """Record a custom structured event log entry.

    Args:
        event_type: Name of the event to record.
        user_id: User the event is attributed to. Non-privileged callers may
            only use their own id; it is overwritten with the caller's id.
        algorithm_id: Optional algorithm the event relates to.
        extra_data: Optional free-form payload. When omitted, an empty dict is
            passed to the logger.

    Returns:
        A confirmation payload with the event type and a UTC timestamp.

    Raises:
        HTTPException: 403 when a non-admin/manager caller names another user.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    payload = extra_data if extra_data is not None else {}

    # Regular users can only record events attributed to themselves.
    if current_user.get("role") not in ["admin", "manager"]:
        own_id = current_user.get("id")
        if user_id and user_id != own_id:
            raise HTTPException(status_code=403, detail="Cannot log events for other users")
        user_id = own_id

    structured_logger.log_event(
        event_type=event_type,
        user_id=user_id,
        algorithm_id=algorithm_id,
        extra_data=payload,
    )
    return {
        "message": "Event logged successfully",
        "event_type": event_type,
        "timestamp": datetime.utcnow().isoformat(),
    }
@router.post("/logs/api-call")
async def log_api_call(
    user_id: str,
    algorithm_id: str,
    version_id: str,
    input_size: int,
    response_time: float,
    success: bool,
    error_msg: Optional[str] = None,
    current_user: dict = Depends(get_current_active_user),
):
    """Record one API-call log entry through the structured logger.

    Admins and managers may log calls for any user; everyone else may only
    log calls attributed to their own id.

    Returns:
        A confirmation payload echoing ``success`` plus a UTC timestamp.

    Raises:
        HTTPException: 403 when a non-admin/manager caller names another user.
    """
    is_privileged = current_user.get("role") in ("admin", "manager")
    if not is_privileged and user_id != current_user.get("id"):
        raise HTTPException(status_code=403, detail="Cannot log API calls for other users")

    call_record = {
        "user_id": user_id,
        "algorithm_id": algorithm_id,
        "version_id": version_id,
        "input_size": input_size,
        "response_time": response_time,
        "success": success,
        "error_msg": error_msg,
    }
    structured_logger.log_api_call(**call_record)

    return {
        "message": "API call logged successfully",
        "success": success,
        "timestamp": datetime.utcnow().isoformat(),
    }
def _parse_iso_datetime(value: Optional[str], field_name: str) -> Optional[datetime]:
    """Parse an ISO-8601 query value, mapping a parse failure to HTTP 400."""
    if not value:
        return None
    try:
        # fromisoformat() on older Python versions rejects a trailing "Z".
        return datetime.fromisoformat(value.replace('Z', '+00:00'))
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid {field_name} format")


@router.get("/logs/search")
async def search_logs(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    event_types: Optional[str] = None,  # comma-separated event types
    user_ids: Optional[str] = None,  # comma-separated user ids
    algorithm_ids: Optional[str] = None,  # comma-separated algorithm ids
    log_levels: Optional[str] = None,  # comma-separated log levels
    limit: int = 100,
    current_user: dict = Depends(get_current_active_user),
):
    """Search log entries with optional date, type, user and level filters.

    Admins and managers may search across all users. Any other caller is
    always scoped to their own user id: naming any other id in ``user_ids``
    is rejected, and an omitted ``user_ids`` is replaced by the caller's id.

    Args:
        start_date: ISO-8601 lower bound (a trailing ``Z`` is accepted).
        end_date: ISO-8601 upper bound (a trailing ``Z`` is accepted).
        event_types: Comma-separated event types to include.
        user_ids: Comma-separated user ids to include.
        algorithm_ids: Comma-separated algorithm ids to include.
        log_levels: Comma-separated log levels to include.
        limit: Forwarded unchanged to ``log_query.search_logs``.

    Returns:
        ``{"logs": results, "count": len(results), "limit": limit}``.

    Raises:
        HTTPException: 403 when a non-privileged caller requests another
            user's logs or has no id to scope the search to; 400 when a date
            cannot be parsed.
    """
    if current_user.get("role") not in ["admin", "manager"]:
        own_id = current_user.get("id")
        if own_id is None:
            # Without an id the search cannot be scoped; fail closed rather
            # than silently dropping the user filter.
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        own_id = str(own_id)

        if user_ids:
            # Every requested id must be the caller's own. A membership test
            # alone would let "me,someone_else" through.
            requested = {uid.strip() for uid in user_ids.split(',') if uid.strip()}
            if requested != {own_id}:
                raise HTTPException(status_code=403, detail="Cannot search logs for other users")
        user_ids = own_id

    # Parse the date bounds (start first, so its error wins when both are bad).
    start_dt = _parse_iso_datetime(start_date, "start_date")
    end_dt = _parse_iso_datetime(end_date, "end_date")

    # Expand the comma-separated filters; None means "no filter".
    event_type_list = event_types.split(',') if event_types else None
    user_id_list = user_ids.split(',') if user_ids else None
    algorithm_id_list = algorithm_ids.split(',') if algorithm_ids else None
    log_level_list = log_levels.split(',') if log_levels else None

    results = log_query.search_logs(
        start_date=start_dt,
        end_date=end_dt,
        event_types=event_type_list,
        user_ids=user_id_list,
        algorithm_ids=algorithm_id_list,
        log_levels=log_level_list,
        limit=limit,
    )
    return {
        "logs": results,
        "count": len(results),
        "limit": limit,
    }
@router.get("/logs/stats")
async def get_log_stats(
    days: int = 7,
    current_user: dict = Depends(get_current_active_user),
):
    """Return log statistics as computed by ``log_query.get_log_stats``.

    Args:
        days: Forwarded unchanged as the ``days`` keyword argument.

    Raises:
        HTTPException: 403 when the caller is not admin/manager.
    """
    role = current_user.get("role")
    if role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return log_query.get_log_stats(days=days)
@router.get("/performance/algorithm/{algorithm_id}")
async def get_algorithm_performance(
    algorithm_id: str,
    days: int = 7,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Return call-performance figures for one algorithm over a recent window.

    Args:
        algorithm_id: Algorithm whose ``AlgorithmCall`` rows are aggregated.
        days: Window size; rows with ``created_at`` at or after
            ``utcnow() - days`` are included.

    Returns:
        A dict with total/success/failed call counts, the success rate in
        percent (2 decimals), the average response time (3 decimals, or
        ``None`` when no row has a response time), the per-status call
        counts, and a UTC timestamp. ``failed_calls`` is every call whose
        status is not ``'success'``.
    """
    # TODO: non-admin/manager callers should be limited to algorithms they
    # are allowed to access. No such check exists yet, so any authenticated
    # user can query any algorithm.

    from sqlalchemy import func
    from app.models.models import AlgorithmCall

    window_start = datetime.utcnow() - timedelta(days=days)
    # Filters shared by every aggregate below.
    in_window = (
        AlgorithmCall.algorithm_id == algorithm_id,
        AlgorithmCall.created_at >= window_start,
    )

    # Total number of calls in the window.
    total_calls = db.query(func.count(AlgorithmCall.id)).filter(*in_window).scalar() or 0

    # Calls that finished successfully.
    success_calls = db.query(func.count(AlgorithmCall.id)).filter(
        *in_window,
        AlgorithmCall.status == 'success',
    ).scalar() or 0

    # Mean response time over rows that recorded one.
    avg_response_time = db.query(func.avg(AlgorithmCall.response_time)).filter(
        *in_window,
        AlgorithmCall.response_time.isnot(None),
    ).scalar()

    # Call counts grouped by status.
    status_rows = db.query(
        AlgorithmCall.status,
        func.count(AlgorithmCall.id),
    ).filter(*in_window).group_by(AlgorithmCall.status).all()
    # Loop names chosen so they do not shadow the module-level `status` import.
    status_dict = {call_status: call_count for call_status, call_count in status_rows}

    success_rate = (success_calls / total_calls * 100) if total_calls > 0 else 0

    # Test against None explicitly: a truthiness test would report a real
    # average of 0 as "no data". float() normalises a non-float aggregate
    # result (e.g. Decimal) before rounding.
    if avg_response_time is None:
        average_response_time = None
    else:
        average_response_time = round(float(avg_response_time), 3)

    return {
        "algorithm_id": algorithm_id,
        "period_days": days,
        "total_calls": total_calls,
        "success_calls": success_calls,
        "failed_calls": total_calls - success_calls,
        "success_rate": round(success_rate, 2),
        "average_response_time": average_response_time,
        "status_distribution": status_dict,
        "timestamp": datetime.utcnow().isoformat(),
    }