"""Per-clip object-motion features for LKAlert-MCB Channel 2. Given an ordered sequence of YOLO detections (with track IDs from ByteTrack), compute the 16-D feature vector that downstream MCB fusion will consume. The 16 feature names are fixed; downstream code joins by *position*, so feature order MUST be stable. New features only appended at the end (and `D_obj` updated). Definition of "critical actor": at the LAST frame of the clip, the detected box that maximises `area * approach_score * ego_path_overlap`. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple import numpy as np # ─── feature schema (paper Table 6, fast-path columns) ──────────────────────── FEATURE_NAMES: List[str] = [ "actor_velocity", # px / frame, last frame "lateral_velocity", # signed x-velocity "bbox_area_growth", # mean Δ(area) per frame on critical actor "max_box_area_growth", # max single-step Δ(area) "last_box_area_growth", # last-step Δ(area) (most recent motion) "ego_path_overlap", # fraction of frames actor is in ego-path strip "min_distance_to_ego_path", # min |actor_x − img_w/2| / img_w on actor frames "track_approach_score", # √(Δarea_norm² + Δy_to_ego²) "lateral_crossing_score", # |Σ sign(dx)| / track_len → 0 = symmetric, 1 = crossing "ttc_proxy", # area / Δarea (smaller = sooner) "object_enters_path", # 1 if actor first appears outside path then enters "object_leaves_path", # 1 if actor was in path then leaves "clearance_score", # mean (1 − ego_path_overlap_window) over last 25 % of clip "track_confidence", # mean det conf on critical track "n_tracks", # log1p(num distinct tracks) "track_len_norm", # critical track length / num frames seen ] D_OBJ = len(FEATURE_NAMES) EGO_PATH_X_HALFWIDTH = 0.20 # strip = central 40 % of width EGO_PATH_Y_BOTTOM = 0.40 # bottom 60 % of height @dataclass class Detection: frame_idx: int # 0-based track_id: int # ByteTrack id (-1 if unassociated) cls: int # COCO class id conf: float x1: float y1: float x2: float y2: float img_w: int img_h: int @property def cx(self) -> float: return 0.5 * (self.x1 + self.x2) @property def cy(self) -> float: return 0.5 * (self.y1 + self.y2) @property def w(self) -> float: return max(0.0, self.x2 - self.x1) @property def h(self) -> float: return max(0.0, self.y2 - self.y1) @property def area_norm(self) -> float: return (self.w * self.h) / (self.img_w * self.img_h + 1e-6) @property def cx_norm(self) -> float: return self.cx / max(1, self.img_w) @property def cy_norm(self) -> float: return self.cy / max(1, self.img_h) @property def in_ego_path(self) -> bool: x = abs(self.cx_norm - 0.5) <= EGO_PATH_X_HALFWIDTH y = self.cy_norm >= EGO_PATH_Y_BOTTOM return x and y # ─── critical-actor selection ──────────────────────────────────────────────── def _track_table(detections: List[Detection]) -> Dict[int, List[Detection]]: out: Dict[int, List[Detection]] = {} for d in detections: if d.track_id < 0: continue out.setdefault(d.track_id, []).append(d) for tid in out: out[tid].sort(key=lambda d: d.frame_idx) return out def _critical_actor_id(tracks: Dict[int, List[Detection]], n_frames: int) -> Optional[int]: if not tracks: return None best_score = -1.0 best_tid: Optional[int] = None last_idx = n_frames - 1 for tid, ds in tracks.items(): # last detection on or before last_idx last = max((d for d in ds if d.frame_idx <= last_idx), key=lambda d: d.frame_idx, default=None) if last is None: continue approach = 0.0 if len(ds) >= 2: d0, d1 = ds[-2], ds[-1] d_area = (d1.area_norm - d0.area_norm) d_y = (d1.cy_norm - d0.cy_norm) approach = float(np.sqrt(d_area*d_area + d_y*d_y)) score = (last.area_norm * (1.0 + approach) * (1.5 if last.in_ego_path else 1.0)) if score > best_score: best_score = score best_tid = tid return best_tid # ─── 16-D feature builder ──────────────────────────────────────────────────── def compute_features(detections: List[Detection], n_frames: int ) -> Tuple[np.ndarray, Dict, Dict]: """Return (features [D_obj], tracks_summary dict, quality dict).""" tracks = _track_table(detections) tid = _critical_actor_id(tracks, n_frames) # baseline zeros — all-zero features are safe for missing/empty feat = np.zeros(D_OBJ, dtype=np.float32) quality = { "det_ok": bool(detections), "track_len": 0, "missing_rate": 1.0, "critical_track_id": int(tid) if tid is not None else -1, "num_tracks": len(tracks), } tracks_summary = { "num_tracks": int(len(tracks)), "critical_track_id": int(tid) if tid is not None else -1, "track_len_distribution": [len(ds) for ds in tracks.values()], } if tid is None: return feat, tracks_summary, quality ds = tracks[tid] # critical actor ordered detections quality["track_len"] = len(ds) quality["missing_rate"] = max(0.0, 1.0 - len(ds) / max(1, n_frames)) # build per-step delta arrays cx = np.asarray([d.cx_norm for d in ds]) cy = np.asarray([d.cy_norm for d in ds]) area = np.asarray([d.area_norm for d in ds]) in_ego = np.asarray([d.in_ego_path for d in ds], dtype=bool) confs = np.asarray([d.conf for d in ds]) if len(ds) >= 2: dx = np.diff(cx) dy = np.diff(cy) d_area = np.diff(area) velocity = float(np.sqrt(dx[-1]**2 + dy[-1]**2)) lateral_velocity = float(dx[-1]) bbox_area_growth = float(d_area.mean()) max_growth = float(d_area.max(initial=0.0)) last_growth = float(d_area[-1]) # crossing score: sum signed dx normalised sgn = np.sign(dx).sum() lateral_cross = float(abs(sgn)) / max(1, len(dx)) # ttc proxy: positive area-growth → time = area / Δarea if d_area[-1] > 1e-5: ttc_proxy = float(area[-1] / d_area[-1]) else: ttc_proxy = 30.0 # sentinel for "no expansion" # ego-path enter/leave events enter = bool(in_ego[-1] and not in_ego[0]) leave = bool(in_ego[0] and not in_ego[-1]) approach = float(np.sqrt(d_area[-1]**2 + dy[-1]**2)) else: velocity = 0.0; lateral_velocity = 0.0 bbox_area_growth = 0.0; max_growth = 0.0; last_growth = 0.0 lateral_cross = 0.0; ttc_proxy = 30.0 enter = False; leave = False; approach = 0.0 ego_overlap = float(in_ego.mean()) min_dist_x = float(np.abs(cx - 0.5).min()) last_quarter_start = max(0, int(0.75 * n_frames)) last_quarter = [d for d in ds if d.frame_idx >= last_quarter_start] if last_quarter: clear = 1.0 - float(np.mean([d.in_ego_path for d in last_quarter])) else: clear = 0.5 # uncertain track_conf = float(confs.mean()) n_tracks = float(np.log1p(len(tracks))) track_len_norm = float(len(ds) / max(1, n_frames)) feat = np.asarray([ velocity, lateral_velocity, bbox_area_growth, max_growth, last_growth, ego_overlap, min_dist_x, approach, lateral_cross, ttc_proxy, float(enter), float(leave), clear, track_conf, n_tracks, track_len_norm, ], dtype=np.float32) assert feat.shape == (D_OBJ,), (feat.shape, D_OBJ) return feat, tracks_summary, quality # ─── reserved-channel placeholder schema ───────────────────────────────────── def empty_reserved_slots() -> Dict: """Per Red Line 3: schema must reserve fields for SAM2 / CoTracker / flow / depth even though Day-9 fast path doesn't fill them.""" return { "sam2_masks": None, "cotracker_points": None, "raft_flow_per_frame": None, "sea_raft_flow": None, "video_depth_anything": None, "actor_depth_trend": None, "filled": False, }