Spaces:
Running
Running
"""
dashboard_server.py
====================
Iroha Financial Intelligence — gr.Server entry point.

Architecture
------------
gr.Server (extends FastAPI)
├── GET /            → serves frontend/index.html
├── GET /static/*    → serves frontend/{style.css, app.js} (StaticFiles)
│
├── @server.api run_inference → DoFlow / SCM causal query (via BACKEND_API)
│
├── GET /v2/health   → health-check
│
└── All existing /v2/* routers from main.py are included here too
    (so this server is a superset of main.py).

Usage
-----
    python dashboard_server.py

Or with uvicorn:
    uvicorn dashboard_server:server --host 0.0.0.0 --port 7860 --reload
"""
| from __future__ import annotations | |
| import os | |
| import sys | |
| import json | |
| import logging | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| from pathlib import Path | |
| import gradio as gr | |
| from typing import Any, Dict, List, Optional | |
| from dotenv import load_dotenv | |
# Pull settings from a local .env file before any os.environ lookups below.
load_dotenv()


def _ensure_on_sys_path(directory: Path) -> None:
    """Prepend *directory* to ``sys.path`` unless it is already listed."""
    entry = str(directory)
    if entry not in sys.path:
        sys.path.insert(0, entry)


BASE_DIR = Path(__file__).parent.resolve()
_ensure_on_sys_path(BASE_DIR)

# Also add the backend directory to sys.path so we can import 'app', 'causal', etc.
BACKEND_DIR = (BASE_DIR.parent / "noisy_boy_backend").resolve()
if BACKEND_DIR.exists():
    _ensure_on_sys_path(BACKEND_DIR)

# ──────────────────────────────────────────────────────────────────────
# Logging
# ──────────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("dashboard-server")

# ──────────────────────────────────────────────────────────────────────
# Backend URL
# ──────────────────────────────────────────────────────────────────────
# Base URL of the backend API; overridable through the BACKEND_API_URL env var.
_BACKEND_BASE_URL: str = os.environ.get("BACKEND_API_URL", "http://localhost:7860")
# ──────────────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────────────
def run_pipeline(
    ticker: str = "RELIANCE",
    threshold: float = 0.5,
    treatment: Optional[str] = None,
    outcome: Optional[str] = None,
    include_pywhyllm: bool = False,
) -> Dict[str, Any]:
    """
    Fetch the validated causal matrix for *ticker* from the backend API.

    Parameters
    ----------
    ticker : NSE symbol (e.g. RELIANCE, HDFCBANK)
    threshold : adjacency threshold for DAG construction
    treatment : optional treatment node for pywhyllm assumptions
    outcome : optional outcome node for pywhyllm assumptions
    include_pywhyllm : request pywhyllm assumption report from backend

    Returns
    -------
    The backend payload (a dict) extended with two frontend-friendly keys:
        nodes_graph : ``[{"id": node, "label": node}, ...]``
        links_graph : ``[{"source", "target", "score"}, ...]`` for every
                      off-diagonal ``adj_matrix`` entry >= *threshold*

    Raises
    ------
    RuntimeError
        If the backend cannot be reached, answers with an HTTP error,
        returns something that is not a JSON object, or reports a status
        other than "success" / "ok" / missing.
    """
    params: Dict[str, Any] = {"threshold": threshold}
    if treatment:
        params["treatment"] = treatment
    if outcome:
        params["outcome"] = outcome
    if include_pywhyllm:
        params["include_pywhyllm"] = "true"

    # The ticker is user input and lands in the URL *path*, so it must be
    # percent-encoded (spaces, '/', '?', '#' would otherwise corrupt the URL).
    symbol = urllib.parse.quote(ticker.upper(), safe="")
    qs = urllib.parse.urlencode(params)
    url = f"{_BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{symbol}?{qs}"
    logger.info("run_pipeline: fetching %s", url)

    try:
        with urllib.request.urlopen(url, timeout=60) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError: handle it first so that "backend
        # answered with an error" is not reported as "backend unreachable".
        raise RuntimeError(
            f"Backend at {_BACKEND_BASE_URL} returned HTTP {exc.code} "
            f"({exc.reason}) for {ticker}."
        ) from exc
    except OSError as exc:
        # URLError, connection resets and socket timeouts are all OSError.
        raise RuntimeError(
            f"Could not reach backend at {_BACKEND_BASE_URL}. "
            f"Ensure noisy_boy_backend is running. Original error: {exc}"
        ) from exc

    try:
        payload = json.loads(raw)
    except ValueError as exc:
        raise RuntimeError(
            f"Backend returned a non-JSON response for {ticker}."
        ) from exc
    if not isinstance(payload, dict):
        raise RuntimeError(
            f"Backend returned an unexpected payload type "
            f"({type(payload).__name__}) for {ticker}."
        )

    status = payload.get("status")
    if status == "not_found":
        raise RuntimeError(
            payload.get(
                "detail",
                f"No cached pipeline data for {ticker} on backend. "
                "Run the singular-causal pipeline on the backend first.",
            )
        )
    if status not in ("success", None, "ok"):
        raise RuntimeError(
            f"Backend returned unexpected status '{status}' for {ticker}. "
            f"Payload: {payload}"
        )

    # Build frontend-friendly graph representation
    nodes: List[str] = payload.get("nodes") or []
    adj_matrix = payload.get("adj_matrix") or []
    nodes_graph = [{"id": name, "label": name} for name in nodes]
    links_graph = []
    for i, src in enumerate(nodes):
        for j, dst in enumerate(nodes):
            if i == j:
                continue
            try:
                score = float(adj_matrix[i][j])
            except (IndexError, TypeError, ValueError):
                # A ragged or malformed matrix cell counts as "no edge weight".
                score = 0.0
            if score >= threshold:
                links_graph.append(
                    {"source": src, "target": dst, "score": round(score, 4)}
                )

    return {
        **payload,
        "nodes_graph": nodes_graph,
        "links_graph": links_graph,
    }
