Files
algorithm/backend/app/routes/permissions.py
2026-02-08 14:42:58 +08:00

264 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""权限管理路由,提供算法访问权限和用户权限管理功能"""
from fastapi import APIRouter, HTTPException, status, Depends
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
from app.services.permission import (
permission_manager, rbac_manager,
AccessLevel, PermissionType
)
from app.models.database import get_db
from app.routes.user import get_current_active_user
router = APIRouter(prefix="/permissions", tags=["permissions"])
class GrantPermissionRequest(BaseModel):
"""授予权限请求"""
user_id: str
algorithm_id: str
access_level: str # 使用字符串稍后转换为AccessLevel
class CheckPermissionRequest(BaseModel):
"""检查权限请求"""
algorithm_id: str
permission_type: str # 使用字符串稍后转换为PermissionType
class RevokePermissionRequest(BaseModel):
"""撤销权限请求"""
user_id: str
algorithm_id: str
@router.post("/grant")
async def grant_permission(
request: GrantPermissionRequest,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""授予用户对算法的权限"""
# 只有管理员和经理可以授予权限
if current_user.get("role") not in ["admin", "manager"]:
raise HTTPException(status_code=403, detail="Insufficient permissions to grant permissions")
# 验证访问级别
try:
access_level = AccessLevel(request.access_level)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid access level. Valid levels: {[level.value for level in AccessLevel]}")
success = permission_manager.grant_permission(
db, current_user.get("id"), request.user_id,
request.algorithm_id, access_level
)
if success:
return {
"message": "Permission granted successfully",
"user_id": request.user_id,
"algorithm_id": request.algorithm_id,
"access_level": request.access_level
}
else:
raise HTTPException(status_code=500, detail="Failed to grant permission")
@router.post("/revoke")
async def revoke_permission(
request: RevokePermissionRequest,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""撤销用户对算法的权限"""
# 只有管理员和经理可以撤销权限
if current_user.get("role") not in ["admin", "manager"]:
raise HTTPException(status_code=403, detail="Insufficient permissions to revoke permissions")
success = permission_manager.revoke_permission(
db, current_user.get("id"), request.user_id, request.algorithm_id
)
if success:
return {
"message": "Permission revoked successfully",
"user_id": request.user_id,
"algorithm_id": request.algorithm_id
}
else:
raise HTTPException(status_code=500, detail="Failed to revoke permission")
@router.post("/check")
async def check_permission(
request: CheckPermissionRequest,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""检查用户对算法的权限"""
# 验证权限类型
try:
permission_type = PermissionType(request.permission_type)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid permission type. Valid types: {[ptype.value for ptype in PermissionType]}")
has_permission = permission_manager.check_algorithm_access(
db, current_user.get("id"), request.algorithm_id, permission_type
)
return {
"has_permission": has_permission,
"user_id": current_user.get("id"),
"algorithm_id": request.algorithm_id,
"permission_type": request.permission_type
}
@router.get("/user/{user_id}")
async def get_user_permissions(
user_id: str,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""获取用户的权限列表"""
# 用户只能查看自己的权限,管理员可以查看任何用户权限
if current_user.get("role") not in ["admin", "manager"]:
if user_id != current_user.get("id"):
raise HTTPException(status_code=403, detail="Cannot view permissions for other users")
permissions = permission_manager.get_user_permissions(db, user_id)
return {
"user_id": user_id,
"permissions": permissions,
"count": len(permissions)
}
@router.get("/algorithm/{algorithm_id}")
async def get_algorithm_permissions(
algorithm_id: str,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""获取算法的权限分配情况"""
# 检查用户是否有权限查看算法权限
can_read = permission_manager.check_algorithm_access(
db, current_user.get("id"), algorithm_id, PermissionType.READ
)
if not can_read and current_user.get("role") not in ["admin", "manager"]:
raise HTTPException(status_code=403, detail="Insufficient permissions to view algorithm permissions")
permissions = permission_manager.get_algorithm_permissions(db, algorithm_id)
return {
"algorithm_id": algorithm_id,
"permissions": permissions,
"count": len(permissions)
}
@router.get("/role/{role_name}")
async def get_role_permissions(
role_name: str,
current_user: dict = Depends(get_current_active_user)
):
"""获取角色的权限列表"""
# 所有用户都可以查看角色权限
permissions = rbac_manager.get_role_permissions(role_name)
if not permissions:
raise HTTPException(status_code=404, detail="Role not found")
return {
"role": role_name,
"permissions": [perm.value for perm in permissions]
}
@router.get("/validate-operation")
async def validate_user_algorithm_operation(
algorithm_id: str,
operation: str,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""验证用户对算法的操作权限"""
is_valid = permission_manager.validate_user_algorithm_operation(
db, current_user.get("id"), algorithm_id, operation
)
return {
"user_id": current_user.get("id"),
"algorithm_id": algorithm_id,
"operation": operation,
"has_permission": is_valid
}
@router.get("/my-permissions")
async def get_my_permissions(
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""获取当前用户的权限"""
permissions = permission_manager.get_user_permissions(db, current_user.get("id"))
return {
"user_id": current_user.get("id"),
"username": current_user.get("username"),
"role": current_user.get("role"),
"permissions": permissions,
"count": len(permissions)
}
@router.get("/user-role-permissions/{user_id}")
async def get_user_role_based_permissions(
user_id: str,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""获取用户的基于角色的权限(而非具体算法权限)"""
# 用户只能查看自己的权限,管理员可以查看任何用户权限
if current_user.get("role") not in ["admin", "manager"]:
if user_id != current_user.get("id"):
raise HTTPException(status_code=403, detail="Cannot view permissions for other users")
# 获取用户角色
from app.models.models import User
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
role_permissions = rbac_manager.get_role_permissions(user.role)
return {
"user_id": user_id,
"role": user.role,
"role_permissions": [perm.value for perm in role_permissions]
}
@router.get("/check-api-key-access")
async def check_api_key_access(
api_key_value: str,
algorithm_id: str,
current_user: dict = Depends(get_current_active_user),
db = Depends(get_db)
):
"""检查API密钥对算法的访问权限"""
# 只有管理员可以检查任意API密钥的权限
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Only admins can check API key access")
has_access = permission_manager.check_api_key_access(db, api_key_value, algorithm_id)
return {
"api_key_valid": True, # 如果到达这里说明API密钥存在且活跃
"has_algorithm_access": has_access,
"algorithm_id": algorithm_id
}