Files
algorithm/backend/app/routes/user.py
2026-02-09 23:59:25 +08:00

308 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from jose import JWTError, jwt
from app.config.settings import settings
from app.models.database import get_db
from app.models.models import User, Role
from app.schemas.user import UserCreate, UserUpdate, UserResponse, UserListResponse, Token, LoginRequest, RoleCreate, RoleResponse
from app.services.user import UserService
# 创建路由器
router = APIRouter(prefix="/users", tags=["users"])
# OAuth2密码Bearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login")
async def get_current_active_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
"""获取当前活跃用户"""
try:
# 检查令牌是否在黑名单中
if UserService.is_token_blacklisted(token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# 解码令牌
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# 使用UserService获取用户信息避免直接使用User模型
print(f"尝试通过用户名获取用户: {username}")
user = UserService.get_user_by_username(db, username)
print(f"获取用户结果: {user.id if user else 'None'}")
if not user:
# 尝试直接查询数据库
from app.models.models import User as UserModel
direct_user = db.query(UserModel).filter(UserModel.username == username).first()
print(f"直接查询数据库结果: {direct_user.id if direct_user else 'None'}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# 检查用户是否活跃
if user.status != "active":
raise HTTPException(status_code=400, detail="Inactive user")
# 构建角色响应
role_response = None
role_name = None
# 尝试获取角色信息
try:
# 先尝试使用预加载的角色
if hasattr(user, 'role') and user.role:
role = user.role
role_response = RoleResponse(
id=role.id,
name=role.name,
description=role.description,
created_at=role.created_at,
updated_at=role.updated_at
)
role_name = role.name
else:
# 如果没有预加载角色尝试通过role_id获取
role = UserService.get_role_by_id(db, user.role_id)
if role:
role_response = RoleResponse(
id=role.id,
name=role.name,
description=role.description,
created_at=role.created_at,
updated_at=role.updated_at
)
role_name = role.name
except Exception as e:
# 角色获取失败不影响用户认证
print(f"获取角色信息失败: {e}")
# 构建用户响应
user_response = UserResponse(
id=user.id,
username=user.username,
email=user.email,
role_id=user.role_id,
status=user.status,
created_at=user.created_at,
updated_at=user.updated_at,
role=role_response,
role_name=role_name
)
return user_response
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
except Exception as e:
print(f"获取当前用户失败: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
from app.schemas.user import LoginRequest
@router.post("/login", response_model=Token)
async def login(login_request: LoginRequest, db: Session = Depends(get_db)):
"""用户登录"""
user = UserService.authenticate_user(db, login_request.username, login_request.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 创建访问令牌
access_token = UserService.create_access_token(
data={"sub": user.username, "user_id": user.id}
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", response_model=UserResponse)
async def register(user: UserCreate, db: Session = Depends(get_db)):
"""用户注册"""
# 检查用户名是否已存在
if UserService.get_user_by_username(db, user.username):
raise HTTPException(status_code=400, detail="Username already registered")
# 检查邮箱是否已存在
if UserService.get_user_by_email(db, user.email):
raise HTTPException(status_code=400, detail="Email already registered")
# 检查角色是否存在
if not UserService.get_role_by_id(db, user.role_id):
raise HTTPException(status_code=400, detail="Role not found")
# 创建用户
db_user = UserService.create_user(db, user)
return db_user
@router.get("/me", response_model=UserResponse)
async def read_users_me(current_user: UserResponse = Depends(get_current_active_user)):
"""获取当前用户信息"""
return current_user
@router.get("/", response_model=UserListResponse)
async def get_users(
skip: int = 0,
limit: int = 100,
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""获取用户列表"""
# 只有管理员可以查看用户列表
if current_user.role_name != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
users = UserService.get_users(db, skip=skip, limit=limit)
return {"users": users, "total": len(users)}
# 角色管理API
@router.post("/roles", response_model=RoleResponse)
async def create_role(
role: RoleCreate,
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""创建角色"""
# 只有管理员可以创建角色
if current_user.role_name != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
# 检查角色名称是否已存在
if UserService.get_role_by_name(db, role.name):
raise HTTPException(status_code=400, detail="Role name already exists")
# 创建角色
db_role = UserService.create_role(db, role)
return db_role
@router.get("/roles", response_model=List[RoleResponse])
async def get_roles(
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""获取角色列表"""
# 只有管理员可以查看所有角色
if current_user.role_name != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
roles = UserService.get_roles(db)
return roles
@router.get("/roles/{role_id}", response_model=RoleResponse)
async def get_role(
role_id: str,
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""获取角色详情"""
# 只有管理员可以查看角色详情
if current_user.role_name != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
role = UserService.get_role_by_id(db, role_id)
if not role:
raise HTTPException(status_code=404, detail="Role not found")
return role
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: str,
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""获取用户信息"""
# 只有管理员或用户本人可以查看用户信息
if current_user.role_name != "admin" and current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
user = UserService.get_user_by_id(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: str,
user_update: UserUpdate,
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""更新用户信息"""
# 只有管理员或用户本人可以更新用户信息
if current_user.role_name != "admin" and current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
# 非管理员只能更新自己的信息,不能更新角色
if current_user.role_name != "admin" and "role_id" in user_update.dict():
raise HTTPException(status_code=403, detail="Cannot update role")
# 检查角色是否存在
if "role_id" in user_update.dict():
if not UserService.get_role_by_id(db, user_update.role_id):
raise HTTPException(status_code=400, detail="Role not found")
user = UserService.update_user(db, user_id, user_update)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.delete("/{user_id}")
async def delete_user(
user_id: str,
current_user: UserResponse = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""删除用户"""
# 只有管理员可以删除用户
if current_user.role_name != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
# 检查用户是否存在
user = UserService.get_user_by_id(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 删除用户
db.delete(user)
db.commit()
return {"message": "User deleted successfully"}