# Source: crowncode-backend/app/training/train_classifier.py
# Last commit 1d86705 (Rthur2003):
#   feat: define families dictionary for candidate classifiers in _build_candidate_families function
"""
Comprehensive multi-model training pipeline for AURIS.
Trains and evaluates multiple classifier families on extracted
audio features using stratified cross-validation, then selects
the best model and exports it for production use.
Models compared:
- Logistic Regression
- Random Forest
- Gradient Boosting
- Support Vector Machine (RBF)
- Multi-Layer Perceptron
- XGBoost (optional)
- LightGBM (optional)
Usage:
python -m app.training.train_classifier data/training/features.csv
Outputs:
models/auris_classifier_v1.pkl - best trained model
models/feature_scaler_v1.pkl - fitted StandardScaler
models/feature_columns_v1.json - ordered feature column names
models/training_results.json - model metrics and metadata
"""
from __future__ import annotations
import csv
import json
import pickle
import sys
import time
import warnings
from pathlib import Path
from typing import Any
import numpy as np
from sklearn.base import clone
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import (
accuracy_score,
f1_score,
precision_score,
recall_score,
roc_auc_score,
roc_curve,
)
from sklearn.model_selection import StratifiedKFold, cross_val_predict, train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
# Optional: XGBoost
try:
import xgboost as xgb
HAS_XGB = True
except ImportError:
HAS_XGB = False
# Optional: LightGBM
try:
import lightgbm as lgb
HAS_LGBM = True
except ImportError:
HAS_LGBM = False
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from app.training.evaluate import evaluate_predictions, load_features_csv
# CSV header names that are dropped when the ordered list of feature column
# names is built from the features file (see _load_feature_columns).
_EXCLUDED_COLUMNS = {"file_path", "label_int", "duration_sec", "sample_rate"}

