Spaces:
Sleeping
Sleeping
| """ | |
| Simulation + detection engine for the AMPds2 appliance-health / leakage monitor. | |
| Design notes | |
| ------------ | |
| * TimesFM is used to build an *expected envelope* (q10..q90) for cyclic appliances from | |
| the CLEAN healthy data once, at init. Playback then just compares the (possibly injected) | |
| live signal against that cached envelope -> responsive even on CPU. | |
| * Injections are applied only to the live playback stream, never to the envelope. | |
| * Physical consistency: over/under-drawing an appliance also moves the mains by the same | |
| delta, so the house residual (mains - sum(submeters)) is UNAFFECTED by appliance | |
| injections and moves ONLY for a true leakage injection. That makes the two alert types | |
| independent, exactly as in reality. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import numpy as np | |
| import pandas as pd | |
# ---- meter code -> human name (single-load names confirmed from AMPds2 docs) ----
# Keyword form keeps the same insertion order as the AMPds2 meter listing used here.
AMPDS2_NAMES = dict(
    WHE="Whole house (mains)",
    CDE="Clothes dryer",
    CWE="Clothes washer",
    DWE="Dishwasher",
    FGE="Kitchen fridge",
    FRE="HVAC / furnace fan",
    HPE="Heat pump",
    WOE="Wall oven",
    BME="Basement",
    TVE="Entertainment (TV/PVR)",
    UNE="Unmetered (calc.)",
    RSE="Rental suite",
    GRE="Garage",
    B1E="Bedroom 1",
    B2E="Bedroom 2 / master",
    DNE="Dining plugs",
    EBE="Electronics bench",
    EQE="Security / network",
    OFE="Home office",
    OUE="Outside plugs",
    UTE="Utility plugs",
    HTE="Instant hot water",
    MHE="Main panel",
)
# Meter codes whose "normal" draw varies over time / season: the preferred candidates for a
# TimesFM envelope and for the default focus appliance.
CYCLIC_CODES = set("FGE HPE FRE HTE CDE WOE".split())
# ----------------------------------------------------------------------------- data
def load_hourly(path: str):
    """Read the hourly parquet written by prepare_data.py.

    If the stored frame has no DatetimeIndex, its first column is parsed as the timestamp
    and dropped from the data columns. All remaining columns are coerced to numbers
    (unparseable values become NaN) and rows are sorted by time.

    Returns:
        ``(df, mains, submeters)``: the cleaned frame, the whole-house column (``"WHE"``
        when present, otherwise the column with the largest mean) and every other column
        except ``"UNE"``.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(
            f"{path} not found. Run prepare_data.py on your AMPds2 zip first to create it."
        )
    frame = pd.read_parquet(path)
    if not isinstance(frame.index, pd.DatetimeIndex):
        # the timestamp is expected to be the leading column in that case
        frame.index = pd.to_datetime(frame.iloc[:, 0])
        frame = frame.iloc[:, 1:]
    frame = frame.apply(pd.to_numeric, errors="coerce").sort_index()

    if "WHE" in frame.columns:
        mains = "WHE"
    else:
        mains = frame.mean().idxmax()
    excluded = {mains, "UNE"}
    submeters = [col for col in frame.columns if col not in excluded]
    return frame, mains, submeters
def _on_threshold(s: np.ndarray, frac=0.05) -> float:
    """Return the level above which a reading counts as the appliance being "on".

    The level is ``frac`` times the 99.9th percentile of the non-NaN readings, but never
    below 5.0. Empty or all-NaN input returns the 5.0 floor.
    """
    floor = 5.0
    readings = s[~np.isnan(s)]
    if not readings.size:
        return floor
    top = float(np.quantile(readings, 0.999))
    return max(floor, frac * top)
# ----------------------------------------------------------------------- TimesFM
_MODEL = None  # process-wide cache, set only after a successful load


