Spaces:
Runtime error
Runtime error
| import io, math, json, gzip | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| # ------------------------------- | |
| # Core metric helpers | |
| # ------------------------------- | |
def shannon_entropy_from_counts(counts: np.ndarray) -> float:
    """Return the Shannon entropy, in bits, of a vector of counts.

    The counts are divided by their sum to form a probability distribution;
    entries with zero (or negative) probability are ignored.

    Args:
        counts: Occurrence counts or other non-negative weights. Any
            array-like is accepted, not only ``np.ndarray``.

    Returns:
        The entropy in bits, or ``0.0`` when the counts sum to zero or less.
    """
    weights = np.asarray(counts, dtype=float)
    total = weights.sum()
    if total <= 0:
        return 0.0
    probs = weights / total
    probs = probs[probs > 0]
    entropy = float(-(probs * np.log2(probs)).sum())
    # A distribution with a single occupied bin evaluates to -0.0, which
    # would be formatted as "-0.00" downstream; clamp it to a true zero.
    return max(0.0, entropy)
def numeric_binned_entropy(series: pd.Series, bins: int = 32):
    """Estimate the entropy of a numeric column from a quantile histogram.

    Missing values are dropped and the rest cast to float. Bin edges are the
    distinct values of ``bins + 1`` evenly spaced percentiles; if fewer than
    two distinct edges result, the distinct data values are used as edges
    instead. Should building that histogram raise, an equal-width histogram
    with ``bins`` bins is used as a fallback.

    Args:
        series: Column to analyse.
        bins: Number of quantile (or fallback equal-width) bins.

    Returns:
        ``(entropy_bits, occupied_bins)``. An empty column gives ``(0.0, 0)``;
        otherwise the occupied-bin count is reported as at least 1.
    """
    values = series.dropna().astype(float).values
    if values.size == 0:
        return 0.0, 0

    try:
        percentiles = np.linspace(0, 1, bins + 1) * 100
        bin_edges = np.unique(np.nanpercentile(values, percentiles))
        if len(bin_edges) < 2:
            bin_edges = np.unique(values)
        bin_counts = np.histogram(values, bins=bin_edges)[0]
    except Exception:
        # Best effort: fall back to plain equal-width binning.
        bin_counts = np.histogram(values, bins=bins)[0]

    entropy = shannon_entropy_from_counts(bin_counts)
    occupied = np.count_nonzero(bin_counts)
    return entropy, max(occupied, 1)
def categorical_entropy(series: pd.Series):
    """Compute the entropy of a categorical column.

    Missing values are dropped and the remaining values compared by their
    string form.

    Args:
        series: Column to analyse.

    Returns:
        ``(entropy_bits, distinct_value_count)``; ``(0.0, 0)`` when the column
        has no non-missing values.
    """
    labels = series.dropna().astype(str).values
    if labels.size == 0:
        return 0.0, 0
    distinct, frequencies = np.unique(labels, return_counts=True)
    return shannon_entropy_from_counts(frequencies), len(distinct)
def monotone_runs_and_entropy(series: pd.Series):
    """Split a column into maximal non-decreasing runs and score them.

    Missing values are dropped first. A new run starts whenever a value is
    not greater than or equal to its predecessor. The run lengths are then
    passed as weights to ``shannon_entropy_from_counts``.

    Args:
        series: Column to analyse, in its stored order.

    Returns:
        ``(run_count, run_entropy_bits)``; ``(1, 0.0)`` when at most one
        value remains after dropping missing entries.
    """
    values = series.dropna().values
    if len(values) <= 1:
        return 1, 0.0

    run_sizes = [1]
    for previous, current in zip(values[:-1], values[1:]):
        if current >= previous:
            run_sizes[-1] += 1
            continue
        run_sizes.append(1)

    entropy = shannon_entropy_from_counts(np.array(run_sizes, dtype=float))
    return len(run_sizes), entropy
