# Reality8081's picture
# Init Src
# b367bb7
import gradio as gr
import pandas as pd
import numpy as np
import json
import io
import os
import zipfile
import tempfile
# =========================
# GLOBAL STATE (in-memory)
# =========================
# Values learned in training mode and read back in inference mode. The
# pipeline steps in this file store entries under the keys "profile",
# "impute", "outliers" and "encoding".
STATE = {}
# =========================
# UTILS
# =========================
def read_file(file):
    """Load an uploaded CSV or Parquet file into a DataFrame.

    Args:
        file: Either a filesystem path (``str`` or ``os.PathLike``) or an
            object that exposes the path through a ``name`` attribute.

    Returns:
        pandas.DataFrame: The parsed table.

    Raises:
        ValueError: If the path ends with neither ``.csv`` nor ``.parquet``.
    """
    # Paths are checked first: ``pathlib.Path.name`` is only the basename, so
    # blindly reading ``.name`` would lose the directory part.
    if isinstance(file, (str, os.PathLike)):
        path = os.fspath(file)
    else:
        path = file.name

    # Compare case-insensitively so "DATA.CSV" is treated like "data.csv".
    lowered = path.lower()
    if lowered.endswith(".csv"):
        return pd.read_csv(path)
    if lowered.endswith(".parquet"):
        return pd.read_parquet(path)
    raise ValueError("Unsupported format")
# =========================
# COMPONENT 1: PROFILING
# =========================
def profile_data(df, training=True):
    """Summarise a DataFrame's shape, missing-value ratios and column kinds.

    Args:
        df: The DataFrame to describe.
        training: When true, the summary is also stored in ``STATE`` under
            the ``"profile"`` key.

    Returns:
        dict: Keys ``"shape"``, ``"missing_ratio"`` (column -> fraction of
        missing values), ``"numerical"`` and ``"categorical"`` (column-name
        lists; "categorical" means every non-numeric column).
    """
    numeric_names = df.select_dtypes(include=np.number).columns.tolist()
    other_names = df.select_dtypes(exclude=np.number).columns.tolist()

    summary = {
        "shape": df.shape,
        "missing_ratio": df.isna().mean().to_dict(),
        "numerical": numeric_names,
        "categorical": other_names,
    }

    if training:
        STATE["profile"] = summary
    return summary
# =========================
# COMPONENT 2: OUTLIER + IMPUTATION
# =========================
def handle_outliers_impute(df, training=True):
    """Drop near-empty columns, clip numeric outliers and fill missing values.

    In training mode the columns to drop, the IQR clipping bounds and the
    fill values are computed from ``df`` and saved in ``STATE`` under
    ``"dropped_cols"``, ``"outliers"`` and ``"impute"``. In inference mode the
    saved values are reused so the output matches the training schema.

    Args:
        df: Input DataFrame; it is copied, never modified.
        training: Learn and store the statistics when true, otherwise apply
            the ones already held in ``STATE``.

    Returns:
        tuple: ``(cleaned_df, dropped_cols, impute_values)`` where
        ``dropped_cols`` lists the removed column names and ``impute_values``
        maps each remaining column to the value used to fill its gaps
        (``None`` when a non-numeric column had no value to learn from).

    Raises:
        KeyError: In inference mode, when a remaining column has no stored
            bounds or fill value in ``STATE``.
    """
    df = df.copy()
    impute_values = {}
    outlier_bounds = {}

    if training or "dropped_cols" not in STATE:
        # Columns that are more than 90% missing carry too little signal.
        dropped_cols = [
            col for col in df.columns if df[col].isna().mean() > 0.9
        ]
    else:
        # Reuse the training decision: a column dropped in training has no
        # stored statistics, and a column kept in training must stay even if
        # it happens to be sparse in this batch.
        dropped_cols = [
            col for col in STATE["dropped_cols"] if col in df.columns
        ]
    df = df.drop(columns=dropped_cols)

    for col in df.select_dtypes(include=np.number).columns:
        if training:
            q1 = df[col].quantile(0.25)
            q3 = df[col].quantile(0.75)
            iqr = q3 - q1
            lower = q1 - 1.5 * iqr
            upper = q3 + 1.5 * iqr
            outlier_bounds[col] = (lower, upper)
        else:
            lower, upper = STATE["outliers"][col]
        df[col] = np.clip(df[col], lower, upper)

        # The median is taken after clipping, as the fill value.
        if training:
            impute_values[col] = df[col].median()
        else:
            impute_values[col] = STATE["impute"][col]
        # Assign the result back: ``df[col].fillna(..., inplace=True)`` acts
        # on a temporary and does not reliably update ``df``.
        df[col] = df[col].fillna(impute_values[col])

    for col in df.select_dtypes(exclude=np.number).columns:
        if training:
            modes = df[col].mode()
            # mode() is empty when the column holds no non-null value
            # (for example a frame with zero rows); there is nothing to learn.
            impute_values[col] = modes.iloc[0] if not modes.empty else None
        else:
            impute_values[col] = STATE["impute"][col]
        if impute_values[col] is not None:
            df[col] = df[col].fillna(impute_values[col])

    if training:
        STATE["impute"] = impute_values
        STATE["outliers"] = outlier_bounds
        STATE["dropped_cols"] = list(dropped_cols)
    return df, dropped_cols, impute_values
