| """Per-clip object-motion features for LKAlert-MCB Channel 2. |
| |
| Given an ordered sequence of YOLO detections (with track IDs from |
| ByteTrack), compute the 16-D feature vector that downstream MCB |
| fusion will consume. |
| |
| The 16 feature names are fixed; downstream code joins by *position*, |
| so feature order MUST be stable. New features only appended at the |
| end (and `D_obj` updated). |
| |
| Definition of "critical actor": at the LAST frame of the clip, the |
| detected box that maximises `area * approach_score * ego_path_overlap`. |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Dict, List, Optional, Tuple |
|
|
| import numpy as np |
|
|
| |
|
|
| FEATURE_NAMES: List[str] = [ |
| "actor_velocity", |
| "lateral_velocity", |
| "bbox_area_growth", |
| "max_box_area_growth", |
| "last_box_area_growth", |
| "ego_path_overlap", |
| "min_distance_to_ego_path", |
| "track_approach_score", |
| "lateral_crossing_score", |
| "ttc_proxy", |
| "object_enters_path", |
| "object_leaves_path", |
| "clearance_score", |
| "track_confidence", |
| "n_tracks", |
| "track_len_norm", |
| ] |
| D_OBJ = len(FEATURE_NAMES) |
|
|
| EGO_PATH_X_HALFWIDTH = 0.20 |
| EGO_PATH_Y_BOTTOM = 0.40 |
|
|
|
|
| @dataclass |
| class Detection: |
| frame_idx: int |
| track_id: int |
| cls: int |
| conf: float |
| x1: float |
| y1: float |
| x2: float |
| y2: float |
| img_w: int |
| img_h: int |
|
|
| @property |
| def cx(self) -> float: return 0.5 * (self.x1 + self.x2) |
| @property |
| def cy(self) -> float: return 0.5 * (self.y1 + self.y2) |
| @property |
| def w(self) -> float: return max(0.0, self.x2 - self.x1) |
| @property |
| def h(self) -> float: return max(0.0, self.y2 - self.y1) |
| @property |
| def area_norm(self) -> float: |
| return (self.w * self.h) / (self.img_w * self.img_h + 1e-6) |
| @property |
| def cx_norm(self) -> float: return self.cx / max(1, self.img_w) |
| @property |
| def cy_norm(self) -> float: return self.cy / max(1, self.img_h) |
| @property |
| def in_ego_path(self) -> bool: |
| x = abs(self.cx_norm - 0.5) <= EGO_PATH_X_HALFWIDTH |
| y = self.cy_norm >= EGO_PATH_Y_BOTTOM |
| return x and y |
|
|
|
|
| |
|
|
| def _track_table(detections: List[Detection]) -> Dict[int, List[Detection]]: |
| out: Dict[int, List[Detection]] = {} |
| for d in detections: |
| if d.track_id < 0: |
| continue |
| out.setdefault(d.track_id, []).append(d) |
| for tid in out: |
| out[tid].sort(key=lambda d: d.frame_idx) |
| return out |
|
|
|
|
| def _critical_actor_id(tracks: Dict[int, List[Detection]], |
| n_frames: int) -> Optional[int]: |
| if not tracks: |
| return None |
| best_score = -1.0 |
| best_tid: Optional[int] = None |
| last_idx = n_frames - 1 |
| for tid, ds in tracks.items(): |
| |
| last = max((d for d in ds if d.frame_idx <= last_idx), |
| key=lambda d: d.frame_idx, default=None) |
| if last is None: |
| continue |
| approach = 0.0 |
| if len(ds) >= 2: |
| d0, d1 = ds[-2], ds[-1] |
| d_area = (d1.area_norm - d0.area_norm) |
| d_y = (d1.cy_norm - d0.cy_norm) |
| approach = float(np.sqrt(d_area*d_area + d_y*d_y)) |
| score = (last.area_norm |
| * (1.0 + approach) |
| * (1.5 if last.in_ego_path else 1.0)) |
| if score > best_score: |
| best_score = score |
| best_tid = tid |
| return best_tid |
|
|
|
|
| |
|
|
| def compute_features(detections: List[Detection], n_frames: int |
| ) -> Tuple[np.ndarray, Dict, Dict]: |
| """Return (features [D_obj], tracks_summary dict, quality dict).""" |
| tracks = _track_table(detections) |
| tid = _critical_actor_id(tracks, n_frames) |
|
|
| |
| feat = np.zeros(D_OBJ, dtype=np.float32) |
| quality = { |
| "det_ok": bool(detections), |
| "track_len": 0, |
| "missing_rate": 1.0, |
| "critical_track_id": int(tid) if tid is not None else -1, |
| "num_tracks": len(tracks), |
| } |
| tracks_summary = { |
| "num_tracks": int(len(tracks)), |
| "critical_track_id": int(tid) if tid is not None else -1, |
| "track_len_distribution": [len(ds) for ds in tracks.values()], |
| } |
| if tid is None: |
| return feat, tracks_summary, quality |
|
|
| ds = tracks[tid] |
| quality["track_len"] = len(ds) |
| quality["missing_rate"] = max(0.0, 1.0 - len(ds) / max(1, n_frames)) |
|
|
| |
| cx = np.asarray([d.cx_norm for d in ds]) |
| cy = np.asarray([d.cy_norm for d in ds]) |
| area = np.asarray([d.area_norm for d in ds]) |
| in_ego = np.asarray([d.in_ego_path for d in ds], dtype=bool) |
| confs = np.asarray([d.conf for d in ds]) |
|
|
| if len(ds) >= 2: |
| dx = np.diff(cx) |
| dy = np.diff(cy) |
| d_area = np.diff(area) |
| velocity = float(np.sqrt(dx[-1]**2 + dy[-1]**2)) |
| lateral_velocity = float(dx[-1]) |
| bbox_area_growth = float(d_area.mean()) |
| max_growth = float(d_area.max(initial=0.0)) |
| last_growth = float(d_area[-1]) |
| |
| sgn = np.sign(dx).sum() |
| lateral_cross = float(abs(sgn)) / max(1, len(dx)) |
| |
| if d_area[-1] > 1e-5: |
| ttc_proxy = float(area[-1] / d_area[-1]) |
| else: |
| ttc_proxy = 30.0 |
| |
| enter = bool(in_ego[-1] and not in_ego[0]) |
| leave = bool(in_ego[0] and not in_ego[-1]) |
| approach = float(np.sqrt(d_area[-1]**2 + dy[-1]**2)) |
| else: |
| velocity = 0.0; lateral_velocity = 0.0 |
| bbox_area_growth = 0.0; max_growth = 0.0; last_growth = 0.0 |
| lateral_cross = 0.0; ttc_proxy = 30.0 |
| enter = False; leave = False; approach = 0.0 |
|
|
| ego_overlap = float(in_ego.mean()) |
| min_dist_x = float(np.abs(cx - 0.5).min()) |
|
|
| last_quarter_start = max(0, int(0.75 * n_frames)) |
| last_quarter = [d for d in ds if d.frame_idx >= last_quarter_start] |
| if last_quarter: |
| clear = 1.0 - float(np.mean([d.in_ego_path for d in last_quarter])) |
| else: |
| clear = 0.5 |
|
|
| track_conf = float(confs.mean()) |
| n_tracks = float(np.log1p(len(tracks))) |
| track_len_norm = float(len(ds) / max(1, n_frames)) |
|
|
| feat = np.asarray([ |
| velocity, |
| lateral_velocity, |
| bbox_area_growth, |
| max_growth, |
| last_growth, |
| ego_overlap, |
| min_dist_x, |
| approach, |
| lateral_cross, |
| ttc_proxy, |
| float(enter), |
| float(leave), |
| clear, |
| track_conf, |
| n_tracks, |
| track_len_norm, |
| ], dtype=np.float32) |
| assert feat.shape == (D_OBJ,), (feat.shape, D_OBJ) |
| return feat, tracks_summary, quality |
|
|
|
|
| |
|
|
| def empty_reserved_slots() -> Dict: |
| """Per Red Line 3: schema must reserve fields for SAM2 / CoTracker / |
| flow / depth even though Day-9 fast path doesn't fill them.""" |
| return { |
| "sam2_masks": None, |
| "cotracker_points": None, |
| "raft_flow_per_frame": None, |
| "sea_raft_flow": None, |
| "video_depth_anything": None, |
| "actor_depth_trend": None, |
| "filled": False, |
| } |
|
|