def sortedness_score(series: pd.Series) -> float:
    """Return the fraction of adjacent value pairs that are non-decreasing.

    Missing values are dropped before the pairs are formed.

    Args:
        series: Column to analyse, in its stored order.

    Returns:
        A value in ``[0, 1]``; ``1.0`` when at most one value remains.
    """
    values = series.dropna().values
    if len(values) <= 1:
        return 1.0
    # Compare neighbours directly rather than testing ``np.diff(values) >= 0``:
    # for boolean and unsigned-integer data the difference can never be
    # negative, so every descent would be counted as ordered.
    in_order = np.asarray(values[1:] >= values[:-1], dtype=bool)
    return float(in_order.mean())
def gzip_compress_ratio_from_bytes(b: bytes) -> float:
    """Return the gzip-compressed size of *b* divided by its raw size.

    Args:
        b: Payload to compress.

    Returns:
        The compression ratio (smaller means more compressible). Empty input
        yields ``1.0``; very short inputs can exceed ``1.0`` because of the
        gzip container overhead.
    """
    raw_size = len(b)
    if raw_size == 0:
        return 1.0
    sink = io.BytesIO()
    with gzip.GzipFile(fileobj=sink, mode="wb") as writer:
        writer.write(b)
    return len(sink.getvalue()) / raw_size
def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
    """Return the gzip compression ratio of the frame serialised as CSV.

    Frames with more than ``max_rows`` rows are reduced to a reproducible
    random subset of ``max_rows`` rows before serialisation.

    Args:
        df: Frame to measure.
        max_rows: Maximum number of rows to serialise.

    Returns:
        Compressed size divided by raw CSV size (UTF-8, index omitted).
    """
    if len(df) > max_rows:
        # ``DataFrame.sample`` hands rows back in shuffled order, which would
        # erase the row-order structure this ratio is meant to capture and
        # make large frames incomparable with small ones. Draw the positions
        # with the same fixed seed and keep them in their original order.
        rng = np.random.RandomState(0)
        positions = np.sort(rng.choice(len(df), size=max_rows, replace=False))
        subset = df.iloc[positions]
    else:
        subset = df
    raw = subset.to_csv(index=False).encode("utf-8", errors="ignore")
    return gzip_compress_ratio_from_bytes(raw)
def pareto_maxima_count(points: np.ndarray) -> int:
    """Count the Pareto-maximal points among the first two columns.

    A point is maximal when no other point is at least as large in both
    coordinates and strictly larger in at least one. Exact duplicates of a
    maximal point are each counted.

    Args:
        points: 2-D array with one point per row; only columns 0 and 1 are
            used.

    Returns:
        The number of maximal points, or ``0`` when there are no rows or
        fewer than two columns.
    """
    if points.shape[1] < 2 or points.shape[0] == 0:
        return 0
    xy = points[:, :2]
    # Sweep from the largest x downwards (ties: largest y first). A point is
    # then maximal exactly when its y beats every y seen so far.
    order = np.lexsort((-xy[:, 1], -xy[:, 0]))
    frontier = None  # (x, y) of the most recently accepted maximal point
    count = 0
    for idx in order:
        x, y = xy[idx, 0], xy[idx, 1]
        if frontier is None or y > frontier[1]:
            count += 1
            frontier = (x, y)
        elif y == frontier[1] and x == frontier[0]:
            # Identical copy of a maximal point: not dominated by it.
            count += 1
        # Otherwise the point ties on y with a point of larger x (or has a
        # smaller y), so it is dominated and must not be counted.
    return int(count)
