File size: 5,177 Bytes
1e05592
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
文本上下文构造器
将结构化的车辆/ADAS数据序列化为VLM可理解的文本

Here needs some modification, need to get the correct data segemnts. For ACC, LKA....

"""

from typing import Dict, Optional, Any
from enum import IntEnum


class Action(IntEnum):
    """动作枚举"""
    SILENT = 0
    OBSERVE = 1
    ALERT = 2


def build_vehicle_context(
    openpilot_data: Dict[str, Any],
    prev_action: Optional[int] = None,
    prev_tta: Optional[float] = None,
    missing_modalities: Optional[list] = None
) -> str:
    """
    构造车辆状态上下文文本
    
    Args:
        openpilot_data: 车辆/ADAS数据字典
        prev_action: 上一步动作(0/1/2)
        prev_tta: 上一步TTA估计
        missing_modalities: 缺失的模态列表
    
    Returns:
        格式化的文本上下文
    """
    # 动作名称映射
    action_names = {0: "silent", 1: "observe", 2: "alert"}
    
    # 基础车辆状态
    context = f"""Vehicle State:"""
    
    # 速度
    if 'speed' in openpilot_data:
        context += f"\n- Speed: {openpilot_data['speed']:.1f} km/h"
    else:
        context += f"\n- Speed: Unknown"
    
    # ADAS状态
    if 'acc' in openpilot_data and 'lka' in openpilot_data:
        acc_status = 'ON' if openpilot_data['acc'] else 'OFF'
        lka_status = 'ON' if openpilot_data['lka'] else 'OFF'
        context += f"\n- ACC: {acc_status}, LKA: {lka_status}"
    else:
        context += f"\n- ADAS: Unknown (assumed OFF)"
    
    # 车道置信度
    if 'lane_left_prob' in openpilot_data and 'lane_right_prob' in openpilot_data:
        context += f"\n- Lane confidence: L={openpilot_data['lane_left_prob']:.2f}, R={openpilot_data['lane_right_prob']:.2f}"
    
    # 路径规划置信度
    if 'path_confidence' in openpilot_data:
        context += f"\n- Path plan confidence: {openpilot_data['path_confidence']:.2f}"
    
    # 横向偏移
    if 'lateral_offset' in openpilot_data:
        context += f"\n- Lateral offset: {openpilot_data['lateral_offset']:.2f}m"
    
    # 转向角(如果有)
    if 'steering_angle' in openpilot_data:
        context += f"\n- Steering angle: {openpilot_data['steering_angle']:.1f}°"
    
    # 环境信息
    context += f"\n\nEnvironment:"
    if 'weather' in openpilot_data:
        context += f"\n- Weather: {openpilot_data['weather']}"
    if 'time_of_day' in openpilot_data:
        context += f"\n- Time: {openpilot_data['time_of_day']}"
    if 'road_type' in openpilot_data:
        context += f"\n- Road type: {openpilot_data['road_type']}"
    
    # 历史信息(如果有)
    if prev_action is not None and prev_tta is not None:
        context += f"\n\nPrevious State:"
        context += f"\n- Action taken: {action_names[prev_action]}"
        context += f"\n- TTA estimate: {prev_tta:.2f}s"
    
    # OBSERVE动作的特殊标记
    if prev_action == Action.OBSERVE:
        context += f"\n\n[Extended observation with focused spatial attention]"
        context += f"\n[Temporal window: 3s | Spatial: ROI applied]"
    
    # 缺失模态警告
    if missing_modalities:
        context += f"\n\n[Note: Missing modalities: {', '.join(missing_modalities)}]"
        if 'dms' in missing_modalities:
            context += f"\n[Driver state inferred from ADAS/scene context]"
        if 'can_data' in missing_modalities:
            context += f"\n[Vehicle telemetry estimated from visual cues]"
    
    # 任务描述
    context += f"\n\nTask: Estimate time-to-accident (TTA) from multimodal observations."
    
    return context


def build_simple_context(
    speed: float = 60.0,
    weather: str = "clear",
    prev_action: Optional[int] = None
) -> str:
    """
    构造简化的上下文(用于快速测试)
    
    Args:
        speed: 车速 (km/h)
        weather: 天气条件
        prev_action: 上一步动作
    
    Returns:
        简化的文本上下文
    """
    action_names = {0: "silent", 1: "observe", 2: "alert"}
    
    context = f"""Vehicle State:
- Speed: {speed:.1f} km/h
- ADAS: OFF (human driving)

Environment:
- Weather: {weather}
"""
    
    if prev_action is not None:
        context += f"\nPrevious Action: {action_names[prev_action]}"
    
    if prev_action == Action.OBSERVE:
        context += f"\n[Extended observation mode]"
    
    context += f"\n\nTask: Estimate TTA."
    
    return context


def parse_context_from_text(context_text: str) -> Dict[str, Any]:
    """
    从文本上下文中解析出结构化数据(用于调试)
    
    Args:
        context_text: 文本上下文
    
    Returns:
        解析后的字典
    """
    data = {}
    
    # 简单的关键词提取
    lines = context_text.split('\n')
    for line in lines:
        if 'Speed:' in line:
            try:
                data['speed'] = float(line.split(':')[1].strip().split()[0])
            except:
                pass
        elif 'ACC:' in line:
            data['acc'] = 'ON' in line
        elif 'LKA:' in line:
            data['lka'] = 'ON' in line
        elif 'Weather:' in line:
            data['weather'] = line.split(':')[1].strip()
    
    return data