Spaces:
Sleeping
Sleeping
| """ | |
| Comprehensive multi-model training pipeline for AURIS. | |
| Trains and evaluates multiple classifier families on extracted | |
| audio features using stratified cross-validation, then selects | |
| the best model and exports it for production use. | |
| Models compared: | |
| - Logistic Regression | |
| - Random Forest | |
| - Gradient Boosting | |
| - Support Vector Machine (RBF) | |
| - Multi-Layer Perceptron | |
| - XGBoost (optional) | |
| - LightGBM (optional) | |
| Usage: | |
| python -m app.training.train_classifier data/training/features.csv | |
| Outputs: | |
| models/auris_classifier_v1.pkl - best trained model | |
| models/feature_scaler_v1.pkl - fitted StandardScaler | |
| models/feature_columns_v1.json - ordered feature column names | |
| models/training_results.json - model metrics and metadata | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import json | |
| import pickle | |
| import sys | |
| import time | |
| import warnings | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| from sklearn.base import clone | |
| from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier | |
| from sklearn.exceptions import ConvergenceWarning | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.calibration import CalibratedClassifierCV | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| roc_auc_score, | |
| roc_curve, | |
| ) | |
| from sklearn.model_selection import StratifiedKFold, cross_val_predict, train_test_split | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.svm import SVC | |
| # Optional: XGBoost | |
| try: | |
| import xgboost as xgb | |
| HAS_XGB = True | |
| except ImportError: | |
| HAS_XGB = False | |
| # Optional: LightGBM | |
| try: | |
| import lightgbm as lgb | |
| HAS_LGBM = True | |
| except ImportError: | |
| HAS_LGBM = False | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[2])) | |
| from app.training.evaluate import evaluate_predictions, load_features_csv | |
| _EXCLUDED_COLUMNS = {"file_path", "label_int", "duration_sec", "sample_rate"} | |
| _TUNED_PARAM_KEYS: dict[str, tuple[str, ...]] = { | |
| "Logistic Regression": ("C", "class_weight", "max_iter"), | |
| "Random Forest": ( | |
| "n_estimators", | |
| "max_depth", | |
| "min_samples_leaf", | |
| "min_samples_split", | |
| "class_weight", | |
| "max_features", | |
| ), | |
| "Gradient Boosting": ( | |
| "n_estimators", | |
| "max_depth", | |
| "learning_rate", | |
| "subsample", | |
| "min_samples_leaf", | |
| "min_samples_split", | |
| ), | |
| "SVM (RBF)": ("C", "gamma", "class_weight"), | |
| "MLP Neural Network": ( | |
| "hidden_layer_sizes", | |
| "alpha", | |
| "max_iter", | |
| "validation_fraction", | |
| ), | |
| "XGBoost": ( | |
| "n_estimators", | |
| "max_depth", | |
| "learning_rate", | |
| "subsample", | |
| "colsample_bytree", | |
| "min_child_weight", | |
| "reg_alpha", | |
| "reg_lambda", | |
| "gamma", | |
| ), | |
| "LightGBM": ( | |
| "n_estimators", | |
| "max_depth", | |
| "learning_rate", | |
| "num_leaves", | |
| "subsample", | |
| "colsample_bytree", | |
| "min_child_samples", | |
| "reg_alpha", | |
| "reg_lambda", | |
| ), | |
| } | |
def train(
    features_csv: str | Path,
    models_dir: str | Path = "models",
    n_folds: int = 5,
) -> dict[str, Any]:
    """
    Train and evaluate all classifier candidates.

    Steps:
      1. Load the feature matrix and the ordered feature column names.
      2. Pick one configuration per model family on a stratified holdout.
      3. Score each selected model with stratified K-fold cross-validation
         (the scaler is refitted inside every fold via a pipeline).
      4. Refit every selected model on all samples and pickle it.
      5. Export the best model (by CV ROC-AUC), the scaler fitted on all
         samples, the feature column names and a JSON metrics summary.

    Args:
        features_csv: Path to the CSV of extracted features.
        models_dir: Directory that receives all artifacts (created if missing).
        n_folds: Number of stratified cross-validation folds.

    Returns:
        Dict with per-model metrics, best model info, and saved paths.

    Raises:
        RuntimeError: If no candidate model could be selected as best.
    """
    features_csv = Path(features_csv)
    models_dir = Path(models_dir)
    models_dir.mkdir(parents=True, exist_ok=True)

    X, y = load_features_csv(features_csv)
    feature_cols = _load_feature_columns(features_csv)
    if len(feature_cols) != X.shape[1]:
        # The names come from the CSV header while X comes from the loader; if
        # they disagree, the exported column list and the feature-importance
        # mapping (which zips names with values) would be silently wrong.
        warnings.warn(
            f"Feature name count ({len(feature_cols)}) does not match the "
            f"feature matrix width ({X.shape[1]}); exported column names and "
            "feature importances may be misaligned.",
            RuntimeWarning,
            stacklevel=2,
        )
    X = np.nan_to_num(X, nan=0.0, posinf=1.0, neginf=-1.0)

    selected_candidates, tuning_results = _select_best_candidates(X, y)
    cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)

    # This scaler is only used for the final fits and is exported; during CV
    # each fold fits its own scaler inside the evaluation pipeline.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    best_name = ""
    best_auc = -1.0
    all_results: dict[str, dict[str, Any]] = {}
    for name, model in selected_candidates:
        metrics, auc = _cross_validate_candidate(
            name, model, X, y, cv, tuning_results.get(name, {})
        )
        all_results[name] = metrics
        # Compare on the unrounded AUC; the stored metric is rounded.
        if auc > best_auc:
            best_auc = auc
            best_name = name

    if not best_name:
        raise RuntimeError("No candidate model could be selected as best")

    print("\n" + "=" * 64)
    print(f"BEST MODEL: {best_name} (ROC-AUC = {best_auc:.4f})")
    print("=" * 64)
    y_prob_best = np.array(all_results[best_name]["y_prob"])
    y_pred_best = np.array(all_results[best_name]["y_pred"])
    evaluate_predictions(y, y_pred_best, y_prob_best, title=f"Best: {best_name}")

    fitted_models, all_model_paths = _fit_and_save_final_models(
        selected_candidates, X_scaled, y, models_dir
    )
    best_model = fitted_models[best_name]

    importance_data = _extract_importance(best_model, feature_cols)
    if importance_data:
        print("\nTop 15 features:")
        for fname, imp in importance_data[:15]:
            print(f" {fname:<35} {imp:.4f}")

    model_path = models_dir / "auris_classifier_v1.pkl"
    scaler_path = models_dir / "feature_scaler_v1.pkl"
    columns_path = models_dir / "feature_columns_v1.json"
    results_path = models_dir / "training_results.json"

    with open(model_path, "wb") as f:
        pickle.dump(best_model, f)
    with open(scaler_path, "wb") as f:
        pickle.dump(scaler, f)
    with open(columns_path, "w", encoding="utf-8") as f:
        json.dump(feature_cols, f, indent=2)

    json_results = _build_results_summary(
        all_results,
        best_name=best_name,
        n_samples=len(y),
        n_features=X.shape[1],
        n_folds=n_folds,
        features_csv=features_csv,
        y=y,
        model_paths=all_model_paths,
        importance_data=importance_data,
    )
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(json_results, f, indent=2)

    print("\nSaved artifacts:")
    print(f" Model: {model_path}")
    print(f" Scaler: {scaler_path}")
    print(f" Columns: {columns_path}")
    print(f" Results: {results_path}")

    return {
        "best_model": best_name,
        "best_auc": best_auc,
        "all_results": all_results,
        "feature_cols": feature_cols,
        "model_path": str(model_path),
    }


def _cross_validate_candidate(
    name: str,
    model: Any,
    X: np.ndarray,
    y: np.ndarray,
    cv: Any,
    tuning_meta: dict[str, Any],
) -> tuple[dict[str, Any], float]:
    """Cross-validate one model; return its metrics record and unrounded ROC-AUC."""
    print("\n" + "-" * 56)
    print(f"Training: {name}")
    print("-" * 56)

    t0 = time.time()
    pipeline = _build_eval_pipeline(model)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=ConvergenceWarning)
        y_prob = cross_val_predict(
            pipeline,
            X,
            y,
            cv=cv,
            method="predict_proba",
        )[:, 1]
    # The decision threshold is tuned on the same out-of-fold probabilities
    # that the thresholded metrics below are computed from.
    threshold = _optimal_threshold(y, y_prob)
    y_pred = (y_prob >= threshold).astype(int)
    cv_time = time.time() - t0

    acc = accuracy_score(y, y_pred)
    prec = precision_score(y, y_pred, zero_division=0)
    rec = recall_score(y, y_pred, zero_division=0)
    f1 = f1_score(y, y_pred, zero_division=0)
    auc = roc_auc_score(y, y_prob)

    print(f" Validation AUC: {tuning_meta.get('validation_auc', 0.0):.4f}")
    print(f" CV Accuracy: {acc:.4f}")
    print(f" CV Precision: {prec:.4f}")
    print(f" CV Recall: {rec:.4f}")
    print(f" CV F1 Score: {f1:.4f}")
    print(f" CV ROC-AUC: {auc:.4f}")
    print(f" CV Time: {cv_time:.1f}s")

    metrics = {
        "accuracy": round(acc, 4),
        "precision": round(prec, 4),
        "recall": round(rec, 4),
        "f1": round(f1, 4),
        "roc_auc": round(auc, 4),
        "optimal_threshold": round(threshold, 4),
        "validation_auc": round(tuning_meta.get("validation_auc", 0.0), 4),
        "selection_time_sec": round(tuning_meta.get("selection_time_sec", 0.0), 2),
        "train_time_sec": round(cv_time, 2),
        "selected_params": tuning_meta.get("selected_params", {}),
        "y_true": y.tolist(),
        "y_pred": y_pred.tolist(),
        "y_prob": y_prob.tolist(),
    }
    return metrics, auc


def _fit_and_save_final_models(
    selected_candidates: list[tuple[str, Any]],
    X_scaled: np.ndarray,
    y: np.ndarray,
    models_dir: Path,
) -> tuple[dict[str, Any], dict[str, str]]:
    """Refit every selected model on all scaled samples and pickle each one."""
    fitted_models: dict[str, Any] = {}
    model_paths: dict[str, str] = {}
    for name, model in selected_candidates:
        print(f"\nFitting final {name} on all {len(y)} samples...")
        final_model = clone(model)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=ConvergenceWarning)
            final_model.fit(X_scaled, y)
        fitted_models[name] = final_model

        model_pkl = models_dir / f"model_{_safe_model_name(name)}.pkl"
        with open(model_pkl, "wb") as f:
            pickle.dump(final_model, f)
        model_paths[name] = str(model_pkl)
        print(f" Saved: {model_pkl}")
    return fitted_models, model_paths


def _build_results_summary(
    all_results: dict[str, dict[str, Any]],
    *,
    best_name: str,
    n_samples: int,
    n_features: int,
    n_folds: int,
    features_csv: Path,
    y: np.ndarray,
    model_paths: dict[str, str],
    importance_data: list[tuple[str, float]],
) -> dict[str, Any]:
    """Assemble the JSON-serialisable summary written to training_results.json."""
    # Per-sample arrays are kept in memory only; they are left out of the JSON.
    per_sample_keys = ("y_true", "y_pred", "y_prob")
    summary: dict[str, Any] = {
        name: {key: value for key, value in data.items() if key not in per_sample_keys}
        for name, data in all_results.items()
    }
    summary["_best_model"] = best_name
    summary["_n_samples"] = n_samples
    summary["_n_features"] = n_features
    summary["_n_folds"] = n_folds
    summary["_dataset_path"] = str(features_csv)
    summary["_class_balance"] = {
        "ai": int(np.sum(y == 1)),
        "human": int(np.sum(y == 0)),
    }
    summary["_data_leakage_fix"] = (
        "duration_sec and sample_rate removed from features; scaler fitted per fold during CV"
    )
    summary["_model_paths"] = model_paths
    if importance_data:
        summary["_feature_importance"] = {
            feature_name: round(imp, 6) for feature_name, imp in importance_data
        }
    return summary
def _load_feature_columns(features_csv: Path) -> list[str]:
    """Return the CSV header names, in file order, minus the excluded columns."""
    with open(features_csv, "r", encoding="utf-8") as handle:
        # Only the header row is needed; no data rows are consumed.
        header = csv.DictReader(handle).fieldnames

    kept: list[str] = []
    if not header:
        return kept
    for column_name in header:
        if column_name in _EXCLUDED_COLUMNS:
            continue
        kept.append(column_name)
    return kept
def _select_best_candidates(
    X: np.ndarray,
    y: np.ndarray,
) -> tuple[list[tuple[str, Any]], dict[str, dict[str, Any]]]:
    """
    Pick one tuned configuration per model family using a stratified holdout.

    The data is split 80/20 (stratified, fixed seed). Every variant of every
    family is fitted on the 80% part inside a scaler+model pipeline and scored
    by ROC-AUC on the 20% part; the highest-scoring variant per family wins.

    Returns:
        A list of (family name, unfitted winning estimator) pairs, and a dict
        keyed by family name holding the holdout AUC, the reported parameters
        and the time spent on selection.

    Raises:
        RuntimeError: If no variant of a family scored above the initial -1.0.
    """
    X_fit, X_holdout, y_fit, y_holdout = train_test_split(
        X,
        y,
        test_size=0.2,
        stratify=y,
        random_state=42,
    )

    winners: list[tuple[str, Any]] = []
    tuning_summary: dict[str, dict[str, Any]] = {}
    divider = "." * 56

    families = _build_candidate_families(y_fit)
    for family_name, variants in families.items():
        print("\n" + divider)
        print(f"Selecting hyperparameters for: {family_name}")
        print(divider)

        top_model = None
        top_auc = -1.0
        top_params: dict[str, Any] = {}
        started_at = time.time()

        for position, variant in enumerate(variants, start=1):
            candidate_pipeline = _build_eval_pipeline(variant)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=ConvergenceWarning)
                candidate_pipeline.fit(X_fit, y_fit)
            holdout_prob = candidate_pipeline.predict_proba(X_holdout)[:, 1]
            holdout_auc = roc_auc_score(y_holdout, holdout_prob)
            reported = _summarize_selected_params(family_name, variant)
            print(f" Candidate {position}: holdout AUC={holdout_auc:.4f} | params={reported}")

            # Strict comparison: on a tie the earlier variant is kept.
            if holdout_auc > top_auc:
                top_auc, top_model, top_params = holdout_auc, variant, reported

        if top_model is None:
            raise RuntimeError(f"No valid candidate selected for {family_name}")

        tuning_summary[family_name] = {
            "validation_auc": float(top_auc),
            "selected_params": top_params,
            "selection_time_sec": time.time() - started_at,
        }
        winners.append((family_name, top_model))
        print(f" Selected {family_name}: AUC={top_auc:.4f}")

    return winners, tuning_summary
| def _class_ratio(y: np.ndarray) -> float: | |
| """Returns n_negative / n_positive for scale_pos_weight in XGBoost.""" | |
| n_pos = int(np.sum(y == 1)) | |
| n_neg = int(np.sum(y == 0)) | |
| return n_neg / n_pos if n_pos > 0 else 1.0 | |
def _build_candidate_families(y: np.ndarray) -> dict[str, list[Any]]:
    """
    Build the unfitted hyperparameter variants for every model family.

    Args:
        y: Training labels; only used to derive XGBoost's ``scale_pos_weight``
            (negative/positive ratio) when XGBoost is installed.

    Returns:
        Dict mapping family name to its list of candidate estimators. The
        "XGBoost" and "LightGBM" families are present only when the
        corresponding optional package could be imported.
    """
    seed = 42

    logistic_variants = [
        LogisticRegression(
            C=value,
            max_iter=2500,
            class_weight="balanced",
            random_state=seed,
        )
        for value in (0.25, 0.5, 1.0, 2.0)
    ]

    forest_variants = [
        RandomForestClassifier(
            **grid,
            class_weight="balanced_subsample",
            random_state=seed,
            n_jobs=-1,
        )
        for grid in (
            dict(n_estimators=300, max_depth=12, min_samples_leaf=4,
                 min_samples_split=8, max_features="sqrt"),
            dict(n_estimators=450, max_depth=18, min_samples_leaf=2,
                 min_samples_split=4, max_features="sqrt"),
            dict(n_estimators=500, max_depth=None, min_samples_leaf=1,
                 min_samples_split=2, max_features="log2"),
        )
    ]

    boosting_variants = [
        GradientBoostingClassifier(**grid, random_state=seed)
        for grid in (
            dict(n_estimators=200, max_depth=3, learning_rate=0.05,
                 subsample=0.8, min_samples_leaf=10, min_samples_split=20),
            dict(n_estimators=260, max_depth=2, learning_rate=0.04,
                 subsample=0.85, min_samples_leaf=12, min_samples_split=24),
            dict(n_estimators=180, max_depth=4, learning_rate=0.07,
                 subsample=0.75, min_samples_leaf=8, min_samples_split=16),
        )
    ]

    # SVC is wrapped in a calibrator so the family exposes predict_proba,
    # which both the holdout selection and the cross-validation call.
    svm_variants = [
        CalibratedClassifierCV(
            SVC(kernel="rbf", C=c, gamma=g, class_weight="balanced", random_state=seed),
            method="isotonic",
            cv=3,
        )
        for c, g in ((1.0, "scale"), (3.0, "scale"), (6.0, 0.02), (10.0, 0.05))
    ]

    mlp_variants = [
        MLPClassifier(
            **grid,
            activation="relu",
            solver="adam",
            learning_rate="adaptive",
            early_stopping=True,
            validation_fraction=0.15,
            random_state=seed,
        )
        for grid in (
            dict(hidden_layer_sizes=(128, 64), alpha=0.0005, max_iter=500),
            dict(hidden_layer_sizes=(192, 96, 32), alpha=0.001, max_iter=600),
            dict(hidden_layer_sizes=(256, 128), alpha=0.002, max_iter=700),
        )
    ]

    families: dict[str, list[Any]] = {
        "Logistic Regression": logistic_variants,
        "Random Forest": forest_variants,
        "Gradient Boosting": boosting_variants,
        "SVM (RBF)": svm_variants,
        "MLP Neural Network": mlp_variants,
    }

    if HAS_XGB:
        xgb_shared = dict(
            scale_pos_weight=_class_ratio(y),
            eval_metric="logloss",
            tree_method="hist",
            random_state=seed,
            n_jobs=-1,
            verbosity=0,
        )
        families["XGBoost"] = [
            xgb.XGBClassifier(**grid, **xgb_shared)
            for grid in (
                dict(n_estimators=300, max_depth=4, learning_rate=0.05,
                     subsample=0.8, colsample_bytree=0.8, min_child_weight=4,
                     reg_alpha=0.2, reg_lambda=1.2, gamma=0.1),
                dict(n_estimators=500, max_depth=3, learning_rate=0.03,
                     subsample=0.9, colsample_bytree=0.8, min_child_weight=2,
                     reg_alpha=0.1, reg_lambda=1.0, gamma=0.0),
                dict(n_estimators=240, max_depth=5, learning_rate=0.06,
                     subsample=0.75, colsample_bytree=0.75, min_child_weight=6,
                     reg_alpha=0.4, reg_lambda=1.5, gamma=0.2),
            )
        ]

    if HAS_LGBM:
        lgbm_shared = dict(
            # LightGBM ignores `subsample` (bagging_fraction) unless the
            # bagging frequency is non-zero; without this the tuned subsample
            # values below would have no effect on the fitted models.
            subsample_freq=1,
            class_weight="balanced",
            random_state=seed,
            verbose=-1,
        )
        families["LightGBM"] = [
            lgb.LGBMClassifier(**grid, **lgbm_shared)
            for grid in (
                dict(n_estimators=300, max_depth=-1, learning_rate=0.05,
                     num_leaves=31, subsample=0.8, colsample_bytree=0.8,
                     min_child_samples=20, reg_alpha=0.1, reg_lambda=1.0),
                dict(n_estimators=500, max_depth=8, learning_rate=0.03,
                     num_leaves=24, subsample=0.9, colsample_bytree=0.8,
                     min_child_samples=30, reg_alpha=0.2, reg_lambda=1.2),
                dict(n_estimators=220, max_depth=6, learning_rate=0.07,
                     num_leaves=18, subsample=0.75, colsample_bytree=0.75,
                     min_child_samples=24, reg_alpha=0.3, reg_lambda=1.5),
            )
        ]

    return families
def _optimal_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> float:
    """Youden's J statistic: threshold that maximises sensitivity + specificity - 1.

    Args:
        y_true: Binary ground-truth labels.
        y_prob: Predicted scores/probabilities for the positive class.

    Returns:
        The finite threshold with the highest J = TPR - FPR. Falls back to 0.5
        when no finite, well-defined operating point exists (for example when
        the ROC curve is undefined because only one class is present).
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    j_scores = tpr - fpr

    usable = np.isfinite(thresholds) & np.isfinite(j_scores)
    if usable.size:
        # roc_curve prepends a sentinel threshold (inf, or max(score) + 1 in
        # older releases) that stands for "predict nothing as positive". Its J
        # is 0, so on a tie argmax would pick it and yield a threshold that is
        # unusable for prediction and not representable in strict JSON.
        usable[0] = False
    if not usable.any():
        return 0.5

    candidates = np.flatnonzero(usable)
    best = candidates[np.argmax(j_scores[candidates])]
    return float(thresholds[best])
