Commit 0a3fef3 · Parent(s): a5b4436
dexined configurable
backend/py/app/gradio_demo/ui.py
CHANGED
@@ -35,6 +35,11 @@ def _gradio_runtime(
     hough_max_gap: int,
     ellipse_min_area: int,
     max_ellipses: int,
+    dexined_use_marching_squares: bool,
+    dexined_threshold_mode: str,
+    dexined_threshold_sigma: float,
+    dexined_threshold_value: float,
+    dexined_threshold_offset: float,
 ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Dict[str, Any]]:
     if image is None:
         return None, None, {"info": "No image provided."}
@@ -50,6 +55,11 @@ def _gradio_runtime(
         "hough_max_gap": int(hough_max_gap),
         "ellipse_min_area": int(ellipse_min_area),
         "max_ellipses": int(max_ellipses),
+        "dexined_use_marching_squares": bool(dexined_use_marching_squares),
+        "dexined_threshold_mode": str(dexined_threshold_mode).lower(),
+        "dexined_threshold_sigma": float(dexined_threshold_sigma),
+        "dexined_threshold_value": float(dexined_threshold_value),
+        "dexined_threshold_offset": float(dexined_threshold_offset),
     }

     params["line_detector"] = LINE_METHOD_LABELS.get(line_method, "hough")
@@ -119,6 +129,41 @@ def build_demo() -> gr.Blocks:
                 ellipse_min_area = gr.Slider(10, 50000, value=defaults["ellipse_min_area"], step=10, label="Ellipse min area (px^2)")
                 max_ellipses = gr.Slider(1, 20, value=defaults["max_ellipses"], step=1, label="Max ellipses")

+            with gr.Accordion("DexiNed Options", open=False):
+                dexined_use_marching_squares = gr.Checkbox(
+                    value=defaults.get("dexined_use_marching_squares", False),
+                    label="Use marching squares iso-contours",
+                    info="Enable subpixel iso-contours (marching squares) for DexiNed outputs.",
+                )
+                dexined_threshold_mode = gr.Radio(
+                    choices=["adaptive", "fixed"],
+                    value=defaults.get("dexined_threshold_mode", "adaptive"),
+                    label="Threshold mode",
+                    info="Adaptive uses mean + σ·std; fixed uses a constant probability threshold.",
+                )
+                dexined_threshold_sigma = gr.Slider(
+                    0.0,
+                    5.0,
+                    value=defaults.get("dexined_threshold_sigma", 1.0),
+                    step=0.05,
+                    label="Adaptive σ multiplier",
+                )
+                dexined_threshold_value = gr.Slider(
+                    0.0,
+                    1.0,
+                    value=defaults.get("dexined_threshold_value", 0.3),
+                    step=0.01,
+                    label="Fixed probability threshold",
+                )
+                dexined_threshold_offset = gr.Slider(
+                    -0.5,
+                    0.5,
+                    value=defaults.get("dexined_threshold_offset", 0.0),
+                    step=0.01,
+                    label="Threshold offset",
+                    info="Applied after the selected mode (use small adjustments).",
+                )
+
             with gr.Accordion("DL Models (how to enable)", open=False):
                 dl_lines = "\n".join(
                     f"- **{det}**: {', '.join(models) if models else 'n/a'}" for det, models in DL_MODELS.items()
@@ -160,6 +205,11 @@ def build_demo() -> gr.Blocks:
                hough_max_gap,
                ellipse_min_area,
                max_ellipses,
+               dexined_use_marching_squares,
+               dexined_threshold_mode,
+               dexined_threshold_sigma,
+               dexined_threshold_value,
+               dexined_threshold_offset,
            ],
            outputs=[out_img_classical, out_img_dl, meta_json],
        )
@@ -183,6 +233,11 @@ def build_demo() -> gr.Blocks:
                hough_max_gap,
                ellipse_min_area,
                max_ellipses,
+               dexined_use_marching_squares,
+               dexined_threshold_mode,
+               dexined_threshold_sigma,
+               dexined_threshold_value,
+               dexined_threshold_offset,
            ],
            outputs=[out_img_classical, out_img_dl, meta_json],
        )
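For orientation, the five new widgets flow into the params dict exactly as coerced in the `@@ -50,6 +55,11 @@` hunk above. A small standalone sketch of that normalization (the helper name is invented for this illustration; defaults match the DEFAULT_PARAMS shown further down):

# Sketch only: mirrors the coercions added to _gradio_runtime; the helper
# name normalize_dexined_params does not exist in the repo.
from typing import Any, Dict

def normalize_dexined_params(
    use_marching_squares: bool,
    threshold_mode: str,
    threshold_sigma: float,
    threshold_value: float,
    threshold_offset: float,
) -> Dict[str, Any]:
    return {
        "dexined_use_marching_squares": bool(use_marching_squares),
        "dexined_threshold_mode": str(threshold_mode).lower(),
        "dexined_threshold_sigma": float(threshold_sigma),
        "dexined_threshold_value": float(threshold_value),
        "dexined_threshold_offset": float(threshold_offset),
    }

print(normalize_dexined_params(False, "Adaptive", 1.0, 0.3, 0.0))
# {'dexined_use_marching_squares': False, 'dexined_threshold_mode': 'adaptive',
#  'dexined_threshold_sigma': 1.0, 'dexined_threshold_value': 0.3,
#  'dexined_threshold_offset': 0.0}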
backend/py/app/inference/dl.py
CHANGED
@@ -44,10 +44,27 @@ def _load_session(path: str):
         raise RuntimeError(f"Failed to load ONNX model '{path}': {e}")


+def _extract_adapter_options(model_path: str, params: Optional[Dict[str, Any]]) -> Dict[str, Any]:
+    if not params:
+        return {}
+    name = os.path.basename(model_path).lower()
+    if "dexined" in name:
+        keys = [
+            "dexined_threshold_mode",
+            "dexined_threshold_sigma",
+            "dexined_threshold_offset",
+            "dexined_threshold_value",
+            "dexined_use_marching_squares",
+        ]
+        return {k: params.get(k) for k in keys if k in params}
+    return {}
+
+
 def detect_dl(
     image: np.ndarray,
     detector: str,
     model_choice: Optional[str],
+    params: Optional[Dict[str, Any]] = None,
 ) -> Tuple[np.ndarray, Dict[str, Any]]:
     bgr = to_bgr(image)
     rgb = to_rgb(bgr)
@@ -77,6 +94,10 @@ def detect_dl(
         meta["error"] = f"Preprocess failed: {e}"
         return rgb, meta

+    adapter_options = _extract_adapter_options(model_path, params)
+    if adapter_options and isinstance(ctx.extra, dict):
+        ctx.extra.update(adapter_options)
+
     try:
         outputs = sess.run(None, feed)
     except Exception as e:
@@ -86,6 +107,8 @@ def detect_dl(
     try:
         overlay, post_meta = adapter.postprocess(outputs, rgb, ctx, detector)
         meta.update(post_meta)
+        if adapter_options:
+            meta["adapter_options"] = adapter_options
     except Exception as e:
         meta["error"] = f"Postprocess failed: {e}"
         return rgb, meta
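The new helper gates which request params reach the adapter: DexiNed options are forwarded only when the model filename contains "dexined", and unrelated keys never reach ctx.extra. A standalone sketch of that filtering, re-implemented outside the module purely for illustration:

# Standalone illustration of the filtering done by _extract_adapter_options.
import os
from typing import Any, Dict, Optional

_DEXINED_KEYS = (
    "dexined_threshold_mode",
    "dexined_threshold_sigma",
    "dexined_threshold_offset",
    "dexined_threshold_value",
    "dexined_use_marching_squares",
)

def extract_adapter_options(model_path: str, params: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    if not params:
        return {}
    if "dexined" in os.path.basename(model_path).lower():
        return {k: params.get(k) for k in _DEXINED_KEYS if k in params}
    return {}

print(extract_adapter_options("models/dexined.onnx",
                              {"dexined_threshold_mode": "fixed", "canny_low": 50}))
# {'dexined_threshold_mode': 'fixed'}   unrelated keys are dropped
print(extract_adapter_options("models/other.onnx", {"dexined_threshold_mode": "fixed"}))
# {}                                    non-DexiNed models ignore the options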
backend/py/app/inference/dl_adapters.py
CHANGED
@@ -4,6 +4,8 @@ import os
 from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Tuple

+import math
+
 import cv2
 import numpy as np

@@ -114,9 +116,35 @@ class EdgesAdapter(DLAdapter):


 class DexiNedAdapter(DLAdapter):
-    """DexiNed-specific adapter replicating the reference OpenCV DNN pipeline."""
+    """DexiNed-specific adapter replicating the reference OpenCV DNN pipeline with optional refinements."""

     _MEAN_BGR = (103.5, 116.2, 123.6)
+    _MARCHING_CASES: Dict[int, Tuple[Tuple[int, int], ...]] = {
+        0: (),
+        1: ((3, 0),),
+        2: ((0, 1),),
+        3: ((3, 1),),
+        4: ((1, 2),),
+        5: ((3, 2), (0, 1)),
+        6: ((0, 2),),
+        7: ((3, 2),),
+        8: ((2, 3),),
+        9: ((2, 0),),
+        10: ((1, 3), (0, 2)),
+        11: ((2, 1),),
+        12: ((1, 3),),
+        13: ((1, 0),),
+        14: ((3, 2),),
+        15: (),
+    }
+    _EDGE_CORNERS: Tuple[Tuple[int, int], ...] = (
+        (0, 1),  # top
+        (1, 2),  # right
+        (2, 3),  # bottom
+        (3, 0),  # left
+    )
+    _SUBPIXEL_MAX_SAMPLES = 48
+    _MS_MAX_SAMPLES = 48

     def preprocess(self, rgb: np.ndarray, sess) -> Tuple[Dict[str, np.ndarray], AdapterContext]:
         input_name, in_wh = _first_input(sess)
@@ -157,14 +185,150 @@ class DexiNedAdapter(DLAdapter):
         ave_u8 = np.mean(np.stack(maps_u8, axis=0), axis=0).astype(np.uint8)
         return fuse_prob, ave_prob, fuse_u8, ave_u8

+    @staticmethod
+    def _clamp_threshold(threshold: float) -> float:
+        return float(np.clip(threshold, 1e-6, 1.0))
+
+    def _resolve_threshold(self, fuse_prob: np.ndarray, options: Dict[str, Any]) -> Tuple[float, str]:
+        mode_raw = options.get("dexined_threshold_mode", "adaptive")
+        mode = mode_raw.lower() if isinstance(mode_raw, str) else "adaptive"
+        offset = float(options.get("dexined_threshold_offset", 0.0) or 0.0)
+        mean = float(np.mean(fuse_prob))
+        std = float(np.std(fuse_prob))
+        if mode == "fixed":
+            value = float(options.get("dexined_threshold_value", 0.3) or 0.0)
+            threshold = self._clamp_threshold(value + offset)
+            return threshold, "fixed"
+        sigma = float(options.get("dexined_threshold_sigma", 1.0) or 0.0)
+        threshold = self._clamp_threshold(mean + sigma * std + offset)
+        return threshold, "adaptive"
+
+    @staticmethod
+    def _interpolate(
+        p0: Tuple[float, float], p1: Tuple[float, float], v0: float, v1: float, level: float
+    ) -> Tuple[float, float]:
+        denom = v1 - v0
+        if abs(denom) < 1e-6:
+            t = 0.5
+        else:
+            t = (level - v0) / denom
+            t = float(np.clip(t, 0.0, 1.0))
+        return p0[0] + t * (p1[0] - p0[0]), p0[1] + t * (p1[1] - p0[1])
+
+    def _marching_squares(
+        self, prob: np.ndarray, level: float
+    ) -> List[Tuple[Tuple[float, float], Tuple[float, float]]]:
+        H, W = prob.shape
+        segments: List[Tuple[Tuple[float, float], Tuple[float, float]]] = []
+        for y in range(H - 1):
+            for x in range(W - 1):
+                v0 = prob[y, x]
+                v1 = prob[y, x + 1]
+                v2 = prob[y + 1, x + 1]
+                v3 = prob[y + 1, x]
+                mask = 0
+                if v0 >= level:
+                    mask |= 1
+                if v1 >= level:
+                    mask |= 2
+                if v2 >= level:
+                    mask |= 4
+                if v3 >= level:
+                    mask |= 8
+                edges = self._MARCHING_CASES[mask]
+                if not edges:
+                    continue
+                corners = (
+                    (float(x), float(y)),
+                    (float(x + 1), float(y)),
+                    (float(x + 1), float(y + 1)),
+                    (float(x), float(y + 1)),
+                )
+                values = (v0, v1, v2, v3)
+                for edge_a, edge_b in edges:
+                    a0, a1 = self._EDGE_CORNERS[edge_a]
+                    b0, b1 = self._EDGE_CORNERS[edge_b]
+                    pa = self._interpolate(corners[a0], corners[a1], values[a0], values[a1], level)
+                    pb = self._interpolate(corners[b0], corners[b1], values[b0], values[b1], level)
+                    segments.append((pa, pb))
+        return segments
+
+    @staticmethod
+    def _skeletonize(mask: np.ndarray) -> np.ndarray:
+        skeleton = np.zeros_like(mask)
+        element = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
+        working = mask.copy()
+        while True:
+            eroded = cv2.erode(working, element)
+            opened = cv2.dilate(eroded, element)
+            temp = cv2.subtract(working, opened)
+            skeleton = cv2.bitwise_or(skeleton, temp)
+            working = eroded
+            if cv2.countNonZero(working) == 0:
+                break
+        return skeleton
+
+    @staticmethod
+    def _sample_bilinear(prob: np.ndarray, x: float, y: float, width: int, height: int) -> float:
+        x = float(np.clip(x, 0.0, width - 1.0))
+        y = float(np.clip(y, 0.0, height - 1.0))
+        x0 = int(math.floor(x))
+        y0 = int(math.floor(y))
+        x1 = min(x0 + 1, width - 1)
+        y1 = min(y0 + 1, height - 1)
+        dx = x - x0
+        dy = y - y0
+        return float(
+            (1 - dx) * (1 - dy) * prob[y0, x0]
+            + dx * (1 - dy) * prob[y0, x1]
+            + (1 - dx) * dy * prob[y1, x0]
+            + dx * dy * prob[y1, x1]
+        )
+
+    def _refine_subpixel(self, prob: np.ndarray, skeleton: np.ndarray) -> Tuple[int, List[Tuple[float, float]]]:
+        if not np.any(skeleton):
+            return 0, []
+        smooth = cv2.GaussianBlur(prob, (0, 0), sigmaX=1.0, sigmaY=1.0)
+        gx = cv2.Sobel(smooth, cv2.CV_32F, 1, 0, ksize=3)
+        gy = cv2.Sobel(smooth, cv2.CV_32F, 0, 1, ksize=3)
+        ys, xs = np.where(skeleton > 0)
+        height, width = prob.shape
+        count = 0
+        samples: List[Tuple[float, float]] = []
+        for x, y in zip(xs.tolist(), ys.tolist()):
+            gx_val = float(gx[y, x])
+            gy_val = float(gy[y, x])
+            mag = math.hypot(gx_val, gy_val)
+            if mag < 1e-6:
+                continue
+            nx = gx_val / mag
+            ny = gy_val / mag
+            p0 = self._sample_bilinear(smooth, x, y, width, height)
+            pm = self._sample_bilinear(smooth, x - nx, y - ny, width, height)
+            pp = self._sample_bilinear(smooth, x + nx, y + ny, width, height)
+            denom = pm - 2 * p0 + pp
+            if abs(denom) < 1e-8:
+                t = 0.0
+            else:
+                t = 0.5 * (pm - pp) / denom
+                t = float(np.clip(t, -1.0, 1.0))
+            x_sub = float(np.clip(x + t * nx, 0.0, width - 1.0))
+            y_sub = float(np.clip(y + t * ny, 0.0, height - 1.0))
+            count += 1
+            if len(samples) < self._SUBPIXEL_MAX_SAMPLES:
+                samples.append((x_sub, y_sub))
+        return count, samples
+
     def postprocess(
         self, outputs: List[np.ndarray], rgb: np.ndarray, ctx: AdapterContext, detector: str
     ) -> Tuple[np.ndarray, Dict[str, Any]]:
         fuse_prob, ave_prob, fuse_u8, ave_u8 = self._post_process_maps(outputs, ctx.orig_size)
-
-
+        options = ctx.extra if isinstance(ctx.extra, dict) else {}
+        threshold, threshold_mode = self._resolve_threshold(fuse_prob, options)
+        use_marching_squares = bool(options.get("dexined_use_marching_squares", False))
+
         overlay = rgb.copy()
-
+        height, width = fuse_prob.shape
         meta: Dict[str, Any] = {
             "edge_prob_mean": float(np.mean(fuse_prob)),
             "edge_prob_std": float(np.std(fuse_prob)),
@@ -174,9 +338,41 @@ class DexiNedAdapter(DLAdapter):
             "ave_mean": float(np.mean(ave_u8) / 255.0),
             "fuse_min": float(np.min(fuse_prob)),
             "fuse_max": float(np.max(fuse_prob)),
+            "dexined_threshold": threshold,
+            "dexined_threshold_mode": threshold_mode,
+            "dexined_use_marching_squares": use_marching_squares,
             "adapter": "dexined",
             "resize": {"h": ctx.in_size[0], "w": ctx.in_size[1]},
         }
+
+        if use_marching_squares:
+            segments = self._marching_squares(fuse_prob, threshold)
+            for p0, p1 in segments:
+                x0 = int(np.clip(round(p0[0]), 0, width - 1))
+                y0 = int(np.clip(round(p0[1]), 0, height - 1))
+                x1 = int(np.clip(round(p1[0]), 0, width - 1))
+                y1 = int(np.clip(round(p1[1]), 0, height - 1))
+                cv2.line(overlay, (x0, y0), (x1, y1), (0, 255, 0), 1, lineType=cv2.LINE_AA)
+            meta["marching_squares_segments"] = int(len(segments))
+            if segments:
+                sample_segments = [
+                    [round(p0[0], 3), round(p0[1], 3), round(p1[0], 3), round(p1[1], 3)]
+                    for p0, p1 in segments[: self._MS_MAX_SAMPLES]
+                ]
+                meta["marching_squares_samples"] = sample_segments
+        else:
+            binary = (fuse_prob >= threshold).astype(np.uint8)
+            mask = (binary * 255).astype(np.uint8)
+            skeleton = self._skeletonize(mask)
+            overlay[skeleton > 0] = (0, 255, 0)
+            subpixel_count, subpixel_samples = self._refine_subpixel(fuse_prob, skeleton)
+            meta["skeleton_pixels"] = int(np.count_nonzero(skeleton))
+            meta["subpixel_points"] = int(subpixel_count)
+            if subpixel_samples:
+                meta["subpixel_samples"] = [
+                    [round(x, 3), round(y, 3)] for x, y in subpixel_samples[: self._SUBPIXEL_MAX_SAMPLES]
+                ]
+
         return overlay, meta
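As a quick worked example of the new threshold selection (a standalone sketch of the _resolve_threshold logic, not the adapter method itself): adaptive mode uses mean + σ·std + offset of the fused probability map, fixed mode uses value + offset, and both are clamped into (1e-6, 1.0]:

# Standalone sketch of the threshold selection used by DexiNedAdapter.
import numpy as np

def resolve_threshold(prob: np.ndarray, mode: str = "adaptive", sigma: float = 1.0,
                      value: float = 0.3, offset: float = 0.0) -> float:
    if mode == "fixed":
        level = value + offset
    else:
        level = float(np.mean(prob)) + sigma * float(np.std(prob)) + offset
    return float(np.clip(level, 1e-6, 1.0))

prob = np.array([[0.1, 0.2], [0.8, 0.9]], dtype=np.float32)
print(resolve_threshold(prob))                            # adaptive: 0.5 + 1.0*0.354 -> ~0.854
print(resolve_threshold(prob, mode="fixed", value=0.05))  # fixed: 0.05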
backend/py/app/services/runtime_adapter.py
CHANGED
@@ -22,8 +22,21 @@ DEFAULT_PARAMS: Dict[str, Any] = {
     "ellipse_min_area": 300,
     "max_ellipses": 5,
     "line_detector": "hough",
+    "dexined_threshold_mode": "adaptive",
+    "dexined_threshold_sigma": 1.0,
+    "dexined_threshold_offset": 0.0,
+    "dexined_threshold_value": 0.3,
+    "dexined_use_marching_squares": False,
 }

+def _to_bool(value: Any) -> bool:
+    if isinstance(value, bool):
+        return value
+    if isinstance(value, str):
+        return value.strip().lower() in {"1", "true", "yes", "on"}
+    return bool(value)
+
+
 PARAM_TYPES: Dict[str, Any] = {
     "canny_low": int,
     "canny_high": int,
@@ -36,6 +49,11 @@ PARAM_TYPES: Dict[str, Any] = {
     "ellipse_min_area": int,
     "max_ellipses": int,
     "line_detector": lambda x: str(x).lower(),
+    "dexined_threshold_mode": lambda x: str(x).lower(),
+    "dexined_threshold_sigma": float,
+    "dexined_threshold_offset": float,
+    "dexined_threshold_value": float,
+    "dexined_use_marching_squares": _to_bool,
 }

 CLASSICAL_MODEL_INFO = {"name": "opencv-classical", "version": cv2.__version__}
@@ -118,7 +136,7 @@ def run_detection(

     if execute_dl:
         t0 = time.perf_counter()
-        dl_img, dl_meta = detect_dl(image, detector, dl_choice)
+        dl_img, dl_meta = detect_dl(image, detector, dl_choice, params=merged)
         t_ms = (time.perf_counter() - t0) * 1000.0
         overlays["dl"] = dl_img
         features["dl"] = dl_meta
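Since params may arrive as strings (for example from a JSON form or query string), each new key gets a coercer in PARAM_TYPES. A minimal sketch of that behaviour; only the new DexiNed entries are copied here, the surrounding table is illustrative:

# Sketch only: coercion of string-typed request params into the expected types.
from typing import Any, Dict

def to_bool(value: Any) -> bool:
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "on"}
    return bool(value)

COERCERS: Dict[str, Any] = {  # only the new DexiNed entries
    "dexined_threshold_mode": lambda x: str(x).lower(),
    "dexined_threshold_sigma": float,
    "dexined_threshold_offset": float,
    "dexined_threshold_value": float,
    "dexined_use_marching_squares": to_bool,
}

raw = {"dexined_threshold_mode": "FIXED",
       "dexined_threshold_value": "0.05",
       "dexined_use_marching_squares": "yes"}
print({k: COERCERS[k](v) for k, v in raw.items()})
# {'dexined_threshold_mode': 'fixed', 'dexined_threshold_value': 0.05,
#  'dexined_use_marching_squares': True}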
tests/test_edge_detection.py
CHANGED
@@ -2,6 +2,7 @@ import sys
 from pathlib import Path

 import numpy as np
+import pytest

 # Ensure project root is on the import path when tests run directly.
 ROOT = Path(__file__).resolve().parents[1]
@@ -46,9 +47,9 @@
     assert np.any(np.all(overlay == np.array([0, 255, 0], dtype=np.uint8), axis=-1))


-def test_detect_dl_dexined_preprocess_postprocess_matches_expectations():
+def test_detect_dl_dexined_morphological_thinning_default():
     img = _synthetic_edge_image()
-    overlay, meta = detect_dl(img, detector="Edges (Canny)", model_choice="dexined.onnx")
+    overlay, meta = detect_dl(img, detector="Edges (Canny)", model_choice="dexined.onnx", params={})

     assert overlay.shape == img.shape
     assert overlay.dtype == np.uint8
@@ -59,4 +60,31 @@ def test_detect_dl_dexined_preprocess_postprocess_matches_expectations():
     assert 0.0 <= meta["edge_map_mean"] <= 1.0
     assert 0.0 <= meta["edge_map_std"] <= 1.0
     assert meta["model_path"].endswith("dexined.onnx")
+    assert meta["dexined_use_marching_squares"] is False
+    assert meta["dexined_threshold_mode"] == "adaptive"
+    assert 0.0 <= meta["dexined_threshold"] <= 1.0
+    assert meta.get("skeleton_pixels", 0) > 0
+    assert meta.get("subpixel_points", 0) >= 0
     assert np.any(np.all(overlay == np.array([0, 255, 0], dtype=np.uint8), axis=-1))
+
+
+def test_detect_dl_dexined_marching_squares_with_fixed_threshold():
+    img = _synthetic_edge_image()
+    params = {
+        "dexined_use_marching_squares": True,
+        "dexined_threshold_mode": "fixed",
+        "dexined_threshold_value": 0.05,
+    }
+    overlay, meta = detect_dl(img, detector="Edges (Canny)", model_choice="dexined.onnx", params=params)
+
+    assert overlay.shape == img.shape
+    assert overlay.dtype == np.uint8
+    assert meta.get("error") is None
+    assert meta["dexined_use_marching_squares"] is True
+    assert meta["dexined_threshold_mode"] == "fixed"
+    assert meta["dexined_threshold"] == pytest.approx(0.05, abs=1e-4)
+    assert meta.get("marching_squares_segments", 0) >= 0
+    if meta.get("marching_squares_segments", 0):
+        assert "marching_squares_samples" in meta
+    greenish = (overlay[..., 1] > overlay[..., 0] + 10) & (overlay[..., 1] > overlay[..., 2] + 10)
+    assert np.count_nonzero(greenish) > 0