import shutil
import importlib.util

import pandas as pd
import streamlit as st

# ----------------------------
# Config
# ----------------------------
st.set_page_config(page_title="Mini Process Miner", layout="wide")

DEBUG = True  # set to False to hide the env checks from users

# Optional: quick environment/dependency check
if DEBUG:
    st.write("Python OK. Checking deps…")
    st.write("pm4py import:", bool(importlib.util.find_spec("pm4py")))
    st.write("graphviz (pip) import:", bool(importlib.util.find_spec("graphviz")))
    st.write("dot in PATH:", shutil.which("dot"))

# ----------------------------
# Page setup
# ----------------------------
st.title("Mini Process Miner (vibe-coded)")

# Uploader with clear instructions
uploaded = st.file_uploader(
    "Upload your event log (CSV)",
    type=["csv"],
    help="Use EXACT headers (lowercase): required → case_id, activity, timestamp; "
         "optional → column1, column2, column3."
)

st.caption(
    "**Required columns:** case_id, activity, timestamp • "
    "**Optional:** column1, column2, column3 (e.g., resource, team, location) • "
    "Need a sample dataset? [Download a test CSV here](https://drive.google.com/drive/folders/1q0iqn5_FFz4EttLDl0zR09RQ3z4JsdDR) • "
    "**Disclaimer:** This demo tool offers no guarantees regarding data security or accuracy; use at your own risk. • "
    "Created by Dennis Arrindell, powered by [PM4Py](https://pm4py.fit.fraunhofer.de/), and 100% vibe-coded with ChatGPT."
)

# ----------------------------
# Helpers
# ----------------------------
def ensure_parsed(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names and parse the timestamp column."""
    df = df.copy()
    df.columns = [c.strip().lower() for c in df.columns]
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    df = df.dropna(subset=["timestamp"])
    return df


def compute_ordered(df: pd.DataFrame) -> pd.DataFrame:
    """Sort events by case and timestamp."""
    return df.sort_values(["case_id", "timestamp"])


def apply_case_level_exclusion(df: pd.DataFrame, activities_to_drop: list) -> pd.DataFrame:
    """Remove entire cases that contain any of the selected activities."""
    if not activities_to_drop:
        return df
    cases_with_forbidden = df.loc[df["activity"].isin(activities_to_drop), "case_id"].unique()
    return df.loc[~df["case_id"].isin(cases_with_forbidden)].copy()


def apply_event_level_exclusion(df: pd.DataFrame, activities_to_remove: list) -> pd.DataFrame:
    """Remove only the selected activity events; keep the rest of each case."""
    if not activities_to_remove:
        return df
    # Cases that lose all their events simply end up with no rows.
    return df.loc[~df["activity"].isin(activities_to_remove)].copy()


def apply_activity_threshold(df: pd.DataFrame, min_freq: int) -> pd.DataFrame:
    """Drop events whose activity's total frequency is below min_freq."""
    if min_freq <= 1 or df.empty:
        return df
    counts = df["activity"].value_counts()
    keep_acts = counts[counts >= min_freq].index
    return df.loc[df["activity"].isin(keep_acts)].copy()


def build_edges(ordered_df: pd.DataFrame) -> pd.DataFrame:
    """Build directly-follows edges with counts."""
    if ordered_df.empty:
        return pd.DataFrame(columns=["edge", "count"])
    tmp = ordered_df.copy()
    tmp["next_activity"] = tmp.groupby("case_id")["activity"].shift(-1)
    edges = tmp.dropna(subset=["next_activity"])[["activity", "next_activity"]].copy()
    if edges.empty:
        return pd.DataFrame(columns=["edge", "count"])
    edges["edge"] = edges["activity"] + " → " + edges["next_activity"]
    edge_counts = edges["edge"].value_counts().rename_axis("edge").reset_index(name="count")
    return edge_counts
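
# Illustrative note (not part of the app logic): build_edges() computes the
# classic directly-follows relation. For a single case whose ordered events are
# A, B, C it returns two edges, "A → B" and "B → C", each with count 1;
# counts are then aggregated over all cases in the log.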
def apply_optional_column_includes(df: pd.DataFrame, colname: str, selected: list) -> pd.DataFrame:
    """If selections are provided for a column, keep only rows where the column value is selected."""
    if colname in df.columns and selected:
        return df[df[colname].astype(str).isin([str(x) for x in selected])]
    return df


# ----------------------------
# Main
# ----------------------------
if uploaded:
    raw_df = pd.read_csv(uploaded)

    # Validate columns early (we normalize to lowercase)
    required = {"case_id", "activity", "timestamp"}
    if not required.issubset({c.strip().lower() for c in raw_df.columns}):
        st.error("CSV must include required columns: case_id, activity, timestamp. "
                 "Optional: column1, column2, column3.")
        st.stop()

    df = ensure_parsed(raw_df)

    # ----------------------------
    # Sidebar filters (case/event + optional column1/2/3) FIRST
    # ----------------------------
    st.sidebar.header("Filters")

    # Optional extra columns (exact names after normalization): column1, column2, column3
    extra_cols_present = [c for c in ["column1", "column2", "column3"] if c in df.columns]

    # Case-level exclusion
    all_activities = sorted(df["activity"].astype(str).unique().tolist())
    case_exclude = st.sidebar.multiselect(
        "Remove all CASES containing these activities",
        options=all_activities,
        help="If a case contains one of these activities, the entire case is removed."
    )

    # Event-level exclusion
    event_exclude = st.sidebar.multiselect(
        "Remove only EVENTS with these activities (keep cases)",
        options=all_activities,
        help="Events with these activities are dropped, but the case remains if other events exist."
    )

    # Optional include filters for extra columns
    selections = {}
    if extra_cols_present:
        st.sidebar.markdown("---")
        st.sidebar.subheader("Optional column filters")
        for col in extra_cols_present:
            options = sorted(df[col].dropna().astype(str).unique().tolist())
            selections[col] = st.sidebar.multiselect(
                f"Include only {col} values",
                options=options,
                help=f"Leave empty to include all {col} values."
            )

    # Apply case/event filters
    df_filt = apply_case_level_exclusion(df, case_exclude)
    df_filt = apply_event_level_exclusion(df_filt, event_exclude)

    # Apply optional column includes
    for col, sel in selections.items():
        df_filt = apply_optional_column_includes(df_filt, col, sel)

    if df_filt.empty:
        st.warning("All data filtered out. Adjust filters to see results.")
        st.stop()

    ordered = compute_ordered(df_filt)

    # ----------------------------
    # Sidebar sliders (activity & connection thresholds)
    # ----------------------------
    act_counts_for_slider = ordered["activity"].value_counts()
    max_act_allowed = int(act_counts_for_slider.max()) if not act_counts_for_slider.empty else 1
    if max_act_allowed < 1:
        max_act_allowed = 1

    apply_act_thresh_to_model = st.sidebar.checkbox(
        "Apply activity frequency threshold to the model",
        value=True,
        help="If enabled, activities below the threshold are removed before discovery/visualization."
    )
    min_act = st.sidebar.slider(
        "Min activity frequency to KEEP",
        min_value=1,
        max_value=max_act_allowed,
        value=1,
        help="Drops activities whose total frequency is below this value (if enabled above)."
    )

    # Create df_model after the activity slider decision
    if apply_act_thresh_to_model:
        df_model = apply_activity_threshold(ordered, min_act)
    else:
        df_model = ordered
    df_model = compute_ordered(df_model)

    if df_model.empty:
        st.warning("All events dropped by the activity frequency threshold. Lower the threshold.")
        st.stop()
Lower the threshold.") st.stop() # Connection frequency slider (visual-only) edge_counts_for_slider = build_edges(df_model) max_edge_allowed = int(edge_counts_for_slider["count"].max()) if not edge_counts_for_slider.empty else 1 if max_edge_allowed < 1: max_edge_allowed = 1 min_edge = st.sidebar.slider( "Min connection frequency to SHOW", min_value=1, max_value=max_edge_allowed, value=1, help="Hides low-frequency connections in the Connections/DFG views (visual-only)." ) st.sidebar.markdown("---") st.sidebar.caption("Activity threshold may modify the model; connection threshold only affects visuals.") # ---------------------------- # Metrics # ---------------------------- total_cases = df_model["case_id"].nunique() total_events = len(df_model) unique_acts = df_model["activity"].nunique() c1, c2, c3 = st.columns(3) c1.metric("Total cases", total_cases) c2.metric("Total events", total_events) c3.metric("Unique activities", unique_acts) # ---------------------------- # Activity frequency (reflects min_act) # ---------------------------- st.subheader("Activity frequency") act_counts = df_model["activity"].value_counts().rename_axis("activity").reset_index(name="count") st.dataframe(act_counts[act_counts["count"] >= min_act], use_container_width=True) st.bar_chart(act_counts.set_index("activity")["count"]) # ---------------------------- # Variants (quick & dirty) # ---------------------------- try: variants = ( df_model.groupby("case_id")["activity"] .apply(lambda s: " → ".join(s)) .value_counts() ) st.subheader("Top variants (quick & dirty)") st.dataframe( variants.rename("count").reset_index().rename(columns={"index": "variant"}).head(20), use_container_width=True ) except Exception: st.info("Could not compute variants; check your timestamp and activity values.") # ---------------------------- # Connections (transitions) — respects min_edge (visual-only) # ---------------------------- st.subheader("Connections (transitions)") edge_counts = build_edges(df_model) if edge_counts.empty: st.info("No transitions found after current filters.") else: st.dataframe(edge_counts[edge_counts["count"] >= min_edge], use_container_width=True) # ---------------------------- # PM4Py visualizations (clean, frequency, performance, DFG) # ---------------------------- st.subheader("Discovered Process Map") try: # Lazy imports so app still loads without pm4py from pm4py.objects.log.util import dataframe_utils from pm4py.objects.conversion.log import converter as log_converter from pm4py.algo.discovery.inductive import algorithm as inductive_miner from pm4py.visualization.petri_net import visualizer as pn_visualizer from pm4py.visualization.process_tree import visualizer as pt_visualizer from pm4py.objects.conversion.process_tree import converter as pt_converter from pm4py.objects.process_tree import obj as pt_obj from pm4py.algo.discovery.dfg import algorithm as dfg_discovery from pm4py.visualization.dfg import visualizer as dfg_visualization # Prepare dataframe for PM4Py pm_df = df_model.rename(columns={ "case_id": "case:concept:name", "activity": "concept:name", "timestamp": "time:timestamp" }).copy() pm_df["time:timestamp"] = pd.to_datetime(pm_df["time:timestamp"], errors="coerce") pm_df = pm_df.dropna(subset=["time:timestamp"]) pm_df = dataframe_utils.convert_timestamp_columns_in_df(pm_df) # Convert to event log event_log = log_converter.apply(pm_df) # Discover model model = inductive_miner.apply(event_log) if isinstance(model, pt_obj.ProcessTree): tree = model net, im, fm = pt_converter.apply(tree) tree_gviz = 
        else:
            net, im, fm = model
            tree_gviz = None

        tabs = st.tabs(["Clean Petri Net", "Frequency", "Performance", "DFG (with numbers)"])

        # --- Clean Petri net ---
        with tabs[0]:
            gviz_pn = pn_visualizer.apply(net, im, fm)
            st.graphviz_chart(gviz_pn.source, use_container_width=True)
            if tree_gviz is not None:
                st.caption("Process Tree (discovered)")
                st.graphviz_chart(tree_gviz.source, use_container_width=True)

        # --- Frequency-decorated Petri net ---
        with tabs[1]:
            try:
                gviz_freq = pn_visualizer.apply(
                    net, im, fm,
                    variant=pn_visualizer.Variants.FREQUENCY,
                    log=event_log
                )
                st.graphviz_chart(gviz_freq.source, use_container_width=True)
                st.caption("Numbers reflect frequencies from the filtered log.")
            except Exception as e:
                st.info(f"Frequency decoration not available: {e}")

        # --- Performance-decorated Petri net ---
        with tabs[2]:
            try:
                gviz_perf = pn_visualizer.apply(
                    net, im, fm,
                    variant=pn_visualizer.Variants.PERFORMANCE,
                    log=event_log
                )
                st.graphviz_chart(gviz_perf.source, use_container_width=True)
                st.caption("Numbers reflect performance (e.g., average durations) computed from timestamps.")
            except Exception as e:
                st.info(f"Performance decoration not available: {e}")

        # --- DFG with numbers (respects min_edge visually) ---
        with tabs[3]:
            try:
                dfg_freq = dfg_discovery.apply(event_log)  # {(a, b): count}
                dfg_freq_filtered = {k: v for k, v in dfg_freq.items() if v >= min_edge}
                dfg_freq_gviz = dfg_visualization.apply(
                    dfg_freq_filtered if dfg_freq_filtered else dfg_freq,
                    log=event_log,
                    variant=dfg_visualization.Variants.FREQUENCY
                )
                st.graphviz_chart(dfg_freq_gviz.source, use_container_width=True)
                st.caption("DFG (Frequency): edge labels show counts. Low-frequency edges hidden per slider.")

                # Performance DFG: discover average durations separately, then hide
                # edges whose frequency falls below the slider threshold.
                dfg_perf = dfg_discovery.apply(event_log, variant=dfg_discovery.Variants.PERFORMANCE)  # {(a, b): avg duration}
                dfg_perf_filtered = {k: v for k, v in dfg_perf.items() if dfg_freq.get(k, 0) >= min_edge}
                dfg_perf_gviz = dfg_visualization.apply(
                    dfg_perf_filtered if dfg_perf_filtered else dfg_perf,
                    log=event_log,
                    variant=dfg_visualization.Variants.PERFORMANCE
                )
                st.graphviz_chart(dfg_perf_gviz.source, use_container_width=True)
                st.caption("DFG (Performance): edge labels show avg durations. Low-frequency edges hidden per slider.")
            except Exception as e:
                st.info(f"DFG visualization not available: {e}")

    except ModuleNotFoundError:
        st.error("PM4Py not found. Please ensure pm4py and graphviz are installed.")
    except Exception as e:
        st.warning(f"Could not render process map: {e}")

    # ----------------------------
    # Credits
    # ----------------------------
    st.markdown("---")
    with st.expander("Credits", expanded=False):
        st.markdown(
            """
**Credits**

Created by **Dennis Arrindell** — creator of the best-selling online course about Process Mining on Udemy.
100% vibe-coded using ChatGPT.

Inspired by the pioneering work of **Wil van der Aalst**, the “godfather of process mining.”

Powered by the **PM4Py** process mining library, created by **Sebastiaan J. van Zelst** and contributors:
https://pm4py.fit.fraunhofer.de/

Built with Python and other open-source libraries (pandas, Streamlit, Graphviz, etc.).

Full technical information, installation steps, and source code are available in the **GitHub repository**.
"""
        )
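
# Illustrative sketch of the expected CSV layout (header names are the exact
# lowercase columns the uploader requires; the data rows below are hypothetical):
#
#   case_id,activity,timestamp,column1
#   1,Create Order,2024-01-05 09:00:00,Alice
#   1,Approve Order,2024-01-05 10:30:00,Bob
#   2,Create Order,2024-01-06 08:15:00,Alice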