File size: 9,015 Bytes
1e05592
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""Per-clip object-motion features for LKAlert-MCB Channel 2.

Given an ordered sequence of YOLO detections (with track IDs from
ByteTrack), compute the 16-D feature vector that downstream MCB
fusion will consume.

The 16 feature names are fixed; downstream code joins by *position*,
so feature order MUST be stable. New features may only be appended at
the end (`D_OBJ` is derived from `len(FEATURE_NAMES)`).

Definition of "critical actor": the track whose last detection on or
before the final frame of the clip maximises
`area_norm * (1 + approach_score) * ego_path_boost`, where
`ego_path_boost` is 1.5 if that detection lies in the ego path, else 1.0.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import numpy as np

# ─── feature schema (paper Table 6, fast-path columns) ────────────────────────

# Positional schema of the feature vector: index i of the array returned by
# compute_features() holds the quantity named FEATURE_NAMES[i]. All geometric
# quantities are computed from frame-normalised coordinates (cx_norm, cy_norm,
# area_norm) of the critical track, one "step" being two consecutive
# detections of that track.
FEATURE_NAMES: List[str] = [
    "actor_velocity",            # normalised centre displacement of the last step (not raw px)
    "lateral_velocity",          # signed x-displacement (Δcx_norm) of the last step
    "bbox_area_growth",          # mean Δ(area_norm) per step on critical actor
    "max_box_area_growth",       # max single-step Δ(area_norm), floored at 0
    "last_box_area_growth",      # last-step Δ(area_norm)  (most recent motion)
    "ego_path_overlap",          # fraction of the actor's detections inside the ego-path strip
    "min_distance_to_ego_path",  # min |cx_norm − 0.5| over the actor's detections
    "track_approach_score",      # √(Δarea_norm² + Δcy_norm²) on the last step
    "lateral_crossing_score",    # |Σ sign(dx)| / n_steps → 0 = symmetric, 1 = crossing
    "ttc_proxy",                 # area / Δarea on the last step (smaller = sooner); 30.0 if no expansion
    "object_enters_path",        # 1 if first detection is outside the path and last is inside
    "object_leaves_path",        # 1 if first detection is inside the path and last is outside
    "clearance_score",           # 1 − mean(in ego path) over detections in last 25 % of clip; 0.5 if none
    "track_confidence",          # mean det conf on critical track
    "n_tracks",                  # log1p(num distinct tracks)
    "track_len_norm",            # critical track length / n_frames
]
# Feature dimensionality; derived so it tracks any appended feature name.
D_OBJ = len(FEATURE_NAMES)

# Ego-path region in normalised image coordinates (see Detection.in_ego_path).
EGO_PATH_X_HALFWIDTH = 0.20      # strip = central 40 % of width (|cx_norm − 0.5| <= 0.20)
EGO_PATH_Y_BOTTOM    = 0.40      # bottom 60 % of height (cy_norm >= 0.40)


