Noisy_Boy_Inference / server.py
Paul Babu Kadali
removec hfoauth
aed24ea
Raw
History Blame Contribute Delete
36.8 kB
"""
dashboard_server.py
====================
Iroha Financial Intelligence — gr.Server entry point.
Architecture
------------
gr.Server (extends FastAPI)
├── GET / → serves frontend/index.html
├── GET /static/* → serves frontend/{style.css, app.js} (StaticFiles)
│
├── @server.api run_inference → DoFlow / SCM causal query (via BACKEND_API)
│
├── GET /v2/health → health-check
│
└── All existing /v2/* routers from main.py are included here too
(so this server is a superset of main.py).
Usage
-----
python dashboard_server.py
Or with uvicorn:
uvicorn dashboard_server:server --host 0.0.0.0 --port 7860 --reload
"""
from __future__ import annotations
import os
import sys
import json
import logging
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
import gradio as gr
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
load_dotenv()
# Directory holding this file; it goes first on sys.path so sibling modules resolve.
BASE_DIR = Path(__file__).parent.resolve()
if sys.path.count(str(BASE_DIR)) == 0:
    sys.path.insert(0, str(BASE_DIR))

# The backend checkout is expected next to this directory. When it is present,
# put it on sys.path as well so that 'app', 'causal', etc. can be imported.
BACKEND_DIR = BASE_DIR.parent.joinpath("noisy_boy_backend").resolve()
if BACKEND_DIR.exists():
    if str(BACKEND_DIR) not in sys.path:
        sys.path.insert(0, str(BACKEND_DIR))
# ──────────────────────────────────────────────────────────────────[...]
# Logging
# ──────────────────────────────────────────────────────────────────[...]
# Root logging: INFO and above, one uniform line format, emitted through a
# single StreamHandler.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
# Logger shared by every function in this module.
logger = logging.getLogger("dashboard-server")
# ──────────────────────────────────────────────────────────────────[...]
# Backend URL
# ──��───────────────────────────────────────────────────────────────[...]
_BACKEND_BASE_URL: str = os.environ.get("BACKEND_API_URL", "http://localhost:7860")
# ──────────────────────────────────────────────────────────────────[...]
# Public API
# ──────────────────────────────────────────────────────────────────[...]
def run_pipeline(
    ticker: str = "RELIANCE",
    threshold: float = 0.5,
    treatment: Optional[str] = None,
    outcome: Optional[str] = None,
    include_pywhyllm: bool = False,
) -> Dict[str, Any]:
    """
    Fetch the validated causal matrix for *ticker* from the backend API and
    add a frontend-friendly node/link graph to it.

    Parameters
    ----------
    ticker : symbol to query; upper-cased and URL-quoted before use
    threshold : sent to the backend as the adjacency threshold and also used
        locally as the minimum score for an edge to appear in ``links_graph``
    treatment : optional treatment node, forwarded as a query parameter
    outcome : optional outcome node, forwarded as a query parameter
    include_pywhyllm : when true, adds ``include_pywhyllm=true`` to the query

    Returns
    -------
    dict
        The backend payload unchanged, plus two extra keys:
        ``nodes_graph`` (``[{"id", "label"}, ...]``) and ``links_graph``
        (``[{"source", "target", "score"}, ...]`` for every off-diagonal
        entry of ``adj_matrix`` whose score is >= ``threshold``).

    Raises
    ------
    RuntimeError
        If the backend cannot be reached, answers with an HTTP error status,
        returns something that is not a JSON object, or reports a status
        other than ``"success"`` / ``"ok"`` / absent.
    """
    params: Dict[str, Any] = {"threshold": threshold}
    if treatment:
        params["treatment"] = treatment
    if outcome:
        params["outcome"] = outcome
    if include_pywhyllm:
        params["include_pywhyllm"] = "true"

    # Quote the ticker so characters such as '&' or '/' cannot alter the URL path.
    symbol = urllib.parse.quote(ticker.upper(), safe="")
    qs = urllib.parse.urlencode(params)
    url = f"{_BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{symbol}?{qs}"
    logger.info("run_pipeline: fetching %s", url)

    try:
        with urllib.request.urlopen(url, timeout=60) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError, so it must be handled first: here the
        # backend WAS reached and answered 4xx/5xx. Surface its own detail
        # message when the error body is a JSON object carrying one.
        try:
            body = json.loads(exc.read())
        except (OSError, ValueError, AttributeError):
            body = None
        detail = body.get("detail") if isinstance(body, dict) else None
        raise RuntimeError(
            f"Backend returned HTTP {exc.code} for {ticker}: {detail or exc.reason}"
        ) from exc
    except OSError as exc:
        # URLError, socket timeouts and connection errors are all OSError subclasses.
        raise RuntimeError(
            f"Could not reach backend at {_BACKEND_BASE_URL}. "
            f"Ensure noisy_boy_backend is running. Original error: {exc}"
        ) from exc

    try:
        payload = json.loads(raw)
    except ValueError as exc:
        raise RuntimeError(
            f"Backend returned a non-JSON response for {ticker}."
        ) from exc
    if not isinstance(payload, dict):
        raise RuntimeError(
            f"Backend returned an unexpected JSON payload for {ticker}: "
            f"expected an object, got {type(payload).__name__}."
        )

    status = payload.get("status")
    if status == "not_found":
        raise RuntimeError(
            payload.get(
                "detail",
                f"No cached pipeline data for {ticker} on backend. "
                "Run the singular-causal pipeline on the backend first.",
            )
        )
    if status not in ("success", None, "ok"):
        raise RuntimeError(
            f"Backend returned unexpected status '{status}' for {ticker}. "
            f"Payload: {payload}"
        )

    # Build the frontend-friendly graph representation. `or []` also covers an
    # explicit JSON null for either key.
    nodes: List[str] = payload.get("nodes") or []
    adj_matrix = payload.get("adj_matrix") or []
    nodes_graph = [{"id": n, "label": n} for n in nodes]
    links_graph = []
    for i, src in enumerate(nodes):
        for j, dst in enumerate(nodes):
            if i == j:
                continue
            try:
                score = float(adj_matrix[i][j])
            except (IndexError, TypeError, ValueError):
                # Ragged or non-numeric matrix entry: treat it as "no edge".
                score = 0.0
            if score >= threshold:
                links_graph.append(
                    {"source": src, "target": dst, "score": round(score, 4)}
                )

    return {
        **payload,
        "nodes_graph": nodes_graph,
        "links_graph": links_graph,
    }
