# 文件名: npu_yolo_onnx.py import cv2 import numpy as np import onnxruntime as ort import os import time def letterbox(img, new_shape=(640, 640), color=(114, 114, 114)): shape = img.shape[:2] # h, w r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] dw /= 2 dh /= 2 if shape[::-1] != new_unpad: img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR) top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) return img, r, (dw, dh) class YOLOv8_ONNX: def __init__(self, onnx_path, conf_threshold=0.25, iou_threshold=0.45): # 使用 CANNExecutionProvider providers = [("CANNExecutionProvider", { "device_id": 0, "arena_extend_strategy": "kNextPowerOfTwo", "npu_mem_limit": 16 * 1024 * 1024 * 1024, "precision_mode": "allow_fp32_to_fp16", # 修改:不降精度:must_keep_origin_dtype "op_select_impl_mode": "high_precision", "enable_cann_graph": True, })] # 创建 Session(ORT 自动忽略不存在的 EP,不会抛异常) self.session = ort.InferenceSession(onnx_path, providers=providers) # 获取真实工作 provider actual_providers = self.session.get_providers() print("YOLO Providers:", actual_providers) if "CANNExecutionProvider" in actual_providers: print("[INFO] YOLO 使用 CANNExecutionProvider(昇腾)") else: print("[INFO] YOLO 使用 CPUExecutionProvider(非昇腾环境)") self.conf_threshold = conf_threshold self.iou_threshold = iou_threshold self.input_name = self.session.get_inputs()[0].name print(f"YOLO模型输入名称: {self.input_name}") print(f"YOLO模型输入形状: {self.session.get_inputs()[0].shape}") print(f"YOLO模型输出形状: {self.session.get_outputs()[0].shape}") def preprocess(self, img): self.orig_shape = img.shape[:2] img, self.ratio, (self.dw, self.dh) = letterbox(img, (640, 640)) # ===== 新增:保存letterbox处理后的图像 ===== # 确保保存目录存在(如不存在则创建) # save_dir = "../YOLO_Pipe_results" # os.makedirs(save_dir, exist_ok=True) # # 生成唯一文件名(例如按时间戳命名,避免覆盖) # timestamp = int(time.time() * 1000) # 毫秒级时间戳 # save_path = os.path.join(save_dir, f"letterbox_{timestamp}.jpg") # # 注意:letterbox处理后的img是BGR格式(因为输入的img是BGR,letterbox未改变通道顺序) # cv2.imwrite(save_path, img) # print(f"letterbox处理后的图像已保存至:{save_path}") # ========================================== img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.transpose(2, 0, 1).astype(np.float32) img /= 255.0 img = np.expand_dims(img, axis=0) # (1,3,640,640) return img def postprocess_v8(self, pred, im0_shape): """ 根据测试结果调整的后处理 输出格式: [x_center, y_center, width, height, class0_score, class1_score] """ # pred 形状: (1, 6, 8400) #print(f"【YOLO调试】原始输出形状: {pred.shape}") # 转置: (1,6,8400) -> (8400,6) x = pred[0].T #print(f"【YOLO调试】转置后形状: {x.shape}") # 提取坐标和类别分数 boxes = x[:, :4] # [x_center, y_center, width, height] scores = x[:, 4:6] # [class0_score, class1_score] # 置信度 = 两个类别分数的最大值 conf = np.max(scores, axis=1) # 类别 = 最大值的索引 (0=supervisor, 1=suspect) class_pred = np.argmax(scores, axis=1) # 阈值过滤 mask = conf > self.conf_threshold if not mask.any(): #print(f"【YOLO调试】没有检测到超过阈值 {self.conf_threshold} 的目标") return [] boxes = boxes[mask] conf = conf[mask] class_pred = class_pred[mask] #print(f"【YOLO调试】阈值过滤后: {len(boxes)} 个目标") # if len(class_pred) > 0: # print(f"【YOLO调试】类别分布: 0={np.sum(class_pred == 0)}(supervisor), 1={np.sum(class_pred == 1)}(suspect)") # 中心坐标转角点坐标 x1 = boxes[:, 0] - boxes[:, 2] / 2 y1 = boxes[:, 1] - boxes[:, 3] / 2 x2 = boxes[:, 0] + boxes[:, 2] / 2 y2 = boxes[:, 1] + boxes[:, 3] / 2 # 去掉letterbox的padding,缩放到原始图像尺寸 x1 = (x1 - self.dw) / self.ratio y1 = (y1 - self.dh) / self.ratio x2 = (x2 - self.dw) / self.ratio y2 = (y2 - self.dh) / self.ratio # clip到图像边界 x1 = np.clip(x1, 0, im0_shape[1]) y1 = np.clip(y1, 0, im0_shape[0]) x2 = np.clip(x2, 0, im0_shape[1]) y2 = np.clip(y2, 0, im0_shape[0]) # 准备NMS bboxes = np.stack([x1, y1, x2, y2], axis=1) # 执行NMS indices = cv2.dnn.NMSBoxes( bboxes.tolist(), conf.tolist(), score_threshold=self.conf_threshold, nms_threshold=self.iou_threshold ) #print(f"【YOLO调试】NMS后保留: {len(indices) if indices is not None else 0} 个目标") result = [] if len(indices) > 0: indices = indices.flatten() if isinstance(indices, np.ndarray) else [i[0] for i in indices] # 统计NMS后的类别分布 final_classes = [] supervisor_count = 0 suspect_count = 0 for i in indices: cls_id = int(class_pred[i]) if cls_id == 0: supervisor_count += 1 final_classes.append("supervisor") else: suspect_count += 1 final_classes.append("suspect") result.append([ int(bboxes[i, 0]), int(bboxes[i, 1]), int(bboxes[i, 2]), int(bboxes[i, 3]), float(conf[i]), cls_id ]) #print(f"【YOLO调试】最终类别分布: supervisor={supervisor_count}, suspect={suspect_count}") #print(f"【YOLO调试】最终检测详情:") # for i, idx in enumerate(indices): # print( # f" 目标{i + 1}: {final_classes[i]}, 置信度{conf[idx]:.3f}, 坐标({int(bboxes[idx, 0])},{int(bboxes[idx, 1])},{int(bboxes[idx, 2])},{int(bboxes[idx, 3])})") return result def __call__(self, frame): input_data = self.preprocess(frame) pred = self.session.run(None, {self.input_name: input_data})[0] return self.postprocess_v8(pred, frame.shape)