Files
SupervisorAI/algorithm/checkpoint/npu_yolo_pose_onnx.py
2026-02-03 13:33:41 +08:00

278 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# npu_yolo_pose_onnx.py
# Fix notes:
# 1. Correctly handle the YOLOv8-Pose anchor output (avoids spurious 40+ person detections)
# 2. Keypoint coordinates correctly inverse-letterboxed (subtract padding, then divide by ratio)
# 3. Visibility scores passed through a sigmoid
# 4. Cap the number of persons kept after NMS, for engineering stability
import cv2
import numpy as np
import onnxruntime as ort
from utils.logger import get_logger
logger = get_logger(__name__)  # module-level logger named after this module
# -------------------------------------------------
# Letterbox
# -------------------------------------------------
def letterbox(img, new_shape=(1280, 1280), color=(114, 114, 114)):
    """Resize *img* to fit inside *new_shape* while keeping aspect ratio,
    padding the remainder symmetrically with *color*.

    Returns:
        (padded_img, scale_ratio, (pad_w, pad_h)) where pad_w/pad_h are the
        per-side paddings (floats) needed to invert the transform later.
    """
    h, w = img.shape[:2]
    # Single scale factor so the image fits in both dimensions.
    scale = min(new_shape[0] / h, new_shape[1] / w)
    resized_w = int(round(w * scale))
    resized_h = int(round(h * scale))
    # Remaining space, split evenly between the two sides.
    pad_x = (new_shape[1] - resized_w) / 2
    pad_y = (new_shape[0] - resized_h) / 2
    if (w, h) != (resized_w, resized_h):
        img = cv2.resize(img, (resized_w, resized_h), interpolation=cv2.INTER_LINEAR)
    # The +-0.1 nudge makes rounding deterministic when the padding is *.5.
    top = int(round(pad_y - 0.1))
    bottom = int(round(pad_y + 0.1))
    left = int(round(pad_x - 0.1))
    right = int(round(pad_x + 0.1))
    img = cv2.copyMakeBorder(
        img, top, bottom, left, right,
        cv2.BORDER_CONSTANT, value=color
    )
    return img, scale, (pad_x, pad_y)
