import streamlit as st
import shutil
import importlib
import pandas as pd
# ----------------------------
# Config
# ----------------------------
st.set_page_config(page_title="Mini Process Miner", layout="wide")
DEBUG = True # set to False to hide the env checks from users
# Optional: quick environment/dependency check
if DEBUG:
st.write("Python OK. Checking deps…")
st.write("pm4py import:", bool(importlib.util.find_spec("pm4py")))
st.write("graphviz (pip) import:", bool(importlib.util.find_spec("graphviz")))
st.write("dot in PATH:", shutil.which("dot"))
# ----------------------------
# Page setup
# ----------------------------
st.title("Mini Process Miner (vibe-coded)")
# Uploader with clear instructions
uploaded = st.file_uploader(
"Upload your event log (CSV)",
type=["csv"],
help="Use EXACT headers (lowercase): required → case_id, activity, timestamp; optional → column1, column2, column3."
)
st.caption(
"**Required columns:** case_id, activity, timestamp • "
"**Optional:** column1, column2, column3 (e.g., resource, team, location) • "
"Need a sample dataset? [Download a test CSV here](https://drive.google.com/drive/folders/1q0iqn5_FFz4EttLDl0zR09RQ3z4JsdDR) • "
"**Disclaimer:** This demo tool offers no guarantees regarding data security or accuracy; use at your own risk. • "
"Created by Dennis Arrindell, powered by [PM4Py](https://pm4py.fit.fraunhofer.de/), and 100% vibe-coded with ChatGPT."
)
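# Illustrative example of the expected CSV layout (hypothetical values; only
# the three required headers are mandatory, the optional columns may be omitted):
#
#   case_id,activity,timestamp,column1
#   C1,Create Order,2024-01-01 09:00:00,Alice
#   C1,Approve Order,2024-01-01 10:30:00,Bob
#   C2,Create Order,2024-01-02 08:15:00,Alice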
# ----------------------------
# Helpers
# ----------------------------
def ensure_parsed(df: pd.DataFrame) -> pd.DataFrame:
"""Normalize columns and parse timestamp."""
df = df.copy()
df.columns = [c.strip().lower() for c in df.columns]
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"])
return df
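# Usage sketch for ensure_parsed (hypothetical data): headers are lowercased
# and rows whose timestamp cannot be parsed are dropped, e.g.
#   ensure_parsed(pd.DataFrame({"Case_ID": ["c1", "c1"],
#                               "Activity": ["A", "B"],
#                               "Timestamp": ["2024-01-01", "not-a-date"]}))
# keeps only the first row.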
def compute_ordered(df: pd.DataFrame) -> pd.DataFrame:
    return df.sort_values(["case_id", "timestamp"])
def apply_case_level_exclusion(df: pd.DataFrame, activities_to_drop: list) -> pd.DataFrame:
"""Remove entire cases that contain any of the selected activities."""
if not activities_to_drop:
return df
cases_with_forbidden = df.loc[df["activity"].isin(activities_to_drop), "case_id"].unique()
return df.loc[~df["case_id"].isin(cases_with_forbidden)].copy()
def apply_event_level_exclusion(df: pd.DataFrame, activities_to_remove: list) -> pd.DataFrame:
"""Remove only those activity events, keep the rest of the case."""
if not activities_to_remove:
return df
out = df.loc[~df["activity"].isin(activities_to_remove)].copy()
valid_cases = out["case_id"].value_counts()
keep_cases = valid_cases[valid_cases > 0].index
return out.loc[out["case_id"].isin(keep_cases)].copy()
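# Illustration of the two exclusion modes on a hypothetical case c1 with events
# A, B, C: excluding "B" at CASE level removes all of c1, while excluding "B"
# at EVENT level keeps c1 with events A and C.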
def apply_activity_threshold(df: pd.DataFrame, min_freq: int) -> pd.DataFrame:
"""Drop events whose activity total frequency < min_freq."""
if min_freq <= 1 or df.empty:
return df
counts = df["activity"].value_counts()
keep_acts = counts[counts >= min_freq].index
return df.loc[df["activity"].isin(keep_acts)].copy()
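# Example (hypothetical counts): with min_freq=3 and activity totals
# {"A": 5, "B": 2}, every "B" event is dropped and all "A" events are kept.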
def build_edges(ordered_df: pd.DataFrame) -> pd.DataFrame:
"""Build directly-follows edges with counts."""
if ordered_df.empty:
return pd.DataFrame(columns=["edge", "count"])
tmp = ordered_df.copy()
tmp["next_activity"] = tmp.groupby("case_id")["activity"].shift(-1)
edges = tmp.dropna(subset=["next_activity"])[["activity", "next_activity"]]
if edges.empty:
return pd.DataFrame(columns=["edge", "count"])
edges["edge"] = edges["activity"] + " → " + edges["next_activity"]
edge_counts = edges["edge"].value_counts().rename_axis("edge").reset_index(name="count")
return edge_counts
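# Example: a single case with events A -> B -> B yields the edge counts
# {"A → B": 1, "B → B": 1}; the last event of each case has no successor and
# contributes no edge.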
def apply_optional_column_includes(df: pd.DataFrame, colname: str, selected: list) -> pd.DataFrame:
"""If selections provided for a column, keep only rows where column ∈ selected."""
if colname in df.columns and selected:
return df[df[colname].astype(str).isin([str(x) for x in selected])]
return df
# ----------------------------
# Main
# ----------------------------
if uploaded:
    raw_df = pd.read_csv(uploaded)
    # Validate columns early (we normalize to lowercase)
    required = {"case_id", "activity", "timestamp"}
    if not required.issubset({c.strip().lower() for c in raw_df.columns}):
        st.error("CSV must include required columns: case_id, activity, timestamp. Optional: column1, column2, column3.")
        st.stop()
    df = ensure_parsed(raw_df)
    # ----------------------------
    # Sidebar filters (case/event + optional column1/2/3) FIRST
    # ----------------------------
    st.sidebar.header("Filters")
    # Optional extra columns (exact names after normalization): column1, column2, column3
    extra_cols_present = [c for c in ["column1", "column2", "column3"] if c in df.columns]
    # Case-level exclusion
    all_activities = sorted(df["activity"].astype(str).unique().tolist())
    case_exclude = st.sidebar.multiselect(
        "Remove all CASES containing these activities",
        options=all_activities,
        help="If a case contains one of these activities, the entire case is removed."
    )
    # Event-level exclusion
    event_exclude = st.sidebar.multiselect(
        "Remove only EVENTS with these activities (keep cases)",
        options=all_activities,
        help="Events with these activities are dropped, but the case remains if other events exist."
    )
    # Optional include filters for extra columns
    if extra_cols_present:
        st.sidebar.markdown("---")
        st.sidebar.subheader("Optional column filters")
        selections = {}
        for col in extra_cols_present:
            options = sorted(df[col].dropna().astype(str).unique().tolist())
            selections[col] = st.sidebar.multiselect(
                f"Include only {col} values",
                options=options,
                help=f"Leave empty to include all {col} values."
            )
    else:
        selections = {}
    # Apply case/event filters
    df_filt = apply_case_level_exclusion(df, case_exclude)
    df_filt = apply_event_level_exclusion(df_filt, event_exclude)
    # Apply optional column includes
    for col, sel in selections.items():
        df_filt = apply_optional_column_includes(df_filt, col, sel)
    if df_filt.empty:
        st.warning("All data filtered out. Adjust filters to see results.")
        st.stop()
    ordered = compute_ordered(df_filt)
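    # Note: compute_ordered sorts by (case_id, timestamp); the variant and
    # directly-follows computations below rely on this within-case order.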
    # ----------------------------
    # Sidebar sliders (activity & connection thresholds)
    # ----------------------------
    act_counts_for_slider = ordered["activity"].value_counts()
    max_act_allowed = int(act_counts_for_slider.max()) if not act_counts_for_slider.empty else 1
    max_act_allowed = max(max_act_allowed, 1)
    apply_act_thresh_to_model = st.sidebar.checkbox(
        "Apply activity frequency threshold to the model",
        value=True,
        help="If enabled, activities below the threshold are removed before discovery/visualization."
    )
    min_act = st.sidebar.slider(
        "Min activity frequency to KEEP",
        min_value=1, max_value=max_act_allowed, value=1,
        help="Drops activities whose total frequency is below this value (if enabled above)."
    )
    # Create df_model after the activity-slider decision
    if apply_act_thresh_to_model:
        df_model = apply_activity_threshold(ordered, min_act)
    else:
        df_model = ordered
    # Defensive re-sort: the discovery steps below assume (case_id, timestamp) order
    df_model = compute_ordered(df_model)
    if df_model.empty:
        st.warning("All events dropped by the activity frequency threshold. Lower the threshold.")
        st.stop()
    # Connection frequency slider (visual-only)
    edge_counts_for_slider = build_edges(df_model)
    max_edge_allowed = int(edge_counts_for_slider["count"].max()) if not edge_counts_for_slider.empty else 1
    max_edge_allowed = max(max_edge_allowed, 1)
    min_edge = st.sidebar.slider(
        "Min connection frequency to SHOW",
        min_value=1, max_value=max_edge_allowed, value=1,
        help="Hides low-frequency connections in the Connections/DFG views (visual-only)."
    )
    st.sidebar.markdown("---")
    st.sidebar.caption("Activity threshold may modify the model; connection threshold only affects visuals.")
    # ----------------------------
    # Metrics
    # ----------------------------
    total_cases = df_model["case_id"].nunique()
    total_events = len(df_model)
    unique_acts = df_model["activity"].nunique()
    c1, c2, c3 = st.columns(3)
    c1.metric("Total cases", total_cases)
    c2.metric("Total events", total_events)
    c3.metric("Unique activities", unique_acts)
    # ----------------------------
    # Activity frequency (reflects min_act)
    # ----------------------------
    st.subheader("Activity frequency")
    act_counts = df_model["activity"].value_counts().rename_axis("activity").reset_index(name="count")
    act_view = act_counts[act_counts["count"] >= min_act]
    st.dataframe(act_view, use_container_width=True)
    # Chart the same filtered view so the chart also reflects min_act, as the header promises
    st.bar_chart(act_view.set_index("activity")["count"])
    # ----------------------------
    # Variants (quick & dirty)
    # ----------------------------
    try:
        variants = (
            df_model.groupby("case_id")["activity"]
            .apply(lambda s: " → ".join(s))
            .value_counts()
            # rename_axis + reset_index(name=...) labels the columns reliably;
            # renaming an "index" column can silently fail on newer pandas.
            .rename_axis("variant")
            .reset_index(name="count")
        )
        st.subheader("Top variants (quick & dirty)")
        st.dataframe(variants.head(20), use_container_width=True)
    except Exception:
        st.info("Could not compute variants; check your timestamp and activity values.")
    # ----------------------------
    # Connections (transitions) — respects min_edge (visual-only)
    # ----------------------------
    st.subheader("Connections (transitions)")
    edge_counts = build_edges(df_model)
    if edge_counts.empty:
        st.info("No transitions found after current filters.")
    else:
        st.dataframe(edge_counts[edge_counts["count"] >= min_edge], use_container_width=True)
    # ----------------------------
    # PM4Py visualizations (clean, frequency, performance, DFG)
    # ----------------------------
    st.subheader("Discovered Process Map")
    try:
        # Lazy imports so the app still loads without pm4py
        from pm4py.objects.log.util import dataframe_utils
        from pm4py.objects.conversion.log import converter as log_converter
        from pm4py.algo.discovery.inductive import algorithm as inductive_miner
        from pm4py.visualization.petri_net import visualizer as pn_visualizer
        from pm4py.visualization.process_tree import visualizer as pt_visualizer
        from pm4py.objects.conversion.process_tree import converter as pt_converter
        from pm4py.objects.process_tree import obj as pt_obj
        from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
        from pm4py.visualization.dfg import visualizer as dfg_visualization
        # Prepare the dataframe for PM4Py using the standard XES column names
        pm_df = df_model.rename(columns={
            "case_id": "case:concept:name",
            "activity": "concept:name",
            "timestamp": "time:timestamp"
        }).copy()
        pm_df["time:timestamp"] = pd.to_datetime(pm_df["time:timestamp"], errors="coerce")
        pm_df = pm_df.dropna(subset=["time:timestamp"])
        pm_df = dataframe_utils.convert_timestamp_columns_in_df(pm_df)
        # Convert to event log
        event_log = log_converter.apply(pm_df)
        # Discover a model. Recent PM4Py versions return a ProcessTree from the
        # inductive miner; older ones return a (net, initial, final) triple.
        model = inductive_miner.apply(event_log)
        if isinstance(model, pt_obj.ProcessTree):
            tree = model
            net, im, fm = pt_converter.apply(tree)
            tree_gviz = pt_visualizer.apply(tree)
        else:
            net, im, fm = model
            tree_gviz = None
        tabs = st.tabs(["Clean Petri Net", "Frequency", "Performance", "DFG (with numbers)"])
        # --- Clean Petri net ---
        with tabs[0]:
            gviz_pn = pn_visualizer.apply(net, im, fm)
            st.graphviz_chart(gviz_pn.source, use_container_width=True)
            if tree_gviz is not None:
                st.caption("Process Tree (discovered)")
                st.graphviz_chart(tree_gviz.source, use_container_width=True)
        # --- Frequency-decorated Petri net ---
        with tabs[1]:
            try:
                gviz_freq = pn_visualizer.apply(
                    net, im, fm,
                    variant=pn_visualizer.Variants.FREQUENCY,
                    log=event_log
                )
                st.graphviz_chart(gviz_freq.source, use_container_width=True)
                st.caption("Numbers reflect frequencies from the filtered log.")
            except Exception as e:
                st.info(f"Frequency decoration not available: {e}")
        # --- Performance-decorated Petri net ---
        with tabs[2]:
            try:
                gviz_perf = pn_visualizer.apply(
                    net, im, fm,
                    variant=pn_visualizer.Variants.PERFORMANCE,
                    log=event_log
                )
                st.graphviz_chart(gviz_perf.source, use_container_width=True)
                st.caption("Numbers reflect performance (e.g., average durations) computed from timestamps.")
            except Exception as e:
                st.info(f"Performance decoration not available: {e}")
        # --- DFG with numbers (respects min_edge visually) ---
        with tabs[3]:
            try:
                dfg_freq = dfg_discovery.apply(event_log)  # {(a, b): count}
                dfg_freq_filtered = {k: v for k, v in dfg_freq.items() if v >= min_edge}
                # Fall back to the unfiltered DFG when the slider removes every
                # edge, so the view never renders an empty graph.
                keep_edges = dfg_freq_filtered if dfg_freq_filtered else dfg_freq
                dfg_freq_gviz = dfg_visualization.apply(
                    keep_edges,
                    log=event_log,
                    variant=dfg_visualization.Variants.FREQUENCY
                )
                st.graphviz_chart(dfg_freq_gviz.source, use_container_width=True)
                st.caption("DFG (Frequency): edge labels show counts. Low-frequency edges are hidden per the slider.")
                # Performance DFG: discover durations separately (the frequency
                # DFG holds counts, not times), then keep only the edges that
                # survived the frequency slider.
                dfg_perf = dfg_discovery.apply(event_log, variant=dfg_discovery.Variants.PERFORMANCE)
                dfg_perf_view = {k: v for k, v in dfg_perf.items() if k in keep_edges}
                dfg_perf_gviz = dfg_visualization.apply(
                    dfg_perf_view,
                    log=event_log,
                    variant=dfg_visualization.Variants.PERFORMANCE
                )
                st.graphviz_chart(dfg_perf_gviz.source, use_container_width=True)
                st.caption("DFG (Performance): edge labels show average durations. Low-frequency edges are hidden per the slider.")
            except Exception as e:
                st.info(f"DFG visualization not available: {e}")
    except ModuleNotFoundError:
        st.error("PM4Py not found. Please ensure pm4py and graphviz are installed.")
    except Exception as e:
        st.warning(f"Could not render process map: {e}")
# ----------------------------
# Credits
# ----------------------------
st.markdown("---")
with st.expander("Credits", expanded=False):
    st.markdown(
        """
        **Credits**

        Created by **Dennis Arrindell**, creator of the best-selling online Process Mining course on Udemy.

        100% vibe-coded using ChatGPT.

        Inspired by the pioneering work of **Wil van der Aalst**, the “godfather of process mining.”

        Powered by the **PM4Py** process mining library, created by **Sebastiaan J. van Zelst** and contributors: https://pm4py.fit.fraunhofer.de/

        Built with Python and other open-source libraries (pandas, Streamlit, Graphviz, etc.).

        Full technical information, installation steps, and source code are available in the **GitHub repository**.
        """
    )