# npu_yolo_pose_onnx.py
# Fix highlights:
# 1. Correctly handle the YOLOv8 Pose anchor output (avoids 40+ phantom persons)
# 2. Keypoint coordinates correctly inverse-letterboxed (subtract padding, then divide by ratio)
# 3. visibility is passed through a sigmoid
# 4. Cap the number of persons after NMS for engineering stability

import cv2
import numpy as np
import onnxruntime as ort

from utils.logger import get_logger

logger = get_logger(__name__)


# -------------------------------------------------
# Letterbox
# -------------------------------------------------
def letterbox(img, new_shape=(1280, 1280), color=(114, 114, 114)):
    """Resize `img` to `new_shape` preserving aspect ratio, padding the rest.

    Args:
        img: HxWxC BGR image (numpy array).
        new_shape: (height, width) of the network input.
        color: padding color (BGR).

    Returns:
        (padded_img, ratio, (dw, dh)) where `ratio` is the uniform scale
        factor applied to the image and (dw, dh) are the per-side paddings
        (already halved) needed to undo the letterbox later.
    """
    shape = img.shape[:2]  # h, w
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    new_unpad = (int(round(shape[1] * r)), int(round(shape[0] * r)))  # (w, h)
    dw = new_shape[1] - new_unpad[0]
    dh = new_shape[0] - new_unpad[1]
    # Split the padding evenly between the two sides.
    dw /= 2
    dh /= 2
    if shape[::-1] != new_unpad:
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
    # +/- 0.1 keeps rounding symmetric when dw/dh are *.5 values.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    img = cv2.copyMakeBorder(
        img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color
    )
    return img, r, (dw, dh)


# -------------------------------------------------
# Pose Skeleton Definition (COCO-17)
# -------------------------------------------------
# Limb connections given as 1-based COCO keypoint indices, converted to
# 0-based below.
POSE_SKELETON = [
    (16, 14), (14, 12), (17, 15), (15, 13), (12, 13),
    (6, 12), (7, 13), (6, 7),
    (6, 8), (7, 9), (8, 10), (9, 11),
    (2, 3), (1, 2), (1, 3),
    (2, 4), (3, 5), (4, 6), (5, 7)
]
POSE_SKELETON = [(a - 1, b - 1) for (a, b) in POSE_SKELETON]

# One BGR color per keypoint (17 entries).
POSE_COLORS = [
    (255, 0, 0), (255, 85, 0), (255, 170, 0), (255, 255, 0),
    (170, 255, 0), (85, 255, 0), (0, 255, 0),
    (0, 255, 85), (0, 255, 170), (0, 255, 255),
    (0, 170, 255), (0, 85, 255), (0, 0, 255),
    (85, 0, 255), (170, 0, 255), (255, 0, 255), (255, 0, 170)
]


