oscarzhang's picture
Upload folder using huggingface_hub
76d412c verified
#!/usr/bin/env python3
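"""
test_quickstart.py
What it does:
1. Builds 1-hour windows to demonstrate real-time anomaly detection (a normal and an anomalous scenario)
2. Builds 7 days of data to demonstrate anomaly-pattern aggregation
3. Prints formatted LLM text that can be fed directly to a large language model
4. Runs detection on the bundled example window (test_data/example_window.json)
5. Runs detection on a window with missing / low-quality data
Usage:
python test_quickstart.py
"""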
from __future__ import annotations
import sys
from pathlib import Path
from datetime import datetime, timedelta
import json
import numpy as np
import random
import random
# Directory containing this script; it is put first on sys.path before the
# project-local imports below are executed.
ROOT_DIR = Path(__file__).parent.resolve()
sys.path.insert(0, str(ROOT_DIR))
from wearable_anomaly_detector import WearableAnomalyDetector
import importlib.util
# Load utils/formatter.py directly by file path to avoid relative-import problems.
formatter_spec = importlib.util.spec_from_file_location(
"formatter", ROOT_DIR / "utils" / "formatter.py"
)
formatter_module = importlib.util.module_from_spec(formatter_spec)
formatter_spec.loader.exec_module(formatter_module)
AnomalyFormatter = formatter_module.AnomalyFormatter
# Single shared formatter instance used by the demos.
FORMATTER = AnomalyFormatter()
# Example window shipped with the repository, used by demo_from_file().
TEST_WINDOW_FILE = ROOT_DIR / "test_data" / "example_window.json"
WINDOW_SIZE = 12 # 12 * 5 minutes = 1 hour
# Spacing between consecutive generated samples, in minutes.
INTERVAL_MINUTES = 5
def make_point(ts: datetime, device_id: str, hrv: float, hr: float, include_static: bool = True) -> dict:
    """Build a single sample record in the nested-dict layout used by this script.

    Args:
        ts: Sample time; stored as its ISO-8601 string under ``timestamp``.
        device_id: Stored unchanged under ``deviceId``.
        hrv: Stored as a float under ``hrv_rmssd``; ``hrv_sdnn`` is ``hrv * 1.2``.
        hr: Stored as a float under ``hr``.
        include_static: When falsy, ``static_features`` is an empty dict.

    Returns:
        A dict with the keys ``timestamp``, ``deviceId``, ``features`` and
        ``static_features`` (in that order). All values other than the ones
        derived from the arguments are fixed demo constants.
    """
    features = {
        "hr": float(hr),
        "hr_resting": 65.0,
        "hrv_rmssd": float(hrv),
        "hrv_sdnn": float(hrv * 1.2),
        "time_period_primary": "day",
        "time_period_secondary": "workday",
        "is_weekend": 0.0,
        "data_quality": "high",
        "baseline_hrv_mean": 75.0,
        "baseline_hrv_std": 5.0,
    }

    static_features = {}
    if include_static:
        static_features = {
            "age_group": 2,
            "sex": 0,
            "exercise": 1,
            "coffee": 1,
            "drinking": 0,
            "MEQ": 50.0,
        }

    record = {"timestamp": ts.isoformat(), "deviceId": device_id}
    record["features"] = features
    record["static_features"] = static_features
    return record
def generate_window(
    device_id: str,
    start: datetime,
    base_hrv: float,
    base_hr: float,
    anomaly_level: float = 0.0,
    include_static: bool = True,
    missing_ratio: float = 0.0,
) -> list:
    """Generate one window of ``WINDOW_SIZE`` synthetic samples.

    Args:
        device_id: Device identifier written into every sample.
        start: Timestamp of the first sample; later samples are spaced
            ``INTERVAL_MINUTES`` apart.
        base_hrv: Starting HRV level before the anomaly offset is applied.
        base_hr: Starting heart-rate level before the anomaly offset is applied.
        anomaly_level: Scales how far HRV is lowered and HR raised, both as a
            constant offset and as a drift that grows across the window.
        include_static: Forwarded to ``make_point``.
        missing_ratio: Per-sample probability of dropping the resting-HR and
            baseline fields; a dropped sample additionally loses its static
            features half of the time.

    Returns:
        A list of sample dicts as produced by ``make_point``.
    """
    # Window-wide levels after the constant anomaly offset, clamped to 30 / 125.
    hrv_level = max(30, base_hrv - 18 * anomaly_level)
    hr_level = min(125, base_hr + 10 * anomaly_level)
    droppable_keys = ("hr_resting", "baseline_hrv_mean", "baseline_hrv_std")

    samples = []
    for step in range(WINDOW_SIZE):
        # Draw order (HRV noise first, then HR noise) is kept so seeded runs
        # reproduce the same series.
        hrv_noise = np.random.normal(0, 3)
        hr_noise = np.random.normal(0, 1.5)

        # Fraction of the window elapsed; the drift grows linearly with it.
        progress = step / WINDOW_SIZE
        hrv_drift = -15 * anomaly_level * progress
        hr_drift = 8 * anomaly_level * progress

        sample = make_point(
            start + timedelta(minutes=INTERVAL_MINUTES * step),
            device_id,
            max(25, hrv_level + hrv_noise + hrv_drift),
            min(125, hr_level + hr_noise + hr_drift),
            include_static=include_static,
        )

        if missing_ratio > 0 and random.random() < missing_ratio:
            for key in droppable_keys:
                sample["features"].pop(key, None)
            if random.random() < 0.5:
                sample["static_features"] = {}

        samples.append(sample)
    return samples
def load_window_from_file(path: Path) -> list | None:
    """Load a window (a non-empty JSON list) from ``path``.

    Args:
        path: Location of a UTF-8 encoded JSON file.

    Returns:
        The parsed list, or ``None`` when the file cannot be read or parsed,
        or when its top-level value is not a non-empty list. In the failure
        case a warning line is printed instead of raising.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Validate explicitly: an ``assert`` is stripped under ``python -O``,
        # which would let an empty or non-list payload through as a window.
        if not isinstance(data, list) or not data:
            raise ValueError("JSON needs to be a non-empty list")
        return data
    except Exception as exc:
        # Best-effort demo loader: any failure degrades to a warning.
        print(f" ⚠️ 读取 {path.name} 失败: {exc}")
        return None
def demo_from_file(detector: WearableAnomalyDetector) -> None:
    """Run real-time detection on the example window file and print the result.

    Prints a warning and returns early when ``TEST_WINDOW_FILE`` does not
    exist or cannot be loaded. Otherwise prints summary statistics, the
    detection verdict, and the first 350 characters of the formatter's LLM text.

    Args:
        detector: Detector whose ``detect_realtime`` is called on the window.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("示例文件推理(test_data/example_window.json)")
    print(rule)

    if not TEST_WINDOW_FILE.exists():
        print(f" ⚠️ 未找到 {TEST_WINDOW_FILE}, 请确认仓库中存在该文件")
        return

    samples = load_window_from_file(TEST_WINDOW_FILE)
    if not samples:
        return

    # nanmean so NaN readings in the file do not poison the averages.
    mean_hrv = np.nanmean([sample["features"]["hrv_rmssd"] for sample in samples])
    mean_hr = np.nanmean([sample["features"]["hr"] for sample in samples])
    print(f" - 数据点数: {len(samples)}")
    print(f" - 平均 HRV: {mean_hrv:.2f} ms, 平均心率: {mean_hr:.1f} bpm")

    # update_baseline=False is passed so the demo asks for detection only.
    result = detector.detect_realtime(samples, update_baseline=False)
    verdict = "是 ⚠️" if result.get("is_anomaly") else "否"
    score = result.get("anomaly_score", 0)
    threshold = result.get("threshold", 0)
    print(
        f" -> 是否异常: {verdict} | "
        f"分数: {score:.4f} | 阈值: {threshold:.4f}"
    )

    # Fixed demo baseline that the file's average HRV is compared against.
    reference_mean = 76.0
    baseline_info = {
        "baseline_mean": reference_mean,
        "baseline_std": 5.0,
        "current_value": mean_hrv,
        "deviation_pct": (mean_hrv - reference_mean) / reference_mean * 100,
    }
    llm_text = FORMATTER.format_for_llm(result, baseline_info=baseline_info)

    dashes = "-" * 60
    print("\n LLM 文本片段(前 350 字符):")
    print(dashes)
    print(llm_text[:350])
    print("...")
    print(dashes)
def demo_realtime(detector: WearableAnomalyDetector) -> None:
    """Run real-time detection on one normal and one anomalous synthetic window.

    Both windows start one hour before the current time. For each, the
    average HRV / HR and the detection verdict are printed.

    Args:
        detector: Detector whose ``detect_realtime`` is called on each window.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("实时检测示例")
    print(rule)

    window_start = datetime.now() - timedelta(hours=1)
    # Both windows are generated up front (normal first) before any detection
    # runs; dict literals evaluate in order, so the RNG draw sequence is fixed.
    scenarios = {
        "正常窗口": generate_window(
            "demo_normal", window_start, base_hrv=76, base_hr=68, anomaly_level=0.0
        ),
        "异常窗口": generate_window(
            "demo_anomaly", window_start, base_hrv=74, base_hr=70, anomaly_level=0.7
        ),
    }

    for title, samples in scenarios.items():
        mean_hrv = np.mean([sample["features"]["hrv_rmssd"] for sample in samples])
        mean_hr = np.mean([sample["features"]["hr"] for sample in samples])
        print(f"\n[{title}] HRV≈{mean_hrv:.2f} ms, HR≈{mean_hr:.1f} bpm")

        result = detector.detect_realtime(samples, update_baseline=False)
        verdict = "是 ⚠️" if result.get("is_anomaly") else "否"
        score = result.get("anomaly_score", 0)
        threshold = result.get("threshold", 0)
        print(
            f" -> 是否异常: {verdict} | "
            f"分数: {score:.4f} | 阈值: {threshold:.4f}"
        )
def demo_pattern(detector: WearableAnomalyDetector) -> None:
    """Run pattern detection over seven synthetic daily windows and print the result.

    One window per day is generated starting at 08:00, with the anomaly level
    rising over the week. The per-day average HRV, the reported pattern
    summary and (when present) the first 400 characters of the formatted LLM
    text are printed.

    Args:
        detector: Detector whose ``detect_pattern`` is called on the daily windows.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("7 天异常模式聚合示例")
    print(rule)

    first_day = datetime.now() - timedelta(days=7)
    # Anomaly level per day: near zero at first, then escalating.
    anomaly_plan = [0.0, 0.1, 0.3, 1.0, 1.4, 1.8, 1.8]

    daily_data = []
    daily_mean_hrv = []
    for offset, level in enumerate(anomaly_plan):
        current_day = first_day + timedelta(days=offset)
        morning = current_day.replace(hour=8, minute=0, second=0, microsecond=0)
        day_window = generate_window(
            device_id="demo_pattern",
            start=morning,
            base_hrv=75,
            base_hr=69,
            anomaly_level=level,
        )
        daily_data.append(day_window)
        daily_mean_hrv.append(
            np.mean([sample["features"]["hrv_rmssd"] for sample in day_window])
        )

    trajectory = ", ".join(f"{value:.1f}" for value in daily_mean_hrv)
    print(" 日均HRV轨迹: " + trajectory)

    result = detector.detect_pattern(
        daily_data,
        days=len(daily_data),
        min_duration_days=2,
        format_for_llm=True,
    )

    pattern = result.get("anomaly_pattern", {})
    has_pattern = "是" if pattern.get("has_pattern") else "否"
    duration = pattern.get("duration_days", 0)
    trend = pattern.get("trend", "未知")
    print(
        f" -> 是否有模式: {has_pattern} | "
        f"持续天数: {duration} | 趋势: {trend}"
    )

    if "formatted_for_llm" not in result:
        return

    dashes = "-" * 60
    print("\n格式化输出(前 400 字符):")
    print(dashes)
    print(result["formatted_for_llm"][:400])
    print("...")
    print(dashes)
def demo_missing_data(detector: WearableAnomalyDetector) -> None:
    """Run real-time detection on a window with missing and degraded data.

    The window is generated with ``missing_ratio=0.4`` and two of its samples
    are then marked low quality with a NaN heart rate. The count of samples
    that still carry static features, the NaN-ignoring average HRV and the
    detection verdict are printed.

    Args:
        detector: Detector whose ``detect_realtime`` is called on the window.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("数据缺失 / 质量下降示例")
    print(rule)

    window_start = datetime.now() - timedelta(hours=1)
    degraded = generate_window(
        device_id="demo_missing",
        start=window_start,
        base_hrv=74,
        base_hr=71,
        anomaly_level=0.5,
        include_static=True,
        missing_ratio=0.4,
    )

    # Simulate sensor dropouts: the samples stay in the window, but two of
    # them are flagged low quality and lose their heart-rate reading.
    for position in (3, 7):
        features = degraded[position]["features"]
        features["data_quality"] = "low"
        features["hr"] = float("nan")

    mean_hrv = np.nanmean(
        [sample["features"].get("hrv_rmssd", np.nan) for sample in degraded]
    )
    with_static = sum(1 for sample in degraded if sample["static_features"])
    print(f" - 有效静态特征点数: {with_static}/{len(degraded)}")
    print(f" - 平均 HRV(忽略缺失): {mean_hrv:.2f} ms")

    result = detector.detect_realtime(degraded, update_baseline=False)
    verdict = "是" if result.get("is_anomaly") else "否"
    score = result.get("anomaly_score", 0)
    threshold = result.get("threshold", 0)
    print(
        f" -> 是否异常: {verdict} | "
        f"分数: {score:.4f} | 阈值: {threshold:.4f}"
    )
def main() -> None:
    """Create a CPU detector from the bundled checkpoint and run every demo in order."""
    checkpoint_dir = ROOT_DIR.joinpath("checkpoints", "phase2", "exp_factor_balanced")
    detector = WearableAnomalyDetector(model_dir=checkpoint_dir, device="cpu")
    # The same threshold value is applied once, before any demo runs.
    detector.update_threshold(0.50)

    demos = (demo_from_file, demo_realtime, demo_pattern, demo_missing_data)
    for run_demo in demos:
        run_demo(detector)
# Run the demos only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()