"""Face-embedding extraction and same-person photo grouping (CPU/GPU/NPU).

Scans the current directory for images, extracts one face embedding per image
with insightface (buffalo_l), groups photos of the same person by cosine
similarity, and writes a detailed per-image log file.
"""
import argparse
import datetime
import os

import cv2
import insightface
import numpy as np
from insightface.app import FaceAnalysis

# Per-device ONNX Runtime configuration: device name -> (providers, ctx_id).
# The NPU entry carries the full CANN provider options; device_id and
# npu_mem_limit are overridden at runtime from the CLI arguments.
DEVICE_CONFIG = {
    "cpu": (['CPUExecutionProvider'], -1),
    "gpu": (['CUDAExecutionProvider'], 0),
    "npu": (
        [
            (
                "CANNExecutionProvider",
                {
                    "device_id": 1,
                    "arena_extend_strategy": "kNextPowerOfTwo",
                    "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                    "op_select_impl_mode": "high_precision",
                    "precision_mode": "allow_fp32_to_fp16",
                    "enable_cann_graph": True,
                },
            ),
            "CPUExecutionProvider",
        ],
        0,
    ),
}

# Core configuration.
THRESHOLD = 0.65  # default cosine-similarity threshold for "same person"
# NOTE: fixed typo '.jepg' -> '.jpeg' so real .jpeg files are not skipped.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif')
NPU_REQUIREMENTS = {
    "依赖包": "onnxruntime-cann(华为官方)+ onnxruntime(基础)",
    "驱动要求": "Ascend CANN Toolkit ≥ 5.0.3",
    "硬件要求": "华为昇腾芯片(如Ascend 310/910)",
    "文档链接": "https://onnxruntime.ai/docs/execution-providers/community-maintained/CANN-ExecutionProvider.html",
}


def parse_args():
    """Parse command-line arguments.

    NOTE: the numeric defaults are real ints. The original code used string
    defaults ("0", "16") with type=int; argparse does not apply `type` to
    defaults, so omitting --npu-mem-limit made `args.npu_mem_limit * 1024**3`
    perform string repetition instead of arithmetic.
    """
    parser = argparse.ArgumentParser(description="人脸特征提取与同一人照片分组工具(支持CPU/GPU/NPU)")
    parser.add_argument("-d", "--device", type=str, choices=DEVICE_CONFIG.keys(),
                        default="cpu", help="指定运行设备")
    parser.add_argument("-t", "--threshold", type=float, default=THRESHOLD,
                        help="相似度阈值,值越大匹配越严格")
    parser.add_argument("--npu-device-id", type=int, default=0, help="覆盖NPU设备ID")
    parser.add_argument("--npu-mem-limit", type=int, default=16, help="NPU内存限制")
    return parser.parse_args()


def get_face_embedding(image_path, app):
    """Extract the embedding of the first detected face in an image.

    Args:
        image_path: path to an image file readable by cv2.imread.
        app: a prepared insightface.app.FaceAnalysis instance.

    Returns:
        The embedding vector of the first detected face.

    Raises:
        ValueError: if the image cannot be read or contains no face.
    """
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")

    faces = app.get(img)

    if len(faces) < 1:
        raise ValueError("No faces detected in the image")
    if len(faces) > 1:
        print("Warning: Multiple faces detected. Using first detected face")

    return faces[0].embedding


def compare_faces(emb1, emb2, threshold):
    """Compare two embeddings by cosine similarity.

    Returns:
        (similarity, is_same): the cosine similarity and whether it exceeds
        `threshold`. Tune the threshold for your use case.
    """
    similarity = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))
    return similarity, similarity > threshold


def get_all_images_files(directory="."):
    """Return sorted absolute paths of all supported image files in `directory`.

    NOTE: paths are resolved relative to `directory` (the original code
    resolved bare filenames against the CWD, which was wrong for any other
    directory argument).
    """
    image_files = []
    for filename in os.listdir(directory):
        file_path = os.path.abspath(os.path.join(directory, filename))
        if filename.lower().endswith(IMAGE_EXTENSIONS) and os.path.isfile(file_path):
            image_files.append(file_path)

    return sorted(image_files)


