#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import json from pathlib import Path from typing import Any, Dict, List, Optional import gradio as gr # Disable telemetry / analytics for spaces os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "false") os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") BASE_DIR = Path(__file__).parent PATCHAD_CASE_DIR = BASE_DIR / "demo_patchad_cases" PATCHAD_CASE_MANIFEST = PATCHAD_CASE_DIR / "manifest.json" # --- load PatchAD case manifest --- PATCHAD_CASES = {} # {case_name: {"platform": path, "official": path}} PATCHAD_CASE_INFO = {} # 存储案例的详细信息 if PATCHAD_CASE_MANIFEST.exists(): try: manifest = json.loads(PATCHAD_CASE_MANIFEST.read_text(encoding="utf-8")) for item in manifest: display = item.get("title") file_platform = item.get("file_platform") file_official = item.get("file_official") if display: case_data = { "sample": item.get("sample", ""), "description": item.get("description", ""), } if file_platform: path_platform = PATCHAD_CASE_DIR / file_platform if path_platform.exists(): case_data["platform"] = path_platform if file_official: path_official = PATCHAD_CASE_DIR / file_official if path_official.exists(): case_data["official"] = path_official if case_data.get("platform") or case_data.get("official"): PATCHAD_CASES[display] = case_data PATCHAD_CASE_INFO[display] = case_data except Exception: PATCHAD_CASES = {} PATCHAD_CASE_INFO = {} PATCHAD_CASE_NAMES = list(PATCHAD_CASES.keys()) # --- 导入实际执行模块 --- try: from utils.case_builder import CaseBuilder from utils.patchad_filter import PatchADFilter, PrecheckServer MODULES_AVAILABLE = True except ImportError as e: print(f"⚠️ 模块导入失败: {e}") MODULES_AVAILABLE = False def load_case_data(case_name: str, has_patchad: bool): """加载案例数据""" if not PATCHAD_CASES: return None case_info = PATCHAD_CASES.get(case_name) if not case_info: return None # 根据外部平台是否有PatchAD选择对应的文件 if has_patchad: path = case_info.get("platform") else: path = case_info.get("official") if not path: return None try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return None def get_case_raw_data(case_name: str) -> tuple: """获取案例的原始数据(JSONL格式)和实际使用说明""" if not PATCHAD_CASE_INFO: return None, None info = PATCHAD_CASE_INFO.get(case_name) if not info: return None, None sample = info.get("sample", "") if not sample: return None, None # 查找原始数据文件 data_file = BASE_DIR / "data_storage" / "users" / f"{sample}.jsonl" if not data_file.exists(): return None, None try: # 读取完整文件并统计(一次性读取,避免重复打开文件) all_lines = [] with open(data_file, "r", encoding="utf-8") as f: for line in f: if line.strip(): all_lines.append(line.strip()) total_lines = len(all_lines) if total_lines == 0: return "❌ 数据文件为空", None # 读取前12条作为"当前窗口"示例 current_window_lines = all_lines[:12] if len(all_lines) >= 12 else all_lines # 读取第13-24条作为"历史窗口"示例(如果存在) history_window_lines = [] if len(all_lines) >= 24: history_window_lines = all_lines[12:24] # 第二天的窗口 # 计算实际可用的天数(每12条为1天) available_days = total_lines // 12 history_days_needed = min(7, available_days - 1) # 最多7天,至少需要1天当前窗口 history_records_needed = history_days_needed * 12 if history_days_needed > 0 else 0 total_records_needed = 12 + history_records_needed # 当前窗口 + 历史窗口 # 构建说明文本(动态使用实际数据量) usage_info = f""" ## 📊 数据使用说明 ### 完整原始数据文件 - **总记录数**:**{total_lines} 条**(这是存储的完整历史数据) - **可提供天数**:约 {available_days} 天(每12条为1天) ### 实际传入的数据量 #### 步骤①:PatchAD 预筛 - **需要数据**:**12条记录**(当前窗口,约1小时的数据) - **数据来源**:从完整文件中取最后12条,或实时采集的最近1小时数据 - **用途**:快速判断当前窗口是否有异常 #### 步骤②:CaseBuilder 构建 - **需要数据**: - `window_data`:**12条**(当前窗口) - `history_windows`:**3-7天的历史窗口**(每天12条,共36-84条) - **本案例实际可用**:**{total_records_needed}条记录**(12条当前窗口 + {history_records_needed}条历史窗口,共{history_days_needed}天) - **数据来源**:从完整文件中按日期分割,取最近几天的数据 ### 示例数据格式 **当前窗口(12条)**:用于步骤①和步骤②的 `window_data` **历史窗口(每天12条)**:用于步骤②的 `history_windows` ⚠️ **重要**:外部平台不需要一次性传入全部 **{total_lines}条** 数据,而是: 1. 步骤①:传入当前窗口(**12条**) 2. 步骤②:传入当前窗口(**12条**)+ 历史窗口(**3-7天,每天12条,共{history_records_needed}条**) """ # 构建数据预览(显示当前窗口和历史窗口示例) preview_lines = [] preview_lines.append("# 当前窗口示例(12条,用于步骤①)") preview_lines.append("\n".join(current_window_lines)) if history_window_lines: preview_lines.append("\n\n# 历史窗口示例(第2天的12条,用于步骤②)") preview_lines.append("\n".join(history_window_lines)) preview_lines.append(f"\n\n# 完整文件信息") preview_lines.append(f"# 总记录数:{total_lines} 条(实际存储的完整历史数据)") preview_lines.append(f"# 实际使用:步骤①需要12条,步骤②需要12条+历史窗口(本案例可用{history_days_needed}天,共{total_records_needed}条)") return "\n".join(preview_lines), usage_info except Exception as e: import traceback error_msg = f"❌ 读取原始数据失败: {str(e)}\n{traceback.format_exc()}" return error_msg, None def load_jsonl(path: Path) -> List[Dict[str, Any]]: """加载 JSONL 文件""" records = [] with open(path, "r", encoding="utf-8") as f: for line in f: if line.strip(): records.append(json.loads(line)) return records def split_windows(records: List[Dict[str, Any]], days: int = 3) -> List[Dict[str, Any]]: """将记录按日期分割成窗口""" by_date: Dict[str, List[Dict[str, Any]]] = {} for rec in records: ts = rec.get("timestamp", "") date = ts[:10] if len(ts) >= 10 else "unknown" by_date.setdefault(date, []).append(rec) windows = [] sorted_dates = sorted(by_date.keys()) for date in sorted_dates: day_records = by_date[date][:12] if len(day_records) >= 12: windows.append({"date": date, "window": day_records}) if len(windows) >= days: break if windows and 0 < len(windows) < days: last = windows[-1] for idx in range(days - len(windows)): clone = { "date": f"{last['date']}+copy{idx+1}", "window": last["window"], } windows.append(clone) return windows def sample_profile() -> Dict[str, Any]: """示例用户档案""" return { "age_group": "30-35岁", "estimated_age": 33, "sex": "男性", "exercise": "每周5次以上", "coffee": "偶尔", "smoking": "不吸烟", "drinking": "经常饮酒", "MEQ": 62.0, "MEQ_type": "晨型", } # --- 示例数据加载函数 --- def load_example_step1() -> str: """加载步骤①的示例数据(12条数据)- 使用异常数据""" demo_file = BASE_DIR / "data_storage" / "users" / "demo_anomaly.jsonl" if demo_file.exists(): records = load_jsonl(demo_file) # 取最后12条 window = records[-12:] if len(records) >= 12 else records return "\n".join(json.dumps(r, ensure_ascii=False) for r in window) return "" def load_example_step2() -> str: """加载步骤②的示例数据(build_case payload)- 使用异常数据""" # 从demo_anomaly案例中构建示例payload demo_file = BASE_DIR / "data_storage" / "users" / "demo_anomaly.jsonl" if demo_file.exists(): records = load_jsonl(demo_file) window = records[-12:] if len(records) >= 12 else records history_windows = split_windows(records, days=3) user_id = window[0].get("deviceId", "demo_anomaly") if window else "demo_anomaly" payload = { "user_id": user_id, "window_data": window, "history_windows": history_windows, "user_profile": sample_profile(), "metadata": { "detector": "realtime_patchad", "patchad_score": 0.25, # 异常数据的分数会更高 "threshold": 0.18, }, } return json.dumps(payload, ensure_ascii=False, indent=2) return "" def load_example_step3() -> str: """加载步骤③的示例数据(case_bundle)- 使用异常数据""" # 从demo_anomaly_platform.json中读取 case_file = PATCHAD_CASE_DIR / "demo_anomaly_platform.json" if case_file.exists(): data = json.loads(case_file.read_text(encoding="utf-8")) bundle = data.get("case_bundle", {}) return json.dumps(bundle, ensure_ascii=False, indent=2) return "" def load_example_step4() -> str: """加载步骤④的示例数据(case_bundle,同步骤③)- 使用异常数据""" return load_example_step3() def load_example_step5() -> str: """加载步骤⑤的示例数据(LLM输入Markdown)- 使用异常数据""" case_file = PATCHAD_CASE_DIR / "demo_anomaly_platform.json" if case_file.exists(): data = json.loads(case_file.read_text(encoding="utf-8")) bundle = data.get("case_bundle", {}) return bundle.get("llm_input", "") return "" # --- 实时执行函数 --- def execute_step1_realtime_new(input_text: str) -> tuple: """实时执行步骤1:PatchAD 预筛(新版本,从输入框读取)""" if not MODULES_AVAILABLE: return {}, "❌ 模块未加载,无法执行" if not input_text or not input_text.strip(): return {}, "❌ 请输入数据(12条JSON对象,每行一个)" try: # 解析输入文本(每行一个JSON) lines = [line.strip() for line in input_text.strip().split("\n") if line.strip()] records = [] for line in lines: try: records.append(json.loads(line)) except json.JSONDecodeError as e: return {}, f"❌ JSON格式错误(第{len(records)+1}行):{str(e)}\n\n错误行内容:{line[:100]}..." if len(records) < 12: return {}, f"❌ 数据不足12条(当前{len(records)}条),请至少输入12条数据" # 获取当前窗口(最后12条) current_window = records[-12:] user_id = current_window[0].get("deviceId", "demo_user") if current_window else "demo_user" # 执行 PatchAD 预筛 patchad = PatchADFilter(score_threshold=0.18) precheck_result = patchad.precheck(user_id, current_window) # 提取值用于说明 status = precheck_result.get("status", "unknown") score = precheck_result.get("score", 0.0) threshold = precheck_result.get("threshold", 0.0) event_id = precheck_result.get("event_id", "N/A") user_id_result = precheck_result.get("user_id", user_id) created_at = precheck_result.get("created_at", "N/A") explanation = f""" ## 步骤 ①:PatchAD 预筛(实时执行) ### 功能说明 ✅ **已实时执行** PatchAD 预筛,分析当前数据窗口。 ### 工作原理 - 分析窗口内的 HRV、心率、基线偏差等关键指标 - 计算异常得分(0-1 范围) - 与预设阈值比较,判断是否需要继续处理 ### 当前结果摘要 - **状态**: {status} - **得分**: {score:.4f} - **阈值**: {threshold:.4f} - **Event ID**: {event_id} - **用户ID**: {user_id_result} - **创建时间**: {created_at} ### precheck 结果结构 返回的 `precheck_result` 包含以下字段: - **event_id**: 唯一事件标识符(用于后续步骤关联) - **user_id**: 用户ID - **status**: 异常状态("abnormal" 或 "normal") - **score**: 异常得分(0-1 范围,越高越异常) - **threshold**: 预设阈值 - **created_at**: 创建时间戳(ISO 8601 格式) ### 决策逻辑 - ✅ **status = "abnormal"**:得分超过阈值,继续后续流程 - ❌ **status = "normal"**:得分低于阈值,终止流程(无需生成健康方案) ### 下一步 如果预筛通过(status = "abnormal"),系统将汇总用户信息、历史数据,进入 CaseBuilder 构建阶段。 """ # 返回完整的 precheck_result,而不是简化的摘要 # 同时添加步骤标识 result_with_metadata = { "步骤": "① PatchAD 预筛(实时执行)", "precheck_result": precheck_result } return ( result_with_metadata, # 中间列:输出JSON explanation, # 右侧列:说明 gr.update(visible=True) # 显示中间列 ) except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n```\n{traceback.format_exc()}\n```" return {}, error_msg, gr.update(visible=True) def execute_step1_realtime(data_file: Optional[str], data_json: Optional[str]) -> tuple: """实时执行步骤1:PatchAD 预筛""" if not MODULES_AVAILABLE: return {}, "❌ 模块未加载,无法执行", gr.update(visible=False), gr.update(visible=False) try: # 加载数据 records = [] if data_file: records = load_jsonl(Path(data_file)) elif data_json: try: data = json.loads(data_json) if isinstance(data, list): records = data elif isinstance(data, dict): records = [data] except: return {}, "❌ JSON 格式错误", gr.update(visible=False), gr.update(visible=False) else: return {}, "❌ 请上传文件或输入 JSON 数据", gr.update(visible=False), gr.update(visible=False) if not records: return {}, "❌ 数据为空", gr.update(visible=False), gr.update(visible=False) # 获取当前窗口(最后12条) current_window = records[-12:] if len(records) >= 12 else records user_id = current_window[0].get("deviceId", "demo_user") if current_window else "demo_user" # 执行 PatchAD 预筛 patchad = PatchADFilter(score_threshold=0.18) precheck_result = patchad.precheck(user_id, current_window) step1_result = { "步骤": "① PatchAD 预筛(实时执行)", "状态": precheck_result.get("status", "unknown"), "得分": precheck_result.get("score", 0.0), "阈值": precheck_result.get("threshold", 0.0), "event_id": precheck_result.get("event_id", "N/A"), } explanation = f""" ## 步骤 ①:PatchAD 预筛(实时执行) ### 功能说明 ✅ **已实时执行** PatchAD 预筛,分析当前数据窗口。 ### 工作原理 - 分析窗口内的 HRV、心率、基线偏差等关键指标 - 计算异常得分(0-1 范围) - 与预设阈值比较,判断是否需要继续处理 ### 当前结果 - **状态**: {precheck_result.get("status", "unknown")} - **得分**: {precheck_result.get("score", 0.0):.4f} - **阈值**: {precheck_result.get("threshold", 0.0):.4f} - **Event ID**: {precheck_result.get("event_id", "N/A")} ### 决策逻辑 - ✅ **status = "abnormal"**:得分超过阈值,继续后续流程 - ❌ **status = "normal"**:得分低于阈值,终止流程(无需生成健康方案) ### 下一步 如果预筛通过,系统将汇总用户信息、历史数据,进入 CaseBuilder 构建阶段。 """ show_next = precheck_result.get("status") == "abnormal" # 存储中间数据供后续步骤使用 intermediate_data = { "records": records, "current_window": current_window, "user_id": user_id, "precheck_result": precheck_result, } return ( step1_result, explanation ) except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n\n```\n{traceback.format_exc()}\n```" return {}, error_msg def show_step1_precheck(case_name: str, has_patchad: bool): """步骤1:根据外部平台是否有PatchAD显示不同的第一步""" has_patchad_bool = (has_patchad == "是") data = load_case_data(case_name, has_patchad_bool) if not data: return {}, "❌ 未找到案例数据", gr.update(visible=False), gr.update(visible=False), "", None if has_patchad_bool: # 模式A:外部平台有PatchAD,第一步显示 build_case 格式要求 return show_step1_build_case_format(case_name, has_patchad_bool, data) else: # 模式B:外部平台没有PatchAD,第一步显示 precheck 格式要求 return show_step1_precheck_format(case_name, has_patchad_bool, data) def show_step1_build_case_format(case_name: str, has_patchad: bool, data: Dict): """步骤1:显示 build_case 格式要求和传入数据(外部平台有PatchAD)""" bundle = data.get("case_bundle", {}) case = bundle.get("case", {}) precheck = data.get("precheck", {}) # 构建 payload 示例(从案例数据中提取或构建) payload_example = { "event_id": case.get("case_id", "platform-demo-evt"), "user_id": case.get("user_id", "demo_pattern"), "window_data": [ { "timestamp": "2025-01-01T08:00:00", "deviceId": case.get("user_id", "demo_pattern"), "features": { "hr": 72.0, "hrv_rmssd": 35.0, "time_period_primary": "day", "data_quality": "high", "baseline_hrv_mean": 40.0, "baseline_hrv_std": 5.0 }, "static_features": { "age_group": 2, "sex": 0, "exercise": 1 } } # ... 共12条 ], "user_profile": case.get("user_profile", {}), "history_windows": [ { "date": "2025-01-01", "window": [ # ... 12条数据(第1天) ] } # ... 共3-7天 ], "metadata": { "detector": "platform_patchad", "score": precheck.get("score", 0.0), "threshold": precheck.get("threshold", 0.0) } } step1_result = { "步骤": "① build_case 接口调用格式", "模式": "模式A:外部平台有PatchAD", "必填字段": list(payload_example.keys()) } explanation = f""" ## 步骤 ①:build_case 接口调用格式 ### 功能说明 外部平台已集成PatchAD,自己完成预筛,直接调用 `build_case` 接口,传入必要的数据。 ### 模式说明 **模式A:外部平台有PatchAD** - 外部平台自己完成 PatchAD 预筛 - 外部平台自己合成历史窗口数据(history_windows) - 外部平台自己获取用户档案(user_profile) - 直接调用 build_case 接口 ### 必填字段说明 - **window_data**:当前窗口(12条数据点),外部平台自己传入 - **history_windows**:历史窗口(3-7天,每天12条),外部平台自己合成 - **user_profile**:用户档案,外部平台自己获取 - **metadata.detector**:标记为 "platform_patchad" ### 请求示例 ```json {json.dumps(payload_example, ensure_ascii=False, indent=2)} ``` ### 下一步 系统将基于传入的数据,使用 CaseBuilder 构建结构化的 case JSON。 """ payload_json = json.dumps(payload_example, ensure_ascii=False, indent=2) return step1_result, explanation, gr.update(visible=True), gr.update(visible=True), payload_json, None def show_step1_precheck_format(case_name: str, has_patchad: bool, data: Dict): """步骤1:显示 precheck 接口调用格式(外部平台没有PatchAD)""" precheck = data.get("precheck") or {} # 构建 precheck 请求示例 precheck_request = { "user_id": precheck.get("user_id", "demo_pattern"), "window_data": [ { "timestamp": "2025-01-01T08:00:00", "deviceId": precheck.get("user_id", "demo_pattern"), "features": { "hr": 72.0, "hrv_rmssd": 35.0, "time_period_primary": "day", "data_quality": "high", "baseline_hrv_mean": 40.0, "baseline_hrv_std": 5.0 }, "static_features": { "age_group": 2, "sex": 0, "exercise": 1 } } # ... 共12条(当前窗口) ] } step1_result = { "步骤": "① precheck 接口调用格式", "模式": "模式B:平台没有PatchAD", "状态": precheck.get("status", "unknown"), "得分": precheck.get("score", 0.0), "阈值": precheck.get("threshold", 0.0), "event_id": precheck.get("event_id", "N/A"), } explanation = f""" ## 步骤 ①:precheck 接口调用格式 ### 功能说明 外部平台没有PatchAD,需要先调用 `precheck` 接口,传入当前窗口数据(12条),系统返回是否异常和 event_id。 ### 模式说明 **模式B:外部平台没有PatchAD** - 外部平台先调用 precheck 接口,传入当前窗口(12条) - 系统返回异常状态和 event_id - 外部平台根据 event_id 和返回的状态,决定是否继续 ### 请求格式 **接口**:`POST /api/precheck` **Request Body**: ```json {json.dumps(precheck_request, ensure_ascii=False, indent=2)} ``` ### 响应结果 - **状态**: {precheck.get("status", "unknown")} - **得分**: {precheck.get("score", 0.0):.4f} - **阈值**: {precheck.get("threshold", 0.0):.4f} - **Event ID**: {precheck.get("event_id", "N/A")} ### 下一步 如果预筛通过(status = "abnormal"),外部平台需要: 1. 保存 event_id 2. 从历史数据平台获取最近几天的数据 3. 合成 history_windows 4. 调用 build_case 接口(见步骤②) """ precheck_json = json.dumps(precheck, ensure_ascii=False, indent=2) show_next = precheck.get("status") == "abnormal" return step1_result, explanation, gr.update(visible=True), gr.update(visible=show_next), precheck_json, None def show_step1_precheck_old(case_name: str): """步骤1:显示 PatchAD 预筛结果(预生成案例)- 旧版本,保留兼容""" data = load_case_data(case_name, False) if not data: return {}, "❌ 未找到案例数据", gr.update(visible=False), gr.update(visible=False), "" precheck = data.get("precheck") or {} step1_result = { "步骤": "① PatchAD 预筛", "状态": precheck.get("status", "unknown"), "得分": precheck.get("score", 0.0), "阈值": precheck.get("threshold", 0.0), "event_id": precheck.get("event_id", "N/A"), } explanation = f""" ## 步骤 ①:PatchAD 预筛 ### 功能说明 PatchAD / PatchTrAD 是一个**轻量级预筛模型**,用于快速判断当前数据窗口是否需要进入后续的深度分析流程。 ### 工作原理 - 分析窗口内的 HRV、心率、基线偏差等关键指标 - 计算异常得分(0-1 范围) - 与预设阈值比较,判断是否需要继续处理 ### 当前结果 - **状态**: {precheck.get("status", "unknown")} - **得分**: {precheck.get("score", 0.0):.4f} - **阈值**: {precheck.get("threshold", 0.0):.4f} - **Event ID**: {precheck.get("event_id", "N/A")} ### 决策逻辑 - ✅ **status = "abnormal"**:得分超过阈值,继续后续流程 - ❌ **status = "normal"**:得分低于阈值,终止流程(无需生成健康方案) ### 下一步 如果预筛通过,系统将汇总用户信息、历史数据,进入 CaseBuilder 构建阶段。 """ show_next = precheck.get("status") == "abnormal" precheck_json = json.dumps(precheck, ensure_ascii=False, indent=2) return step1_result, explanation, gr.update(visible=True), gr.update(visible=show_next), precheck_json, None def execute_step2_realtime_new(input_text: str) -> tuple: """实时执行步骤2:CaseBuilder 构建(新版本,从输入框读取)""" if not MODULES_AVAILABLE: return {}, "❌ 模块未加载,无法执行" if not input_text or not input_text.strip(): return {}, "❌ 请输入 build_case payload(JSON格式)" try: # 解析payload payload = json.loads(input_text) # 执行 CaseBuilder builder = CaseBuilder() result = builder.build_case(payload) # 这是完整的 case_bundle case = result.get("case", {}) anomaly_pattern = case.get("anomaly_pattern", {}) if isinstance(case, dict) else {} baseline_info = case.get("baseline_info", {}) if isinstance(case, dict) else {} # 提取值用于格式化 case_id = case.get("case_id", "N/A") if isinstance(case, dict) else "N/A" anomaly_type = anomaly_pattern.get("type", "N/A") if isinstance(anomaly_pattern, dict) else "N/A" duration_days = anomaly_pattern.get("duration_days", 0) if isinstance(anomaly_pattern, dict) else 0 trend = anomaly_pattern.get("trend", "N/A") if isinstance(anomaly_pattern, dict) else "N/A" avg_score = anomaly_pattern.get("avg_score", 0.0) if isinstance(anomaly_pattern, dict) else 0.0 baseline_type = baseline_info.get("baseline_type", "N/A") if isinstance(baseline_info, dict) else "N/A" baseline_reliability = baseline_info.get("baseline_reliability", "N/A") if isinstance(baseline_info, dict) else "N/A" explanation = f""" ## 步骤 ②:CaseBuilder 构建(实时执行) ### 功能说明 ✅ **已实时执行** CaseBuilder,将原始数据、用户信息、历史窗口整合成结构化的 case JSON。 ### 工作内容 1. **汇总用户信息**:年龄、性别、生活习惯、生物节律等 2. **分析历史窗口**:计算每日异常分数、HRV、心率等指标 3. **计算基线信息**:个人基线、群体基线、基线可靠性 4. **提取相关指标**:活动水平、睡眠质量、压力指标 5. **识别异常模式**:持续天数、趋势、异常分数范围 ### 当前结果摘要 - **Case ID**: {case_id} - **异常类型**: {anomaly_type} - **持续天数**: {duration_days} 天 - **异常趋势**: {trend} - **平均异常分数**: {avg_score:.4f} - **基线类型**: {baseline_type} - **基线可靠性**: {baseline_reliability} ### case_bundle 结构 返回的 `case_bundle` 包含以下字段: - **case**: 完整的 case JSON(包含所有异常信息、基线信息、用户档案等) - **llm_input**: Markdown 格式的健康分析报告(用于发送给 LLM) - **validation**: 数据验证结果 - **risk_confirmed**: 主时序模型风险确认结果(如果启用) - **main_model_result**: 主时序模型完整结果(如果启用) ### 下一步 Case JSON 构建完成,{'已启用主时序模型确认' if result.get('risk_confirmed') is not None else '主时序模型确认未启用'},将进入步骤③。 """ # 返回完整的 case_bundle,而不是简化的摘要 return result, explanation, gr.update(visible=True) # 显示中间列 except json.JSONDecodeError as e: return {}, f"❌ JSON格式错误: {str(e)}", gr.update(visible=True) except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n\n```\n{traceback.format_exc()}\n```" return {}, error_msg, gr.update(visible=True) def execute_step3_realtime_new(input_text: str) -> tuple: """实时执行步骤3:主时序模型确认(新版本,从输入框读取)""" if not MODULES_AVAILABLE: return {}, "❌ 模块未加载,无法执行" if not input_text or not input_text.strip(): return {}, "❌ 请输入 case_bundle(JSON格式,从步骤②复制)" try: # 解析case_bundle case_bundle = json.loads(input_text) case = case_bundle.get("case", {}) # 检查是否已有主时序模型确认结果 if case_bundle.get("main_model_result"): main_result = case_bundle.get("main_model_result", {}) pattern = main_result.get("anomaly_pattern", {}) daily_results = main_result.get("daily_results", []) # 提取值用于说明 risk_confirmed = case_bundle.get("risk_confirmed", False) has_pattern = pattern.get("has_pattern", False) duration_days = pattern.get("duration_days", 0) pattern_desc = pattern.get("pattern_description", "N/A") explanation = f""" ## 步骤 ③:主时序模型确认(实时执行) ### 功能说明 ✅ **已实时执行** 主时序模型确认,使用主时序模型深度分析,确认是否存在真实风险。 ### 当前结果摘要 - **风险确认**: {risk_confirmed} - **存在异常模式**: {has_pattern} - **持续天数**: {duration_days} 天 - **说明**: {pattern_desc} ### main_model_result 结构 返回的 `main_model_result` 包含以下字段: - **anomaly_pattern**: 异常模式分析结果 - `has_pattern`: 是否存在异常模式 - `duration_days`: 异常持续天数 - `anomaly_dates`: 异常日期列表 - `anomaly_scores`: 异常分数列表 - `pattern_description`: 模式描述 - **baseline_info**: 基线信息(如果可用) - **related_indicators**: 相关健康指标(如果可用) - **daily_results**: 每日详细结果数组 - 每天包含:`date`, `anomaly_score`, `is_anomaly`, `hrv_rmssd`, `hr` 等 ### 下一步 主时序模型确认完成,将进入步骤④生成LLM输入。 """ # 返回完整的 main_model_result,而不是简化的摘要 # 同时添加风险确认状态 result_with_metadata = { "步骤": "③ 主时序模型确认(实时执行)", "risk_confirmed": risk_confirmed, "main_model_result": main_result } return result_with_metadata, explanation, gr.update(visible=True) # 显示中间列, gr.update(visible=True) # 显示中间列 else: step3_result = { "步骤": "③ 主时序模型确认(实时执行)", "状态": "未启用", "说明": "case_bundle中未包含主时序模型确认结果,可能未启用该功能", } explanation = """ ## 步骤 ③:主时序模型确认(实时执行) ### 功能说明 ⚠️ **主时序模型确认未启用**,或case_bundle中未包含确认结果。 ### 说明 主时序模型确认需要在CaseBuilder构建时启用(见`configs/case_builder_config.json`)。 ### 下一步 可以跳过此步骤,直接进入步骤④生成LLM输入。 """ return step3_result, explanation, gr.update(visible=True) # 显示中间列 except json.JSONDecodeError as e: return {}, f"❌ JSON格式错误: {str(e)}", gr.update(visible=True) except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n\n```\n{traceback.format_exc()}\n```" return {}, error_msg, gr.update(visible=True) def execute_step4_realtime_new(input_text: str) -> tuple: """实时执行步骤4:LLM 输入生成(新版本,从输入框读取)""" if not MODULES_AVAILABLE: return "❌ 模块未加载,无法执行", "❌ 模块未加载,无法执行", gr.update(visible=True) if not input_text or not input_text.strip(): return "❌ 请输入 case_bundle(JSON格式,从步骤②或③复制)", "❌ 请输入 case_bundle(JSON格式,从步骤②或③复制)", gr.update(visible=True) try: # 解析case_bundle case_bundle = json.loads(input_text) llm_input = case_bundle.get("llm_input", "") if not llm_input: error_msg = "❌ case_bundle中未包含llm_input字段\n\n**提示**:请确保从步骤②复制完整的 case_bundle,它应该包含 'llm_input' 字段。" return error_msg, error_msg, gr.update(visible=True) explanation = """ ## 步骤 ④:LLM 输入生成(实时执行) ### 功能说明 ✅ **已实时执行** LLM 输入生成,从 case_bundle 中提取 llm_input 字段。 ### 说明 - `llm_input` 是 Markdown 格式的健康分析报告 - 可以直接复制到步骤⑤,用于生成健康干预方案 - 也可以直接发送给 LLM API 进行进一步处理 ### 下一步 将 `llm_input` 复制到步骤⑤,或直接调用 LLM API 生成健康干预方案。 """ return llm_input, explanation, gr.update(visible=True) # 显示中间列 except json.JSONDecodeError as e: error_msg = f"❌ JSON格式错误: {str(e)}\n\n**提示**:请确保输入的是有效的JSON格式,可以从步骤②的输出中复制完整的 case_bundle。" return error_msg, error_msg, gr.update(visible=True) except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n\n```\n{traceback.format_exc()}\n```" return error_msg, error_msg, gr.update(visible=True) def execute_step5_realtime_new(input_text: str) -> tuple: """实时执行步骤5:LLM 输出(新版本,从输入框读取)""" # 步骤5需要调用LLM API,这里只做占位 if not input_text or not input_text.strip(): return "❌ 请输入LLM输入(Markdown格式,从步骤④复制)", gr.update(visible=True) return "⚠️ LLM API调用功能需要配置API密钥,请参考文档配置后使用。\n\n当前输入内容:\n\n" + input_text[:500] + "...", gr.update(visible=True) def execute_step2_casebuilder(intermediate_data: Optional[Dict]) -> tuple: """实时执行步骤2:CaseBuilder 构建""" if not MODULES_AVAILABLE: return {}, "❌ 模块未加载,无法执行", gr.update(visible=False), gr.update(visible=False), None if not intermediate_data: return {}, "❌ 缺少步骤①的数据,请先执行步骤①", gr.update(visible=False), gr.update(visible=False), None try: records = intermediate_data.get("records", []) current_window = intermediate_data.get("current_window", []) user_id = intermediate_data.get("user_id", "demo_user") precheck_result = intermediate_data.get("precheck_result", {}) # 分割历史窗口 windows = split_windows(records, days=3) if len(windows) < 2: return {}, "❌ 数据不足以生成历史窗口(至少需要2天的数据)", gr.update(visible=False), gr.update(visible=False), None # 构建 payload payload = { "event_id": precheck_result.get("event_id", "demo-event"), "user_id": user_id, "window_data": current_window, "user_profile": sample_profile(), "history_windows": windows, "metadata": { "detector": "realtime_patchad", "patchad_score": precheck_result.get("score", 0.0), "threshold": precheck_result.get("threshold", 0.0), }, } # 执行 CaseBuilder builder = CaseBuilder() result = builder.build_case(payload) case = result.get("case", {}) anomaly_pattern = case.get("anomaly_pattern", {}) baseline_info = case.get("baseline_info", {}) step2_result = { "步骤": "② CaseBuilder 构建(实时执行)", "case_id": case.get("case_id", "N/A"), "用户ID": case.get("user_id", "N/A"), "异常模式": { "类型": anomaly_pattern.get("type", "N/A"), "持续天数": anomaly_pattern.get("duration_days", 0), "趋势": anomaly_pattern.get("trend", "N/A"), "平均分数": anomaly_pattern.get("avg_score", 0.0), }, "基线信息": { "类型": baseline_info.get("baseline_type", "N/A"), "可靠性": baseline_info.get("baseline_reliability", "N/A"), "个人均值": baseline_info.get("personal_mean", 0.0), }, } explanation = f""" ## 步骤 ②:CaseBuilder 构建(实时执行) ### 功能说明 ✅ **已实时执行** CaseBuilder,将原始数据、用户信息、历史窗口整合成结构化的 case JSON。 ### 工作内容 1. **汇总用户信息**:年龄、性别、生活习惯、生物节律等 2. **分析历史窗口**:计算每日异常分数、HRV、心率等指标 3. **计算基线信息**:个人基线、群体基线、基线可靠性 4. **提取相关指标**:活动水平、睡眠质量、压力指标 5. **识别异常模式**:持续天数、趋势、异常分数范围 ### 当前结果 - **Case ID**: {case.get("case_id", "N/A")} - **异常类型**: {anomaly_pattern.get("type", "N/A")} - **持续天数**: {anomaly_pattern.get("duration_days", 0)} 天 - **异常趋势**: {anomaly_pattern.get("trend", "N/A")} - **平均异常分数**: {anomaly_pattern.get("avg_score", 0.0):.4f} - **基线类型**: {baseline_info.get("baseline_type", "N/A")} - **基线可靠性**: {baseline_info.get("baseline_reliability", "N/A")} ### 下一步 Case JSON 构建完成,{'已启用主时序模型确认' if result.get('risk_confirmed') is not None else '主时序模型确认未启用'},将进入步骤③。 """ # 更新中间数据 intermediate_data["case_bundle"] = result return step2_result, explanation, gr.update(visible=True), gr.update(visible=True), intermediate_data except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n{traceback.format_exc()}" return {}, error_msg, gr.update(visible=False), gr.update(visible=False), None def show_step2_casebuilder(case_name: str, has_patchad: bool): """步骤2:显示 CaseBuilder 构建结果(预生成案例)""" has_patchad_bool = (has_patchad == "是") data = load_case_data(case_name, has_patchad_bool) if not data: return {}, "❌ 未找到案例数据", gr.update(visible=False), gr.update(visible=False), None # 如果是模式B(没有PatchAD),这一步需要显示 build_case 格式要求 if not has_patchad_bool: return show_step2_build_case_format(case_name, has_patchad_bool, data) # 如果是模式A(有PatchAD),直接显示 CaseBuilder 构建结果 return show_step2_casebuilder_result(case_name, has_patchad_bool, data) def show_step2_build_case_format(case_name: str, has_patchad: bool, data: Dict): """步骤2:显示 build_case 格式要求(外部平台没有PatchAD,在precheck之后)""" bundle = data.get("case_bundle", {}) case = bundle.get("case", {}) precheck = data.get("precheck", {}) # 构建 payload 示例(模式B:只传 event_id + history_windows,不传 window_data) payload_example = { "event_id": precheck.get("event_id", "demo_pattern-e89db848"), "user_id": case.get("user_id", "demo_pattern"), "user_profile": case.get("user_profile", {}), "history_windows": [ { "date": "2025-01-01", "window": [ # ... 12条数据(第1天) ] } # ... 共3-7天 ], "metadata": { "detector": "official_patchad", "patchad_score": precheck.get("score", 0.0), "threshold": precheck.get("threshold", 0.0) } } step2_result = { "步骤": "② build_case 接口调用格式", "模式": "模式B:平台没有PatchAD", "必填字段": list(payload_example.keys()), "注意": "不传 window_data,系统根据 event_id 从缓存获取" } explanation = f""" ## 步骤 ②:build_case 接口调用格式 ### 功能说明 外部平台收到 precheck 返回的 event_id 后,需要: 1. 从历史数据平台获取最近几天的数据 2. 合成 history_windows(3-7天,每天12条) 3. 获取用户档案(user_profile) 4. 调用 build_case 接口,传入 event_id 和合成的数据 ### 模式说明 **模式B:外部平台没有PatchAD** - 外部平台自己合成历史窗口数据(history_windows) - 外部平台自己获取用户档案(user_profile) - **不传 window_data**:系统根据 event_id 从缓存获取 - 传入 event_id,系统从 PrecheckServer 缓存获取 window_data ### 必填字段说明 - **event_id**:从 precheck 接口返回的 event_id(系统根据此从缓存获取 window_data) - **history_windows**:历史窗口(3-7天,每天12条),外部平台自己合成 - **user_profile**:用户档案,外部平台自己获取 - **不传 window_data**:系统根据 event_id 从缓存获取 ### 请求示例 ```json {json.dumps(payload_example, ensure_ascii=False, indent=2)} ``` ### 下一步 系统将基于传入的数据,使用 CaseBuilder 构建结构化的 case JSON。 """ payload_json = json.dumps(payload_example, ensure_ascii=False, indent=2) return step2_result, explanation, gr.update(visible=True), gr.update(visible=True), None def show_step2_casebuilder_result(case_name: str, has_patchad: bool, data: Dict): """步骤2:显示 CaseBuilder 构建结果""" bundle = data.get("case_bundle") or {} case = bundle.get("case", {}) step2_result = { "步骤": "② CaseBuilder 构建 Case", "case_id": case.get("case_id", "N/A"), "用户ID": case.get("user_id", "N/A"), "异常模式": case.get("anomaly_pattern", {}), "基线信息": case.get("baseline_info", {}), "相关指标": case.get("related_indicators", {}), } anomaly_pattern = case.get("anomaly_pattern", {}) baseline_info = case.get("baseline_info", {}) explanation = f""" ## 步骤 ②:CaseBuilder 构建结构化 Case ### 功能说明 CaseBuilder 负责将**原始数据、用户信息、历史窗口**整合成一个结构化的 case JSON,为后续的主时序模型确认和 LLM 输入生成做准备。 ### 工作内容 1. **汇总用户信息**:年龄、性别、生活习惯、生物节律等 2. **分析历史窗口**:计算每日异常分数、HRV、心率等指标 3. **计算基线信息**:个人基线、群体基线、基线可靠性 4. **提取相关指标**:活动水平、睡眠质量、压力指标 5. **识别异常模式**:持续天数、趋势、异常分数范围 ### 构建结果 - **Case ID**: {case.get("case_id", "N/A")} - **异常类型**: {anomaly_pattern.get("type", "N/A")} - **持续天数**: {anomaly_pattern.get("duration_days", 0)} 天 - **趋势**: {anomaly_pattern.get("trend", "N/A")} - **基线类型**: {baseline_info.get("baseline_type", "N/A")} - **基线可靠性**: {baseline_info.get("baseline_reliability", "N/A")} ### 下一步 Case JSON 构建完成,{'已启用主时序模型确认' if bundle.get('risk_confirmed') is not None else '主时序模型确认未启用'},将进入步骤③。 """ return step2_result, explanation, gr.update(visible=True), gr.update(visible=True), None bundle = data.get("case_bundle") or {} case = bundle.get("case", {}) step2_result = { "步骤": "② CaseBuilder 构建 Case", "case_id": case.get("case_id", "N/A"), "用户ID": case.get("user_id", "N/A"), "异常模式": case.get("anomaly_pattern", {}), "基线信息": case.get("baseline_info", {}), "相关指标": case.get("related_indicators", {}), } anomaly_pattern = case.get("anomaly_pattern", {}) baseline_info = case.get("baseline_info", {}) explanation = f""" ## 步骤 ②:CaseBuilder 构建结构化 Case ### 功能说明 CaseBuilder 负责将**原始数据、用户信息、历史窗口**整合成一个结构化的 case JSON,为后续的主时序模型确认和 LLM 输入生成做准备。 ### 工作内容 1. **汇总用户信息**:年龄、性别、生活习惯、生物节律等 2. **分析历史窗口**:计算每日异常分数、HRV、心率等指标 3. **计算基线信息**:个人基线、群体基线、基线可靠性 4. **提取相关指标**:活动水平、睡眠质量、压力指标 5. **识别异常模式**:持续天数、趋势、异常分数范围 ### 当前结果 - **Case ID**: {case.get("case_id", "N/A")} - **异常类型**: {anomaly_pattern.get("type", "N/A")} - **持续天数**: {anomaly_pattern.get("duration_days", 0)} 天 - **异常趋势**: {anomaly_pattern.get("trend", "N/A")} - **平均异常分数**: {anomaly_pattern.get("avg_score", 0.0):.4f} - **基线类型**: {baseline_info.get("baseline_type", "N/A")} - **基线可靠性**: {baseline_info.get("baseline_reliability", "N/A")} ### 下一步 Case JSON 构建完成后,将进入主时序模型确认阶段,使用 Phase2 TFT/Phased LSTM 对历史窗口进行深度分析。 """ return step2_result, explanation, gr.update(visible=True), gr.update(visible=True), None def execute_step3_confirmation(intermediate_data: Optional[Dict]) -> tuple: """实时执行步骤3:主时序模型确认""" if not MODULES_AVAILABLE: return {}, "❌ 模块未加载,无法执行", gr.update(visible=False), gr.update(visible=False), None if not intermediate_data: return {}, "❌ 缺少步骤②的数据,请先执行步骤②", gr.update(visible=False), gr.update(visible=False), None try: case_bundle = intermediate_data.get("case_bundle") if not case_bundle: return {}, "❌ 缺少 case_bundle 数据", gr.update(visible=False), gr.update(visible=False), None risk_confirmed = case_bundle.get("risk_confirmed") main_model_result = case_bundle.get("main_model_result") or {} # 检查配置状态 config_path = BASE_DIR / "configs" / "case_builder_config.json" config_enabled = False if config_path.exists(): try: config = json.loads(config_path.read_text(encoding="utf-8")) config_enabled = config.get("main_model_confirmation", {}).get("enabled", False) except Exception: pass if risk_confirmed is None and not config_enabled: step3_result = { "步骤": "③ 主时序模型确认", "状态": "未启用(配置 enabled=false)", } explanation = """ ## 步骤 ③:主时序模型确认 ### 功能说明 主时序模型确认是可选步骤,需要在配置文件中启用(`case_builder_config.json` 中设置 `enabled=true`)。 ### 当前状态 ⚠️ **未启用**:当前配置未启用主时序模型确认,系统只使用 PatchAD 分数进行判断。 ### 如何启用 修改 `configs/case_builder_config.json`: ```json { "main_model_confirmation": { "enabled": true, "mode": "auto", "model_dir": "checkpoints/phase2/exp_factor_balanced" } } ``` ### 下一步 即使未启用主时序模型确认,系统仍会继续生成 LLM 输入文本。 """ return step3_result, explanation, gr.update(visible=True), gr.update(visible=True), intermediate_data elif risk_confirmed is None and config_enabled: step3_result = { "步骤": "③ 主时序模型确认", "状态": "⚠️ 配置已启用,但当前案例数据未包含确认结果", "提示": "需要重新生成案例数据以包含主时序模型确认结果", } explanation = """ ## 步骤 ③:主时序模型确认 ### 功能说明 主时序模型(**Phase2 TFT/Phased LSTM**)对历史窗口进行深度分析,确认是否存在真实的健康风险。 ### 当前状态 ⚠️ **配置已启用,但案例数据缺失**: - ✅ 配置文件 `case_builder_config.json` 中 `enabled=true` - ❌ 当前案例数据是在配置未启用时生成的,不包含主时序模型确认结果 ### 解决方案 需要重新运行 CaseBuilder 生成新的案例数据,新案例将包含主时序模型的确认结果。 ### 下一步 重新生成案例后,这里将显示实际的风险确认结果。 """ return step3_result, explanation, gr.update(visible=True), gr.update(visible=True), intermediate_data else: # 有确认结果 anomaly_pattern = main_model_result.get("anomaly_pattern", {}) step3_result = { "步骤": "③ 主时序模型确认(实时执行)", "风险确认": "✅ 确认有风险" if risk_confirmed else "❌ 确认无风险", "异常模式": { "描述": anomaly_pattern.get("pattern_description", "N/A"), "持续天数": anomaly_pattern.get("duration_days", 0), "趋势": anomaly_pattern.get("trend", "N/A"), }, } explanation = f""" ## 步骤 ③:主时序模型确认(实时执行) ### 功能说明 ✅ **已实时执行** 主时序模型确认,使用 Phase2 TFT/Phased LSTM 对历史窗口进行深度分析。 ### 工作原理 - 使用训练好的 Phase2 模型对多天历史窗口进行异常检测 - 分析异常模式的持续性和趋势 - 结合个人基线和群体基线进行综合判断 - 输出风险确认结果 ### 当前结果 - **风险确认**: {'✅ 确认有风险' if risk_confirmed else '❌ 确认无风险'} - **异常模式**: {anomaly_pattern.get("pattern_description", "N/A")} - **持续天数**: {anomaly_pattern.get("duration_days", 0)} 天 - **趋势**: {anomaly_pattern.get("trend", "N/A")} ### 决策逻辑 - ✅ **确认有风险**:继续生成 LLM 输入,准备生成健康方案 - ❌ **确认无风险**:终止流程,不生成健康方案(可能是误报) ### 下一步 如果确认有风险,系统将生成 LLM 输入文本(Markdown 格式的健康分析报告),准备发送给 LLM 生成健康干预方案。 """ return step3_result, explanation, gr.update(visible=True), gr.update(visible=True), intermediate_data except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n{traceback.format_exc()}" return {}, error_msg, gr.update(visible=False), gr.update(visible=False), None def show_step3_confirmation(case_name: str, has_patchad: bool): """步骤3:显示主时序模型确认结果(预生成案例)""" import json from pathlib import Path has_patchad_bool = (has_patchad == "是") data = load_case_data(case_name, has_patchad_bool) if not data: return {}, "❌ 未找到案例数据", gr.update(visible=False), gr.update(visible=False), None, None # 检查配置文件状态 config_path = BASE_DIR / "configs" / "case_builder_config.json" config_enabled = False if config_path.exists(): try: config = json.loads(config_path.read_text(encoding="utf-8")) config_enabled = config.get("main_model_confirmation", {}).get("enabled", False) except Exception: pass bundle = data.get("case_bundle") or {} step3_confirmation = bundle.get("main_model_result") or {} risk_confirmed = bundle.get("risk_confirmed") # 如果配置已启用但案例数据中没有结果,说明案例是旧数据 if config_enabled and risk_confirmed is None: step3_result = { "步骤": "③ 主时序模型确认", "状态": "⚠️ 配置已启用,但当前案例数据未包含确认结果", "提示": "需要重新生成案例数据以包含主时序模型确认结果", } explanation = f""" ## 步骤 ③:主时序模型确认 ### 功能说明 主时序模型(**Phase2 TFT/Phased LSTM**)对历史窗口进行深度分析,确认是否存在真实的健康风险。这是对 PatchAD 预筛结果的二次验证。 ### 当前状态 ⚠️ **配置已启用,但案例数据缺失**: - ✅ 配置文件 `case_builder_config.json` 中 `enabled=true` - ❌ 当前案例数据是在配置未启用时生成的,不包含主时序模型确认结果 ### 解决方案 需要重新运行 `simulate_patchad_case_pipeline.py` 生成新的案例数据,新案例将包含主时序模型的确认结果。 ### 工作原理 - 使用训练好的 Phase2 模型对多天历史窗口进行异常检测 - 分析异常模式的持续性和趋势 - 结合个人基线和群体基线进行综合判断 - 输出风险确认结果 ### 下一步 重新生成案例后,这里将显示实际的风险确认结果。 """ return step3_result, explanation, gr.update(visible=True), gr.update(visible=True), None elif risk_confirmed is not None: # 配置已启用且案例数据中有结果 anomaly_pattern = step3_confirmation.get("anomaly_pattern", {}) step3_result = { "步骤": "③ 主时序模型确认", "风险确认": "✅ 确认有风险" if risk_confirmed else "❌ 确认无风险", "异常模式": anomaly_pattern, } explanation = f""" ## 步骤 ③:主时序模型确认 ### 功能说明 主时序模型(**Phase2 TFT/Phased LSTM**)对历史窗口进行深度分析,确认是否存在真实的健康风险。这是对 PatchAD 预筛结果的二次验证。 ### 工作原理 - 使用训练好的 Phase2 模型对多天历史窗口进行异常检测 - 分析异常模式的持续性和趋势 - 结合个人基线和群体基线进行综合判断 - 输出风险确认结果 ### 当前结果 - **风险确认**: {'✅ 确认有风险' if risk_confirmed else '❌ 确认无风险'} - **异常模式**: {anomaly_pattern.get("pattern_description", "N/A")} - **持续天数**: {anomaly_pattern.get("duration_days", 0)} 天 - **趋势**: {anomaly_pattern.get("trend", "N/A")} ### 决策逻辑 - ✅ **确认有风险**:继续生成 LLM 输入,准备生成健康方案 - ❌ **确认无风险**:终止流程,不生成健康方案(可能是误报) ### 下一步 如果确认有风险,系统将生成 LLM 输入文本(Markdown 格式的健康分析报告),准备发送给 LLM 生成健康干预方案。 """ return step3_result, explanation, gr.update(visible=True), gr.update(visible=True), None else: # 配置未启用 step3_result = { "步骤": "③ 主时序模型确认", "状态": "未启用(配置 enabled=false)", } explanation = """ ## 步骤 ③:主时序模型确认 ### 功能说明 主时序模型确认是可选步骤,需要在配置文件中启用(`case_builder_config.json` 中设置 `enabled=true`)。 ### 当前状态 ⚠️ **未启用**:当前配置未启用主时序模型确认,系统只使用 PatchAD 分数进行判断。 ### 如何启用 修改 `configs/case_builder_config.json`: ```json { "main_model_confirmation": { "enabled": true, "mode": "auto", "model_dir": "checkpoints/phase2/exp_factor_balanced" } } ``` ### 下一步 即使未启用主时序模型确认,系统仍会继续生成 LLM 输入文本。 """ return step3_result, explanation, gr.update(visible=True), gr.update(visible=True), None def execute_step4_llm_input(intermediate_data: Optional[Dict]) -> tuple: """实时执行步骤4:LLM 输入生成""" if not intermediate_data: return "❌ 缺少步骤②的数据,请先执行步骤②", "❌ 缺少步骤②的数据,请先执行步骤②", gr.update(visible=False), gr.update(visible=False) try: case_bundle = intermediate_data.get("case_bundle") if not case_bundle: return "❌ 缺少 case_bundle 数据", "❌ 缺少 case_bundle 数据", gr.update(visible=False), gr.update(visible=False) llm_input = case_bundle.get("llm_input", "(无)") explanation = f""" ## 步骤 ④:LLM 输入生成(实时执行) ### 功能说明 ✅ **已实时生成** LLM 输入,将结构化的 case JSON 转换为 Markdown 格式的健康分析报告。 ### 输入内容 LLM 输入包含以下部分: 1. **异常概览**:异常类型、持续时长、趋势 2. **异常评分分析**:异常分数范围、平均分数、检测阈值 3. **当前生理状态评估**:当前值、基线值、偏离基线百分比 4. **历史趋势分析**:每日 HRV、心率、异常分数 5. **相关健康指标分析**:活动水平、睡眠质量、压力指标 6. **用户背景信息**:年龄、性别、生活习惯、生物节律 ### 格式说明 - 使用 Markdown 格式,便于 LLM 解析 - 包含表格、列表等结构化信息 - 不包含判断性结论(由 LLM 自己分析) ### 下一步 将 LLM 输入发送给 LLM(如 DeepSeek、GPT 等),生成个性化的健康干预方案。 """ return llm_input, explanation, gr.update(visible=True), gr.update(visible=True) except Exception as e: import traceback error_msg = f"❌ 执行错误: {str(e)}\n{traceback.format_exc()}" return error_msg, error_msg, gr.update(visible=False), gr.update(visible=False) def show_step4_llm_input(case_name: str, has_patchad: bool): """步骤4:显示 LLM 输入(预生成案例)""" has_patchad_bool = (has_patchad == "是") data = load_case_data(case_name, has_patchad_bool) if not data: return "❌ 未找到案例数据", "❌ 未找到案例数据", gr.update(visible=False), gr.update(visible=False) bundle = data.get("case_bundle") or {} llm_input = bundle.get("llm_input", "(无)") explanation = f""" ## 步骤 ④:LLM 输入生成 ### 功能说明 LLM 输入是将**结构化的 case JSON** 转换为 **Markdown 格式的健康分析报告**,便于 LLM 理解和处理。 ### 输入内容 LLM 输入包含以下部分: 1. **异常概览**:异常类型、持续时长、趋势 2. **异常评分分析**:异常分数范围、平均分数、检测阈值 3. **当前生理状态评估**:当前值、基线值、偏离基线百分比 4. **历史趋势分析**:每日 HRV、心率、异常分数 5. **相关健康指标分析**:活动水平、睡眠质量、压力指标 6. **用户背景信息**:年龄、性别、生活习惯、生物节律 ### 格式说明 - 使用 Markdown 格式,便于 LLM 解析 - 包含表格、列表等结构化信息 - 不包含判断性结论(由 LLM 自己分析) ### 下一步 将 LLM 输入发送给 LLM(如 DeepSeek、GPT 等),生成个性化的健康干预方案。 """ return llm_input, explanation, gr.update(visible=True), gr.update(visible=True) def show_step5_llm_output(case_name: str, has_patchad: bool): """步骤5:显示 LLM 输出""" has_patchad_bool = (has_patchad == "是") data = load_case_data(case_name, has_patchad_bool) if not data: return "❌ 未找到案例数据", "❌ 未找到案例数据", gr.update(visible=False) bundle = data.get("case_bundle") or {} llm_output = bundle.get("intervention") or bundle.get("llm_output") or "(当前案例未包含 LLM 输出)" explanation = f""" ## 步骤 ⑤:LLM 输出(健康干预方案) ### 功能说明 LLM 根据输入的健康分析报告,生成**个性化的、可执行的健康干预方案**。 ### 输出内容 健康干预方案通常包含: 1. **异常原因分析**:解释为什么会出现异常 2. **风险等级评估**:评估健康风险的严重程度 3. **个性化建议**:基于用户年龄、性别、生活习惯的建议 4. **两周强化方案**:具体的、可执行的干预措施 5. **注意事项**:需要避免的行为或需要关注的事项 ### 特点 - ✅ **个性化**:基于用户的具体情况 - ✅ **可执行**:提供具体的行动建议 - ✅ **语言友好**:使用通俗易懂的语言 - ✅ **时间明确**:通常为两周的强化方案 ### 当前状态 {'✅ 已生成健康方案' if llm_output and llm_output != "(当前案例未包含 LLM 输出)" else '⏳ 待生成(需要调用 LLM API)'} ### 流程完成 至此,完整的健康监测流程已完成:从原始数据 → 异常检测 → 原因分析 → 健康方案。 """ return llm_output, explanation, gr.update(visible=True) css = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap'); :root{ --bg:#F7FAFC; --card:#FFFFFF; --muted:#64748B; --text:#0F172A; --accent:#0EA5A3; --accent-600:#0B8F88; --radius:14px; --shadow: 0 6px 18px rgba(15,23,42,0.06); } body { background: var(--bg); font-family: Inter, -apple-system, "Segoe UI", Roboto, "Helvetica Neue", Arial; } /* 全宽显示 */ .gradio-container { max-width: 100% !important; padding: 20px !important; } #component-0, #root { max-width: 100% !important; width: 100% !important; } .container { max-width: 100%; margin: 0 auto; padding: 20px; width: 100%; box-sizing: border-box; } .hero { background: linear-gradient(135deg, rgba(255,255,255,0.95) 0%, rgba(247,250,252,0.98) 100%); border-radius: 20px; box-shadow: 0 8px 32px rgba(15,23,42,0.08), 0 2px 8px rgba(15,23,42,0.04); padding: 48px 40px; text-align: center; border: 1px solid rgba(14,165,163,0.1); } .hero h1 { margin: 0; font-size: 42px; color: var(--text); font-weight: 700; background: linear-gradient(135deg, #0EA5A3 0%, #06B6D4 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text; letter-spacing: -0.5px; } .hero p.lead { margin: 16px 0 8px; color: var(--muted); font-size: 17px; line-height: 1.7; font-weight: 500; } .hero p.lead strong { color: var(--accent); font-weight: 600; } .value-row { display: flex; gap: 24px; margin-top: 40px; justify-content: center; flex-wrap: wrap; } .value-card { background: linear-gradient(135deg, #FFFFFF 0%, #F8FAFC 100%); border-radius: 16px; padding: 28px 24px; box-shadow: 0 4px 16px rgba(15,23,42,0.06), 0 1px 4px rgba(15,23,42,0.04); flex: 1; max-width: 340px; min-width: 280px; border: 1px solid rgba(14,165,163,0.08); transition: all 0.3s ease; position: relative; overflow: hidden; } .value-card::before { content: ''; position: absolute; top: 0; left: 0; right: 0; height: 3px; background: linear-gradient(90deg, #0EA5A3 0%, #06B6D4 100%); opacity: 0; transition: opacity 0.3s ease; } .value-card:hover { transform: translateY(-4px); box-shadow: 0 8px 24px rgba(15,23,42,0.12), 0 2px 8px rgba(15,23,42,0.08); border-color: rgba(14,165,163,0.2); } .value-card:hover::before { opacity: 1; } .value-card .icon-wrapper { font-size: 36px; margin-bottom: 16px; display: inline-block; padding: 12px; background: linear-gradient(135deg, rgba(14,165,163,0.08) 0%, rgba(6,182,212,0.08) 100%); border-radius: 12px; width: 64px; height: 64px; display: flex; align-items: center; justify-content: center; margin: 0 auto 16px; } .value-card h3 { margin: 0 0 12px; font-size: 20px; color: var(--text); font-weight: 600; letter-spacing: -0.3px; } .value-card .card-desc { margin: 0 0 16px; color: var(--muted); font-size: 13px; line-height: 1.6; font-weight: 500; } .value-card .card-desc strong { color: var(--text); font-weight: 600; } .value-card .feature-list { text-align: left; margin: 0; padding: 0; list-style: none; } .value-card .feature-list li { margin: 10px 0; font-size: 13px; color: var(--muted); line-height: 1.6; padding-left: 24px; position: relative; } .value-card .feature-list li::before { content: '✓'; position: absolute; left: 0; color: var(--accent); font-weight: 600; font-size: 14px; } .value-card p { margin: 0; color: var(--muted); font-size: 14px; line-height: 1.6; } .demo-panel { margin-top:22px; background:var(--card); border-radius: var(--radius); padding:24px; box-shadow: var(--shadow); border: 1px solid rgba(15,23,42,0.06); } .case-selection-card { background: linear-gradient(135deg, #FFFFFF 0%, #F8FAFC 100%); border-radius: 12px; padding: 20px; border: 1px solid rgba(14,165,163,0.1); box-shadow: 0 2px 8px rgba(15,23,42,0.04); } .flow-description-card { background: linear-gradient(135deg, rgba(14,165,163,0.03) 0%, rgba(6,182,212,0.03) 100%); border-radius: 12px; padding: 20px; border: 1px solid rgba(14,165,163,0.15); box-shadow: 0 2px 8px rgba(15,23,42,0.04); } .flow-description-card h3 { margin-top: 0; color: var(--text); font-size: 18px; font-weight: 600; margin-bottom: 16px; } .panel-title { font-size: 28px; font-weight: 700; color: var(--text); margin: 0 0 24px 0; padding-bottom: 12px; border-bottom: 2px solid rgba(14,165,163,0.15); letter-spacing: -0.3px; } .comparison-title { font-size: 20px; font-weight: 600; color: var(--text); margin: 16px 0 12px 0; letter-spacing: -0.2px; } .step-panel { background: #F8FAFC; border-radius: 10px; padding: 20px; margin: 16px 0; border-left: 4px solid var(--accent); } .step-title { font-size: 20px; font-weight: 700; color: var(--text); margin-bottom: 12px; } .step-content { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin-top: 16px; } .step-explanation { background: white; padding: 16px; border-radius: 8px; border: 1px solid rgba(15,23,42,0.06); } .step-result { background: white; padding: 16px; border-radius: 8px; border: 1px solid rgba(15,23,42,0.06); } /* 流程图样式 */ .flow-container { padding: 20px; background: #F8FAFC; border-radius: 12px; border: 1px solid rgba(15,23,42,0.08); } .flow-step { background: white; border-radius: 8px; padding: 16px; margin: 12px 0; border-left: 4px solid; box-shadow: 0 2px 8px rgba(15,23,42,0.04); } .flow-step.platform { border-left-color: #3B82F6; /* 蓝色 */ } .flow-step.system { border-left-color: #F97316; /* 橙色 */ } .step-header { font-weight: 600; font-size: 15px; color: var(--text); margin-bottom: 12px; padding-bottom: 8px; border-bottom: 1px solid rgba(15,23,42,0.08); } .step-content { font-size: 13px; color: var(--muted); line-height: 1.6; } .step-content p { margin: 6px 0; } .step-content strong { color: var(--text); font-weight: 500; } .flow-arrow { text-align: center; color: var(--accent); font-size: 20px; font-weight: bold; margin: 4px 0; } /* 数据展开/收起样式 */ .data-details { margin-top: 12px; padding-top: 12px; border-top: 1px solid rgba(15,23,42,0.1); } .data-details summary { cursor: pointer; font-weight: 500; color: var(--accent); font-size: 13px; padding: 6px 0; user-select: none; } .data-details summary:hover { color: var(--accent-600); } .data-details summary::-webkit-details-marker { display: none; } .data-details summary::before { content: "▶ "; display: inline-block; transition: transform 0.2s; margin-right: 4px; } .data-details[open] summary::before { transform: rotate(90deg); } .data-json, .data-markdown { background: #F8FAFC; border: 1px solid rgba(15,23,42,0.1); border-radius: 6px; padding: 12px; margin-top: 8px; font-size: 12px; line-height: 1.5; overflow-x: auto; max-height: 400px; overflow-y: auto; font-family: 'Monaco', 'Menlo', 'Ubuntu Mono', monospace; } .data-markdown { white-space: pre-wrap; word-break: break-word; } @media (max-width:1000px){ .value-row { flex-direction: column; align-items: center; } .value-card { max-width: 100%; } .step-content { grid-template-columns: 1fr; } } """ with gr.Blocks( theme=gr.themes.Default(primary_hue="teal"), css=css, title="Wearable Health AI - 健康监测系统" ) as demo: gr.Markdown('
从可穿戴数据到 异常检测 → 原因分析 → 个性化干预方案 的一站式智能健康监测系统
🎯 核心优势:实时监测 · 精准识别 · 智能分析 · 个性化方案 · 可执行建议
双重检测机制:PatchAD/PatchTrAD 轻量级预筛 + Phase2 深度残差融合模型
全维度分析:CaseBuilder 数据整合 + AnomalyFormatter 智能格式化
AI 生成方案:基于 LLM 的智能健康管理建议
💡 适用场景:个人健康监测 · 企业员工健康管理 · 医疗机构辅助诊断 · 健康设备厂商集成 · 健康管理平台对接
请选择一个案例
" # 加载案例数据 has_patchad_bool = (mode == "A") case_data = load_case_data(case_name, has_patchad_bool) # 提取实际数据用于展示 precheck = case_data.get("precheck", {}) if case_data else {} bundle = case_data.get("case_bundle", {}) if case_data else {} case = bundle.get("case", {}) if bundle else {} if mode == "A": # 模式A:平台有PatchAD precheck_score = precheck.get("score", 0.0) precheck_status = precheck.get("status", "unknown") precheck_threshold = precheck.get("threshold", 0.0) anomaly_type = case.get("anomaly_pattern", {}).get("type", "N/A") duration_days = case.get("anomaly_pattern", {}).get("duration_days", 0) avg_score = case.get("anomaly_pattern", {}).get("avg_score", 0.0) case_id = case.get("case_id", "N/A") user_profile = case.get("user_profile", {}) daily_results = case.get("daily_results", []) baseline_info = case.get("baseline_info", {}) llm_input = bundle.get("llm_input", "") # 格式化JSON数据用于展示 precheck_json = json.dumps(precheck, ensure_ascii=False, indent=2) user_profile_json = json.dumps(user_profile, ensure_ascii=False, indent=2) daily_results_json = json.dumps(daily_results[:3], ensure_ascii=False, indent=2) # 只显示前3天 baseline_info_json = json.dumps(baseline_info, ensure_ascii=False, indent=2) case_json = json.dumps(case, ensure_ascii=False, indent=2) html = f"""输入:当前窗口(12条数据)
输出:异常状态 + 分数
实际结果:状态={precheck_status}, 分数={precheck_score:.4f}, 阈值={precheck_threshold:.4f}
位置:外部平台内部执行
说明:外部平台自己用PatchAD检测,发现异常后继续流程
{precheck_json}
输入:用户ID
输出:最近3-7天的原始数据
位置:从历史数据平台获取
说明:外部平台从历史数据平台获取最近几天的原始数据
外部平台内部操作,从历史数据平台API获取数据,格式为JSONL(每行一个JSON对象)
输入:原始历史数据
输出:按天分割的窗口数组(3-7天)
位置:外部平台内部处理
说明:外部平台按日期分割数据,生成history_windows数组
{daily_results_json}
输入:用户ID
输出:用户档案(年龄、性别、习惯等)
位置:从用户数据库获取
说明:外部平台从用户数据库获取用户档案信息
{user_profile_json}
传入:window_data + history_windows + user_profile
返回:case JSON + LLM输入
位置:系统服务器
说明:外部平台传入所有数据,系统开始处理
window_data: 12条数据点(当前窗口)
history_windows: 3-7天的窗口数组(见步骤③)
user_profile: 用户档案(见步骤④)
处理:汇总信息,生成结构化case
输出:case JSON
实际结果:Case ID={case_id}, 异常类型={anomaly_type}, 持续{duration_days}天
说明:系统内部汇总所有信息,生成结构化case
{case_json[:2000]}...
(完整数据较长,此处仅显示前2000字符)
处理:深度分析,确认风险
输出:风险确认结果
实际结果:平均异常分数={avg_score:.4f}
说明:使用主时序模型深度分析,确认是否存在真实风险
{baseline_info_json}
处理:转换为Markdown格式
输出:健康分析报告(Markdown)
说明:将case JSON转换为Markdown格式,准备发送给LLM
{llm_input[:1000]}...
(完整内容较长,此处仅显示前1000字符)
传入:当前窗口(12条数据)
返回:event_id + 异常状态 + 分数
实际结果:event_id={event_id}, 状态={precheck_status}, 分数={precheck_score:.4f}
位置:系统服务器(缓存window_data)
说明:外部平台调用系统接口,系统检测异常并返回event_id
{precheck_json}
输入:precheck返回的event_id
输出:保存event_id,准备后续使用
实际结果:保存 event_id={event_id}
位置:外部平台内部
说明:外部平台保存event_id,准备后续调用build_case时使用
外部平台内部操作,将event_id保存到内存或数据库,供后续步骤使用
输入:用户ID
输出:最近3-7天的原始数据
位置:从历史数据平台获取
说明:外部平台从历史数据平台获取最近几天的原始数据
外部平台内部操作,从历史数据平台API获取数据,格式为JSONL(每行一个JSON对象)
输入:原始历史数据
输出:按天分割的窗口数组(3-7天)
位置:外部平台内部处理
说明:外部平台按日期分割数据,生成history_windows数组
{daily_results_json}
输入:用户ID
输出:用户档案(年龄、性别、习惯等)
位置:从用户数据库获取
说明:外部平台从用户数据库获取用户档案信息
{user_profile_json}
传入:event_id + history_windows + user_profile
系统操作:根据event_id从缓存获取window_data
返回:case JSON + LLM输入
位置:系统服务器
说明:外部平台只传event_id,系统从缓存获取window_data
event_id: {event_id}
history_windows: 3-7天的窗口数组(见步骤④)
user_profile: 用户档案(见步骤⑤)
注意:不传window_data,系统根据event_id从缓存获取
处理:汇总信息,生成结构化case
输出:case JSON
实际结果:Case ID={case_id}, 异常类型={anomaly_type}, 持续{duration_days}天
说明:系统内部汇总所有信息,生成结构化case
{case_json[:2000]}...
(完整数据较长,此处仅显示前2000字符)
处理:深度分析,确认风险
输出:风险确认结果
实际结果:平均异常分数={avg_score:.4f}
说明:使用主时序模型深度分析,确认是否存在真实风险
{baseline_info_json}
处理:转换为Markdown格式
输出:健康分析报告(Markdown)
说明:将case JSON转换为Markdown格式,准备发送给LLM
{llm_input[:1000]}...
(完整内容较长,此处仅显示前1000字符)
请选择一个案例
"), gr.update(value="请选择一个案例
") ) # 获取案例信息 info = PATCHAD_CASE_INFO.get(case_name, {}) description = info.get("description", "") # 获取原始数据和使用说明 raw_data, usage_info = get_case_raw_data(case_name) if not raw_data: raw_data = "❌ 无法加载原始数据" if not usage_info: usage_info = "❌ 无法加载使用说明" # 生成描述文本 desc_text = "" if description: desc_text = f"**案例说明**:{description}\n\n" # 生成流程图 flow_a = generate_flow_diagram("A", case_name) flow_b = generate_flow_diagram("B", case_name) # 加载案例数据用于展示 data_a = load_case_data(case_name, True) data_b = load_case_data(case_name, False) # 生成详细数据展示 detail_a = "## 模式A:外部平台有 PatchAD\n\n" if data_a: precheck_a = data_a.get("precheck", {}) bundle_a = data_a.get("case_bundle", {}) case_a = bundle_a.get("case", {}) if bundle_a else {} detail_a += f""" ### 步骤①:PatchAD预筛结果 - **状态**: {precheck_a.get("status", "N/A")} - **分数**: {precheck_a.get("score", 0.0):.4f} - **阈值**: {precheck_a.get("threshold", 0.0):.4f} ### 步骤⑤-⑧:系统处理结果 - **Case ID**: {case_a.get("case_id", "N/A")} - **异常类型**: {case_a.get("anomaly_pattern", {}).get("type", "N/A")} - **持续天数**: {case_a.get("anomaly_pattern", {}).get("duration_days", 0)} 天 - **平均异常分数**: {case_a.get("anomaly_pattern", {}).get("avg_score", 0.0):.4f} - **基线类型**: {case_a.get("baseline_info", {}).get("baseline_type", "N/A")} - **基线可靠性**: {case_a.get("baseline_info", {}).get("baseline_reliability", "N/A")} """ else: detail_a += "❌ 无法加载案例数据" detail_b = "## 模式B:外部平台没有 PatchAD\n\n" if data_b: precheck_b = data_b.get("precheck", {}) bundle_b = data_b.get("case_bundle", {}) case_b = bundle_b.get("case", {}) if bundle_b else {} detail_b += f""" ### 步骤①:precheck接口返回 - **Event ID**: {precheck_b.get("event_id", "N/A")} - **状态**: {precheck_b.get("status", "N/A")} - **分数**: {precheck_b.get("score", 0.0):.4f} - **阈值**: {precheck_b.get("threshold", 0.0):.4f} ### 步骤⑥-⑨:系统处理结果 - **Case ID**: {case_b.get("case_id", "N/A")} - **异常类型**: {case_b.get("anomaly_pattern", {}).get("type", "N/A")} - **持续天数**: {case_b.get("anomaly_pattern", {}).get("duration_days", 0)} 天 - **平均异常分数**: {case_b.get("anomaly_pattern", {}).get("avg_score", 0.0):.4f} - **基线类型**: {case_b.get("baseline_info", {}).get("baseline_type", "N/A")} - **基线可靠性**: {case_b.get("baseline_info", {}).get("baseline_reliability", "N/A")} """ else: detail_b += "❌ 无法加载案例数据" return ( gr.update(value=raw_data), gr.update(value=usage_info), gr.update(value=desc_text, visible=bool(description)), gr.update(value=flow_a), gr.update(value=flow_b), gr.update(value=data_a, visible=bool(data_a)), gr.update(value=data_b, visible=bool(data_b)), gr.update(value=detail_a), gr.update(value=detail_b) ) pipeline_case_dropdown.change( update_case_preview, inputs=pipeline_case_dropdown, outputs=[case_raw_data_preview, case_usage_info, case_description, mode_a_flow, mode_b_flow, mode_a_data, mode_b_data, mode_a_detail, mode_b_detail] ) # 初始加载时也更新一次 if PATCHAD_CASE_NAMES: demo.load( lambda: update_case_preview(PATCHAD_CASE_NAMES[0]), outputs=[case_raw_data_preview, case_usage_info, case_description, mode_a_flow, mode_b_flow, mode_a_data, mode_b_data, mode_a_detail, mode_b_detail] ) # 旧的实时执行模式步骤面板已移除,现在使用新的Accordion方式 # 为了兼容模式切换,创建虚拟的step面板(不显示) step1_panel = gr.Column(visible=False) step2_panel = gr.Column(visible=False) step3_panel = gr.Column(visible=False) step4_panel = gr.Column(visible=False) step5_panel = gr.Column(visible=False) # 模式切换时显示/隐藏步骤面板 def switch_mode_with_steps(mode: str): if mode == "预生成案例": return ( gr.update(visible=True), # pregen_mode gr.update(visible=False), # realtime_mode gr.update(visible=False), # step1_panel gr.update(visible=False), # step2_panel gr.update(visible=False), # step3_panel gr.update(visible=False), # step4_panel gr.update(visible=False) # step5_panel ) else: return ( gr.update(visible=False), # pregen_mode gr.update(visible=True), # realtime_mode gr.update(visible=False), # step1_panel (不再使用) gr.update(visible=False), # step2_panel (不再使用) gr.update(visible=False), # step3_panel (不再使用) gr.update(visible=False), # step4_panel (不再使用) gr.update(visible=False) # step5_panel (不再使用) ) mode_radio.change( switch_mode_with_steps, inputs=mode_radio, outputs=[pregen_mode, realtime_mode, step1_panel, step2_panel, step3_panel, step4_panel, step5_panel] ) # 实时执行模式的按钮绑定(新版本) # 步骤① step1_load_example_btn.click( lambda: load_example_step1(), outputs=step1_input ) step1_run_btn.click( execute_step1_realtime_new, inputs=step1_input, outputs=[step1_output_json, step1_output_md, step1_output_col] ) # 步骤② step2_load_example_btn.click( lambda: load_example_step2(), outputs=step2_input ) step2_run_btn.click( execute_step2_realtime_new, inputs=step2_input, outputs=[step2_output_json, step2_output_md, step2_output_col] ) # 步骤③ step3_load_example_btn.click( lambda: load_example_step3(), outputs=step3_input ) step3_run_btn.click( execute_step3_realtime_new, inputs=step3_input, outputs=[step3_output_json, step3_output_md, step3_output_col] ) # 步骤④ step4_load_example_btn.click( lambda: load_example_step4(), outputs=step4_input ) step4_run_btn.click( execute_step4_realtime_new, inputs=step4_input, outputs=[step4_output_md, step4_explanation_md, step4_output_col] ) # 步骤⑤ step5_load_example_btn.click( lambda: load_example_step5(), outputs=step5_input ) step5_run_btn.click( execute_step5_realtime_new, inputs=step5_input, outputs=[step5_output_md, step5_explanation_md, step5_output_col] ) # API 文档页面 with gr.Tab("📚 API 文档", elem_id="api-doc"): gr.HTML("""本文档详细说明所有提供给外部平台的 API 接口,包括请求格式、响应结构和示例代码。
""") # 接口概览 with gr.Accordion("📋 接口概览", open=True): gr.Markdown(""" ### 提供的接口列表 | 接口 | 方法 | 路径 | 说明 | |------|------|------|------| | **预筛检测** | POST | `/api/precheck` | 轻量级异常预筛,返回 event_id 和异常状态 | | **构建案例** | POST | `/api/build_case` | 构建完整的健康异常案例,生成结构化 JSON 和 LLM 输入 | ### 两种接入模式 - **模式A**:外部平台有 PatchAD,直接调用 `build_case` - **模式B**:外部平台没有 PatchAD,先调用 `precheck`,再调用 `build_case` """) # 接口1: precheck with gr.Accordion("🔍 接口1: precheck - 预筛检测", open=True): gr.Markdown(""" ### 功能说明 轻量级异常预筛接口,适用于外部平台没有集成 PatchAD/PatchTrAD 的场景。系统使用内置的 PatchAD 算法对当前窗口数据进行快速检测,返回异常状态和唯一事件ID。 **适用场景**:模式B(外部平台没有 PatchAD) ### 请求格式 **接口路径**:`POST /api/precheck` **Content-Type**:`application/json` **请求体**: ```json { "user_id": "string", "window_data": [ { "timestamp": "2025-01-01T08:00:00", "deviceId": "user_001", "features": { "hr": 72.0, "hrv_rmssd": 35.0, "time_period_primary": "day", "data_quality": "high", "baseline_hrv_mean": 40.0, "baseline_hrv_std": 5.0 }, "static_features": { "age_group": 2, "sex": 0, "exercise": 1 } } ] } ``` **字段说明**: | 字段 | 类型 | 必填 | 说明 | |------|------|------|------| | `user_id` | string | ✅ | 用户唯一标识 | | `window_data` | array | ✅ | 当前窗口数据,至少12条,按时间顺序排列 | ### 响应格式 **成功响应**(HTTP 200): ```json { "event_id": "user_001-a1b2c3d4", "user_id": "user_001", "status": "abnormal", "score": 0.8234, "threshold": 0.35, "created_at": "2025-01-01T08:00:00Z", "expires_at": "2025-01-01T09:00:00Z", "required_fields": ["history_windows", "user_profile"] } ``` **响应字段说明**: | 字段 | 类型 | 说明 | |------|------|------| | `event_id` | string | 唯一事件ID,用于后续 build_case 调用 | | `status` | string | 异常状态:"normal" 或 "abnormal" | | `score` | float | 异常分数,范围 0.0 - 1.0 | | `threshold` | float | 系统使用的阈值 | | `expires_at` | string | event_id 过期时间 | ### Python 示例代码 ```python import requests url = "https://your-api-server.com/api/precheck" payload = { "user_id": "user_001", "window_data": current_window # 12条数据 } response = requests.post(url, json=payload) result = response.json() if result["status"] == "abnormal": event_id = result["event_id"] print(f"检测到异常,event_id: {event_id}") ``` """) # 接口2: build_case with gr.Accordion("🏗️ 接口2: build_case - 构建案例", open=True): gr.Markdown(""" ### 功能说明 构建完整的健康异常案例,整合当前窗口数据、历史窗口数据、用户档案等信息,生成结构化的 case JSON 和适合 LLM 处理的 Markdown 输入。 **适用场景**: - **模式A**:外部平台有 PatchAD,直接传入所有数据 - **模式B**:外部平台没有 PatchAD,传入 event_id(系统从缓存获取 window_data) ### 请求格式 **接口路径**:`POST /api/build_case` #### 模式A:外部平台有 PatchAD ```json { "event_id": "platform-evt-123", "user_id": "user_001", "window_data": [ /* 12条数据 */ ], "user_profile": { "age": 35, "gender": "male", "exercise_habit": "regular" }, "history_windows": [ /* 3-7天数据 */ ], "metadata": { "detector": "platform_patchad", "patchad_score": 0.82 } } ``` #### 模式B:外部平台没有 PatchAD ```json { "event_id": "user_001-a1b2c3d4", "user_id": "user_001", "user_profile": { /* 用户档案 */ }, "history_windows": [ /* 3-7天数据 */ ], "metadata": { "detector": "official_patchad" } } ``` **字段说明**: | 字段 | 类型 | 必填 | 说明 | |------|------|------|------| | `event_id` | string | 模式A可选,模式B必填 | 事件ID | | `user_id` | string | ✅ | 用户唯一标识 | | `window_data` | array | 模式A必填,模式B不传 | 当前窗口数据(12条) | | `user_profile` | object | ✅ | 用户档案信息 | | `history_windows` | array | ✅ | 历史窗口数据,3-7天,每天12条 | ### 响应格式 **成功响应**(HTTP 200): ```json { "validation": { "passed": true, "errors": [], "warnings": [] }, "case": { "case_id": "case-20250101-user_001-abc123", "user_id": "user_001", "created_at": "2025-01-01T08:00:00Z" }, "main_model_result": { "risk_confirmed": true, "anomaly_pattern": { /* ... */ } }, "llm_input": "# 健康异常检测分析报告\\n\\n...", "case_bundle": { /* 完整案例包 */ } } ``` ### Python 示例代码 #### 模式A示例 ```python import requests url = "https://your-api-server.com/api/build_case" payload = { "event_id": "platform-evt-123", "user_id": "user_001", "window_data": current_window, "user_profile": user_profile, "history_windows": history_windows, "metadata": {"detector": "platform_patchad"} } response = requests.post(url, json=payload) result = response.json() ``` #### 模式B示例 ```python # 步骤1:调用 precheck precheck_result = requests.post("/api/precheck", json=precheck_payload).json() if precheck_result["status"] == "abnormal": # 步骤2:调用 build_case build_case_payload = { "event_id": precheck_result["event_id"], "user_id": "user_001", "user_profile": user_profile, "history_windows": history_windows } result = requests.post("/api/build_case", json=build_case_payload).json() ``` """) # 常见问题 with gr.Accordion("❓ 常见问题", open=False): gr.Markdown(""" ### Q1: 模式A和模式B的主要区别是什么? **A**: - **模式A**:外部平台有 PatchAD,自己完成预筛,直接调用 `build_case` 传入所有数据 - **模式B**:外部平台没有 PatchAD,先调用 `precheck` 获取 `event_id`,再调用 `build_case` 时只传 `event_id` ### Q2: event_id 的有效期是多久? **A**: 通常为1小时,具体时间见 `precheck` 响应中的 `expires_at` 字段。 ### Q3: history_windows 需要多少天的数据? **A**: 建议3-7天,每天12条数据点。 ### Q4: 如果 window_data 不足12条怎么办? **A**: `window_data` 必须至少12条,否则会返回校验错误。 ### Q5: 主时序模型确认是什么? **A**: 系统可配置是否启用主时序模型进行二次确认。如果启用,`build_case` 响应中会包含 `main_model_result`。 """) gr.HTML("