# ──────────────────────────────────────────────────────────────────[...]
# Helpers
# ──────────────────────────────────────────────────────────────────[...]
# URL of the noisy_boy_backend — used to fetch the validated causal matrix.
# By default, point to ourselves since we now successfully mount the backend routers.
# Override via BACKEND_API_URL env var if running a separate backend on 8000.
# NOTE(review): this repeats, with the same environment variable and default,
# the assignment made in the "Backend URL" section earlier in this module;
# one of the two is presumably redundant — confirm before removing either.
_BACKEND_BASE_URL = os.environ.get("BACKEND_API_URL", "http://localhost:7860")
def _fetch_causal_matrix(
    ticker: str,
    treatment: Optional[str] = None,
    outcome: Optional[str] = None,
    include_pywhyllm: bool = False,
    threshold: float = 0.5,
) -> Optional[dict]:
    """
    Fetch the causal-matrix payload for *ticker* from the backend API.

    Calls GET {BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{ticker}
    with ``threshold`` and, when given, ``treatment`` / ``outcome`` /
    ``include_pywhyllm`` as query parameters.

    Returns
    -------
    dict or None
        * the parsed JSON object when its ``status`` is ``"success"``,
          ``"ok"`` or absent;
        * the parsed JSON object unchanged when its ``status`` is
          ``"not_found"``, so the caller can relay the backend's ``detail``;
        * ``None`` on any transport or parse failure, a non-object payload,
          or any other status (a warning is logged in each case).

    Per the original description of this endpoint, a successful payload
    contains:
        nodes — ordered list of node names
        adj_matrix — raw float adjacency matrix
        dag_adj — thresholded 0/1 DAG
        equations — per-node structural equations (coefficients, intercepts, residual_std)
        data_level — (T, N) time-series observations used to fit the SCM
        topological_order — nodes in topological traversal order
        pywhyllm_report — (optional) assumption analysis for treatment→outcome
    """
    params: dict = {"threshold": threshold}
    if treatment:
        params["treatment"] = treatment
    if outcome:
        params["outcome"] = outcome
    if include_pywhyllm:
        params["include_pywhyllm"] = "true"

    # Quote the ticker so it cannot inject extra path segments or query parameters.
    symbol = urllib.parse.quote(ticker.upper(), safe="")
    query_string = urllib.parse.urlencode(params)
    url = f"{_BACKEND_BASE_URL}/v2/api/singular-causal/causal-matrix/{symbol}?{query_string}"

    try:
        with urllib.request.urlopen(url, timeout=30) as resp:
            raw = resp.read()
        data = json.loads(raw)
    except Exception as exc:
        # Deliberately broad: this helper is best-effort and its contract is
        # "None on failure", whatever the transport or parser raised.
        logger.warning("_fetch_causal_matrix failed for %s: %s", ticker, exc)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "_fetch_causal_matrix: backend returned a non-object payload for URL %s", url
        )
        return None

    status = data.get("status")
    if status == "not_found":
        # Pass through so the caller can report the backend's own detail text.
        return data
    if status not in ("success", "ok", None):
        logger.warning(
            "_fetch_causal_matrix: backend returned status=%s for URL %s. Payload: %s",
            status, url, data,
        )
        return None
    return data
