Hateshield-bn / models /hate_speech_classifier.py
sgAtdbd's picture
Update models/hate_speech_classifier.py
33dfaba verified
from typing import Dict, Optional
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
import joblib
import os
import re
import torch
from deep_translator import GoogleTranslator
class HateSpeechClassifier:
def __init__(self):
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
models_dir = os.path.join(base_dir, "models", "model_weights", "custom_models")
# Initialize translator
self.translator = GoogleTranslator(source='bn', target='en')
# Use multiple pretrained models for better accuracy
self.pretrained_models = {
"primary": {
"name": "facebook/roberta-hate-speech-dynabench-r4-target",
"pipeline": None,
"weight": 0.6
},
"secondary": {
"name": "cardiffnlp/twitter-roberta-base-hate-latest",
"pipeline": None,
"weight": 0.4
}
}
# English custom model paths
self.english_model_path = os.path.join(models_dir, "english_model.pkl")
self.english_vectorizer_path = os.path.join(models_dir, "english_vectorizer.pkl")
self.english_model = None
self.english_vectorizer = None
self.english_model_loaded = False
# Bengali custom model paths
self.bengali_model_path = os.path.join(models_dir, "bengali_model.pkl")
self.bengali_vectorizer_path = os.path.join(models_dir, "bengali_vectorizer.pkl")
self.bengali_model = None
self.bengali_vectorizer = None
self.bengali_model_loaded = False
# Load models
self._load_custom_models()
# Enhanced hate keywords
self.hate_keywords = {
"english": [
"hate", "kill", "death", "violence", "murder", "attack", "destroy", "eliminate",
"die", "dead", "shoot", "stab", "burn", "hang", "lynch",
"terrorist", "racist", "sexist", "discrimination", "discriminate",
"scheduled caste", "scheduled tribe", "dalit", "lower caste", "untouchable",
"chamar", "bhangi", "sc/st", "reservation quota",
"no right to live", "don't deserve", "shouldn't exist", "subhuman",
"inferior", "worthless", "scum", "vermin", "parasite",
"should be killed", "must die", "deserve to die", "need to be eliminated",
"jihadi", "kafir", "infidel", "terrorist religion", "religious extremist",
"nigger", "chink", "paki", "kike", "faggot", "tranny"
],
"bengali": [
"শালা", "হালা", "মাগি", "কুত্তা", "হারামি", "চোদ", "বাল",
"ঘৃণা", "মারো", "মৃত্যু", "সন্ত্রাসী", "বোকা", "মূর্খ",
"বিদ্বেষ", "ভয়ঙ্কর", "জঘন্য", "হত্যা", "আক্রমণ",
"দলিত", "নিম্নবর্ণ", "অস্পৃশ্য"
]
}
self.hate_patterns = {
"english": [
r"no right to (live|exist|be here|survive)",
r"(should|must|need to|ought to) (die|be killed|be eliminated|perish)",
r"don'?t deserve (to live|life|existence|to exist)",
r"(get rid of|eliminate|exterminate|wipe out) (them|these|those|the)",
r"(scheduled caste|dalit|lower caste|sc/st).{0,50}(no right|shouldn't|don't deserve)",
r"(religious|ethnic|caste|racial) (cleansing|purification|genocide)",
r"(send|throw|kick|drive) (them|back) (out|away|home)",
r"(all|these) .{0,30} (should die|must be killed|need to go)",
r"(death to|kill all|eliminate all) .{0,30}",
r"(inferior|subhuman|less than human|not human)",
],
"bengali": [
r"বাঁচার অধিকার নেই",
r"মরে যাওয়া উচিত",
r"নিশ্চিহ্ন করা উচিত"
]
}
self.offensive_keywords = {
"english": [
"damn", "hell", "crap", "suck", "dumb", "loser", "trash",
"stupid", "idiot", "moron", "pathetic", "bad", "ugly",
"disgusting", "nasty", "filthy", "asshole", "bitch", "bastard"
],
"bengali": ["বাজে", "খারাপ", "নোংরা", "বেকুব"]
}
def _translate_to_english(self, text: str) -> Optional[str]:
"""Translate Bengali to English using deep-translator"""
try:
print(f"🔄 Translating Bengali text to English...")
# deep-translator has a 5000 character limit per request
max_chars = 4500
if len(text) > max_chars:
text_to_translate = text[:max_chars]
print(f"⚠️ Text truncated to {max_chars} characters for translation")
else:
text_to_translate = text
# Translate using Google Translate
translated_text = self.translator.translate(text_to_translate)
print(f"✓ Translation successful")
print(f" Original (Bengali): {text_to_translate[:100]}...")
print(f" Translated (English): {translated_text[:100]}...")
return translated_text
except Exception as e:
print(f"❌ Translation failed: {e}")
# Try splitting into smaller chunks if it fails
try:
print("🔄 Retrying with smaller chunks...")
words = text.split()
chunks = []
current_chunk = []
current_length = 0
for word in words:
if current_length + len(word) > 1000: # Smaller chunks
if current_chunk:
chunks.append(' '.join(current_chunk))
current_chunk = [word]
current_length = len(word)
else:
current_chunk.append(word)
current_length += len(word) + 1
if current_chunk:
chunks.append(' '.join(current_chunk))
translated_chunks = []
for chunk in chunks[:5]: # Translate max 5 chunks
translated_chunk = self.translator.translate(chunk)
translated_chunks.append(translated_chunk)
translated_text = ' '.join(translated_chunks)
print(f"✓ Translation successful with chunking")
return translated_text
except Exception as e2:
print(f"❌ Translation with chunking also failed: {e2}")
return None
def _load_custom_models(self):
"""Load language-specific custom models"""
try:
if os.path.exists(self.english_model_path) and os.path.exists(self.english_vectorizer_path):
print("Loading English custom model...")
self.english_model = joblib.load(self.english_model_path)
self.english_vectorizer = joblib.load(self.english_vectorizer_path)
self.english_model_loaded = True
print("✓ English custom model loaded")
else:
print("❌ English custom model not found")
self.english_model_loaded = False
except Exception as e:
print(f"❌ Error loading English model: {e}")
self.english_model_loaded = False
try:
if os.path.exists(self.bengali_model_path) and os.path.exists(self.bengali_vectorizer_path):
print("Loading Bengali custom model...")
self.bengali_model = joblib.load(self.bengali_model_path)
self.bengali_vectorizer = joblib.load(self.bengali_vectorizer_path)
self.bengali_model_loaded = True
print("✓ Bengali custom model loaded")
else:
print("❌ Bengali custom model not found")
self.bengali_model_loaded = False
except Exception as e:
print(f"❌ Error loading Bengali model: {e}")
self.bengali_model_loaded = False
def _load_pretrained_model(self, model_key: str):
"""Lazy load pretrained model"""
model_info = self.pretrained_models.get(model_key)
if not model_info:
return
if model_info["pipeline"] is None:
try:
print(f"Loading {model_key} pretrained model: {model_info['name']}...")
model_info["pipeline"] = pipeline(
"text-classification",
model=model_info["name"],
device=-1,
top_k=None,
truncation=True,
max_length=512
)
print(f"✓ {model_key} pretrained model loaded")
except Exception as e:
print(f"❌ Error loading {model_key} pretrained model: {e}")
model_info["pipeline"] = None
# async def classify_with_custom_model(self, text: str, language: str) -> Dict:
# """Classify using language-specific custom model"""
# if language == "english":
# if not self.english_model_loaded:
# return None
# model = self.english_model
# vectorizer = self.english_vectorizer
# elif language == "bengali":
# if not self.bengali_model_loaded:
# return None
# model = self.bengali_model
# vectorizer = self.bengali_vectorizer
# else:
# return None
# try:
# X = vectorizer.transform([text])
# prediction = model.predict(X)[0]
# if hasattr(model, 'predict_proba'):
# probabilities = model.predict_proba(X)[0]
# confidence = float(max(probabilities))
# else:
# confidence = 0.75
# if language == "english":
# if prediction == 0:
# category = "neutral"
# else:
# category = "hate_speech"
# else:
# if prediction == 0:
# category = "neutral"
# elif prediction == 1:
# category = "offensive"
# else:
# category = "hate_speech"
# return {
# "category": category,
# "confidence": confidence,
# "method": f"custom_model_{language}",
# "raw_prediction": int(prediction)
# }
# except Exception as e:
# print(f"❌ Custom model classification failed: {e}")
# return None
# async def classify_with_custom_model(self, text: str, language: str) -> Dict:
# """Classify using language-specific custom model"""
# if language == "english":
# if not self.english_model_loaded:
# return None
# model = self.english_model
# vectorizer = self.english_vectorizer
# elif language == "bengali":
# if not self.bengali_model_loaded:
# return None
# model = self.bengali_model
# vectorizer = self.bengali_vectorizer
# else:
# return None
# try:
# X = vectorizer.transform([text])
# prediction = model.predict(X)[0]
# if hasattr(model, 'predict_proba'):
# probabilities = model.predict_proba(X)[0]
# # ✅ FIX: Use probability of the PREDICTED class, not max
# confidence = float(probabilities[prediction])
# # Debug logging
# print(f"🔍 Custom Model Debug:")
# print(f" Prediction: {prediction}")
# print(f" Probabilities: {probabilities}")
# print(f" Confidence: {confidence:.4f}")
# else:
# confidence = 0.75
# if language == "english":
# if prediction == 0:
# category = "neutral"
# else:
# category = "hate_speech"
# else:
# if prediction == 0:
# category = "neutral"
# elif prediction == 1:
# category = "offensive"
# else:
# category = "hate_speech"
# return {
# "category": category,
# "confidence": confidence,
# "method": f"custom_model_{language}",
# "raw_prediction": int(prediction),
# "probabilities": probabilities.tolist() if hasattr(model, 'predict_proba') else None
# }
# except Exception as e:
# print(f"❌ Custom model classification failed: {e}")
# import traceback
# traceback.print_exc()
# return None
async def classify_with_custom_model(self, text: str, language: str) -> Dict:
"""Classify using language-specific custom model"""
if language == "english":
if not self.english_model_loaded:
print("❌ English model not loaded, returning None")
return None
model = self.english_model
vectorizer = self.english_vectorizer
elif language == "bengali":
if not self.bengali_model_loaded:
print("❌ Bengali model not loaded, returning None")
return None
model = self.bengali_model
vectorizer = self.bengali_vectorizer
else:
return None
try:
# Debug: Check model type
print(f"🔍 Model type: {type(model)}")
print(f"🔍 Has predict_proba: {hasattr(model, 'predict_proba')}")
X = vectorizer.transform([text])
prediction = model.predict(X)[0]
print(f"🔍 Raw prediction: {prediction}")
if hasattr(model, 'predict_proba'):
probabilities = model.predict_proba(X)[0]
confidence = float(probabilities[prediction])
print(f"🔍 Custom Model Debug:")
print(f" Prediction: {prediction}")
print(f" Probabilities: {probabilities}")
print(f" Confidence (probabilities[{prediction}]): {confidence:.4f}")
else:
print("⚠️ Model doesn't have predict_proba, using fallback 0.75")
confidence = 0.75
if language == "english":
if prediction == 0:
category = "neutral"
else:
category = "hate_speech"
else:
if prediction == 0:
category = "neutral"
elif prediction == 1:
category = "offensive"
else:
category = "hate_speech"
return {
"category": category,
"confidence": confidence,
"method": f"custom_model_{language}",
"raw_prediction": int(prediction),
"probabilities": probabilities.tolist() if hasattr(model, 'predict_proba') else None
}
except Exception as e:
print(f"❌ Custom model classification failed: {e}")
import traceback
traceback.print_exc()
return None
async def classify_with_pretrained_model(self, text: str, language: str = "english") -> Dict:
"""Classify using ensemble of pretrained models with translation support"""
# Translate Bengali text to English
translated_text = None
if language == "bengali":
translated_text = self._translate_to_english(text)
if not translated_text:
print("❌ Translation failed, skipping pretrained models")
return None
text_to_analyze = translated_text
else:
text_to_analyze = text
results = []
# For long texts, analyze first 400 words
words = text_to_analyze.split()
if len(words) > 400:
truncated_text = ' '.join(words[:400])
print(f"⚠️ Text too long ({len(words)} words), analyzing first 400 words")
else:
truncated_text = text_to_analyze
# Try primary model
self._load_pretrained_model("primary")
primary = self.pretrained_models["primary"]
if primary["pipeline"] is not None:
try:
result = primary["pipeline"](truncated_text)[0]
if isinstance(result, list):
result = result[0]
label = result['label'].lower()
confidence = float(result['score'])
if 'hate' in label and 'not' not in label:
category = "hate_speech"
elif 'not' in label or 'non' in label:
category = "neutral"
else:
category = "offensive"
results.append({
"category": category,
"confidence": confidence,
"weight": primary["weight"],
"model": "primary",
"raw_label": result['label']
})
print(f"[Primary Model] {result['label']} -> {category} ({confidence:.2%})")
except Exception as e:
print(f"❌ Primary model failed: {e}")
# Try secondary model
self._load_pretrained_model("secondary")
secondary = self.pretrained_models["secondary"]
if secondary["pipeline"] is not None:
try:
result = secondary["pipeline"](truncated_text)[0]
if isinstance(result, list):
result = result[0]
label = result['label'].lower()
confidence = float(result['score'])
if 'hate' in label:
category = "hate_speech"
elif 'offensive' in label:
category = "offensive"
else:
category = "neutral"
results.append({
"category": category,
"confidence": confidence,
"weight": secondary["weight"],
"model": "secondary",
"raw_label": result['label']
})
print(f"[Secondary Model] {result['label']} -> {category} ({confidence:.2%})")
except Exception as e:
print(f"❌ Secondary model failed: {e}")
if not results:
return None
# Ensemble voting
category_scores = {}
for result in results:
cat = result["category"]
score = result["confidence"] * result["weight"]
category_scores[cat] = category_scores.get(cat, 0) + score
final_category = max(category_scores, key=category_scores.get)
total_weight = sum(r["weight"] for r in results)
final_confidence = category_scores[final_category] / total_weight
raw_labels = [r["raw_label"] for r in results]
return {
"category": final_category,
"confidence": final_confidence,
"method": "pretrained_ensemble",
"raw_labels": raw_labels,
"models_used": [r["model"] for r in results],
"translated": language == "bengali",
"translated_text": translated_text[:200] + "..." if translated_text and len(translated_text) > 200 else translated_text
}
def classify_with_keywords(self, text: str, language: str) -> Dict:
"""Classify using keyword and pattern matching"""
text_lower = text.lower()
hate_count = sum(1 for keyword in self.hate_keywords.get(language, [])
if keyword.lower() in text_lower)
offensive_count = sum(1 for keyword in self.offensive_keywords.get(language, [])
if keyword.lower() in text_lower)
pattern_matches = []
matched_patterns = []
for pattern in self.hate_patterns.get(language, []):
match = re.search(pattern, text_lower, re.IGNORECASE)
if match:
pattern_matches.append(pattern)
matched_patterns.append(match.group(0))
if pattern_matches or hate_count > 0:
category = "hate_speech"
base_confidence = 0.90 if pattern_matches else 0.7
confidence = min(base_confidence + (hate_count * 0.03), 0.98)
elif offensive_count > 0:
category = "offensive"
confidence = min(0.6 + (offensive_count * 0.08), 0.88)
else:
category = "neutral"
confidence = 0.7
detected_keywords = []
for keyword in self.hate_keywords.get(language, []):
if keyword.lower() in text_lower:
detected_keywords.append(keyword)
for keyword in self.offensive_keywords.get(language, []):
if keyword.lower() in text_lower:
detected_keywords.append(keyword)
return {
"category": category,
"confidence": confidence,
"method": "keyword_matching",
"detected_keywords": detected_keywords,
"hate_count": hate_count,
"offensive_count": offensive_count,
"pattern_matches": len(pattern_matches),
"matched_patterns": matched_patterns[:3]
}