"""权限管理服务,负责算法访问权限和用户权限管理""" from typing import Dict, List, Optional, Any from enum import Enum from datetime import datetime import logging from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from app.models.models import User, Algorithm logger = logging.getLogger(__name__) class PermissionType(Enum): """权限类型枚举""" READ = "read" WRITE = "write" EXECUTE = "execute" ADMIN = "admin" class AccessLevel(Enum): """访问级别枚举""" NONE = "none" READ = "read" READ_WRITE = "read_write" EXECUTE = "execute" FULL = "full" class PermissionManager: """权限管理器,处理用户对算法的访问权限""" def __init__(self): pass def check_algorithm_access(self, db: Session, user_id: str, algorithm_id: str, permission_type: PermissionType) -> bool: """检查用户对算法的访问权限""" try: # 获取用户和算法信息 user = db.query(User).filter(User.id == user_id).first() algorithm = db.query(Algorithm).filter(Algorithm.id == algorithm_id).first() if not user or not algorithm: return False # 管理员拥有所有权限 if user.role == "admin": return True # 算法所有者拥有完全权限 # 注意:在这个模型中,我们没有直接的算法所有者字段 # 实际应用中可能需要添加算法创建者的关联 # 根据算法状态决定权限 if algorithm.status == "inactive": # 非活跃算法只有管理员可以访问 return False # 根据权限类型检查权限 if permission_type in [PermissionType.READ, PermissionType.EXECUTE]: # 读取和执行权限:活跃算法对所有认证用户开放 return algorithm.status == "active" elif permission_type == PermissionType.WRITE: # 写入权限:只有特定用户或管理员可以 return user.role in ["admin", "manager"] elif permission_type == PermissionType.ADMIN: # 管理权限:只有管理员可以 return user.role == "admin" return False except Exception as e: logger.error(f"Error checking algorithm access: {str(e)}") return False def grant_permission(self, db: Session, admin_user_id: str, user_id: str, algorithm_id: str, access_level: AccessLevel) -> bool: """授予用户对算法的权限""" try: # 验证管理员权限 admin_user = db.query(User).filter( User.id == admin_user_id, User.role.in_(["admin", "manager"]) ).first() if not admin_user: logger.warning(f"User {admin_user_id} is not authorized to grant permissions") return False # 验证目标用户和算法存在 target_user = db.query(User).filter(User.id == user_id).first() algorithm = db.query(Algorithm).filter(Algorithm.id == algorithm_id).first() if not target_user or not algorithm: logger.warning("Target user or algorithm not found") return False # 在实际实现中,这里应该创建权限记录 # 由于当前数据模型中没有专门的权限表,我们记录日志 logger.info(f"Permission granted: user {user_id} -> algorithm {algorithm_id}, " f"level: {access_level.value}, by admin: {admin_user_id}") # 在实际实现中,应该创建权限记录到数据库 # db.add(PermissionRecord(user_id=user_id, algorithm_id=algorithm_id, access_level=access_level)) # db.commit() return True except Exception as e: logger.error(f"Error granting permission: {str(e)}") return False def revoke_permission(self, db: Session, admin_user_id: str, user_id: str, algorithm_id: str) -> bool: """撤销用户对算法的权限""" try: # 验证管理员权限 admin_user = db.query(User).filter( User.id == admin_user_id, User.role.in_(["admin", "manager"]) ).first() if not admin_user: logger.warning(f"User {admin_user_id} is not authorized to revoke permissions") return False # 在实际实现中,这里应该删除权限记录 logger.info(f"Permission revoked: user {user_id} -> algorithm {algorithm_id}, " f"by admin: {admin_user_id}") return True except Exception as e: logger.error(f"Error revoking permission: {str(e)}") return False def get_user_permissions(self, db: Session, user_id: str) -> List[Dict[str, Any]]: """获取用户的所有权限""" try: user = db.query(User).filter(User.id == user_id).first() if not user: return [] permissions = [] if user.role == "admin": # 管理员拥有所有算法的完全权限 algorithms = db.query(Algorithm).all() for algorithm in algorithms: permissions.append({ "algorithm_id": algorithm.id, "algorithm_name": algorithm.name, "access_level": AccessLevel.FULL.value, "granted_at": algorithm.created_at, "granted_by": "system", "permission_type": "administrative" }) else: # 普通用户只能访问活跃算法 algorithms = db.query(Algorithm).filter(Algorithm.status == "active").all() for algorithm in algorithms: permissions.append({ "algorithm_id": algorithm.id, "algorithm_name": algorithm.name, "access_level": AccessLevel.READ.value, # 默认只读权限 "granted_at": algorithm.created_at, "granted_by": "system", "permission_type": "public" }) return permissions except Exception as e: logger.error(f"Error getting user permissions: {str(e)}") return [] def get_algorithm_permissions(self, db: Session, algorithm_id: str) -> List[Dict[str, Any]]: """获取算法的所有权限分配""" try: algorithm = db.query(Algorithm).filter(Algorithm.id == algorithm_id).first() if not algorithm: return [] permissions = [] # 如果是活跃算法,所有用户都有读取权限 if algorithm.status == "active": # 添加公共访问权限信息 permissions.append({ "user_id": "*", "user_name": "All Users", "access_level": AccessLevel.READ.value, "granted_at": algorithm.created_at, "granted_by": "system", "permission_type": "public" }) # 获取特定用户的权限(如果有专门的权限表的话) # 这里我们模拟返回一些示例数据 # 在实际实现中,应该查询权限表 return permissions except Exception as e: logger.error(f"Error getting algorithm permissions: {str(e)}") return [] def validate_user_algorithm_operation(self, db: Session, user_id: str, algorithm_id: str, operation: str) -> bool: """验证用户对算法的操作权限""" try: perm_type = { 'read': PermissionType.READ, 'execute': PermissionType.EXECUTE, 'write': PermissionType.WRITE, 'admin': PermissionType.ADMIN }.get(operation.lower(), PermissionType.READ) return self.check_algorithm_access(db, user_id, algorithm_id, perm_type) except Exception as e: logger.error(f"Error validating user operation: {str(e)}") return False class RBACManager: """基于角色的访问控制管理器""" def __init__(self): # 定义角色权限映射 self.role_permissions = { "admin": [ PermissionType.READ, PermissionType.WRITE, PermissionType.EXECUTE, PermissionType.ADMIN ], "manager": [ PermissionType.READ, PermissionType.WRITE, PermissionType.EXECUTE ], "user": [PermissionType.READ, PermissionType.EXECUTE], "guest": [PermissionType.READ] } def get_role_permissions(self, role: str) -> List[PermissionType]: """获取角色的权限列表""" return self.role_permissions.get(role, []) def user_has_permission(self, db: Session, user_id: str, permission_type: PermissionType) -> bool: """检查用户是否具有某种权限""" try: user = db.query(User).filter(User.id == user_id).first() if not user: return False user_perms = self.get_role_permissions(user.role) return permission_type in user_perms except Exception as e: logger.error(f"Error checking user permission: {str(e)}") return False # 全局权限管理器实例 permission_manager = PermissionManager() rbac_manager = RBACManager()