def _safe_json(obj: Any) -> Any:
"""Recursively make numpy types JSON-serialisable."""
try:
import numpy as np
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
except ImportError:
pass
if isinstance(obj, dict):
return {k: _safe_json(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_safe_json(v) for v in obj]
return obj
def _resolve_value(value: float, value_type: str, current: float) -> float:
"""Convert a user-supplied value + value_type to the absolute node value."""
vt = value_type.strip().lower()
if vt == "absolute":
return value
if vt == "multiplier":
return current * value
if vt == "percent_change":
return current * (1.0 + value / 100.0)
# default: treat as absolute
return value
# ──────────────────────────────────────────────────────────────────[...]
# Pure-numpy inference helpers (no local causal training imports)
# These functions work entirely from the payload returned by the backend API.
# ──────────────────────────────────────────────────────────────────[...]
def _build_dag_from_payload(payload: dict):
"""
Return a numpy bool DAG adjacency matrix and list of node names
from the backend causal-matrix payload.
"""
import numpy as np
nodes = payload["nodes"]
dag_adj = np.array(payload["dag_adj"], dtype=bool)
adj_matrix = np.array(payload["adj_matrix"], dtype=float)
return nodes, dag_adj, adj_matrix
def _propagate_intervention(
nodes: list,
dag_adj,
equations: dict,
data_level,
topological_order: list,
treatment: str,
abs_value: float,
targets: list,
horizon: int = 5,
):
"""
Propagate a hard intervention (do(treatment=abs_value)) through the
structural equations for `horizon` steps, returning ATE per target node.
Uses only numpy β€” no local causal model imports.
"""
import numpy as np
node_to_idx = {n: i for i, n in enumerate(nodes)}
n = len(nodes)
T = data_level.shape[0]
# Start from the last observed time step
state = data_level[-1].copy().astype(float)
# Fix the treatment node
t_idx = node_to_idx[treatment]
state[t_idx] = abs_value
ate_per_target: Dict[str, float] = {}
baseline = data_level[-1].copy().astype(float)
for _ in range(horizon):
new_state = state.copy()
for node_name in topological_order:
if node_name == treatment:
continue
eq = equations.get(node_name)
if eq is None:
continue
parents = eq.get("parents", [])
coefficients = eq.get("coefficients", {})
intercept = float(eq.get("intercept", 0.0))
if not parents:
continue
val = intercept
for p in parents:
p_idx = node_to_idx.get(p)
if p_idx is not None:
val += float(coefficients.get(p, 0.0)) * float(state[p_idx])
n_idx = node_to_idx[node_name]
new_state[n_idx] = val
state = new_state
for target in targets:
t_i = node_to_idx.get(target)
if t_i is not None:
ate_per_target[target] = float(state[t_i] - baseline[t_i])
return ate_per_target, state
def _abduct_and_predict(
nodes: list,
dag_adj,
equations: dict,
data_level,
topological_order: list,
treatment: str,
cf_value: float,
target: str,
observed_t: int,
):
"""
Simple SCM abduction for counterfactual:
1. Abduct residuals from the observed time step.
2. Re-run structural equations with treatment fixed to cf_value.
3. Return factual_outcome, cf_outcome, ITE.
"""
import numpy as np
node_to_idx = {n: i for i, n in enumerate(nodes)}
obs = data_level[observed_t].copy().astype(float)
# Abduct residuals
residuals: Dict[str, float] = {}
for node_name in topological_order:
eq = equations.get(node_name)
if eq is None or not eq.get("parents"):
residuals[node_name] = 0.0
continue
parents = eq.get("parents", [])
coefficients = eq.get("coefficients", {})
intercept = float(eq.get("intercept", 0.0))
predicted = intercept
for p in parents:
p_idx = node_to_idx.get(p)
if p_idx is not None:
predicted += float(coefficients.get(p, 0.0)) * float(obs[node_to_idx[p]])
residuals[node_name] = float(obs[node_to_idx[node_name]]) - predicted
# Counterfactual: fix treatment, replay equations with abducted noise
cf_state = obs.copy()
cf_state[node_to_idx[treatment]] = cf_value
for node_name in topological_order:
if node_name == treatment:
continue
eq = equations.get(node_name)
if eq is None or not eq.get("parents"):
continue
parents = eq.get("parents", [])
coefficients = eq.get("coefficients", {})
intercept = float(eq.get("intercept", 0.0))
predicted = intercept
for p in parents:
p_idx = node_to_idx.get(p)
if p_idx is not None:
predicted += float(coefficients.get(p, 0.0)) * float(cf_state[p_idx])
n_idx = node_to_idx[node_name]
cf_state[n_idx] = predicted + residuals.get(node_name, 0.0)
factual_outcome = float(obs[node_to_idx[target]])
cf_outcome = float(cf_state[node_to_idx[target]])
ite = cf_outcome - factual_outcome
return factual_outcome, cf_outcome, ite
# ── API: Causal inference (assert / intervene / counterfactual) ───────────
#
# Architecture:
# 1. Fetch the VALIDATED causal matrix from noisy_boy_backend via HTTP.
# The backend has already run CUTS+ learning + pywhyllm + DoWhy validation.
# 2. Use the payload data (equations, adj, data_level) for inference
# using pure numpy/pandas — no local causal training imports required.
# 3. Optionally consult pywhyllm guidance from the backend payload.
def _dag_to_dot(nodes: List[str], dag_adj) -> str:
    """Serialise the thresholded DAG as the DOT digraph string passed to DoWhy."""
    edges = [
        f"{src} -> {dst}"
        for si, src in enumerate(nodes)
        for di, dst in enumerate(nodes)
        if dag_adj[si, di]
    ]
    return "digraph{" + "; ".join(edges) + "}"


def _dowhy_unit_effect(df, nodes: List[str], dag_adj, treatment: str, outcome: str) -> float:
    """Per-unit effect of treatment on outcome from DoWhy's backdoor linear regression."""
    from dowhy import CausalModel

    model = CausalModel(
        data=df,
        treatment=treatment,
        outcome=outcome,
        graph=_dag_to_dot(nodes, dag_adj),
    )
    estimand = model.identify_effect(proceed_when_unidentifiable=True)
    estimate = model.estimate_effect(estimand, method_name="backdoor.linear_regression")
    return float(estimate.value)


def _treatment_standard_error(df, treatment: str, target_node: str) -> float:
    """OLS standard error of the treatment coefficient, regressing target on all other columns."""
    import numpy as np

    predictors = [c for c in df.columns if c != target_node]
    X = df[predictors].values
    y = df[target_node].values
    resid = y - X @ np.linalg.lstsq(X, y, rcond=None)[0]
    sigma2 = float(np.sum(resid ** 2) / max(1, len(y) - X.shape[1]))
    xtx_inv = np.linalg.pinv(X.T @ X)
    # Index within `predictors` (the columns of X), NOT within df.columns:
    # the target column has been removed, so the two differ whenever the
    # target precedes the treatment.
    k = predictors.index(treatment)
    return float(np.sqrt(max(0.0, sigma2 * xtx_inv[k, k])))


def _direct_ripple_effects(
    nodes: List[str], dag_adj, adj_matrix, t_idx: int,
    treatment: str, target_node: str, ate: float,
) -> List[Dict[str, Any]]:
    """Ripple entries for the treatment's direct DAG children (excluding the target)."""
    direction = 1 if ate > 0 else -1
    return [
        {
            "ticker": node,
            "direction": direction,
            "magnitude": round(float(adj_matrix[t_idx, j]) * abs(ate), 4),
        }
        for j, node in enumerate(nodes)
        if node != treatment and node != target_node and dag_adj[t_idx, j]
    ]


def _pywhyllm_adjustment_sets(
    report: dict, nodes: List[str], treatment: str, target_node: str
) -> List[List[str]]:
    """De-duplicated adjustment sets from a pywhyllm report, restricted to known nodes."""
    valid_nodes = set(nodes) - {treatment, target_node}
    adjustment_sets: List[List[str]] = []
    for suggested_set in report.get("suggested_backdoor_sets") or []:
        clean = [n for n in suggested_set if n in valid_nodes]
        if clean and clean not in adjustment_sets:
            adjustment_sets.append(clean)
    confounders = [
        n for n in (report.get("suggested_confounders") or []) if n in valid_nodes
    ]
    if confounders and confounders not in adjustment_sets:
        adjustment_sets.append(confounders)
    return adjustment_sets


def _infer_association(
    *, df, nodes, dag_adj, adj_matrix, node_to_idx, treatment, target_node, adjustment_set
) -> Dict[str, Any]:
    """Layer 1: DoWhy backdoor estimate, falling back to a one-regressor OLS slope."""
    import numpy as np

    try:
        ate = _dowhy_unit_effect(df, nodes, dag_adj, treatment, target_node)
    except Exception as dowhy_exc:
        # DoWhy not installed or estimation failed: regress target on treatment only.
        logger.warning("DoWhy association failed (%s), falling back to OLS", dowhy_exc)
        x = df[[treatment]].values
        y = df[target_node].values
        coef = np.linalg.lstsq(np.c_[np.ones(len(x)), x], y, rcond=None)[0]
        ate = float(coef[1])
        se = abs(ate) * 0.15  # heuristic width; no residual-based SE in the fallback
        strategy = "ols_fallback"
    else:
        strategy = "backdoor.linear_regression"
        try:
            se = _treatment_standard_error(df, treatment, target_node)
        except Exception:
            se = abs(ate) * 0.15  # graceful fallback

    return {
        "ate": ate,
        "ci_lower": ate - 1.96 * se,
        "ci_upper": ate + 1.96 * se,
        "probability": min(1.0, abs(ate) / (abs(ate) + se + 1e-9)),
        "strategy": strategy,
        "adjustment_set": adjustment_set,
        "ripple_effects": _direct_ripple_effects(
            nodes, dag_adj, adj_matrix, node_to_idx[treatment], treatment, target_node, ate
        ),
    }


def _infer_intervention(
    *, df, nodes, dag_adj, data_level, equations, topo_order, node_to_idx,
    treatment, target_node, value, value_type, horizon, adjustment_set
) -> Dict[str, Any]:
    """Layer 2: scale DoWhy's unit effect by the intervention size; SCM propagation for ripples."""
    current_val = float(data_level[-1, node_to_idx[treatment]])
    abs_value = _resolve_value(value, value_type, current_val)

    ate = 0.0
    strategy = "scm_propagation"
    try:
        ate = _dowhy_unit_effect(df, nodes, dag_adj, treatment, target_node) * (
            abs_value - current_val
        )
        strategy = "backdoor.linear_regression"
    except Exception as dowhy_exc:
        logger.warning("DoWhy intervention failed (%s), using SCM propagation", dowhy_exc)

    # Always run the propagation: it supplies the ripple effects, and the
    # headline ATE too when DoWhy was unavailable.
    ate_per_target, _final_state = _propagate_intervention(
        nodes=nodes,
        dag_adj=dag_adj,
        equations=equations,
        data_level=data_level,
        topological_order=topo_order,
        treatment=treatment,
        abs_value=abs_value,
        targets=[target_node] + [n for n in nodes if n != treatment],
        horizon=horizon,
    )
    if strategy == "scm_propagation" and target_node in ate_per_target:
        ate = float(ate_per_target[target_node])

    se = abs(ate) * 0.12  # heuristic width
    ci_lower = ate - 1.96 * se
    ci_upper = ate + 1.96 * se
    ripple_effects = [
        {
            "ticker": node,
            "direction": 1 if float(delta_val) > 0 else -1,
            "magnitude": round(abs(float(delta_val)), 4),
        }
        for node, delta_val in ate_per_target.items()
        if node != treatment
    ]
    return {
        "ate": ate,
        "ci_lower": ci_lower,
        "ci_upper": ci_upper,
        "probability": min(1.0, abs(ate) / (abs(ate) + abs(ci_upper - ci_lower) / 2 + 1e-9)),
        "strategy": strategy,
        "intervention_value": abs_value,
        "value_type": value_type,
        "horizon": horizon,
        "ripple_effects": ripple_effects,
        "adjustment_set": adjustment_set,
    }


def _gcm_counterfactual(df, nodes, dag_adj, treatment, target_node, cf_value, t):
    """Counterfactual for row ``t`` using a DoWhy GCM fitted on ``df``; returns (factual, cf)."""
    import numpy as np
    import dowhy.gcm as gcm_module
    import networkx as nx

    causal_graph = nx.DiGraph()
    for si, src in enumerate(nodes):
        for di, dst in enumerate(nodes):
            if dag_adj[si, di]:
                causal_graph.add_edge(src, dst)
    for node in nodes:
        if node not in causal_graph.nodes:
            causal_graph.add_node(node)  # keep isolated nodes in the model

    gcm_model = gcm_module.InvertibleStructuralCausalModel(causal_graph)
    gcm_module.auto.assign_mechanisms(gcm_model, df)
    gcm_module.fit(gcm_model, df)

    observed_data = df.iloc[[t]]
    # NOTE: no `num_samples_to_draw` here. DoWhy documents that keyword for
    # interventional_samples; counterfactual_samples takes
    # (causal_model, interventions, observed_data, noise_data), so passing it
    # raised TypeError and this path could never succeed.
    cf_samples = gcm_module.counterfactual_samples(
        gcm_model,
        {treatment: lambda x, v=cf_value: np.full(x.shape, v)},
        observed_data=observed_data,
    )
    return (
        float(observed_data[target_node].iloc[0]),
        float(cf_samples[target_node].iloc[0]),
    )


def _infer_counterfactual(
    *, df, nodes, dag_adj, data_level, equations, topo_order, node_to_idx,
    treatment, target_node, value, cf_value, value_type, observed_t
) -> Dict[str, Any]:
    """Layer 3: DoWhy GCM counterfactual, falling back to linear SCM abduction."""
    n_obs = data_level.shape[0]
    # Negative indices count from the end; the result is clamped to a valid row.
    t = observed_t if observed_t >= 0 else (n_obs + observed_t)
    t = max(0, min(n_obs - 1, t))

    current_val = float(data_level[t, node_to_idx[treatment]])
    if cf_value is not None:
        abs_cf_value = float(cf_value)
    else:
        abs_cf_value = _resolve_value(value, value_type, current_val)

    try:
        factual_outcome, cf_outcome_val = _gcm_counterfactual(
            df, nodes, dag_adj, treatment, target_node, abs_cf_value, t
        )
        ite = cf_outcome_val - factual_outcome
        strategy = "dowhy_gcm"
    except Exception as gcm_exc:
        logger.warning("DoWhy GCM counterfactual failed (%s), using SCM abduction", gcm_exc)
        factual_outcome, cf_outcome_val, ite = _abduct_and_predict(
            nodes=nodes,
            dag_adj=dag_adj,
            equations=equations,
            data_level=data_level,
            topological_order=topo_order,
            treatment=treatment,
            cf_value=abs_cf_value,
            target=target_node,
            observed_t=t,
        )
        strategy = "scm_abduction"

    # SE from the target equation's residual_std when the backend supplied
    # one; a missing or null value falls back to a heuristic width.
    residual_std = (equations.get(target_node) or {}).get("residual_std")
    se = float(residual_std) if residual_std is not None else abs(ite) * 0.15

    return {
        "ate": ite,
        "ite": ite,
        "factual_outcome": factual_outcome,
        "counterfactual_outcome": cf_outcome_val,
        "ci_lower": ite - 1.96 * se,
        "ci_upper": ite + 1.96 * se,
        "probability": min(1.0, abs(ite) / (abs(ite) + se + 1e-9)),
        "strategy": strategy,
        "counterfactual_value": abs_cf_value,
        "value_type": value_type,
        "observed_t": t,
        # Single treatment, so its contribution is the whole ITE.
        "shapley_contributions": {treatment: ite},
        "ripple_effects": [],
    }


def run_inference(
    ticker: str = "RELIANCE",
    mode: str = "assert",
    treatment: str = "Revenue",
    outcome: Optional[str] = "NetIncome",
    target: Optional[str] = None,
    value: float = 1.1,
    cf_value: Optional[float] = None,
    value_type: str = "multiplier",
    horizon: int = 5,
    observed_t: int = -1,
    threshold: float = 0.5,
    use_pywhyllm: bool = False,
    return_assumption_report: bool = False,
) -> Dict[str, Any]:
    """
    Three-layer causal inference driven by the backend's causal-matrix payload.

    Parameters
    ----------
    ticker : symbol whose payload is fetched from the backend
    mode : "assert" | "intervene" | "counterfactual" ("counter" is also accepted)
    treatment : source node name
    outcome : outcome node; used when ``target`` is empty
    target : target node; takes precedence over ``outcome`` when non-empty
    value : magnitude interpreted according to ``value_type``
    cf_value : explicit counterfactual value (Layer 3); if None, ``value`` +
        ``value_type`` are resolved against the observed treatment value
    value_type : "absolute" | "multiplier" | "percent_change"
    horizon : number of propagation steps for Layer 2 (coerced to int)
    observed_t : row index for Layer 3 (coerced to int; negative counts from
        the end, result clamped to the available rows)
    threshold : adjacency threshold forwarded to the backend
    use_pywhyllm : request the pywhyllm report and derive adjustment sets from it
    return_assumption_report : include the pywhyllm report in the response

    Returns
    -------
    dict
        On success: ``{"status": "ok", "ticker", "mode", "ate", "ci_lower",
        "ci_upper", "probability", "strategy", "ripple_effects", ...}`` plus
        mode-specific keys (for counterfactual: ``factual_outcome``,
        ``counterfactual_outcome``, ``ite``, ``shapley_contributions``).
        On any failure: ``{"status": "error", "detail": <message>}`` — this
        function does not raise for runtime errors inside the pipeline.
    """
    import numpy as np
    import pandas as pd

    try:
        # 0. Determine the target node (empty string counts as "not given").
        target_node = target if target else outcome
        if not target_node:
            return {"status": "error", "detail": "Either 'outcome' or 'target' must be provided."}

        # 1. Fetch the causal matrix from the backend. Treatment/outcome are
        #    only forwarded when a pywhyllm report is requested.
        payload = _fetch_causal_matrix(
            ticker=ticker,
            treatment=treatment if use_pywhyllm else None,
            outcome=target_node if use_pywhyllm else None,
            include_pywhyllm=use_pywhyllm,
            threshold=threshold,
        )
        if payload is None:
            return {
                "status": "error",
                "detail": (
                    f"Could not fetch causal matrix for {ticker} from backend. "
                    "Ensure noisy_boy_backend is running and the pipeline has been run for this ticker."
                ),
            }
        if payload.get("status") == "not_found":
            return {
                "status": "error",
                "detail": payload.get("detail", f"No cached pipeline data for {ticker}."),
            }

        # 2. Unpack the payload.
        nodes, dag_adj, adj_matrix = _build_dag_from_payload(payload)
        node_to_idx = {n: i for i, n in enumerate(nodes)}
        data_level = np.array(payload["data_level"], dtype=float)
        equations_raw = payload.get("equations", {})
        topo_order = payload.get("topological_order", nodes)
        df = pd.DataFrame(data_level, columns=nodes)

        if treatment not in node_to_idx:
            return {"status": "error", "detail": f"Unknown treatment node: {treatment}"}
        if target_node not in node_to_idx:
            return {"status": "error", "detail": f"Unknown outcome/target node: {target_node}"}
        if df.shape[0] < 5:
            return {
                "status": "error",
                "detail": f"Insufficient observations ({df.shape[0]}) to run inference.",
            }

        # 3. Optional pywhyllm guidance taken from the backend payload.
        pywhyllm_report: Optional[dict] = payload.get("pywhyllm_report")
        adjustment_sets: List[List[str]] = []
        if use_pywhyllm and pywhyllm_report and pywhyllm_report.get("available"):
            adjustment_sets = _pywhyllm_adjustment_sets(
                pywhyllm_report, nodes, treatment, target_node
            )
        adjustment_set = adjustment_sets[0] if adjustment_sets else []

        # 4. Dispatch on mode.
        if mode == "assert":
            result = _infer_association(
                df=df, nodes=nodes, dag_adj=dag_adj, adj_matrix=adj_matrix,
                node_to_idx=node_to_idx, treatment=treatment,
                target_node=target_node, adjustment_set=adjustment_set,
            )
        elif mode == "intervene":
            result = _infer_intervention(
                df=df, nodes=nodes, dag_adj=dag_adj, data_level=data_level,
                equations=equations_raw, topo_order=topo_order,
                node_to_idx=node_to_idx, treatment=treatment,
                target_node=target_node, value=value, value_type=value_type,
                horizon=int(horizon), adjustment_set=adjustment_set,
            )
        elif mode in ("counterfactual", "counter"):
            result = _infer_counterfactual(
                df=df, nodes=nodes, dag_adj=dag_adj, data_level=data_level,
                equations=equations_raw, topo_order=topo_order,
                node_to_idx=node_to_idx, treatment=treatment,
                target_node=target_node, value=value, cf_value=cf_value,
                value_type=value_type, observed_t=int(observed_t),
            )
        else:
            return {
                "status": "error",
                "detail": f"Unknown mode '{mode}'. Must be one of: assert, intervene, counterfactual.",
            }

        # 5. Attach the pywhyllm assumption report if requested.
        if return_assumption_report and pywhyllm_report:
            result["pywhyllm_report"] = pywhyllm_report
        return _safe_json({"status": "ok", "ticker": ticker.upper(), "mode": mode, **result})
    except Exception as exc:
        # Top-level boundary for the UI/API: log the traceback, return an error payload.
        logger.exception("run_inference failed")
        return {"status": "error", "detail": str(exc)}
# ──────────────────────────────────────────────────────────────────[...]
# Gradio UI & Entry point
# ──────────────────────────────────────────────────────────────────[...]
def create_demo():
    """
    Build the Gradio Blocks interface and return it (without launching).

    The form exposes every parameter of ``run_inference`` in the same order
    as that function's signature; the button calls ``run_inference`` and
    shows its dict result in a JSON component. The click handler is also
    published under the API name ``run_inference``.
    """
    with gr.Blocks(title="Iroha Causal Terminal") as demo:
        gr.Markdown("# Iroha Causal Terminal")
        # The em dash below replaces a mis-encoded byte sequence ("β€”") that
        # was being rendered verbatim in the page.
        gr.Markdown("Iroha Financial Intelligence — real-time causal probability matrix, HHKD decomposition, DoFlow inference and sector hierarchy over NIFTY50.")
        with gr.Row():
            ticker = gr.Textbox(label="Ticker", value="RELIANCE")
            mode = gr.Dropdown(choices=["assert", "intervene", "counterfactual"], label="Mode", value="assert")
            treatment = gr.Textbox(label="Treatment", value="Revenue")
            outcome = gr.Textbox(label="Outcome", value="NetIncome")
            # Left empty by default so run_inference falls back to `outcome`.
            target = gr.Textbox(label="Target", value="")
        with gr.Row():
            value = gr.Number(label="Value", value=1.1)
            cf_value = gr.Number(label="CF Value")
            value_type = gr.Dropdown(choices=["absolute", "multiplier", "percent_change"], label="Value Type", value="multiplier")
            horizon = gr.Number(label="Horizon", value=5, precision=0)
            observed_t = gr.Number(label="Observed T", value=-1, precision=0)
            threshold = gr.Number(label="Threshold", value=0.5)
        with gr.Row():
            use_pywhyllm = gr.Checkbox(label="Use PyWhyLLM", value=False)
            return_assumption_report = gr.Checkbox(label="Return Assumption Report", value=False)
        btn = gr.Button("Run Inference")
        out = gr.JSON(label="Result")
        # Inputs are passed positionally, so this order must match the
        # run_inference signature.
        btn.click(
            fn=run_inference,
            inputs=[
                ticker, mode, treatment, outcome, target, value, cf_value, value_type,
                horizon, observed_t, threshold, use_pywhyllm, return_assumption_report
            ],
            outputs=out,
            api_name="run_inference"
        )
    return demo
if __name__ == "__main__":
    # Port: GRADIO_SERVER_PORT wins, then PORT, then 7860.
    port = int(os.environ.get("GRADIO_SERVER_PORT", os.environ.get("PORT", "7860")))
    # Host: GRADIO_SERVER_NAME, defaulting to all interfaces.
    host = os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0")
    # Lazy %-style arguments: the message is only formatted if the record is emitted.
    logger.info("Starting Iroha Causal Terminal on %s:%s", host, port)
    demo = create_demo()
    demo.launch(
        server_name=host,
        server_port=port,
        show_error=True,
    )