# Per model family (keyed by its display name): the hyperparameter names that
# are copied into the "selected_params" summary of the chosen variant
# (see _summarize_selected_params). Families missing here report no params.
_TUNED_PARAM_KEYS: dict[str, tuple[str, ...]] = {
    "Logistic Regression": ("C", "class_weight", "max_iter"),
    "Random Forest": (
        "n_estimators",
        "max_depth",
        "min_samples_leaf",
        "min_samples_split",
        "class_weight",
        "max_features",
    ),
    "Gradient Boosting": (
        "n_estimators",
        "max_depth",
        "learning_rate",
        "subsample",
        "min_samples_leaf",
        "min_samples_split",
    ),
    "SVM (RBF)": ("C", "gamma", "class_weight"),
    "MLP Neural Network": (
        "hidden_layer_sizes",
        "alpha",
        "max_iter",
        "validation_fraction",
    ),
    "XGBoost": (
        "n_estimators",
        "max_depth",
        "learning_rate",
        "subsample",
        "colsample_bytree",
        "min_child_weight",
        "reg_alpha",
        "reg_lambda",
        "gamma",
    ),
    "LightGBM": (
        "n_estimators",
        "max_depth",
        "learning_rate",
        "num_leaves",
        "subsample",
        "colsample_bytree",
        "min_child_samples",
        "reg_alpha",
        "reg_lambda",
    ),
}
def train(
    features_csv: str | Path,
    models_dir: str | Path = "models",
    n_folds: int = 5,
) -> dict[str, Any]:
    """
    Run the complete training workflow and persist every artifact.

    Steps, in order:
      1. Load features and labels; NaN / +inf / -inf become 0.0 / 1.0 / -1.0.
      2. Pick one hyperparameter variant per model family
         (``_select_best_candidates``).
      3. Score each selected model with stratified ``n_folds``-fold
         cross-validated probabilities. Scaling happens inside the evaluation
         pipeline, so the scaler is refitted on every training fold.
      4. Refit each selected model on the fully scaled data and pickle it.
      5. Export the model with the highest cross-validated ROC-AUC plus the
         scaler, the feature column list and a JSON metrics report.

    Args:
        features_csv: Path of the features CSV.
        models_dir: Output directory for all artifacts (created if missing).
        n_folds: Number of stratified cross-validation folds.

    Returns:
        Dict with keys ``best_model``, ``best_auc``, ``all_results``
        (per-model metrics including the raw CV predictions),
        ``feature_cols`` and ``model_path``.
    """
    csv_file = Path(features_csv)
    out_dir = Path(models_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    X, y = load_features_csv(csv_file)
    feature_cols = _load_feature_columns(csv_file)
    X = np.nan_to_num(X, nan=0.0, posinf=1.0, neginf=-1.0)

    selected_candidates, tuning_results = _select_best_candidates(X, y)
    folds = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)

    # This scaler is fitted on all rows and is the one that gets exported;
    # cross-validation below uses its own per-fold scaler via the pipeline.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    dashes = "-" * 56
    best_name, best_auc = "", -1.0
    all_results: dict[str, dict[str, Any]] = {}

    # ---- cross-validated evaluation of every selected model ----
    for name, model in selected_candidates:
        print("\n" + dashes)
        print(f"Training: {name}")
        print(dashes)

        started = time.time()
        eval_pipeline = _build_eval_pipeline(model)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=ConvergenceWarning)
            fold_proba = cross_val_predict(
                eval_pipeline,
                X,
                y,
                cv=folds,
                method="predict_proba",
            )
        y_prob = fold_proba[:, 1]
        threshold = _optimal_threshold(y, y_prob)
        y_pred = (y_prob >= threshold).astype(int)
        cv_time = time.time() - started

        acc = accuracy_score(y, y_pred)
        prec = precision_score(y, y_pred, zero_division=0)
        rec = recall_score(y, y_pred, zero_division=0)
        f1 = f1_score(y, y_pred, zero_division=0)
        auc = roc_auc_score(y, y_prob)
        tuning_meta = tuning_results.get(name, {})

        print(f" Validation AUC: {tuning_meta.get('validation_auc', 0.0):.4f}")
        print(f" CV Accuracy: {acc:.4f}")
        print(f" CV Precision: {prec:.4f}")
        print(f" CV Recall: {rec:.4f}")
        print(f" CV F1 Score: {f1:.4f}")
        print(f" CV ROC-AUC: {auc:.4f}")
        print(f" CV Time: {cv_time:.1f}s")

        entry: dict[str, Any] = {
            "accuracy": round(acc, 4),
            "precision": round(prec, 4),
            "recall": round(rec, 4),
            "f1": round(f1, 4),
            "roc_auc": round(auc, 4),
            "optimal_threshold": round(threshold, 4),
            "validation_auc": round(tuning_meta.get("validation_auc", 0.0), 4),
            "selection_time_sec": round(tuning_meta.get("selection_time_sec", 0.0), 2),
            "train_time_sec": round(cv_time, 2),
            "selected_params": tuning_meta.get("selected_params", {}),
            "y_true": y.tolist(),
            "y_pred": y_pred.tolist(),
            "y_prob": y_prob.tolist(),
        }
        all_results[name] = entry

        # Strict ">" keeps the earliest model when AUCs tie.
        if auc > best_auc:
            best_name, best_auc = name, auc

    print("\n" + "=" * 64)
    print(f"BEST MODEL: {best_name} (ROC-AUC = {best_auc:.4f})")
    print("=" * 64)

    winner = all_results[best_name]
    y_prob_best = np.array(winner["y_prob"])
    y_pred_best = np.array(winner["y_pred"])
    evaluate_predictions(y, y_pred_best, y_prob_best, title=f"Best: {best_name}")

    # ---- final fit of every selected model on the whole data set ----
    fitted_models: dict[str, Any] = {}
    all_model_paths: dict[str, str] = {}
    for name, model in selected_candidates:
        print(f"\nFitting final {name} on all {len(y)} samples...")
        final_model = clone(model)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=ConvergenceWarning)
            final_model.fit(X_scaled, y)
        fitted_models[name] = final_model

        model_pkl = out_dir / f"model_{_safe_model_name(name)}.pkl"
        with model_pkl.open("wb") as handle:
            pickle.dump(final_model, handle)
        all_model_paths[name] = str(model_pkl)
        print(f" Saved: {model_pkl}")

    best_model = fitted_models[best_name]
    importance_data = _extract_importance(best_model, feature_cols)
    if importance_data:
        print("\nTop 15 features:")
        for fname, imp in importance_data[:15]:
            print(f" {fname:<35} {imp:.4f}")

    # ---- production artifacts ----
    model_path = out_dir / "auris_classifier_v1.pkl"
    scaler_path = out_dir / "feature_scaler_v1.pkl"
    columns_path = out_dir / "feature_columns_v1.json"
    results_path = out_dir / "training_results.json"

    for target, payload in ((model_path, best_model), (scaler_path, scaler)):
        with target.open("wb") as handle:
            pickle.dump(payload, handle)
    with columns_path.open("w", encoding="utf-8") as handle:
        json.dump(feature_cols, handle, indent=2)

    # The raw prediction vectors stay in memory only; the JSON report carries
    # metrics and metadata.
    prediction_keys = ("y_true", "y_pred", "y_prob")
    json_results: dict[str, Any] = {
        model_name: {
            key: value
            for key, value in metrics.items()
            if key not in prediction_keys
        }
        for model_name, metrics in all_results.items()
    }
    json_results.update(
        {
            "_best_model": best_name,
            "_n_samples": len(y),
            "_n_features": X.shape[1],
            "_n_folds": n_folds,
            "_dataset_path": str(csv_file),
            "_class_balance": {
                "ai": int(np.sum(y == 1)),
                "human": int(np.sum(y == 0)),
            },
            "_data_leakage_fix": (
                "duration_sec and sample_rate removed from features; scaler fitted per fold during CV"
            ),
            "_model_paths": all_model_paths,
        }
    )
    if importance_data:
        json_results["_feature_importance"] = {
            feature_name: round(imp, 6) for feature_name, imp in importance_data
        }
    with results_path.open("w", encoding="utf-8") as handle:
        json.dump(json_results, handle, indent=2)

    print("\nSaved artifacts:")
    print(f" Model: {model_path}")
    print(f" Scaler: {scaler_path}")
    print(f" Columns: {columns_path}")
    print(f" Results: {results_path}")

    return {
        "best_model": best_name,
        "best_auc": best_auc,
        "all_results": all_results,
        "feature_cols": feature_cols,
        "model_path": str(model_path),
    }
