| """ |
| 文本上下文构造器 |
| 将结构化的车辆/ADAS数据序列化为VLM可理解的文本 |
| |
| Here needs some modification, need to get the correct data segemnts. For ACC, LKA.... |
| |
| """ |
|
|
| from typing import Dict, Optional, Any |
| from enum import IntEnum |
|
|
|
|
| class Action(IntEnum): |
| """动作枚举""" |
| SILENT = 0 |
| OBSERVE = 1 |
| ALERT = 2 |
|
|
|
|
| def build_vehicle_context( |
| openpilot_data: Dict[str, Any], |
| prev_action: Optional[int] = None, |
| prev_tta: Optional[float] = None, |
| missing_modalities: Optional[list] = None |
| ) -> str: |
| """ |
| 构造车辆状态上下文文本 |
| |
| Args: |
| openpilot_data: 车辆/ADAS数据字典 |
| prev_action: 上一步动作(0/1/2) |
| prev_tta: 上一步TTA估计 |
| missing_modalities: 缺失的模态列表 |
| |
| Returns: |
| 格式化的文本上下文 |
| """ |
| |
| action_names = {0: "silent", 1: "observe", 2: "alert"} |
| |
| |
| context = f"""Vehicle State:""" |
| |
| |
| if 'speed' in openpilot_data: |
| context += f"\n- Speed: {openpilot_data['speed']:.1f} km/h" |
| else: |
| context += f"\n- Speed: Unknown" |
| |
| |
| if 'acc' in openpilot_data and 'lka' in openpilot_data: |
| acc_status = 'ON' if openpilot_data['acc'] else 'OFF' |
| lka_status = 'ON' if openpilot_data['lka'] else 'OFF' |
| context += f"\n- ACC: {acc_status}, LKA: {lka_status}" |
| else: |
| context += f"\n- ADAS: Unknown (assumed OFF)" |
| |
| |
| if 'lane_left_prob' in openpilot_data and 'lane_right_prob' in openpilot_data: |
| context += f"\n- Lane confidence: L={openpilot_data['lane_left_prob']:.2f}, R={openpilot_data['lane_right_prob']:.2f}" |
| |
| |
| if 'path_confidence' in openpilot_data: |
| context += f"\n- Path plan confidence: {openpilot_data['path_confidence']:.2f}" |
| |
| |
| if 'lateral_offset' in openpilot_data: |
| context += f"\n- Lateral offset: {openpilot_data['lateral_offset']:.2f}m" |
| |
| |
| if 'steering_angle' in openpilot_data: |
| context += f"\n- Steering angle: {openpilot_data['steering_angle']:.1f}°" |
| |
| |
| context += f"\n\nEnvironment:" |
| if 'weather' in openpilot_data: |
| context += f"\n- Weather: {openpilot_data['weather']}" |
| if 'time_of_day' in openpilot_data: |
| context += f"\n- Time: {openpilot_data['time_of_day']}" |
| if 'road_type' in openpilot_data: |
| context += f"\n- Road type: {openpilot_data['road_type']}" |
| |
| |
| if prev_action is not None and prev_tta is not None: |
| context += f"\n\nPrevious State:" |
| context += f"\n- Action taken: {action_names[prev_action]}" |
| context += f"\n- TTA estimate: {prev_tta:.2f}s" |
| |
| |
| if prev_action == Action.OBSERVE: |
| context += f"\n\n[Extended observation with focused spatial attention]" |
| context += f"\n[Temporal window: 3s | Spatial: ROI applied]" |
| |
| |
| if missing_modalities: |
| context += f"\n\n[Note: Missing modalities: {', '.join(missing_modalities)}]" |
| if 'dms' in missing_modalities: |
| context += f"\n[Driver state inferred from ADAS/scene context]" |
| if 'can_data' in missing_modalities: |
| context += f"\n[Vehicle telemetry estimated from visual cues]" |
| |
| |
| context += f"\n\nTask: Estimate time-to-accident (TTA) from multimodal observations." |
| |
| return context |
|
|
|
|
| def build_simple_context( |
| speed: float = 60.0, |
| weather: str = "clear", |
| prev_action: Optional[int] = None |
| ) -> str: |
| """ |
| 构造简化的上下文(用于快速测试) |
| |
| Args: |
| speed: 车速 (km/h) |
| weather: 天气条件 |
| prev_action: 上一步动作 |
| |
| Returns: |
| 简化的文本上下文 |
| """ |
| action_names = {0: "silent", 1: "observe", 2: "alert"} |
| |
| context = f"""Vehicle State: |
| - Speed: {speed:.1f} km/h |
| - ADAS: OFF (human driving) |
| |
| Environment: |
| - Weather: {weather} |
| """ |
| |
| if prev_action is not None: |
| context += f"\nPrevious Action: {action_names[prev_action]}" |
| |
| if prev_action == Action.OBSERVE: |
| context += f"\n[Extended observation mode]" |
| |
| context += f"\n\nTask: Estimate TTA." |
| |
| return context |
|
|
|
|
| def parse_context_from_text(context_text: str) -> Dict[str, Any]: |
| """ |
| 从文本上下文中解析出结构化数据(用于调试) |
| |
| Args: |
| context_text: 文本上下文 |
| |
| Returns: |
| 解析后的字典 |
| """ |
| data = {} |
| |
| |
| lines = context_text.split('\n') |
| for line in lines: |
| if 'Speed:' in line: |
| try: |
| data['speed'] = float(line.split(':')[1].strip().split()[0]) |
| except: |
| pass |
| elif 'ACC:' in line: |
| data['acc'] = 'ON' in line |
| elif 'LKA:' in line: |
| data['lka'] = 'ON' in line |
| elif 'Weather:' in line: |
| data['weather'] = line.split(':')[1].strip() |
| |
| return data |