# =========================
# COMPONENT 3: ENCODING
# =========================
def encode_data(df, training=True):
    """One-hot encode every non-numeric column of ``df``.

    Each non-numeric column is replaced by one 0/1 indicator column per
    known value, named ``"<column>_<value>"``. In training mode the known
    values are the column's unique values and are recorded in
    ``STATE["encoding"]``; otherwise the recorded values are used.

    Args:
        df: Input DataFrame; a copy is encoded and returned.
        training: Learn the value lists when true, reuse them when false.

    Returns:
        tuple: ``(encoded_df, new_cols)`` where ``new_cols`` lists the
        indicator column names in creation order.
    """
    encoded = df.copy()
    created = []

    if training:
        # Start from a clean slate so stale columns do not linger.
        STATE["encoding"] = {}

    categorical = encoded.select_dtypes(exclude=np.number).columns
    for name in categorical:
        if training:
            levels = encoded[name].unique().tolist()
            STATE["encoding"][name] = levels
        else:
            levels = STATE["encoding"][name]

        for level in levels:
            indicator = f"{name}_{level}"
            matches = encoded[name] == level
            encoded[indicator] = matches.astype(int)
            created.append(indicator)

        # The source column is fully represented by its indicators now.
        encoded.drop(columns=[name], inplace=True)

    return encoded, created
# =========================
# COMPONENT 4: MEMORY OPT
# =========================
def optimize_memory(df):
    """Downcast 64-bit numeric columns and report the memory saved.

    ``int64`` columns are downcast to the smallest integer type that holds
    their values and ``float64`` columns to a smaller float type where
    possible. The columns are reassigned on ``df`` itself.

    Args:
        df: DataFrame to shrink; modified in place.

    Returns:
        tuple: ``(df, before, after, saved)`` with the byte totals before and
        after downcasting and the percentage saved.
    """
    def _total_bytes():
        return df.memory_usage(deep=True).sum()

    before = _total_bytes()

    # Integers are handled first, then floats; each selection is evaluated
    # right before its own pass.
    for source_dtype, target in (("int64", "integer"), ("float64", "float")):
        for name in df.select_dtypes(include=[source_dtype]).columns:
            df[name] = pd.to_numeric(df[name], downcast=target)

    after = _total_bytes()
    saved = 100 * (before - after) / before
    return df, before, after, saved
# =========================
# MAIN PIPELINE
# =========================
def _json_default(obj):
    """Convert values the standard JSON encoder rejects into plain Python."""
    if isinstance(obj, np.generic):
        # numpy scalars such as np.bool_ or np.int64 -> bool / int / float.
        return obj.item()
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if obj is pd.NaT:
        return None
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable"
    )


