"""
Training script for HateShield-BN Custom Model
Trains SEPARATE models for English and Bengali datasets
Compares multiple algorithms and saves the best one
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
import joblib
import os
from typing import Tuple, Dict
import warnings
import time
import json

warnings.filterwarnings('ignore')
# Configuration
ENGLISH_DATASET_PATH = "data/english_hate_speech.csv"
BENGALI_DATASET_PATH = "data/bengali_hate_speech.csv"
MODEL_OUTPUT_PATH = "models/model_weights/custom_models"
RANDOM_STATE = 42
def load_english_dataset() -> pd.DataFrame:
    """Load and preprocess English dataset"""
    print("📄 Loading English dataset...")
    try:
        df = pd.read_csv(ENGLISH_DATASET_PATH)
        print(f" ✓ Loaded: {len(df):,} samples")
        # Standardize column names
        if 'content' in df.columns:
            df = df.rename(columns={'content': 'text'})
        elif 'Content' in df.columns:
            df = df.rename(columns={'Content': 'text'})
        # Ensure label column
        if 'Label' in df.columns:
            df['label'] = df['Label']
        elif 'label' not in df.columns:
            raise ValueError("English dataset must have 'Label' or 'label' column")
        # Keep only text and label
        df = df[['text', 'label']].copy()
        # Clean data (drop missing rows BEFORE casting labels to int,
        # otherwise astype(int) fails on NaN)
        df = df.dropna(subset=['text', 'label'])
        df['label'] = df['label'].astype(int)
        df = df[df['text'].str.strip().str.len() > 0]
        # Ensure binary labels (0, 1)
        unique_labels = df['label'].unique()
        print(f" 📊 Unique labels: {sorted(unique_labels)}")
        if set(unique_labels) == {0, 1}:
            print(" ✓ Binary classification: 0=Non-Hate, 1=Hate")
        else:
            print(f" ⚠️ Warning: Expected binary labels, found: {unique_labels}")
            # Collapse any positive label to 1
            df['label'] = (df['label'] > 0).astype(int)
        print(f" ✓ After preprocessing: {len(df):,} samples")
        return df
    except FileNotFoundError:
        print(f" ❌ Error: File not found at {ENGLISH_DATASET_PATH}")
        return pd.DataFrame(columns=['text', 'label'])
    except Exception as e:
        print(f" ❌ Error loading English dataset: {e}")
        return pd.DataFrame(columns=['text', 'label'])
def load_bengali_dataset() -> pd.DataFrame:
    """Load and preprocess Bengali dataset"""
    print("\n📄 Loading Bengali dataset...")
    try:
        df = pd.read_csv(BENGALI_DATASET_PATH)
        print(f" ✓ Loaded: {len(df):,} samples")
        # Standardize column names
        if 'sentence' in df.columns:
            df = df.rename(columns={'sentence': 'text'})
        elif 'sentences' in df.columns:
            df = df.rename(columns={'sentences': 'text'})
        # Convert hate/category columns to standard labels
        if 'hate' in df.columns:
            if 'category' in df.columns:
                category_map = {
                    'non-hate': 0,
                    'offensive': 1,
                    'hate': 2,
                }
                df['label'] = df['category'].map(category_map)
                # Fall back to the binary 'hate' column where 'category' is unmapped
                df.loc[df['label'].isna() & (df['hate'] == 1), 'label'] = 2
                df.loc[df['label'].isna() & (df['hate'] == 0), 'label'] = 0
            else:
                # Only a binary 'hate' column: map 1 -> hate speech (2), 0 -> neutral (0)
                df['label'] = df['hate'].apply(lambda x: 2 if x == 1 else 0)
        elif 'label' not in df.columns:
            raise ValueError("Bengali dataset must have a 'hate', 'category', or 'label' column")
        df = df[['text', 'label']].copy()
        # Clean data (drop missing rows BEFORE casting labels to int,
        # otherwise astype(int) fails on NaN)
        df = df.dropna(subset=['text', 'label'])
        df['label'] = df['label'].astype(int)
        df = df[df['text'].str.strip().str.len() > 0]
        # Expect multi-class labels (0, 1, 2)
        unique_labels = df['label'].unique()
        print(f" 📊 Unique labels: {sorted(unique_labels)}")
        if set(unique_labels) == {0, 1, 2}:
            print(" ✓ Multi-class: 0=Neutral, 1=Offensive, 2=Hate Speech")
        elif set(unique_labels) == {0, 1}:
            print(" ⚠️ Warning: Only binary labels found, expected 3 classes")
        else:
            print(f" ⚠️ Warning: Unexpected labels: {unique_labels}")
        print(f" ✓ After preprocessing: {len(df):,} samples")
        return df
    except FileNotFoundError:
        print(f" ❌ Error: File not found at {BENGALI_DATASET_PATH}")
        return pd.DataFrame(columns=['text', 'label'])
    except Exception as e:
        print(f" ❌ Error loading Bengali dataset: {e}")
        return pd.DataFrame(columns=['text', 'label'])
def analyze_distribution(df: pd.DataFrame, name: str):
"""Print dataset statistics"""
if len(df) == 0:
print(f"\n{'='*50}")
print(f"❌ {name} Dataset: EMPTY")
print('='*50)
return
print(f"\n{'='*50}")
print(f"πŸ“Š {name} Dataset Distribution")
print('='*50)
unique_labels = sorted(df['label'].unique())
print(f"Unique labels: {unique_labels}")
print(f"Total samples: {len(df):,}\n")
# Dynamic label names
if set(unique_labels) == {0, 1}:
label_names = {0: 'Non-Hate/Neutral', 1: 'Hate/Offensive'}
elif set(unique_labels) == {0, 1, 2}:
label_names = {0: 'Neutral', 1: 'Offensive', 2: 'Hate Speech'}
else:
label_names = {label: f'Class {label}' for label in unique_labels}
# Show distribution
for label in unique_labels:
count = len(df[df['label'] == label])
percentage = count / len(df) * 100
label_name = label_names.get(label, f'Unknown({label})')
print(f" {label} - {label_name:20s}: {count:6,} ({percentage:5.1f}%)")
def train_single_model(X_train, X_test, y_train, y_test, model_type: str, language: str) -> Dict:
    """Train a single model and return results"""
    print(f"\n 🔧 Training {model_type.upper()}...")
    # Choose model
    if model_type == 'logistic':
        model = LogisticRegression(
            max_iter=1000,
            random_state=RANDOM_STATE,
            class_weight='balanced',
            n_jobs=-1
        )
    elif model_type == 'svm':
        # SVC with a linear kernel rather than LinearSVC: probability=True
        # enables predict_proba, which downstream confidence scoring needs
        # (at the cost of an internal cross-validated calibration fit)
        model = SVC(
            kernel='linear',
            probability=True,
            random_state=RANDOM_STATE,
            class_weight='balanced',
            max_iter=2000
        )
    elif model_type == 'random_forest':
        model = RandomForestClassifier(
            n_estimators=100,
            random_state=RANDOM_STATE,
            class_weight='balanced',
            n_jobs=-1
        )
    else:
        raise ValueError(f"Unknown model type: {model_type}")
    # Train
    start_time = time.time()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    training_time = time.time() - start_time
    # Evaluate
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')
    print(f" ✓ Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
    print(f" ✓ F1-Score: {f1:.4f}")
    print(f" ✓ Time: {training_time:.2f}s")
    # Verify predict_proba works on a single test sample
    if hasattr(model, 'predict_proba'):
        proba = model.predict_proba(X_test[:1])
        print(f" ✅ predict_proba: Available (shape: {proba.shape})")
    else:
        print(" ⚠️ predict_proba: NOT Available")
    return {
        'model': model,
        'accuracy': accuracy,
        'f1_score': f1,
        'training_time': training_time,
        'predictions': y_pred
    }
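
# A possible alternative for the 'svm' branch above (a sketch, not used by this
# script): LinearSVC wrapped in CalibratedClassifierCV also provides
# predict_proba, and typically trains faster than SVC(probability=True) on
# large corpora. The helper name `make_calibrated_linear_svm` is illustrative.
def make_calibrated_linear_svm():
    from sklearn.svm import LinearSVC
    from sklearn.calibration import CalibratedClassifierCV
    return CalibratedClassifierCV(
        LinearSVC(random_state=RANDOM_STATE, class_weight='balanced', max_iter=2000),
        cv=3  # 3-fold sigmoid (Platt) calibration
    )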
def train_and_compare_models(X_train, X_test, y_train, y_test, language: str) -> Tuple:
"""Train multiple models and return the best one"""
print(f"\nπŸ€– Training Multiple Models for {language.upper()}...")
print("=" * 60)
    # random_forest is also supported by train_single_model but not trained by default
    models_to_train = ['logistic', 'svm']
results = {}
# Train all models
for model_type in models_to_train:
try:
result = train_single_model(X_train, X_test, y_train, y_test, model_type, language)
results[model_type] = result
except Exception as e:
print(f" ❌ Error training {model_type}: {e}")
continue
if not results:
print("❌ No models trained successfully!")
return None, None, {}
# Compare models
print(f"\n{'='*60}")
print(f"πŸ“Š Model Comparison for {language.upper()}")
print('='*60)
print(f"{'Model':<20} {'Accuracy':<12} {'F1-Score':<12} {'Time (s)':<10}")
print('-'*60)
best_model_name = None
best_score = 0
for model_name, result in results.items():
accuracy = result['accuracy']
f1 = result['f1_score']
time_taken = result['training_time']
# Use F1-score as primary metric (better for imbalanced datasets)
score = f1
print(f"{model_name:<20} {accuracy:<12.4f} {f1:<12.4f} {time_taken:<10.2f}")
if score > best_score:
best_score = score
best_model_name = model_name
print('='*60)
print(f"πŸ† Best Model: {best_model_name.upper()} (F1-Score: {best_score:.4f})")
print('='*60)
# Get best model
best_result = results[best_model_name]
best_model = best_result['model']
# Detailed report for best model
print(f"\nπŸ“ˆ Detailed Report for {best_model_name.upper()}:")
unique_labels = sorted(np.unique(y_test))
if set(unique_labels) == {0, 1}:
target_names = ['Non-Hate', 'Hate']
elif set(unique_labels) == {0, 1, 2}:
target_names = ['Neutral', 'Offensive', 'Hate Speech']
else:
target_names = [f'Class {i}' for i in unique_labels]
print(classification_report(y_test, best_result['predictions'],
target_names=target_names,
zero_division=0))
print("πŸ”’ Confusion Matrix:")
print(confusion_matrix(y_test, best_result['predictions']))
# Return comparison data
comparison = {
model_name: {
'accuracy': result['accuracy'],
'f1_score': result['f1_score'],
'training_time': result['training_time']
}
for model_name, result in results.items()
}
return best_model, best_model_name, comparison
def train_language_specific_model(df: pd.DataFrame, language: str):
"""Train model for specific language with comparison"""
print(f"\n{'='*60}")
print(f"πŸŽ“ Training {language.upper()} Model")
print('='*60)
if len(df) == 0:
print(f"❌ No data for {language}!")
return None, None, None, None, {}
# Analyze distribution
analyze_distribution(df, language.capitalize())
# Split data
print(f"\nβœ‚οΈ Splitting data (80/20 train/test)...")
X = df['text']
y = df['label'].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=0.2,
random_state=RANDOM_STATE,
stratify=y
)
print(f" βœ“ Train size: {len(X_train):,}")
print(f" βœ“ Test size: {len(X_test):,}")
# Create TF-IDF vectorizer
print(f"\nπŸ”€ Creating TF-IDF vectorizer...")
vectorizer = TfidfVectorizer(
max_features=5000,
ngram_range=(1, 2),
min_df=2,
max_df=0.8,
strip_accents='unicode',
analyzer='word',
token_pattern=r'\w{1,}',
sublinear_tf=True
)
print(" ⏳ Vectorizing text...")
X_train_vec = vectorizer.fit_transform(X_train)
X_test_vec = vectorizer.transform(X_test)
print(f" βœ“ Feature dimension: {X_train_vec.shape[1]:,}")
# Train and compare models
best_model, best_model_name, comparison = train_and_compare_models(
X_train_vec, X_test_vec, y_train, y_test, language
)
if best_model is None:
return None, None, None, None, {}
    # Final metrics for the best model on the held-out test split
    y_pred = best_model.predict(X_test_vec)
    final_accuracy = accuracy_score(y_test, y_pred)
    final_f1 = f1_score(y_test, y_pred, average='weighted')
    print(f" ✓ Final: Accuracy={final_accuracy:.4f}, F1={final_f1:.4f}")
    return best_model, vectorizer, best_model_name, final_f1, comparison
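
# Illustrative only: a minimal sketch of how the saved artifacts could be used
# at inference time. The function name `load_and_predict` is an assumption;
# nothing in this script calls it.
def load_and_predict(text: str, language: str = 'english') -> Tuple[int, float]:
    """Load a saved model/vectorizer pair and classify a single text."""
    model = joblib.load(os.path.join(MODEL_OUTPUT_PATH, f"{language}_model.pkl"))
    vectorizer = joblib.load(os.path.join(MODEL_OUTPUT_PATH, f"{language}_vectorizer.pkl"))
    features = vectorizer.transform([text])
    label = int(model.predict(features)[0])
    # Every model trained above exposes predict_proba (SVC uses probability=True)
    confidence = float(model.predict_proba(features)[0].max())
    return label, confidence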
def main():
"""Main training pipeline"""
print("\n" + "=" * 70)
print("πŸ›‘οΈ HateShield-BN Model Training (Language-Specific with Comparison)")
print("=" * 70 + "\n")
# Load datasets separately
df_english = load_english_dataset()
df_bengali = load_bengali_dataset()
if len(df_english) == 0 and len(df_bengali) == 0:
print("\n❌ Error: No data found!")
return
os.makedirs(MODEL_OUTPUT_PATH, exist_ok=True)
results = {}
# Train English model
if len(df_english) > 0:
print("\n" + "πŸ‡¬πŸ‡§ " * 35)
english_model, english_vectorizer, english_best_name, english_f1, english_comparison = train_language_specific_model(
df_english, 'english'
)
if english_model is not None:
# Save English model
print(f"\nπŸ’Ύ Saving English model ({english_best_name})...")
english_model_path = os.path.join(MODEL_OUTPUT_PATH, "english_model.pkl")
english_vec_path = os.path.join(MODEL_OUTPUT_PATH, "english_vectorizer.pkl")
joblib.dump(english_model, english_model_path)
joblib.dump(english_vectorizer, english_vec_path)
print(f" βœ“ Model saved to: {english_model_path}")
print(f" βœ“ Vectorizer saved to: {english_vec_path}")
results['english'] = {
'best_model': english_best_name,
'f1_score': english_f1,
'num_classes': len(df_english['label'].unique()),
'samples': len(df_english),
'comparison': english_comparison
}
# Train Bengali model
if len(df_bengali) > 0:
print("\n" + "πŸ‡§πŸ‡© " * 35)
bengali_model, bengali_vectorizer, bengali_best_name, bengali_f1, bengali_comparison = train_language_specific_model(
df_bengali, 'bengali'
)
if bengali_model is not None:
# Save Bengali model
print(f"\nπŸ’Ύ Saving Bengali model ({bengali_best_name})...")
bengali_model_path = os.path.join(MODEL_OUTPUT_PATH, "bengali_model.pkl")
bengali_vec_path = os.path.join(MODEL_OUTPUT_PATH, "bengali_vectorizer.pkl")
joblib.dump(bengali_model, bengali_model_path)
joblib.dump(bengali_vectorizer, bengali_vec_path)
print(f" βœ“ Model saved to: {bengali_model_path}")
print(f" βœ“ Vectorizer saved to: {bengali_vec_path}")
results['bengali'] = {
'best_model': bengali_best_name,
'f1_score': bengali_f1,
'num_classes': len(df_bengali['label'].unique()),
'samples': len(df_bengali),
'comparison': bengali_comparison
}
# Save metadata
print(f"\nπŸ’Ύ Saving metadata...")
metadata = {
'training_date': time.strftime('%Y-%m-%d %H:%M:%S'),
'models': results,
'separate_models': True,
        'algorithms_tested': ['logistic', 'svm']
}
with open(os.path.join(MODEL_OUTPUT_PATH, "metadata.json"), 'w') as f:
json.dump(metadata, f, indent=2)
# Final Summary
print("\n" + "=" * 70)
print("βœ… Training Complete!")
print("=" * 70)
if 'english' in results:
print(f"\nπŸ‡¬πŸ‡§ English Model:")
print(f" Best Algorithm: {results['english']['best_model'].upper()}")
print(f" F1-Score: {results['english']['f1_score']:.4f}")
print(f" Classes: {results['english']['num_classes']}")
print(f" Samples: {results['english']['samples']:,}")
print(f"\n Model Comparison:")
for model_name, scores in results['english']['comparison'].items():
print(f" {model_name:<15}: Acc={scores['accuracy']:.4f}, F1={scores['f1_score']:.4f}")
if 'bengali' in results:
print(f"\nπŸ‡§πŸ‡© Bengali Model:")
print(f" Best Algorithm: {results['bengali']['best_model'].upper()}")
print(f" F1-Score: {results['bengali']['f1_score']:.4f}")
print(f" Classes: {results['bengali']['num_classes']}")
print(f" Samples: {results['bengali']['samples']:,}")
print(f"\n Model Comparison:")
for model_name, scores in results['bengali']['comparison'].items():
print(f" {model_name:<15}: Acc={scores['accuracy']:.4f}, F1={scores['f1_score']:.4f}")
print("\n" + "=" * 70 + "\n")
if __name__ == "__main__":
main()
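
# Example usage of the inference sketch above (assumes a prior training run has
# produced the .pkl files under models/model_weights/custom_models):
#   label, confidence = load_and_predict("some sample text", language="english")
#   print(label, confidence)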