|
|
|
|
|
""" |
|
|
test_quickstart.py |
|
|
|
|
|
功能: |
|
|
1. 构造 1 小时窗口,演示实时异常检测(正常 / 异常两种场景) |
|
|
2. 构造 7 天数据,演示异常模式聚合 |
|
|
3. 输出格式化的LLM文案,方便直接接入大模型 |
|
|
|
|
|
运行方式: |
|
|
python test_quickstart.py |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import sys |
|
|
from pathlib import Path |
|
|
from datetime import datetime, timedelta |
|
|
import json |
|
|
import numpy as np |
|
|
import random |
|
|
import random |
|
|
|
|
|
ROOT_DIR = Path(__file__).parent.resolve() |
|
|
sys.path.insert(0, str(ROOT_DIR)) |
|
|
|
|
|
from wearable_anomaly_detector import WearableAnomalyDetector |
|
|
import importlib.util |
|
|
|
|
|
|
|
|
formatter_spec = importlib.util.spec_from_file_location( |
|
|
"formatter", ROOT_DIR / "utils" / "formatter.py" |
|
|
) |
|
|
formatter_module = importlib.util.module_from_spec(formatter_spec) |
|
|
formatter_spec.loader.exec_module(formatter_module) |
|
|
AnomalyFormatter = formatter_module.AnomalyFormatter |
|
|
FORMATTER = AnomalyFormatter() |
|
|
TEST_WINDOW_FILE = ROOT_DIR / "test_data" / "example_window.json" |
|
|
|
|
|
WINDOW_SIZE = 12 |
|
|
INTERVAL_MINUTES = 5 |
|
|
|
|
|
|
|
|
def make_point(ts: datetime, device_id: str, hrv: float, hr: float, include_static: bool = True) -> dict: |
|
|
"""构造单个数据点""" |
|
|
return { |
|
|
"timestamp": ts.isoformat(), |
|
|
"deviceId": device_id, |
|
|
"features": { |
|
|
"hr": float(hr), |
|
|
"hr_resting": 65.0, |
|
|
"hrv_rmssd": float(hrv), |
|
|
"hrv_sdnn": float(hrv * 1.2), |
|
|
"time_period_primary": "day", |
|
|
"time_period_secondary": "workday", |
|
|
"is_weekend": 0.0, |
|
|
"data_quality": "high", |
|
|
"baseline_hrv_mean": 75.0, |
|
|
"baseline_hrv_std": 5.0, |
|
|
}, |
|
|
"static_features": { |
|
|
"age_group": 2, |
|
|
"sex": 0, |
|
|
"exercise": 1, |
|
|
"coffee": 1, |
|
|
"drinking": 0, |
|
|
"MEQ": 50.0, |
|
|
} if include_static else {}, |
|
|
} |
|
|
|
|
|
|
|
|
def generate_window( |
|
|
device_id: str, |
|
|
start: datetime, |
|
|
base_hrv: float, |
|
|
base_hr: float, |
|
|
anomaly_level: float = 0.0, |
|
|
include_static: bool = True, |
|
|
missing_ratio: float = 0.0, |
|
|
) -> list: |
|
|
"""生成 1 小时窗口数据""" |
|
|
data = [] |
|
|
base_hrv_for_day = max(30, base_hrv - 18 * anomaly_level) |
|
|
base_hr_for_day = min(125, base_hr + 10 * anomaly_level) |
|
|
for i in range(WINDOW_SIZE): |
|
|
noise_hrv = np.random.normal(0, 3) |
|
|
noise_hr = np.random.normal(0, 1.5) |
|
|
decline = -15 * anomaly_level * (i / WINDOW_SIZE) |
|
|
increase = 8 * anomaly_level * (i / WINDOW_SIZE) |
|
|
hrv = max(25, base_hrv_for_day + noise_hrv + decline) |
|
|
hr = min(125, base_hr_for_day + noise_hr + increase) |
|
|
ts = start + timedelta(minutes=INTERVAL_MINUTES * i) |
|
|
point = make_point(ts, device_id, hrv, hr, include_static=include_static) |
|
|
|
|
|
if missing_ratio > 0 and random.random() < missing_ratio: |
|
|
point["features"].pop("hr_resting", None) |
|
|
point["features"].pop("baseline_hrv_mean", None) |
|
|
point["features"].pop("baseline_hrv_std", None) |
|
|
if random.random() < 0.5: |
|
|
point["static_features"] = {} |
|
|
|
|
|
data.append(point) |
|
|
return data |
|
|
|
|
|
|
|
|
def load_window_from_file(path: Path) -> list | None: |
|
|
try: |
|
|
with open(path, "r", encoding="utf-8") as f: |
|
|
data = json.load(f) |
|
|
assert isinstance(data, list) and data, "JSON needs to be a non-empty list" |
|
|
return data |
|
|
except Exception as exc: |
|
|
print(f" ⚠️ 读取 {path.name} 失败: {exc}") |
|
|
return None |
|
|
|
|
|
|
|
|
def demo_from_file(detector: WearableAnomalyDetector) -> None: |
|
|
print("\n" + "=" * 80) |
|
|
print("示例文件推理(test_data/example_window.json)") |
|
|
print("=" * 80) |
|
|
|
|
|
if not TEST_WINDOW_FILE.exists(): |
|
|
print(f" ⚠️ 未找到 {TEST_WINDOW_FILE}, 请确认仓库中存在该文件") |
|
|
return |
|
|
|
|
|
window = load_window_from_file(TEST_WINDOW_FILE) |
|
|
if not window: |
|
|
return |
|
|
|
|
|
avg_hrv = np.nanmean([pt["features"]["hrv_rmssd"] for pt in window]) |
|
|
avg_hr = np.nanmean([pt["features"]["hr"] for pt in window]) |
|
|
print(f" - 数据点数: {len(window)}") |
|
|
print(f" - 平均 HRV: {avg_hrv:.2f} ms, 平均心率: {avg_hr:.1f} bpm") |
|
|
|
|
|
result = detector.detect_realtime(window, update_baseline=False) |
|
|
print( |
|
|
f" -> 是否异常: {'是 ⚠️' if result.get('is_anomaly') else '否'} | " |
|
|
f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}" |
|
|
) |
|
|
|
|
|
baseline_info = { |
|
|
"baseline_mean": 76.0, |
|
|
"baseline_std": 5.0, |
|
|
"current_value": avg_hrv, |
|
|
"deviation_pct": (avg_hrv - 76.0) / 76.0 * 100, |
|
|
} |
|
|
llm_text = FORMATTER.format_for_llm(result, baseline_info=baseline_info) |
|
|
print("\n LLM 文本片段(前 350 字符):") |
|
|
print("-" * 60) |
|
|
print(llm_text[:350]) |
|
|
print("...") |
|
|
print("-" * 60) |
|
|
|
|
|
|
|
|
def demo_realtime(detector: WearableAnomalyDetector) -> None: |
|
|
print("\n" + "=" * 80) |
|
|
print("实时检测示例") |
|
|
print("=" * 80) |
|
|
|
|
|
start = datetime.now() - timedelta(hours=1) |
|
|
normal_window = generate_window("demo_normal", start, base_hrv=76, base_hr=68, anomaly_level=0.0) |
|
|
anomaly_window = generate_window("demo_anomaly", start, base_hrv=74, base_hr=70, anomaly_level=0.7) |
|
|
|
|
|
for title, window in [("正常窗口", normal_window), ("异常窗口", anomaly_window)]: |
|
|
avg_hrv = np.mean([pt["features"]["hrv_rmssd"] for pt in window]) |
|
|
avg_hr = np.mean([pt["features"]["hr"] for pt in window]) |
|
|
print(f"\n[{title}] HRV≈{avg_hrv:.2f} ms, HR≈{avg_hr:.1f} bpm") |
|
|
result = detector.detect_realtime(window, update_baseline=False) |
|
|
print( |
|
|
f" -> 是否异常: {'是 ⚠️' if result.get('is_anomaly') else '否'} | " |
|
|
f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}" |
|
|
) |
|
|
|
|
|
|
|
|
def demo_pattern(detector: WearableAnomalyDetector) -> None: |
|
|
print("\n" + "=" * 80) |
|
|
print("7 天异常模式聚合示例") |
|
|
print("=" * 80) |
|
|
|
|
|
base_date = datetime.now() - timedelta(days=7) |
|
|
daily_data = [] |
|
|
anomaly_plan = [0.0, 0.1, 0.3, 1.0, 1.4, 1.8, 1.8] |
|
|
avg_hrv_per_day = [] |
|
|
|
|
|
for day, anomaly_level in enumerate(anomaly_plan): |
|
|
day_start = base_date + timedelta(days=day) |
|
|
window = generate_window( |
|
|
device_id="demo_pattern", |
|
|
start=day_start.replace(hour=8, minute=0, second=0, microsecond=0), |
|
|
base_hrv=75, |
|
|
base_hr=69, |
|
|
anomaly_level=anomaly_level, |
|
|
) |
|
|
daily_data.append(window) |
|
|
avg_hrv_per_day.append(np.mean([pt["features"]["hrv_rmssd"] for pt in window])) |
|
|
|
|
|
print(" 日均HRV轨迹: " + ", ".join(f"{val:.1f}" for val in avg_hrv_per_day)) |
|
|
|
|
|
result = detector.detect_pattern( |
|
|
daily_data, |
|
|
days=len(daily_data), |
|
|
min_duration_days=2, |
|
|
format_for_llm=True |
|
|
) |
|
|
pattern = result.get("anomaly_pattern", {}) |
|
|
print( |
|
|
f" -> 是否有模式: {'是' if pattern.get('has_pattern') else '否'} | " |
|
|
f"持续天数: {pattern.get('duration_days', 0)} | 趋势: {pattern.get('trend', '未知')}" |
|
|
) |
|
|
|
|
|
if "formatted_for_llm" in result: |
|
|
print("\n格式化输出(前 400 字符):") |
|
|
print("-" * 60) |
|
|
print(result["formatted_for_llm"][:400]) |
|
|
print("...") |
|
|
print("-" * 60) |
|
|
|
|
|
|
|
|
def demo_missing_data(detector: WearableAnomalyDetector) -> None: |
|
|
print("\n" + "=" * 80) |
|
|
print("数据缺失 / 质量下降示例") |
|
|
print("=" * 80) |
|
|
|
|
|
start = datetime.now() - timedelta(hours=1) |
|
|
incomplete_window = generate_window( |
|
|
device_id="demo_missing", |
|
|
start=start, |
|
|
base_hrv=74, |
|
|
base_hr=71, |
|
|
anomaly_level=0.5, |
|
|
include_static=True, |
|
|
missing_ratio=0.4, |
|
|
) |
|
|
|
|
|
|
|
|
for idx in (3, 7): |
|
|
incomplete_window[idx]["features"]["data_quality"] = "low" |
|
|
incomplete_window[idx]["features"]["hr"] = float("nan") |
|
|
|
|
|
avg_hrv = np.nanmean([pt["features"].get("hrv_rmssd", np.nan) for pt in incomplete_window]) |
|
|
available_static = sum(bool(pt["static_features"]) for pt in incomplete_window) |
|
|
print(f" - 有效静态特征点数: {available_static}/{len(incomplete_window)}") |
|
|
print(f" - 平均 HRV(忽略缺失): {avg_hrv:.2f} ms") |
|
|
|
|
|
result = detector.detect_realtime(incomplete_window, update_baseline=False) |
|
|
print( |
|
|
f" -> 是否异常: {'是' if result.get('is_anomaly') else '否'} | " |
|
|
f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}" |
|
|
) |
|
|
|
|
|
|
|
|
def main() -> None: |
|
|
model_dir = ROOT_DIR / "checkpoints" / "phase2" / "exp_factor_balanced" |
|
|
detector = WearableAnomalyDetector(model_dir=model_dir, device="cpu") |
|
|
detector.update_threshold(0.50) |
|
|
demo_from_file(detector) |
|
|
demo_realtime(detector) |
|
|
demo_pattern(detector) |
|
|
demo_missing_data(detector) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|
|
|
|