# File: npu_yolo_onnx.py  (Python, 169 lines, 6.1 KiB)
|
||
import cv2
|
||
import numpy as np
|
||
import onnxruntime as ort
|
||
import os
|
||
import time
|
||
|
||
|
||
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114)):
    """Resize ``img`` to fit inside ``new_shape`` keeping aspect ratio, then pad.

    The image is scaled by a single ratio (up or down) so that it fits inside
    the target canvas, and the remaining area is filled with ``color`` on
    both sides of each axis.

    Args:
        img: Image array whose first two dimensions are (height, width).
        new_shape: Target canvas as ``(height, width)``. A single integer is
            accepted as shorthand for a square canvas.
        color: Border fill value passed to ``cv2.copyMakeBorder``.

    Returns:
        A tuple ``(padded_img, r, (dw, dh))`` where ``r`` is the scale ratio
        applied to the image and ``dw`` / ``dh`` are the (possibly
        fractional) per-side paddings along width and height.

    Raises:
        ValueError: If ``img`` has a zero height or width.
    """
    # Allow `letterbox(img, 640)` as shorthand for a square target.
    if isinstance(new_shape, (int, np.integer)):
        new_shape = (int(new_shape), int(new_shape))

    shape = img.shape[:2]  # (h, w)
    if shape[0] == 0 or shape[1] == 0:
        # Without this guard the ratio computation below divides by zero.
        raise ValueError(f"letterbox() got an empty image with shape {img.shape}")

    # One ratio for both axes so the aspect ratio is preserved.
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])

    # cv2.resize takes the size as (width, height), hence the swapped order.
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))

    # Total padding per axis, split evenly between the two sides.
    dw = (new_shape[1] - new_unpad[0]) / 2
    dh = (new_shape[0] - new_unpad[1]) / 2

    if shape[::-1] != new_unpad:
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)

    # The -0.1 / +0.1 offsets make a half-pixel padding round to 0 on one
    # side and 1 on the other, so both sides still add up to the total.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))

    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
    return img, r, (dw, dh)
