修改算法文件路径改为common

This commit is contained in:
zqc
2026-02-26 11:39:14 +08:00
parent cd13709998
commit 7ce50cfd4f
11 changed files with 11 additions and 10 deletions

View File

View File

@@ -0,0 +1,156 @@
# 文件名: npu_yolo_onnx.py
import cv2
import numpy as np
import onnxruntime as ort
import os
import time
from utils.logger import get_logger
logger = get_logger(__name__)
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114)):
shape = img.shape[:2] # h, w
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
dw /= 2
dh /= 2
if shape[::-1] != new_unpad:
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
return img, r, (dw, dh)
class YOLOv8_ONNX:
def __init__(self, onnx_path, conf_threshold=0.25, iou_threshold=0.45, input_size=640):
providers = [("CANNExecutionProvider", {
"device_id": 0,
"arena_extend_strategy": "kNextPowerOfTwo",
"npu_mem_limit": 16 * 1024 * 1024 * 1024,
"precision_mode": "allow_fp32_to_fp16",
"op_select_impl_mode": "high_precision",
"enable_cann_graph": True,
}),
"CUDAExecutionProvider",
"CPUExecutionProvider",
]
self.session = ort.InferenceSession(onnx_path, providers=providers)
actual_providers = self.session.get_providers()
logger.info("YOLO Providers:", actual_providers)
if "CANNExecutionProvider" in actual_providers:
logger.info("[INFO] YOLO 使用 CANNExecutionProvider昇腾 NPU")
elif 'CUDAExecutionProvider' in actual_providers:
logger.info("[INFO] YOLO 使用 CUDAExecutionProviderNVIDIA GPU")
else:
logger.info("[INFO] YOLO 使用 CPUExecutionProvider")
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.input_name = self.session.get_inputs()[0].name
self.input_size = (input_size, input_size) if isinstance(input_size, int) else input_size
logger.info(f"模型输入名称: {self.input_name}")
logger.info(f"模型输入形状: {self.session.get_inputs()[0].shape}")
logger.info(f"模型输出形状: {self.session.get_outputs()[0].shape}")
def preprocess(self, img):
self.orig_shape = img.shape[:2]
img, self.ratio, (self.dw, self.dh) = letterbox(img, self.input_size)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.transpose(2, 0, 1).astype(np.float32)
img /= 255.0
img = np.expand_dims(img, axis=0)
return img
def postprocess(self, pred, im0_shape):
# 1. 转置:从 [1, 4+cls, 8400] -> [8400, 4+cls]
pred = pred[0].T
# 2. 获取数据
boxes = pred[:, :4] # cx, cy, w, h
scores = pred[:, 4:]
# 3. 获取最大置信度和类别
conf = np.max(scores, axis=1)
class_pred = np.argmax(scores, axis=1)
# 4. 初步过滤
mask = conf > self.conf_threshold
if not mask.any():
return []
boxes = boxes[mask]
conf = conf[mask]
class_pred = class_pred[mask]
# =========================================================
# 还原坐标 (逆 Letterbox)
# =========================================================
boxes[:, 0] = (boxes[:, 0] - self.dw) / self.ratio # cx
boxes[:, 1] = (boxes[:, 1] - self.dh) / self.ratio # cy
boxes[:, 2] = boxes[:, 2] / self.ratio # w
boxes[:, 3] = boxes[:, 3] / self.ratio # h
# 转换格式Center(cx,cy) -> TopLeft(x,y)
x = boxes[:, 0] - boxes[:, 2] / 2
y = boxes[:, 1] - boxes[:, 3] / 2
w = boxes[:, 2]
h = boxes[:, 3]
# 原始框(用于最终输出)
bboxes_original = np.stack([x, y, w, h], axis=1)
# =========================================================
# 【核心修复】Class-Aware NMS (偏移量法)
# 给不同类别的框增加不同的偏移量,使得不同类别的框绝对不会重叠
# 从而避免 "车" 把 "人" 过滤掉的情况
# =========================================================
max_wh = 4096 # 只要大于图片最大分辨率即可
class_offset = class_pred * max_wh
# NMS 专用的框坐标 (加上了偏移量)
bboxes_for_nms = bboxes_original.copy()
bboxes_for_nms[:, 0] += class_offset
bboxes_for_nms[:, 1] += class_offset
# =========================================================
# 执行 NMS
# =========================================================
indices = cv2.dnn.NMSBoxes(
bboxes_for_nms.tolist(),
conf.tolist(),
self.conf_threshold,
self.iou_threshold
)
result = []
if len(indices) > 0:
indices = indices.flatten()
for i in indices:
# 注意:这里取数据要从 bboxes_original 取 (没有加偏移量的)
bx, by, bw, bh = bboxes_original[i]
# 转换回 x1, y1, x2, y2 供业务代码画图使用
x1 = np.clip(bx, 0, im0_shape[1])
y1 = np.clip(by, 0, im0_shape[0])
x2 = np.clip(bx + bw, 0, im0_shape[1])
y2 = np.clip(by + bh, 0, im0_shape[0])
result.append([
float(x1),
float(y1),
float(x2),
float(y2),
float(conf[i]),
int(class_pred[i])
])
return result
def __call__(self, frame):
input_data = self.preprocess(frame)
pred = self.session.run(None, {self.input_name: input_data})[0]
results = self.postprocess(pred, frame.shape[:2])
return results