# -------------------------------------------------
# YOLOv8 Pose ONNX
# -------------------------------------------------
class YOLOv8_Pose_ONNX:
    """YOLOv8-pose inference wrapper around an ONNX Runtime session.

    Prefers the Ascend CANN execution provider, falling back to CUDA and
    then CPU. Produces per-person dicts with bbox, confidence and 17 COCO
    keypoints in original-image coordinates.
    """

    def __init__(
        self,
        onnx_path,
        conf_threshold=0.6,   # raised threshold to suppress anchor noise
        iou_threshold=0.45,
        input_size=1280,
        max_persons=5         # hard cap on detected persons
    ):
        """Create the ONNX Runtime session and cache model metadata.

        Args:
            onnx_path: path to the exported YOLOv8-pose ONNX model.
            conf_threshold: objectness threshold for candidate filtering.
            iou_threshold: IoU threshold used by NMS.
            input_size: square network input size in pixels.
            max_persons: maximum number of persons kept after NMS.
        """
        providers = [
            ("CANNExecutionProvider", {
                "device_id": 0,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                "precision_mode": "allow_fp32_to_fp16",
                "op_select_impl_mode": "high_precision",
                "enable_cann_graph": True,
            }),
            "CUDAExecutionProvider",
            "CPUExecutionProvider",
        ]
        self.session = ort.InferenceSession(onnx_path, providers=providers)

        # Query which providers were actually activated.
        actual_providers = self.session.get_providers()
        # FIX: original call passed `actual_providers` as a positional arg
        # with no %s placeholder, which breaks stdlib logging formatting.
        logger.info("YOLO Providers: %s", actual_providers)
        if "CANNExecutionProvider" in actual_providers:
            logger.info("[INFO] YOLO 使用 CANNExecutionProvider(昇腾)")
        elif "CUDAExecutionProvider" in actual_providers:
            logger.info("[INFO] YOLO 使用 CUDAExecutionProvider(NVIDIA GPU)")
        else:
            logger.info("[INFO] YOLO 使用 CPUExecutionProvider(非昇腾环境)")

        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.max_persons = max_persons
        self.input_name = self.session.get_inputs()[0].name
        self.input_size = (input_size, input_size)

        logger.info(f"模型输入名称: {self.input_name}")
        logger.info(f"模型输入形状: {self.session.get_inputs()[0].shape}")
        logger.info(f"模型输出形状: {self.session.get_outputs()[0].shape}")

    def nms(self, boxes, scores, iou_threshold=0.45):
        """Greedy non-maximum suppression, capped at `self.max_persons`.

        Args:
            boxes: [N, 4] boxes in xyxy format.
            scores: [N] confidence scores.
            iou_threshold: boxes overlapping a kept box above this IoU
                are suppressed.

        Returns:
            Integer index array of kept boxes, highest score first.
        """
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]

        areas = (x2 - x1) * (y2 - y1)
        order = scores.argsort()[::-1]

        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)

            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(0.0, xx2 - xx1)
            h = np.maximum(0.0, yy2 - yy1)
            inter = w * h

            ovr = inter / (areas[i] + areas[order[1:]] - inter)
            inds = np.where(ovr <= iou_threshold)[0]
            order = order[inds + 1]

            # Enforce the max-person cap for stability.
            if len(keep) >= self.max_persons:
                break

        return np.array(keep, dtype=int)

    # -------------------------------------------------
    def preprocess(self, img):
        """Letterbox, BGR->RGB, CHW, normalize to [0,1], add batch dim.

        Side effects: caches `orig_shape`, `ratio`, `dw`, `dh` on self for
        use by `postprocess`.
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(img, self.input_size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.transpose(2, 0, 1).astype(np.float32) / 255.0
        img = np.expand_dims(img, axis=0)
        return img

    def postprocess(self, preds, im0_shape):
        """Decode raw model output into per-person detections.

        Args:
            preds: ONNX output, shape [1, 56, 33600]
                (56 = 4 bbox + 1 conf + 17*3 keypoints).
            im0_shape: (h, w) of the original frame.

        Returns:
            List of dicts: {"bbox": [x1,y1,x2,y2], "conf": float,
            "kpts": (17, 3) array of (x, y, visibility)} in original-image
            coordinates. Empty list if nothing passes the threshold.
        """
        preds = preds[0]              # [56, 33600]
        preds = preds.transpose(1, 0) # [33600, 56]

        # =============================
        # 1. Split the output
        # =============================
        boxes = preds[:, 0:4]    # cx, cy, w, h (input scale)
        scores = preds[:, 4]     # objectness confidence
        kpts_raw = preds[:, 5:]  # [33600, 51] = 17*3

        # =============================
        # 2. Confidence filtering
        # =============================
        mask = scores > self.conf_threshold
        boxes = boxes[mask]
        scores = scores[mask]
        kpts_raw = kpts_raw[mask]

        if boxes.shape[0] == 0:
            return []

        # =============================
        # 3. bbox cxcywh -> xyxy (input scale)
        # =============================
        boxes_xyxy = np.zeros_like(boxes)
        boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
        boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
        boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
        boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2

        # =============================
        # 4. Inverse letterbox (bbox)
        # =============================
        boxes_xyxy[:, [0, 2]] = (boxes_xyxy[:, [0, 2]] - self.dw) / self.ratio
        boxes_xyxy[:, [1, 3]] = (boxes_xyxy[:, [1, 3]] - self.dh) / self.ratio

        boxes_xyxy[:, 0] = np.clip(boxes_xyxy[:, 0], 0, im0_shape[1])
        boxes_xyxy[:, 1] = np.clip(boxes_xyxy[:, 1], 0, im0_shape[0])
        boxes_xyxy[:, 2] = np.clip(boxes_xyxy[:, 2], 0, im0_shape[1])
        boxes_xyxy[:, 3] = np.clip(boxes_xyxy[:, 3], 0, im0_shape[0])

        # =============================
        # 5. NMS
        # =============================
        keep = self.nms(boxes_xyxy, scores, self.iou_threshold)
        boxes_xyxy = boxes_xyxy[keep]
        scores = scores[keep]
        kpts_raw = kpts_raw[keep]

        # =============================
        # 6. Per-person keypoint processing (the critical part)
        # =============================
        results = []
        for i in range(len(boxes_xyxy)):
            x1, y1, x2, y2 = boxes_xyxy[i]

            # (51,) -> (17, 3)
            kpts = kpts_raw[i].reshape(17, 3).copy()

            # Inverse letterbox: keypoints are already absolute in input
            # scale, so no bbox offset is added back here.
            kpts[:, 0] = (kpts[:, 0] - self.dw) / self.ratio
            kpts[:, 1] = (kpts[:, 1] - self.dh) / self.ratio

            # Clip to frame bounds.
            kpts[:, 0] = np.clip(kpts[:, 0], 0, im0_shape[1])
            kpts[:, 1] = np.clip(kpts[:, 1], 0, im0_shape[0])

            # Visibility sigmoid (clipped to avoid exp overflow).
            kpts[:, 2] = 1.0 / (1.0 + np.exp(-np.clip(kpts[:, 2], -50, 50)))

            results.append({
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "conf": float(scores[i]),
                "kpts": kpts
            })

        return results

    # -------------------------------------------------
    def __call__(self, frame):
        """Run the full detect-pose pipeline on one BGR frame."""
        inp = self.preprocess(frame)
        pred = self.session.run(None, {self.input_name: inp})[0]
        return self.postprocess(pred, frame.shape[:2])

    @staticmethod
    def draw_keypoints(frame, pose_results, vis_thres=0.3):
        """Draw COCO-17 keypoints and skeleton limbs onto `frame` in place.

        Args:
            frame: BGR image to draw on.
            pose_results: list of dicts as returned by `postprocess`.
            vis_thres: minimum visibility to draw a point / limb.

        Returns:
            The same `frame`, for chaining.
        """
        for res in pose_results:
            kpts = res.get("kpts", None)  # key matches what postprocess() returns
            if kpts is None or len(kpts) != 17:
                continue

            # Normalize ndarray to plain lists for uniform indexing below.
            if isinstance(kpts, np.ndarray):
                kpts = kpts.tolist()

            for i, (x, y, v) in enumerate(kpts):
                if v > vis_thres:
                    cv2.circle(frame, (int(x), int(y)), 5, POSE_COLORS[i], -1)

            for a, b in POSE_SKELETON:
                if kpts[a][2] > vis_thres and kpts[b][2] > vis_thres:
                    cv2.line(
                        frame,
                        (int(kpts[a][0]), int(kpts[a][1])),
                        (int(kpts[b][0]), int(kpts[b][1])),
                        POSE_COLORS[a],
                        2
                    )
        return frame