# File: npu_yolo_onnx.py
# (Listing metadata from the original code viewer: 182 lines, 7.2 KiB, Python.)
|
||
import cv2
|
||
import numpy as np
|
||
import onnxruntime as ort
|
||
import os
|
||
import time
|
||
|
||
|
||
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114)):
    """Resize ``img`` to fit ``new_shape`` keeping aspect ratio, then pad it.

    Args:
        img: Image array whose first two dimensions are (height, width).
        new_shape: Target (height, width) of the padded output.
        color: Constant border value handed to ``cv2.copyMakeBorder``.

    Returns:
        A tuple ``(padded_img, scale, (pad_x, pad_y))`` where ``scale`` is the
        resize factor applied to the source and ``pad_x`` / ``pad_y`` are the
        (possibly fractional) per-side paddings along width and height.
    """
    src_h, src_w = img.shape[:2]
    dst_h, dst_w = new_shape[0], new_shape[1]

    # One scale factor for both axes so the aspect ratio is preserved.
    scale = min(dst_h / src_h, dst_w / src_w)
    scaled_size = (int(round(src_w * scale)), int(round(src_h * scale)))  # (w, h)

    # Remaining space is split evenly between the two sides of each axis.
    pad_x = (dst_w - scaled_size[0]) / 2
    pad_y = (dst_h - scaled_size[1]) / 2

    if (src_w, src_h) != scaled_size:
        img = cv2.resize(img, scaled_size, interpolation=cv2.INTER_LINEAR)

    # The +/-0.1 nudge sends an odd leftover pixel to the bottom/right side.
    pad_top = int(round(pad_y - 0.1))
    pad_bottom = int(round(pad_y + 0.1))
    pad_left = int(round(pad_x - 0.1))
    pad_right = int(round(pad_x + 0.1))

    padded = cv2.copyMakeBorder(
        img,
        pad_top,
        pad_bottom,
        pad_left,
        pad_right,
        cv2.BORDER_CONSTANT,
        value=color,
    )
    return padded, scale, (pad_x, pad_y)
