246 lines
7.7 KiB
Python
246 lines
7.7 KiB
Python
"""权限管理路由,提供算法访问权限和用户权限管理功能"""
|
||
|
||
from fastapi import APIRouter, HTTPException, status, Depends
|
||
from typing import List, Dict, Any, Optional
|
||
from pydantic import BaseModel
|
||
|
||
from app.services.permission import (
|
||
permission_manager, rbac_manager,
|
||
AccessLevel, PermissionType
|
||
)
|
||
from app.models.database import get_db
|
||
from app.routes.user import get_current_active_user
|
||
|
||
# Router that groups the permission endpoints defined in this module; it is
# created with the "/permissions" prefix and the "permissions" tag.
router = APIRouter(prefix="/permissions", tags=["permissions"])
|
||
|
||
|
||
class GrantPermissionRequest(BaseModel):
    """Request body for granting a permission."""

    # Identifier of the user named in the grant request.
    user_id: str

    # Identifier of the algorithm named in the grant request.
    algorithm_id: str

    # Carried as a plain string; converted to an AccessLevel later on.
    access_level: str
|
||
|
||
|
||
class CheckPermissionRequest(BaseModel):
    """Request body for checking a permission."""

    # Identifier of the algorithm the check is about.
    algorithm_id: str

    # Carried as a plain string; converted to a PermissionType later on.
    permission_type: str
|
||
|
||
|
||
class RevokePermissionRequest(BaseModel):
    """Request body for revoking a permission."""

    # Identifier of the user named in the revoke request.
    user_id: str

    # Identifier of the algorithm named in the revoke request.
    algorithm_id: str
