| """ | |
| Training script for HateShield-BN Custom Model | |
| Trains SEPARATE models for English and Bengali datasets | |
| Compares multiple algorithms and saves the best one | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.svm import SVC | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.svm import LinearSVC | |
| from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score | |
| import joblib | |
| import os | |
| from typing import Tuple, Dict | |
| import warnings | |
| from tqdm import tqdm | |
| import time | |
| import json | |
| warnings.filterwarnings('ignore') | |

# Configuration
ENGLISH_DATASET_PATH = "data/english_hate_speech.csv"
BENGALI_DATASET_PATH = "data/bengali_hate_speech.csv"
MODEL_OUTPUT_PATH = "models/model_weights/custom_models"
RANDOM_STATE = 42
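# NOTE: paths are resolved relative to the working directory the script is run
# from. The loaders below expect the English CSV to provide a text column
# ('content' or 'Content') plus a 'Label'/'label' column, and the Bengali CSV a
# 'sentence'/'sentences' column plus a 'hate' flag (and optionally 'category').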


def load_english_dataset() -> pd.DataFrame:
    """Load and preprocess English dataset"""
    print("📥 Loading English dataset...")
    try:
        df = pd.read_csv(ENGLISH_DATASET_PATH)
        print(f"   ✅ Loaded: {len(df):,} samples")

        # Standardize column names
        if 'content' in df.columns:
            df = df.rename(columns={'content': 'text'})
        elif 'Content' in df.columns:
            df = df.rename(columns={'Content': 'text'})

        # Ensure label column
        if 'Label' in df.columns:
            df['label'] = df['Label'].astype(int)
        elif 'label' in df.columns:
            df['label'] = df['label'].astype(int)
        else:
            raise ValueError("English dataset must have 'Label' or 'label' column")

        # Keep only text and label
        df = df[['text', 'label']].copy()

        # Clean data
        df = df.dropna(subset=['text', 'label'])
        df = df[df['text'].str.strip().str.len() > 0]

        # Ensure binary labels (0, 1)
        unique_labels = df['label'].unique()
        print(f"   📊 Unique labels: {sorted(unique_labels)}")
        if set(unique_labels) == {0, 1}:
            print("   ✅ Binary classification: 0=Non-Hate, 1=Hate")
        else:
            print(f"   ⚠️ Warning: Expected binary labels, found: {unique_labels}")
            # Convert to binary if needed
            df['label'] = (df['label'] > 0).astype(int)

        print(f"   ✅ After preprocessing: {len(df):,} samples")
        return df

    except FileNotFoundError:
        print(f"   ❌ Error: File not found at {ENGLISH_DATASET_PATH}")
        return pd.DataFrame(columns=['text', 'label'])
    except Exception as e:
        print(f"   ❌ Error loading English dataset: {e}")
        return pd.DataFrame(columns=['text', 'label'])


def load_bengali_dataset() -> pd.DataFrame:
    """Load and preprocess Bengali dataset"""
    print("\n📥 Loading Bengali dataset...")
    try:
        df = pd.read_csv(BENGALI_DATASET_PATH)
        print(f"   ✅ Loaded: {len(df):,} samples")

        # Standardize column names
        if 'sentence' in df.columns:
            df = df.rename(columns={'sentence': 'text'})
        elif 'sentences' in df.columns:
            df = df.rename(columns={'sentences': 'text'})

        # Convert hate/category columns to standard labels
        if 'hate' in df.columns:
            if 'category' in df.columns:
                category_map = {
                    'non-hate': 0,
                    'offensive': 1,
                    'hate': 2,
                }
                df['label'] = df['category'].map(category_map)
                # Fall back to the binary 'hate' flag where the category is missing
                df.loc[df['label'].isna() & (df['hate'] == 1), 'label'] = 2
                df.loc[df['label'].isna() & (df['hate'] == 0), 'label'] = 0
            else:
                # Only the binary 'hate' flag is available: 1 -> Hate Speech (2), 0 -> Neutral (0)
                df['label'] = df['hate'].apply(lambda x: 2 if x == 1 else 0)
        elif 'label' not in df.columns:
            raise ValueError("Bengali dataset must have a 'hate' (and optionally 'category') or 'label' column")

        df = df[['text', 'label']].copy()

        # Clean data (drop unmapped labels before casting to int)
        df = df.dropna(subset=['text', 'label'])
        df['label'] = df['label'].astype(int)
        df = df[df['text'].str.strip().str.len() > 0]

        # Ensure multi-class labels (0, 1, 2)
        unique_labels = df['label'].unique()
        print(f"   📊 Unique labels: {sorted(unique_labels)}")
        if set(unique_labels) == {0, 1, 2}:
            print("   ✅ Multi-class: 0=Neutral, 1=Offensive, 2=Hate Speech")
        elif set(unique_labels) == {0, 1}:
            print("   ⚠️ Warning: Only binary labels found, expected 3 classes")
        else:
            print(f"   ⚠️ Warning: Unexpected labels: {unique_labels}")

        print(f"   ✅ After preprocessing: {len(df):,} samples")
        return df

    except FileNotFoundError:
        print(f"   ❌ Error: File not found at {BENGALI_DATASET_PATH}")
        return pd.DataFrame(columns=['text', 'label'])
    except Exception as e:
        print(f"   ❌ Error loading Bengali dataset: {e}")
        return pd.DataFrame(columns=['text', 'label'])


def analyze_distribution(df: pd.DataFrame, name: str):
    """Print dataset statistics"""
    if len(df) == 0:
        print(f"\n{'='*50}")
        print(f"❌ {name} Dataset: EMPTY")
        print('='*50)
        return

    print(f"\n{'='*50}")
    print(f"📊 {name} Dataset Distribution")
    print('='*50)
    unique_labels = sorted(df['label'].unique())
    print(f"Unique labels: {unique_labels}")
    print(f"Total samples: {len(df):,}\n")

    # Dynamic label names
    if set(unique_labels) == {0, 1}:
        label_names = {0: 'Non-Hate/Neutral', 1: 'Hate/Offensive'}
    elif set(unique_labels) == {0, 1, 2}:
        label_names = {0: 'Neutral', 1: 'Offensive', 2: 'Hate Speech'}
    else:
        label_names = {label: f'Class {label}' for label in unique_labels}

    # Show distribution
    for label in unique_labels:
        count = len(df[df['label'] == label])
        percentage = count / len(df) * 100
        label_name = label_names.get(label, f'Unknown({label})')
        print(f"  {label} - {label_name:20s}: {count:6,} ({percentage:5.1f}%)")


def train_single_model(X_train, X_test, y_train, y_test, model_type: str, language: str) -> Dict:
    """Train a single model and return results"""
    print(f"\n   🔧 Training {model_type.upper()}...")

    # Choose model
    if model_type == 'logistic':
        model = LogisticRegression(
            max_iter=1000,
            random_state=RANDOM_STATE,
            class_weight='balanced',
            n_jobs=-1
        )
    elif model_type == 'svm':
        # SVC (rather than LinearSVC) so probability estimates are available downstream
        model = SVC(
            kernel='linear',
            probability=True,  # required for predict_proba
            random_state=RANDOM_STATE,
            class_weight='balanced',
            max_iter=2000
        )
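        # Note: probability=True makes SVC fit an internal cross-validated
        # Platt-scaling calibration, which noticeably increases training time
        # on larger datasets.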
    elif model_type == 'random_forest':
        model = RandomForestClassifier(
            n_estimators=100,
            random_state=RANDOM_STATE,
            class_weight='balanced',
            n_jobs=-1
        )
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    # Train
    start_time = time.time()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    training_time = time.time() - start_time

    # Evaluate
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')
    print(f"      ✅ Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
    print(f"      ✅ F1-Score: {f1:.4f}")
    print(f"      ✅ Time: {training_time:.2f}s")

    # Verify that probability estimates are available
    if hasattr(model, 'predict_proba'):
        proba = model.predict_proba(X_test[:1])
        print(f"      ✅ predict_proba: Available (shape: {proba.shape})")
    else:
        print("      ⚠️ predict_proba: NOT Available")

    return {
        'model': model,
        'accuracy': accuracy,
        'f1_score': f1,
        'training_time': training_time,
        'predictions': y_pred
    }


def train_and_compare_models(X_train, X_test, y_train, y_test, language: str) -> Tuple:
    """Train multiple models and return the best one"""
    print(f"\n🤖 Training Multiple Models for {language.upper()}...")
    print("=" * 60)

    models_to_train = ['logistic', 'svm']
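    # 'random_forest' is also supported by train_single_model but is left out of
    # the default comparison list; add it here to include it in the sweep.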
    results = {}

    # Train all models
    for model_type in models_to_train:
        try:
            result = train_single_model(X_train, X_test, y_train, y_test, model_type, language)
            results[model_type] = result
        except Exception as e:
            print(f"   ❌ Error training {model_type}: {e}")
            continue

    if not results:
        print("❌ No models trained successfully!")
        return None, None, {}

    # Compare models
    print(f"\n{'='*60}")
    print(f"📊 Model Comparison for {language.upper()}")
    print('='*60)
    print(f"{'Model':<20} {'Accuracy':<12} {'F1-Score':<12} {'Time (s)':<10}")
    print('-'*60)

    best_model_name = None
    best_score = 0
    for model_name, result in results.items():
        accuracy = result['accuracy']
        f1 = result['f1_score']
        time_taken = result['training_time']
        # Use F1-score as the primary metric (better for imbalanced datasets)
        score = f1
        print(f"{model_name:<20} {accuracy:<12.4f} {f1:<12.4f} {time_taken:<10.2f}")
        if score > best_score:
            best_score = score
            best_model_name = model_name

    print('='*60)
    print(f"🏆 Best Model: {best_model_name.upper()} (F1-Score: {best_score:.4f})")
    print('='*60)

    # Get best model
    best_result = results[best_model_name]
    best_model = best_result['model']

    # Detailed report for best model
    print(f"\n📋 Detailed Report for {best_model_name.upper()}:")
    unique_labels = sorted(np.unique(y_test))
    if set(unique_labels) == {0, 1}:
        target_names = ['Non-Hate', 'Hate']
    elif set(unique_labels) == {0, 1, 2}:
        target_names = ['Neutral', 'Offensive', 'Hate Speech']
    else:
        target_names = [f'Class {i}' for i in unique_labels]
    print(classification_report(y_test, best_result['predictions'],
                                target_names=target_names,
                                zero_division=0))

    print("🔢 Confusion Matrix:")
    print(confusion_matrix(y_test, best_result['predictions']))

    # Return comparison data
    comparison = {
        model_name: {
            'accuracy': result['accuracy'],
            'f1_score': result['f1_score'],
            'training_time': result['training_time']
        }
        for model_name, result in results.items()
    }
    return best_model, best_model_name, comparison


def train_language_specific_model(df: pd.DataFrame, language: str):
    """Train model for specific language with comparison"""
    print(f"\n{'='*60}")
    print(f"🚀 Training {language.upper()} Model")
    print('='*60)

    if len(df) == 0:
        print(f"❌ No data for {language}!")
        return None, None, None, None, {}

    # Analyze distribution
    analyze_distribution(df, language.capitalize())

    # Split data
    print("\n✂️ Splitting data (80/20 train/test)...")
    X = df['text']
    y = df['label'].astype(int)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=0.2,
        random_state=RANDOM_STATE,
        stratify=y
    )
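    # stratify=y preserves the label proportions of the full dataset in both the
    # train and test splits, which matters for these imbalanced hate-speech labels.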
    print(f"   ✅ Train size: {len(X_train):,}")
    print(f"   ✅ Test size: {len(X_test):,}")

    # Create TF-IDF vectorizer
    print("\n🔤 Creating TF-IDF vectorizer...")
    vectorizer = TfidfVectorizer(
        max_features=5000,
        ngram_range=(1, 2),
        min_df=2,
        max_df=0.8,
        strip_accents='unicode',
        analyzer='word',
        token_pattern=r'\w{1,}',
        sublinear_tf=True
    )
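    # NOTE: strip_accents='unicode' applies NFKD normalization and drops combining
    # marks. It is aimed at Latin-script accents; its effect on Bengali script is
    # worth verifying before relying on it for the Bengali model.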

    print("   ⏳ Vectorizing text...")
    X_train_vec = vectorizer.fit_transform(X_train)
    X_test_vec = vectorizer.transform(X_test)
    print(f"   ✅ Feature dimension: {X_train_vec.shape[1]:,}")

    # Train and compare models
    best_model, best_model_name, comparison = train_and_compare_models(
        X_train_vec, X_test_vec, y_train, y_test, language
    )
    if best_model is None:
        return None, None, None, None, {}

    # Final metrics for the best model on the held-out test set
    y_pred = best_model.predict(X_test_vec)
    final_accuracy = accuracy_score(y_test, y_pred)
    final_f1 = f1_score(y_test, y_pred, average='weighted')

    return best_model, vectorizer, best_model_name, final_f1, comparison


def main():
    """Main training pipeline"""
    print("\n" + "=" * 70)
    print("🛡️ HateShield-BN Model Training (Language-Specific with Comparison)")
    print("=" * 70 + "\n")

    # Load datasets separately
    df_english = load_english_dataset()
    df_bengali = load_bengali_dataset()

    if len(df_english) == 0 and len(df_bengali) == 0:
        print("\n❌ Error: No data found!")
        return

    os.makedirs(MODEL_OUTPUT_PATH, exist_ok=True)
    results = {}

    # Train English model
    if len(df_english) > 0:
        print("\n" + "🇬🇧 " * 35)
        english_model, english_vectorizer, english_best_name, english_f1, english_comparison = train_language_specific_model(
            df_english, 'english'
        )
        if english_model is not None:
            # Save English model
            print(f"\n💾 Saving English model ({english_best_name})...")
            english_model_path = os.path.join(MODEL_OUTPUT_PATH, "english_model.pkl")
            english_vec_path = os.path.join(MODEL_OUTPUT_PATH, "english_vectorizer.pkl")
            joblib.dump(english_model, english_model_path)
            joblib.dump(english_vectorizer, english_vec_path)
            print(f"   ✅ Model saved to: {english_model_path}")
            print(f"   ✅ Vectorizer saved to: {english_vec_path}")

            results['english'] = {
                'best_model': english_best_name,
                'f1_score': english_f1,
                'num_classes': len(df_english['label'].unique()),
                'samples': len(df_english),
                'comparison': english_comparison
            }

    # Train Bengali model
    if len(df_bengali) > 0:
        print("\n" + "🇧🇩 " * 35)
        bengali_model, bengali_vectorizer, bengali_best_name, bengali_f1, bengali_comparison = train_language_specific_model(
            df_bengali, 'bengali'
        )
        if bengali_model is not None:
            # Save Bengali model
            print(f"\n💾 Saving Bengali model ({bengali_best_name})...")
            bengali_model_path = os.path.join(MODEL_OUTPUT_PATH, "bengali_model.pkl")
            bengali_vec_path = os.path.join(MODEL_OUTPUT_PATH, "bengali_vectorizer.pkl")
            joblib.dump(bengali_model, bengali_model_path)
            joblib.dump(bengali_vectorizer, bengali_vec_path)
            print(f"   ✅ Model saved to: {bengali_model_path}")
            print(f"   ✅ Vectorizer saved to: {bengali_vec_path}")

            results['bengali'] = {
                'best_model': bengali_best_name,
                'f1_score': bengali_f1,
                'num_classes': len(df_bengali['label'].unique()),
                'samples': len(df_bengali),
                'comparison': bengali_comparison
            }

    # Save metadata
    print("\n💾 Saving metadata...")
    metadata = {
        'training_date': time.strftime('%Y-%m-%d %H:%M:%S'),
        'models': results,
        'separate_models': True,
        'algorithms_tested': ['logistic', 'svm']  # keep in sync with models_to_train
    }
    with open(os.path.join(MODEL_OUTPUT_PATH, "metadata.json"), 'w') as f:
        json.dump(metadata, f, indent=2)

    # Final Summary
    print("\n" + "=" * 70)
    print("✅ Training Complete!")
    print("=" * 70)

    if 'english' in results:
        print("\n🇬🇧 English Model:")
        print(f"   Best Algorithm: {results['english']['best_model'].upper()}")
        print(f"   F1-Score: {results['english']['f1_score']:.4f}")
        print(f"   Classes: {results['english']['num_classes']}")
        print(f"   Samples: {results['english']['samples']:,}")
        print("\n   Model Comparison:")
        for model_name, scores in results['english']['comparison'].items():
            print(f"      {model_name:<15}: Acc={scores['accuracy']:.4f}, F1={scores['f1_score']:.4f}")

    if 'bengali' in results:
        print("\n🇧🇩 Bengali Model:")
        print(f"   Best Algorithm: {results['bengali']['best_model'].upper()}")
        print(f"   F1-Score: {results['bengali']['f1_score']:.4f}")
        print(f"   Classes: {results['bengali']['num_classes']}")
        print(f"   Samples: {results['bengali']['samples']:,}")
        print("\n   Model Comparison:")
        for model_name, scores in results['bengali']['comparison'].items():
            print(f"      {model_name:<15}: Acc={scores['accuracy']:.4f}, F1={scores['f1_score']:.4f}")

    print("\n" + "=" * 70 + "\n")


if __name__ == "__main__":
    main()
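
# Downstream usage (a minimal sketch; assumes only the artifact names saved above):
#   import joblib
#   model = joblib.load("models/model_weights/custom_models/english_model.pkl")
#   vectorizer = joblib.load("models/model_weights/custom_models/english_vectorizer.pkl")
#   probs = model.predict_proba(vectorizer.transform(["example text"]))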