@dataclass
class Detection:
    """A single detector box in one frame, with the frame's dimensions.

    Box corners (x1, y1, x2, y2) are expressed in the same units as
    ``img_w`` / ``img_h`` (presumably pixels -- confirm against the
    detector output). The ``*_norm`` properties rescale them by the
    frame size.
    """
    frame_idx: int               # 0-based
    track_id:  int               # ByteTrack id (-1 if unassociated)
    cls:       int               # COCO class id
    conf:      float
    x1:  float
    y1:  float
    x2:  float
    y2:  float
    img_w: int
    img_h: int

    @property
    def cx(self) -> float:
        """Horizontal centre of the box."""
        return 0.5 * (self.x1 + self.x2)

    @property
    def cy(self) -> float:
        """Vertical centre of the box."""
        return 0.5 * (self.y1 + self.y2)

    @property
    def w(self) -> float:
        """Box width, clamped to zero for inverted corners."""
        span = self.x2 - self.x1
        return span if span > 0.0 else 0.0

    @property
    def h(self) -> float:
        """Box height, clamped to zero for inverted corners."""
        span = self.y2 - self.y1
        return span if span > 0.0 else 0.0

    @property
    def area_norm(self) -> float:
        """Box area as a fraction of the frame area."""
        box_area = self.w * self.h
        # The small epsilon keeps the division defined for a 0-sized frame.
        frame_area = self.img_w * self.img_h + 1e-6
        return box_area / frame_area

    @property
    def cx_norm(self) -> float:
        """Horizontal centre divided by the frame width (at least 1)."""
        width = max(1, self.img_w)
        return self.cx / width

    @property
    def cy_norm(self) -> float:
        """Vertical centre divided by the frame height (at least 1)."""
        height = max(1, self.img_h)
        return self.cy / height

    @property
    def in_ego_path(self) -> bool:
        """True when the box centre lies inside the ego-path region.

        The region is the central vertical strip of half-width
        ``EGO_PATH_X_HALFWIDTH`` around x = 0.5, restricted to
        ``cy_norm >= EGO_PATH_Y_BOTTOM``.
        """
        inside_strip = abs(self.cx_norm - 0.5) <= EGO_PATH_X_HALFWIDTH
        low_enough = self.cy_norm >= EGO_PATH_Y_BOTTOM
        return inside_strip and low_enough


# ─── critical-actor selection ────────────────────────────────────────────────

def _track_table(detections: List[Detection]) -> Dict[int, List[Detection]]:
    out: Dict[int, List[Detection]] = {}
    for d in detections:
        if d.track_id < 0:
            continue
        out.setdefault(d.track_id, []).append(d)
    for tid in out:
        out[tid].sort(key=lambda d: d.frame_idx)
    return out


def _critical_actor_id(tracks: Dict[int, List[Detection]],
                         n_frames: int) -> Optional[int]:
    if not tracks:
        return None
    best_score = -1.0
    best_tid: Optional[int] = None
    last_idx = n_frames - 1
    for tid, ds in tracks.items():
        # last detection on or before last_idx
        last = max((d for d in ds if d.frame_idx <= last_idx),
                    key=lambda d: d.frame_idx, default=None)
        if last is None:
            continue
        approach = 0.0
        if len(ds) >= 2:
            d0, d1 = ds[-2], ds[-1]
            d_area = (d1.area_norm - d0.area_norm)
            d_y    = (d1.cy_norm - d0.cy_norm)
            approach = float(np.sqrt(d_area*d_area + d_y*d_y))
        score = (last.area_norm
                  * (1.0 + approach)
                  * (1.5 if last.in_ego_path else 1.0))
        if score > best_score:
            best_score = score
            best_tid = tid
    return best_tid


# ─── 16-D feature builder ────────────────────────────────────────────────────