|
||
|
||
|
||
@router.post("/grant")
async def grant_permission(
    request: GrantPermissionRequest,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Grant a user a permission on an algorithm.

    Args:
        request: Carries the target ``user_id``, the ``algorithm_id`` and the
            requested ``access_level`` as a string.
        current_user: Mapping describing the caller; the ``"role"`` and
            ``"id"`` keys are read here.
        db: Value supplied by the ``get_db`` dependency and passed through to
            the permission manager.

    Returns:
        A dict with a success message plus the ``user_id``, ``algorithm_id``
        and ``access_level`` taken from the request.

    Raises:
        HTTPException: 403 when the caller's role is neither ``"admin"`` nor
            ``"manager"``; 400 when ``access_level`` is not a valid
            ``AccessLevel`` value; 500 when the permission manager reports
            failure.
    """
    # Only administrators and managers may grant permissions.
    if current_user.get("role") not in ["admin", "manager"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions to grant permissions")

    # Validate the access level before touching the permission manager.
    try:
        access_level = AccessLevel(request.access_level)
    except ValueError as exc:
        # Chain explicitly so the traceback shows this is a deliberate
        # translation of the enum lookup failure, not a second error.
        raise HTTPException(
            status_code=400,
            detail=f"Invalid access level. Valid levels: {[level.value for level in AccessLevel]}",
        ) from exc

    granted = permission_manager.grant_permission(
        db,
        current_user.get("id"),
        request.user_id,
        request.algorithm_id,
        access_level,
    )
    if not granted:
        raise HTTPException(status_code=500, detail="Failed to grant permission")

    return {
        "message": "Permission granted successfully",
        "user_id": request.user_id,
        "algorithm_id": request.algorithm_id,
        "access_level": request.access_level,
    }
|
||
|
||
|
||
@router.post("/revoke")
async def revoke_permission(
    request: RevokePermissionRequest,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Revoke a user's permission on an algorithm.

    The caller's role must be "admin" or "manager", otherwise a 403 is
    raised. A 500 is raised when the permission manager reports failure.
    On success a confirmation dict echoing the request's ``user_id`` and
    ``algorithm_id`` is returned.
    """
    # Only administrators and managers may revoke permissions.
    caller_role = current_user.get("role")
    if caller_role not in ("admin", "manager"):
        raise HTTPException(status_code=403, detail="Insufficient permissions to revoke permissions")

    target_user = request.user_id
    target_algorithm = request.algorithm_id

    revoked = permission_manager.revoke_permission(
        db, current_user.get("id"), target_user, target_algorithm
    )
    if not revoked:
        raise HTTPException(status_code=500, detail="Failed to revoke permission")

    return {
        "message": "Permission revoked successfully",
        "user_id": target_user,
        "algorithm_id": target_algorithm,
    }
|
||
|
||
|
||
@router.post("/check")
async def check_permission(
    request: CheckPermissionRequest,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Check whether the calling user holds a permission on an algorithm.

    Args:
        request: Carries the ``algorithm_id`` and the ``permission_type`` as
            a string.
        current_user: Mapping describing the caller; the ``"id"`` key is
            read here.
        db: Value supplied by the ``get_db`` dependency and passed through to
            the permission manager.

    Returns:
        A dict with ``has_permission`` (the permission manager's result), the
        caller's ``user_id``, and the ``algorithm_id`` / ``permission_type``
        taken from the request.

    Raises:
        HTTPException: 400 when ``permission_type`` is not a valid
            ``PermissionType`` value.
    """
    # Validate the permission type before querying the permission manager.
    try:
        permission_type = PermissionType(request.permission_type)
    except ValueError as exc:
        # Chain explicitly so the traceback shows this is a deliberate
        # translation of the enum lookup failure, not a second error.
        raise HTTPException(
            status_code=400,
            detail=f"Invalid permission type. Valid types: {[ptype.value for ptype in PermissionType]}",
        ) from exc

    has_permission = permission_manager.check_algorithm_access(
        db, current_user.get("id"), request.algorithm_id, permission_type
    )

    return {
        "has_permission": has_permission,
        "user_id": current_user.get("id"),
        "algorithm_id": request.algorithm_id,
        "permission_type": request.permission_type,
    }
|
||
|
||
|
||
@router.get("/user/{user_id}")
async def get_user_permissions(
    user_id: str,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Return the permission list of the given user.

    Callers whose role is "admin" or "manager" may query any user; everyone
    else may only query their own id and receives a 403 otherwise. The
    response holds the ``user_id``, the permissions returned by the
    permission manager, and their count.
    """
    is_privileged = current_user.get("role") in ("admin", "manager")
    # Non-privileged callers are restricted to their own record.
    if not is_privileged and user_id != current_user.get("id"):
        raise HTTPException(status_code=403, detail="Cannot view permissions for other users")

    user_perms = permission_manager.get_user_permissions(db, user_id)
    total = len(user_perms)

    return {
        "user_id": user_id,
        "permissions": user_perms,
        "count": total,
    }
|
||
|
||
|
||
@router.get("/algorithm/{algorithm_id}")
async def get_algorithm_permissions(
    algorithm_id: str,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Return the permissions assigned on the given algorithm.

    Access is allowed when the permission manager reports READ access for
    the caller, or when the caller's role is "admin" or "manager"; otherwise
    a 403 is raised. The response holds the ``algorithm_id``, the
    permissions returned by the permission manager, and their count.
    """
    # The READ check is always performed first, before the role is consulted.
    caller_can_read = permission_manager.check_algorithm_access(
        db, current_user.get("id"), algorithm_id, PermissionType.READ
    )

    if not (caller_can_read or current_user.get("role") in ("admin", "manager")):
        raise HTTPException(status_code=403, detail="Insufficient permissions to view algorithm permissions")

    algorithm_perms = permission_manager.get_algorithm_permissions(db, algorithm_id)

    response = {
        "algorithm_id": algorithm_id,
        "permissions": algorithm_perms,
        "count": len(algorithm_perms),
    }
    return response
|
||
|
||
|
||
@router.get("/role/{role_name}")
async def get_role_permissions(
    role_name: str,
    current_user: dict = Depends(get_current_active_user),
):
    """Return the permission values attached to a role.

    No role check is applied here: any caller that passes the
    ``get_current_active_user`` dependency may look up a role. A 404 is
    raised when the RBAC manager returns a falsy result for ``role_name``.
    """
    role_perms = rbac_manager.get_role_permissions(role_name)

    if role_perms:
        values = [entry.value for entry in role_perms]
        return {
            "role": role_name,
            "permissions": values,
        }

    raise HTTPException(status_code=404, detail="Role not found")
|
||
|
||
|
||
@router.get("/validate-operation")
async def validate_user_algorithm_operation(
    algorithm_id: str,
    operation: str,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Report whether the caller may perform an operation on an algorithm.

    The decision is delegated to
    ``permission_manager.validate_user_algorithm_operation``; its result is
    returned under ``has_permission`` together with the caller's id, the
    ``algorithm_id`` and the ``operation`` that were asked about.
    """
    caller_id = current_user.get("id")

    allowed = permission_manager.validate_user_algorithm_operation(
        db, caller_id, algorithm_id, operation
    )

    return {
        "user_id": caller_id,
        "algorithm_id": algorithm_id,
        "operation": operation,
        "has_permission": allowed,
    }
|
||
|
||
|
||
@router.get("/my-permissions")
async def get_my_permissions(
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Return the calling user's own permissions.

    The response holds the caller's id, username and role (as found in
    ``current_user``), the permissions returned by the permission manager
    for that id, and their count.
    """
    caller_id = current_user.get("id")
    own_perms = permission_manager.get_user_permissions(db, caller_id)

    return {
        "user_id": caller_id,
        "username": current_user.get("username"),
        "role": current_user.get("role"),
        "permissions": own_perms,
        "count": len(own_perms),
    }
|
||
|
||
|
||
@router.get("/user-role-permissions/{user_id}")
async def get_user_role_based_permissions(
    user_id: str,
    current_user: dict = Depends(get_current_active_user),
    db=Depends(get_db),
):
    """Return a user's role-based permissions (not per-algorithm grants).

    Args:
        user_id: Id of the user whose role permissions are requested.
        current_user: Mapping describing the caller; the ``"role"`` and
            ``"id"`` keys are read here.
        db: Value supplied by the ``get_db`` dependency; queried for the
            ``User`` row.

    Returns:
        A dict with the ``user_id``, the user's ``role`` and the list of
        permission values attached to that role (empty when the RBAC manager
        yields nothing for the role).

    Raises:
        HTTPException: 403 when a caller whose role is neither ``"admin"``
            nor ``"manager"`` asks about another user; 404 when no user with
            ``user_id`` exists.
    """
    # Regular users may only inspect themselves; admins/managers may inspect
    # anyone.
    if current_user.get("role") not in ["admin", "manager"]:
        if user_id != current_user.get("id"):
            raise HTTPException(status_code=403, detail="Cannot view permissions for other users")

    # Look up the user to learn their role. The import is kept local, as in
    # the original code (presumably to avoid an import cycle — TODO confirm).
    from app.models.models import User

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # The RBAC manager may return a falsy value for a role it does not know
    # (the role lookup endpoint in this module checks for that). Fall back to
    # an empty list so that a None result cannot raise TypeError below.
    role_permissions = rbac_manager.get_role_permissions(user.role) or []

    return {
        "user_id": user_id,
        "role": user.role,
        "role_permissions": [perm.value for perm in role_permissions],
    }
|
||
|
||
|