def _build_eval_pipeline(model: Any) -> Pipeline:
    """Wrap a fresh clone of *model* behind its own StandardScaler.

    Cloning leaves the passed-in estimator untouched, and giving every
    pipeline a new scaler means scaling statistics come only from the data
    the pipeline is fitted on.
    """
    steps = [
        ("scaler", StandardScaler()),
        ("model", clone(model)),
    ]
    return Pipeline(steps)
| def _safe_model_name(name: str) -> str: | |
| return ( | |
| name.lower() | |
| .replace(" ", "_") | |
| .replace("(", "") | |
| .replace(")", "") | |
| .replace("/", "_") | |
| ) | |
def _summarize_selected_params(name: str, model: Any) -> dict[str, Any]:
    """Return the reported hyperparameters of *model* for the family *name*.

    Nested parameter names (e.g. "estimator__C" on a wrapped estimator) are
    flattened to their last "__"-separated segment; when several parameters
    flatten to the same name, the first one encountered is kept. Only keys
    listed for the family in _TUNED_PARAM_KEYS are returned, in that order.
    """
    wanted = _TUNED_PARAM_KEYS.get(name, ())
    all_params = model.get_params()

    by_short_name: dict[str, Any] = {}
    for qualified_key, value in all_params.items():
        short_key = qualified_key.split("__")[-1]
        by_short_name.setdefault(short_key, value)

    summary: dict[str, Any] = {}
    for key in wanted:
        if key in by_short_name:
            summary[key] = by_short_name[key]
    return summary
| def _extract_importance( | |
| model: Any, | |
| feature_cols: list[str], | |
| ) -> list[tuple[str, float]]: | |
| importances = None | |
| if hasattr(model, "feature_importances_"): | |
| importances = model.feature_importances_ | |
| elif hasattr(model, "coef_"): | |
| importances = np.abs(model.coef_[0]) | |
| if importances is None: | |
| return [] | |
| total = np.sum(importances) | |
| if total > 0: | |
| importances = importances / total | |
| return sorted( | |
| zip(feature_cols, importances.tolist()), | |
| key=lambda item: item[1], | |
| reverse=True, | |
| ) | |
if __name__ == "__main__":
    # Optional positional arguments: [features_csv] [models_dir].
    cli_args = sys.argv[1:]
    csv_path = cli_args[0] if cli_args else "data/training/features.csv"
    model_dir = cli_args[1] if len(cli_args) >= 2 else "models"
    train(csv_path, model_dir)