"""Gradio app that profiles, cleans, one-hot encodes and downcasts a table.

The pipeline runs in two modes. "Training" learns the preprocessing
parameters (dropped columns, outlier bounds, imputation values, category
lists) and stores them in the module-level ``STATE`` dict. "Inference"
re-applies those stored parameters to a new file.
"""

import io
import json
import os
import tempfile
import zipfile

import gradio as gr
import numpy as np
import pandas as pd

# =========================
# GLOBAL STATE (in-memory)
# =========================
# Filled by a Training run and read back by Inference runs. It lives only as
# long as the Python process does.
STATE = {}


# =========================
# UTILS
# =========================
def read_file(file):
    """Load an uploaded CSV or Parquet file into a DataFrame.

    Args:
        file: Either a path (``str`` / ``os.PathLike``) or an object whose
            ``name`` attribute holds the path of the uploaded file.

    Returns:
        The file contents as a ``pandas.DataFrame``.

    Raises:
        ValueError: If the path ends with neither ``.csv`` nor ``.parquet``.
    """
    if isinstance(file, (str, os.PathLike)):
        path = os.fspath(file)
    else:
        path = file.name

    # Compare case-insensitively so "DATA.CSV" is accepted as well.
    lowered = path.lower()
    if lowered.endswith(".csv"):
        return pd.read_csv(path)
    if lowered.endswith(".parquet"):
        return pd.read_parquet(path)
    raise ValueError("Unsupported format")


def _json_default(value):
    """Convert values that ``json.dumps`` cannot serialize on its own.

    NumPy scalars such as ``np.float32`` or ``np.bool_`` are not subclasses
    of the Python built-ins, and timestamp-like objects are not JSON types,
    so both would otherwise raise ``TypeError`` during serialization.
    """
    if isinstance(value, np.generic):
        return value.item()
    if hasattr(value, "isoformat"):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def _fitted_value(section, col):
    """Return the parameter stored for ``col`` under ``STATE[section]``.

    Raises:
        KeyError: If Training stored nothing for this column in ``section``.
    """
    try:
        return STATE[section][col]
    except KeyError:
        raise KeyError(
            f"No stored '{section}' state for column {col!r}; "
            "it must be present with the same dtype during Training"
        ) from None


# =========================
# COMPONENT 1: PROFILING
# =========================
def profile_data(df, training=True):
    """Summarize shape, missing ratios and numeric/categorical columns.

    Args:
        df: The DataFrame to describe.
        training: When true, the profile is also stored in
            ``STATE["profile"]``.

    Returns:
        A dict with the keys ``shape``, ``missing_ratio``, ``numerical`` and
        ``categorical``.
    """
    profile = {
        "shape": df.shape,
        "missing_ratio": df.isna().mean().to_dict(),
        "numerical": df.select_dtypes(include=np.number).columns.tolist(),
        "categorical": df.select_dtypes(exclude=np.number).columns.tolist(),
    }
    if training:
        STATE["profile"] = profile
    return profile


# =========================
# COMPONENT 2: OUTLIER + IMPUTATION
# =========================
def handle_outliers_impute(df, training=True):
    """Drop sparse columns, clip numeric outliers and fill missing values.

    In training mode the parameters are computed from ``df`` and saved to
    ``STATE`` under ``dropped``, ``outliers`` and ``impute``. In inference
    mode the saved parameters are applied instead, so the output has the
    same columns as the training output.

    Args:
        df: Input DataFrame; it is copied, not modified.
        training: Whether to learn the parameters or reuse stored ones.

    Returns:
        A tuple ``(df, dropped_cols, impute_values)``: the cleaned copy, the
        list of dropped column names, and a dict mapping each remaining
        column to the value used to fill its missing entries.

    Raises:
        KeyError: In inference mode, if a column has no stored parameters.
    """
    df = df.copy()

    if training:
        # Columns that are more than 90% missing are discarded.
        missing_ratio = df.isna().mean()
        dropped_cols = missing_ratio[missing_ratio > 0.9].index.tolist()
    else:
        # Reuse the training decision. Re-deciding from the inference data
        # would give a different schema from the one the state was built on.
        dropped_cols = [
            col for col in STATE.get("dropped", []) if col in df.columns
        ]
    df = df.drop(columns=dropped_cols)

    impute_values = {}
    outlier_bounds = {}

    for col in df.select_dtypes(include=np.number).columns:
        if training:
            q1 = df[col].quantile(0.25)
            q3 = df[col].quantile(0.75)
            iqr = q3 - q1
            lower = q1 - 1.5 * iqr
            upper = q3 + 1.5 * iqr
            outlier_bounds[col] = (lower, upper)
        else:
            lower, upper = _fitted_value("outliers", col)

        clipped = df[col].clip(lower, upper)
        if training:
            # The median is taken after clipping, as the stored value is.
            fill = clipped.median()
        else:
            fill = _fitted_value("impute", col)
        impute_values[col] = fill

        # Assign the result back: `df[col].fillna(..., inplace=True)` acts on
        # a temporary and leaves `df` unchanged under pandas Copy-on-Write.
        df[col] = clipped.fillna(fill)

    for col in df.select_dtypes(exclude=np.number).columns:
        if training:
            modes = df[col].mode()
            # The mode is empty when the column has no non-missing values
            # (for example a zero-row frame); nothing can be imputed then.
            fill = modes.iloc[0] if not modes.empty else None
        else:
            fill = _fitted_value("impute", col)
        impute_values[col] = fill

        if fill is not None:
            df[col] = df[col].fillna(fill)

    if training:
        STATE["dropped"] = list(dropped_cols)
        STATE["impute"] = impute_values
        STATE["outliers"] = outlier_bounds

    return df, dropped_cols, impute_values


