"""
Simulation + detection engine for the AMPds2 appliance-health / leakage monitor.

Design notes
------------
* TimesFM is used to build an *expected envelope* (q10..q90) for cyclic appliances from the
  CLEAN healthy data once, at init. The envelope is cached, so playback stays responsive
  even on CPU. The alert rule itself (see ``Simulator.detect``) compares trailing ON-power
  and duty against healthy baselines; the envelope is the visual evidence.
* Injections are applied only to the live playback stream, never to the envelope.
* Physical consistency: over/under-drawing an appliance also moves the mains by the same
  delta, so the house residual (mains - sum(submeters)) is UNAFFECTED by appliance
  injections and moves ONLY for a true leakage injection. That makes the two alert types
  independent, exactly as in reality.
"""
from __future__ import annotations

import os

import numpy as np
import pandas as pd

# ---- meter code -> human name (single-load names confirmed from AMPds2 docs) ----
AMPDS2_NAMES = {
    "WHE": "Whole house (mains)",
    "CDE": "Clothes dryer",
    "CWE": "Clothes washer",
    "DWE": "Dishwasher",
    "FGE": "Kitchen fridge",
    "FRE": "HVAC / furnace fan",
    "HPE": "Heat pump",
    "WOE": "Wall oven",
    "BME": "Basement",
    "TVE": "Entertainment (TV/PVR)",
    "UNE": "Unmetered (calc.)",
    "RSE": "Rental suite",
    "GRE": "Garage",
    "B1E": "Bedroom 1",
    "B2E": "Bedroom 2 / master",
    "DNE": "Dining plugs",
    "EBE": "Electronics bench",
    "EQE": "Security / network",
    "OFE": "Home office",
    "OUE": "Outside plugs",
    "UTE": "Utility plugs",
    "HTE": "Instant hot water",
    "MHE": "Main panel",
}

# appliances whose "normal" varies over time/season -> good TimesFM candidates
CYCLIC_CODES = {"FGE", "HPE", "FRE", "HTE", "CDE", "WOE"}


# ----------------------------------------------------------------------------- data
def load_hourly(path: str):
    """Load the hourly parquet produced by prepare_data.py.

    Every column is coerced to numeric (unparseable cells become NaN) and the
    frame is sorted by its index.

    Returns:
        (df, mains, submeters): the frame, the mains column ("WHE" when present,
        otherwise the column with the largest mean) and every other column
        except the mains and "UNE".

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(
            f"{path} not found. Run prepare_data.py on your AMPds2 zip first to create it."
        )
    df = pd.read_parquet(path)
    if not isinstance(df.index, pd.DatetimeIndex):
        # first column might be the timestamp
        df.index = pd.to_datetime(df.iloc[:, 0])
        df = df.iloc[:, 1:]
    df = df.apply(pd.to_numeric, errors="coerce").sort_index()
    mains = "WHE" if "WHE" in df.columns else df.mean().idxmax()
    submeters = [c for c in df.columns if c != mains and c != "UNE"]
    return df, mains, submeters


def _on_threshold(s: np.ndarray, frac=0.05) -> float:
    """Return the "appliance is ON" power threshold for a series.

    The threshold is ``frac`` of the 99.9th percentile of the non-NaN values,
    with a floor of 5.0 (also returned when there are no valid samples).
    """
    s = s[~np.isnan(s)]
    if s.size == 0:
        return 5.0
    return max(5.0, frac * float(np.quantile(s, 0.999)))


def _mean_or_zero(a) -> float:
    """Mean of the finite entries of ``a``; 0.0 when there are none."""
    a = np.asarray(a, dtype=float)
    a = a[np.isfinite(a)]
    return float(a.mean()) if a.size else 0.0


# ----------------------------------------------------------------------- TimesFM
_MODEL = None


def get_timesfm():
    """Lazy-load TimesFM 2.5. Returns the model, or None if unavailable.

    A successfully loaded model is cached in the module-level ``_MODEL``.
    Any failure (missing package, download error, ...) is reported on stdout
    and turned into ``None`` so callers can fall back to the seasonal envelope.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL
    try:
        import torch
        import timesfm

        torch.set_float32_matmul_precision("high")
        m = timesfm.TimesFM_2p5_200M_torch.from_pretrained("google/timesfm-2.5-200m-pytorch")
        m.compile(timesfm.ForecastConfig(
            max_context=1024,
            max_horizon=256,
            normalize_inputs=True,
            use_continuous_quantile_head=True,
            force_flip_invariance=True,
            infer_is_positive=True,
            fix_quantile_crossing=True,
        ))
        _MODEL = m
        return m
    except Exception as e:  # pragma: no cover
        # Deliberate best-effort: the app must keep working without TimesFM.
        print("TimesFM unavailable, falling back to statistical envelope:", repr(e))
        return None