|
||
|
||
|
||
class YOLOv8_ONNX:
    """YOLOv8 detector on ONNX Runtime that returns only class-0 detections.

    The session is created with the CANN (Ascend NPU) execution provider
    first, followed by CUDA and CPU as fallbacks. Calling the instance with a
    BGR frame runs preprocess -> inference -> postprocess and returns a list
    of ``[x1, y1, x2, y2, confidence, 0]`` entries in original-image pixels.

    Note: ``preprocess`` stores the letterbox parameters on the instance and
    ``postprocess_v8`` reads them back, so the two must be called as a pair
    for the same frame.
    """

    def __init__(self, onnx_path, conf_threshold=0.25, iou_threshold=0.45, preprocess_size_1=640, preprocess_size_2=640):
        """Create the inference session and record the detection settings.

        Args:
            onnx_path: Path of the ONNX model handed to ``ort.InferenceSession``.
            conf_threshold: Minimum class score for a detection to be kept.
            iou_threshold: IoU threshold passed to non-maximum suppression.
            preprocess_size_1: Letterbox target height (first element of the
                ``new_shape`` tuple given to ``letterbox``).
            preprocess_size_2: Letterbox target width (second element).
        """
        # Providers in priority order: Ascend CANN first, then CUDA, then CPU.
        providers = [
            ("CANNExecutionProvider", {
                "device_id": 0,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "npu_mem_limit": 16 * 1024 * 1024 * 1024,
                # Switch to "must_keep_origin_dtype" to avoid lowering precision.
                "precision_mode": "allow_fp32_to_fp16",
                "op_select_impl_mode": "high_precision",
                "enable_cann_graph": True,
            }),
            "CUDAExecutionProvider",
            "CPUExecutionProvider",  # automatic fallback
        ]

        # NOTE(review): the original author states that ORT skips execution
        # providers that are not available instead of raising - confirm this
        # for the deployed onnxruntime version.
        self.session = ort.InferenceSession(onnx_path, providers=providers)

        # Providers the session actually ended up with.
        actual_providers = self.session.get_providers()
        print("YOLO Providers:", actual_providers)

        if "CANNExecutionProvider" in actual_providers:
            print("[INFO] YOLO 使用 CANNExecutionProvider(昇腾)")
        else:
            # Report the provider really in use; it may be CUDA rather than CPU.
            active = actual_providers[0] if actual_providers else "CPUExecutionProvider"
            print(f"[INFO] YOLO 使用 {active}(非昇腾环境)")

        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_name = self.session.get_inputs()[0].name
        self.preprocess_size_1 = preprocess_size_1
        self.preprocess_size_2 = preprocess_size_2

        print(f"YOLO模型输入名称: {self.input_name}")
        print(f"YOLO模型输入形状: {self.session.get_inputs()[0].shape}")
        print(f"YOLO模型输出形状: {self.session.get_outputs()[0].shape}")

    def preprocess(self, img):
        """Letterbox a BGR image and convert it to a normalised NCHW tensor.

        Side effects: stores ``self.orig_shape``, ``self.ratio``, ``self.dw``
        and ``self.dh`` for later use by ``postprocess_v8``.

        Args:
            img: BGR image array (it is converted with ``cv2.COLOR_BGR2RGB``).

        Returns:
            ``float32`` array with a leading batch axis, channels first,
            values scaled by 1/255.
        """
        self.orig_shape = img.shape[:2]
        img, self.ratio, (self.dw, self.dh) = letterbox(img, (self.preprocess_size_1, self.preprocess_size_2))

        # Debug hook: the letterboxed image is still BGR at this point and
        # can be dumped with cv2.imwrite if the padding needs inspecting.

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.transpose(2, 0, 1).astype(np.float32)  # HWC -> CHW
        img /= 255.0
        img = np.expand_dims(img, axis=0)  # add batch axis, e.g. (1, 3, 640, 640)
        return img

    def postprocess_v8(self, pred, im0_shape):
        """Decode the raw model output into class-0 boxes in image pixels.

        Args:
            pred: First model output. Its first element is transposed so that
                each row holds 4 box values (cx, cy, w, h) followed by the
                per-class scores; the original comments describe this as
                (8400, 84) for an 80-class model.
            im0_shape: Shape of the original frame; index 0 is the height and
                index 1 the width, used to clip the boxes.

        Returns:
            List of ``[x1, y1, x2, y2, confidence, 0]`` with integer
            coordinates, or an empty list when nothing passes the filter.
        """
        x = pred[0].T  # rows = candidates, columns = 4 box values + class scores

        boxes = x[:, :4]
        scores = x[:, 4:]

        class_ids = np.argmax(scores, axis=1)
        conf = scores[np.arange(len(scores)), class_ids]

        # Keep only candidates whose best class is index 0 (person in COCO,
        # per the original comment) and whose score clears the threshold.
        mask = (class_ids == 0) & (conf > self.conf_threshold)
        if not mask.any():
            return []

        boxes = boxes[mask]
        conf = conf[mask]

        # Centre/size (cx, cy, w, h) -> corners (x1, y1, x2, y2).
        half_w = boxes[:, 2] / 2
        half_h = boxes[:, 3] / 2
        x1 = boxes[:, 0] - half_w
        y1 = boxes[:, 1] - half_h
        x2 = boxes[:, 0] + half_w
        y2 = boxes[:, 1] + half_h

        # Undo the letterbox: remove the padding, then the scaling.
        x1 = (x1 - self.dw) / self.ratio
        y1 = (y1 - self.dh) / self.ratio
        x2 = (x2 - self.dw) / self.ratio
        y2 = (y2 - self.dh) / self.ratio

        # Clip to the original image bounds.
        x1 = np.clip(x1, 0, im0_shape[1])
        y1 = np.clip(y1, 0, im0_shape[0])
        x2 = np.clip(x2, 0, im0_shape[1])
        y2 = np.clip(y2, 0, im0_shape[0])

        bboxes = np.stack([x1, y1, x2, y2], axis=1)

        # cv2.dnn.NMSBoxes interprets each box as (x, y, width, height), so
        # corner-format boxes must be converted or the IoU is computed on the
        # wrong rectangles.
        nms_boxes = np.stack([x1, y1, x2 - x1, y2 - y1], axis=1)

        indices = cv2.dnn.NMSBoxes(
            nms_boxes.tolist(),
            conf.tolist(),
            self.conf_threshold,
            self.iou_threshold,
        )

        # Depending on the OpenCV version the result may be an empty tuple,
        # an (N, 1) array or a flat array; normalise to a flat index array.
        keep = np.asarray(indices, dtype=np.int64).reshape(-1)

        return [
            [
                int(bboxes[i, 0]),
                int(bboxes[i, 1]),
                int(bboxes[i, 2]),
                int(bboxes[i, 3]),
                float(conf[i]),
                0,  # person
            ]
            for i in keep
        ]

    def __call__(self, frame):
        """Run the full pipeline on one frame and return the detections.

        The per-stage wall-clock times of the last call are kept in
        ``self.last_timing_ms`` (keys ``pre``, ``inf``, ``post``, ``total``,
        all in milliseconds) so callers can log them if needed.

        Args:
            frame: BGR image array.

        Returns:
            The list produced by ``postprocess_v8``.
        """
        # Pre-processing
        t_pre_start = time.perf_counter()
        input_data = self.preprocess(frame)
        t_pre_end = time.perf_counter()

        # Inference
        t_inf_start = time.perf_counter()
        pred = self.session.run(None, {self.input_name: input_data})[0]
        t_inf_end = time.perf_counter()

        # Post-processing
        t_post_start = time.perf_counter()
        results = self.postprocess_v8(pred, frame.shape)
        t_post_end = time.perf_counter()

        pre_ms = (t_pre_end - t_pre_start) * 1000
        inf_ms = (t_inf_end - t_inf_start) * 1000
        post_ms = (t_post_end - t_post_start) * 1000
        self.last_timing_ms = {
            "pre": pre_ms,
            "inf": inf_ms,
            "post": post_ms,
            "total": pre_ms + inf_ms + post_ms,
        }

        return results