Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import cv2 | |
| from scipy.optimize import linear_sum_assignment | |
| from motion_estimator import GlobalMotionEstimator | |
class PointTrack:
    """Mutable state record for a single tracked point.

    Attributes:
        id: Identifier handed in by the creator of the track.
        pt: Current position as a float32 NumPy array.
        velocity: Displacement recorded at the most recent match; starts at 0.0.
        time_since_update: Number of updates since the track was last matched;
            starts at 0.
    """

    def __init__(self, pt, track_id):
        """Create a fresh track located at *pt* and labelled *track_id*."""
        # A new track counts as "just seen" and has not moved yet.
        self.time_since_update = 0
        self.velocity = 0.0
        # np.array always copies, so the track never aliases the caller's data.
        self.pt = np.array(pt, dtype=np.float32)
        self.id = track_id
class Tracker:
    """Point tracker using Hungarian matching and global-motion compensation.

    Each call to :meth:`update` shifts the existing tracks by the transform
    reported by the motion estimator, matches them to the new detections by
    Euclidean distance, creates tracks for unmatched detections, discards
    tracks that have gone unmatched for too long, and reports whether enough
    tracks moved fast enough to be flagged as an anomaly.
    """

    def __init__(self, max_distance=50.0, max_age=5, *,
                 chaos_velocity_ratio=0.7, chaos_min_tracks=5):
        """Configure the tracker.

        Args:
            max_distance: Largest track-to-detection distance accepted as a match.
            max_age: A track is dropped once it has gone more than this many
                updates without a match.
            chaos_velocity_ratio: Fraction of ``max_distance`` above which a
                freshly matched track counts as moving anomalously fast.
            chaos_min_tracks: Minimum number of such fast tracks in one update
                for the anomaly flag to be raised.
        """
        self.max_distance = max_distance
        self.max_age = max_age
        self.chaos_velocity_ratio = chaos_velocity_ratio
        self.chaos_min_tracks = chaos_min_tracks
        self.tracks = []
        self.next_id = 1
        self.motion_estimator = GlobalMotionEstimator()

    def apply_motion_compensation(self, transform):
        """Move every track position through *transform* in place.

        Args:
            transform: Matrix passed straight to ``cv2.transform`` (the original
                code notes it as a 2x3 matrix), or ``None`` to do nothing.
        """
        if transform is None or not self.tracks:
            return
        # cv2.transform wants an (N, 1, 2) array of points.
        pts = np.array([t.pt for t in self.tracks], dtype=np.float32).reshape(-1, 1, 2)
        pts_transformed = cv2.transform(pts, transform).reshape(-1, 2)
        for t, moved in zip(self.tracks, pts_transformed):
            t.pt = moved

    def update(self, frame_bgr, detected_points):
        """Advance the tracker by one frame.

        Args:
            frame_bgr: Frame handed to the motion estimator's ``update``.
            detected_points: Sequence of detected points for this frame. A
                single flat point such as ``(x, y)`` is accepted and treated
                as one detection.

        Returns:
            tuple[list[PointTrack], int, bool]: a shallow copy of the active
            tracks, the cumulative number of track ids issued so far, and the
            anomaly flag. The flag is True when at least ``chaos_min_tracks``
            tracks matched in this update moved farther than
            ``max_distance * chaos_velocity_ratio``.
        """
        # 1. Global motion compensation.
        transform = self.motion_estimator.update(frame_bgr)
        self.apply_motion_compensation(transform)

        # 2. Age every track; matched tracks are reset to 0 below.
        for t in self.tracks:
            t.time_since_update += 1

        detected_points = self._as_point_array(detected_points)

        if not self.tracks:
            # Nothing to match against: every detection starts a track.
            for pt in detected_points:
                self._spawn_track(pt)
            return self.tracks.copy(), self.next_id - 1, False

        if len(detected_points) == 0:
            # No detections: only drop tracks that exceeded max_age.
            self._prune_stale_tracks()
            return self.tracks.copy(), self.next_id - 1, False

        # 3. Hungarian assignment on the (tracks x detections) distance matrix.
        track_pts = np.array([t.pt for t in self.tracks], dtype=np.float32)
        diff = track_pts[:, np.newaxis, :] - detected_points[np.newaxis, :, :]
        dist_matrix = np.sqrt(np.sum(diff ** 2, axis=2))
        row_ind, col_ind = linear_sum_assignment(dist_matrix)

        # NOTE(review): the max_distance gate is applied after the assignment,
        # so a pair rejected here is not re-offered to another partner.
        assigned_detections = set()
        for r, c in zip(row_ind, col_ind):
            if dist_matrix[r, c] <= self.max_distance:
                track = self.tracks[r]
                # Velocity is the displacement from the compensated last position.
                track.velocity = dist_matrix[r, c]
                track.pt = detected_points[c]
                track.time_since_update = 0
                assigned_detections.add(c)

        # 4. Unmatched detections become new tracks.
        for i, pt in enumerate(detected_points):
            if i not in assigned_detections:
                self._spawn_track(pt)

        # 5. Drop tracks that have been unmatched for too long.
        self._prune_stale_tracks()

        # 6. Chaos detection over tracks that were matched or created just now
        #    (new tracks have velocity 0.0, so only matched ones can count).
        speed_limit = self.max_distance * self.chaos_velocity_ratio
        chaotic_count = sum(
            1 for t in self.tracks
            if t.time_since_update == 0 and t.velocity > speed_limit
        )
        anomaly = chaotic_count >= self.chaos_min_tracks
        return self.tracks.copy(), self.next_id - 1, anomaly

    @staticmethod
    def _as_point_array(detected_points):
        """Convert detections to a float32 array, promoting one flat point to a row."""
        pts = np.array(detected_points, dtype=np.float32)
        if pts.ndim == 1 and pts.size > 0:
            # A bare (x, y) would otherwise be iterated as two scalar "points".
            pts = pts[np.newaxis, :]
        return pts

    def _spawn_track(self, pt):
        """Append a new track at *pt* and consume the next track id."""
        self.tracks.append(PointTrack(pt, self.next_id))
        self.next_id += 1

    def _prune_stale_tracks(self):
        """Keep only tracks whose time_since_update does not exceed max_age."""
        self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age]