# -------------------------------------------------
# Pose Skeleton Definition (COCO-17)
# -------------------------------------------------
# Limb pairs given as 1-based COCO-17 keypoint indices (Ultralytics convention).
POSE_SKELETON = [
    (16,14),(14,12),(17,15),(15,13),(12,13),
    (6,12),(7,13),(6,7),
    (6,8),(7,9),(8,10),(9,11),
    (2,3),(1,2),(1,3),
    (2,4),(3,5),(4,6),(5,7)
]
# Convert once to 0-based pairs so they can index the (17, 3) keypoint arrays directly.
POSE_SKELETON = [(a-1, b-1) for (a, b) in POSE_SKELETON]
# One BGR drawing color per keypoint (17 entries, indexed by 0-based keypoint id).
POSE_COLORS = [
    (255,0,0),(255,85,0),(255,170,0),(255,255,0),
    (170,255,0),(85,255,0),(0,255,0),
    (0,255,85),(0,255,170),(0,255,255),
    (0,170,255),(0,85,255),(0,0,255),
    (85,0,255),(170,0,255),(255,0,255),(255,0,170)
]
# -------------------------------------------------
# YOLOv8 Pose ONNX
# -------------------------------------------------
class YOLOv8_Pose_ONNX:
    """YOLOv8-Pose inference wrapper around an ONNX Runtime session.

    Execution providers are tried in order CANN (Ascend NPU) -> CUDA -> CPU.
    Calling the instance on a BGR frame returns a list of per-person dicts:
    ``{"bbox": [x1, y1, x2, y2], "conf": float, "kpts": (17, 3) ndarray}``
    with all coordinates mapped back to the original image scale.
    """

    def __init__(
        self,
        onnx_path,
        conf_threshold=0.6,   # raised threshold to suppress anchor noise
        iou_threshold=0.45,
        input_size=1280,
        max_persons=5         # hard cap on persons kept after NMS
    ):
        """Create the inference session and cache model/input metadata.

        Args:
            onnx_path: path to the exported YOLOv8-Pose ONNX model.
            conf_threshold: minimum objectness score for an anchor to survive.
            iou_threshold: IoU threshold used during NMS.
            input_size: square letterbox side length fed to the network.
            max_persons: maximum number of detections returned per frame.
        """
        providers = [
            ("CANNExecutionProvider", {
                "device_id": 0,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                "precision_mode": "allow_fp32_to_fp16",
                "op_select_impl_mode": "high_precision",
                "enable_cann_graph": True,
            }),
            "CUDAExecutionProvider",
            "CPUExecutionProvider",
        ]
        self.session = ort.InferenceSession(onnx_path, providers=providers)
        # Ask the session which providers it actually activated.
        actual_providers = self.session.get_providers()
        # BUGFIX: the previous call ``logger.info("YOLO Providers:", actual_providers)``
        # passed the list as a %-formatting argument with no placeholder in the
        # message, which raises "not all arguments converted during string
        # formatting" inside the logging machinery at emit time.
        logger.info("YOLO Providers: %s", actual_providers)
        if "CANNExecutionProvider" in actual_providers:
            logger.info("[INFO] YOLO 使用 CANNExecutionProvider昇腾")
        elif 'CUDAExecutionProvider' in actual_providers:
            logger.info("[INFO] YOLO 使用 CUDAExecutionProviderNVIDIA GPU")
        else:
            logger.info("[INFO] YOLO 使用 CPUExecutionProvider非昇腾环境")
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.max_persons = max_persons
        self.input_name = self.session.get_inputs()[0].name
        self.input_size = (input_size, input_size)
        logger.info(f"模型输入名称: {self.input_name}")
        logger.info(f"模型输入形状: {self.session.get_inputs()[0].shape}")
        logger.info(f"模型输出形状: {self.session.get_outputs()[0].shape}")

    def nms(self, boxes, scores, iou_threshold=0.45):
        """Greedy non-maximum suppression, capped at ``self.max_persons``.

        Args:
            boxes: [N, 4] array of xyxy boxes.
            scores: [N] confidence scores.
            iou_threshold: candidates overlapping a kept box above this IoU
                are suppressed.

        Returns:
            Integer index array of kept boxes, in descending score order.
        """
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]
        areas = (x2 - x1) * (y2 - y1)
        order = scores.argsort()[::-1]  # indices sorted by score, best first
        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            # Intersection of the current best box with all remaining boxes.
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])
            w = np.maximum(0.0, xx2 - xx1)
            h = np.maximum(0.0, yy2 - yy1)
            inter = w * h
            ovr = inter / (areas[i] + areas[order[1:]] - inter)
            inds = np.where(ovr <= iou_threshold)[0]
            order = order[inds + 1]  # +1: `ovr` was computed against order[1:]
            # Cap the number of persons for engineering stability.
            if len(keep) >= self.max_persons:
                break
        return np.array(keep, dtype=int)

    # -------------------------------------------------
    def preprocess(self, img):
        """Letterbox + BGR->RGB + HWC->NCHW float [0,1] normalization.

        Side effects: stores ``orig_shape``, ``ratio``, ``dw``, ``dh`` on
        ``self`` so :meth:`postprocess` can invert the letterbox transform.
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(img, self.input_size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.transpose(2, 0, 1).astype(np.float32) / 255.0
        img = np.expand_dims(img, axis=0)  # add batch dimension -> [1, 3, H, W]
        return img

    def postprocess(self, preds, im0_shape):
        """Decode raw model output into per-person detections.

        Args:
            preds: raw ONNX output, shape [1, 56, A] (e.g. [1, 56, 33600]);
                channels = 4 bbox (cxcywh) + 1 objectness + 17*3 keypoints,
                all in the letterboxed input scale.
            im0_shape: (h, w) of the original frame, used for clipping.

        Returns:
            List of dicts with keys ``bbox`` (xyxy floats, original scale),
            ``conf`` (float) and ``kpts`` ((17, 3) ndarray [x, y, visibility]).
        """
        preds = preds[0]              # [56, A]
        preds = preds.transpose(1, 0) # [A, 56] -> one row per anchor
        # 1. Split the output channels.
        boxes = preds[:, 0:4]   # cx, cy, w, h (input scale)
        scores = preds[:, 4]    # objectness confidence
        kpts_raw = preds[:, 5:] # [A, 51] = 17 keypoints * (x, y, vis)
        # 2. Confidence filtering.
        mask = scores > self.conf_threshold
        boxes = boxes[mask]
        scores = scores[mask]
        kpts_raw = kpts_raw[mask]
        if boxes.shape[0] == 0:
            return []
        # 3. cxcywh -> xyxy (still in input scale).
        boxes_xyxy = np.zeros_like(boxes)
        boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
        boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
        boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
        boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2
        # 4. Inverse letterbox for boxes: remove padding, undo scaling, clip.
        boxes_xyxy[:, [0, 2]] = (boxes_xyxy[:, [0, 2]] - self.dw) / self.ratio
        boxes_xyxy[:, [1, 3]] = (boxes_xyxy[:, [1, 3]] - self.dh) / self.ratio
        boxes_xyxy[:, 0] = np.clip(boxes_xyxy[:, 0], 0, im0_shape[1])
        boxes_xyxy[:, 1] = np.clip(boxes_xyxy[:, 1], 0, im0_shape[0])
        boxes_xyxy[:, 2] = np.clip(boxes_xyxy[:, 2], 0, im0_shape[1])
        boxes_xyxy[:, 3] = np.clip(boxes_xyxy[:, 3], 0, im0_shape[0])
        # 5. NMS (also enforces self.max_persons).
        keep = self.nms(boxes_xyxy, scores, self.iou_threshold)
        boxes_xyxy = boxes_xyxy[keep]
        scores = scores[keep]
        kpts_raw = kpts_raw[keep]
        # 6. Per-person keypoint decoding.
        results = []
        for i in range(len(boxes_xyxy)):
            x1, y1, x2, y2 = boxes_xyxy[i]
            kpts = kpts_raw[i].reshape(17, 3).copy()  # (51,) -> (17, 3)
            # Inverse letterbox for keypoints, same as boxes.
            kpts[:, 0] = (kpts[:, 0] - self.dw) / self.ratio
            kpts[:, 1] = (kpts[:, 1] - self.dh) / self.ratio
            # NOTE(review): adding the bbox offset (kpts += x1/y1) is
            # intentionally disabled — keypoints appear to already be absolute
            # coordinates in the input scale for this export; confirm against
            # the model export if keypoints drift.
            kpts[:, 0] = np.clip(kpts[:, 0], 0, im0_shape[1])
            kpts[:, 1] = np.clip(kpts[:, 1], 0, im0_shape[0])
            # Visibility -> sigmoid; clip the logits to avoid exp overflow.
            kpts[:, 2] = 1.0 / (1.0 + np.exp(-np.clip(kpts[:, 2], -50, 50)))
            results.append({
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "conf": float(scores[i]),
                "kpts": kpts
            })
        return results

    # -------------------------------------------------
    def __call__(self, frame):
        """Run the full pipeline (preprocess -> inference -> postprocess)."""
        inp = self.preprocess(frame)
        # The session.run call is the expensive step.
        pred = self.session.run(None, {self.input_name: inp})[0]
        return self.postprocess(pred, frame.shape[:2])

    @staticmethod
    def draw_keypoints(frame, pose_results, vis_thres=0.3):
        """Draw keypoints and skeleton limbs in-place on *frame*.

        Args:
            frame: BGR image to draw on (modified in place).
            pose_results: output of :meth:`postprocess`.
            vis_thres: minimum visibility for a keypoint/limb to be drawn.

        Returns:
            The same frame, for call chaining.
        """
        for res in pose_results:
            kpts = res.get("kpts", None)  # matches the key used by postprocess
            if kpts is None or len(kpts) != 17:
                continue
            # Normalize to a plain list for uniform indexing below.
            if isinstance(kpts, np.ndarray):
                kpts = kpts.tolist()
            for i, (x, y, v) in enumerate(kpts):
                if v > vis_thres:
                    cv2.circle(frame, (int(x), int(y)), 5, POSE_COLORS[i], -1)
            # Draw a limb only when both endpoints are confidently visible.
            for a, b in POSE_SKELETON:
                if kpts[a][2] > vis_thres and kpts[b][2] > vis_thres:
                    cv2.line(
                        frame,
                        (int(kpts[a][0]), int(kpts[a][1])),
                        (int(kpts[b][0]), int(kpts[b][1])),
                        POSE_COLORS[a],
                        2
                    )
        return frame