def _envelope_timesfm(values_full, play_start, play_len, model, max_ctx=512, max_h=256):
    """Forecast the playback window in chunks of at most ``max_h`` steps.

    Each chunk is forecast from up to ``max_ctx`` samples of ``values_full``
    that immediately precede it; NaNs in the context are replaced by the
    context median (or 0.0 when the context has no finite value).

    Returns:
        (point, q10, q90): three float arrays of length ``play_len``.
    """
    point = np.full(play_len, np.nan)
    q10 = np.full(play_len, np.nan)
    q90 = np.full(play_len, np.nan)
    i = 0
    while i < play_len:
        h = min(max_h, play_len - i)
        ctx_end = play_start + i
        ctx = values_full[max(0, ctx_end - max_ctx):ctx_end].astype(float)
        fill = float(np.nanmedian(ctx)) if np.isfinite(ctx).any() else 0.0
        ctx = np.nan_to_num(ctx, nan=fill)
        pf, qf = model.forecast(horizon=h, inputs=[ctx])
        p = np.asarray(pf)[0]
        qq = np.asarray(qf)[0]  # (h, 10): [mean, q10..q90]
        point[i:i + h] = p[:h]
        q10[i:i + h] = qq[:h, 1]
        q90[i:i + h] = qq[:h, 9]
        i += h
    return point, q10, q90


def _envelope_seasonal(values_full, play_start, play_len, hours):
    """Cheap fallback envelope: hour-of-day median +/- robust band from the context window.

    The context is the (up to) 30 days of samples before ``play_start``.
    ``hours`` is a positionally-indexed Series of hour-of-day aligned with
    ``values_full`` (or None, in which case a flat mean +/-30% band is returned).
    Hours that never occur in the context map to NaN in the result.

    Returns:
        (point, lower, upper): three float arrays of length ``play_len``.
    """
    ctx_lo = max(0, play_start - 24 * 30)
    ctx = values_full[ctx_lo:play_start]
    ctx_hours = hours[ctx_lo:play_start] if hours is not None else None
    if ctx_hours is None or ctx.size == 0:
        m = float(np.nanmean(values_full[:play_start])) if play_start else 0.0
        return (np.full(play_len, m), np.full(play_len, m * 0.7), np.full(play_len, m * 1.3))
    s = pd.Series(ctx, index=ctx_hours)
    by_hour = s.groupby(level=0)
    med = by_hour.median()
    # a zero MAD would give a zero-width band, so widen it to 1.0
    mad = by_hour.apply(lambda x: (x - x.median()).abs().median()).replace(0, 1.0)
    play_hours = hours[play_start:play_start + play_len]
    point = play_hours.map(med).to_numpy(dtype=float)
    band = play_hours.map(1.4826 * 2 * mad).to_numpy(dtype=float)
    return point, np.clip(point - band, 0, None), point + band