def compute_features(detections: List[Detection], n_frames: int
                       ) -> Tuple[np.ndarray, Dict, Dict]:
    """Build the object-motion feature vector for one clip.

    Detections are grouped into tracks, the critical actor is selected, and
    its trajectory is summarised into ``D_OBJ`` float32 values ordered as in
    ``FEATURE_NAMES``.

    Args:
        detections: all detections of the clip (any order).
        n_frames: number of frames in the clip.

    Returns:
        ``(features, tracks_summary, quality)`` where ``features`` has shape
        ``(D_OBJ,)``. When no critical actor exists the features are all
        zero and ``quality["missing_rate"]`` stays at 1.0.
    """
    by_track = _track_table(detections)
    critical = _critical_actor_id(by_track, n_frames)
    critical_id = -1 if critical is None else int(critical)

    quality = {
        "det_ok":      bool(detections),
        "track_len":   0,
        "missing_rate": 1.0,
        "critical_track_id": critical_id,
        "num_tracks":  len(by_track),
    }
    tracks_summary = {
        "num_tracks":  int(len(by_track)),
        "critical_track_id": critical_id,
        "track_len_distribution": [len(seq) for seq in by_track.values()],
    }
    if critical is None:
        # baseline zeros — the all-zero vector is the "missing/empty" output
        return np.zeros(D_OBJ, dtype=np.float32), tracks_summary, quality

    actor = by_track[critical]              # critical actor, frame-ordered
    n_obs = len(actor)
    frame_count = max(1, n_frames)
    quality["track_len"] = n_obs
    quality["missing_rate"] = max(0.0, 1.0 - n_obs / frame_count)

    # per-detection series of the critical actor
    xs      = np.asarray([d.cx_norm for d in actor])
    ys      = np.asarray([d.cy_norm for d in actor])
    areas   = np.asarray([d.area_norm for d in actor])
    in_path = np.asarray([d.in_ego_path for d in actor], dtype=bool)
    scores  = np.asarray([d.conf for d in actor])

    # Defaults describe a single-detection track: no motion is observable.
    speed = 0.0
    lateral_speed = 0.0
    mean_growth = 0.0
    peak_growth = 0.0
    final_growth = 0.0
    crossing = 0.0
    ttc = 30.0                              # sentinel for "no expansion"
    entered = False
    left = False
    approach = 0.0

    if n_obs >= 2:
        step_x = np.diff(xs)
        step_y = np.diff(ys)
        step_area = np.diff(areas)
        last_dx = step_x[-1]
        last_dy = step_y[-1]
        last_da = step_area[-1]

        speed = float(np.sqrt(last_dx**2 + last_dy**2))
        lateral_speed = float(last_dx)
        mean_growth = float(step_area.mean())
        peak_growth = float(step_area.max(initial=0.0))   # floored at 0
        final_growth = float(last_da)
        # crossing score: |sum of step directions| normalised by step count
        crossing = float(abs(np.sign(step_x).sum())) / max(1, len(step_x))
        # ttc proxy: positive area growth → time = area / Δarea.
        # NOTE(review): this ratio can exceed the 30.0 sentinel when the
        # growth is only slightly above the 1e-5 threshold.
        if last_da > 1e-5:
            ttc = float(areas[-1] / last_da)
        # ego-path enter/leave events compare first and last detection only
        entered = bool(in_path[-1] and not in_path[0])
        left = bool(in_path[0] and not in_path[-1])
        approach = float(np.sqrt(last_da**2 + last_dy**2))

    ego_overlap = float(in_path.mean())
    min_dist_x = float(np.abs(xs - 0.5).min())

    # clearance over the detections that fall in the last quarter of the clip
    tail_start = max(0, int(0.75 * n_frames))
    tail_flags = [d.in_ego_path for d in actor if d.frame_idx >= tail_start]
    if tail_flags:
        clear = 1.0 - float(np.mean(tail_flags))
    else:
        clear = 0.5    # uncertain

    track_conf = float(scores.mean())
    n_tracks = float(np.log1p(len(by_track)))
    track_len_norm = float(n_obs / frame_count)

    # Order below MUST match FEATURE_NAMES.
    feat = np.asarray([
        speed,
        lateral_speed,
        mean_growth,
        peak_growth,
        final_growth,
        ego_overlap,
        min_dist_x,
        approach,
        crossing,
        ttc,
        float(entered),
        float(left),
        clear,
        track_conf,
        n_tracks,
        track_len_norm,
    ], dtype=np.float32)
    assert feat.shape == (D_OBJ,), (feat.shape, D_OBJ)
    return feat, tracks_summary, quality


# ─── reserved-channel placeholder schema ─────────────────────────────────────

def empty_reserved_slots() -> Dict:
    """Return a fresh placeholder dict for the reserved channels.

    Per Red Line 3 the schema must reserve fields for SAM2 / CoTracker /
    flow / depth even though the Day-9 fast path does not fill them. Every
    reserved field is None and ``"filled"`` is False.
    """
    reserved_keys = (
        "sam2_masks",
        "cotracker_points",
        "raft_flow_per_frame",
        "sea_raft_flow",
        "video_depth_anything",
        "actor_depth_trend",
    )
    slots: Dict = dict.fromkeys(reserved_keys, None)
    slots["filled"] = False
    return slots