"""YOLOv8 ONNX person detector (file: npu_yolo_onnx.py).

Runs a YOLOv8 ONNX model through ONNX Runtime, preferring the Ascend CANN
execution provider and falling back to CUDA and then CPU.
"""
import os
import time

import cv2
import numpy as np
import onnxruntime as ort


def letterbox(img, new_shape=(640, 640), color=(114, 114, 114)):
    """Resize ``img`` keeping its aspect ratio and pad it to ``new_shape``.

    Args:
        img: Image array whose first two axes are (height, width).
        new_shape: Target ``(height, width)`` of the padded output.
        color: Border value used for the padding.

    Returns:
        A tuple ``(padded_img, r, (dw, dh))`` where ``r`` is the scale factor
        applied to the original image and ``dw`` / ``dh`` are the *fractional*
        half-paddings along width / height. The padding actually applied on
        the left / top is ``int(round(dw - 0.1))`` / ``int(round(dh - 0.1))``.
    """
    shape = img.shape[:2]  # (height, width)
    # Single scale factor so the whole image fits inside new_shape.
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))  # (w, h)
    dw = (new_shape[1] - new_unpad[0]) / 2
    dh = (new_shape[0] - new_unpad[1]) / 2

    if shape[::-1] != new_unpad:
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)

    # The -0.1 / +0.1 nudges split an odd total padding as (n, n + 1).
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    img = cv2.copyMakeBorder(
        img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color
    )
    return img, r, (dw, dh)