def _load_feature_columns(features_csv: Path) -> list[str]:
    """
    Return the feature column names of the CSV in header order.

    Only the header row is read; every name listed in ``_EXCLUDED_COLUMNS``
    is dropped. A file without a header yields an empty list.

    Args:
        features_csv: Path of the features CSV.

    Returns:
        Header names, in file order, minus the excluded columns.
    """
    # newline="" is what the csv module asks for so quoted fields are parsed
    # correctly. "utf-8-sig" reads plain UTF-8 unchanged but strips a leading
    # BOM, which would otherwise be glued to the first header name and let
    # that column slip past the exclusion filter.
    with open(features_csv, "r", encoding="utf-8-sig", newline="") as f:
        header = csv.DictReader(f).fieldnames or []
    return [column for column in header if column not in _EXCLUDED_COLUMNS]
def _select_best_candidates(
    X: np.ndarray,
    y: np.ndarray,
) -> tuple[list[tuple[str, Any]], dict[str, dict[str, Any]]]:
    """
    Choose one hyperparameter variant per model family on a stratified holdout.

    The data is split once (20 % validation, stratified, seed 42). Each
    variant from ``_build_candidate_families`` is fitted inside a fresh
    scaler + model pipeline on the fit part and ranked by ROC-AUC on the
    validation part; on ties the earlier variant is kept.

    Returns:
        ``(selected, tuning_results)``: a list of ``(family name, unfitted
        winning estimator)`` pairs, and per family a dict with
        ``validation_auc``, ``selected_params`` and ``selection_time_sec``.

    Raises:
        RuntimeError: If a family ends up without a selected variant.
    """
    X_fit, X_holdout, y_fit, y_holdout = train_test_split(
        X,
        y,
        test_size=0.2,
        stratify=y,
        random_state=42,
    )

    dots = "." * 56
    selected: list[tuple[str, Any]] = []
    tuning_results: dict[str, dict[str, Any]] = {}
    families = _build_candidate_families(y_fit)

    for name, variants in families.items():
        print("\n" + dots)
        print(f"Selecting hyperparameters for: {name}")
        print(dots)

        best_model, best_auc = None, -1.0
        best_params: dict[str, Any] = {}
        selection_start = time.time()

        for idx, model in enumerate(variants, start=1):
            candidate_pipeline = _build_eval_pipeline(model)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=ConvergenceWarning)
                candidate_pipeline.fit(X_fit, y_fit)
            holdout_proba = candidate_pipeline.predict_proba(X_holdout)
            auc = roc_auc_score(y_holdout, holdout_proba[:, 1])
            params = _summarize_selected_params(name, model)
            print(f" Candidate {idx}: holdout AUC={auc:.4f} | params={params}")
            if auc > best_auc:
                best_auc, best_model, best_params = auc, model, params

        if best_model is None:
            raise RuntimeError(f"No valid candidate selected for {name}")

        elapsed = time.time() - selection_start
        tuning_results[name] = {
            "validation_auc": float(best_auc),
            "selected_params": best_params,
            "selection_time_sec": elapsed,
        }
        selected.append((name, best_model))
        print(f" Selected {name}: AUC={best_auc:.4f}")

    return selected, tuning_results
