File size: 5,177 Bytes
"""
Text context builder.

Serializes structured vehicle/ADAS data into text that a VLM can understand.

TODO: This module needs modification so that it reads the correct data
segments (for ACC, LKA, etc.).
"""
from typing import Dict, Optional, Any
from enum import IntEnum
class Action(IntEnum):
    """Action enumeration.

    The integer values line up with the ids (0/1/2) that the context
    builders in this module map to "silent", "observe" and "alert".
    """

    SILENT = 0
    OBSERVE = 1
    ALERT = 2
def _format_number(data: Dict[str, Any], key: str, spec: str) -> Optional[str]:
    """Return ``data[key]`` rendered with ``spec``, or None if it cannot be shown.

    None is returned when the key is absent, when its value is None, or when
    the value does not support the numeric format spec (e.g. a string).
    """
    value = data.get(key)
    if value is None:
        return None
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return None


def build_vehicle_context(
    openpilot_data: Dict[str, Any],
    prev_action: Optional[int] = None,
    prev_tta: Optional[float] = None,
    missing_modalities: Optional[list] = None
) -> str:
    """Build the vehicle-state text context.

    Numeric fields that are absent, None, or not formattable as numbers are
    treated as missing instead of raising: speed is reported as "Unknown" and
    the optional fields are left out.

    Args:
        openpilot_data: Vehicle/ADAS data dictionary. Keys read here are
            'speed', 'acc', 'lka', 'lane_left_prob', 'lane_right_prob',
            'path_confidence', 'lateral_offset', 'steering_angle', 'weather',
            'time_of_day' and 'road_type'. None is treated as an empty dict.
        prev_action: Action taken at the previous step (0/1/2). An id outside
            that set is rendered as "unknown".
        prev_tta: TTA estimate from the previous step. The previous-state
            section is only emitted when both prev_action and prev_tta are
            given.
        missing_modalities: List of missing modalities; entries are
            converted with str() for display.

    Returns:
        The formatted text context.
    """
    # Action id -> display name.
    action_names = {0: "silent", 1: "observe", 2: "alert"}
    data = {} if openpilot_data is None else openpilot_data

    # Collect output lines and join once; "" entries become blank lines.
    lines = ["Vehicle State:"]

    # Speed
    speed = _format_number(data, 'speed', '.1f')
    if speed is not None:
        lines.append(f"- Speed: {speed} km/h")
    else:
        lines.append("- Speed: Unknown")

    # ADAS status: reported only when both flags are present.
    if 'acc' in data and 'lka' in data:
        acc_status = 'ON' if data['acc'] else 'OFF'
        lka_status = 'ON' if data['lka'] else 'OFF'
        lines.append(f"- ACC: {acc_status}, LKA: {lka_status}")
    else:
        lines.append("- ADAS: Unknown (assumed OFF)")

    # Lane confidence: needs both sides.
    lane_left = _format_number(data, 'lane_left_prob', '.2f')
    lane_right = _format_number(data, 'lane_right_prob', '.2f')
    if lane_left is not None and lane_right is not None:
        lines.append(f"- Lane confidence: L={lane_left}, R={lane_right}")

    # Path plan confidence
    path_confidence = _format_number(data, 'path_confidence', '.2f')
    if path_confidence is not None:
        lines.append(f"- Path plan confidence: {path_confidence}")

    # Lateral offset
    lateral_offset = _format_number(data, 'lateral_offset', '.2f')
    if lateral_offset is not None:
        lines.append(f"- Lateral offset: {lateral_offset}m")

    # Steering angle (if available)
    steering_angle = _format_number(data, 'steering_angle', '.1f')
    if steering_angle is not None:
        lines.append(f"- Steering angle: {steering_angle}°")

    # Environment information; the header is always emitted.
    lines += ["", "Environment:"]
    environment_fields = (
        ('weather', 'Weather'),
        ('time_of_day', 'Time'),
        ('road_type', 'Road type'),
    )
    for key, label in environment_fields:
        if key in data:
            lines.append(f"- {label}: {data[key]}")

    # History (if available)
    if prev_action is not None and prev_tta is not None:
        action_label = action_names.get(prev_action, "unknown")
        lines += [
            "",
            "Previous State:",
            f"- Action taken: {action_label}",
            f"- TTA estimate: {prev_tta:.2f}s",
        ]

        # Special marker for the OBSERVE action.
        # NOTE(review): assumed to belong to the previous-state section
        # (i.e. it also requires prev_tta) - confirm against the intended
        # behaviour.
        if prev_action == Action.OBSERVE:
            lines += [
                "",
                "[Extended observation with focused spatial attention]",
                "[Temporal window: 3s | Spatial: ROI applied]",
            ]

    # Missing-modality warning
    if missing_modalities:
        modality_names = ', '.join(str(item) for item in missing_modalities)
        lines += ["", f"[Note: Missing modalities: {modality_names}]"]
        if 'dms' in missing_modalities:
            lines.append("[Driver state inferred from ADAS/scene context]")
        if 'can_data' in missing_modalities:
            lines.append("[Vehicle telemetry estimated from visual cues]")

    # Task description
    lines += [
        "",
        "Task: Estimate time-to-accident (TTA) from multimodal observations.",
    ]

    return "\n".join(lines)