# ──────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────
# URL of the noisy_boy_backend — used to fetch the validated causal matrix.
# By default, point to ourselves since we now successfully mount the backend routers.
# Override via BACKEND_API_URL env var if running a separate backend on 8000.
_BACKEND_BASE_URL = os.environ.get("BACKEND_API_URL", "http://localhost:7860")


def _fetch_causal_matrix(
    ticker: str,
    treatment: Optional[str] = None,
    outcome: Optional[str] = None,
    include_pywhyllm: bool = False,
    threshold: float = 0.5,
) -> Optional[dict]:
    """
    Fetch the fully validated causal matrix from the backend API.

    Calls GET {BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{ticker}
    and returns the parsed JSON payload, or None on failure (network error,
    invalid JSON, non-object payload, or a status other than
    "success" / "ok" / missing). Failures are logged, never raised.

    The payload contains:
        nodes             — ordered list of node names
        adj_matrix        — raw float adjacency matrix
        dag_adj           — thresholded 0/1 DAG
        equations         — per-node structural equations (coefficients, intercepts, residual_std)
        data_level        — (T, N) time-series observations used to fit the SCM
        topological_order — nodes in topological traversal order
        pywhyllm_report   — (optional) assumption analysis for treatment→outcome
    """
    params: dict = {"threshold": threshold}
    if treatment:
        params["treatment"] = treatment
    if outcome:
        params["outcome"] = outcome
    if include_pywhyllm:
        params["include_pywhyllm"] = "true"

    # Percent-encode the ticker: it is user input placed in the URL path.
    symbol = urllib.parse.quote(ticker.upper(), safe="")
    query_string = urllib.parse.urlencode(params)
    url = f"{_BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{symbol}?{query_string}"

    try:
        with urllib.request.urlopen(url, timeout=30) as resp:
            data = json.loads(resp.read())
    except Exception as exc:
        # Deliberately best-effort: callers treat None as "backend unavailable".
        logger.warning("_fetch_causal_matrix failed for %s: %s", ticker, exc)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "_fetch_causal_matrix: backend returned a %s instead of an object for URL %s",
            type(data).__name__, url,
        )
        return None

    status = data.get("status")
    # "ok" is accepted as well, matching what run_pipeline accepts from the
    # same endpoint.
    if status not in ("success", "ok", None):
        logger.warning(
            "_fetch_causal_matrix: backend returned status=%s for URL %s. Payload: %s",
            status, url, data,
        )
        return None
    return data