# =========================
# COMPONENT 3: ENCODING
# =========================
def encode_data(df, training=True):
    """One-hot encode every non-numeric column.

    Each non-numeric column is replaced by one 0/1 column per category,
    named ``"<column>_<value>"``. In training mode the category list comes
    from ``df`` and is stored in ``STATE["encoding"]``. In inference mode
    the stored list is used, so the output columns match training.

    Args:
        df: Input DataFrame; it is copied, not modified.
        training: Whether to learn the category lists or reuse stored ones.

    Returns:
        A tuple ``(df, new_cols)``: the encoded copy and the names of the
        indicator columns that were added.

    Raises:
        KeyError: In inference mode, if a column has no stored categories.
    """
    df = df.copy()
    new_cols = []

    if training:
        STATE["encoding"] = {}

    for col in df.select_dtypes(exclude=np.number).columns:
        if training:
            uniques = df[col].unique().tolist()
            STATE["encoding"][col] = uniques
        else:
            uniques = _fitted_value("encoding", col)

        for val in uniques:
            new_col = f"{col}_{val}"
            df[new_col] = (df[col] == val).astype(int)
            new_cols.append(new_col)

        df = df.drop(columns=[col])

    return df, new_cols


# =========================
# COMPONENT 4: MEMORY OPT
# =========================
def optimize_memory(df):
    """Downcast ``int64`` and ``float64`` columns to smaller dtypes.

    Note that ``df`` is modified in place; the same object is returned.

    Args:
        df: The DataFrame to shrink.

    Returns:
        A tuple ``(df, before, after, saved)``: the DataFrame, its memory
        usage in bytes before and after, and the percentage saved.
    """
    before = df.memory_usage(deep=True).sum()

    for col in df.select_dtypes(include=["int64"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    for col in df.select_dtypes(include=["float64"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="float")

    after = df.memory_usage(deep=True).sum()
    # Guard the percentage so a zero-byte frame does not divide by zero.
    saved = 100 * (before - after) / before if before else 0.0
    return df, before, after, saved


# =========================
# MAIN PIPELINE
# =========================
def run_pipeline(file, mode):
    """Run profiling, cleaning, encoding and downcasting on an upload.

    Args:
        file: The uploaded file (see ``read_file``), or ``None``.
        mode: ``"Training"`` to learn and store the preprocessing state; any
            other value runs inference with the stored state.

    Returns:
        A 5-tuple ``(profile_json, head, csv_text, zip_bytes, ram_message)``.
        When no file was given, or inference is requested before any
        training run, the first item is an error message and the other four
        are ``None``.
    """
    if file is None:
        return "Upload file first", None, None, None, None

    df = read_file(file)
    training = mode == "Training"

    if not training and "profile" not in STATE:
        return "ERROR: Run Training first!", None, None, None, None

    # STEP 1
    profile = profile_data(df, training)
    # STEP 2
    df, dropped, impute = handle_outliers_impute(df, training)
    # STEP 3
    df, new_cols = encode_data(df, training)
    # STEP 4
    df, before, after, saved = optimize_memory(df)

    # SAVE OUTPUT
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)

    # One JSON document per state section, bundled into an in-memory ZIP.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w") as zf:
        for key, value in STATE.items():
            zf.writestr(
                f"{key}.json",
                json.dumps(value, indent=2, default=_json_default),
            )

    return (
        json.dumps(profile, indent=2, default=_json_default),
        df.head(),
        csv_buffer.getvalue(),
        zip_buffer.getvalue(),
        f"RAM saved: {saved:.2f}%",
    )


# =========================
# GRADIO UI
# =========================
with gr.Blocks() as app:
    gr.Markdown("# Auto Data Processor (MLOps Version)")

    with gr.Row():
        file = gr.File(label="Upload CSV/Parquet")
        mode = gr.Radio(["Training", "Inference"], value="Training")

    run_btn = gr.Button("Run Pipeline")

    profile_out = gr.Textbox(label="Data Profiling", lines=15)
    df_out = gr.Dataframe()
    ram_out = gr.Textbox(label="Memory Optimization")
    csv_out = gr.File(label="Download Cleaned CSV")
    zip_out = gr.File(label="Download State ZIP")

    def wrapper(file, mode):
        """Run the pipeline and write its outputs to files for download.

        Returns the values for ``profile_out``, ``df_out``, ``ram_out``,
        ``csv_out`` and ``zip_out``, in that order.
        """
        profile, df_head, csv_data, zip_data, ram = run_pipeline(file, mode)

        # On failure only the message (carried in `profile`) is shown.
        if df_head is None:
            return profile, None, None, None, None

        # The file components need real paths, so write both payloads into
        # a fresh temporary directory.
        # NOTE(review): this directory is never removed - confirm whether
        # cleanup is needed for long-running deployments.
        temp_dir = tempfile.mkdtemp()
        csv_path = os.path.join(temp_dir, "cleaned.csv")
        zip_path = os.path.join(temp_dir, "state.zip")

        # newline="" writes the CSV text exactly as pandas produced it;
        # without it, text mode would translate the line endings again.
        with open(csv_path, "w", encoding="utf-8", newline="") as f:
            f.write(csv_data)

        with open(zip_path, "wb") as f:
            f.write(zip_data)

        return profile, df_head, ram, csv_path, zip_path

    run_btn.click(
        wrapper,
        inputs=[file, mode],
        outputs=[profile_out, df_out, ram_out, csv_out, zip_out],
    )

if __name__ == "__main__":
    app.launch()