"""Project analysis service: inspects an algorithm repository's structure and traits."""

import json
import os
import re
from typing import Any, Dict, Iterator, List, Optional, Tuple

# Manifest files whose presence identifies a project type.
_PYTHON_MANIFESTS = frozenset({"requirements.txt", "pyproject.toml"})
_JAVA_MANIFESTS = frozenset({"pom.xml", "build.gradle"})

# Directories holding third-party or generated code; scanning them would make
# detection reflect the project's dependencies instead of the project itself.
_VENDOR_DIRS = frozenset({"node_modules", "__pycache__", "venv"})

# Root-level file names tried, in order, as the Python entry point.
_PYTHON_MAIN_FILES = ("main.py", "app.py", "run.py", "server.py")

# (framework, substrings that indicate it is imported), in priority order.
_PYTHON_API_MARKERS = (
    ("fastapi", ("from fastapi import", "import fastapi", "from fastapi.")),
    ("flask", ("from flask import", "import flask", "from flask.")),
    ("django", ("from django import", "import django", "from django.")),
)

# (framework, npm package names that indicate it), in priority order.
_NODE_API_PACKAGES = (
    ("express", ("express",)),
    ("koa", ("koa",)),
    ("nestjs", ("nestjs", "@nestjs/core")),
)

_POM_DEPENDENCY_RE = re.compile(r"<dependency>(.*?)</dependency>", re.DOTALL)
_POM_GROUP_ID_RE = re.compile(r"<groupId>\s*(.*?)\s*</groupId>", re.DOTALL)
_POM_ARTIFACT_ID_RE = re.compile(r"<artifactId>\s*(.*?)\s*</artifactId>", re.DOTALL)
_POM_VERSION_RE = re.compile(r"<version>\s*(.*?)\s*</version>", re.DOTALL)

# Accepts `String[] args`, `String... args`, `String args[]` and any parameter name.
_JAVA_MAIN_RE = re.compile(
    r"\bpublic\s+static\s+void\s+main\s*\(\s*(?:final\s+)?String\b[^)]*\)"
)


def _walk_project(repo_path: str) -> Iterator[Tuple[str, List[str], List[str]]]:
    """Walk *repo_path* like ``os.walk``, skipping hidden and vendored directories.

    Directory and file names are sorted so that "first match" heuristics give
    the same answer regardless of filesystem enumeration order.
    """
    for root, dirs, files in os.walk(repo_path):
        # In-place assignment is what makes os.walk prune the traversal.
        dirs[:] = sorted(
            name
            for name in dirs
            if not name.startswith(".") and name not in _VENDOR_DIRS
        )
        yield root, dirs, sorted(files)


