""" 文本上下文构造器 将结构化的车辆/ADAS数据序列化为VLM可理解的文本 Here needs some modification, need to get the correct data segemnts. For ACC, LKA.... """ from typing import Dict, Optional, Any from enum import IntEnum class Action(IntEnum): """动作枚举""" SILENT = 0 OBSERVE = 1 ALERT = 2 def build_vehicle_context( openpilot_data: Dict[str, Any], prev_action: Optional[int] = None, prev_tta: Optional[float] = None, missing_modalities: Optional[list] = None ) -> str: """ 构造车辆状态上下文文本 Args: openpilot_data: 车辆/ADAS数据字典 prev_action: 上一步动作(0/1/2) prev_tta: 上一步TTA估计 missing_modalities: 缺失的模态列表 Returns: 格式化的文本上下文 """ # 动作名称映射 action_names = {0: "silent", 1: "observe", 2: "alert"} # 基础车辆状态 context = f"""Vehicle State:""" # 速度 if 'speed' in openpilot_data: context += f"\n- Speed: {openpilot_data['speed']:.1f} km/h" else: context += f"\n- Speed: Unknown" # ADAS状态 if 'acc' in openpilot_data and 'lka' in openpilot_data: acc_status = 'ON' if openpilot_data['acc'] else 'OFF' lka_status = 'ON' if openpilot_data['lka'] else 'OFF' context += f"\n- ACC: {acc_status}, LKA: {lka_status}" else: context += f"\n- ADAS: Unknown (assumed OFF)" # 车道置信度 if 'lane_left_prob' in openpilot_data and 'lane_right_prob' in openpilot_data: context += f"\n- Lane confidence: L={openpilot_data['lane_left_prob']:.2f}, R={openpilot_data['lane_right_prob']:.2f}" # 路径规划置信度 if 'path_confidence' in openpilot_data: context += f"\n- Path plan confidence: {openpilot_data['path_confidence']:.2f}" # 横向偏移 if 'lateral_offset' in openpilot_data: context += f"\n- Lateral offset: {openpilot_data['lateral_offset']:.2f}m" # 转向角(如果有) if 'steering_angle' in openpilot_data: context += f"\n- Steering angle: {openpilot_data['steering_angle']:.1f}°" # 环境信息 context += f"\n\nEnvironment:" if 'weather' in openpilot_data: context += f"\n- Weather: {openpilot_data['weather']}" if 'time_of_day' in openpilot_data: context += f"\n- Time: {openpilot_data['time_of_day']}" if 'road_type' in openpilot_data: context += f"\n- Road type: {openpilot_data['road_type']}" # 历史信息(如果有) if prev_action is not None and prev_tta is not None: context += f"\n\nPrevious State:" context += f"\n- Action taken: {action_names[prev_action]}" context += f"\n- TTA estimate: {prev_tta:.2f}s" # OBSERVE动作的特殊标记 if prev_action == Action.OBSERVE: context += f"\n\n[Extended observation with focused spatial attention]" context += f"\n[Temporal window: 3s | Spatial: ROI applied]" # 缺失模态警告 if missing_modalities: context += f"\n\n[Note: Missing modalities: {', '.join(missing_modalities)}]" if 'dms' in missing_modalities: context += f"\n[Driver state inferred from ADAS/scene context]" if 'can_data' in missing_modalities: context += f"\n[Vehicle telemetry estimated from visual cues]" # 任务描述 context += f"\n\nTask: Estimate time-to-accident (TTA) from multimodal observations." return context def build_simple_context( speed: float = 60.0, weather: str = "clear", prev_action: Optional[int] = None ) -> str: """ 构造简化的上下文(用于快速测试) Args: speed: 车速 (km/h) weather: 天气条件 prev_action: 上一步动作 Returns: 简化的文本上下文 """ action_names = {0: "silent", 1: "observe", 2: "alert"} context = f"""Vehicle State: - Speed: {speed:.1f} km/h - ADAS: OFF (human driving) Environment: - Weather: {weather} """ if prev_action is not None: context += f"\nPrevious Action: {action_names[prev_action]}" if prev_action == Action.OBSERVE: context += f"\n[Extended observation mode]" context += f"\n\nTask: Estimate TTA." return context def parse_context_from_text(context_text: str) -> Dict[str, Any]: """ 从文本上下文中解析出结构化数据(用于调试) Args: context_text: 文本上下文 Returns: 解析后的字典 """ data = {} # 简单的关键词提取 lines = context_text.split('\n') for line in lines: if 'Speed:' in line: try: data['speed'] = float(line.split(':')[1].strip().split()[0]) except: pass elif 'ACC:' in line: data['acc'] = 'ON' in line elif 'LKA:' in line: data['lka'] = 'ON' in line elif 'Weather:' in line: data['weather'] = line.split(':')[1].strip() return data