housemonitor / engine.py
lyimo's picture
Upload 5 files
e5ef030 verified
Raw
History Blame Contribute Delete
15 kB
"""
Simulation + detection engine for the AMPds2 appliance-health / leakage monitor.
Design notes
------------
* TimesFM is used to build an *expected envelope* (q10..q90) for cyclic appliances from
the CLEAN healthy data once, at init. Playback then just compares the (possibly injected)
live signal against that cached envelope -> responsive even on CPU.
* Injections are applied only to the live playback stream, never to the envelope.
* Physical consistency: over/under-drawing an appliance also moves the mains by the same
delta, so the house residual (mains - sum(submeters)) is UNAFFECTED by appliance
injections and moves ONLY for a true leakage injection. That makes the two alert types
independent, exactly as in reality.
"""
from __future__ import annotations
import os
import numpy as np
import pandas as pd
# ---- meter code -> human name (single-load names confirmed from AMPds2 docs) ----
# Keys are the column codes used in the hourly data frame; values are the labels
# shown to the user. Unknown codes fall back to the raw code at the call sites.
AMPDS2_NAMES = {
    "WHE": "Whole house (mains)",
    "CDE": "Clothes dryer",
    "CWE": "Clothes washer",
    "DWE": "Dishwasher",
    "FGE": "Kitchen fridge",
    "FRE": "HVAC / furnace fan",
    "HPE": "Heat pump",
    "WOE": "Wall oven",
    "BME": "Basement",
    "TVE": "Entertainment (TV/PVR)",
    "UNE": "Unmetered (calc.)",
    "RSE": "Rental suite",
    "GRE": "Garage",
    "B1E": "Bedroom 1",
    "B2E": "Bedroom 2 / master",
    "DNE": "Dining plugs",
    "EBE": "Electronics bench",
    "EQE": "Security / network",
    "OFE": "Home office",
    "OUE": "Outside plugs",
    "UTE": "Utility plugs",
    "HTE": "Instant hot water",
    "MHE": "Main panel",
}