View File

@@ -0,0 +1,277 @@
# npu_yolo_pose_onnx.py
# 修复要点:
# 1. 正确处理 YOLOv8 Pose anchor 输出(避免 40+ 人)
# 2. 关键点坐标正确逆 letterbox减 padding 再除 ratio
# 3. visibility 使用 sigmoid
# 4. NMS 后限制最大人数,保证工程稳定性
import cv2
import numpy as np
import onnxruntime as ort
from utils.logger import get_logger
logger = get_logger(__name__)
# -------------------------------------------------
# Letterbox
# -------------------------------------------------
def letterbox(img, new_shape=(1280, 1280), color=(114, 114, 114)):
shape = img.shape[:2] # h, w
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
new_unpad = (int(round(shape[1] * r)), int(round(shape[0] * r)))
dw = new_shape[1] - new_unpad[0]
dh = new_shape[0] - new_unpad[1]
dw /= 2
dh /= 2
if shape[::-1] != new_unpad:
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
img = cv2.copyMakeBorder(
img, top, bottom, left, right,
cv2.BORDER_CONSTANT, value=color
)
return img, r, (dw, dh)
# -------------------------------------------------
# Pose Skeleton Definition (COCO-17)
# -------------------------------------------------
POSE_SKELETON = [
(16,14),(14,12),(17,15),(15,13),(12,13),
(6,12),(7,13),(6,7),
(6,8),(7,9),(8,10),(9,11),
(2,3),(1,2),(1,3),
(2,4),(3,5),(4,6),(5,7)
]
POSE_SKELETON = [(a-1, b-1) for (a, b) in POSE_SKELETON]
POSE_COLORS = [
(255,0,0),(255,85,0),(255,170,0),(255,255,0),
(170,255,0),(85,255,0),(0,255,0),
(0,255,85),(0,255,170),(0,255,255),
(0,170,255),(0,85,255),(0,0,255),
(85,0,255),(170,0,255),(255,0,255),(255,0,170)
]
# -------------------------------------------------
# YOLOv8 Pose ONNX
# -------------------------------------------------
class YOLOv8_Pose_ONNX:
def __init__(
self,
onnx_path,
conf_threshold=0.6, # ★ 提高阈值,避免 anchor 噪声
iou_threshold=0.45,
input_size=1280,
max_persons=5 # ★ 限制最大人数
):
providers = [
("CANNExecutionProvider", {
"device_id": 0,
"arena_extend_strategy": "kNextPowerOfTwo",
"npu_mem_limit": 16 * 1024 * 1024 * 1024,
"precision_mode": "allow_fp32_to_fp16",
"op_select_impl_mode": "high_precision",
"enable_cann_graph": True,
}),
"CUDAExecutionProvider",
"CPUExecutionProvider",
]
self.session = ort.InferenceSession(onnx_path, providers=providers)
# 获取真实工作 provider
actual_providers = self.session.get_providers()
logger.info("YOLO Providers:", actual_providers)
if "CANNExecutionProvider" in actual_providers:
logger.info("[INFO] YOLO 使用 CANNExecutionProvider昇腾")
elif 'CUDAExecutionProvider' in actual_providers:
logger.info("[INFO] YOLO 使用 CUDAExecutionProviderNVIDIA GPU")
else:
logger.info("[INFO] YOLO 使用 CPUExecutionProvider非昇腾环境")
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.max_persons = max_persons
self.input_name = self.session.get_inputs()[0].name
self.input_size = (input_size, input_size)
logger.info(f"模型输入名称: {self.input_name}")
logger.info(f"模型输入形状: {self.session.get_inputs()[0].shape}")
logger.info(f"模型输出形状: {self.session.get_outputs()[0].shape}")
def nms(self, boxes, scores, iou_threshold=0.45):
"""
boxes: [N,4] xyxy
scores: [N]
"""
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2]
y2 = boxes[:, 3]
areas = (x2 - x1) * (y2 - y1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
inter = w * h
ovr = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(ovr <= iou_threshold)[0]
order = order[inds + 1]
# 限制最大人数
if len(keep) >= self.max_persons:
break
return np.array(keep, dtype=int)
# -------------------------------------------------
def preprocess(self, img):
self.orig_shape = img.shape[:2]
img, self.ratio, (self.dw, self.dh) = letterbox(img, self.input_size)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.transpose(2, 0, 1).astype(np.float32) / 255.0
img = np.expand_dims(img, axis=0)
return img
def postprocess(self, preds, im0_shape):
"""
preds: onnx output, shape = [1, 56, 33600]
im0_shape: (h, w) of original frame
"""
preds = preds[0] # [56, 33600]
preds = preds.transpose(1, 0) # [33600, 56]
# =============================
# 1. 拆分输出
# =============================
boxes = preds[:, 0:4] # cx, cy, w, h (input scale)
scores = preds[:, 4] # obj conf
kpts_raw = preds[:, 5:] # [33600, 51] = 17*3
# =============================
# 2. 置信度筛选
# =============================
mask = scores > self.conf_threshold
boxes = boxes[mask]
scores = scores[mask]
kpts_raw = kpts_raw[mask]
if boxes.shape[0] == 0:
return []
# =============================
# 3. bbox cxcywh -> xyxyinput scale
# =============================
boxes_xyxy = np.zeros_like(boxes)
boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2 # x1
boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2 # y1
boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2 # x2
boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2 # y2
# =============================
# 4. inverse letterboxbbox
# =============================
boxes_xyxy[:, [0, 2]] = (boxes_xyxy[:, [0, 2]] - self.dw) / self.ratio
boxes_xyxy[:, [1, 3]] = (boxes_xyxy[:, [1, 3]] - self.dh) / self.ratio
boxes_xyxy[:, 0] = np.clip(boxes_xyxy[:, 0], 0, im0_shape[1])
boxes_xyxy[:, 1] = np.clip(boxes_xyxy[:, 1], 0, im0_shape[0])
boxes_xyxy[:, 2] = np.clip(boxes_xyxy[:, 2], 0, im0_shape[1])
boxes_xyxy[:, 3] = np.clip(boxes_xyxy[:, 3], 0, im0_shape[0])
# =============================
# 5. NMS
# =============================
keep = self.nms(boxes_xyxy, scores, self.iou_threshold)
boxes_xyxy = boxes_xyxy[keep]
scores = scores[keep]
kpts_raw = kpts_raw[keep]
# =============================
# 6. 逐人处理 keypoints关键
# =============================
results = []
for i in range(len(boxes_xyxy)):
x1, y1, x2, y2 = boxes_xyxy[i]
# (51,) -> (17,3)
kpts = kpts_raw[i].reshape(17, 3).copy()
kpts[:, 0] = (kpts[:, 0] - self.dw) / self.ratio
kpts[:, 1] = (kpts[:, 1] - self.dh) / self.ratio
# ✅ 加回 bbox offset核心修复点
#kpts[:, 0] += x1
#kpts[:, 1] += y1
# clip
kpts[:, 0] = np.clip(kpts[:, 0], 0, im0_shape[1])
kpts[:, 1] = np.clip(kpts[:, 1], 0, im0_shape[0])
# visibility sigmoid防溢出
kpts[:, 2] = 1.0 / (1.0 + np.exp(-np.clip(kpts[:, 2], -50, 50)))
results.append({
"bbox": [float(x1), float(y1), float(x2), float(y2)],
"conf": float(scores[i]),
"kpts": kpts
})
return results
# -------------------------------------------------
def __call__(self, frame):
inp = self.preprocess(frame)
# 耗时操作
pred = self.session.run(None, {self.input_name: inp})[0]
return self.postprocess(pred, frame.shape[:2])
@staticmethod
def draw_keypoints(frame, pose_results, vis_thres=0.3):
for res in pose_results:
kpts = res.get("kpts", None) # 注意这里对应 postprocess 返回的 key
if kpts is None or len(kpts) != 17:
continue
# 如果是 ndarray转换为 list
if isinstance(kpts, np.ndarray):
kpts = kpts.tolist()
for i, (x, y, v) in enumerate(kpts):
if v > vis_thres:
cv2.circle(frame, (int(x), int(y)), 5, POSE_COLORS[i], -1)
for a, b in POSE_SKELETON:
if kpts[a][2] > vis_thres and kpts[b][2] > vis_thres:
cv2.line(
frame,
(int(kpts[a][0]), int(kpts[a][1])),
(int(kpts[b][0]), int(kpts[b][1])),
POSE_COLORS[a],
2
)
return frame