def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
    """Sum the split entropies of a median kd-partition of *points*.

    The point set is split at the median of one coordinate (values ``<=``
    median go left, the rest right), the binary entropy of the split
    proportions is recorded, and both halves are processed recursively on the
    next axis until a part holds at most ``max_leaf`` points.

    Args:
        points: 2-D array with one point per row.
        max_leaf: Parts with this many points or fewer are not split further.
        axis: Coordinate to try first at this level.

    Returns:
        The accumulated split entropy in bits; ``0.0`` for an empty set, a
        set that already fits in one leaf, or a set that no axis can split.
    """
    n = points.shape[0]
    if n == 0 or n <= max_leaf:
        return 0.0

    n_dims = points.shape[1]
    # Try each axis at most once, starting from ``axis``. A split that leaves
    # every point on the left contributes zero entropy, so skipping it gives
    # the same value as recursing on the unchanged set. Unlike that
    # recursion, the loop terminates when no axis separates the points
    # (e.g. many identical rows).
    for step in range(n_dims):
        split_axis = (axis + step) % n_dims
        vals = points[:, split_axis]
        med = np.median(vals)
        goes_left = vals <= med
        if np.count_nonzero(goes_left) < n:
            break
    else:
        return 0.0

    left = points[goes_left]
    right = points[vals > med]
    h_here = 0.0
    for part in (left, right):
        p = len(part) / n
        if p > 0:
            h_here -= p * math.log2(p)

    next_axis = (split_axis + 1) % n_dims
    return (
        h_here
        + kd_entropy(left, max_leaf, next_axis)
        + kd_entropy(right, max_leaf, next_axis)
    )
def normalize(value: float, max_value: float) -> float:
    """Scale *value* by *max_value* and clamp the result to ``[0, 1]``.

    Args:
        value: Quantity to scale.
        max_value: Value that should map to ``1.0``.

    Returns:
        ``value / max_value`` limited to ``[0, 1]``, or ``0.0`` when
        ``max_value`` is not positive.
    """
    if max_value <= 0:
        return 0.0
    ratio = value / max_value
    capped = ratio if ratio < 1.0 else 1.0
    floored = capped if capped > 0.0 else 0.0
    return float(floored)
| # ------------------------------- | |
| # Scoring + interpretations | |
| # ------------------------------- | |
def grade_band(value: float, thresholds: list, labels: list):
    """Place *value* into a labelled band.

    Args:
        value: Score to classify.
        thresholds: Ascending upper bounds, one per band except the last.
        labels: Band names; the final label covers values above every
            threshold.

    Returns:
        ``(index, label)`` of the first band whose threshold is greater than
        or equal to *value*, or the last label when none is.
    """
    hit = next(
        (pos for pos, bound in enumerate(thresholds) if value <= bound),
        None,
    )
    if hit is None:
        return len(labels) - 1, labels[-1]
    return hit, labels[hit]
