# npu_yolo_pose_onnx.py
#
# Fix notes:
# 1. Correctly handle YOLOv8 Pose anchor outputs (avoids 40+ phantom persons)
# 2. Keypoint coordinates correctly inverse-letterboxed (subtract padding, then divide by ratio)
# 3. Visibility scores passed through sigmoid
# 4. Cap the maximum number of persons after NMS for engineering stability

import cv2
import numpy as np
import onnxruntime as ort

# -------------------------------------------------
# Letterbox
# -------------------------------------------------
def letterbox(img, new_shape=(1280, 1280), color=(114, 114, 114)):
    """Resize `img` keeping aspect ratio, padding the remainder with `color`.

    Returns (padded_image, scale_ratio, (dw, dh)) where dw/dh are the
    half-paddings added on each side (may be fractional).
    """
    src_h, src_w = img.shape[:2]
    ratio = min(new_shape[0] / src_h, new_shape[1] / src_w)

    unpad_w = int(round(src_w * ratio))
    unpad_h = int(round(src_h * ratio))
    pad_w = (new_shape[1] - unpad_w) / 2
    pad_h = (new_shape[0] - unpad_h) / 2

    # Skip the resize when the image already matches the unpadded target.
    if (src_w, src_h) != (unpad_w, unpad_h):
        img = cv2.resize(img, (unpad_w, unpad_h), interpolation=cv2.INTER_LINEAR)

    # +/-0.1 makes an odd padding split deterministically (floor, ceil).
    top = int(round(pad_h - 0.1))
    bottom = int(round(pad_h + 0.1))
    left = int(round(pad_w - 0.1))
    right = int(round(pad_w + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right,
                             cv2.BORDER_CONSTANT, value=color)
    return img, ratio, (pad_w, pad_h)
# -------------------------------------------------
# Pose Skeleton Definition (COCO-17)
# -------------------------------------------------
# Limb connections as 0-based keypoint index pairs
# (legs, torso, arms, face, ears-to-shoulders).
POSE_SKELETON = [
    (15, 13), (13, 11), (16, 14), (14, 12), (11, 12),
    (5, 11), (6, 12), (5, 6),
    (5, 7), (6, 8), (7, 9), (8, 10),
    (1, 2), (0, 1), (0, 2),
    (1, 3), (2, 4), (3, 5), (4, 6),
]

# One color per COCO keypoint index (also used to tint the limb that
# starts at that keypoint).
POSE_COLORS = [
    (255, 0, 0), (255, 85, 0), (255, 170, 0), (255, 255, 0),
    (170, 255, 0), (85, 255, 0), (0, 255, 0),
    (0, 255, 85), (0, 255, 170), (0, 255, 255),
    (0, 170, 255), (0, 85, 255), (0, 0, 255),
    (85, 0, 255), (170, 0, 255), (255, 0, 255), (255, 0, 170),
]
# -------------------------------------------------
# YOLOv8 Pose ONNX
# -------------------------------------------------
class YOLOv8_Pose_ONNX:
    """YOLOv8-Pose inference wrapper for ONNX Runtime.

    Prefers the Ascend CANN execution provider, falling back to CUDA and
    then CPU.  The model output is assumed to be [1, 56, N_anchors]:
    4 bbox values (cx, cy, w, h) + 1 confidence + 17 keypoints * (x, y, v),
    all expressed in the letterboxed input-image scale.
    """

    def __init__(
        self,
        onnx_path,
        conf_threshold=0.6,   # raised threshold to suppress anchor noise
        iou_threshold=0.45,
        input_size=1280,
        max_persons=5         # hard cap on detections kept after NMS
    ):
        """Create the ONNX Runtime session and cache inference parameters.

        onnx_path: path to the exported YOLOv8-Pose .onnx model.
        """
        providers = [
            ("CANNExecutionProvider", {
                "device_id": 0,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                "precision_mode": "allow_fp32_to_fp16",
                "op_select_impl_mode": "high_precision",
                "enable_cann_graph": True,
            }),
            "CUDAExecutionProvider",
            "CPUExecutionProvider",
        ]

        self.session = ort.InferenceSession(onnx_path, providers=providers)

        # Report which provider ONNX Runtime actually selected.
        actual_providers = self.session.get_providers()
        print("YOLO Providers:", actual_providers)

        if "CANNExecutionProvider" in actual_providers:
            print("[INFO] YOLO 使用 CANNExecutionProvider(昇腾)")
        elif "CUDAExecutionProvider" in actual_providers:
            print("[INFO] YOLO 使用 CUDAExecutionProvider(NVIDIA GPU)")
        else:
            print("[INFO] YOLO 使用 CPUExecutionProvider(非昇腾环境)")

        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.max_persons = max_persons

        self.input_name = self.session.get_inputs()[0].name
        self.input_size = (input_size, input_size)
        print(f"模型输入名称: {self.input_name}")
        print(f"模型输入形状: {self.session.get_inputs()[0].shape}")
        print(f"模型输出形状: {self.session.get_outputs()[0].shape}")

    def nms(self, boxes, scores, iou_threshold=0.45):
        """Greedy non-maximum suppression on xyxy boxes.

        boxes: [N, 4] xyxy
        scores: [N]
        Returns the kept indices (highest score first), stopping early once
        self.max_persons detections have been kept.
        """
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]

        areas = (x2 - x1) * (y2 - y1)
        order = scores.argsort()[::-1]

        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(0.0, xx2 - xx1)
            h = np.maximum(0.0, yy2 - yy1)
            inter = w * h
            # Epsilon guards against division by zero when both boxes are
            # degenerate (zero area after clipping).
            ovr = inter / (areas[i] + areas[order[1:]] - inter + 1e-9)

            inds = np.where(ovr <= iou_threshold)[0]
            order = order[inds + 1]

            # Hard cap the number of persons for engineering stability.
            if len(keep) >= self.max_persons:
                break

        return np.array(keep, dtype=int)

    # -------------------------------------------------
    def preprocess(self, img):
        """Letterbox + BGR->RGB + CHW float32 [0,1] with batch dim.

        Side effect: caches orig_shape / ratio / dw / dh for postprocess().
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(img, self.input_size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.transpose(2, 0, 1).astype(np.float32) / 255.0
        img = np.expand_dims(img, axis=0)
        return img

    def postprocess(self, preds, im0_shape):
        """Decode raw ONNX output into per-person result dicts.

        preds: onnx output, shape = [1, 56, N_anchors]
        im0_shape: (h, w) of the original frame
        Returns a list of {"bbox": [x1, y1, x2, y2], "conf": float,
        "kpts": ndarray (17, 3)} in original-frame coordinates.
        """
        preds = preds[0]               # [56, N]
        preds = preds.transpose(1, 0)  # [N, 56]

        # 1. Split output channels.
        boxes = preds[:, 0:4]    # cx, cy, w, h (input scale)
        scores = preds[:, 4]     # person confidence
        kpts_raw = preds[:, 5:]  # [N, 51] = 17 keypoints * (x, y, v)

        # 2. Confidence filter (removes the bulk of anchor noise).
        mask = scores > self.conf_threshold
        boxes = boxes[mask]
        scores = scores[mask]
        kpts_raw = kpts_raw[mask]

        if boxes.shape[0] == 0:
            return []

        # 3. cxcywh -> xyxy (still input scale).
        boxes_xyxy = np.zeros_like(boxes)
        boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
        boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
        boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
        boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2

        # 4. Inverse letterbox: remove padding, then undo the scale.
        boxes_xyxy[:, [0, 2]] = (boxes_xyxy[:, [0, 2]] - self.dw) / self.ratio
        boxes_xyxy[:, [1, 3]] = (boxes_xyxy[:, [1, 3]] - self.dh) / self.ratio

        boxes_xyxy[:, 0] = np.clip(boxes_xyxy[:, 0], 0, im0_shape[1])
        boxes_xyxy[:, 1] = np.clip(boxes_xyxy[:, 1], 0, im0_shape[0])
        boxes_xyxy[:, 2] = np.clip(boxes_xyxy[:, 2], 0, im0_shape[1])
        boxes_xyxy[:, 3] = np.clip(boxes_xyxy[:, 3], 0, im0_shape[0])

        # 5. NMS (also enforces self.max_persons).
        keep = self.nms(boxes_xyxy, scores, self.iou_threshold)
        boxes_xyxy = boxes_xyxy[keep]
        scores = scores[keep]
        kpts_raw = kpts_raw[keep]

        # 6. Per-person keypoint decoding.
        results = []
        for i in range(len(boxes_xyxy)):
            x1, y1, x2, y2 = boxes_xyxy[i]

            kpts = kpts_raw[i].reshape(17, 3).copy()  # (51,) -> (17, 3)

            # Keypoints share the same inverse-letterbox mapping as the boxes.
            # NOTE(review): assumes the model emits absolute input-scale
            # keypoint coords (standard YOLOv8-Pose export), not bbox-relative
            # offsets — confirm against the exported model if poses look shifted.
            kpts[:, 0] = (kpts[:, 0] - self.dw) / self.ratio
            kpts[:, 1] = (kpts[:, 1] - self.dh) / self.ratio

            kpts[:, 0] = np.clip(kpts[:, 0], 0, im0_shape[1])
            kpts[:, 1] = np.clip(kpts[:, 1], 0, im0_shape[0])

            # Visibility logits -> probabilities; clip prevents exp overflow.
            kpts[:, 2] = 1.0 / (1.0 + np.exp(-np.clip(kpts[:, 2], -50, 50)))

            results.append({
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "conf": float(scores[i]),
                "kpts": kpts
            })

        return results

    # -------------------------------------------------
    def __call__(self, frame):
        """Run the full pipeline on one BGR frame; returns postprocess() output."""
        inp = self.preprocess(frame)
        pred = self.session.run(None, {self.input_name: inp})[0]
        return self.postprocess(pred, frame.shape[:2])

    @staticmethod
    def draw_keypoints(frame, pose_results, vis_thres=0.3):
        """Draw COCO-17 keypoints and skeleton limbs onto `frame` in place.

        pose_results: list of dicts as produced by postprocess().
        vis_thres: minimum visibility for a keypoint/limb to be drawn.
        Returns the (mutated) frame for chaining.
        """
        for res in pose_results:
            kpts = res.get("kpts", None)  # key produced by postprocess()
            if kpts is None or len(kpts) != 17:
                continue
            # Normalize ndarray to nested lists for uniform indexing below.
            if isinstance(kpts, np.ndarray):
                kpts = kpts.tolist()

            for i, (x, y, v) in enumerate(kpts):
                if v > vis_thres:
                    cv2.circle(frame, (int(x), int(y)), 5, POSE_COLORS[i], -1)

            for a, b in POSE_SKELETON:
                # Draw a limb only when both endpoints are confidently visible.
                if kpts[a][2] > vis_thres and kpts[b][2] > vis_thres:
                    cv2.line(
                        frame,
                        (int(kpts[a][0]), int(kpts[a][1])),
                        (int(kpts[b][0]), int(kpts[b][1])),
                        POSE_COLORS[a],
                        2
                    )
        return frame