v2 fire weights (yolo26n 960 e2e, 3 classes validator-aligned, synth+sim+dfire+z5atr 25k merged, synth val mAP50=0.6395)
71ae3d3 verified | # build-marker: fire-v2-blob-imgsz960 | |
| """SN44 fire detection miner — yolo26n single-pass @ imgsz=960. | |
| v2 (2026-05-09): trained on merged 25k pool (validator-synth + D-Fire + | |
| Simuletic + z5atr). FP16 ONNX, ~5 MB. Single forward pass at imgsz=960 | |
| fits the 50 ms p95 latency gate (~35 ms on 4090, blobFromImage preproc). | |
| SAHI tiling was tested but blew the latency budget (5x preproc/postproc | |
| overhead). Code preserved at fire/deploy/miner_sahi.py for later experiments. | |
| Classes (validator order from manak0/Detect-fire class_names.txt): | |
| 0=fire, 1=fire extinguisher, 2=smoke | |
| Single ONNX expected at path_hf_repo/weights.onnx (yolo26n e2e [1,300,6]). | |
| """ | |
| import math | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| import onnxruntime as ort | |
| from pydantic import BaseModel | |
| class BoundingBox(BaseModel): | |
| x1: int | |
| y1: int | |
| x2: int | |
| y2: int | |
| cls_id: int | |
| conf: float | |
| class TVFrameResult(BaseModel): | |
| frame_id: int | |
| boxes: list[BoundingBox] | |
| keypoints: list[tuple[int, int]] | |
| class Miner: | |
| def __init__(self, path_hf_repo) -> None: | |
| self.path_hf_repo = Path(path_hf_repo) | |
| # Validator's actual GT class order is [fire, smoke, fire extinguisher] | |
| # — verified by audit of alfred8995/fire001 (scores 1.00) and | |
| # navierstocks/fire (scores 0.96), both using this order. The published | |
| # manak0/Detect-fire class_names.txt list [fire, fire_ext, smoke] does | |
| # NOT match the actual scoring index. | |
| # Our model was trained with [fire, fire_ext, smoke] (cls=1=ext, cls=2=smoke). | |
| # cls_remap translates model output index → validator GT index. | |
| self.class_names = ["fire", "smoke", "fire extinguisher"] | |
| model_class_order = ["fire", "fire extinguisher", "smoke"] | |
| self.cls_remap = np.array( | |
| [self.class_names.index(n) for n in model_class_order], | |
| dtype=np.int32, | |
| ) # → [0, 2, 1]: model cls 0→0, 1→2, 2→1 | |
| try: | |
| ort.preload_dlls() | |
| except Exception: | |
| pass | |
| sess_options = ort.SessionOptions() | |
| sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL | |
| try: | |
| self.session = ort.InferenceSession( | |
| str(self.path_hf_repo / "weights.onnx"), | |
| sess_options=sess_options, | |
| providers=["CUDAExecutionProvider", "CPUExecutionProvider"], | |
| ) | |
| except Exception: | |
| self.session = ort.InferenceSession( | |
| str(self.path_hf_repo / "weights.onnx"), | |
| sess_options=sess_options, | |
| providers=["CPUExecutionProvider"], | |
| ) | |
| self.input_name = self.session.get_inputs()[0].name | |
| self.output_names = [o.name for o in self.session.get_outputs()] | |
| self.input_dtype = (np.float16 | |
| if 'float16' in self.session.get_inputs()[0].type | |
| else np.float32) | |
| self.input_h = 960 | |
| self.input_w = 960 | |
| self.conf_thres_per_class = np.array([0.20, 0.20, 0.20], dtype=np.float32) | |
| self.iou_thresh = 0.5 | |
| self.cross_iou_thresh = 0.7 | |
| self.max_det = 100 | |
| self.min_box_area = 64 | |
| self.min_side = 6 | |
| self.max_aspect_ratio = 10.0 | |
| warm = np.zeros((768, 1408, 3), dtype=np.uint8) | |
| for _ in range(3): | |
| try: self._infer_single(warm) | |
| except Exception: break | |
| def __repr__(self): | |
| thr = ",".join(f"{n[:4]}={t:.2f}" for n, t | |
| in zip(self.class_names, self.conf_thres_per_class.tolist())) | |
| return (f"FireMiner v2 yolo26n@{self.input_w} single-pass blob " | |
| f"conf=[{thr}] iou={self.iou_thresh}") | |
| def _preprocess(self, image_bgr): | |
| """Letterbox + cv2.dnn.blobFromImage (fused C++ resize/normalize/transpose).""" | |
| h, w = image_bgr.shape[:2] | |
| ratio = min(self.input_w / w, self.input_h / h) | |
| nw, nh = int(round(w * ratio)), int(round(h * ratio)) | |
| if (nw, nh) != (w, h): | |
| interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR | |
| resized = cv2.resize(image_bgr, (nw, nh), interpolation=interp) | |
| else: | |
| resized = image_bgr | |
| canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8) | |
| dy = (self.input_h - nh) // 2 | |
| dx = (self.input_w - nw) // 2 | |
| canvas[dy:dy+nh, dx:dx+nw] = resized | |
| # blobFromImage: fused BGR→RGB (swapRB) + /255 + transpose CHW + add batch dim | |
| blob = cv2.dnn.blobFromImage( | |
| canvas, scalefactor=1/255.0, | |
| size=(self.input_w, self.input_h), | |
| mean=(0, 0, 0), swapRB=True, crop=False, | |
| ) | |
| if self.input_dtype == np.float16: | |
| blob = blob.astype(np.float16) | |
| return blob, ratio, (float(dx), float(dy)) | |
| def _infer_single(self, image_bgr): | |
| inp, ratio, (dx, dy) = self._preprocess(image_bgr) | |
| out = self.session.run(self.output_names, {self.input_name: inp})[0] | |
| if out.ndim == 3: out = out[0] | |
| confs_all = out[:, 4].astype(np.float32) | |
| cls_all = self.cls_remap[out[:, 5].astype(np.int32)] | |
| cls_idx = np.clip(cls_all, 0, len(self.conf_thres_per_class) - 1) | |
| keep = confs_all >= self.conf_thres_per_class[cls_idx] | |
| if not keep.any(): return [] | |
| out = out[keep] | |
| boxes = out[:, :4].astype(np.float32).copy() | |
| confs = out[:, 4].astype(np.float32) | |
| cls_ids = self.cls_remap[out[:, 5].astype(np.int32)] | |
| boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio | |
| boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio | |
| oh, ow = image_bgr.shape[:2] | |
| boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, ow - 1) | |
| boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, oh - 1) | |
| if len(boxes) > 1: | |
| keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh) | |
| keep_idx = keep_idx[: self.max_det] | |
| boxes, confs, cls_ids = boxes[keep_idx], confs[keep_idx], cls_ids[keep_idx] | |
| boxes, confs, cls_ids = self._cross_class_dedup( | |
| boxes, confs, cls_ids, self.cross_iou_thresh) | |
| return self._to_boundingboxes(boxes, confs, cls_ids, ow, oh) | |
| def _hard_nms(boxes, scores, iou_thresh): | |
| n = len(boxes) | |
| if n == 0: return np.array([], dtype=np.intp) | |
| order = np.argsort(scores)[::-1] | |
| keep, suppressed = [], np.zeros(n, dtype=bool) | |
| for i in range(n): | |
| idx = order[i] | |
| if suppressed[idx]: continue | |
| keep.append(int(idx)) | |
| bi = boxes[idx] | |
| for k in range(i + 1, n): | |
| jdx = order[k] | |
| if suppressed[jdx]: continue | |
| bj = boxes[jdx] | |
| xx1, yy1 = max(bi[0], bj[0]), max(bi[1], bj[1]) | |
| xx2, yy2 = min(bi[2], bj[2]), min(bi[3], bj[3]) | |
| inter = max(0.0, xx2-xx1) * max(0.0, yy2-yy1) | |
| ai = (bi[2]-bi[0])*(bi[3]-bi[1]); aj = (bj[2]-bj[0])*(bj[3]-bj[1]) | |
| iou = inter / (ai + aj - inter + 1e-7) | |
| if iou > iou_thresh: suppressed[jdx] = True | |
| return np.array(keep, dtype=np.intp) | |
| def _per_class_hard_nms(self, boxes, scores, cls_ids, iou_thresh): | |
| if len(boxes) == 0: return np.array([], dtype=np.intp) | |
| all_keep = [] | |
| for c in np.unique(cls_ids): | |
| mask = cls_ids == c | |
| indices = np.where(mask)[0] | |
| keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh) | |
| all_keep.extend(indices[keep].tolist()) | |
| all_keep.sort() | |
| return np.array(all_keep, dtype=np.intp) | |
| def _cross_class_dedup(boxes, scores, cls_ids, iou_thresh): | |
| n = len(boxes) | |
| if n <= 1: return boxes, scores, cls_ids | |
| areas = np.maximum(0.0, boxes[:, 2]-boxes[:, 0]) * np.maximum(0.0, boxes[:, 3]-boxes[:, 1]) | |
| order = np.lexsort((-scores, -areas)) | |
| suppressed = np.zeros(n, dtype=bool); keep = [] | |
| for i in order: | |
| if suppressed[i]: continue | |
| keep.append(int(i)) | |
| bi = boxes[i] | |
| xx1 = np.maximum(bi[0], boxes[:, 0]); yy1 = np.maximum(bi[1], boxes[:, 1]) | |
| xx2 = np.minimum(bi[2], boxes[:, 2]); yy2 = np.minimum(bi[3], boxes[:, 3]) | |
| inter = np.maximum(0.0, xx2-xx1) * np.maximum(0.0, yy2-yy1) | |
| ai = max(1e-7, float((bi[2]-bi[0])*(bi[3]-bi[1]))) | |
| iou = inter / (ai + areas - inter + 1e-7) | |
| dup = iou > iou_thresh; dup[i] = False | |
| suppressed |= dup | |
| kept = np.array(keep, dtype=np.intp) | |
| return boxes[kept], scores[kept], cls_ids[kept] | |
| def _to_boundingboxes(self, boxes, confs, cls_ids, orig_w, orig_h): | |
| out = [] | |
| for i in range(len(boxes)): | |
| x1, y1, x2, y2 = boxes[i] | |
| ix1 = max(0, min(orig_w, math.floor(x1))) | |
| iy1 = max(0, min(orig_h, math.floor(y1))) | |
| ix2 = max(0, min(orig_w, math.ceil(x2))) | |
| iy2 = max(0, min(orig_h, math.ceil(y2))) | |
| if ix2 <= ix1 or iy2 <= iy1: continue | |
| bw, bh = ix2 - ix1, iy2 - iy1 | |
| if bw * bh < self.min_box_area: continue | |
| if min(bw, bh) < self.min_side: continue | |
| ar = max(bw / max(bh, 1), bh / max(bw, 1)) | |
| if ar > self.max_aspect_ratio: continue | |
| out.append(BoundingBox(x1=ix1, y1=iy1, x2=ix2, y2=iy2, cls_id=int(cls_ids[i]), | |
| conf=max(0.0, min(1.0, float(confs[i]))))) | |
| return out | |
| def predict_batch(self, batch_images, offset, n_keypoints): | |
| results = [] | |
| for idx, image in enumerate(batch_images): | |
| boxes = self._infer_single(image) | |
| results.append(TVFrameResult( | |
| frame_id=offset + idx, | |
| boxes=boxes, | |
| keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))], | |
| )) | |
| return results | |