| def _safe_json(obj: Any) -> Any: | |
| """Recursively make numpy types JSON-serialisable.""" | |
| try: | |
| import numpy as np | |
| if isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| if isinstance(obj, np.integer): | |
| return int(obj) | |
| if isinstance(obj, np.floating): | |
| return float(obj) | |
| except ImportError: | |
| pass | |
| if isinstance(obj, dict): | |
| return {k: _safe_json(v) for k, v in obj.items()} | |
| if isinstance(obj, (list, tuple)): | |
| return [_safe_json(v) for v in obj] | |
| return obj | |
| def _resolve_value(value: float, value_type: str, current: float) -> float: | |
| """Convert a user-supplied value + value_type to the absolute node value.""" | |
| vt = value_type.strip().lower() | |
| if vt == "absolute": | |
| return value | |
| if vt == "multiplier": | |
| return current * value | |
| if vt == "percent_change": | |
| return current * (1.0 + value / 100.0) | |
| # default: treat as absolute | |
| return value | |
# ──────────────────────────────────────────────────────────────────────
# Pure-numpy inference helpers (no local causal training imports)
# These functions work entirely from the payload returned by the backend API.
# ──────────────────────────────────────────────────────────────────────
| def _build_dag_from_payload(payload: dict): | |
| """ | |
| Return a numpy bool DAG adjacency matrix and list of node names | |
| from the backend causal-matrix payload. | |
| """ | |
| import numpy as np | |
| nodes = payload["nodes"] | |
| dag_adj = np.array(payload["dag_adj"], dtype=bool) | |
| adj_matrix = np.array(payload["adj_matrix"], dtype=float) | |
| return nodes, dag_adj, adj_matrix | |
| def _propagate_intervention( | |
| nodes: list, | |
| dag_adj, | |
| equations: dict, | |
| data_level, | |
| topological_order: list, | |
| treatment: str, | |
| abs_value: float, | |
| targets: list, | |
| horizon: int = 5, | |
| ): | |
| """ | |
| Propagate a hard intervention (do(treatment=abs_value)) through the | |
| structural equations for `horizon` steps, returning ATE per target node. | |
| Uses only numpy β no local causal model imports. | |
| """ | |
| import numpy as np | |
| node_to_idx = {n: i for i, n in enumerate(nodes)} | |
| n = len(nodes) | |
| T = data_level.shape[0] | |
| # Start from the last observed time step | |
| state = data_level[-1].copy().astype(float) | |
| # Fix the treatment node | |
| t_idx = node_to_idx[treatment] | |
| state[t_idx] = abs_value | |
| ate_per_target: Dict[str, float] = {} | |
| baseline = data_level[-1].copy().astype(float) | |
| for _ in range(horizon): | |
| new_state = state.copy() | |
| for node_name in topological_order: | |
| if node_name == treatment: | |
| continue | |
| eq = equations.get(node_name) | |
| if eq is None: | |
| continue | |
| parents = eq.get("parents", []) | |
| coefficients = eq.get("coefficients", {}) | |
| intercept = float(eq.get("intercept", 0.0)) | |
| if not parents: | |
| continue | |
| val = intercept | |
| for p in parents: | |
| p_idx = node_to_idx.get(p) | |
| if p_idx is not None: | |
| val += float(coefficients.get(p, 0.0)) * float(state[p_idx]) | |
| n_idx = node_to_idx[node_name] | |
| new_state[n_idx] = val | |
| state = new_state | |
| for target in targets: | |
| t_i = node_to_idx.get(target) | |
| if t_i is not None: | |
| ate_per_target[target] = float(state[t_i] - baseline[t_i]) | |
| return ate_per_target, state | |
| def _abduct_and_predict( | |
| nodes: list, | |
| dag_adj, | |
| equations: dict, | |
| data_level, | |
| topological_order: list, | |
| treatment: str, | |
| cf_value: float, | |
| target: str, | |
| observed_t: int, | |
| ): | |
| """ | |
| Simple SCM abduction for counterfactual: | |
| 1. Abduct residuals from the observed time step. | |
| 2. Re-run structural equations with treatment fixed to cf_value. | |
| 3. Return factual_outcome, cf_outcome, ITE. | |
| """ | |
| import numpy as np | |
| node_to_idx = {n: i for i, n in enumerate(nodes)} | |
| obs = data_level[observed_t].copy().astype(float) | |
| # Abduct residuals | |
| residuals: Dict[str, float] = {} | |
| for node_name in topological_order: | |
| eq = equations.get(node_name) | |
| if eq is None or not eq.get("parents"): | |
| residuals[node_name] = 0.0 | |
| continue | |
| parents = eq.get("parents", []) | |
| coefficients = eq.get("coefficients", {}) | |
| intercept = float(eq.get("intercept", 0.0)) | |
| predicted = intercept | |
| for p in parents: | |
| p_idx = node_to_idx.get(p) | |
| if p_idx is not None: | |
| predicted += float(coefficients.get(p, 0.0)) * float(obs[node_to_idx[p]]) | |
| residuals[node_name] = float(obs[node_to_idx[node_name]]) - predicted | |
| # Counterfactual: fix treatment, replay equations with abducted noise | |
| cf_state = obs.copy() | |
| cf_state[node_to_idx[treatment]] = cf_value | |
| for node_name in topological_order: | |
| if node_name == treatment: | |
| continue | |
| eq = equations.get(node_name) | |
| if eq is None or not eq.get("parents"): | |
| continue | |
| parents = eq.get("parents", []) | |
| coefficients = eq.get("coefficients", {}) | |
| intercept = float(eq.get("intercept", 0.0)) | |
| predicted = intercept | |
| for p in parents: | |
| p_idx = node_to_idx.get(p) | |
| if p_idx is not None: | |
| predicted += float(coefficients.get(p, 0.0)) * float(cf_state[p_idx]) | |
| n_idx = node_to_idx[node_name] | |
| cf_state[n_idx] = predicted + residuals.get(node_name, 0.0) | |
| factual_outcome = float(obs[node_to_idx[target]]) | |
| cf_outcome = float(cf_state[node_to_idx[target]]) | |
| ite = cf_outcome - factual_outcome | |
| return factual_outcome, cf_outcome, ite | |
# ── API: Causal inference (assert / intervene / counterfactual) ──────────
#
# Architecture:
#   1. Fetch the VALIDATED causal matrix from noisy_boy_backend via HTTP.
#      The backend has already run CUTS+ learning + pywhyllm + DoWhy validation.
#   2. Use the payload data (equations, adj, data_level) for inference
#      using pure numpy/pandas — no local causal training imports required.
#   3. Optionally consult pywhyllm guidance from the backend payload.
def _dag_to_dot(nodes: list, dag_adj) -> str:
    """Render the boolean DAG adjacency matrix as a DOT digraph string."""
    edges = [
        f"{src} -> {dst}"
        for si, src in enumerate(nodes)
        for di, dst in enumerate(nodes)
        if dag_adj[si, di]
    ]
    return "digraph{" + "; ".join(edges) + "}"