def run_pipeline(file, mode):
    """Run profiling, cleaning, encoding and memory optimisation on a file.

    Args:
        file: The uploaded file as accepted by ``read_file``, or ``None``
            when nothing was uploaded.
        mode: ``"Training"`` to learn and store the pipeline state; any other
            value applies the state stored by an earlier training run.

    Returns:
        tuple: ``(profile_json, preview, csv_text, zip_bytes, ram_message)``.
        ``preview`` is ``df.head()`` of the processed data, ``csv_text`` the
        full processed data as CSV and ``zip_bytes`` an archive holding one
        JSON file per ``STATE`` entry. On failure the first element is a
        message and the other four are ``None``.
    """
    if file is None:
        return "Upload file first", None, None, None, None

    training = mode == "Training"
    # Checked before reading so no file is parsed when inference cannot run.
    if not training and "profile" not in STATE:
        return "ERROR: Run Training first!", None, None, None, None

    try:
        df = read_file(file)
    except ValueError as exc:
        # Covers the unsupported-format error and pandas parse errors (which
        # derive from ValueError); report them like the other failures.
        return f"ERROR: {exc}", None, None, None, None

    # STEP 1
    profile = profile_data(df, training)
    # STEP 2
    df, _dropped, _impute = handle_outliers_impute(df, training)
    # STEP 3
    df, _new_cols = encode_data(df, training)
    # STEP 4
    df, _before, _after, saved = optimize_memory(df)

    # SAVE OUTPUT
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w") as zf:
        for key, value in STATE.items():
            # STATE may hold numpy scalars or timestamps, which json.dumps
            # rejects without a default handler.
            payload = json.dumps(value, indent=2, default=_json_default)
            zf.writestr(f"{key}.json", payload)

    return (
        json.dumps(profile, indent=2, default=_json_default),
        df.head(),
        csv_buffer.getvalue(),
        zip_buffer.getvalue(),
        f"RAM saved: {saved:.2f}%",
    )
# =========================
# GRADIO UI
# =========================
# Declare the interface and wire the button to the pipeline.
with gr.Blocks() as app:
    gr.Markdown("# Auto Data Processor (MLOps Version)")

    # NOTE(review): assumes only the upload and mode widgets share this row;
    # confirm against the intended layout.
    with gr.Row():
        file = gr.File(label="Upload CSV/Parquet")
        mode = gr.Radio(["Training", "Inference"], value="Training")

    run_btn = gr.Button("Run Pipeline")

    profile_out = gr.Textbox(label="Data Profiling", lines=15)
    df_out = gr.Dataframe()
    ram_out = gr.Textbox(label="Memory Optimization")
    csv_out = gr.File(label="Download Cleaned CSV")
    zip_out = gr.File(label="Download State ZIP")

    def wrapper(file, mode):
        """Run the pipeline and turn its in-memory outputs into file paths.

        Returns values in the order of the click handler's outputs: profile
        text, preview frame, memory message, CSV path, ZIP path. When the
        pipeline reports a failure only the message is returned.
        """
        report, preview, csv_text, zip_bytes, ram_note = run_pipeline(file, mode)

        # A missing preview is how run_pipeline signals that it stopped early.
        if preview is None:
            return report, None, None, None, None

        # The directory is left in place because the returned paths point
        # into it.
        out_dir = tempfile.mkdtemp()
        csv_path = os.path.join(out_dir, "cleaned.csv")
        zip_path = os.path.join(out_dir, "state.zip")

        # CSV content is text, the archive is raw bytes.
        with open(csv_path, "w", encoding="utf-8") as csv_file:
            csv_file.write(csv_text)
        with open(zip_path, "wb") as zip_file:
            zip_file.write(zip_bytes)

        return report, preview, ram_note, csv_path, zip_path

    run_btn.click(
        wrapper,
        inputs=[file, mode],
        outputs=[profile_out, df_out, ram_out, csv_out, zip_out],
    )

if __name__ == "__main__":
    app.launch()