def interpret_report(report: dict) -> dict:
    """Translate a metrics report into labels, badge colours and advice.

    Each headline metric is banded with ``grade_band`` into one of five
    grades, paired with a colour from a shared green-to-red palette, and a
    short list of rule-based recommendations plus an overall verdict is
    produced.

    Args:
        report: Dictionary with at least ``report["shape"]["rows"]`` and
            ``report["shape"]["cols"]``; all other metrics are read with
            defaults.

    Returns:
        A dictionary with the keys ``he``, ``gzip``, ``kd``, ``runs``,
        ``sorted``, ``dup`` (each a dict with a value, ``label`` and
        ``color``), ``verdict`` and ``recs``.
    """
    palette = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"]
    spread_cuts = [0.15, 0.3, 0.5, 0.75]

    def graded(score, cuts, names):
        # Band the score and attach the matching badge colour.
        idx, label = grade_band(score, cuts, names)
        return idx, label, palette[idx]

    shape = report["shape"]
    n_rows, _n_cols = shape["rows"], shape["cols"]
    max_bits = math.log2(max(2, n_rows))

    # Harvestable Energy (0..1); inverted for banding so higher is better.
    he = report.get("harvestable_energy_score", 0.0)
    he_pct = round(100 * he)
    he_idx, he_label, he_color = graded(
        1.0 - he,
        [0.15, 0.35, 0.6, 0.85],
        ["Excellent", "High", "Moderate", "Low", "Very Low"],
    )

    # Gzip ratio (lower is better).
    gz = report.get("gzip_compression_ratio", 1.0)
    _, gz_label, gz_color = graded(
        gz,
        [0.45, 0.7, 0.9, 1.1],
        ["Highly compressible", "Compressible", "Some structure", "Low structure", "Unstructured"],
    )

    # kd-entropy (lower is better), normalised by log2(rows).
    Hkd = float(report.get("kd_partition_entropy_bits", 0.0))
    Hkd_norm = normalize(Hkd, max_bits)
    _, kd_label, kd_color = graded(
        Hkd_norm,
        spread_cuts,
        ["Simple spatial blocks", "Moderately simple", "Mixed", "Complex", "Highly complex"],
    )

    # Average run-entropy and sortedness over columns that report them.
    numeric_stats = [
        st for st in report.get("per_column", {}).values()
        if "run_entropy_bits" in st
    ]
    if numeric_stats:
        runH_mean = float(np.mean([st["run_entropy_bits"] for st in numeric_stats]))
        runH_norm = normalize(runH_mean, max_bits)
        sort_mean = float(
            np.mean([st.get("sortedness_fraction", 0.0) for st in numeric_stats])
        )
    else:
        runH_norm = 1.0
        sort_mean = 0.0
    _, run_label, run_color = graded(
        runH_norm,
        spread_cuts,
        ["Long smooth runs", "Mostly smooth", "Mixed runs", "Choppy", "Highly choppy"],
    )
    _, sort_label, sort_color = graded(
        1.0 - sort_mean,
        spread_cuts,
        ["Highly sorted", "Mostly sorted", "Partially sorted", "Barely sorted", "Unsorted"],
    )

    # Duplicate rows.
    dup = report.get("duplicate_row_fraction", 0.0)
    _, dup_label, dup_color = graded(
        dup,
        [0.01, 0.05, 0.15, 0.3],
        ["Clean", "Light dups", "Moderate dups", "High dups", "Very high dups"],
    )

    # Recommendations (simple rule-based).
    recs = []
    if he >= 0.7:
        recs.append("Leverage **adaptive algorithms** (TimSort-style merges, linear hull/skyline passes) for near-linear performance.")
    elif he >= 0.4:
        recs.append("Consider **light preprocessing** (bucketing, dedupe) to unlock more adaptive speedups.")
    else:
        recs.append("Expect **near worst-case costs**; use robust algorithms and consider feature engineering/cleaning.")

    if gz <= 0.7:
        recs.append("Data is **highly compressible** → try dictionary/columnar encoding and caching to cut memory/IO.")
    elif gz >= 1.0:
        recs.append("Data is **hard to compress** → prioritize dimensionality reduction or noise filtering.")

    if runH_norm <= 0.3 or sort_mean >= 0.7:
        recs.append("Columns show **long monotone runs** → merges and single-pass scans will be efficient.")
    else:
        recs.append("Columns are **choppy** → batch/aggregate before sorting to reduce comparisons.")

    if Hkd_norm <= 0.3:
        recs.append("Spatial structure is **simple** → kd/quad trees will be shallow; range queries will be fast.")
    elif Hkd_norm >= 0.6:
        recs.append("Spatial structure is **complex** → consider clustering/tiling before building indexes.")

    if dup >= 0.05:
        recs.append("De-duplicate rows to lower entropy and improve compression & joins.")

    # Summary verdict follows the Harvestable Energy grade.
    verdicts = [
        "Outstanding structure for fast algorithms.",
        "Strong latent order; plenty of speed to harvest.",
        "Mixed: some order present; moderate gains possible.",
        "Low order; focus on cleaning and feature engineering.",
        "Chaotic: assume worst-case runtimes.",
    ]

    return {
        "he": {"pct": he_pct, "label": he_label, "color": he_color},
        "gzip": {"value": gz, "label": gz_label, "color": gz_color},
        "kd": {"value": Hkd, "label": kd_label, "color": kd_color},
        "runs": {"value": runH_norm, "label": run_label, "color": run_color},
        "sorted": {"value": sort_mean, "label": sort_label, "color": sort_color},
        "dup": {"value": dup, "label": dup_label, "color": dup_color},
        "verdict": verdicts[he_idx],
        "recs": recs[:6],
    }
