""" dashboard_server.py ==================== Iroha Financial Intelligence — gr.Server entry point. Architecture ------------ gr.Server (extends FastAPI) ├── GET / → serves frontend/index.html ├── GET /static/* → serves frontend/{style.css, app.js} (StaticFiles) │ ├── @server.api run_inference → DoFlow / SCM causal query (via BACKEND_API) │ ├── GET /v2/health → health-check │ └── All existing /v2/* routers from main.py are included here too (so this server is a superset of main.py). Usage ----- python dashboard_server.py Or with uvicorn: uvicorn dashboard_server:server --host 0.0.0.0 --port 7860 --reload """ from __future__ import annotations import os import sys import json import logging import urllib.error import urllib.parse import urllib.request from pathlib import Path import gradio as gr from typing import Any, Dict, List, Optional from dotenv import load_dotenv load_dotenv() BASE_DIR = Path(__file__).parent.resolve() if str(BASE_DIR) not in sys.path: sys.path.insert(0, str(BASE_DIR)) # Also add the backend directory to sys.path so we can import 'app', 'causal', etc. BACKEND_DIR = (BASE_DIR.parent / "noisy_boy_backend").resolve() if BACKEND_DIR.exists() and str(BACKEND_DIR) not in sys.path: sys.path.insert(0, str(BACKEND_DIR)) # ──────────────────────────────────────────────────────────────────[...] # Logging # ──────────────────────────────────────────────────────────────────[...] logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler()], ) logger = logging.getLogger("dashboard-server") # ──────────────────────────────────────────────────────────────────[...] # Backend URL # ──��───────────────────────────────────────────────────────────────[...] _BACKEND_BASE_URL: str = os.environ.get("BACKEND_API_URL", "http://localhost:7860") # ──────────────────────────────────────────────────────────────────[...] # Public API # ──────────────────────────────────────────────────────────────────[...] def run_pipeline( ticker: str = "RELIANCE", threshold: float = 0.5, treatment: Optional[str] = None, outcome: Optional[str] = None, include_pywhyllm: bool = False, ) -> Dict[str, Any]: """ Fetch the validated causal matrix for *ticker* from the backend API. Parameters ---------- ticker : NSE symbol (e.g. RELIANCE, HDFCBANK) threshold : adjacency threshold for DAG construction treatment : optional treatment node for pywhyllm assumptions outcome : optional outcome node for pywhyllm assumptions include_pywhyllm: request pywhyllm assumption report from backend Returns ------- dict with keys: nodes, adj_matrix, dag_adj, equations, data_level, topological_order, nodes_graph, links_graph Raises RuntimeError if the backend cannot be reached or returns an error. """ params: dict = {"threshold": threshold} if treatment: params["treatment"] = treatment if outcome: params["outcome"] = outcome if include_pywhyllm: params["include_pywhyllm"] = "true" qs = urllib.parse.urlencode(params) url = f"{_BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{ticker.upper()}?{qs}" logger.info("run_pipeline: fetching %s", url) try: with urllib.request.urlopen(url, timeout=60) as resp: raw = resp.read() except urllib.error.URLError as exc: raise RuntimeError( f"Could not reach backend at {_BACKEND_BASE_URL}. " f"Ensure noisy_boy_backend is running. Original error: {exc}" ) from exc payload = json.loads(raw) status = payload.get("status") if status == "not_found": raise RuntimeError( payload.get( "detail", f"No cached pipeline data for {ticker} on backend. " "Run the singular-causal pipeline on the backend first.", ) ) if status not in ("success", None, "ok"): raise RuntimeError( f"Backend returned unexpected status '{status}' for {ticker}. " f"Payload: {payload}" ) # Build frontend-friendly graph representation nodes: List[str] = payload.get("nodes", []) adj_matrix = payload.get("adj_matrix", []) dag_adj = payload.get("dag_adj", []) nodes_graph = [{"id": n, "label": n} for n in nodes] links_graph = [] for i, src in enumerate(nodes): for j, dst in enumerate(nodes): if i != j: try: score = float(adj_matrix[i][j]) except (IndexError, TypeError, ValueError): score = 0.0 if score >= threshold: links_graph.append({"source": src, "target": dst, "score": round(score, 4)}) return { **payload, "nodes_graph": nodes_graph, "links_graph": links_graph, } # ─────────────────��────────────────────────────────────────────────[...] # Helpers # ──────────────────────────────────────────────────────────────────[...] # URL of the noisy_boy_backend — used to fetch the validated causal matrix. # By default, point to ourselves since we now successfully mount the backend routers. # Override via BACKEND_API_URL env var if running a separate backend on 8000. _BACKEND_BASE_URL = os.environ.get("BACKEND_API_URL", "http://localhost:7860") def _fetch_causal_matrix( ticker: str, treatment: Optional[str] = None, outcome: Optional[str] = None, include_pywhyllm: bool = False, threshold: float = 0.5, ) -> Optional[dict]: """ Fetch the fully validated causal matrix from the backend API. Calls GET {BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{ticker} and returns the parsed JSON payload, or None on failure. The payload contains: nodes — ordered list of node names adj_matrix — raw float adjacency matrix dag_adj — thresholded 0/1 DAG equations — per-node structural equations (coefficients, intercepts, residual_std) data_level — (T, N) time-series observations used to fit the SCM topological_order — nodes in topological traversal order pywhyllm_report — (optional) assumption analysis for treatment→outcome """ import urllib.request import urllib.error import urllib.parse params: dict = {"threshold": threshold} if treatment: params["treatment"] = treatment if outcome: params["outcome"] = outcome if include_pywhyllm: params["include_pywhyllm"] = "true" query_string = urllib.parse.urlencode(params) url = f"{_BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{ticker.upper()}?{query_string}" try: with urllib.request.urlopen(url, timeout=30) as resp: raw = resp.read() data = json.loads(raw) if data.get("status") not in ("success", None): logger.warning( "_fetch_causal_matrix: backend returned status=%s for URL %s. Payload: %s", data.get("status"), url, data, ) return None return data except Exception as exc: logger.warning("_fetch_causal_matrix failed for %s: %s", ticker, exc) return None def _safe_json(obj: Any) -> Any: """Recursively make numpy types JSON-serialisable.""" try: import numpy as np if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) except ImportError: pass if isinstance(obj, dict): return {k: _safe_json(v) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [_safe_json(v) for v in obj] return obj def _resolve_value(value: float, value_type: str, current: float) -> float: """Convert a user-supplied value + value_type to the absolute node value.""" vt = value_type.strip().lower() if vt == "absolute": return value if vt == "multiplier": return current * value if vt == "percent_change": return current * (1.0 + value / 100.0) # default: treat as absolute return value # ──────────────────────────────────────────────────────────────────[...] # Pure-numpy inference helpers (no local causal training imports) # These functions work entirely from the payload returned by the backend API. # ──────────────────────────────────────────────────────────────────[...] def _build_dag_from_payload(payload: dict): """ Return a numpy bool DAG adjacency matrix and list of node names from the backend causal-matrix payload. """ import numpy as np nodes = payload["nodes"] dag_adj = np.array(payload["dag_adj"], dtype=bool) adj_matrix = np.array(payload["adj_matrix"], dtype=float) return nodes, dag_adj, adj_matrix def _propagate_intervention( nodes: list, dag_adj, equations: dict, data_level, topological_order: list, treatment: str, abs_value: float, targets: list, horizon: int = 5, ): """ Propagate a hard intervention (do(treatment=abs_value)) through the structural equations for `horizon` steps, returning ATE per target node. Uses only numpy — no local causal model imports. """ import numpy as np node_to_idx = {n: i for i, n in enumerate(nodes)} n = len(nodes) T = data_level.shape[0] # Start from the last observed time step state = data_level[-1].copy().astype(float) # Fix the treatment node t_idx = node_to_idx[treatment] state[t_idx] = abs_value ate_per_target: Dict[str, float] = {} baseline = data_level[-1].copy().astype(float) for _ in range(horizon): new_state = state.copy() for node_name in topological_order: if node_name == treatment: continue eq = equations.get(node_name) if eq is None: continue parents = eq.get("parents", []) coefficients = eq.get("coefficients", {}) intercept = float(eq.get("intercept", 0.0)) if not parents: continue val = intercept for p in parents: p_idx = node_to_idx.get(p) if p_idx is not None: val += float(coefficients.get(p, 0.0)) * float(state[p_idx]) n_idx = node_to_idx[node_name] new_state[n_idx] = val state = new_state for target in targets: t_i = node_to_idx.get(target) if t_i is not None: ate_per_target[target] = float(state[t_i] - baseline[t_i]) return ate_per_target, state def _abduct_and_predict( nodes: list, dag_adj, equations: dict, data_level, topological_order: list, treatment: str, cf_value: float, target: str, observed_t: int, ): """ Simple SCM abduction for counterfactual: 1. Abduct residuals from the observed time step. 2. Re-run structural equations with treatment fixed to cf_value. 3. Return factual_outcome, cf_outcome, ITE. """ import numpy as np node_to_idx = {n: i for i, n in enumerate(nodes)} obs = data_level[observed_t].copy().astype(float) # Abduct residuals residuals: Dict[str, float] = {} for node_name in topological_order: eq = equations.get(node_name) if eq is None or not eq.get("parents"): residuals[node_name] = 0.0 continue parents = eq.get("parents", []) coefficients = eq.get("coefficients", {}) intercept = float(eq.get("intercept", 0.0)) predicted = intercept for p in parents: p_idx = node_to_idx.get(p) if p_idx is not None: predicted += float(coefficients.get(p, 0.0)) * float(obs[node_to_idx[p]]) residuals[node_name] = float(obs[node_to_idx[node_name]]) - predicted # Counterfactual: fix treatment, replay equations with abducted noise cf_state = obs.copy() cf_state[node_to_idx[treatment]] = cf_value for node_name in topological_order: if node_name == treatment: continue eq = equations.get(node_name) if eq is None or not eq.get("parents"): continue parents = eq.get("parents", []) coefficients = eq.get("coefficients", {}) intercept = float(eq.get("intercept", 0.0)) predicted = intercept for p in parents: p_idx = node_to_idx.get(p) if p_idx is not None: predicted += float(coefficients.get(p, 0.0)) * float(cf_state[p_idx]) n_idx = node_to_idx[node_name] cf_state[n_idx] = predicted + residuals.get(node_name, 0.0) factual_outcome = float(obs[node_to_idx[target]]) cf_outcome = float(cf_state[node_to_idx[target]]) ite = cf_outcome - factual_outcome return factual_outcome, cf_outcome, ite # ── API: Causal inference (assert / intervene / counterfactual) ─────────── # # Architecture: # 1. Fetch the VALIDATED causal matrix from noisy_boy_backend via HTTP. # The backend has already run CUTS+ learning + pywhyllm + DoWhy validation. # 2. Use the payload data (equations, adj, data_level) for inference # using pure numpy/pandas — no local causal training imports required. # 3. Optionally consult pywhyllm guidance from the backend payload. def run_inference( ticker: str = "RELIANCE", mode: str = "assert", treatment: str = "Revenue", outcome: Optional[str] = "NetIncome", target: Optional[str] = None, value: float = 1.1, cf_value: Optional[float] = None, value_type: str = "multiplier", horizon: int = 5, observed_t: int = -1, threshold: float = 0.5, use_pywhyllm: bool = False, return_assumption_report: bool = False, ) -> Dict[str, Any]: """ Three-layer causal inference driven by the backend's validated causal matrix. Parameters ---------- ticker : NSE ticker (backend must have a cached pipeline run for it) mode : "assert" | "intervene" | "counterfactual" treatment : source node name outcome : outcome node (assert / Layer-1 association) target : target node (counterfactual / Layer-3); if None, falls back to outcome value : intervention magnitude (Layer 2) cf_value : explicit counterfactual value (Layer 3); if None, 'value' + 'value_type' used value_type : "absolute" | "multiplier" | "percent_change" horizon : propagation horizon for intervention (Layer 2, steps) observed_t : time index for counterfactual abduction (Layer 3; -1 = last obs) threshold : adjacency threshold used when loading the graph use_pywhyllm : consult pywhyllm for structural assumptions before running DoWhy return_assumption_report : include the pywhyllm report dict in the response Returns ------- JSON with ate, ci_lower, ci_upper, probability, ripple_effects, and (for counterfactual) factual_outcome, counterfactual_outcome, ite, shapley_contributions. """ import numpy as np import pandas as pd try: # ── 0. Determine target node ────────────────────────────────────────── target_node = target if target else outcome if not target_node: return {"status": "error", "detail": "Either 'outcome' or 'target' must be provided."} # ── 1. Fetch validated causal matrix from backend ───────────────────── # This includes the adjacency matrix, fitted structural equations, # level-domain data, and optionally a pywhyllm assumption report. payload = _fetch_causal_matrix( ticker=ticker, treatment=treatment if use_pywhyllm else None, outcome=target_node if use_pywhyllm else None, include_pywhyllm=use_pywhyllm, threshold=threshold, ) if payload is None: return { "status": "error", "detail": ( f"Could not fetch causal matrix for {ticker} from backend. " "Ensure noisy_boy_backend is running and the pipeline has been run for this ticker." ), } if payload.get("status") == "not_found": return { "status": "error", "detail": payload.get("detail", f"No cached pipeline data for {ticker}."), } # ── 2. Unpack payload (no local causal training imports) ────────────── nodes, dag_adj, adj_matrix = _build_dag_from_payload(payload) node_to_idx = {n: i for i, n in enumerate(nodes)} data_level = np.array(payload["data_level"], dtype=float) equations_raw = payload.get("equations", {}) topo_order = payload.get("topological_order", nodes) T = data_level.shape[0] df = pd.DataFrame(data_level, columns=nodes) if treatment not in node_to_idx: return {"status": "error", "detail": f"Unknown treatment node: {treatment}"} if target_node not in node_to_idx: return {"status": "error", "detail": f"Unknown outcome/target node: {target_node}"} if df.shape[0] < 5: return { "status": "error", "detail": f"Insufficient observations ({df.shape[0]}) to run inference.", } # ── 3. pywhyllm structural guidance (from backend payload) ──────────── pywhyllm_report: Optional[dict] = payload.get("pywhyllm_report") adjustment_sets: List[List[str]] = [] if use_pywhyllm and pywhyllm_report and pywhyllm_report.get("available"): raw_backdoor = pywhyllm_report.get("suggested_backdoor_sets") or [] valid_nodes = set(nodes) - {treatment, target_node} for suggested_set in raw_backdoor: clean = [n for n in suggested_set if n in valid_nodes] if clean and clean not in adjustment_sets: adjustment_sets.append(clean) confounders = [ n for n in (pywhyllm_report.get("suggested_confounders") or []) if n in valid_nodes ] if confounders and confounders not in adjustment_sets: adjustment_sets.append(confounders) result: Dict[str, Any] = {} # ═══════════════════════════════════════════════════════════════[...] # LAYER 1 — Association: "What does Y look like given X?" # Uses DoWhy with the backend-provided DAG, falling back to OLS. # ═══════════════════════════════════════════════════════════════[...] if mode == "assert": try: from dowhy import CausalModel # Build DOT graph string from dag_adj edges = [] for si, src in enumerate(nodes): for di, dst in enumerate(nodes): if dag_adj[si, di]: edges.append(f"{src} -> {dst}") graph_dot = "digraph{" + "; ".join(edges) + "}" dowhy_model = CausalModel( data=df, treatment=treatment, outcome=target_node, graph=graph_dot, ) identified_estimand = dowhy_model.identify_effect( proceed_when_unidentifiable=True ) estimate = dowhy_model.estimate_effect( identified_estimand, method_name="backdoor.linear_regression", ) ate = float(estimate.value) # Confidence interval from OLS residuals se: float = 0.0 try: import numpy.linalg as nla X = df[[c for c in df.columns if c != target_node]].values y = df[target_node].values XtX_inv = nla.pinv(X.T @ X) resid = y - X @ nla.lstsq(X, y, rcond=None)[0] sigma2 = float(np.sum(resid ** 2) / max(1, len(y) - X.shape[1])) t_idx_local = list(df.columns).index(treatment) se = float(np.sqrt(max(0.0, sigma2 * XtX_inv[t_idx_local, t_idx_local]))) except Exception: se = abs(ate) * 0.15 # graceful fallback ci_lower = ate - 1.96 * se ci_upper = ate + 1.96 * se prob = min(1.0, abs(ate) / (abs(ate) + se + 1e-9)) # Ripple effects: direct downstream neighbours of treatment ripple_effects = [] t_idx_g = node_to_idx[treatment] for j, node in enumerate(nodes): if node == treatment or node == target_node: continue if dag_adj[t_idx_g, j]: edge_score = float(adj_matrix[t_idx_g, j]) ripple_effects.append({ "ticker": node, "direction": 1 if ate > 0 else -1, "magnitude": round(edge_score * abs(ate), 4), }) result = { "ate": ate, "ci_lower": ci_lower, "ci_upper": ci_upper, "probability": prob, "strategy": "backdoor.linear_regression", "adjustment_set": adjustment_sets[0] if adjustment_sets else [], "ripple_effects": ripple_effects, } except Exception as dowhy_exc: # DoWhy not installed or identification failed — fall back to OLS logger.warning("DoWhy association failed (%s), falling back to OLS", dowhy_exc) t_idx_g = node_to_idx[treatment] out_idx = node_to_idx[target_node] # Simple OLS: regress target on treatment X = df[[treatment]].values y = df[target_node].values import numpy.linalg as nla coef = nla.lstsq(np.c_[np.ones(len(X)), X], y, rcond=None)[0] ate = float(coef[1]) se = abs(ate) * 0.15 ci_lower = ate - 1.96 * se ci_upper = ate + 1.96 * se ripple_effects = [] for j, node in enumerate(nodes): if node == treatment or node == target_node: continue if dag_adj[t_idx_g, j]: ripple_effects.append({ "ticker": node, "direction": 1 if ate > 0 else -1, "magnitude": round(float(adj_matrix[t_idx_g, j]) * abs(ate), 4), }) result = { "ate": ate, "ci_lower": ci_lower, "ci_upper": ci_upper, "probability": min(1.0, abs(ate) / (abs(ate) + se + 1e-9)), "strategy": "ols_fallback", "adjustment_set": adjustment_sets[0] if adjustment_sets else [], "ripple_effects": ripple_effects, } # ═══════════════════════════════════════════════════════════════[...] # LAYER 2 — Intervention: "What will happen to Y if we do X=value?" # Propagates through structural equations from the backend payload. # ═══════════════════════════════════════════════════════════════[...] elif mode == "intervene": current_val = float(data_level[-1, node_to_idx[treatment]]) abs_value = _resolve_value(value, value_type, current_val) # Try DoWhy for ATE estimation first ate = 0.0 method_used = "scm_propagation" try: from dowhy import CausalModel edges = [] for si, src in enumerate(nodes): for di, dst in enumerate(nodes): if dag_adj[si, di]: edges.append(f"{src} -> {dst}") graph_dot = "digraph{" + "; ".join(edges) + "}" dowhy_model = CausalModel( data=df, treatment=treatment, outcome=target_node, graph=graph_dot, ) identified_estimand = dowhy_model.identify_effect( proceed_when_unidentifiable=True ) estimate = dowhy_model.estimate_effect( identified_estimand, method_name="backdoor.linear_regression", ) ate_unit = float(estimate.value) delta = abs_value - current_val ate = ate_unit * delta method_used = "backdoor.linear_regression" except Exception as dowhy_exc: logger.warning("DoWhy intervention failed (%s), using SCM propagation", dowhy_exc) # SCM propagation for ripple effects (pure numpy, no training imports) ate_per_target, final_state = _propagate_intervention( nodes=nodes, dag_adj=dag_adj, equations=equations_raw, data_level=data_level, topological_order=topo_order, treatment=treatment, abs_value=abs_value, targets=[target_node] + [n for n in nodes if n != treatment], horizon=horizon, ) if method_used == "scm_propagation" and target_node in ate_per_target: ate = float(ate_per_target[target_node]) se = abs(ate) * 0.12 ci_lower = ate - 1.96 * se ci_upper = ate + 1.96 * se ripple_effects = [] for node, delta_val in ate_per_target.items(): if node == treatment: continue ripple_effects.append({ "ticker": node, "direction": 1 if float(delta_val) > 0 else -1, "magnitude": round(abs(float(delta_val)), 4), }) result = { "ate": ate, "ci_lower": ci_lower, "ci_upper": ci_upper, "probability": min(1.0, abs(ate) / (abs(ate) + abs(ci_upper - ci_lower) / 2 + 1e-9)), "strategy": method_used, "intervention_value": abs_value, "value_type": value_type, "horizon": horizon, "ripple_effects": ripple_effects, "adjustment_set": adjustment_sets[0] if adjustment_sets else [], } # ═══════════════════════════════════════════════════════════════[...] # LAYER 3 — Counterfactual: "What if X had been different in the past?" # Uses SCM abduction via pure numpy structural equations. # ═══════════════════════════════════════════════════════════════[...] elif mode in ("counterfactual", "counter"): # Resolve observed timestep t = observed_t if observed_t >= 0 else (T + observed_t) t = max(0, min(T - 1, t)) # Resolve counterfactual value current_val = float(data_level[t, node_to_idx[treatment]]) if cf_value is not None: abs_cf_value = float(cf_value) else: abs_cf_value = _resolve_value(value, value_type, current_val) # Try DoWhy GCM first gcm_used = False factual_outcome = 0.0 cf_outcome_val = 0.0 ite = 0.0 try: import dowhy.gcm as gcm_module import networkx as nx causal_graph = nx.DiGraph() for si, src in enumerate(nodes): for di, dst in enumerate(nodes): if dag_adj[si, di]: causal_graph.add_edge(src, dst) for node in nodes: if node not in causal_graph.nodes: causal_graph.add_node(node) gcm_model = gcm_module.InvertibleStructuralCausalModel(causal_graph) gcm_module.auto.assign_mechanisms(gcm_model, df) gcm_module.fit(gcm_model, df) observed_data = df.iloc[[t]] cf_val_fixed = abs_cf_value cf_samples = gcm_module.counterfactual_samples( gcm_model, {treatment: lambda x, v=cf_val_fixed: np.full(x.shape, v)}, observed_data=observed_data, num_samples_to_draw=1, ) factual_outcome = float(observed_data[target_node].iloc[0]) cf_outcome_val = float(cf_samples[target_node].iloc[0]) ite = cf_outcome_val - factual_outcome gcm_used = True except Exception as gcm_exc: logger.warning("DoWhy GCM counterfactual failed (%s), using SCM abduction", gcm_exc) if not gcm_used: factual_outcome, cf_outcome_val, ite = _abduct_and_predict( nodes=nodes, dag_adj=dag_adj, equations=equations_raw, data_level=data_level, topological_order=topo_order, treatment=treatment, cf_value=abs_cf_value, target=target_node, observed_t=t, ) # Shapley: single-treatment — just use the ITE directly shapley = {treatment: ite} # SE from residual_std of the target equation (from backend payload) target_eq_data = equations_raw.get(target_node, {}) se = float(target_eq_data.get("residual_std", abs(ite) * 0.15)) ci_lower = ite - 1.96 * se ci_upper = ite + 1.96 * se result = { "ate": ite, "ite": ite, "factual_outcome": factual_outcome, "counterfactual_outcome": cf_outcome_val, "ci_lower": ci_lower, "ci_upper": ci_upper, "probability": min(1.0, abs(ite) / (abs(ite) + se + 1e-9)), "strategy": "dowhy_gcm" if gcm_used else "scm_abduction", "counterfactual_value": abs_cf_value, "value_type": value_type, "observed_t": t, "shapley_contributions": shapley, "ripple_effects": [], } else: return { "status": "error", "detail": f"Unknown mode '{mode}'. Must be one of: assert, intervene, counterfactual.", } # ── Attach pywhyllm assumption report if requested ──────────────────── if return_assumption_report and pywhyllm_report: result["pywhyllm_report"] = pywhyllm_report return _safe_json({"status": "ok", "ticker": ticker.upper(), "mode": mode, **result}) except Exception as exc: logger.exception("run_inference failed") return {"status": "error", "detail": str(exc)} # ──────────────────────────────────────────────────────────────────[...] # Gradio UI & Entry point # ──────────────────────────────────────────────────────────────────[...] def create_demo(): """Create and return the Gradio Blocks interface.""" with gr.Blocks(title="Iroha Causal Terminal") as demo: gr.Markdown("# Iroha Causal Terminal") gr.Markdown("Iroha Financial Intelligence — real-time causal probability matrix, HHKD decomposition, DoFlow inference and sector hierarchy over NIFTY50.") with gr.Row(): ticker = gr.Textbox(label="Ticker", value="RELIANCE") mode = gr.Dropdown(choices=["assert", "intervene", "counterfactual"], label="Mode", value="assert") treatment = gr.Textbox(label="Treatment", value="Revenue") outcome = gr.Textbox(label="Outcome", value="NetIncome") target = gr.Textbox(label="Target", value="") with gr.Row(): value = gr.Number(label="Value", value=1.1) cf_value = gr.Number(label="CF Value") value_type = gr.Dropdown(choices=["absolute", "multiplier", "percent_change"], label="Value Type", value="multiplier") horizon = gr.Number(label="Horizon", value=5, precision=0) observed_t = gr.Number(label="Observed T", value=-1, precision=0) threshold = gr.Number(label="Threshold", value=0.5) with gr.Row(): use_pywhyllm = gr.Checkbox(label="Use PyWhyLLM", value=False) return_assumption_report = gr.Checkbox(label="Return Assumption Report", value=False) btn = gr.Button("Run Inference") out = gr.JSON(label="Result") btn.click( fn=run_inference, inputs=[ ticker, mode, treatment, outcome, target, value, cf_value, value_type, horizon, observed_t, threshold, use_pywhyllm, return_assumption_report ], outputs=out, api_name="run_inference" ) return demo if __name__ == "__main__": port = int(os.environ.get("GRADIO_SERVER_PORT", os.environ.get("PORT", "7860"))) host = os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0") logger.info(f"Starting Iroha Causal Terminal on {host}:{port}") demo = create_demo() demo.launch( server_name=host, server_port=port, show_error=True, )