def _dowhy_unit_effect(df, nodes: list, dag_adj, treatment: str, outcome: str) -> float:
    """Estimate the per-unit treatment effect with DoWhy backdoor linear regression.

    Raises whatever DoWhy raises (including ImportError when it is not
    installed); callers are expected to fall back to another estimator.
    """
    from dowhy import CausalModel

    model = CausalModel(
        data=df,
        treatment=treatment,
        outcome=outcome,
        graph=_dag_to_dot(nodes, dag_adj),
    )
    estimand = model.identify_effect(proceed_when_unidentifiable=True)
    estimate = model.estimate_effect(
        estimand,
        method_name="backdoor.linear_regression",
    )
    return float(estimate.value)


def _ols_treatment_se(df, treatment: str, outcome: str) -> float:
    """Standard error of the treatment coefficient from an OLS of outcome on all other columns."""
    import numpy as np

    predictors = [c for c in df.columns if c != outcome]
    X = df[predictors].values
    y = df[outcome].values
    resid = y - X @ np.linalg.lstsq(X, y, rcond=None)[0]
    sigma2 = float(np.sum(resid ** 2) / max(1, len(y) - X.shape[1]))
    xtx_inv = np.linalg.pinv(X.T @ X)
    # Index within the predictor matrix (outcome column removed), not within
    # df.columns: the two differ whenever the outcome precedes the treatment.
    k = predictors.index(treatment)
    return float(np.sqrt(max(0.0, sigma2 * xtx_inv[k, k])))