| # ------------------------------- | |
| # Compute metrics | |
| # ------------------------------- | |
def _column_kind(s: pd.Series) -> str:
    """Classify a column as "numeric", "datetime" or "categorical"."""
    if pd.api.types.is_numeric_dtype(s):
        return "numeric"
    if pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
        return "datetime"
    return "categorical"


def _numeric_column_stats(s: pd.Series) -> dict:
    """Return entropy, run, sortedness and summary statistics for a numeric column."""
    if pd.api.types.is_bool_dtype(s):
        # Boolean columns count as numeric; score them as 0.0/1.0 so the
        # difference-based helpers see ordinary numbers.
        s = s.astype(float)
    H, k = numeric_binned_entropy(s)
    runs, Hruns = monotone_runs_and_entropy(s)
    sorted_frac = sortedness_score(s)
    has_values = s.dropna().shape[0] > 0
    values = s.values
    return {
        "entropy_binned_bits": float(H),
        "active_bins": int(k),
        "monotone_runs": int(runs),
        "run_entropy_bits": float(Hruns),
        "sortedness_fraction": float(sorted_frac),
        "min": float(np.nanmin(values)) if has_values else None,
        "max": float(np.nanmax(values)) if has_values else None,
        "mean": float(np.nanmean(values)) if has_values else None,
        "std": float(np.nanstd(values)) if has_values else None,
    }


def _datetime_column_stats(s: pd.Series) -> dict:
    """Return the unique count and ISO-formatted range of a datetime column."""
    try:
        sd = pd.to_datetime(s, errors="coerce")
        min_dt = sd.min()
        max_dt = sd.max()
        return {
            "entropy_bits": 0.0,
            "unique_values": int(sd.nunique(dropna=True)),
            "min_datetime": None if pd.isna(min_dt) else min_dt.isoformat(),
            "max_datetime": None if pd.isna(max_dt) else max_dt.isoformat(),
        }
    except Exception:
        # Best effort: report only the unique count when parsing fails.
        return {"entropy_bits": 0.0, "unique_values": int(s.nunique(dropna=True))}


def _categorical_column_stats(s: pd.Series) -> dict:
    """Return the entropy, unique count and five most frequent values of a column."""
    H, k = categorical_entropy(s)
    # Drop missing cells before stringifying; otherwise they are counted as
    # a literal "nan" category, disagreeing with the entropy/unique count.
    vc = s.dropna().astype(str).value_counts().head(5)
    top5 = [{"value": str(idx), "count": int(cnt)} for idx, cnt in vc.items()]
    return {"entropy_bits": float(H), "unique_values": int(k), "top_values": top5}