def build_simple_context(
    speed: float = 60.0,
    weather: str = "clear",
    prev_action: Optional[int] = None
) -> str:
    """Build a simplified context (for quick tests).

    Args:
        speed: Vehicle speed (km/h).
        weather: Weather condition.
        prev_action: Action taken at the previous step (0/1/2), or None to
            leave the previous-action line out.

    Returns:
        The simplified text context.
    """
    # Fixed header: speed, a hard-coded "ADAS: OFF" line, and the weather.
    pieces = [f"""Vehicle State:
- Speed: {speed:.1f} km/h
- ADAS: OFF (human driving)
Environment:
- Weather: {weather}
"""]

    if prev_action is not None:
        # Action id -> display name; an id outside 0/1/2 raises KeyError.
        label = {0: "silent", 1: "observe", 2: "alert"}[prev_action]
        pieces.append(f"\nPrevious Action: {label}")
        if prev_action == Action.OBSERVE:
            pieces.append("\n[Extended observation mode]")

    pieces.append("\n\nTask: Estimate TTA.")
    return "".join(pieces)
def parse_context_from_text(context_text: str) -> Dict[str, Any]:
    """Parse structured data back out of a text context (for debugging).

    Recognised fields:
        - "Speed: <number> ..." -> data['speed'] as float; skipped when the
          value is not a number (e.g. "Unknown").
        - "ACC: ON|OFF" and "LKA: ON|OFF" -> data['acc'] / data['lka'] as
          bool. Both may share one line ("- ACC: OFF, LKA: ON"); each flag is
          read from the token that follows its own label.
        - "Weather: <text>" -> data['weather'] as the stripped remainder of
          the line.

    Args:
        context_text: The text context.

    Returns:
        Dictionary holding whichever of the fields above were found.
    """
    data: Dict[str, Any] = {}

    # Simple keyword extraction, one line at a time.
    for line in context_text.split('\n'):
        if 'Speed:' in line:
            tokens = line.split('Speed:', 1)[1].split()
            try:
                data['speed'] = float(tokens[0])
            except (IndexError, ValueError):
                # Missing or non-numeric speed: leave the key out.
                pass
        elif 'ACC:' in line or 'LKA:' in line:
            for key, label in (('acc', 'ACC:'), ('lka', 'LKA:')):
                _, found, rest = line.partition(label)
                if not found:
                    continue
                tokens = rest.split()
                # Drop the comma that separates the two flags on a shared line.
                state = tokens[0].rstrip(',') if tokens else ''
                data[key] = state == 'ON'
        elif 'Weather:' in line:
            data['weather'] = line.split('Weather:', 1)[1].strip()

    return data