class YOLOv8_ONNX:
    """Callable wrapper around a YOLOv8 ONNX model that returns person boxes.

    Calling an instance with a BGR frame runs preprocess -> inference ->
    postprocess and returns a list of ``[x1, y1, x2, y2, confidence, 0]``
    entries in original-frame pixel coordinates.
    """

    def __init__(self, onnx_path, conf_threshold=0.25, iou_threshold=0.45,
                 preprocess_size_1=640, preprocess_size_2=640):
        """Create the inference session and cache model metadata.

        Args:
            onnx_path: Path of the ONNX model file.
            conf_threshold: Minimum class score for a detection to be kept.
            iou_threshold: IoU threshold used by non-maximum suppression.
            preprocess_size_1: Letterbox target height.
            preprocess_size_2: Letterbox target width.
        """
        # Preferred provider first; later entries act as automatic fallbacks.
        providers = [
            ("CANNExecutionProvider", {
                "device_id": 0,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                # Use "must_keep_origin_dtype" instead to avoid lowering
                # precision.
                "precision_mode": "allow_fp32_to_fp16",
                "op_select_impl_mode": "high_precision",
                "enable_cann_graph": True,
            }),
            "CUDAExecutionProvider",
            "CPUExecutionProvider",
        ]

        # ORT skips execution providers that are not available in this build
        # instead of raising.
        self.session = ort.InferenceSession(onnx_path, providers=providers)

        # Report which providers the session really ended up with.
        actual_providers = self.session.get_providers()
        print("YOLO Providers:", actual_providers)
        if "CANNExecutionProvider" in actual_providers:
            print("[INFO] YOLO 使用 CANNExecutionProvider(昇腾)")
        else:
            # Name the provider actually selected (it may be CUDA, not CPU).
            print(f"[INFO] YOLO 使用 {actual_providers[0]}(非昇腾环境)")

        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_name = self.session.get_inputs()[0].name
        self.preprocess_size_1 = preprocess_size_1
        self.preprocess_size_2 = preprocess_size_2

        print(f"YOLO模型输入名称: {self.input_name}")
        print(f"YOLO模型输入形状: {self.session.get_inputs()[0].shape}")
        print(f"YOLO模型输出形状: {self.session.get_outputs()[0].shape}")

    def preprocess(self, img):
        """Letterbox a BGR frame and convert it to a normalised NCHW tensor.

        Side effects: stores ``self.orig_shape``, ``self.ratio``, ``self.dw``
        and ``self.dh`` so that ``postprocess_v8`` can map boxes back to the
        original frame.

        Args:
            img: BGR image of shape (height, width, 3).

        Returns:
            float32 array of shape (1, 3, H, W) with values scaled to [0, 1].
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(
            img, (self.preprocess_size_1, self.preprocess_size_2)
        )

        # Debugging hint: to inspect the letterboxed frame, write `img` out
        # here with cv2.imwrite; it is still in BGR channel order.

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # HWC -> CHW; one contiguous float32 copy so the runtime does not
        # have to re-pack a strided view.
        img = np.ascontiguousarray(img.transpose(2, 0, 1), dtype=np.float32)
        img /= 255.0
        return np.expand_dims(img, axis=0)

    def postprocess_v8(self, pred, im0_shape):
        """Decode raw model output into person detections.

        Args:
            pred: Model output with a leading batch axis; ``pred[0]`` is
                transposed so that each row is ``[cx, cy, w, h, *scores]``
                (e.g. (84, 8400) -> (8400, 84) for an 80-class model).
            im0_shape: Shape of the original frame; index 0 is height and
                index 1 is width.

        Returns:
            List of ``[x1, y1, x2, y2, confidence, 0]`` with integer corner
            coordinates in the original frame; empty if nothing passes the
            thresholds.
        """
        x = pred[0].T
        boxes = x[:, :4]
        scores = x[:, 4:]

        class_ids = np.argmax(scores, axis=1)
        conf = scores[np.arange(len(scores)), class_ids]

        # Keep only rows whose best class is person (class 0).
        mask = (class_ids == 0) & (conf > self.conf_threshold)
        if not mask.any():
            return []

        boxes = boxes[mask]
        conf = conf[mask]

        # Centre/size -> corner coordinates.
        half_w = boxes[:, 2] / 2
        half_h = boxes[:, 3] / 2
        x1 = boxes[:, 0] - half_w
        y1 = boxes[:, 1] - half_h
        x2 = boxes[:, 0] + half_w
        y2 = boxes[:, 1] + half_h

        # Undo the letterbox. Subtract the padding that was really applied
        # on the left/top (letterbox rounds `d - 0.1`), not the fractional
        # half-padding, otherwise boxes shift by half a pixel when the total
        # padding is odd.
        pad_x = int(round(self.dw - 0.1))
        pad_y = int(round(self.dh - 0.1))
        x1 = (x1 - pad_x) / self.ratio
        y1 = (y1 - pad_y) / self.ratio
        x2 = (x2 - pad_x) / self.ratio
        y2 = (y2 - pad_y) / self.ratio

        x1 = np.clip(x1, 0, im0_shape[1])
        y1 = np.clip(y1, 0, im0_shape[0])
        x2 = np.clip(x2, 0, im0_shape[1])
        y2 = np.clip(y2, 0, im0_shape[0])

        # cv2.dnn.NMSBoxes interprets each box as [x, y, width, height], so
        # feed it top-left + size rather than the two corners.
        nms_boxes = np.stack([x1, y1, x2 - x1, y2 - y1], axis=1)
        keep = cv2.dnn.NMSBoxes(
            nms_boxes.tolist(),
            conf.tolist(),
            self.conf_threshold,
            self.iou_threshold,
        )
        # Depending on the OpenCV version the result is an empty tuple, an
        # (N, 1) array or a flat array; normalise to a flat index array.
        keep = np.asarray(keep).reshape(-1)

        return [
            [
                int(x1[i]),
                int(y1[i]),
                int(x2[i]),
                int(y2[i]),
                float(conf[i]),
                0,  # person
            ]
            for i in keep
        ]

    def __call__(self, frame):
        """Run the full detection pipeline on one BGR frame.

        The per-stage wall-clock durations of the most recent call are kept
        in ``self.last_timing_ms`` (keys ``pre``, ``inf``, ``post``,
        ``total``; values in milliseconds) for optional profiling.

        Args:
            frame: BGR image of shape (height, width, 3).

        Returns:
            The detection list produced by ``postprocess_v8``.
        """
        t_start = time.perf_counter()
        input_data = self.preprocess(frame)
        t_pre = time.perf_counter()

        # Only the first model output is used.
        pred = self.session.run(None, {self.input_name: input_data})[0]
        t_inf = time.perf_counter()

        results = self.postprocess_v8(pred, frame.shape)
        t_post = time.perf_counter()

        pre_ms = (t_pre - t_start) * 1000
        inf_ms = (t_inf - t_pre) * 1000
        post_ms = (t_post - t_inf) * 1000
        self.last_timing_ms = {
            "pre": pre_ms,
            "inf": inf_ms,
            "post": post_ms,
            "total": pre_ms + inf_ms + post_ms,
        }
        return results