def compute_metrics(df: pd.DataFrame) -> dict:
    """Compute the full structure report for a data frame.

    Args:
        df: Data to analyse.

    Returns:
        A JSON-serialisable dictionary with the keys ``shape``,
        ``column_types``, ``missing_fraction_per_column``,
        ``duplicate_row_fraction``, ``per_column``,
        ``gzip_compression_ratio``, ``pareto_maxima_2d``,
        ``kd_partition_entropy_bits`` and ``harvestable_energy_score``.
        The last is the mean of ``1 - metric`` over the gzip ratio, the
        normalised mean run entropy (when numeric columns exist) and the
        normalised kd entropy (when it could be computed).
    """
    n_rows, n_cols = df.shape
    report = {"shape": {"rows": int(n_rows), "cols": int(n_cols)}}

    types = {c: _column_kind(df[c]) for c in df.columns}
    report["column_types"] = types

    missing = df.isna().mean().to_dict()
    dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
    report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
    report["duplicate_row_fraction"] = dup_ratio

    col_stats = {}
    for c in df.columns:
        s = df[c]
        if types[c] == "numeric":
            col_stats[c] = _numeric_column_stats(s)
        elif types[c] == "datetime":
            col_stats[c] = _datetime_column_stats(s)
        else:
            col_stats[c] = _categorical_column_stats(s)
    report["per_column"] = col_stats

    try:
        gzip_ratio = dataframe_gzip_ratio(df)
    except Exception:
        # Best effort: treat an unserialisable frame as incompressible.
        gzip_ratio = 1.0
    report["gzip_compression_ratio"] = float(gzip_ratio)

    # 2-D spatial metrics use the first two numeric columns, complete rows only.
    pareto_count = 0
    H_kd = 0.0
    kd_computed = False
    num_cols = [c for c, t in types.items() if t == "numeric"]
    if len(num_cols) >= 2:
        X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float)
        X = X[~np.isnan(X).any(axis=1)]
        if X.shape[0] >= 3:
            pts2 = X[:, :2]
            pareto_count = int(pareto_maxima_count(pts2))
            try:
                H_kd = float(kd_entropy(pts2, max_leaf=128, axis=0))
                kd_computed = True
            except Exception:
                H_kd = 0.0
    report["pareto_maxima_2d"] = pareto_count
    report["kd_partition_entropy_bits"] = H_kd

    # Harvestable Energy: average of the available "orderliness" components.
    max_bits = math.log2(max(2, n_rows))
    he_parts = [1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"]))]
    run_entropies = [
        st["run_entropy_bits"] for st in col_stats.values()
        if "run_entropy_bits" in st
    ]
    if run_entropies:
        mean_run_H = float(np.mean(run_entropies))
        he_parts.append(1.0 - normalize(mean_run_H, max_bits))
    if kd_computed:
        # Only include the spatial term when it was actually measured; the
        # 0.0 placeholder would otherwise count as perfect order and inflate
        # the score of frames without two usable numeric columns.
        he_parts.append(1.0 - normalize(H_kd, max_bits))
    report["harvestable_energy_score"] = float(
        np.mean([max(0.0, min(1.0, v)) for v in he_parts])
    )
    return report
| # ------------------------------- | |
| # Dataset shape summary for other models | |
| # ------------------------------- | |
def dataset_shape_summary(df: pd.DataFrame, report: dict, max_examples: int = 3) -> dict:
    """Build a compact, JSON-friendly description of the dataset.

    Args:
        df: The analysed frame; only its first rows are used, as examples.
        report: Report dictionary providing ``shape``, ``column_types``,
            ``per_column`` and ``missing_fraction_per_column``.
        max_examples: Number of leading rows to include as examples.

    Returns:
        A dictionary with the row/column counts, one descriptor per column
        (range statistics for numeric and datetime columns, unique count and
        top values otherwise), headline metrics, and stringified example
        rows.
    """
    columns = []
    for name, kind in report["column_types"].items():
        stats = report["per_column"].get(name, {})
        if kind == "numeric":
            details = {key: stats.get(key) for key in ("min", "max", "mean", "std")}
        elif kind == "datetime":
            details = {
                "min": stats.get("min_datetime"),
                "max": stats.get("max_datetime"),
            }
        else:
            # Categorical or anything else.
            details = {
                "unique_values": stats.get("unique_values"),
                "top_values": stats.get("top_values", []),
            }
        columns.append({
            "name": name,
            "type": kind,
            **details,
            "missing_frac": report["missing_fraction_per_column"].get(name, 0.0),
        })

    # A few example rows, stringified so they always serialise.
    sample_rows = df.head(max_examples).astype(str).to_dict(orient="records")

    return {
        "n_rows": report["shape"]["rows"],
        "n_cols": report["shape"]["cols"],
        "columns": columns,
        "duplicates_fraction": report.get("duplicate_row_fraction", 0.0),
        "gzip_compression_ratio": report.get("gzip_compression_ratio", None),
        "harvestable_energy_score": report.get("harvestable_energy_score", None),
        "examples": sample_rows,
    }
| # ------------------------------- | |
| # UI rendering helpers | |
| # ------------------------------- | |
def badge(text: str, color: str) -> str:
    """Return an HTML pill-shaped label with white text on *color*.

    Args:
        text: Label to display; inserted into the markup as given.
        color: CSS colour used as the background.

    Returns:
        A ``<span>`` element as a string.
    """
    css = f"background:{color};color:white;padding:6px 10px;border-radius:999px;font-weight:600"
    return f"<span style='{css}'>{text}</span>"
def metric_card(title: str, value: str, badge_html: str) -> str:
    """Return the HTML for one dashboard card.

    Args:
        title: Small caption shown at the top of the card.
        value: Headline value shown in large type.
        badge_html: Pre-rendered badge markup placed under the value.

    Returns:
        A bordered ``<div>`` containing the caption, value and badge, with
        one element per line.
    """
    lines = [
        "",
        '<div style="flex:1;min-width:220px;border:1px solid #e5e7eb;border-radius:14px;padding:14px 16px;">',
        f'<div style="font-size:14px;color:#6b7280;margin-bottom:8px">{title}</div>',
        f'<div style="font-size:22px;font-weight:700;margin-bottom:10px">{value}</div>',
        f"{badge_html}",
        "</div>",
        "",
    ]
    return "\n".join(lines)
def render_dashboard(report: dict, interp: dict) -> str:
    """Render the six metric cards and the verdict box as HTML.

    Args:
        report: Raw metrics report; accepted for interface symmetry but not
            read here.
        interp: Interpretation dictionary with the entries ``he``, ``gzip``,
            ``kd``, ``runs``, ``sorted``, ``dup`` and ``verdict``.

    Returns:
        A flex-wrapped grid of cards followed by the verdict panel.
    """
    he, gz, kd, runs, sortb, dup = (
        interp[key] for key in ("he", "gzip", "kd", "runs", "sorted", "dup")
    )

    # (card title, formatted value, interpretation entry supplying the badge)
    tiles = [
        ("Harvestable Energy", f"{he['pct']} / 100", he),
        ("Compressibility (gzip)", f"{gz['value']:.3f}", gz),
        ("Range-Partition Entropy (kd bits)", f"{kd['value']:.3f}", kd),
        ("Run-Entropy (avg, normalized)", f"{runs['value']:.2f}", runs),
        ("Sortedness (avg fraction)", f"{sortb['value']:.2f}", sortb),
        ("Duplicate Rows (fraction)", f"{dup['value']:.2f}", dup),
    ]
    cards = [
        metric_card(title, shown, badge(entry['label'], entry['color']))
        for title, shown, entry in tiles
    ]

    grid = "<div style='display:flex;flex-wrap:wrap;gap:12px'>" + "".join(cards) + "</div>"
    verdict = f"<div style='margin-top:12px;padding:14px 16px;background:#f9fafb;border:1px solid #e5e7eb;border-radius:14px'><b>Verdict:</b> {interp['verdict']}</div>"
    return grid + verdict
def render_recs(interp: dict) -> str:
    """Render the recommendations as an HTML bullet list.

    Args:
        interp: Interpretation dictionary whose ``recs`` entry is a sequence
            of recommendation strings.

    Returns:
        A ``<ul>`` element with one ``<li>`` per recommendation.
    """
    items = "".join(map("<li>{}</li>".format, interp["recs"]))
    return "<ul>" + items + "</ul>"
def render_columns(report: dict) -> str:
    """Render the per-column statistics as an HTML table.

    Numeric columns (those reporting ``entropy_binned_bits``) show entropy,
    run count, run entropy and sortedness; columns reporting only
    ``entropy_bits`` show the entropy; any other column shows dashes.

    Args:
        report: Report dictionary; ``per_column`` is optional,
            ``missing_fraction_per_column`` is required.

    Returns:
        A styled ``<table>`` element as a string.
    """
    # Local import keeps this block self-contained.
    from html import escape

    rows = []
    for c, st in report.get("per_column", {}).items():
        miss = report["missing_fraction_per_column"].get(c, 0.0)
        # Column names come from the uploaded file, so escape them before
        # placing them in markup.
        name = escape(str(c))
        if "entropy_binned_bits" in st:
            rows.append(f"<tr><td><b>{name}</b> (num)</td><td>{miss:.1%}</td><td>{st['entropy_binned_bits']:.2f}</td><td>{st['monotone_runs']}</td><td>{st['run_entropy_bits']:.2f}</td><td>{st['sortedness_fraction']:.2f}</td></tr>")
        elif "entropy_bits" in st:
            rows.append(f"<tr><td><b>{name}</b> (cat)</td><td>{miss:.1%}</td><td>{st['entropy_bits']:.2f}</td><td>-</td><td>-</td><td>-</td></tr>")
        else:
            rows.append(f"<tr><td><b>{name}</b></td><td>{miss:.1%}</td><td>-</td><td>-</td><td>-</td><td>-</td></tr>")
    header = "<tr><th>Column</th><th>Missing</th><th>Entropy</th><th>Monotone Runs</th><th>Run-Entropy</th><th>Sortedness</th></tr>"
    table = "<table style='width:100%;border-collapse:collapse'>" + header + "".join(rows) + "</table>"
    # Apply the inline styles in one pass over the bare tags.
    table = table.replace("<tr>", "<tr style='border-bottom:1px solid #e5e7eb'>")
    table = table.replace("<th>", "<th style='text-align:left;padding:8px 6px;color:#374151'>")
    table = table.replace("<td>", "<td style='padding:8px 6px;color:#111827'>")
    return table
| # ------------------------------- | |
| # Gradio app | |
| # ------------------------------- | |
def analyze(file):
    """Run the full analysis for an uploaded CSV and build all UI outputs.

    Args:
        file: The value delivered by the file-upload component: ``None`` when
            nothing was uploaded, otherwise either a path string or an object
            exposing the path as ``.name``.

    Returns:
        A 5-tuple ``(report_json, dashboard_html, recs_html, cols_html,
        shape_json)``. When no file is given or the CSV cannot be read, the
        JSON slots hold ``"{}"`` and the dashboard slot carries the message.
    """
    if file is None:
        return "{}", "Please upload a CSV.", "", "", "{}"
    # Depending on the Gradio version/configuration the upload arrives as a
    # temp-file wrapper (path in ``.name``) or directly as a path string;
    # accept both instead of failing on ``str.name``.
    path = getattr(file, "name", file)
    try:
        df = pd.read_csv(path)
    except Exception as e:
        return "{}", f"Failed to read CSV: {e}", "", "", "{}"

    report = compute_metrics(df)
    interp = interpret_report(report)
    shape = dataset_shape_summary(df, report, max_examples=3)

    report_json = json.dumps(report, indent=2)
    dashboard_html = render_dashboard(report, interp)
    recs_html = render_recs(interp)
    cols_html = render_columns(report)
    shape_json = json.dumps(shape, indent=2)
    return report_json, dashboard_html, recs_html, cols_html, shape_json
# Build the Gradio interface. Components are created in display order: an
# upload row with the trigger button, followed by one output section per
# value returned by ``analyze``.
with gr.Blocks(title="OrderLens — Data Interpreter") as demo:
    gr.Markdown("# OrderLens — Data Interpreter")
    gr.Markdown("Upload a CSV and get **readable** structure metrics with plain-language guidance.")
    # Input row: file picker restricted to .csv plus the button that starts
    # the analysis.
    with gr.Row():
        inp = gr.File(file_types=[".csv"], label="CSV file")
        btn = gr.Button("Analyze", variant="primary")
    gr.Markdown("---")
    gr.Markdown("### Dashboard") # color-coded cards + verdict
    dash = gr.HTML()
    gr.Markdown("### Recommendations") # actionable tips
    recs = gr.HTML()
    gr.Markdown("### Column Details") # per-column table
    cols = gr.HTML()
    gr.Markdown("### Dataset Shape Summary (JSON)") # compact schema for other models
    shape_out = gr.Code(label="Shape", language="json")
    gr.Markdown("### Raw report (JSON)") # API-friendly
    json_out = gr.Code(label="Report", language="json")
    # The output list follows the order of analyze()'s return tuple
    # (report JSON, dashboard, recommendations, columns, shape JSON), which
    # differs from the on-screen order above.
    btn.click(analyze, inputs=inp, outputs=[json_out, dash, recs, cols, shape_out])

# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()