|
||
|
||
|
||
class YOLOv8_ONNX:
    """YOLOv8 two-class detector running an ONNX model through ONNX Runtime.

    The session requests the Ascend ``CANNExecutionProvider`` first and lists
    ``CPUExecutionProvider`` as an explicit fallback. Calling an instance on a
    frame returns a list of ``[x1, y1, x2, y2, confidence, class_id]`` entries
    in original-image pixel coordinates.

    ``preprocess`` stores the letterbox parameters of the most recent frame on
    the instance (``ratio``, ``dw``, ``dh``); ``postprocess_v8`` reads them, so
    the two must be called as a pair for the same frame.
    """

    # (height, width) the frame is letterboxed to before inference.
    INPUT_SIZE = (640, 640)

    def __init__(self, onnx_path, conf_threshold=0.25, iou_threshold=0.45):
        """Create the inference session and cache the model's input name.

        Args:
            onnx_path: Model path handed to ``ort.InferenceSession``.
            conf_threshold: Minimum class score for a candidate to be kept.
            iou_threshold: Overlap threshold passed to ``cv2.dnn.NMSBoxes``.
        """
        cann_options = {
            "device_id": 0,
            "arena_extend_strategy": "kNextPowerOfTwo",
            "npu_mem_limit": 16 * 1024 * 1024 * 1024,
            # To avoid lowering precision, use "must_keep_origin_dtype" instead.
            "precision_mode": "allow_fp32_to_fp16",
            "op_select_impl_mode": "high_precision",
            "enable_cann_graph": True,
        }
        # List the CPU provider explicitly: ONNX Runtime drops providers that
        # are not available in the current build, and builds exposing several
        # providers refuse to create a session when the remaining list is
        # empty. With the explicit fallback a non-Ascend machine still works.
        providers = [
            ("CANNExecutionProvider", cann_options),
            "CPUExecutionProvider",
        ]

        self.session = ort.InferenceSession(onnx_path, providers=providers)

        # Report which providers the session actually ended up with.
        actual_providers = self.session.get_providers()
        print("YOLO Providers:", actual_providers)

        if "CANNExecutionProvider" in actual_providers:
            print("[INFO] YOLO 使用 CANNExecutionProvider(昇腾)")
        else:
            print("[INFO] YOLO 使用 CPUExecutionProvider(非昇腾环境)")

        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_name = self.session.get_inputs()[0].name

        print(f"YOLO模型输入名称: {self.input_name}")
        print(f"YOLO模型输入形状: {self.session.get_inputs()[0].shape}")
        print(f"YOLO模型输出形状: {self.session.get_outputs()[0].shape}")

    def preprocess(self, img):
        """Letterbox a frame and convert it to a normalised NCHW float tensor.

        Side effects: stores ``orig_shape``, ``ratio``, ``dw`` and ``dh`` on
        the instance for later use by ``postprocess_v8``.

        Args:
            img: Input frame; it is converted with ``cv2.COLOR_BGR2RGB``, so a
                3-channel BGR image is expected.

        Returns:
            ``float32`` array with a leading batch axis, channels first,
            scaled to ``[0, 1]``.
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(img, self.INPUT_SIZE)

        # Debug aid: the letterboxed image is still BGR at this point and can
        # be written out with cv2.imwrite for visual inspection if needed.

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.transpose(2, 0, 1).astype(np.float32)  # HWC -> CHW
        img /= 255.0
        return np.expand_dims(img, axis=0)  # add batch axis

    @staticmethod
    def _xyxy_to_xywh(bboxes):
        """Return a copy of ``(N, 4)`` corner boxes as ``[x, y, w, h]`` rows."""
        rects = np.array(bboxes)
        rects[:, 2:4] = rects[:, 2:4] - rects[:, 0:2]
        return rects

    def postprocess_v8(self, pred, im0_shape):
        """Decode raw model output into final detections.

        Each prediction column is laid out as
        ``[x_center, y_center, width, height, class0_score, class1_score]``
        (class 0 = supervisor, class 1 = suspect).

        Args:
            pred: Raw output whose first item has one column per candidate
                (e.g. shape ``(1, 6, 8400)``).
            im0_shape: Shape of the original frame; index 0 is height and
                index 1 is width.

        Returns:
            List of ``[x1, y1, x2, y2, confidence, class_id]`` with integer
            corner coordinates in the original frame; empty when nothing
            passes the confidence threshold or NMS.
        """
        # (6, N) -> (N, 6): one row per candidate.
        candidates = pred[0].T
        boxes = candidates[:, :4]
        scores = candidates[:, 4:6]

        # Confidence is the best class score; the class is its index.
        conf = np.max(scores, axis=1)
        class_pred = np.argmax(scores, axis=1)

        mask = conf > self.conf_threshold
        if not mask.any():
            return []

        boxes = boxes[mask]
        conf = conf[mask]
        class_pred = class_pred[mask]

        # Centre/size -> corner coordinates (still in letterboxed space).
        x1 = boxes[:, 0] - boxes[:, 2] / 2
        y1 = boxes[:, 1] - boxes[:, 3] / 2
        x2 = boxes[:, 0] + boxes[:, 2] / 2
        y2 = boxes[:, 1] + boxes[:, 3] / 2

        # Undo the letterbox padding and scale back to the original frame.
        x1 = (x1 - self.dw) / self.ratio
        y1 = (y1 - self.dh) / self.ratio
        x2 = (x2 - self.dw) / self.ratio
        y2 = (y2 - self.dh) / self.ratio

        # Clip to the frame boundaries.
        x1 = np.clip(x1, 0, im0_shape[1])
        y1 = np.clip(y1, 0, im0_shape[0])
        x2 = np.clip(x2, 0, im0_shape[1])
        y2 = np.clip(y2, 0, im0_shape[0])

        bboxes = np.stack([x1, y1, x2, y2], axis=1)

        # cv2.dnn.NMSBoxes interprets every box as [x, y, width, height];
        # feeding it corner coordinates would compute overlaps on the wrong
        # rectangles, so convert for the NMS call only.
        # NOTE: a single NMS pass covers both classes together.
        keep = cv2.dnn.NMSBoxes(
            self._xyxy_to_xywh(bboxes).tolist(),
            conf.tolist(),
            score_threshold=self.conf_threshold,
            nms_threshold=self.iou_threshold,
        )
        # Depending on the OpenCV version the result is an (N, 1) array, a
        # flat array, or an empty tuple; normalise all of them to 1-D.
        keep = np.asarray(keep, dtype=np.int64).reshape(-1)

        return [
            [
                int(bboxes[i, 0]),
                int(bboxes[i, 1]),
                int(bboxes[i, 2]),
                int(bboxes[i, 3]),
                float(conf[i]),
                int(class_pred[i]),
            ]
            for i in keep
        ]

    def __call__(self, frame):
        """Run preprocess, inference and postprocess on a single frame."""
        input_data = self.preprocess(frame)
        pred = self.session.run(None, {self.input_name: input_data})[0]
        return self.postprocess_v8(pred, frame.shape)