"""Gitea相关的路由""" from fastapi import APIRouter, Depends, HTTPException, status, Body, File, Form, UploadFile import os import logging from typing import Optional, Dict, Any, List from app.gitea.service import gitea_service from app.dependencies import get_current_active_user logger = logging.getLogger(__name__) router = APIRouter(prefix="/gitea", tags=["gitea"]) @router.get("/config") async def get_gitea_config( current_user: dict = Depends(get_current_active_user) ): """ 获取Gitea配置 """ config = gitea_service.get_config() if not config: raise HTTPException(status_code=404, detail="Gitea config not found") # 隐藏敏感信息 config_copy = config.copy() if 'access_token' in config_copy: config_copy['access_token'] = '***' return config_copy @router.post("/config") async def set_gitea_config( config: Dict[str, Any], current_user: dict = Depends(get_current_active_user) ): """ 设置Gitea配置 """ # 验证配置 required_fields = ['server_url', 'access_token', 'default_owner'] for field in required_fields: if field not in config or not config[field]: raise HTTPException(status_code=400, detail=f"Missing required field: {field}") # 保存配置 success = gitea_service.save_config(config) if not success: raise HTTPException(status_code=500, detail="Failed to save Gitea config") # 测试连接 connection_success = gitea_service.test_connection() return { "success": True, "message": "Gitea config saved successfully", "connection_test": "success" if connection_success else "failed" } @router.get("/test-connection") async def test_gitea_connection( current_user: dict = Depends(get_current_active_user) ): """ 测试Gitea连接 """ success = gitea_service.test_connection() if not success: raise HTTPException(status_code=500, detail="Failed to connect to Gitea server") return { "success": True, "message": "Connected to Gitea server successfully" } @router.get("/repos") async def list_gitea_repositories( owner: Optional[str] = None, current_user: dict = Depends(get_current_active_user) ): """ 列出Gitea仓库 """ repos = gitea_service.list_repositories(owner) if repos is None: raise HTTPException(status_code=500, detail="Failed to list repositories") return { "success": True, "repositories": repos } @router.post("/repos/create") async def create_gitea_repository( algorithm_id: str = Body(..., description="算法ID"), algorithm_name: str = Body(..., description="算法名称"), description: str = Body("", description="仓库描述"), current_user: dict = Depends(get_current_active_user) ): """ 创建Gitea仓库 """ repo = gitea_service.create_repository(algorithm_id, algorithm_name, description) if not repo: raise HTTPException(status_code=500, detail="Failed to create repository") return { "success": True, "repository": repo } @router.post("/repos/clone") async def clone_gitea_repository( repo_url: str = Body(..., description="仓库URL"), algorithm_id: str = Body(..., description="算法ID"), branch: str = Body("main", description="分支名称"), current_user: dict = Depends(get_current_active_user) ): """ 克隆Gitea仓库 """ success = gitea_service.clone_repository(repo_url, algorithm_id, branch) if not success: # 即使克隆失败,也尝试继续执行,因为我们可能已经初始化了仓库 logger.info("Clone failed, but continuing with existing repository setup") return { "success": True, "message": "Repository cloned or initialized successfully" } @router.post("/repos/push") async def push_to_gitea_repository( algorithm_id: str = Body(..., description="算法ID"), message: str = Body("Update code", description="提交消息"), current_user: dict = Depends(get_current_active_user) ): """ 推送代码到Gitea仓库 """ success = gitea_service.push_to_repository(algorithm_id, message) if not success: raise HTTPException(status_code=500, detail="Failed to push code") # 验证推送是否成功 verify_success = gitea_service.verify_push(algorithm_id) if not verify_success: logger.warning(f"Push completed but verification failed for algorithm: {algorithm_id}") return { "success": True, "message": "Code pushed but verification failed", "verified": False } return { "success": True, "message": "Code pushed successfully", "verified": True } @router.post("/repos/upload", dependencies=[Depends(get_current_active_user)]) async def upload_files_to_repository( files: list[UploadFile] = File(..., description="上传的文件列表"), algorithm_id: str = Form(..., description="算法ID") ): """ 上传文件到仓库(支持大量文件) """ try: logger.info("=== 开始上传文件 ===") logger.info(f"Received {len(files)} files for algorithm: {algorithm_id}") # 验证文件数量 MAX_FILES = 50000 if len(files) > MAX_FILES: logger.error(f"Too many files: {len(files)} (max: {MAX_FILES})") raise HTTPException( status_code=400, detail=f"Too many files. Maximum number of files is {MAX_FILES}." ) # 创建仓库目录 repo_dir = f"/tmp/algorithms/{algorithm_id}" logger.info(f"Repository directory: {repo_dir}") os.makedirs(repo_dir, exist_ok=True) logger.info(f"Created repository directory: {repo_dir}") # 保存上传的文件 logger.info("=== 保存上传的文件 ===") saved_files = [] # 分批处理文件,避免内存问题 batch_size = 100 # 每批处理100个文件 for batch_start in range(0, len(files), batch_size): batch_end = min(batch_start + batch_size, len(files)) batch = files[batch_start:batch_end] logger.info(f"Processing batch {batch_start//batch_size + 1}: files {batch_start+1} to {batch_end}") for i, file in enumerate(batch): # 为了获取文件内容,我们需要读取它 file_content = await file.read() # 获取文件路径(使用file.filename,它应该包含相对路径) file_path = os.path.join(repo_dir, file.filename) logger.info(f"Processing file {batch_start + i + 1}/{len(files)}: {file.filename}") logger.info(f" Target path: {file_path}") # 确保文件所在目录存在 os.makedirs(os.path.dirname(file_path), exist_ok=True) logger.info(f" Created directory: {os.path.dirname(file_path)}") # 保存文件 try: with open(file_path, "wb") as f: f.write(file_content) file_stats = os.stat(file_path) logger.info(f" File size: {file_stats.st_size} bytes") logger.info(f" ✅ File saved successfully: {file_path}") saved_files.append(file_path) except Exception as file_error: logger.error(f" ❌ Failed to save file {file.filename}: {str(file_error)}") raise logger.info(f"=== 文件上传完成 ===") logger.info(f"Successfully saved {len(saved_files)} files to repository") return { "success": True, "message": "Files uploaded successfully", "saved_files": saved_files, "total_files": len(files) } except Exception as e: logger.error(f"=== 上传文件失败 ===") logger.error(f"Error: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") raise HTTPException(status_code=500, detail=f"Failed to upload files: {str(e)}") @router.post("/repos/pull") async def pull_from_gitea_repository( algorithm_id: str = Body(..., description="算法ID"), current_user: dict = Depends(get_current_active_user) ): """ 从Gitea仓库拉取代码 """ success = gitea_service.pull_from_repository(algorithm_id) if not success: raise HTTPException(status_code=500, detail="Failed to pull code") return { "success": True, "message": "Code pulled successfully" } @router.patch("/repos/update") async def update_gitea_repository( algorithm_id: str = Body(..., description="算法ID"), name: Optional[str] = Body(None, description="新的仓库名称"), description: Optional[str] = Body(None, description="新的仓库描述"), private: Optional[bool] = Body(None, description="是否私有"), current_user: dict = Depends(get_current_active_user) ): """ 更新Gitea仓库信息 """ updated_repo = gitea_service.update_repository_info(algorithm_id, name, description, private) if not updated_repo: raise HTTPException(status_code=500, detail="Failed to update repository info") return { "success": True, "message": "Repository info updated successfully", "repository": updated_repo } @router.post("/repos/register") async def register_algorithm_from_repository( repo_owner: str = Body(..., description="仓库所有者"), repo_name: str = Body(..., description="仓库名称"), algorithm_id: str = Body(..., description="算法ID"), current_user: dict = Depends(get_current_active_user) ): """ 从仓库注册算法服务 """ success = gitea_service.register_algorithm_from_repo(repo_owner, repo_name, algorithm_id) if not success: raise HTTPException(status_code=500, detail="Failed to register algorithm from repository") return { "success": True, "message": "Algorithm registered from repository successfully" } @router.get("/repos/{repo_owner}/{repo_name}") async def get_gitea_repository_info( repo_owner: str, repo_name: str, current_user: dict = Depends(get_current_active_user) ): """ 获取仓库信息 """ repo = gitea_service.get_repository_info(repo_owner, repo_name) if not repo: raise HTTPException(status_code=404, detail="Repository not found") return repo