# appliances whose "normal" varies over time/season -> good TimesFM candidates
CYCLIC_CODES = {
    "FGE",
    "HPE",
    "FRE",
    "HTE",
    "CDE",
    "WOE",
}
# ----------------------------------------------------------------------------- data
def load_hourly(path: str):
    """Read the hourly parquet written by prepare_data.py.

    Returns a ``(df, mains, submeters)`` tuple:

    * ``df``        -- the frame with every column coerced to numeric (unparseable
                       cells become NaN) and rows sorted by index.
    * ``mains``     -- ``"WHE"`` when that column exists, otherwise the column with
                       the largest mean.
    * ``submeters`` -- every other column except ``"UNE"``.

    Raises FileNotFoundError when ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(
            f"{path} not found. Run prepare_data.py on your AMPds2 zip first to create it."
        )

    frame = pd.read_parquet(path)
    has_time_index = isinstance(frame.index, pd.DatetimeIndex)
    if not has_time_index:
        # No datetime index stored: treat the first column as the timestamp,
        # promote it to the index and drop it from the data columns.
        stamps = pd.to_datetime(frame.iloc[:, 0])
        frame.index = stamps
        frame = frame.iloc[:, 1:]

    numeric = frame.apply(pd.to_numeric, errors="coerce")
    frame = numeric.sort_index()

    if "WHE" in frame.columns:
        mains = "WHE"
    else:
        mains = frame.mean().idxmax()

    submeters = [col for col in frame.columns if not (col == mains or col == "UNE")]
    return frame, mains, submeters
def _on_threshold(s: np.ndarray, frac=0.05) -> float:
    """Return the power level above which a reading counts as "on".

    The threshold is ``frac`` times the 99.9th percentile of the non-NaN samples,
    but never less than 5.0. An input with no usable samples yields 5.0.
    """
    usable = s[np.logical_not(np.isnan(s))]
    if usable.size:
        near_peak = float(np.quantile(usable, 0.999))
        return max(5.0, frac * near_peak)
    return 5.0
# ----------------------------------------------------------------------- TimesFM
_MODEL = None  # module-level cache; filled on the first successful load


def get_timesfm():
    """Lazy-load TimesFM 2.5. Returns the model, or None if unavailable.

    The first successful call imports torch/timesfm, loads the pretrained
    checkpoint, compiles it with the forecast configuration below and caches the
    result in ``_MODEL``; later calls return the cached object. Any exception
    during loading is printed and turned into a ``None`` return, and nothing is
    cached in that case.
    """
    global _MODEL
    if _MODEL is None:
        try:
            import torch
            import timesfm

            torch.set_float32_matmul_precision("high")
            model = timesfm.TimesFM_2p5_200M_torch.from_pretrained(
                "google/timesfm-2.5-200m-pytorch"
            )
            config = timesfm.ForecastConfig(
                max_context=1024,
                max_horizon=256,
                normalize_inputs=True,
                use_continuous_quantile_head=True,
                force_flip_invariance=True,
                infer_is_positive=True,
                fix_quantile_crossing=True,
            )
            model.compile(config)
        except Exception as e:  # pragma: no cover
            print("TimesFM unavailable, falling back to statistical envelope:", repr(e))
            return None
        _MODEL = model
    return _MODEL
def _envelope_timesfm(values_full, play_start, play_len, model, max_ctx=512, max_h=256):
    """Forecast an expected envelope for the playback window, chunk by chunk.

    The window ``[play_start, play_start + play_len)`` is covered in chunks of at
    most ``max_h`` steps. Each chunk is forecast from up to ``max_ctx`` values of
    ``values_full`` that immediately precede it; NaNs in that context are replaced
    by the context median (or 0.0 when the context has no finite value).

    Returns ``(point, q10, q90)``, three float arrays of length ``play_len``.
    """
    point, q10, q90 = (np.full(play_len, np.nan) for _ in range(3))
    done = 0
    while done < play_len:
        horizon = min(max_h, play_len - done)
        stop = play_start + done
        start = max(0, stop - max_ctx)
        history = values_full[start:stop].astype(float)
        if np.isfinite(history).any():
            fill = float(np.nanmedian(history))
        else:
            fill = 0.0
        history = np.nan_to_num(history, nan=fill)

        mean_fc, quant_fc = model.forecast(horizon=horizon, inputs=[history])
        mean_row = np.asarray(mean_fc)[0]
        quant_rows = np.asarray(quant_fc)[0]  # (h, 10): [mean, q10..q90]

        chunk = slice(done, done + horizon)
        point[chunk] = mean_row[:horizon]
        q10[chunk] = quant_rows[:horizon, 1]
        q90[chunk] = quant_rows[:horizon, 9]
        done += horizon
    return point, q10, q90
def _envelope_seasonal(values_full, play_start, play_len, hours):
    """Cheap fallback envelope: hour-of-day median +/- a robust band.

    Statistics come from the (up to) 30 days of ``values_full`` that precede
    ``play_start``, grouped by the matching entries of ``hours``. The band is
    ``1.4826 * 2 * MAD`` per hour, with a zero MAD replaced by 1.0.

    When ``hours`` is None or there is no preceding data, a flat envelope is
    returned instead: the mean of the preceding data (0.0 if ``play_start`` is 0)
    with bounds at 70% and 130% of it.

    Returns ``(point, lower, upper)``; ``lower`` is clipped at 0.
    """
    lookback = slice(max(0, play_start - 24 * 30), play_start)
    history = values_full[lookback]
    history_hours = None if hours is None else hours[lookback]

    if history_hours is None or history.size == 0:
        level = float(np.nanmean(values_full[:play_start])) if play_start else 0.0
        flat = np.full(play_len, level)
        lower = np.full(play_len, level * 0.7)
        upper = np.full(play_len, level * 1.3)
        return (flat, lower, upper)

    by_hour = pd.Series(history, index=history_hours).groupby(level=0)
    med = by_hour.median()
    mad = by_hour.apply(lambda grp: (grp - grp.median()).abs().median()).replace(0, 1.0)

    play_hours = hours[play_start:play_start + play_len]
    point = play_hours.map(med).to_numpy(dtype=float)
    band = play_hours.map(1.4826 * 2 * mad).to_numpy(dtype=float)
    lower = np.clip(point - band, 0, None)
    upper = point + band
    return point, lower, upper
# --------------------------------------------------------------------- Simulator
class Simulator:
    """Replay a window of hourly data with optional fault injections and score it.

    Typical use: construct, call :meth:`setup`, optionally :meth:`build_envelopes`,
    then set injections / a leak and call :meth:`detect` and :meth:`kpis` for a
    cursor position inside the playback window.

    State kept on the instance:

    * ``injections`` -- ``code -> {"kind", "factor", "start"}`` per-appliance scaling.
    * ``leak``       -- ``{"W", "start"}`` or None; constant extra mains load.
    * ``envelopes``  -- ``code -> (point, q10, q90)`` cached expected envelopes.
    * ``_live`` / ``_dirty`` -- cache of the injected playback frame and its
      invalidation flag.
    """

    def __init__(self, df, mains, submeters):
        """Store the data frame, the mains column name and the submeter column names."""
        self.df = df
        self.mains = mains
        self.submeters = submeters
        self.focus = None
        self.injections = {}  # code -> {kind, factor, start}
        self.leak = None  # {"W": float, "start": int}
        self.envelopes = {}  # code -> (point, q10, q90)
        self._live = None
        self._dirty = True

    # ---- helpers ---------------------------------------------------------------
    @staticmethod
    def _finite_mean(values):
        """Mean of the finite entries of ``values``; 0.0 when there are none."""
        finite = values[np.isfinite(values)]
        return float(finite.mean()) if finite.size else 0.0

    def _clamp_cursor(self, cursor):
        """Return ``cursor`` as an int clamped into ``[0, play_len - 1]``.

        Raises IndexError when the playback window is empty, which is the same
        exception type an out-of-range lookup would raise.
        """
        if self.play_len <= 0:
            raise IndexError("playback window is empty (dataset shorter than the context window)")
        return min(max(int(cursor), 0), self.play_len - 1)

    # ---- setup -----------------------------------------------------------------
    def setup(self, window_days=45, ctx_days=60):
        """Split the data into a context window and a playback window and fit baselines.

        The first ``ctx_days * 24`` rows are the context; the following
        ``window_days * 24`` rows (or whatever remains) are played back. From the
        context this computes, per submeter, the on-threshold, the median on-power
        (``rated``) and the on-fraction (``duty``), plus an hour-of-day profile and
        upper band for the house residual (mains minus the sum of submeters).

        Resets focus, injections, leak and cached envelopes.
        """
        ctx_len = int(ctx_days * 24)
        play_len = int(window_days * 24)
        play_start = ctx_len
        if play_start + play_len > len(self.df):
            play_len = max(24, len(self.df) - play_start)
        play_stop = play_start + play_len

        self.play_index = self.df.index[play_start:play_stop]
        # The requested length can exceed what is left after the context window
        # (the max(24, ...) floor above does not create rows). Use the real length
        # so every per-step array built from play_len has the same size.
        play_len = len(self.play_index)
        self.ctx_len, self.play_len, self.play_start = ctx_len, play_len, play_start

        self.base_mains = self.df[self.mains].to_numpy()[play_start:play_stop]
        self.base_sub = self.df[self.submeters].iloc[play_start:play_stop].copy()
        self.base_residual = self.base_mains - self.base_sub.sum(axis=1).to_numpy()

        # per-appliance baselines from the (healthy) context window
        ctx = self.df.iloc[play_start - ctx_len:play_start]
        self.rated, self.on_thr, self.duty = {}, {}, {}
        for c in self.submeters:
            v = ctx[c].to_numpy()
            thr = _on_threshold(v)
            on = v[v > thr]
            self.on_thr[c] = thr
            self.rated[c] = float(np.median(on)) if on.size else 0.0
            self.duty[c] = float(np.mean(v > thr)) if v.size else 0.0

        # residual normal band: hour-of-day profile + a global robust scale (+ absolute floor)
        res_ctx = pd.Series(
            ctx[self.mains].to_numpy() - ctx[self.submeters].sum(axis=1).to_numpy(),
            index=ctx.index,
        )
        prof = res_ctx.groupby(res_ctx.index.hour).median()
        resid = res_ctx.to_numpy() - res_ctx.index.hour.map(prof).to_numpy(dtype=float)
        # Only finite residuals feed the scale: a single NaN would otherwise make
        # sigma NaN, and max(nan, 40.0) returns nan, which disables the leak band.
        finite = resid[np.isfinite(resid)]
        if finite.size:
            sigma = 1.4826 * float(np.median(np.abs(finite - np.median(finite))))
        else:
            sigma = 0.0
        margin = max(4.0 * sigma, 40.0)  # never trust a degenerate (near-zero) band

        ph = pd.Series(self.play_index.hour)
        self.res_expected = ph.map(prof).to_numpy(dtype=float)
        self.res_upper = self.res_expected + margin

        self.focus = self._default_focus()
        self.injections, self.leak, self.envelopes = {}, None, {}
        self._dirty = True

    def _default_focus(self):
        """Pick the appliance to show first, or None when there are no submeters.

        Prefers cyclic appliances with a non-zero rated power; among the pool the
        winner maximises ``rated * max(duty, 0.01)``.
        """
        cyc = [c for c in self.submeters if c in CYCLIC_CODES and self.rated.get(c, 0) > 0]
        pool = cyc if cyc else self.submeters
        if len(pool) == 0:
            return None
        return max(pool, key=lambda c: self.rated.get(c, 0) * max(self.duty.get(c, 0), 0.01))

    def appliance_choices(self):
        """Return ``(label, code)`` pairs for all submeters, highest rated power first."""
        return [(f"{AMPDS2_NAMES.get(c, c)} ({c})", c)
                for c in sorted(self.submeters, key=lambda c: -self.rated.get(c, 0))]

    # ---- envelopes -------------------------------------------------------------
    def build_envelopes(self, codes, model):
        """Compute and cache the expected envelope for each code not cached yet.

        Uses the TimesFM-based envelope when ``model`` is not None, otherwise the
        seasonal fallback. ``codes`` may be any iterable, including a one-shot one.
        """
        # Materialise once (dedup, keep order) so a generator is not exhausted
        # and arrays are only loaded for codes that actually need work.
        pending = [c for c in dict.fromkeys(codes) if c not in self.envelopes]
        if not pending:
            return
        hours = pd.Series(self.df.index.hour)
        for c in pending:
            values = self.df[c].to_numpy()
            if model is not None:
                self.envelopes[c] = _envelope_timesfm(values, self.play_start, self.play_len, model)
            else:
                self.envelopes[c] = _envelope_seasonal(values, self.play_start, self.play_len, hours)

    # ---- injections ------------------------------------------------------------
    def set_injection(self, code, kind, pct, start_pct):
        """Scale appliance ``code`` from ``start_pct`` percent of the window onwards.

        ``kind`` is ``"Off"`` (factor 0), ``"Over-draw"`` (1 + pct/100) or
        ``"Under-draw"`` (1 - pct/100); the factor is never negative. A None code
        or any other kind is ignored.
        """
        if code is None:
            return
        if kind == "Off":
            factor = 0.0
        elif kind == "Over-draw":
            # clamp: a pct below -100 must not turn the load into negative power
            factor = max(0.0, 1.0 + pct / 100.0)
        elif kind == "Under-draw":
            factor = max(0.0, 1.0 - pct / 100.0)
        else:
            return
        start = int(np.clip(start_pct / 100.0, 0, 1) * self.play_len)
        self.injections[code] = {"kind": kind, "factor": factor, "start": start}
        self._dirty = True

    def clear_injections(self):
        """Remove every appliance injection."""
        self.injections = {}
        self._dirty = True

    def set_leak(self, watts, start_pct):
        """Add a constant ``watts`` leak to the mains from ``start_pct`` percent onwards."""
        start = int(np.clip(start_pct / 100.0, 0, 1) * self.play_len)
        self.leak = {"W": float(watts), "start": start}
        self._dirty = True

    def clear_leak(self):
        """Remove the leak injection."""
        self.leak = None
        self._dirty = True

    # ---- live frame (vectorised over the whole playback window) -----------------
    def live(self):
        """Return the playback frame with injections applied (cached until changed).

        The result is a dict with ``"mains"``, ``"sub"`` (DataFrame), ``"res"``
        (mains minus the sum of submeters) and ``"leak"`` arrays. Appliance
        injections move the mains by the same delta, so the residual only changes
        through the leak.
        """
        if self._live is not None and not self._dirty:
            return self._live
        live_sub = self.base_sub.copy()
        idx = np.arange(self.play_len)
        for c, inj in self.injections.items():
            if c not in live_sub.columns:
                continue
            mask = idx >= inj["start"]
            col = live_sub[c].to_numpy().copy()
            col[mask] = col[mask] * inj["factor"]
            live_sub[c] = col
        live_total = live_sub.sum(axis=1).to_numpy()
        delta = live_total - self.base_sub.sum(axis=1).to_numpy()
        leak_arr = np.zeros(self.play_len)
        if self.leak is not None:
            leak_arr[idx >= self.leak["start"]] = self.leak["W"]
        mains_live = self.base_mains + delta + leak_arr
        residual_live = mains_live - live_total  # == base_residual + leak
        self._live = {"mains": mains_live, "sub": live_sub, "res": residual_live, "leak": leak_arr}
        self._dirty = False
        return self._live

    # ---- detection -------------------------------------------------------------
    def detect(self, cursor, over_tol=0.15, under_tol=0.15, win_app=168, win_leak=24, min_on=3):
        """Evaluate appliance health and leakage at playback position ``cursor``.

        ``cursor`` is clamped into the playback window. Appliances are judged on
        the trailing ``win_app`` steps, leakage on the trailing ``win_leak`` steps.

        Returns ``(alerts, rows, leak_info)``:

        * ``alerts``    -- list of dicts (``code``, ``name``, ``status``, ``severity``,
                           ``msg``); a leak alert, if any, is placed first.
        * ``rows``      -- one dict per submeter with ``live_W``, ``rated_W`` and
                           ``status`` (``idle`` / ``OFF`` / ``OVER`` / ``UNDER`` / ``OK``).
        * ``leak_info`` -- ``{"now", "expected", "flag"}`` for the house residual.
        """
        cursor = self._clamp_cursor(cursor)
        frame = self.live()
        lo = max(0, cursor - win_app + 1)
        window_len = cursor - lo + 1
        alerts, rows = [], []
        for c in self.submeters:
            name = AMPDS2_NAMES.get(c, c)
            rated = self.rated.get(c, 0.0)
            thr = self.on_thr.get(c, 5.0)
            seg = frame["sub"][c].to_numpy()[lo:cursor + 1]
            base = self.base_sub[c].to_numpy()[lo:cursor + 1]
            base_duty = float(np.mean(base > thr)) if base.size else 0.0
            on_frac_live = float(np.mean(seg > thr)) if seg.size else 0.0
            on_live = seg[seg > thr]
            live_on = float(np.median(on_live)) if on_live.size >= min_on else 0.0
            # Robust rule for ALL appliances (TimesFM band is the chart's visual evidence, not the
            # trigger, since a forecast band can't bracket sharp on/off loads):
            #   * warm up before judging (need a filled window)
            #   * OFF only for normally-busy appliances that have gone essentially absent
            #   * otherwise compare trailing ON-power to the healthy rated value
            # The window mean ignores NaN readings: round() below cannot take NaN.
            if window_len < 12 or base_duty < 0.02:
                status = "idle"
                live_w = live_on if live_on > 0 else self._finite_mean(seg)
            elif base_duty > 0.1 and on_frac_live < 0.15 * base_duty:
                status = "OFF"
                live_w = self._finite_mean(seg)
            elif live_on > 0:
                ratio = live_on / rated if rated > 0 else 1.0
                status = "OVER" if ratio > 1 + over_tol else "UNDER" if ratio < 1 - under_tol else "OK"
                live_w = live_on
            else:
                status = "idle"
                live_w = self._finite_mean(seg)
            rows.append({"code": c, "name": name, "live_W": round(live_w),
                         "rated_W": round(rated), "status": status})
            if status in ("OVER", "UNDER", "OFF"):
                pct = (live_w / rated * 100) if rated > 0 else 0
                if status == "OVER":
                    msg = f"drawing {pct:.0f}% of rated ({live_w:.0f} W vs {rated:.0f} W)"
                elif status == "UNDER":
                    msg = f"only {pct:.0f}% of rated ({live_w:.0f} W vs {rated:.0f} W)"
                else:
                    msg = f"off / no draw while normally active (expected ~{rated:.0f} W)"
                alerts.append({"code": c, "name": name, "status": status,
                               "severity": "crit" if status in ("OVER", "OFF") else "warn",
                               "msg": msg})

        # leakage: residual must sit above its upper band for most of the window
        # AND currently exceed the expected value by more than 50 W
        res = frame["res"]
        ll = max(0, cursor - win_leak + 1)
        seg = res[ll:cursor + 1]
        up = self.res_upper[ll:cursor + 1]
        leak_now = float(res[cursor])
        exp = float(self.res_expected[cursor])
        sustained = float(np.mean(seg > up)) if seg.size else 0.0
        leak_flag = sustained > 0.6 and (leak_now - exp) > 50
        if leak_flag:
            alerts.insert(0, {"code": "HOUSE", "name": "Whole house",
                              "status": "LEAK", "severity": "crit",
                              "msg": f"unaccounted load ~{leak_now - exp:.0f} W above normal "
                                     f"(residual {leak_now:.0f} W vs ~{exp:.0f} W expected)"})
        leak_info = {"now": leak_now, "expected": exp, "flag": leak_flag}
        return alerts, rows, leak_info

    # ---- KPIs ------------------------------------------------------------------
    def kpis(self, cursor):
        """Return headline figures at playback position ``cursor`` (clamped to the window).

        Keys: ``time`` (timestamp), ``load_W`` (mains now), ``energy_kWh`` (cumulative
        mains energy so far, missing readings skipped), ``i`` (cursor used) and ``n``
        (window length).
        """
        cursor = self._clamp_cursor(cursor)
        frame = self.live()
        load = float(frame["mains"][cursor])
        # nansum: one missing reading must not turn the running total into NaN
        energy = float(np.nansum(frame["mains"][:cursor + 1])) / 1000.0  # hourly W -> kWh
        ts = self.play_index[cursor]
        return {"time": ts, "load_W": load, "energy_kWh": energy,
                "i": cursor, "n": self.play_len}