def _direct_ripple_effects(
    nodes: list, dag_adj, adj_matrix, treatment: str, outcome: str, ate: float
) -> list:
    """List the direct DAG children of the treatment (excluding the outcome), scaled by |ate|."""
    t_idx = nodes.index(treatment)
    direction = 1 if ate > 0 else -1
    return [
        {
            "ticker": node,
            "direction": direction,
            "magnitude": round(float(adj_matrix[t_idx, j]) * abs(ate), 4),
        }
        for j, node in enumerate(nodes)
        if node != treatment and node != outcome and dag_adj[t_idx, j]
    ]


def run_inference(
    ticker: str = "RELIANCE",
    mode: str = "assert",
    treatment: str = "Revenue",
    outcome: Optional[str] = "NetIncome",
    target: Optional[str] = None,
    value: float = 1.1,
    cf_value: Optional[float] = None,
    value_type: str = "multiplier",
    horizon: int = 5,
    observed_t: int = -1,
    threshold: float = 0.5,
    use_pywhyllm: bool = False,
    return_assumption_report: bool = False,
) -> Dict[str, Any]:
    """
    Three-layer causal inference driven by the backend's validated causal matrix.

    Parameters
    ----------
    ticker : NSE ticker (backend must have a cached pipeline run for it)
    mode : "assert" | "intervene" | "counterfactual" (alias: "counter")
    treatment : source node name
    outcome : outcome node (assert / Layer-1 association)
    target : target node (counterfactual / Layer-3); if None or empty, falls back to outcome
    value : intervention magnitude (Layer 2)
    cf_value : explicit counterfactual value (Layer 3); if None, 'value' + 'value_type' used
    value_type : "absolute" | "multiplier" | "percent_change"
    horizon : propagation horizon for intervention (Layer 2, steps)
    observed_t : time index for counterfactual abduction (Layer 3; -1 = last obs)
    threshold : adjacency threshold used when loading the graph
    use_pywhyllm : request pywhyllm structural assumptions from the backend
    return_assumption_report : include the pywhyllm report dict in the response

    Returns
    -------
    On success a JSON-serialisable dict with status "ok", ticker, mode, ate,
    ci_lower, ci_upper, probability, strategy, ripple_effects and, for
    counterfactual mode, factual_outcome, counterfactual_outcome, ite and
    shapley_contributions. On any failure a dict
    ``{"status": "error", "detail": ...}``; this function does not raise.
    """
    import numpy as np
    import pandas as pd

    try:
        # ── 0. Determine target node ─────────────────────────────────────
        target_node = target if target else outcome
        if not target_node:
            return {"status": "error", "detail": "Either 'outcome' or 'target' must be provided."}

        # UI widgets and JSON clients may deliver whole numbers as floats;
        # range() and positional indexing below need real ints.
        horizon = int(horizon)
        observed_t = int(observed_t)

        # ── 1. Fetch validated causal matrix from backend ────────────────
        # This includes the adjacency matrix, fitted structural equations,
        # level-domain data, and optionally a pywhyllm assumption report.
        payload = _fetch_causal_matrix(
            ticker=ticker,
            treatment=treatment if use_pywhyllm else None,
            outcome=target_node if use_pywhyllm else None,
            include_pywhyllm=use_pywhyllm,
            threshold=threshold,
        )
        if payload is None:
            return {
                "status": "error",
                "detail": (
                    f"Could not fetch causal matrix for {ticker} from backend. "
                    "Ensure noisy_boy_backend is running and the pipeline has been run for this ticker."
                ),
            }
        if payload.get("status") == "not_found":
            return {
                "status": "error",
                "detail": payload.get("detail", f"No cached pipeline data for {ticker}."),
            }

        # ── 2. Unpack payload (no local causal training imports) ─────────
        nodes, dag_adj, adj_matrix = _build_dag_from_payload(payload)
        node_to_idx = {n: i for i, n in enumerate(nodes)}
        data_level = np.array(payload["data_level"], dtype=float)
        # `or` also covers keys that are present but null in the payload.
        equations_raw = payload.get("equations") or {}
        topo_order = payload.get("topological_order") or nodes
        T = data_level.shape[0]
        df = pd.DataFrame(data_level, columns=nodes)

        if treatment not in node_to_idx:
            return {"status": "error", "detail": f"Unknown treatment node: {treatment}"}
        if target_node not in node_to_idx:
            return {"status": "error", "detail": f"Unknown outcome/target node: {target_node}"}
        if df.shape[0] < 5:
            return {
                "status": "error",
                "detail": f"Insufficient observations ({df.shape[0]}) to run inference.",
            }

        # ── 3. pywhyllm structural guidance (from backend payload) ───────
        pywhyllm_report: Optional[dict] = payload.get("pywhyllm_report")
        adjustment_sets: List[List[str]] = []
        if use_pywhyllm and pywhyllm_report and pywhyllm_report.get("available"):
            raw_backdoor = pywhyllm_report.get("suggested_backdoor_sets") or []
            valid_nodes = set(nodes) - {treatment, target_node}
            for suggested_set in raw_backdoor:
                clean = [n for n in suggested_set if n in valid_nodes]
                if clean and clean not in adjustment_sets:
                    adjustment_sets.append(clean)
            confounders = [
                n for n in (pywhyllm_report.get("suggested_confounders") or [])
                if n in valid_nodes
            ]
            if confounders and confounders not in adjustment_sets:
                adjustment_sets.append(confounders)
        adjustment_set = adjustment_sets[0] if adjustment_sets else []

        result: Dict[str, Any] = {}

        # ─────────────────────────────────────────────────────────────────
        # LAYER 1 — Association: "What does Y look like given X?"
        # Uses DoWhy with the backend-provided DAG, falling back to OLS.
        # ─────────────────────────────────────────────────────────────────
        if mode == "assert":
            try:
                ate = _dowhy_unit_effect(df, nodes, dag_adj, treatment, target_node)
            except Exception as dowhy_exc:
                # DoWhy not installed or identification failed: fall back to OLS
                logger.warning("DoWhy association failed (%s), falling back to OLS", dowhy_exc)
                # Simple OLS: regress target on treatment (with intercept)
                X = df[[treatment]].values
                y = df[target_node].values
                coef = np.linalg.lstsq(np.c_[np.ones(len(X)), X], y, rcond=None)[0]
                ate = float(coef[1])
                se = abs(ate) * 0.15
                strategy = "ols_fallback"
            else:
                strategy = "backdoor.linear_regression"
                # Confidence interval from OLS residuals
                try:
                    se = _ols_treatment_se(df, treatment, target_node)
                except Exception:
                    se = abs(ate) * 0.15  # graceful fallback

            result = {
                "ate": ate,
                "ci_lower": ate - 1.96 * se,
                "ci_upper": ate + 1.96 * se,
                "probability": min(1.0, abs(ate) / (abs(ate) + se + 1e-9)),
                "strategy": strategy,
                "adjustment_set": adjustment_set,
                # Ripple effects: direct downstream neighbours of treatment
                "ripple_effects": _direct_ripple_effects(
                    nodes, dag_adj, adj_matrix, treatment, target_node, ate
                ),
            }

        # ─────────────────────────────────────────────────────────────────
        # LAYER 2 — Intervention: "What will happen to Y if we do X=value?"
        # Propagates through structural equations from the backend payload.
        # ─────────────────────────────────────────────────────────────────
        elif mode == "intervene":
            current_val = float(data_level[-1, node_to_idx[treatment]])
            abs_value = _resolve_value(value, value_type, current_val)

            # Try DoWhy for ATE estimation first
            ate = 0.0
            method_used = "scm_propagation"
            try:
                ate_unit = _dowhy_unit_effect(df, nodes, dag_adj, treatment, target_node)
                ate = ate_unit * (abs_value - current_val)
                method_used = "backdoor.linear_regression"
            except Exception as dowhy_exc:
                logger.warning("DoWhy intervention failed (%s), using SCM propagation", dowhy_exc)

            # SCM propagation for ripple effects (pure numpy, no training imports)
            ate_per_target, _ = _propagate_intervention(
                nodes=nodes,
                dag_adj=dag_adj,
                equations=equations_raw,
                data_level=data_level,
                topological_order=topo_order,
                treatment=treatment,
                abs_value=abs_value,
                targets=[target_node] + [n for n in nodes if n != treatment],
                horizon=horizon,
            )
            if method_used == "scm_propagation" and target_node in ate_per_target:
                ate = float(ate_per_target[target_node])

            se = abs(ate) * 0.12
            ci_lower = ate - 1.96 * se
            ci_upper = ate + 1.96 * se
            ripple_effects = [
                {
                    "ticker": node,
                    "direction": 1 if float(delta_val) > 0 else -1,
                    "magnitude": round(abs(float(delta_val)), 4),
                }
                for node, delta_val in ate_per_target.items()
                if node != treatment
            ]
            result = {
                "ate": ate,
                "ci_lower": ci_lower,
                "ci_upper": ci_upper,
                "probability": min(1.0, abs(ate) / (abs(ate) + abs(ci_upper - ci_lower) / 2 + 1e-9)),
                "strategy": method_used,
                "intervention_value": abs_value,
                "value_type": value_type,
                "horizon": horizon,
                "ripple_effects": ripple_effects,
                "adjustment_set": adjustment_set,
            }

        # ─────────────────────────────────────────────────────────────────
        # LAYER 3 — Counterfactual: "What if X had been different in the past?"
        # Uses SCM abduction via pure numpy structural equations.
        # ─────────────────────────────────────────────────────────────────
        elif mode in ("counterfactual", "counter"):
            # Resolve observed timestep (negative values count from the end),
            # then clamp into the valid row range.
            t = observed_t if observed_t >= 0 else (T + observed_t)
            t = max(0, min(T - 1, t))

            # Resolve counterfactual value
            current_val = float(data_level[t, node_to_idx[treatment]])
            if cf_value is not None:
                abs_cf_value = float(cf_value)
            else:
                abs_cf_value = _resolve_value(value, value_type, current_val)

            # Try DoWhy GCM first
            gcm_used = False
            factual_outcome = 0.0
            cf_outcome_val = 0.0
            ite = 0.0
            try:
                import dowhy.gcm as gcm_module
                import networkx as nx

                causal_graph = nx.DiGraph()
                for si, src in enumerate(nodes):
                    for di, dst in enumerate(nodes):
                        if dag_adj[si, di]:
                            causal_graph.add_edge(src, dst)
                for node in nodes:
                    if node not in causal_graph.nodes:
                        causal_graph.add_node(node)

                gcm_model = gcm_module.InvertibleStructuralCausalModel(causal_graph)
                gcm_module.auto.assign_mechanisms(gcm_model, df)
                gcm_module.fit(gcm_model, df)

                observed_data = df.iloc[[t]]
                cf_val_fixed = abs_cf_value
                cf_samples = gcm_module.counterfactual_samples(
                    gcm_model,
                    {treatment: lambda x, v=cf_val_fixed: np.full(x.shape, v)},
                    observed_data=observed_data,
                    num_samples_to_draw=1,
                )
                factual_outcome = float(observed_data[target_node].iloc[0])
                cf_outcome_val = float(cf_samples[target_node].iloc[0])
                ite = cf_outcome_val - factual_outcome
                gcm_used = True
            except Exception as gcm_exc:
                logger.warning("DoWhy GCM counterfactual failed (%s), using SCM abduction", gcm_exc)

            if not gcm_used:
                factual_outcome, cf_outcome_val, ite = _abduct_and_predict(
                    nodes=nodes,
                    dag_adj=dag_adj,
                    equations=equations_raw,
                    data_level=data_level,
                    topological_order=topo_order,
                    treatment=treatment,
                    cf_value=abs_cf_value,
                    target=target_node,
                    observed_t=t,
                )

            # Shapley: single-treatment, so just use the ITE directly
            shapley = {treatment: ite}

            # SE from residual_std of the target equation (from backend payload);
            # a missing or null value falls back to 15% of |ite|.
            target_eq_data = equations_raw.get(target_node) or {}
            residual_std = target_eq_data.get("residual_std")
            se = float(residual_std) if residual_std is not None else abs(ite) * 0.15

            result = {
                "ate": ite,
                "ite": ite,
                "factual_outcome": factual_outcome,
                "counterfactual_outcome": cf_outcome_val,
                "ci_lower": ite - 1.96 * se,
                "ci_upper": ite + 1.96 * se,
                "probability": min(1.0, abs(ite) / (abs(ite) + se + 1e-9)),
                "strategy": "dowhy_gcm" if gcm_used else "scm_abduction",
                "counterfactual_value": abs_cf_value,
                "value_type": value_type,
                "observed_t": t,
                "shapley_contributions": shapley,
                "ripple_effects": [],
            }

        else:
            return {
                "status": "error",
                "detail": f"Unknown mode '{mode}'. Must be one of: assert, intervene, counterfactual.",
            }

        # ── Attach pywhyllm assumption report if requested ───────────────
        if return_assumption_report and pywhyllm_report:
            result["pywhyllm_report"] = pywhyllm_report

        return _safe_json({"status": "ok", "ticker": ticker.upper(), "mode": mode, **result})

    except Exception as exc:
        # Top-level boundary for the UI/API: log the traceback, report the message.
        logger.exception("run_inference failed")
        return {"status": "error", "detail": str(exc)}
