Files
algorithm/backend/app/services/project_analyzer.py

326 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""项目分析服务,用于分析算法仓库的结构和特性"""
import os
import re
import json
from typing import Dict, List, Optional, Any
class ProjectAnalyzer:
    """Service that inspects an algorithm repository and reports its structure and traits."""

    def analyze_project(self, repo_path: str) -> Dict[str, Any]:
        """Run the full analysis pipeline over a checked-out repository.

        Args:
            repo_path: Filesystem path of the repository to analyze.

        Returns:
            A dict with keys ``success``, ``project_type``, ``dependencies``,
            ``entry_point``, ``api_pattern``, ``structure`` and ``error``.
            On failure ``success`` is False, ``error`` holds the message and
            every analysis field is None.
        """
        try:
            # Project type is detected first; the remaining steps depend on it.
            detected_type = self._detect_project_type(repo_path)
            return {
                "success": True,
                "project_type": detected_type,
                "dependencies": self._analyze_dependencies(repo_path, detected_type),
                "entry_point": self._detect_entry_point(repo_path, detected_type),
                "api_pattern": self._detect_api_pattern(repo_path, detected_type),
                "structure": self._analyze_structure(repo_path),
                "error": None,
            }
        except Exception as exc:  # service boundary: report the failure, never raise
            return {
                "success": False,
                "error": str(exc),
                "project_type": None,
                "dependencies": None,
                "entry_point": None,
                "api_pattern": None,
                "structure": None,
            }
def _detect_project_type(self, repo_path: str) -> Optional[str]:
"""检测项目类型
Args:
repo_path: 仓库路径
Returns:
项目类型,如 "python", "java", "nodejs"
"""
# 检查Python项目 - 先检查根目录
if os.path.exists(os.path.join(repo_path, "requirements.txt")) or \
os.path.exists(os.path.join(repo_path, "pyproject.toml")) or \
any(file.endswith(".py") for file in os.listdir(repo_path)):
return "python"
# 检查Python项目 - 递归检查子目录
for root, dirs, files in os.walk(repo_path):
if "requirements.txt" in files or "pyproject.toml" in files:
return "python"
if any(file.endswith(".py") for file in files):
return "python"
# 检查Java项目 - 先检查根目录
if os.path.exists(os.path.join(repo_path, "pom.xml")) or \
os.path.exists(os.path.join(repo_path, "build.gradle")) or \
os.path.exists(os.path.join(repo_path, "src")):
return "java"
# 检查Java项目 - 递归检查子目录
for root, dirs, files in os.walk(repo_path):
if "pom.xml" in files or "build.gradle" in files:
return "java"
if "src" in dirs:
return "java"
# 检查Node.js项目 - 先检查根目录
if os.path.exists(os.path.join(repo_path, "package.json")):
return "nodejs"
# 检查Node.js项目 - 递归检查子目录
for root, dirs, files in os.walk(repo_path):
if "package.json" in files:
return "nodejs"
# 检查其他项目类型
if os.path.exists(os.path.join(repo_path, "CMakeLists.txt")):
return "c++"
return None
def _analyze_dependencies(self, repo_path: str, project_type: Optional[str]) -> List[str]:
"""分析项目依赖
Args:
repo_path: 仓库路径
project_type: 项目类型
Returns:
依赖列表
"""
dependencies = []
if project_type == "python":
# 分析requirements.txt
req_file = os.path.join(repo_path, "requirements.txt")
if os.path.exists(req_file):
with open(req_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
dependencies.append(line)
# 分析pyproject.toml
pyproject_file = os.path.join(repo_path, "pyproject.toml")
if os.path.exists(pyproject_file):
with open(pyproject_file, "r", encoding="utf-8") as f:
content = f.read()
# 简单解析依赖部分
if "[dependencies]" in content:
dep_section = content.split("[dependencies]")[1].split("[")[0]
for line in dep_section.strip().split("\n"):
line = line.strip()
if line and not line.startswith("#"):
dependencies.append(line)
elif project_type == "java":
# 分析pom.xml
pom_file = os.path.join(repo_path, "pom.xml")
if os.path.exists(pom_file):
with open(pom_file, "r", encoding="utf-8") as f:
content = f.read()
# 简单解析依赖
for match in re.finditer(r'<dependency>.*?</dependency>', content, re.DOTALL):
dep = match.group(0)
group_id = re.search(r'<groupId>(.*?)</groupId>', dep)
artifact_id = re.search(r'<artifactId>(.*?)</artifactId>', dep)
version = re.search(r'<version>(.*?)</version>', dep)
if group_id and artifact_id:
dep_str = f"{group_id.group(1)}:{artifact_id.group(1)}"
if version:
dep_str += f":{version.group(1)}"
dependencies.append(dep_str)
elif project_type == "nodejs":
# 分析package.json
package_file = os.path.join(repo_path, "package.json")
if os.path.exists(package_file):
with open(package_file, "r", encoding="utf-8") as f:
package_data = json.load(f)
if "dependencies" in package_data:
for dep, version in package_data["dependencies"].items():
dependencies.append(f"{dep}@{version}")
if "devDependencies" in package_data:
for dep, version in package_data["devDependencies"].items():
dependencies.append(f"{dep}@{version} (dev)")
return dependencies
def _detect_entry_point(self, repo_path: str, project_type: Optional[str]) -> Optional[str]:
"""检测项目入口点
Args:
repo_path: 仓库路径
project_type: 项目类型
Returns:
入口点路径或函数名
"""
if project_type == "python":
# 查找主要的Python文件
main_files = ["main.py", "app.py", "run.py", "server.py"]
for file in main_files:
file_path = os.path.join(repo_path, file)
if os.path.exists(file_path):
return file
# 查找包含__main__.py的包
for root, dirs, files in os.walk(repo_path):
if "__main__.py" in files:
return os.path.relpath(os.path.join(root, "__main__.py"), repo_path)
# 查找包含main函数的文件
for root, dirs, files in os.walk(repo_path):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
if "def main(" in content:
return os.path.relpath(file_path, repo_path)
except:
pass
elif project_type == "java":
# 查找包含main方法的Java文件
for root, dirs, files in os.walk(repo_path):
for file in files:
if file.endswith(".java"):
file_path = os.path.join(root, file)
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
if "public static void main(String[] args)" in content:
return os.path.relpath(file_path, repo_path)
except:
pass
elif project_type == "nodejs":
# 检查package.json中的main字段
package_file = os.path.join(repo_path, "package.json")
if os.path.exists(package_file):
with open(package_file, "r", encoding="utf-8") as f:
try:
package_data = json.load(f)
if "main" in package_data:
return package_data["main"]
elif "scripts" in package_data and "start" in package_data["scripts"]:
return f"package.json (start: {package_data['scripts']['start']})"
except:
pass
return None
def _detect_api_pattern(self, repo_path: str, project_type: Optional[str]) -> Optional[str]:
"""检测API模式
Args:
repo_path: 仓库路径
project_type: 项目类型
Returns:
API模式"fastapi", "flask", "express"
"""
if project_type == "python":
# 检查FastAPI
for root, dirs, files in os.walk(repo_path):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
if "from fastapi import" in content or "import fastapi" in content:
return "fastapi"
elif "from flask import" in content or "import flask" in content:
return "flask"
elif "from django import" in content or "import django" in content:
return "django"
except:
pass
elif project_type == "nodejs":
# 检查Express
package_file = os.path.join(repo_path, "package.json")
if os.path.exists(package_file):
with open(package_file, "r", encoding="utf-8") as f:
try:
package_data = json.load(f)
dependencies = package_data.get("dependencies", {})
if "express" in dependencies:
return "express"
elif "koa" in dependencies:
return "koa"
elif "nestjs" in dependencies:
return "nestjs"
except:
pass
return None
def _analyze_structure(self, repo_path: str) -> Dict[str, Any]:
"""分析项目结构
Args:
repo_path: 仓库路径
Returns:
项目结构字典
"""
structure = {
"files": [],
"directories": [],
"size": 0
}
for root, dirs, files in os.walk(repo_path):
# 排除隐藏目录和文件
dirs[:] = [d for d in dirs if not d.startswith(".")]
files = [f for f in files if not f.startswith(".")]
# 添加目录
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
structure["directories"].append(os.path.relpath(dir_path, repo_path))
# 添加文件
for file_name in files:
file_path = os.path.join(root, file_name)
try:
file_size = os.path.getsize(file_path)
structure["files"].append({
"path": os.path.relpath(file_path, repo_path),
"size": file_size
})
structure["size"] += file_size
except:
pass
return structure