Files
AItst/AIMonitor/npu_yolo_onnx.py
2026-02-08 14:33:45 +08:00

182 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件名: npu_yolo_onnx.py
import cv2
import numpy as np
import onnxruntime as ort
import os
import time
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114)):
    """Resize `img` to fit inside `new_shape` preserving aspect ratio, then pad.

    The image is scaled by a single ratio so it fits within `new_shape`
    (height, width); the remaining border is filled with `color`.

    Returns:
        (padded_img, ratio, (dw, dh)) where `ratio` is the applied scale and
        `dw`/`dh` are the per-side paddings (half the total padding per axis),
        needed later to map detections back to the original image.
    """
    src_h, src_w = img.shape[:2]
    ratio = min(new_shape[0] / src_h, new_shape[1] / src_w)
    scaled_w = int(round(src_w * ratio))
    scaled_h = int(round(src_h * ratio))
    dw = (new_shape[1] - scaled_w) / 2
    dh = (new_shape[0] - scaled_h) / 2
    if (src_w, src_h) != (scaled_w, scaled_h):
        img = cv2.resize(img, (scaled_w, scaled_h), interpolation=cv2.INTER_LINEAR)
    # The +/-0.1 rounding splits an odd total padding across the two sides.
    top = int(round(dh - 0.1))
    bottom = int(round(dh + 0.1))
    left = int(round(dw - 0.1))
    right = int(round(dw + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right,
                             cv2.BORDER_CONSTANT, value=color)
    return img, ratio, (dw, dh)
class YOLOv8_ONNX:
    """YOLOv8 detector running on ONNX Runtime, preferring the Ascend (CANN) EP.

    Calling the instance with a BGR frame returns a list of detections
    ``[x1, y1, x2, y2, confidence, class_id]`` in original-image pixel
    coordinates.
    """

    def __init__(self, onnx_path, conf_threshold=0.25, iou_threshold=0.45,
                 input_size=(640, 640)):
        """
        Args:
            onnx_path: path to the YOLOv8 ONNX model file.
            conf_threshold: minimum class score for a box to be kept.
            iou_threshold: IoU threshold used by NMS.
            input_size: (height, width) the model expects; default 640x640.
        """
        # Prefer the Ascend CANNExecutionProvider; list CPU explicitly as a
        # fallback. NOTE: recent onnxruntime versions raise if none of the
        # requested providers exists, so the CPU entry keeps this portable
        # to non-Ascend environments.
        providers = [
            ("CANNExecutionProvider", {
                "device_id": 0,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                # Use "must_keep_origin_dtype" here to forbid fp32->fp16 demotion.
                "precision_mode": "allow_fp32_to_fp16",
                "op_select_impl_mode": "high_precision",
                "enable_cann_graph": True,
            }),
            "CPUExecutionProvider",
        ]
        self.session = ort.InferenceSession(onnx_path, providers=providers)
        # Report which provider actually got selected.
        actual_providers = self.session.get_providers()
        print("YOLO Providers:", actual_providers)
        if "CANNExecutionProvider" in actual_providers:
            print("[INFO] YOLO 使用 CANNExecutionProvider昇腾")
        else:
            print("[INFO] YOLO 使用 CPUExecutionProvider非昇腾环境")
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_size = tuple(input_size)  # (h, w) fed to letterbox
        self.input_name = self.session.get_inputs()[0].name
        print(f"YOLO模型输入名称: {self.input_name}")
        print(f"YOLO模型输入形状: {self.session.get_inputs()[0].shape}")
        print(f"YOLO模型输出形状: {self.session.get_outputs()[0].shape}")

    def preprocess(self, img):
        """Letterbox + BGR->RGB + normalize; returns a (1, 3, H, W) float32 array.

        Caches ``orig_shape``, ``ratio``, ``dw``, ``dh`` on ``self`` so
        ``postprocess_v8`` can map boxes back to original coordinates.
        (This makes a single instance non-reentrant across frames.)
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(img, self.input_size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.transpose(2, 0, 1).astype(np.float32)
        img /= 255.0
        return np.expand_dims(img, axis=0)  # (1, 3, H, W)

    def postprocess_v8(self, pred, im0_shape):
        """Decode raw YOLOv8 output into final detections.

        Args:
            pred: raw output of shape (1, 4 + num_classes, num_anchors) where
                  each column is [x_center, y_center, w, h, class scores...].
                  Works for any number of classes (the project model has 2:
                  0=supervisor, 1=suspect).
            im0_shape: original image shape (h, w[, c]) used for clipping.

        Returns:
            List of [x1, y1, x2, y2, confidence, class_id]; empty list when
            nothing exceeds the confidence threshold.
        """
        # (1, C, N) -> (N, C)
        x = pred[0].T
        boxes = x[:, :4]
        scores = x[:, 4:]  # generalized: any number of class columns
        conf = np.max(scores, axis=1)
        class_pred = np.argmax(scores, axis=1)

        # Confidence filtering.
        mask = conf > self.conf_threshold
        if not mask.any():
            return []
        boxes = boxes[mask]
        conf = conf[mask]
        class_pred = class_pred[mask]

        # xywh (center) -> xyxy (corners).
        x1 = boxes[:, 0] - boxes[:, 2] / 2
        y1 = boxes[:, 1] - boxes[:, 3] / 2
        x2 = boxes[:, 0] + boxes[:, 2] / 2
        y2 = boxes[:, 1] + boxes[:, 3] / 2

        # Undo the letterbox padding/scale to recover original-image coords.
        x1 = (x1 - self.dw) / self.ratio
        y1 = (y1 - self.dh) / self.ratio
        x2 = (x2 - self.dw) / self.ratio
        y2 = (y2 - self.dh) / self.ratio

        # Clip to image bounds.
        x1 = np.clip(x1, 0, im0_shape[1])
        y1 = np.clip(y1, 0, im0_shape[0])
        x2 = np.clip(x2, 0, im0_shape[1])
        y2 = np.clip(y2, 0, im0_shape[0])

        bboxes = np.stack([x1, y1, x2, y2], axis=1)
        indices = cv2.dnn.NMSBoxes(
            bboxes.tolist(),
            conf.tolist(),
            score_threshold=self.conf_threshold,
            nms_threshold=self.iou_threshold,
        )

        result = []
        if len(indices) > 0:
            # Old OpenCV returns [[i], ...], newer returns a flat ndarray;
            # asarray+flatten handles both uniformly.
            for i in np.asarray(indices).flatten():
                result.append([
                    int(bboxes[i, 0]), int(bboxes[i, 1]),
                    int(bboxes[i, 2]), int(bboxes[i, 3]),
                    float(conf[i]),
                    int(class_pred[i]),
                ])
        return result

    def __call__(self, frame):
        """Run the full detect pipeline on one BGR frame."""
        input_data = self.preprocess(frame)
        pred = self.session.run(None, {self.input_name: input_data})[0]
        return self.postprocess_v8(pred, frame.shape)