def _read_text(path: str) -> Optional[str]:
    """Return the UTF-8 text of *path*, or ``None`` if it cannot be read or decoded."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return handle.read()
    except (OSError, UnicodeDecodeError):
        return None


def _load_json_object(path: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object stored in *path*, or ``None`` if unreadable or not an object."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None


def _parse_requirements(path: str) -> List[str]:
    """Return the non-blank, non-comment lines of a requirements.txt file."""
    with open(path, "r", encoding="utf-8") as handle:
        stripped = (line.strip() for line in handle)
        return [line for line in stripped if line and not line.startswith("#")]


def _parse_pyproject(path: str) -> List[str]:
    """Return the raw entry lines of the ``[dependencies]`` table of a pyproject.toml.

    The table ends at the next table header line, not at the first ``[``
    character, so values containing inline arrays are kept whole.
    """
    entries: List[str] = []
    in_section = False
    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            if line.startswith("["):
                # A header line; drop any trailing comment before comparing.
                in_section = line.split("#", 1)[0].strip() == "[dependencies]"
                continue
            if in_section and line and not line.startswith("#"):
                entries.append(line)
    return entries


def _parse_pom(path: str) -> List[str]:
    """Return ``groupId:artifactId[:version]`` for each dependency in a pom.xml."""
    with open(path, "r", encoding="utf-8") as handle:
        content = handle.read()

    coordinates: List[str] = []
    for block in _POM_DEPENDENCY_RE.finditer(content):
        body = block.group(1)
        group_id = _POM_GROUP_ID_RE.search(body)
        artifact_id = _POM_ARTIFACT_ID_RE.search(body)
        if not (group_id and artifact_id):
            continue
        parts = [group_id.group(1), artifact_id.group(1)]
        version = _POM_VERSION_RE.search(body)
        if version:
            parts.append(version.group(1))
        coordinates.append(":".join(parts))
    return coordinates


def _parse_package_json(path: str) -> List[str]:
    """Return ``name@version`` entries from a package.json; dev entries get `` (dev)``."""
    with open(path, "r", encoding="utf-8") as handle:
        package_data = json.load(handle)

    entries: List[str] = []
    for section, suffix in (("dependencies", ""), ("devDependencies", " (dev)")):
        if section in package_data:
            entries.extend(
                f"{name}@{version}{suffix}"
                for name, version in package_data[section].items()
            )
    return entries


class ProjectAnalyzer:
    """Project analysis service."""

    def analyze_project(self, repo_path: str) -> Dict[str, Any]:
        """Analyze the structure and traits of a project.

        Args:
            repo_path: Path to the repository.

        Returns:
            A dictionary with the keys ``success``, ``project_type``,
            ``dependencies``, ``entry_point``, ``api_pattern``, ``structure``
            and ``error``. On failure ``success`` is ``False``, ``error`` holds
            the exception message and every other field is ``None``.
        """
        try:
            # 1. Identify the project type.
            project_type = self._detect_project_type(repo_path)
            # 2. Collect dependencies.
            dependencies = self._analyze_dependencies(repo_path, project_type)
            # 3. Locate the entry point.
            entry_point = self._detect_entry_point(repo_path, project_type)
            # 4. Identify the API framework.
            api_pattern = self._detect_api_pattern(repo_path, project_type)
            # 5. Record the directory structure.
            structure = self._analyze_structure(repo_path)
            return {
                "success": True,
                "project_type": project_type,
                "dependencies": dependencies,
                "entry_point": entry_point,
                "api_pattern": api_pattern,
                "structure": structure,
                "error": None,
            }
        except Exception as e:
            # Top-level boundary: any failure is reported in the result
            # instead of being raised to the caller.
            return {
                "success": False,
                "error": str(e),
                "project_type": None,
                "dependencies": None,
                "entry_point": None,
                "api_pattern": None,
                "structure": None,
            }

    def _detect_project_type(self, repo_path: str) -> Optional[str]:
        """Detect the project type.

        Priority: Python manifests or ``.py`` files anywhere, then Java
        manifests anywhere, then ``package.json`` anywhere, then a root-level
        ``CMakeLists.txt``. A bare ``src`` directory is only a last-resort hint
        for Java, because Node.js and C++ projects commonly have one too.
        Hidden and vendored directories are not scanned.

        Args:
            repo_path: Path to the repository.

        Returns:
            ``"python"``, ``"java"``, ``"nodejs"``, ``"c++"`` or ``None``.

        Raises:
            OSError: If ``repo_path`` cannot be listed (e.g. it does not exist).
        """
        # os.listdir (unlike os.walk) raises for a missing path, so a bad
        # repo_path surfaces as an error instead of an empty analysis.
        root_entries = os.listdir(repo_path)
        if _PYTHON_MANIFESTS.intersection(root_entries) or any(
            name.endswith(".py") for name in root_entries
        ):
            return "python"

        has_java_manifest = False
        has_package_json = False
        has_src_dir = "src" in root_entries
        for _root, dirs, files in _walk_project(repo_path):
            if _PYTHON_MANIFESTS.intersection(files) or any(
                name.endswith(".py") for name in files
            ):
                return "python"
            has_java_manifest = has_java_manifest or bool(
                _JAVA_MANIFESTS.intersection(files)
            )
            has_package_json = has_package_json or "package.json" in files
            has_src_dir = has_src_dir or "src" in dirs

        if has_java_manifest:
            return "java"
        if has_package_json:
            return "nodejs"
        if "CMakeLists.txt" in root_entries:
            return "c++"
        if has_src_dir:
            return "java"
        return None

    def _analyze_dependencies(
        self, repo_path: str, project_type: Optional[str]
    ) -> List[str]:
        """Collect the project's declared dependencies from root-level manifests.

        Args:
            repo_path: Path to the repository.
            project_type: Project type as returned by ``_detect_project_type``.

        Returns:
            A list of dependency strings; empty for unknown project types or
            when no manifest is present.

        Raises:
            OSError, ValueError: If an existing manifest cannot be read or
                parsed; ``analyze_project`` reports these in its result.
        """
        dependencies: List[str] = []

        if project_type == "python":
            req_file = os.path.join(repo_path, "requirements.txt")
            if os.path.exists(req_file):
                dependencies.extend(_parse_requirements(req_file))
            pyproject_file = os.path.join(repo_path, "pyproject.toml")
            if os.path.exists(pyproject_file):
                dependencies.extend(_parse_pyproject(pyproject_file))
        elif project_type == "java":
            pom_file = os.path.join(repo_path, "pom.xml")
            if os.path.exists(pom_file):
                dependencies.extend(_parse_pom(pom_file))
        elif project_type == "nodejs":
            package_file = os.path.join(repo_path, "package.json")
            if os.path.exists(package_file):
                dependencies.extend(_parse_package_json(package_file))

        return dependencies

    def _detect_entry_point(
        self, repo_path: str, project_type: Optional[str]
    ) -> Optional[str]:
        """Detect the project's entry point (best effort).

        Unreadable or malformed files are skipped rather than raised.

        Args:
            repo_path: Path to the repository.
            project_type: Project type as returned by ``_detect_project_type``.

        Returns:
            A path relative to ``repo_path``, a description of the npm start
            script, or ``None`` when nothing is found.
        """
        if project_type == "python":
            # Conventional root-level script names take precedence.
            for candidate in _PYTHON_MAIN_FILES:
                if os.path.exists(os.path.join(repo_path, candidate)):
                    return candidate

            # A package with __main__.py beats any file that merely defines
            # main(), so remember the .py files and scan them afterwards.
            source_files: List[str] = []
            for root, _dirs, files in _walk_project(repo_path):
                if "__main__.py" in files:
                    return os.path.relpath(
                        os.path.join(root, "__main__.py"), repo_path
                    )
                source_files.extend(
                    os.path.join(root, name)
                    for name in files
                    if name.endswith(".py")
                )
            for file_path in source_files:
                content = _read_text(file_path)
                if content is not None and "def main(" in content:
                    return os.path.relpath(file_path, repo_path)

        elif project_type == "java":
            for root, _dirs, files in _walk_project(repo_path):
                for name in files:
                    if not name.endswith(".java"):
                        continue
                    file_path = os.path.join(root, name)
                    content = _read_text(file_path)
                    if content is not None and _JAVA_MAIN_RE.search(content):
                        return os.path.relpath(file_path, repo_path)

        elif project_type == "nodejs":
            package_data = _load_json_object(os.path.join(repo_path, "package.json"))
            if package_data is not None:
                if "main" in package_data:
                    return package_data["main"]
                scripts = package_data.get("scripts")
                if isinstance(scripts, dict) and "start" in scripts:
                    return f"package.json (start: {scripts['start']})"

        return None

    def _detect_api_pattern(
        self, repo_path: str, project_type: Optional[str]
    ) -> Optional[str]:
        """Detect the API framework the project uses (best effort).

        Unreadable or malformed files are skipped rather than raised.

        Args:
            repo_path: Path to the repository.
            project_type: Project type as returned by ``_detect_project_type``.

        Returns:
            ``"fastapi"``, ``"flask"`` or ``"django"`` for Python projects,
            ``"express"``, ``"koa"`` or ``"nestjs"`` for Node.js projects,
            otherwise ``None``.
        """
        if project_type == "python":
            # The first source file importing a known framework decides.
            for root, _dirs, files in _walk_project(repo_path):
                for name in files:
                    if not name.endswith(".py"):
                        continue
                    content = _read_text(os.path.join(root, name))
                    if content is None:
                        continue
                    for framework, markers in _PYTHON_API_MARKERS:
                        if any(marker in content for marker in markers):
                            return framework

        elif project_type == "nodejs":
            package_data = _load_json_object(os.path.join(repo_path, "package.json"))
            if package_data is not None:
                declared = package_data.get("dependencies")
                if isinstance(declared, dict):
                    for framework, packages in _NODE_API_PACKAGES:
                        if any(package in declared for package in packages):
                            return framework

        return None

    def _analyze_structure(self, repo_path: str) -> Dict[str, Any]:
        """Record the project's directory structure.

        Hidden files and directories (names starting with ``.``) are excluded.

        Args:
            repo_path: Path to the repository.

        Returns:
            A dictionary with ``files`` (list of ``{"path", "size"}`` entries
            with paths relative to ``repo_path``), ``directories`` (relative
            paths) and ``size`` (total size in bytes of the listed files).
        """
        structure: Dict[str, Any] = {"files": [], "directories": [], "size": 0}

        for root, dirs, files in os.walk(repo_path):
            # In-place assignment keeps os.walk out of hidden directories.
            dirs[:] = sorted(name for name in dirs if not name.startswith("."))

            for dir_name in dirs:
                dir_path = os.path.join(root, dir_name)
                structure["directories"].append(os.path.relpath(dir_path, repo_path))

            for file_name in sorted(files):
                if file_name.startswith("."):
                    continue
                file_path = os.path.join(root, file_name)
                try:
                    file_size = os.path.getsize(file_path)
                except OSError:
                    # E.g. a broken symlink or a file removed mid-scan.
                    continue
                structure["files"].append(
                    {"path": os.path.relpath(file_path, repo_path), "size": file_size}
                )
                structure["size"] += file_size

        return structure