def find_same_person_groups(embedding_dict, threshold):
    """Greedily group photos of the same person by embedding similarity.

    Args:
        embedding_dict: mapping of file key -> embedding vector.
        threshold: cosine-similarity threshold for a match.

    Returns:
        (same_person_groups, single_groups): groups with >= 2 photos, and
        singleton groups with no match.
    """
    ungrouped_files = list(embedding_dict.keys())
    same_person_groups = []
    single_groups = []

    while ungrouped_files:
        # Take the next seed photo and gather everything similar to it.
        current_file = ungrouped_files.pop(0)
        current_emb = embedding_dict[current_file]
        current_group = [current_file]
        to_remove = []

        for candidate_file in ungrouped_files:
            candidate_emb = embedding_dict[candidate_file]
            similarity, is_same = compare_faces(current_emb, candidate_emb, threshold)
            if is_same:
                current_group.append(candidate_file)
                to_remove.append(candidate_file)
                print(f"匹配成功:{os.path.basename(current_file)}和{os.path.basename(candidate_file)}(相似度:{similarity:.4f})")
        for file in to_remove:
            ungrouped_files.remove(file)

        if len(current_group) >= 2:
            same_person_groups.append(current_group)
        else:
            single_groups.append(current_group)

    return same_person_groups, single_groups


def generate_log_filename(device):
    """Build a log filename tagged with the device type and a timestamp."""
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    return f"face_embeddings_{device}_{timestamp}.log"