def _class_ratio(y: np.ndarray) -> float:
"""Returns n_negative / n_positive for scale_pos_weight in XGBoost."""
n_pos = int(np.sum(y == 1))
n_neg = int(np.sum(y == 0))
return n_neg / n_pos if n_pos > 0 else 1.0
def _build_candidate_families(y: np.ndarray) -> dict[str, list[Any]]:
    """
    Build the unfitted hyperparameter variants of every model family.

    The dict is keyed by the family's display name (the same names used in
    ``_TUNED_PARAM_KEYS``); each value is the ordered list of estimator
    variants to compare. XGBoost and LightGBM families are only present when
    the respective package could be imported.

    Args:
        y: Training labels; only used to derive ``scale_pos_weight`` for the
            XGBoost variants via ``_class_ratio``.

    Returns:
        Mapping of family name to its list of unfitted estimators.
    """
    # Settings shared by all variants of a family are factored out so the
    # variant lists below show only what actually differs.
    rf_shared = dict(class_weight="balanced_subsample", random_state=42, n_jobs=-1)
    mlp_shared = dict(
        activation="relu",
        solver="adam",
        learning_rate="adaptive",
        early_stopping=True,
        validation_fraction=0.15,
        random_state=42,
    )

    families: dict[str, list[Any]] = {
        "Logistic Regression": [
            LogisticRegression(
                C=value,
                max_iter=2500,
                class_weight="balanced",
                random_state=42,
            )
            for value in (0.25, 0.5, 1.0, 2.0)
        ],
        "Random Forest": [
            RandomForestClassifier(
                n_estimators=300,
                max_depth=12,
                min_samples_leaf=4,
                min_samples_split=8,
                max_features="sqrt",
                **rf_shared,
            ),
            RandomForestClassifier(
                n_estimators=450,
                max_depth=18,
                min_samples_leaf=2,
                min_samples_split=4,
                max_features="sqrt",
                **rf_shared,
            ),
            RandomForestClassifier(
                n_estimators=500,
                max_depth=None,
                min_samples_leaf=1,
                min_samples_split=2,
                max_features="log2",
                **rf_shared,
            ),
        ],
        "Gradient Boosting": [
            GradientBoostingClassifier(
                n_estimators=200,
                max_depth=3,
                learning_rate=0.05,
                subsample=0.8,
                min_samples_leaf=10,
                min_samples_split=20,
                random_state=42,
            ),
            GradientBoostingClassifier(
                n_estimators=260,
                max_depth=2,
                learning_rate=0.04,
                subsample=0.85,
                min_samples_leaf=12,
                min_samples_split=24,
                random_state=42,
            ),
            GradientBoostingClassifier(
                n_estimators=180,
                max_depth=4,
                learning_rate=0.07,
                subsample=0.75,
                min_samples_leaf=8,
                min_samples_split=16,
                random_state=42,
            ),
        ],
        # SVC is wrapped in a calibrator so the family offers predict_proba
        # like all the others.
        "SVM (RBF)": [
            CalibratedClassifierCV(
                SVC(kernel="rbf", C=c, gamma=g, class_weight="balanced", random_state=42),
                method="isotonic",
                cv=3,
            )
            for c, g in ((1.0, "scale"), (3.0, "scale"), (6.0, 0.02), (10.0, 0.05))
        ],
        "MLP Neural Network": [
            MLPClassifier(
                hidden_layer_sizes=(128, 64),
                alpha=0.0005,
                max_iter=500,
                **mlp_shared,
            ),
            MLPClassifier(
                hidden_layer_sizes=(192, 96, 32),
                alpha=0.001,
                max_iter=600,
                **mlp_shared,
            ),
            MLPClassifier(
                hidden_layer_sizes=(256, 128),
                alpha=0.002,
                max_iter=700,
                **mlp_shared,
            ),
        ],
    }

    if HAS_XGB:
        xgb_shared = dict(
            scale_pos_weight=_class_ratio(y),
            eval_metric="logloss",
            tree_method="hist",
            random_state=42,
            n_jobs=-1,
            verbosity=0,
        )
        families["XGBoost"] = [
            xgb.XGBClassifier(
                n_estimators=300,
                max_depth=4,
                learning_rate=0.05,
                subsample=0.8,
                colsample_bytree=0.8,
                min_child_weight=4,
                reg_alpha=0.2,
                reg_lambda=1.2,
                gamma=0.1,
                **xgb_shared,
            ),
            xgb.XGBClassifier(
                n_estimators=500,
                max_depth=3,
                learning_rate=0.03,
                subsample=0.9,
                colsample_bytree=0.8,
                min_child_weight=2,
                reg_alpha=0.1,
                reg_lambda=1.0,
                gamma=0.0,
                **xgb_shared,
            ),
            xgb.XGBClassifier(
                n_estimators=240,
                max_depth=5,
                learning_rate=0.06,
                subsample=0.75,
                colsample_bytree=0.75,
                min_child_weight=6,
                reg_alpha=0.4,
                reg_lambda=1.5,
                gamma=0.2,
                **xgb_shared,
            ),
        ]

    if HAS_LGBM:
        lgbm_shared = dict(
            # LightGBM ignores `subsample` while subsample_freq is at its
            # default of 0 (bagging disabled). Resampling every iteration
            # makes the per-variant subsample values take effect.
            subsample_freq=1,
            class_weight="balanced",
            random_state=42,
            verbose=-1,
        )
        families["LightGBM"] = [
            lgb.LGBMClassifier(
                n_estimators=300,
                max_depth=-1,
                learning_rate=0.05,
                num_leaves=31,
                subsample=0.8,
                colsample_bytree=0.8,
                min_child_samples=20,
                reg_alpha=0.1,
                reg_lambda=1.0,
                **lgbm_shared,
            ),
            lgb.LGBMClassifier(
                n_estimators=500,
                max_depth=8,
                learning_rate=0.03,
                num_leaves=24,
                subsample=0.9,
                colsample_bytree=0.8,
                min_child_samples=30,
                reg_alpha=0.2,
                reg_lambda=1.2,
                **lgbm_shared,
            ),
            lgb.LGBMClassifier(
                n_estimators=220,
                max_depth=6,
                learning_rate=0.07,
                num_leaves=18,
                subsample=0.75,
                colsample_bytree=0.75,
                min_child_samples=24,
                reg_alpha=0.3,
                reg_lambda=1.5,
                **lgbm_shared,
            ),
        ]

    return families