def get_timesfm():
    """Return the shared TimesFM 2.5 model, loading and compiling it on first use.

    If anything fails (missing ``torch``/``timesfm`` packages, or any error while loading
    or compiling the checkpoint), a message is printed and None is returned. Failures are
    not cached, so a later call tries again.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL
    try:
        import torch
        import timesfm

        torch.set_float32_matmul_precision("high")
        model = timesfm.TimesFM_2p5_200M_torch.from_pretrained(
            "google/timesfm-2.5-200m-pytorch"
        )
        model.compile(
            timesfm.ForecastConfig(
                max_context=1024,
                max_horizon=256,
                normalize_inputs=True,
                use_continuous_quantile_head=True,
                force_flip_invariance=True,
                infer_is_positive=True,
                fix_quantile_crossing=True,
            )
        )
    except Exception as err:  # pragma: no cover
        print("TimesFM unavailable, falling back to statistical envelope:", repr(err))
        return None
    _MODEL = model
    return _MODEL
def _envelope_timesfm(values_full, play_start, play_len, model, max_ctx=512, max_h=256):
    """Forecast the playback window chunk by chunk with a TimesFM-style model.

    Each chunk of at most ``max_h`` steps is forecast from up to ``max_ctx`` samples of
    ``values_full`` immediately before it. Later chunks therefore condition on the actual
    ``values_full`` samples of earlier playback steps, not on previous forecasts. NaNs in a
    context are replaced by that context's median, or by 0.0 if it has no finite values.

    Returns:
        ``(point, q10, q90)`` float arrays of length ``play_len``. q10/q90 are columns 1
        and 9 of the model's quantile output.
    """
    point, q10, q90 = (np.full(play_len, np.nan) for _ in range(3))
    done = 0
    while done < play_len:
        horizon = min(max_h, play_len - done)
        stop = play_start + done
        history = values_full[max(0, stop - max_ctx):stop].astype(float)
        fill = float(np.nanmedian(history)) if np.isfinite(history).any() else 0.0
        history = np.nan_to_num(history, nan=fill)
        point_fc, quant_fc = model.forecast(horizon=horizon, inputs=[history])
        point_row = np.asarray(point_fc)[0]
        quant_rows = np.asarray(quant_fc)[0]  # (h, 10): [mean, q10..q90]
        chunk = slice(done, done + horizon)
        point[chunk] = point_row[:horizon]
        q10[chunk] = quant_rows[:horizon, 1]
        q90[chunk] = quant_rows[:horizon, 9]
        done += horizon
    return point, q10, q90
def _envelope_seasonal(values_full, play_start, play_len, hours, lookback=24 * 30):
    """Cheap fallback envelope: hour-of-day median +/- a robust MAD band from the context.

    Args:
        values_full: full series for one meter, positionally aligned with ``hours``.
        play_start: index of the first playback sample.
        play_len: number of playback samples to produce.
        hours: hour-of-day label for every sample (positionally sliceable, e.g. a Series
            with a RangeIndex), or None.
        lookback: number of samples before ``play_start`` used as context. The default is
            30 days of hourly samples, which was the previously hard-coded window.

    Returns:
        ``(point, q10, q90)`` arrays of length ``play_len``. ``point`` is the per-hour
        median of the context. The band is ``2 * 1.4826 * MAD`` per hour, with the lower
        edge clipped at 0. Hours absent from the context yield NaN. Without ``hours``, or
        with an empty context, a flat mean with a -30%/+30% band is returned instead.
    """
    ctx_lo = max(0, play_start - lookback)
    ctx = values_full[ctx_lo:play_start]
    ctx_hours = hours[ctx_lo:play_start] if hours is not None else None
    if ctx_hours is None or ctx.size == 0:
        m = float(np.nanmean(values_full[:play_start])) if play_start else 0.0
        return (np.full(play_len, m), np.full(play_len, m * 0.7), np.full(play_len, m * 1.3))
    s = pd.Series(ctx, index=ctx_hours)
    med = s.groupby(level=0).median()
    # A zero MAD (perfectly flat hour) would collapse the band; use 1.0 instead.
    mad = s.groupby(level=0).apply(lambda x: (x - x.median()).abs().median()).replace(0, 1.0)
    play_hours = hours[play_start:play_start + play_len]
    point = play_hours.map(med).to_numpy(dtype=float)
    band = play_hours.map(1.4826 * 2 * mad).to_numpy(dtype=float)
    return point, np.clip(point - band, 0, None), point + band
# --------------------------------------------------------------------- Simulator
class Simulator:
    """Replay a window of hourly meter data with optional injected faults and detect them.

    ``setup()`` must be called before the other methods, because it creates the playback
    arrays (``base_mains``, ``base_sub``, ...) and the healthy baselines they read.
    Injections and leaks only alter the stream built by ``live()``; ``self.df`` is never
    modified.
    """

    def __init__(self, df, mains, submeters):
        self.df = df
        self.mains = mains  # column name of the whole-house meter
        self.submeters = submeters  # column names whose sum should explain the mains
        self.focus = None
        self.injections = {}  # code -> {kind, factor, start}
        self.leak = None  # {"W": float, "start": int}
        self.envelopes = {}  # code -> (point, q10, q90)
        self._live = None  # cached result of live()
        self._dirty = True  # when True, live() recomputes instead of returning the cache

    # ---- setup -----------------------------------------------------------------
    def setup(self, window_days=45, ctx_days=60):
        """Split the data into a healthy context window followed by a playback window.

        The first ``ctx_days * 24`` rows provide per-appliance baselines (on-threshold,
        rated on-power, duty cycle) and the hour-of-day normal band for the house residual.
        The next ``window_days * 24`` rows are replayed, or fewer if the data ends sooner.
        This also clears injections, the leak and cached envelopes.

        Raises:
            ValueError: if no rows remain after the context window.
        """
        ctx_len = int(ctx_days * 24)
        play_len = int(window_days * 24)
        play_start = ctx_len
        available = len(self.df) - play_start
        if available < 1:
            raise ValueError(
                f"need more than {ctx_len} rows for a {ctx_days}-day context window, "
                f"got {len(self.df)}"
            )
        # Never extend past the data. Forcing a 24-row minimum here used to leave the
        # sliced arrays shorter than play_len, which broke live()/detect().
        play_len = min(play_len, available)
        self.ctx_len, self.play_len, self.play_start = ctx_len, play_len, play_start
        self.play_index = self.df.index[play_start:play_start + play_len]
        self.base_mains = self.df[self.mains].to_numpy()[play_start:play_start + play_len]
        self.base_sub = self.df[self.submeters].iloc[play_start:play_start + play_len].copy()
        self.base_residual = self.base_mains - self.base_sub.sum(axis=1).to_numpy()
        # per-appliance baselines from the (healthy) context window
        ctx = self.df.iloc[play_start - ctx_len:play_start]
        self.rated, self.on_thr, self.duty = {}, {}, {}
        for c in self.submeters:
            v = ctx[c].to_numpy()
            thr = _on_threshold(v)
            on = v[v > thr]
            self.on_thr[c] = thr
            self.rated[c] = float(np.median(on)) if on.size else 0.0
            self.duty[c] = float(np.mean(v > thr)) if v.size else 0.0
        # residual normal band: hour-of-day profile + a global robust scale (+ absolute floor)
        res_ctx = pd.Series(
            ctx[self.mains].to_numpy() - ctx[self.submeters].sum(axis=1).to_numpy(),
            index=ctx.index,
        )
        prof = res_ctx.groupby(res_ctx.index.hour).median()
        resid = res_ctx.to_numpy() - res_ctx.index.hour.map(prof).to_numpy(dtype=float)
        # Use finite values only. A single NaN gap in the mains made np.median return NaN,
        # and max(NaN, 40.0) is NaN, so res_upper became all-NaN and no leak was ever flagged.
        resid = resid[np.isfinite(resid)]
        sigma = 1.4826 * float(np.median(np.abs(resid - np.median(resid)))) if resid.size else 0.0
        margin = max(4.0 * sigma, 40.0)  # never trust a degenerate (near-zero) band
        ph = pd.Series(self.play_index.hour)
        self.res_expected = ph.map(prof).to_numpy(dtype=float)
        self.res_upper = self.res_expected + margin
        self.focus = self._default_focus()
        self.injections, self.leak, self.envelopes = {}, None, {}
        self._dirty = True

    def _default_focus(self):
        """Pick the initial focus appliance, or None when there are no sub-meters.

        Cyclic appliances with a non-zero rated power are preferred. Candidates are ranked
        by rated power times duty cycle, with the duty cycle floored at 1%.
        """
        cyc = [c for c in self.submeters if c in CYCLIC_CODES and self.rated.get(c, 0) > 0]
        pool = cyc if cyc else self.submeters
        return max(pool, key=lambda c: self.rated.get(c, 0) * max(self.duty.get(c, 0), 0.01),
                   default=None)

    def appliance_choices(self):
        """Return ``(label, code)`` pairs for every sub-meter, highest rated power first."""
        return [(f"{AMPDS2_NAMES.get(c, c)} ({c})", c)
                for c in sorted(self.submeters, key=lambda c: -self.rated.get(c, 0))]

    # ---- envelopes ------------------------------------------------------------- 
    def build_envelopes(self, codes, model):
        """Compute and cache the expected ``(point, q10, q90)`` envelope for each code.

        The TimesFM envelope is used when ``model`` is not None; otherwise the hour-of-day
        fallback is used. Both are built from the unmodified ``self.df`` column. Codes that
        are already cached are skipped, and their columns are not converted.
        """
        hours = pd.Series(self.df.index.hour)
        for c in codes:
            if c in self.envelopes:
                continue
            values = self.df[c].to_numpy()
            if model is not None:
                self.envelopes[c] = _envelope_timesfm(values, self.play_start, self.play_len, model)
            else:
                self.envelopes[c] = _envelope_seasonal(values, self.play_start, self.play_len, hours)

    # ---- injections ------------------------------------------------------------
    def set_injection(self, code, kind, pct, start_pct):
        """Scale appliance ``code`` from ``start_pct`` % of the playback window onwards.

        ``kind`` selects the scaling:
        - ``"Off"`` sets the reading to zero.
        - ``"Over-draw"`` multiplies it by ``1 + pct/100``.
        - ``"Under-draw"`` multiplies it by ``max(0, 1 - pct/100)``.

        Any other ``kind``, or ``code=None``, is ignored. An existing injection for the
        same code is replaced.
        """
        if code is None:
            return
        if kind == "Off":
            factor = 0.0
        elif kind == "Over-draw":
            factor = 1.0 + pct / 100.0
        elif kind == "Under-draw":
            factor = max(0.0, 1.0 - pct / 100.0)
        else:
            return
        start = int(np.clip(start_pct / 100.0, 0, 1) * self.play_len)
        self.injections[code] = {"kind": kind, "factor": factor, "start": start}
        self._dirty = True

    def clear_injections(self):
        """Remove all appliance injections."""
        self.injections = {}
        self._dirty = True

    def set_leak(self, watts, start_pct):
        """Add a constant unaccounted load of ``watts`` from ``start_pct`` % of playback on."""
        start = int(np.clip(start_pct / 100.0, 0, 1) * self.play_len)
        self.leak = {"W": float(watts), "start": start}
        self._dirty = True

    def clear_leak(self):
        """Remove the leakage injection."""
        self.leak = None
        self._dirty = True

    # ---- live frame (vectorised over the whole playback window) -----------------
    def live(self):
        """Return the (possibly injected) playback stream, recomputing only when dirty.

        Returns:
            dict with ``"mains"`` (array), ``"sub"`` (DataFrame of sub-meters), ``"res"``
            (mains minus sub-meter sum) and ``"leak"`` (array of injected leak watts).
            Appliance injections shift the mains by the same delta, so ``res`` changes
            only through the leak.
        """
        if self._live is not None and not self._dirty:
            return self._live
        live_sub = self.base_sub.copy()
        idx = np.arange(self.play_len)
        for c, inj in self.injections.items():
            if c not in live_sub.columns:
                continue
            mask = idx >= inj["start"]
            col = live_sub[c].to_numpy().copy()
            col[mask] = col[mask] * inj["factor"]
            live_sub[c] = col
        delta = (live_sub.sum(axis=1).to_numpy() - self.base_sub.sum(axis=1).to_numpy())
        leak_arr = np.zeros(self.play_len)
        if self.leak is not None:
            leak_arr[idx >= self.leak["start"]] = self.leak["W"]
        mains_live = self.base_mains + delta + leak_arr
        residual_live = mains_live - live_sub.sum(axis=1).to_numpy()  # == base_residual + leak
        self._live = {"mains": mains_live, "sub": live_sub, "res": residual_live, "leak": leak_arr}
        self._dirty = False
        return self._live

    # ---- detection ------------------------------------------------------------- 
    @staticmethod
    def _finite_mean(a):
        """Mean of the finite entries of ``a``, or 0.0 if there are none.

        Readings may contain NaN gaps. A plain ``np.mean`` would return NaN, and the later
        ``round()`` would then raise ``ValueError``.
        """
        a = a[np.isfinite(a)]
        return float(np.mean(a)) if a.size else 0.0

    def detect(self, cursor, over_tol=0.15, under_tol=0.15, win_app=168, win_leak=24, min_on=3):
        """Evaluate every sub-meter and the house residual at playback index ``cursor``.

        Appliances: over the trailing ``win_app`` samples, the median of the "on" samples
        (requiring at least ``min_on`` of them) is compared with the healthy rated value,
        using the relative tolerances ``over_tol``/``under_tol``. An appliance that was on
        in more than 10% of the baseline window, but is now on less than 15% as often, is
        reported as "OFF".

        Leakage is flagged when both hold:
        - more than 60% of the trailing ``win_leak`` residual samples exceed the normal
          band;
        - the current residual is more than 50 W above its expected value.

        Returns:
            ``(alerts, rows, leak_info)``: alert dicts (a leak alert first), one status row
            per sub-meter, and ``{"now", "expected", "flag"}`` for the residual.
        """
        L = self.live()
        lo = max(0, cursor - win_app + 1)
        alerts, rows = [], []
        for c in self.submeters:
            name = AMPDS2_NAMES.get(c, c)
            rated = self.rated.get(c, 0.0)
            thr = self.on_thr.get(c, 5.0)
            seg = L["sub"][c].to_numpy()[lo:cursor + 1]
            base = self.base_sub[c].to_numpy()[lo:cursor + 1]
            base_duty = float(np.mean(base > thr)) if base.size else 0.0
            window_len = cursor - lo + 1
            on_frac_live = float(np.mean(seg > thr)) if seg.size else 0.0
            on_live = seg[seg > thr]
            live_on = float(np.median(on_live)) if on_live.size >= min_on else 0.0
            # Robust rule for ALL appliances (TimesFM band is the chart's visual evidence, not the
            # trigger, since a forecast band can't bracket sharp on/off loads):
            #   * warm up before judging (need a filled window)
            #   * OFF only for normally-busy appliances that have gone essentially absent
            #   * otherwise compare trailing ON-power to the healthy rated value
            if window_len < 12 or base_duty < 0.02:
                status = "idle"
                live_w = live_on if live_on > 0 else self._finite_mean(seg)
            elif base_duty > 0.1 and on_frac_live < 0.15 * base_duty:
                status = "OFF"
                live_w = self._finite_mean(seg)
            elif live_on > 0:
                ratio = live_on / rated if rated > 0 else 1.0
                status = "OVER" if ratio > 1 + over_tol else "UNDER" if ratio < 1 - under_tol else "OK"
                live_w = live_on
            else:
                status = "idle"
                live_w = self._finite_mean(seg)
            rows.append({"code": c, "name": name, "live_W": round(live_w),
                         "rated_W": round(rated), "status": status})
            if status in ("OVER", "UNDER", "OFF"):
                pct = (live_w / rated * 100) if rated > 0 else 0
                if status == "OVER":
                    msg = f"drawing {pct:.0f}% of rated ({live_w:.0f} W vs {rated:.0f} W)"
                elif status == "UNDER":
                    msg = f"only {pct:.0f}% of rated ({live_w:.0f} W vs {rated:.0f} W)"
                else:
                    msg = f"off / no draw while normally active (expected ~{rated:.0f} W)"
                alerts.append({"code": c, "name": name, "status": status,
                               "severity": "crit" if status in ("OVER", "OFF") else "warn",
                               "msg": msg})
        # leakage
        res = L["res"]
        ll = max(0, cursor - win_leak + 1)
        seg = res[ll:cursor + 1]
        up = self.res_upper[ll:cursor + 1]
        leak_now = float(res[cursor])
        exp = float(self.res_expected[cursor])
        sustained = float(np.mean(seg > up)) if seg.size else 0.0
        leak_flag = sustained > 0.6 and (leak_now - exp) > 50
        if leak_flag:
            alerts.insert(0, {"code": "HOUSE", "name": "Whole house",
                              "status": "LEAK", "severity": "crit",
                              "msg": f"unaccounted load ~{leak_now - exp:.0f} W above normal "
                                     f"(residual {leak_now:.0f} W vs ~{exp:.0f} W expected)"})
        leak_info = {"now": leak_now, "expected": exp, "flag": leak_flag}
        return alerts, rows, leak_info

    # ---- KPIs ------------------------------------------------------------------ 
    def kpis(self, cursor):
        """Return the current load, cumulative energy so far and timestamp at ``cursor``."""
        L = self.live()
        load = float(L["mains"][cursor])
        # hourly W -> kWh; nansum so a single missing hour does not turn the total into NaN
        energy = float(np.nansum(L["mains"][:cursor + 1])) / 1000.0
        ts = self.play_index[cursor]
        return {"time": ts, "load_W": load, "energy_kWh": energy,
                "i": cursor, "n": self.play_len}