# --------------------------------------------------------------------- Simulator
class Simulator:
    """Replays a window of hourly meter data with optional fault injections.

    Call ``setup`` before any other method: it derives the playback window and
    the healthy baselines that injections and detection rely on.
    """

    def __init__(self, df, mains, submeters):
        self.df = df
        self.mains = mains
        self.submeters = submeters
        self.focus = None
        self.injections = {}   # code -> {kind, factor, start}
        self.leak = None       # {"W": float, "start": int}
        self.envelopes = {}    # code -> (point, q10, q90)
        self._live = None
        self._dirty = True

    # ---- setup -----------------------------------------------------------------
    def setup(self, window_days=45, ctx_days=60):
        """Split the data into a healthy context window and a playback window.

        The first ``ctx_days`` days are the context; playback covers the next
        ``window_days`` days, shortened to the rows that actually exist. Also
        computes per-appliance baselines (ON threshold, rated power, duty) and
        the expected house residual with its upper alert bound, then resets
        injections, leak and cached envelopes.
        """
        n_rows = len(self.df)
        ctx_len = int(ctx_days * 24)
        play_start = ctx_len
        # Never claim more playback hours than exist: every per-hour array built
        # below must have exactly play_len entries or live() fails on shape mismatch.
        play_len = max(0, min(int(window_days * 24), n_rows - play_start))
        self.ctx_len, self.play_len, self.play_start = ctx_len, play_len, play_start
        play_end = play_start + play_len
        self.play_index = self.df.index[play_start:play_end]
        self.base_mains = self.df[self.mains].to_numpy()[play_start:play_end]
        self.base_sub = self.df[self.submeters].iloc[play_start:play_end].copy()
        self.base_residual = self.base_mains - self.base_sub.sum(axis=1).to_numpy()

        # per-appliance baselines from the (healthy) context window
        ctx = self.df.iloc[play_start - ctx_len:play_start]
        self.rated, self.on_thr, self.duty = {}, {}, {}
        for c in self.submeters:
            v = ctx[c].to_numpy()
            thr = _on_threshold(v)
            on = v[v > thr]
            self.on_thr[c] = thr
            self.rated[c] = float(np.median(on)) if on.size else 0.0
            self.duty[c] = float(np.mean(v > thr)) if v.size else 0.0

        # residual normal band: hour-of-day profile + a global robust scale (+ absolute floor)
        res_ctx = pd.Series(
            ctx[self.mains].to_numpy() - ctx[self.submeters].sum(axis=1).to_numpy(),
            index=ctx.index,
        )
        prof = res_ctx.groupby(res_ctx.index.hour).median()
        resid = res_ctx.to_numpy() - res_ctx.index.hour.map(prof).to_numpy(dtype=float)
        # Missing readings (NaN) must not poison the scale: a NaN sigma would make
        # max() below return NaN and silently disable leak detection.
        resid = resid[np.isfinite(resid)]
        if resid.size:
            sigma = 1.4826 * float(np.median(np.abs(resid - np.median(resid))))
        else:
            sigma = 0.0
        margin = max(4.0 * sigma, 40.0)   # never trust a degenerate (near-zero) band
        ph = pd.Series(self.play_index.hour)
        self.res_expected = ph.map(prof).to_numpy(dtype=float)
        self.res_upper = self.res_expected + margin

        self.focus = self._default_focus()
        self.injections, self.leak, self.envelopes = {}, None, {}
        self._dirty = True

    def _default_focus(self):
        """Pick the appliance with the largest rated power x duty, preferring cyclic codes.

        Returns None when there are no sub-meters to choose from.
        """
        cyc = [c for c in self.submeters if c in CYCLIC_CODES and self.rated.get(c, 0) > 0]
        pool = cyc if cyc else self.submeters
        if not pool:
            return None
        return max(pool, key=lambda c: self.rated.get(c, 0) * max(self.duty.get(c, 0), 0.01))

    def appliance_choices(self):
        """Return ``(label, code)`` pairs for all sub-meters, highest rated power first."""
        ordered = sorted(self.submeters, key=lambda c: -self.rated.get(c, 0))
        return [(f"{AMPDS2_NAMES.get(c, c)} ({c})", c) for c in ordered]

    # ---- envelopes -------------------------------------------------------------
    def build_envelopes(self, codes, model):
        """Compute and cache the expected envelope for each code not cached yet.

        Uses TimesFM when ``model`` is given, otherwise the seasonal fallback.
        """
        full = {c: self.df[c].to_numpy() for c in codes}
        hours = pd.Series(self.df.index.hour)
        for c in codes:
            if c in self.envelopes:
                continue
            if model is not None:
                self.envelopes[c] = _envelope_timesfm(full[c], self.play_start, self.play_len, model)
            else:
                self.envelopes[c] = _envelope_seasonal(full[c], self.play_start, self.play_len, hours)

    # ---- injections ------------------------------------------------------------
    def set_injection(self, code, kind, pct, start_pct):
        """Register a fault on appliance ``code`` starting at ``start_pct`` % of playback.

        ``kind`` is "Off", "Over-draw" or "Under-draw"; any other value (or a
        ``code`` of None) is ignored. ``pct`` is the over/under-draw percentage.
        """
        if code is None:
            return
        if kind == "Off":
            factor = 0.0
        elif kind == "Over-draw":
            factor = 1.0 + pct / 100.0
        elif kind == "Under-draw":
            factor = max(0.0, 1.0 - pct / 100.0)
        else:
            return
        start = int(np.clip(start_pct / 100.0, 0, 1) * self.play_len)
        self.injections[code] = {"kind": kind, "factor": factor, "start": start}
        self._dirty = True

    def clear_injections(self):
        """Remove every appliance injection."""
        self.injections = {}
        self._dirty = True

    def set_leak(self, watts, start_pct):
        """Add a constant unmetered load of ``watts`` from ``start_pct`` % of playback on."""
        start = int(np.clip(start_pct / 100.0, 0, 1) * self.play_len)
        self.leak = {"W": float(watts), "start": start}
        self._dirty = True

    def clear_leak(self):
        """Remove the leakage injection."""
        self.leak = None
        self._dirty = True

    # ---- live frame (vectorised over the whole playback window) -----------------
    def live(self):
        """Return the playback signals with all current injections applied.

        The result is cached until an injection or leak changes. Keys:
        "mains" and "res" (arrays), "sub" (DataFrame of sub-meters) and "leak"
        (the injected leak per hour).
        """
        if self._live is not None and not self._dirty:
            return self._live
        live_sub = self.base_sub.copy()
        idx = np.arange(self.play_len)
        for c, inj in self.injections.items():
            if c not in live_sub.columns:
                continue
            mask = idx >= inj["start"]
            # Work in float: scaling an integer column in place would silently
            # truncate the injected power (e.g. 101 W * 1.5 -> 151 W).
            col = live_sub[c].to_numpy(dtype=float, copy=True)
            col[mask] = col[mask] * inj["factor"]
            live_sub[c] = col
        live_total = live_sub.sum(axis=1).to_numpy()
        delta = live_total - self.base_sub.sum(axis=1).to_numpy()
        leak_arr = np.zeros(self.play_len)
        if self.leak is not None:
            leak_arr[idx >= self.leak["start"]] = self.leak["W"]
        mains_live = self.base_mains + delta + leak_arr
        residual_live = mains_live - live_total   # == base_residual + leak
        self._live = {"mains": mains_live, "sub": live_sub, "res": residual_live, "leak": leak_arr}
        self._dirty = False
        return self._live

    # ---- detection -------------------------------------------------------------
    def detect(self, cursor, over_tol=0.15, under_tol=0.15, win_app=168, win_leak=24, min_on=3):
        """Evaluate appliance health and house leakage at playback position ``cursor``.

        Appliances are judged on the trailing ``win_app`` hours, leakage on the
        trailing ``win_leak`` hours. ``min_on`` is the minimum number of ON
        samples needed before a trailing ON-power is trusted.

        Returns:
            (alerts, rows, leak_info): alert dicts (a LEAK alert, if any, comes
            first), one status row per sub-meter, and
            ``{"now", "expected", "flag"}`` for the house residual.
        """
        L = self.live()
        lo = max(0, cursor - win_app + 1)
        alerts, rows = [], []
        for c in self.submeters:
            name = AMPDS2_NAMES.get(c, c)
            rated = self.rated.get(c, 0.0)
            thr = self.on_thr.get(c, 5.0)
            seg = L["sub"][c].to_numpy()[lo:cursor + 1]
            base = self.base_sub[c].to_numpy()[lo:cursor + 1]
            base_duty = float(np.mean(base > thr)) if base.size else 0.0
            window_len = cursor - lo + 1
            on_frac_live = float(np.mean(seg > thr)) if seg.size else 0.0
            on_live = seg[seg > thr]
            live_on = float(np.median(on_live)) if on_live.size >= min_on else 0.0
            # NaN-safe window mean: a missing reading must not crash round() below.
            seg_mean = _mean_or_zero(seg)

            # Robust rule for ALL appliances (TimesFM band is the chart's visual evidence, not the
            # trigger, since a forecast band can't bracket sharp on/off loads):
            #   * warm up before judging (need a filled window)
            #   * OFF only for normally-busy appliances that have gone essentially absent
            #   * otherwise compare trailing ON-power to the healthy rated value
            if window_len < 12 or base_duty < 0.02:
                status = "idle"
                live_w = live_on if live_on > 0 else seg_mean
            elif base_duty > 0.1 and on_frac_live < 0.15 * base_duty:
                status = "OFF"
                live_w = seg_mean
            elif live_on > 0:
                ratio = live_on / rated if rated > 0 else 1.0
                status = "OVER" if ratio > 1 + over_tol else "UNDER" if ratio < 1 - under_tol else "OK"
                live_w = live_on
            else:
                status = "idle"
                live_w = seg_mean

            rows.append({"code": c, "name": name, "live_W": round(live_w),
                         "rated_W": round(rated), "status": status})
            if status in ("OVER", "UNDER", "OFF"):
                pct = (live_w / rated * 100) if rated > 0 else 0
                if status == "OVER":
                    msg = f"drawing {pct:.0f}% of rated ({live_w:.0f} W vs {rated:.0f} W)"
                elif status == "UNDER":
                    msg = f"only {pct:.0f}% of rated ({live_w:.0f} W vs {rated:.0f} W)"
                else:
                    msg = f"off / no draw while normally active (expected ~{rated:.0f} W)"
                alerts.append({"code": c, "name": name, "status": status,
                               "severity": "crit" if status in ("OVER", "OFF") else "warn",
                               "msg": msg})

        # leakage
        res = L["res"]
        ll = max(0, cursor - win_leak + 1)
        seg = res[ll:cursor + 1]
        up = self.res_upper[ll:cursor + 1]
        leak_now = float(res[cursor])
        exp = float(self.res_expected[cursor])
        # flag only when the residual sits above the band for most of the window
        sustained = float(np.mean(seg > up)) if seg.size else 0.0
        leak_flag = sustained > 0.6 and (leak_now - exp) > 50
        if leak_flag:
            alerts.insert(0, {"code": "HOUSE", "name": "Whole house", "status": "LEAK",
                              "severity": "crit",
                              "msg": f"unaccounted load ~{leak_now - exp:.0f} W above normal "
                                     f"(residual {leak_now:.0f} W vs ~{exp:.0f} W expected)"})
        leak_info = {"now": leak_now, "expected": exp, "flag": leak_flag}
        return alerts, rows, leak_info

    # ---- KPIs ------------------------------------------------------------------
    def kpis(self, cursor):
        """Return headline numbers at ``cursor``: timestamp, load, cumulative energy, progress."""
        L = self.live()
        load = float(L["mains"][cursor])
        # hourly W -> kWh; missing (NaN) hours contribute nothing instead of
        # turning the running total into NaN
        energy = float(np.nansum(L["mains"][:cursor + 1])) / 1000.0
        ts = self.play_index[cursor]
        return {"time": ts, "load_W": load, "energy_kWh": energy,
                "i": cursor, "n": self.play_len}