ScoreVision-Fire / miner.py
meaculpitt's picture
v2 fire weights (yolo26n 960 e2e, 3 classes validator-aligned, synth+sim+dfire+z5atr 25k merged, synth val mAP50=0.6395)
71ae3d3 verified
# build-marker: fire-v2-blob-imgsz960
"""SN44 fire detection miner — yolo26n single-pass @ imgsz=960.
v2 (2026-05-09): trained on merged 25k pool (validator-synth + D-Fire +
Simuletic + z5atr). FP16 ONNX, ~5 MB. Single forward pass at imgsz=960
fits the 50 ms p95 latency gate (~35 ms on 4090, blobFromImage preproc).
SAHI tiling was tested but blew the latency budget (5x preproc/postproc
overhead). Code preserved at fire/deploy/miner_sahi.py for later experiments.
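Classes: the model was trained in the manak0/Detect-fire class_names.txt order
(0=fire, 1=fire extinguisher, 2=smoke). Emitted cls_id values are remapped in
Miner.__init__ to the validator's GT order: 0=fire, 1=smoke, 2=fire extinguisher.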
Single ONNX expected at path_hf_repo/weights.onnx (yolo26n e2e [1,300,6]).
"""
import math
from pathlib import Path
import cv2
import numpy as np
import onnxruntime as ort
from pydantic import BaseModel
class BoundingBox(BaseModel):
    """One detection in original-image pixel coordinates.

    Instances are built by ``Miner._to_boundingboxes`` with integer corners,
    a class index and a confidence clamped to ``[0.0, 1.0]``.
    """

    # Top-left corner.
    x1: int
    y1: int
    # Bottom-right corner.
    x2: int
    y2: int
    # Class index as emitted by the miner (after its class remap).
    cls_id: int
    # Detection confidence.
    conf: float
class TVFrameResult(BaseModel):
    """Per-frame prediction record returned by ``Miner.predict_batch``."""

    # Index of the frame: batch offset plus position within the batch.
    frame_id: int
    # Detections kept for this frame (may be empty).
    boxes: list[BoundingBox]
    # (x, y) keypoints; predict_batch fills these with (0, 0) placeholders.
    keypoints: list[tuple[int, int]]
class Miner:
    """Single-pass ONNX detector for fire / smoke / fire extinguisher.

    Pipeline per frame: letterbox to ``input_w x input_h`` -> one ONNX forward
    pass -> confidence filter -> undo letterbox -> per-class hard NMS ->
    ``max_det`` cap -> cross-class de-duplication -> integer ``BoundingBox``
    objects with small / thin boxes filtered out.

    The model output is read as rows of ``(x1, y1, x2, y2, conf, cls)`` in
    letterboxed input pixels (columns 0-3 boxes, 4 confidence, 5 class index).
    """

    def __init__(self, path_hf_repo) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and warm the session up.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.

        Raises:
            Exception: whatever onnxruntime raises if the model cannot be
                loaded on the CPU provider either.
        """
        self.path_hf_repo = Path(path_hf_repo)

        # Validator's actual GT class order is [fire, smoke, fire extinguisher]
        # — verified by audit of alfred8995/fire001 (scores 1.00) and
        # navierstocks/fire (scores 0.96), both using this order. The published
        # manak0/Detect-fire class_names.txt list [fire, fire_ext, smoke] does
        # NOT match the actual scoring index.
        # Our model was trained with [fire, fire_ext, smoke] (cls=1=ext, cls=2=smoke).
        # cls_remap translates model output index → validator GT index.
        self.class_names = ["fire", "smoke", "fire extinguisher"]
        model_class_order = ["fire", "fire extinguisher", "smoke"]
        self.cls_remap = np.array(
            [self.class_names.index(n) for n in model_class_order],
            dtype=np.int32,
        )  # → [0, 2, 1]: model cls 0→0, 1→2, 2→1

        # Deliberate best-effort: preload_dlls may be missing or fail, and the
        # session creation below is the real test of whether CUDA is usable.
        try:
            ort.preload_dlls()
        except Exception:
            pass

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        model_path = str(self.path_hf_repo / "weights.onnx")
        try:
            self.session = ort.InferenceSession(
                model_path,
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
        except Exception:
            # Fall back to CPU only; a failure here propagates to the caller.
            self.session = ort.InferenceSession(
                model_path,
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        first_input = self.session.get_inputs()[0]
        self.input_name = first_input.name
        self.output_names = [o.name for o in self.session.get_outputs()]
        # Feed the model in the precision its first input declares.
        self.input_dtype = np.float16 if "float16" in first_input.type else np.float32

        self.input_h = 960
        self.input_w = 960
        # Per-class confidence thresholds, indexed by remapped class id.
        self.conf_thres_per_class = np.array([0.20, 0.20, 0.20], dtype=np.float32)
        self.iou_thresh = 0.5          # same-class NMS IoU threshold
        self.cross_iou_thresh = 0.7    # cross-class de-dup IoU threshold
        self.max_det = 100             # cap on detections kept after NMS
        self.min_box_area = 64         # px^2, applied to integer boxes
        self.min_side = 6              # px, shortest allowed side
        self.max_aspect_ratio = 10.0   # longest/shortest side limit

        # Warm-up passes; failures are ignored so construction still succeeds.
        warm = np.zeros((768, 1408, 3), dtype=np.uint8)
        for _ in range(3):
            try:
                self._infer_single(warm)
            except Exception:
                break

    def __repr__(self) -> str:
        """Return a short description with thresholds for logs."""
        thr = ",".join(
            f"{n[:4]}={t:.2f}"
            for n, t in zip(self.class_names, self.conf_thres_per_class.tolist())
        )
        return (f"FireMiner v2 yolo26n@{self.input_w} single-pass blob "
                f"conf=[{thr}] iou={self.iou_thresh}")

    def _preprocess(self, image_bgr):
        """Letterbox + cv2.dnn.blobFromImage (fused C++ normalize/transpose).

        Args:
            image_bgr: ``H x W x 3`` uint8 image in BGR channel order.

        Returns:
            ``(blob, ratio, (dx, dy))`` where ``blob`` is the NCHW model input,
            ``ratio`` the resize factor and ``(dx, dy)`` the padding offsets of
            the resized image inside the letterbox canvas.
        """
        h, w = image_bgr.shape[:2]
        ratio = min(self.input_w / w, self.input_h / h)
        # Clamp to [1, input size]: extreme aspect ratios could otherwise round
        # a side to 0, which cv2.resize rejects.
        nw = min(self.input_w, max(1, int(round(w * ratio))))
        nh = min(self.input_h, max(1, int(round(h * ratio))))
        if (nw, nh) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            resized = cv2.resize(image_bgr, (nw, nh), interpolation=interp)
        else:
            resized = image_bgr

        canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8)
        dy = (self.input_h - nh) // 2
        dx = (self.input_w - nw) // 2
        canvas[dy:dy + nh, dx:dx + nw] = resized

        # blobFromImage: fused BGR→RGB (swapRB) + /255 + transpose CHW + add batch dim
        blob = cv2.dnn.blobFromImage(
            canvas, scalefactor=1 / 255.0,
            size=(self.input_w, self.input_h),
            mean=(0, 0, 0), swapRB=True, crop=False,
        )
        if self.input_dtype == np.float16:
            blob = blob.astype(np.float16)
        return blob, ratio, (float(dx), float(dy))

    def _infer_single(self, image_bgr) -> "list[BoundingBox]":
        """Run the full detection pipeline on one BGR frame.

        Returns an empty list for a missing/empty frame or when nothing passes
        the confidence thresholds.
        """
        if image_bgr is None or getattr(image_bgr, "size", 0) == 0:
            return []
        inp, ratio, (dx, dy) = self._preprocess(image_bgr)
        out = self.session.run(self.output_names, {self.input_name: inp})[0]
        oh, ow = image_bgr.shape[:2]
        boxes, confs, cls_ids = self._decode_detections(out, ratio, dx, dy, ow, oh)
        return self._to_boundingboxes(boxes, confs, cls_ids, ow, oh)

    def _decode_detections(self, out, ratio, dx, dy, orig_w, orig_h):
        """Turn raw model rows into filtered boxes in original-image pixels.

        Args:
            out: Model output, ``[N, >=6]`` or ``[1, N, >=6]`` rows of
                ``(x1, y1, x2, y2, conf, model_cls)``.
            ratio, dx, dy: Letterbox scale and padding from ``_preprocess``.
            orig_w, orig_h: Size of the original image.

        Returns:
            ``(boxes, confs, cls_ids)``: float32 ``[K, 4]`` boxes, float32
            confidences and remapped integer class ids.
        """
        out = np.asarray(out)
        if out.ndim == 3:
            out = out[0]
        raw = out[:, :6].astype(np.float32)

        # Rows with NaN/inf would later crash math.floor/ceil; drop them here.
        finite = np.isfinite(raw).all(axis=1)
        # Validate the *model* class id before it is used as an index into
        # cls_remap: ids >= len(cls_remap) would raise IndexError and negative
        # ids would silently wrap around. (-1, n) matches int truncation.
        cls_f = raw[:, 5]
        valid = finite & (cls_f > -1.0) & (cls_f < len(self.cls_remap))
        model_cls = np.where(valid, cls_f, 0.0).astype(np.intp)
        cls_all = self.cls_remap[model_cls]

        thr_idx = np.clip(cls_all, 0, len(self.conf_thres_per_class) - 1)
        keep = valid & (raw[:, 4] >= self.conf_thres_per_class[thr_idx])
        if not keep.any():
            return (np.zeros((0, 4), dtype=np.float32),
                    np.zeros(0, dtype=np.float32),
                    np.zeros(0, dtype=np.int32))

        boxes = raw[keep, :4]          # boolean indexing already copies
        confs = raw[keep, 4]
        cls_ids = cls_all[keep]

        # Undo the letterbox, then clamp into the original image.
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, orig_w - 1)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, orig_h - 1)

        if len(boxes) > 1:
            keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh)
            # _per_class_hard_nms returns indices in ascending index order, so
            # rank by confidence first; otherwise the max_det cap would keep the
            # first rows rather than the most confident ones.
            by_conf = np.argsort(-confs[keep_idx], kind="stable")
            keep_idx = keep_idx[by_conf][: self.max_det]
            boxes, confs, cls_ids = boxes[keep_idx], confs[keep_idx], cls_ids[keep_idx]
            boxes, confs, cls_ids = self._cross_class_dedup(
                boxes, confs, cls_ids, self.cross_iou_thresh)
        return boxes, confs, cls_ids

    @staticmethod
    def _hard_nms(boxes, scores, iou_thresh):
        """Greedy NMS: keep the best-scoring box, drop others with IoU > threshold.

        Returns the kept indices (into *boxes*) in descending score order.
        """
        boxes = np.asarray(boxes)
        n = len(boxes)
        if n == 0:
            return np.array([], dtype=np.intp)

        order = np.argsort(scores)[::-1]
        ranked = boxes[order]
        areas = (np.maximum(0.0, ranked[:, 2] - ranked[:, 0])
                 * np.maximum(0.0, ranked[:, 3] - ranked[:, 1]))
        suppressed = np.zeros(n, dtype=bool)
        keep = []
        for pos in range(n):
            if suppressed[pos]:
                continue
            keep.append(int(order[pos]))
            cur, rest = ranked[pos], ranked[pos + 1:]
            # IoU of the current box against every lower-ranked box at once.
            xx1 = np.maximum(cur[0], rest[:, 0])
            yy1 = np.maximum(cur[1], rest[:, 1])
            xx2 = np.minimum(cur[2], rest[:, 2])
            yy2 = np.minimum(cur[3], rest[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            iou = inter / (areas[pos] + areas[pos + 1:] - inter + 1e-7)
            suppressed[pos + 1:] |= iou > iou_thresh
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(self, boxes, scores, cls_ids, iou_thresh):
        """Run ``_hard_nms`` separately per class.

        Returns the surviving indices into *boxes*, sorted ascending.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep = []
        for c in np.unique(cls_ids):
            indices = np.where(cls_ids == c)[0]
            keep = self._hard_nms(boxes[indices], scores[indices], iou_thresh)
            all_keep.extend(indices[keep].tolist())
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    @staticmethod
    def _cross_class_dedup(boxes, scores, cls_ids, iou_thresh):
        """Drop boxes that overlap an already-kept box regardless of class.

        Boxes are visited largest-area first (score breaks ties), so of two
        boxes with IoU > *iou_thresh* the larger one survives.

        Returns the filtered ``(boxes, scores, cls_ids)`` in visiting order.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids

        areas = (np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
                 * np.maximum(0.0, boxes[:, 3] - boxes[:, 1]))
        # lexsort: the last key is primary → area descending, then score descending.
        order = np.lexsort((-scores, -areas))
        suppressed = np.zeros(n, dtype=bool)
        keep = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False  # a box never suppresses itself
            suppressed |= dup
        kept = np.array(keep, dtype=np.intp)
        return boxes[kept], scores[kept], cls_ids[kept]

    def _to_boundingboxes(self, boxes, confs, cls_ids, orig_w, orig_h) -> "list[BoundingBox]":
        """Convert float boxes to integer ``BoundingBox`` objects.

        Corners are rounded outward (floor/ceil) and clamped to the image.
        Boxes that are non-finite, empty, smaller than ``min_box_area``,
        thinner than ``min_side`` or more elongated than ``max_aspect_ratio``
        are skipped.
        """
        result = []
        for (x1, y1, x2, y2), conf, cls_id in zip(boxes, confs, cls_ids):
            # math.floor/ceil raise on NaN/inf; skip such boxes instead.
            if not all(math.isfinite(v) for v in (x1, y1, x2, y2)):
                continue
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                continue
            bw, bh = ix2 - ix1, iy2 - iy1
            if bw * bh < self.min_box_area:
                continue
            if min(bw, bh) < self.min_side:
                continue
            ar = max(bw / max(bh, 1), bh / max(bw, 1))
            if ar > self.max_aspect_ratio:
                continue
            result.append(BoundingBox(
                x1=ix1, y1=iy1, x2=ix2, y2=iy2, cls_id=int(cls_id),
                conf=max(0.0, min(1.0, float(conf))),
            ))
        return result

    def predict_batch(self, batch_images, offset, n_keypoints) -> "list[TVFrameResult]":
        """Detect objects in every image of a batch.

        Args:
            batch_images: Iterable of BGR images.
            offset: Frame id of the first image; later images count up from it.
            n_keypoints: Number of ``(0, 0)`` placeholder keypoints per frame.

        Returns:
            One ``TVFrameResult`` per input image, in input order.
        """
        placeholder_count = max(0, int(n_keypoints))
        results = []
        for idx, image in enumerate(batch_images):
            results.append(TVFrameResult(
                frame_id=offset + idx,
                boxes=self._infer_single(image),
                keypoints=[(0, 0) for _ in range(placeholder_count)],
            ))
        return results