def write_embedding_log(log_entries, args, npu_config, log_filename):
    """Write the detailed per-image log file (UTF-8)."""
    with open(log_filename, 'w', encoding='utf-8') as f:
        f.write("=" * 80 + "\n")
        f.write("人脸特征向量分析日志\n")
        f.write(f"生成时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"运行设备:{args.device}\n")
        if args.device == "npu":
            f.write(f"NPU配置:{npu_config}\n")
        f.write(f"相似度阈值:{args.threshold}\n")
        f.write(f"支持图片格式:{','.join(IMAGE_EXTENSIONS)}\n")
        f.write(f"处理图片总数:{len(log_entries)}\n")
        f.write("=" * 80 + "\n\n")

        for entry in log_entries:
            f.write(entry + "\n" + "-" * 60 + "\n\n")

    print(f"\n 日志文件已保存至:{os.path.abspath(log_filename)}")


def check_npu_environment():
    """Check the core NPU dependencies (does not validate env variables).

    Returns:
        (ok, message): whether CANNExecutionProvider is available and a
        human-readable status string.
    """
    try:
        import onnxruntime as ort
        available_providers = ort.get_available_providers()
        if 'CANNExecutionProvider' not in available_providers:
            return False, "未检测到CANNExecutionProvider(请安装onnxruntime-cann包)"

        return True, "NPU核心依赖检查通过"
    except ImportError:
        return False, "未安装onnxruntime(基础依赖)"
    except Exception as e:
        return False, f"NPU环境检查失败: {str(e)}"


def main():
    args = parse_args()
    device = args.device
    threshold = args.threshold
    npu_device_id = args.npu_device_id
    npu_mem_limit = args.npu_mem_limit * 1024 * 1024 * 1024  # GB -> bytes
    # Log filename tagged with the device and a timestamp.
    log_filename = generate_log_filename(device)

    # Print startup banner.
    print("人脸特征提取与同一人照片分组工具(支持CPU/GPU/NPU)")
    print("=" * 80)
    print("核心配置:")
    print(f"运行设备:{device}")
    print(f"相似度阈值:{threshold}")
    print(f"搜索目录:{os.getcwd()}")
    print(f"支持格式:{','.join(IMAGE_EXTENSIONS)}")
    print(f"日志文件:{log_filename}")
    if device == "npu":
        print(f"NPU设备ID:{npu_device_id}")
        print(f"NPU内存限制:{args.npu_mem_limit}GB")
    print("=" * 80 + "\n")

    # Device environment pre-check.
    if device == "gpu":
        # NOTE: provider availability comes from onnxruntime, not insightface
        # (insightface.utils has no get_available_providers; the original call
        # raised AttributeError).
        import onnxruntime as ort
        available_providers = ort.get_available_providers()
        if 'CUDAExecutionProvider' not in available_providers:
            print(" 警告:未检测到CUDA环境,GPU模式可能运行失败!")
            print(" 解决方案:1.安装CUDA≥11.0 + cuDNN 2.安装onnxruntime-gpu 3. 切换至cpu模式")
    elif device == "npu":
        npu_ok, npu_msg = check_npu_environment()
        print(f" NPU环境检查:{npu_msg}")
        if not npu_ok:
            print(" NPU环境不满足要求,建议按以下步骤配置:")
            for key, value in NPU_REQUIREMENTS.items():
                print(f" - {key}:{value}")
            return

    # Initialize the model.
    try:
        providers, ctx_id = DEVICE_CONFIG[device]
        npu_config = None

        if device == "npu":
            # Copy the CANN options instead of mutating DEVICE_CONFIG in place,
            # so the module-level defaults stay pristine across calls.
            provider_name, base_options = providers[0]
            npu_config = dict(base_options)
            npu_config["device_id"] = npu_device_id
            npu_config["npu_mem_limit"] = npu_mem_limit
            providers = [(provider_name, npu_config)] + list(providers[1:])
            ctx_id = npu_device_id

        app = FaceAnalysis(name='buffalo_l', providers=providers)
        app.prepare(ctx_id=ctx_id)
        print(f" 模型初始化成功(设备:{device}, ctx_id: {ctx_id})")
        if device == "npu":
            print(" NPU最终配置:")
            for key, value in npu_config.items():
                if key == "npu_mem_limit":
                    print(f" - {key}: {value/(1024*1024*1024)}GB")
                else:
                    print(f" - {key}: {value}")
    except Exception as e:
        print("模型初始化失败!")
        print(f"Error:{str(e)}")
        print(" 解决方案:")
        if device == "gpu":
            print(" 1.确认CUDA驱动已安装 2.确认onnxruntime-gpu版本与CUDA匹配 3.尝试切换CPU模式")
        elif device == "npu":
            print(" 1.确认CANN Toolkit已正确安装 2.确认onnxruntime-cann版本兼容 3.检查设备ID和内存限制是否合理")
        return

    # 1. Collect all image files.
    image_files = get_all_images_files()
    if not image_files:
        print("未找到任何支持的图片文件(检查目录下是否有jpg/png等格式图片)")
        return
    print(f" 找到{len(image_files)}个图片文件,开始提取特征向量...\n")

    # 2. Extract embeddings and record log entries.
    embedding_dict = {}
    log_entries = []
    for img_path in image_files:
        img_name = os.path.basename(img_path)
        try:
            emb = get_face_embedding(img_path, app)
            embedding_dict[img_name] = emb
            log_entry = f"【文件】:{img_name}\n" \
                        f"【路径】:{img_path}\n" \
                        f"【状态】:成功\n" \
                        f"【特征向量维度】:{len(emb)}\n" \
                        f"【特征向量】:{emb.tolist()}"
            log_entries.append(log_entry)
            print(f" 处理成功: {img_name}(特征向量维度:{len(emb)})")
        except Exception as e:
            # NOTE: dropped the stray leading colon the original format string had.
            log_entry = f"【文件】:{img_name}\n" \
                        f"【路径】:{img_path}\n" \
                        f"【状态】:失败\n" \
                        f"【错误信息】:{str(e)}"
            log_entries.append(log_entry)
            print(f" 处理失败: {img_name} - 原因:{str(e)}")

    # 3. Write the log file (dynamically generated filename).
    write_embedding_log(log_entries, args, npu_config, log_filename)

    # 4. Same-person grouping analysis.
    if not embedding_dict:
        print("\n 没有成功提取到人脸特征向量,无法进行分组分析")
        return

    print(f"\n 开始分组分析(有效人脸数:{len(embedding_dict)})...")
    same_groups, single_groups = find_same_person_groups(embedding_dict, threshold)

    # 5. Print grouping results.
    print("\n" + "=" * 80)
    print("同一人照片分组结果(每组≥2张):")
    print("=" * 80)
    if same_groups:
        for i, group in enumerate(same_groups, 1):
            group_names = [os.path.basename(file) for file in group]
            print(f"组{i}:{', '.join(group_names)}")
    else:
        print(" 未发现同一人的多张照片")

    print("\n" + "=" * 80)
    print("无匹配的单独照片:")
    if single_groups:
        for group in single_groups:
            print(f" - {os.path.basename(group[0])}")
    else:
        print("所有照片均已分组(无单独照片)")
    print("\n" + "=" * 80)
    print(f" 处理完成!详细日志请查看:{log_filename}")


if __name__ == "__main__":
    main()