# ──────────────────────────────────────────────────────────────────────
# Gradio UI & Entry point
# ──────────────────────────────────────────────────────────────────────
def create_demo():
    """
    Create and return the Gradio Blocks interface.

    The form exposes one widget per ``run_inference`` parameter, in the same
    order as that function's signature, and shows the returned dict in a JSON
    panel. The click handler is also published under the API name
    "run_inference".
    """
    with gr.Blocks(title="Iroha Causal Terminal") as demo:
        gr.Markdown("# Iroha Causal Terminal")
        gr.Markdown(
            "Iroha Financial Intelligence — real-time causal probability matrix, "
            "HHKD decomposition, DoFlow inference and sector hierarchy over NIFTY50."
        )

        with gr.Row():
            ticker = gr.Textbox(label="Ticker", value="RELIANCE")
            mode = gr.Dropdown(
                choices=["assert", "intervene", "counterfactual"],
                label="Mode",
                value="assert",
            )
            treatment = gr.Textbox(label="Treatment", value="Revenue")
            outcome = gr.Textbox(label="Outcome", value="NetIncome")
            # Left empty by default; run_inference then falls back to Outcome.
            target = gr.Textbox(label="Target", value="")

        with gr.Row():
            value = gr.Number(label="Value", value=1.1)
            cf_value = gr.Number(label="CF Value")
            value_type = gr.Dropdown(
                choices=["absolute", "multiplier", "percent_change"],
                label="Value Type",
                value="multiplier",
            )
            horizon = gr.Number(label="Horizon", value=5, precision=0)
            observed_t = gr.Number(label="Observed T", value=-1, precision=0)
            threshold = gr.Number(label="Threshold", value=0.5)

        with gr.Row():
            use_pywhyllm = gr.Checkbox(label="Use PyWhyLLM", value=False)
            return_assumption_report = gr.Checkbox(
                label="Return Assumption Report", value=False
            )

        btn = gr.Button("Run Inference")
        out = gr.JSON(label="Result")

        # Input order must match run_inference's positional parameters.
        btn.click(
            fn=run_inference,
            inputs=[
                ticker, mode, treatment, outcome, target, value, cf_value, value_type,
                horizon, observed_t, threshold, use_pywhyllm, return_assumption_report,
            ],
            outputs=out,
            api_name="run_inference",
        )
    return demo
if __name__ == "__main__":
    # Port precedence: GRADIO_SERVER_PORT, then PORT, then 7860. `or` (rather
    # than nested .get defaults) also skips variables that are set but empty,
    # which would otherwise crash int("").
    port = int(
        os.environ.get("GRADIO_SERVER_PORT")
        or os.environ.get("PORT")
        or "7860"
    )
    host = os.environ.get("GRADIO_SERVER_NAME") or "0.0.0.0"
    logger.info("Starting Iroha Causal Terminal on %s:%s", host, port)
    demo = create_demo()
    demo.launch(
        server_name=host,
        server_port=port,
        show_error=True,
    )