Files
algorithm/backend/app/services/permission.py
2026-02-08 20:06:35 +08:00

270 lines
10 KiB
Python

"""权限管理服务,负责算法访问权限和用户权限管理"""
from typing import Dict, List, Optional, Any
from enum import Enum
from datetime import datetime
import logging
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from app.models.models import User, Algorithm
logger = logging.getLogger(__name__)
class PermissionType(Enum):
"""权限类型枚举"""
READ = "read"
WRITE = "write"
EXECUTE = "execute"
ADMIN = "admin"
class AccessLevel(Enum):
"""访问级别枚举"""
NONE = "none"
READ = "read"
READ_WRITE = "read_write"
EXECUTE = "execute"
FULL = "full"
class PermissionManager:
"""权限管理器,处理用户对算法的访问权限"""
def __init__(self):
pass
def check_algorithm_access(self, db: Session, user_id: str, algorithm_id: str,
permission_type: PermissionType) -> bool:
"""检查用户对算法的访问权限"""
try:
# 获取用户和算法信息
user = db.query(User).filter(User.id == user_id).first()
algorithm = db.query(Algorithm).filter(Algorithm.id == algorithm_id).first()
if not user or not algorithm:
return False
# 管理员拥有所有权限
if user.role == "admin":
return True
# 算法所有者拥有完全权限
# 注意:在这个模型中,我们没有直接的算法所有者字段
# 实际应用中可能需要添加算法创建者的关联
# 根据算法状态决定权限
if algorithm.status == "inactive":
# 非活跃算法只有管理员可以访问
return False
# 根据权限类型检查权限
if permission_type in [PermissionType.READ, PermissionType.EXECUTE]:
# 读取和执行权限:活跃算法对所有认证用户开放
return algorithm.status == "active"
elif permission_type == PermissionType.WRITE:
# 写入权限:只有特定用户或管理员可以
return user.role in ["admin", "manager"]
elif permission_type == PermissionType.ADMIN:
# 管理权限:只有管理员可以
return user.role == "admin"
return False
except Exception as e:
logger.error(f"Error checking algorithm access: {str(e)}")
return False
def grant_permission(self, db: Session, admin_user_id: str, user_id: str,
algorithm_id: str, access_level: AccessLevel) -> bool:
"""授予用户对算法的权限"""
try:
# 验证管理员权限
admin_user = db.query(User).filter(
User.id == admin_user_id,
User.role.in_(["admin", "manager"])
).first()
if not admin_user:
logger.warning(f"User {admin_user_id} is not authorized to grant permissions")
return False
# 验证目标用户和算法存在
target_user = db.query(User).filter(User.id == user_id).first()
algorithm = db.query(Algorithm).filter(Algorithm.id == algorithm_id).first()
if not target_user or not algorithm:
logger.warning("Target user or algorithm not found")
return False
# 在实际实现中,这里应该创建权限记录
# 由于当前数据模型中没有专门的权限表,我们记录日志
logger.info(f"Permission granted: user {user_id} -> algorithm {algorithm_id}, "
f"level: {access_level.value}, by admin: {admin_user_id}")
# 在实际实现中,应该创建权限记录到数据库
# db.add(PermissionRecord(user_id=user_id, algorithm_id=algorithm_id, access_level=access_level))
# db.commit()
return True
except Exception as e:
logger.error(f"Error granting permission: {str(e)}")
return False
def revoke_permission(self, db: Session, admin_user_id: str, user_id: str,
algorithm_id: str) -> bool:
"""撤销用户对算法的权限"""
try:
# 验证管理员权限
admin_user = db.query(User).filter(
User.id == admin_user_id,
User.role.in_(["admin", "manager"])
).first()
if not admin_user:
logger.warning(f"User {admin_user_id} is not authorized to revoke permissions")
return False
# 在实际实现中,这里应该删除权限记录
logger.info(f"Permission revoked: user {user_id} -> algorithm {algorithm_id}, "
f"by admin: {admin_user_id}")
return True
except Exception as e:
logger.error(f"Error revoking permission: {str(e)}")
return False
def get_user_permissions(self, db: Session, user_id: str) -> List[Dict[str, Any]]:
"""获取用户的所有权限"""
try:
user = db.query(User).filter(User.id == user_id).first()
if not user:
return []
permissions = []
if user.role == "admin":
# 管理员拥有所有算法的完全权限
algorithms = db.query(Algorithm).all()
for algorithm in algorithms:
permissions.append({
"algorithm_id": algorithm.id,
"algorithm_name": algorithm.name,
"access_level": AccessLevel.FULL.value,
"granted_at": algorithm.created_at,
"granted_by": "system",
"permission_type": "administrative"
})
else:
# 普通用户只能访问活跃算法
algorithms = db.query(Algorithm).filter(Algorithm.status == "active").all()
for algorithm in algorithms:
permissions.append({
"algorithm_id": algorithm.id,
"algorithm_name": algorithm.name,
"access_level": AccessLevel.READ.value, # 默认只读权限
"granted_at": algorithm.created_at,
"granted_by": "system",
"permission_type": "public"
})
return permissions
except Exception as e:
logger.error(f"Error getting user permissions: {str(e)}")
return []
def get_algorithm_permissions(self, db: Session, algorithm_id: str) -> List[Dict[str, Any]]:
"""获取算法的所有权限分配"""
try:
algorithm = db.query(Algorithm).filter(Algorithm.id == algorithm_id).first()
if not algorithm:
return []
permissions = []
# 如果是活跃算法,所有用户都有读取权限
if algorithm.status == "active":
# 添加公共访问权限信息
permissions.append({
"user_id": "*",
"user_name": "All Users",
"access_level": AccessLevel.READ.value,
"granted_at": algorithm.created_at,
"granted_by": "system",
"permission_type": "public"
})
# 获取特定用户的权限(如果有专门的权限表的话)
# 这里我们模拟返回一些示例数据
# 在实际实现中,应该查询权限表
return permissions
except Exception as e:
logger.error(f"Error getting algorithm permissions: {str(e)}")
return []
def validate_user_algorithm_operation(self, db: Session, user_id: str, algorithm_id: str,
operation: str) -> bool:
"""验证用户对算法的操作权限"""
try:
perm_type = {
'read': PermissionType.READ,
'execute': PermissionType.EXECUTE,
'write': PermissionType.WRITE,
'admin': PermissionType.ADMIN
}.get(operation.lower(), PermissionType.READ)
return self.check_algorithm_access(db, user_id, algorithm_id, perm_type)
except Exception as e:
logger.error(f"Error validating user operation: {str(e)}")
return False
class RBACManager:
"""基于角色的访问控制管理器"""
def __init__(self):
# 定义角色权限映射
self.role_permissions = {
"admin": [
PermissionType.READ, PermissionType.WRITE,
PermissionType.EXECUTE, PermissionType.ADMIN
],
"manager": [
PermissionType.READ, PermissionType.WRITE, PermissionType.EXECUTE
],
"user": [PermissionType.READ, PermissionType.EXECUTE],
"guest": [PermissionType.READ]
}
def get_role_permissions(self, role: str) -> List[PermissionType]:
"""获取角色的权限列表"""
return self.role_permissions.get(role, [])
def user_has_permission(self, db: Session, user_id: str, permission_type: PermissionType) -> bool:
"""检查用户是否具有某种权限"""
try:
user = db.query(User).filter(User.id == user_id).first()
if not user:
return False
user_perms = self.get_role_permissions(user.role)
return permission_type in user_perms
except Exception as e:
logger.error(f"Error checking user permission: {str(e)}")
return False
# 全局权限管理器实例
permission_manager = PermissionManager()
rbac_manager = RBACManager()