VLAlert / training /Policy /object_motion_features.py
AsianPlayer's picture
Add VLAlert code
1e05592 verified
Raw
History Blame Contribute Delete
9.02 kB
"""Per-clip object-motion features for LKAlert-MCB Channel 2.
Given an ordered sequence of YOLO detections (with track IDs from
ByteTrack), compute the 16-D feature vector that downstream MCB
fusion will consume.
The 16 feature names are fixed; downstream code joins by *position*,
so feature order MUST be stable. New features may only be appended at
the end (`D_OBJ` is derived from the list length and grows with it).
Definition of "critical actor": the track whose most recent detection on
or before the LAST frame of the clip maximises
`area_norm * (1 + approach_score) * (1.5 if in ego path else 1.0)`.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import numpy as np
# ─── feature schema (paper Table 6, fast-path columns) ────────────────────────
# Ordered names of the object-motion features. Position in this list is the
# position in the vector returned by `compute_features`; all motion quantities
# are measured on the critical actor's track in normalised image coordinates.
FEATURE_NAMES: List[str] = [
    "actor_velocity",  # last-step centre speed, normalised image units per step (not px)
    "lateral_velocity",  # signed last-step x-velocity (normalised)
    "bbox_area_growth",  # mean Δ(area) per step on critical actor
    "max_box_area_growth",  # max single-step Δ(area), floored at 0
    "last_box_area_growth",  # last-step Δ(area) (most recent motion)
    "ego_path_overlap",  # fraction of critical-track detections inside the ego-path strip
    "min_distance_to_ego_path",  # min |actor_x − img_w/2| / img_w over the critical track
    "track_approach_score",  # √(Δarea_norm² + Δy_norm²) on the last step
    "lateral_crossing_score",  # |Σ sign(dx)| / n_steps → 0 = symmetric, 1 = crossing
    "ttc_proxy",  # area / Δarea (smaller = sooner); 30.0 when the box is not expanding
    "object_enters_path",  # 1 if the first detection is outside the path and the last is inside
    "object_leaves_path",  # 1 if the first detection is inside the path and the last is outside
    "clearance_score",  # 1 − mean(in ego path) over detections in the last 25 % of the clip
    "track_confidence",  # mean det conf on critical track
    "n_tracks",  # log1p(num distinct tracks)
    "track_len_norm",  # critical track length / n_frames
]
# Feature-vector length; follows the schema automatically.
D_OBJ = len(FEATURE_NAMES)
# Ego-path region, expressed in normalised image coordinates of the box centre.
EGO_PATH_X_HALFWIDTH = 0.20  # strip = central 40 % of width
EGO_PATH_Y_BOTTOM = 0.40  # bottom 60 % of height
@dataclass
class Detection:
    """A single tracked bounding box observed in one frame.

    Corner coordinates are stored as given; the ``*_norm`` properties
    divide by the image size carried on the same record.
    """

    frame_idx: int  # 0-based
    track_id: int  # ByteTrack id (-1 if unassociated)
    cls: int  # COCO class id
    conf: float
    x1: float
    y1: float
    x2: float
    y2: float
    img_w: int
    img_h: int

    @property
    def cx(self) -> float:
        """Horizontal centre of the box."""
        return 0.5 * (self.x1 + self.x2)

    @property
    def cy(self) -> float:
        """Vertical centre of the box."""
        return 0.5 * (self.y1 + self.y2)

    @property
    def w(self) -> float:
        """Box width, never negative (inverted corners give 0)."""
        return max(0.0, self.x2 - self.x1)

    @property
    def h(self) -> float:
        """Box height, never negative (inverted corners give 0)."""
        return max(0.0, self.y2 - self.y1)

    @property
    def area_norm(self) -> float:
        """Box area as a fraction of the image area."""
        box_area = self.w * self.h
        # The epsilon keeps the division defined for a zero-sized image.
        image_area = self.img_w * self.img_h + 1e-6
        return box_area / image_area

    @property
    def cx_norm(self) -> float:
        """Horizontal centre divided by image width (width floored at 1)."""
        width = max(1, self.img_w)
        return self.cx / width

    @property
    def cy_norm(self) -> float:
        """Vertical centre divided by image height (height floored at 1)."""
        height = max(1, self.img_h)
        return self.cy / height

    @property
    def in_ego_path(self) -> bool:
        """Whether the box centre lies inside the ego-path region.

        The region is the central vertical strip of half-width
        ``EGO_PATH_X_HALFWIDTH`` around x = 0.5, restricted to
        ``cy_norm >= EGO_PATH_Y_BOTTOM``.
        """
        centred = abs(self.cx_norm - 0.5) <= EGO_PATH_X_HALFWIDTH
        if not centred:
            return centred
        return self.cy_norm >= EGO_PATH_Y_BOTTOM
# ─── critical-actor selection ────────────────────────────────────────────────
def _track_table(detections: List[Detection]) -> Dict[int, List[Detection]]:
out: Dict[int, List[Detection]] = {}
for d in detections:
if d.track_id < 0:
continue
out.setdefault(d.track_id, []).append(d)
for tid in out:
out[tid].sort(key=lambda d: d.frame_idx)
return out
def _critical_actor_id(tracks: Dict[int, List[Detection]],
n_frames: int) -> Optional[int]:
if not tracks:
return None
best_score = -1.0
best_tid: Optional[int] = None
last_idx = n_frames - 1
for tid, ds in tracks.items():
# last detection on or before last_idx
last = max((d for d in ds if d.frame_idx <= last_idx),
key=lambda d: d.frame_idx, default=None)
if last is None:
continue
approach = 0.0
if len(ds) >= 2:
d0, d1 = ds[-2], ds[-1]
d_area = (d1.area_norm - d0.area_norm)
d_y = (d1.cy_norm - d0.cy_norm)
approach = float(np.sqrt(d_area*d_area + d_y*d_y))
score = (last.area_norm
* (1.0 + approach)
* (1.5 if last.in_ego_path else 1.0))
if score > best_score:
best_score = score
best_tid = tid
return best_tid
# ─── 16-D feature builder ────────────────────────────────────────────────────
def compute_features(detections: List[Detection], n_frames: int
                     ) -> Tuple[np.ndarray, Dict, Dict]:
    """Build the per-clip object-motion feature vector for the critical actor.

    Args:
        detections: detections of one clip; they are grouped into tracks by
            ``_track_table`` and the critical track is chosen by
            ``_critical_actor_id``.
        n_frames: number of frames in the clip.

    Returns:
        ``(features, tracks_summary, quality)`` where

        * ``features`` is a float32 vector of shape ``(D_OBJ,)``; it is all
          zeros when no critical actor is found;
        * ``tracks_summary`` holds ``num_tracks``, ``critical_track_id``
          (-1 if none) and ``track_len_distribution``;
        * ``quality`` holds ``det_ok``, ``track_len``, ``missing_rate``,
          ``critical_track_id`` (-1 if none) and ``num_tracks``.
    """
    track_table = _track_table(detections)
    critical_tid = _critical_actor_id(track_table, n_frames)
    critical_id_out = -1 if critical_tid is None else int(critical_tid)

    quality = {
        "det_ok": bool(detections),
        "track_len": 0,
        "missing_rate": 1.0,
        "critical_track_id": critical_id_out,
        "num_tracks": len(track_table),
    }
    tracks_summary = {
        "num_tracks": int(len(track_table)),
        "critical_track_id": critical_id_out,
        "track_len_distribution": [len(t) for t in track_table.values()],
    }

    # No usable track: fall back to the all-zero vector.
    if critical_tid is None:
        return np.zeros(D_OBJ, dtype=np.float32), tracks_summary, quality

    actor = track_table[critical_tid]  # critical actor's ordered detections
    n_obs = len(actor)
    quality["track_len"] = n_obs
    quality["missing_rate"] = max(0.0, 1.0 - n_obs / max(1, n_frames))

    # Per-detection series along the critical track.
    cx = np.asarray([d.cx_norm for d in actor])
    cy = np.asarray([d.cy_norm for d in actor])
    area = np.asarray([d.area_norm for d in actor])
    in_path = np.asarray([d.in_ego_path for d in actor], dtype=bool)
    confs = np.asarray([d.conf for d in actor])
    frame_ids = np.asarray([d.frame_idx for d in actor])

    # Step-based features default to the single-detection values and are
    # overwritten only when at least one step exists.
    speed = lateral_speed = 0.0
    mean_growth = peak_growth = recent_growth = 0.0
    crossing = approach = 0.0
    ttc = 30.0  # sentinel for "no expansion"
    entered = left = False
    if n_obs >= 2:
        step_x = np.diff(cx)
        step_y = np.diff(cy)
        step_area = np.diff(area)
        last_dx, last_dy, last_darea = step_x[-1], step_y[-1], step_area[-1]

        speed = float(np.sqrt(last_dx**2 + last_dy**2))
        lateral_speed = float(last_dx)
        mean_growth = float(step_area.mean())
        peak_growth = float(step_area.max(initial=0.0))  # never below 0
        recent_growth = float(last_darea)

        # Net direction of horizontal motion, normalised by the step count.
        crossing = float(abs(np.sign(step_x).sum())) / max(1, len(step_x))

        # NOTE(review): for a very small positive last step this ratio can
        # be far larger than the 30.0 "no expansion" sentinel; confirm the
        # downstream consumer tolerates that before bounding it.
        if last_darea > 1e-5:
            ttc = float(area[-1] / last_darea)

        # Enter/leave compare only the first and the last detection.
        entered = bool(in_path[-1] and not in_path[0])
        left = bool(in_path[0] and not in_path[-1])
        approach = float(np.sqrt(last_darea**2 + last_dy**2))

    path_overlap = float(in_path.mean())
    min_centre_offset = float(np.abs(cx - 0.5).min())

    # Clearance looks only at detections in the last quarter of the clip.
    tail_start = max(0, int(0.75 * n_frames))
    tail_in_path = in_path[frame_ids >= tail_start]
    if tail_in_path.size:
        clearance = 1.0 - float(tail_in_path.mean())
    else:
        clearance = 0.5  # uncertain

    mean_conf = float(confs.mean())
    log_track_count = float(np.log1p(len(track_table)))
    coverage = float(n_obs / max(1, n_frames))

    # Order below is the positional contract with downstream consumers.
    features = np.asarray([
        speed,
        lateral_speed,
        mean_growth,
        peak_growth,
        recent_growth,
        path_overlap,
        min_centre_offset,
        approach,
        crossing,
        ttc,
        float(entered),
        float(left),
        clearance,
        mean_conf,
        log_track_count,
        coverage,
    ], dtype=np.float32)
    assert features.shape == (D_OBJ,), (features.shape, D_OBJ)
    return features, tracks_summary, quality
# ─── reserved-channel placeholder schema ─────────────────────────────────────
def empty_reserved_slots() -> Dict:
    """Return a fresh placeholder record for the not-yet-populated channels.

    Per Red Line 3 the schema has to keep fields for SAM2 / CoTracker /
    flow / depth, although the Day-9 fast path leaves them unfilled: every
    reserved key maps to ``None`` and ``"filled"`` is ``False``.
    """
    reserved_keys = (
        "sam2_masks",
        "cotracker_points",
        "raft_flow_per_frame",
        "sea_raft_flow",
        "video_depth_anything",
        "actor_depth_trend",
    )
    slots: Dict = dict.fromkeys(reserved_keys, None)
    slots["filled"] = False
    return slots