""" Training script for HateShield-BN Custom Model Trains SEPARATE models for English and Bengali datasets Compares multiple algorithms and saves the best one """ import pandas as pd import numpy as np from sklearn.model_selection import train_test_split from sklearn.svm import SVC from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.linear_model import LogisticRegression from sklearn.ensemble import RandomForestClassifier from sklearn.svm import LinearSVC from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score import joblib import os from typing import Tuple, Dict import warnings from tqdm import tqdm import time import json warnings.filterwarnings('ignore') # Configuration ENGLISH_DATASET_PATH = "data/english_hate_speech.csv" BENGALI_DATASET_PATH = "data/bengali_hate_speech.csv" MODEL_OUTPUT_PATH = "models/model_weights/custom_models" RANDOM_STATE = 42 def load_english_dataset() -> pd.DataFrame: """Load and preprocess English dataset""" print("šŸ“„ Loading English dataset...") try: df = pd.read_csv(ENGLISH_DATASET_PATH) print(f" āœ“ Loaded: {len(df):,} samples") # Standardize column names if 'content' in df.columns: df = df.rename(columns={'content': 'text'}) elif 'Content' in df.columns: df = df.rename(columns={'Content': 'text'}) # Ensure label column if 'Label' in df.columns: df['label'] = df['Label'].astype(int) elif 'label' in df.columns: df['label'] = df['label'].astype(int) else: raise ValueError("English dataset must have 'Label' or 'label' column") # Keep only text and label df = df[['text', 'label']].copy() # Clean data df = df.dropna(subset=['text', 'label']) df = df[df['text'].str.strip().str.len() > 0] # Ensure binary labels (0, 1) unique_labels = df['label'].unique() print(f" šŸ“Š Unique labels: {sorted(unique_labels)}") if set(unique_labels) == {0, 1}: print(" āœ“ Binary classification: 0=Non-Hate, 1=Hate") else: print(f" āš ļø Warning: Expected binary labels, found: {unique_labels}") # Convert to binary if needed df['label'] = (df['label'] > 0).astype(int) print(f" āœ“ After preprocessing: {len(df):,} samples") return df except FileNotFoundError: print(f" āŒ Error: File not found at {ENGLISH_DATASET_PATH}") return pd.DataFrame(columns=['text', 'label']) except Exception as e: print(f" āŒ Error loading English dataset: {e}") return pd.DataFrame(columns=['text', 'label']) def load_bengali_dataset() -> pd.DataFrame: """Load and preprocess Bengali dataset""" print("\nšŸ“„ Loading Bengali dataset...") try: df = pd.read_csv(BENGALI_DATASET_PATH) print(f" āœ“ Loaded: {len(df):,} samples") # Standardize column names if 'sentence' in df.columns: df = df.rename(columns={'sentence': 'text'}) elif 'sentences' in df.columns: df = df.rename(columns={'sentences': 'text'}) # Convert hate/category to standard labels if 'hate' in df.columns: if 'category' in df.columns: category_map = { 'non-hate': 0, 'offensive': 1, 'hate': 2, } df['label'] = df['category'].map(category_map) # Fill missing with hate column df.loc[df['label'].isna() & (df['hate'] == 1), 'label'] = 2 df.loc[df['label'].isna() & (df['hate'] == 0), 'label'] = 0 else: # If only 'hate' column, map: 0=non-hate, 1=hate (as offensive), 2=hate df['label'] = df['hate'].apply(lambda x: 2 if x == 1 else 0) df['label'] = df['label'].astype(int) df = df[['text', 'label']].copy() # Clean data df = df.dropna(subset=['text', 'label']) df = df[df['text'].str.strip().str.len() > 0] # Ensure multi-class labels (0, 1, 2) unique_labels = df['label'].unique() print(f" šŸ“Š Unique labels: {sorted(unique_labels)}") if set(unique_labels) == {0, 1, 2}: print(" āœ“ Multi-class: 0=Neutral, 1=Offensive, 2=Hate Speech") elif set(unique_labels) == {0, 1}: print(" āš ļø Warning: Only binary labels found, expected 3 classes") else: print(f" āš ļø Warning: Unexpected labels: {unique_labels}") print(f" āœ“ After preprocessing: {len(df):,} samples") return df except FileNotFoundError: print(f" āŒ Error: File not found at {BENGALI_DATASET_PATH}") return pd.DataFrame(columns=['text', 'label']) except Exception as e: print(f" āŒ Error loading Bengali dataset: {e}") return pd.DataFrame(columns=['text', 'label']) def analyze_distribution(df: pd.DataFrame, name: str): """Print dataset statistics""" if len(df) == 0: print(f"\n{'='*50}") print(f"āŒ {name} Dataset: EMPTY") print('='*50) return print(f"\n{'='*50}") print(f"šŸ“Š {name} Dataset Distribution") print('='*50) unique_labels = sorted(df['label'].unique()) print(f"Unique labels: {unique_labels}") print(f"Total samples: {len(df):,}\n") # Dynamic label names if set(unique_labels) == {0, 1}: label_names = {0: 'Non-Hate/Neutral', 1: 'Hate/Offensive'} elif set(unique_labels) == {0, 1, 2}: label_names = {0: 'Neutral', 1: 'Offensive', 2: 'Hate Speech'} else: label_names = {label: f'Class {label}' for label in unique_labels} # Show distribution for label in unique_labels: count = len(df[df['label'] == label]) percentage = count / len(df) * 100 label_name = label_names.get(label, f'Unknown({label})') print(f" {label} - {label_name:20s}: {count:6,} ({percentage:5.1f}%)") # def train_single_model(X_train, X_test, y_train, y_test, model_type: str, language: str) -> Dict: # """Train a single model and return results""" # print(f"\n šŸ”§ Training {model_type.upper()}...") # # Choose model # if model_type == 'logistic': # model = LogisticRegression( # max_iter=1000, # random_state=RANDOM_STATE, # class_weight='balanced', # n_jobs=-1 # ) # elif model_type == 'svm': # model = LinearSVC( # random_state=RANDOM_STATE, # class_weight='balanced', # max_iter=2000 # ) # elif model_type == 'random_forest': # model = RandomForestClassifier( # n_estimators=100, # random_state=RANDOM_STATE, # class_weight='balanced', # n_jobs=-1 # ) # else: # raise ValueError(f"Unknown model type: {model_type}") # # Train # start_time = time.time() # model.fit(X_train, y_train) # y_pred = model.predict(X_test) # training_time = time.time() - start_time # # Evaluate # accuracy = accuracy_score(y_test, y_pred) # f1 = f1_score(y_test, y_pred, average='weighted') # print(f" āœ“ Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)") # print(f" āœ“ F1-Score: {f1:.4f}") # print(f" āœ“ Time: {training_time:.2f}s") # return { # 'model': model, # 'accuracy': accuracy, # 'f1_score': f1, # 'training_time': training_time, # 'predictions': y_pred # } def train_single_model(X_train, X_test, y_train, y_test, model_type: str, language: str) -> Dict: """Train a single model and return results""" print(f"\n šŸ”§ Training {model_type.upper()}...") # Choose model if model_type == 'logistic': model = LogisticRegression( max_iter=1000, random_state=RANDOM_STATE, class_weight='balanced', n_jobs=-1 ) elif model_type == 'svm': # āœ… Use SVC instead of LinearSVC model = SVC( kernel='linear', probability=True, # āœ… CRITICAL: Enable probability estimates random_state=RANDOM_STATE, class_weight='balanced', max_iter=2000 ) elif model_type == 'random_forest': model = RandomForestClassifier( n_estimators=100, random_state=RANDOM_STATE, class_weight='balanced', n_jobs=-1 ) else: raise ValueError(f"Unknown model type: {model_type}") # āœ… ADD THIS: Train and evaluate start_time = time.time() model.fit(X_train, y_train) y_pred = model.predict(X_test) training_time = time.time() - start_time # Evaluate accuracy = accuracy_score(y_test, y_pred) f1 = f1_score(y_test, y_pred, average='weighted') print(f" āœ“ Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)") print(f" āœ“ F1-Score: {f1:.4f}") print(f" āœ“ Time: {training_time:.2f}s") # āœ… Verify predict_proba works if hasattr(model, 'predict_proba'): proba = model.predict_proba(X_test[:1]) print(f" āœ… predict_proba: Available (shape: {proba.shape})") else: print(f" āš ļø predict_proba: NOT Available") # āœ… ADD THIS: Return results return { 'model': model, 'accuracy': accuracy, 'f1_score': f1, 'training_time': training_time, 'predictions': y_pred } def train_and_compare_models(X_train, X_test, y_train, y_test, language: str) -> Tuple: """Train multiple models and return the best one""" print(f"\nšŸ¤– Training Multiple Models for {language.upper()}...") print("=" * 60) models_to_train = ['logistic', 'svm'] results = {} # Train all models for model_type in models_to_train: try: result = train_single_model(X_train, X_test, y_train, y_test, model_type, language) results[model_type] = result except Exception as e: print(f" āŒ Error training {model_type}: {e}") continue if not results: print("āŒ No models trained successfully!") return None, None, {} # Compare models print(f"\n{'='*60}") print(f"šŸ“Š Model Comparison for {language.upper()}") print('='*60) print(f"{'Model':<20} {'Accuracy':<12} {'F1-Score':<12} {'Time (s)':<10}") print('-'*60) best_model_name = None best_score = 0 for model_name, result in results.items(): accuracy = result['accuracy'] f1 = result['f1_score'] time_taken = result['training_time'] # Use F1-score as primary metric (better for imbalanced datasets) score = f1 print(f"{model_name:<20} {accuracy:<12.4f} {f1:<12.4f} {time_taken:<10.2f}") if score > best_score: best_score = score best_model_name = model_name print('='*60) print(f"šŸ† Best Model: {best_model_name.upper()} (F1-Score: {best_score:.4f})") print('='*60) # Get best model best_result = results[best_model_name] best_model = best_result['model'] # Detailed report for best model print(f"\nšŸ“ˆ Detailed Report for {best_model_name.upper()}:") unique_labels = sorted(np.unique(y_test)) if set(unique_labels) == {0, 1}: target_names = ['Non-Hate', 'Hate'] elif set(unique_labels) == {0, 1, 2}: target_names = ['Neutral', 'Offensive', 'Hate Speech'] else: target_names = [f'Class {i}' for i in unique_labels] print(classification_report(y_test, best_result['predictions'], target_names=target_names, zero_division=0)) print("šŸ”¢ Confusion Matrix:") print(confusion_matrix(y_test, best_result['predictions'])) # Return comparison data comparison = { model_name: { 'accuracy': result['accuracy'], 'f1_score': result['f1_score'], 'training_time': result['training_time'] } for model_name, result in results.items() } return best_model, best_model_name, comparison def train_language_specific_model(df: pd.DataFrame, language: str): """Train model for specific language with comparison""" print(f"\n{'='*60}") print(f"šŸŽ“ Training {language.upper()} Model") print('='*60) if len(df) == 0: print(f"āŒ No data for {language}!") return None, None, None, None, {} # Analyze distribution analyze_distribution(df, language.capitalize()) # Split data print(f"\nāœ‚ļø Splitting data (80/20 train/test)...") X = df['text'] y = df['label'].astype(int) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y ) print(f" āœ“ Train size: {len(X_train):,}") print(f" āœ“ Test size: {len(X_test):,}") # Create TF-IDF vectorizer print(f"\nšŸ”¤ Creating TF-IDF vectorizer...") vectorizer = TfidfVectorizer( max_features=5000, ngram_range=(1, 2), min_df=2, max_df=0.8, strip_accents='unicode', analyzer='word', token_pattern=r'\w{1,}', sublinear_tf=True ) print(" ā³ Vectorizing text...") X_train_vec = vectorizer.fit_transform(X_train) X_test_vec = vectorizer.transform(X_test) print(f" āœ“ Feature dimension: {X_train_vec.shape[1]:,}") # Train and compare models best_model, best_model_name, comparison = train_and_compare_models( X_train_vec, X_test_vec, y_train, y_test, language ) if best_model is None: return None, None, None, None, {} # Get final accuracy y_pred = best_model.predict(X_test_vec) final_accuracy = accuracy_score(y_test, y_pred) final_f1 = f1_score(y_test, y_pred, average='weighted') return best_model, vectorizer, best_model_name, final_f1, comparison def main(): """Main training pipeline""" print("\n" + "=" * 70) print("šŸ›”ļø HateShield-BN Model Training (Language-Specific with Comparison)") print("=" * 70 + "\n") # Load datasets separately df_english = load_english_dataset() df_bengali = load_bengali_dataset() if len(df_english) == 0 and len(df_bengali) == 0: print("\nāŒ Error: No data found!") return os.makedirs(MODEL_OUTPUT_PATH, exist_ok=True) results = {} # Train English model if len(df_english) > 0: print("\n" + "šŸ‡¬šŸ‡§ " * 35) english_model, english_vectorizer, english_best_name, english_f1, english_comparison = train_language_specific_model( df_english, 'english' ) if english_model is not None: # Save English model print(f"\nšŸ’¾ Saving English model ({english_best_name})...") english_model_path = os.path.join(MODEL_OUTPUT_PATH, "english_model.pkl") english_vec_path = os.path.join(MODEL_OUTPUT_PATH, "english_vectorizer.pkl") joblib.dump(english_model, english_model_path) joblib.dump(english_vectorizer, english_vec_path) print(f" āœ“ Model saved to: {english_model_path}") print(f" āœ“ Vectorizer saved to: {english_vec_path}") results['english'] = { 'best_model': english_best_name, 'f1_score': english_f1, 'num_classes': len(df_english['label'].unique()), 'samples': len(df_english), 'comparison': english_comparison } # Train Bengali model if len(df_bengali) > 0: print("\n" + "šŸ‡§šŸ‡© " * 35) bengali_model, bengali_vectorizer, bengali_best_name, bengali_f1, bengali_comparison = train_language_specific_model( df_bengali, 'bengali' ) if bengali_model is not None: # Save Bengali model print(f"\nšŸ’¾ Saving Bengali model ({bengali_best_name})...") bengali_model_path = os.path.join(MODEL_OUTPUT_PATH, "bengali_model.pkl") bengali_vec_path = os.path.join(MODEL_OUTPUT_PATH, "bengali_vectorizer.pkl") joblib.dump(bengali_model, bengali_model_path) joblib.dump(bengali_vectorizer, bengali_vec_path) print(f" āœ“ Model saved to: {bengali_model_path}") print(f" āœ“ Vectorizer saved to: {bengali_vec_path}") results['bengali'] = { 'best_model': bengali_best_name, 'f1_score': bengali_f1, 'num_classes': len(df_bengali['label'].unique()), 'samples': len(df_bengali), 'comparison': bengali_comparison } # Save metadata print(f"\nšŸ’¾ Saving metadata...") metadata = { 'training_date': time.strftime('%Y-%m-%d %H:%M:%S'), 'models': results, 'separate_models': True, 'algorithms_tested': ['logistic', 'svm', 'random_forest'] } with open(os.path.join(MODEL_OUTPUT_PATH, "metadata.json"), 'w') as f: json.dump(metadata, f, indent=2) # Final Summary print("\n" + "=" * 70) print("āœ… Training Complete!") print("=" * 70) if 'english' in results: print(f"\nšŸ‡¬šŸ‡§ English Model:") print(f" Best Algorithm: {results['english']['best_model'].upper()}") print(f" F1-Score: {results['english']['f1_score']:.4f}") print(f" Classes: {results['english']['num_classes']}") print(f" Samples: {results['english']['samples']:,}") print(f"\n Model Comparison:") for model_name, scores in results['english']['comparison'].items(): print(f" {model_name:<15}: Acc={scores['accuracy']:.4f}, F1={scores['f1_score']:.4f}") if 'bengali' in results: print(f"\nšŸ‡§šŸ‡© Bengali Model:") print(f" Best Algorithm: {results['bengali']['best_model'].upper()}") print(f" F1-Score: {results['bengali']['f1_score']:.4f}") print(f" Classes: {results['bengali']['num_classes']}") print(f" Samples: {results['bengali']['samples']:,}") print(f"\n Model Comparison:") for model_name, scores in results['bengali']['comparison'].items(): print(f" {model_name:<15}: Acc={scores['accuracy']:.4f}, F1={scores['f1_score']:.4f}") print("\n" + "=" * 70 + "\n") if __name__ == "__main__": main()