def _optimal_threshold(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    fallback: float = 0.5,
) -> float:
    """
    Return the threshold maximising Youden's J (sensitivity + specificity - 1).

    Args:
        y_true: Binary ground-truth labels.
        y_prob: Predicted scores / probabilities of the positive class.
        fallback: Value returned when no real ROC point beats chance.

    Returns:
        A finite decision threshold.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    best_idx = int(np.argmax(tpr - fpr))
    threshold = float(thresholds[best_idx])
    # roc_curve always prepends a sentinel "predict nothing" point whose
    # threshold is np.inf (max score + 1 on older scikit-learn releases).
    # argmax only lands on it when no real threshold has J > 0; returning the
    # sentinel would reject every sample and serialise to JSON as `Infinity`.
    if best_idx == 0 or not np.isfinite(threshold):
        return float(fallback)
    return threshold
def _build_eval_pipeline(model: Any) -> Pipeline:
    """Wrap a fresh clone of *model* behind a new StandardScaler.

    Because the scaler is a pipeline step, whoever fits the pipeline fits the
    scaler on exactly the same rows as the model.
    """
    steps = [
        ("scaler", StandardScaler()),
        ("model", clone(model)),
    ]
    return Pipeline(steps)
def _safe_model_name(name: str) -> str:
return (
name.lower()
.replace(" ", "_")
.replace("(", "")
.replace(")", "")
.replace("/", "_")
)
def _summarize_selected_params(name: str, model: Any) -> dict[str, Any]:
    """Return the tuned hyperparameters of *model* for reporting.

    Only the keys listed for *name* in ``_TUNED_PARAM_KEYS`` are returned, in
    that order; keys the model does not expose are skipped.
    """
    # Wrappers such as CalibratedClassifierCV report nested parameters as
    # "estimator__<key>": index everything by the last path segment and keep
    # the first value seen for each short name.
    by_short_name: dict[str, Any] = {}
    for full_key, value in model.get_params().items():
        by_short_name.setdefault(full_key.split("__")[-1], value)

    summary: dict[str, Any] = {}
    for wanted in _TUNED_PARAM_KEYS.get(name, ()):
        if wanted in by_short_name:
            summary[wanted] = by_short_name[wanted]
    return summary
def _extract_importance(
model: Any,
feature_cols: list[str],
) -> list[tuple[str, float]]:
importances = None
if hasattr(model, "feature_importances_"):
importances = model.feature_importances_
elif hasattr(model, "coef_"):
importances = np.abs(model.coef_[0])
if importances is None:
return []
total = np.sum(importances)
if total > 0:
importances = importances / total
return sorted(
zip(feature_cols, importances.tolist()),
key=lambda item: item[1],
reverse=True,
)
if __name__ == "__main__":
    # CLI: train_classifier.py [features_csv] [models_dir]
    cli_args = sys.argv[1:]
    csv_path = cli_args[0] if cli_args else "data/training/features.csv"
    model_dir = cli_args[1] if len(cli_args) > 1 else "models"
    train(csv_path, model_dir)