from typing import Dict, Optional

from transformers import pipeline
import joblib
import os
import re

from deep_translator import GoogleTranslator

class HateSpeechClassifier:
    """Hate-speech classifier that combines language-specific custom models,
    an ensemble of pretrained transformer models, and keyword/pattern
    matching, with Bengali-to-English translation support."""

    def __init__(self):
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        models_dir = os.path.join(base_dir, "models", "model_weights", "custom_models")

        # Initialize translator (Bengali -> English)
        self.translator = GoogleTranslator(source='bn', target='en')

        # Use multiple pretrained models for better accuracy
        self.pretrained_models = {
            "primary": {
                "name": "facebook/roberta-hate-speech-dynabench-r4-target",
                "pipeline": None,
                "weight": 0.6
            },
            "secondary": {
                "name": "cardiffnlp/twitter-roberta-base-hate-latest",
                "pipeline": None,
                "weight": 0.4
            }
        }

        # English custom model paths
        self.english_model_path = os.path.join(models_dir, "english_model.pkl")
        self.english_vectorizer_path = os.path.join(models_dir, "english_vectorizer.pkl")
        self.english_model = None
        self.english_vectorizer = None
        self.english_model_loaded = False

        # Bengali custom model paths
        self.bengali_model_path = os.path.join(models_dir, "bengali_model.pkl")
        self.bengali_vectorizer_path = os.path.join(models_dir, "bengali_vectorizer.pkl")
        self.bengali_model = None
        self.bengali_vectorizer = None
        self.bengali_model_loaded = False

        # Load the custom models eagerly; pretrained models load lazily
        self._load_custom_models()
        # Enhanced hate keywords
        self.hate_keywords = {
            "english": [
                "hate", "kill", "death", "violence", "murder", "attack", "destroy", "eliminate",
                "die", "dead", "shoot", "stab", "burn", "hang", "lynch",
                "terrorist", "racist", "sexist", "discrimination", "discriminate",
                "scheduled caste", "scheduled tribe", "dalit", "lower caste", "untouchable",
                "chamar", "bhangi", "sc/st", "reservation quota",
                "no right to live", "don't deserve", "shouldn't exist", "subhuman",
                "inferior", "worthless", "scum", "vermin", "parasite",
                "should be killed", "must die", "deserve to die", "need to be eliminated",
                "jihadi", "kafir", "infidel", "terrorist religion", "religious extremist",
                "nigger", "chink", "paki", "kike", "faggot", "tranny"
            ],
            "bengali": [
                "শালা", "হালা", "মাগি", "কুত্তা", "হারামি", "চোদ", "বাল",
                "ঘৃণা", "মারো", "মৃত্যু", "সন্ত্রাসী", "বোকা", "মূর্খ",
                "বিদ্বেষ", "ভয়ঙ্কর", "জঘন্য", "হত্যা", "আক্রমণ",
                "দলিত", "নিম্নবর্ণ", "অস্পৃশ্য"
            ]
        }

        # Regex patterns for common hate-speech constructions
        self.hate_patterns = {
            "english": [
                r"no right to (live|exist|be here|survive)",
                r"(should|must|need to|ought to) (die|be killed|be eliminated|perish)",
                r"don'?t deserve (to live|life|existence|to exist)",
                r"(get rid of|eliminate|exterminate|wipe out) (them|these|those|the)",
                r"(scheduled caste|dalit|lower caste|sc/st).{0,50}(no right|shouldn't|don't deserve)",
                r"(religious|ethnic|caste|racial) (cleansing|purification|genocide)",
                r"(send|throw|kick|drive) (them|back) (out|away|home)",
                r"(all|these) .{0,30} (should die|must be killed|need to go)",
                r"(death to|kill all|eliminate all) .{0,30}",
                r"(inferior|subhuman|less than human|not human)",
            ],
            "bengali": [
                r"বাঁচার অধিকার নেই",
                r"মরে যাওয়া উচিত",
                r"নিশ্চিহ্ন করা উচিত"
            ]
        }

        # Milder insults that count as offensive rather than hate speech
        self.offensive_keywords = {
            "english": [
                "damn", "hell", "crap", "suck", "dumb", "loser", "trash",
                "stupid", "idiot", "moron", "pathetic", "bad", "ugly",
                "disgusting", "nasty", "filthy", "asshole", "bitch", "bastard"
            ],
            "bengali": ["বাজে", "খারাপ", "নোংরা", "বেকুব"]
        }

    def _translate_to_english(self, text: str) -> Optional[str]:
        """Translate Bengali to English using deep-translator."""
        try:
            print("🔄 Translating Bengali text to English...")
            # deep-translator has a 5000-character limit per request
            max_chars = 4500
            if len(text) > max_chars:
                text_to_translate = text[:max_chars]
                print(f"⚠️ Text truncated to {max_chars} characters for translation")
            else:
                text_to_translate = text

            # Translate using Google Translate
            translated_text = self.translator.translate(text_to_translate)
            print("✓ Translation successful")
            print(f"  Original (Bengali): {text_to_translate[:100]}...")
            print(f"  Translated (English): {translated_text[:100]}...")
            return translated_text
        except Exception as e:
            print(f"❌ Translation failed: {e}")
            # Retry by splitting the text into smaller chunks
            try:
                print("🔄 Retrying with smaller chunks...")
                words = text.split()
                chunks = []
                current_chunk = []
                current_length = 0
                for word in words:
                    if current_length + len(word) > 1000:  # smaller chunks
                        if current_chunk:
                            chunks.append(' '.join(current_chunk))
                        current_chunk = [word]
                        current_length = len(word)
                    else:
                        current_chunk.append(word)
                        current_length += len(word) + 1
                if current_chunk:
                    chunks.append(' '.join(current_chunk))

                translated_chunks = []
                for chunk in chunks[:5]:  # translate at most 5 chunks
                    translated_chunk = self.translator.translate(chunk)
                    translated_chunks.append(translated_chunk)

                translated_text = ' '.join(translated_chunks)
                print("✓ Translation successful with chunking")
                return translated_text
            except Exception as e2:
                print(f"❌ Translation with chunking also failed: {e2}")
                return None

    def _load_custom_models(self):
        """Load language-specific custom models."""
        try:
            if os.path.exists(self.english_model_path) and os.path.exists(self.english_vectorizer_path):
                print("Loading English custom model...")
                self.english_model = joblib.load(self.english_model_path)
                self.english_vectorizer = joblib.load(self.english_vectorizer_path)
                self.english_model_loaded = True
                print("✓ English custom model loaded")
            else:
                print("❌ English custom model not found")
                self.english_model_loaded = False
        except Exception as e:
            print(f"❌ Error loading English model: {e}")
            self.english_model_loaded = False

        try:
            if os.path.exists(self.bengali_model_path) and os.path.exists(self.bengali_vectorizer_path):
                print("Loading Bengali custom model...")
                self.bengali_model = joblib.load(self.bengali_model_path)
                self.bengali_vectorizer = joblib.load(self.bengali_vectorizer_path)
                self.bengali_model_loaded = True
                print("✓ Bengali custom model loaded")
            else:
                print("❌ Bengali custom model not found")
                self.bengali_model_loaded = False
        except Exception as e:
            print(f"❌ Error loading Bengali model: {e}")
            self.bengali_model_loaded = False
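
    # The .pkl pairs loaded above are assumed to be a fitted scikit-learn
    # vectorizer and classifier saved with joblib. The source does not say how
    # they were trained; a hypothetical sketch of producing compatible files:
    #
    #   from sklearn.feature_extraction.text import TfidfVectorizer
    #   from sklearn.linear_model import LogisticRegression
    #   vec = TfidfVectorizer()
    #   X = vec.fit_transform(train_texts)          # train_texts: list[str]
    #   clf = LogisticRegression(max_iter=1000).fit(X, train_labels)
    #   joblib.dump(clf, "english_model.pkl")
    #   joblib.dump(vec, "english_vectorizer.pkl")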

    def _load_pretrained_model(self, model_key: str):
        """Lazily load a pretrained model the first time it is needed."""
        model_info = self.pretrained_models.get(model_key)
        if not model_info:
            return
        if model_info["pipeline"] is None:
            try:
                print(f"Loading {model_key} pretrained model: {model_info['name']}...")
                model_info["pipeline"] = pipeline(
                    "text-classification",
                    model=model_info["name"],
                    device=-1,       # CPU
                    top_k=None,      # return scores for all labels
                    truncation=True,
                    max_length=512
                )
                print(f"✓ {model_key} pretrained model loaded")
            except Exception as e:
                print(f"❌ Error loading {model_key} pretrained model: {e}")
                model_info["pipeline"] = None

    async def classify_with_custom_model(self, text: str, language: str) -> Optional[Dict]:
        """Classify using a language-specific custom model."""
        if language == "english":
            if not self.english_model_loaded:
                print("❌ English model not loaded, returning None")
                return None
            model = self.english_model
            vectorizer = self.english_vectorizer
        elif language == "bengali":
            if not self.bengali_model_loaded:
                print("❌ Bengali model not loaded, returning None")
                return None
            model = self.bengali_model
            vectorizer = self.bengali_vectorizer
        else:
            return None

        try:
            # Debug: check model type
            print(f"🔍 Model type: {type(model)}")
            print(f"🔍 Has predict_proba: {hasattr(model, 'predict_proba')}")

            X = vectorizer.transform([text])
            prediction = model.predict(X)[0]
            print(f"🔍 Raw prediction: {prediction}")

            probabilities = None
            if hasattr(model, 'predict_proba'):
                probabilities = model.predict_proba(X)[0]
                # predict_proba columns follow model.classes_, so look up the
                # predicted class's column rather than assuming labels are 0..n-1
                class_index = list(model.classes_).index(prediction)
                confidence = float(probabilities[class_index])
                print("🔍 Custom Model Debug:")
                print(f"  Prediction: {prediction}")
                print(f"  Probabilities: {probabilities}")
                print(f"  Confidence (probabilities[{class_index}]): {confidence:.4f}")
            else:
                print("⚠️ Model doesn't have predict_proba, using fallback 0.75")
                confidence = 0.75

            # English model is binary (0 = neutral); Bengali model is three-class
            if language == "english":
                category = "neutral" if prediction == 0 else "hate_speech"
            else:
                if prediction == 0:
                    category = "neutral"
                elif prediction == 1:
                    category = "offensive"
                else:
                    category = "hate_speech"

            return {
                "category": category,
                "confidence": confidence,
                "method": f"custom_model_{language}",
                "raw_prediction": int(prediction),
                "probabilities": probabilities.tolist() if probabilities is not None else None
            }
        except Exception as e:
            print(f"❌ Custom model classification failed: {e}")
            import traceback
            traceback.print_exc()
            return None

    async def classify_with_pretrained_model(self, text: str, language: str = "english") -> Optional[Dict]:
        """Classify using an ensemble of pretrained models with translation support."""
        # Translate Bengali text to English, since both pretrained models are English-only
        translated_text = None
        if language == "bengali":
            translated_text = self._translate_to_english(text)
            if not translated_text:
                print("❌ Translation failed, skipping pretrained models")
                return None
            text_to_analyze = translated_text
        else:
            text_to_analyze = text

        results = []

        # For long texts, analyze only the first 400 words
        words = text_to_analyze.split()
        if len(words) > 400:
            truncated_text = ' '.join(words[:400])
            print(f"⚠️ Text too long ({len(words)} words), analyzing first 400 words")
        else:
            truncated_text = text_to_analyze

        # Try primary model
        self._load_pretrained_model("primary")
        primary = self.pretrained_models["primary"]
        if primary["pipeline"] is not None:
            try:
                result = primary["pipeline"](truncated_text)[0]
                if isinstance(result, list):
                    result = result[0]
                label = result['label'].lower()
                confidence = float(result['score'])
                if 'hate' in label and 'not' not in label:
                    category = "hate_speech"
                elif 'not' in label or 'non' in label:
                    category = "neutral"
                else:
                    category = "offensive"
                results.append({
                    "category": category,
                    "confidence": confidence,
                    "weight": primary["weight"],
                    "model": "primary",
                    "raw_label": result['label']
                })
                print(f"[Primary Model] {result['label']} -> {category} ({confidence:.2%})")
            except Exception as e:
                print(f"❌ Primary model failed: {e}")

        # Try secondary model
        self._load_pretrained_model("secondary")
        secondary = self.pretrained_models["secondary"]
        if secondary["pipeline"] is not None:
            try:
                result = secondary["pipeline"](truncated_text)[0]
                if isinstance(result, list):
                    result = result[0]
                label = result['label'].lower()
                confidence = float(result['score'])
                if 'hate' in label:
                    category = "hate_speech"
                elif 'offensive' in label:
                    category = "offensive"
                else:
                    category = "neutral"
                results.append({
                    "category": category,
                    "confidence": confidence,
                    "weight": secondary["weight"],
                    "model": "secondary",
                    "raw_label": result['label']
                })
                print(f"[Secondary Model] {result['label']} -> {category} ({confidence:.2%})")
            except Exception as e:
                print(f"❌ Secondary model failed: {e}")

        if not results:
            return None

        # Ensemble voting: weighted sum of confidences per category
        category_scores = {}
        for result in results:
            cat = result["category"]
            score = result["confidence"] * result["weight"]
            category_scores[cat] = category_scores.get(cat, 0) + score

        final_category = max(category_scores, key=category_scores.get)
        total_weight = sum(r["weight"] for r in results)
        final_confidence = category_scores[final_category] / total_weight
        raw_labels = [r["raw_label"] for r in results]

        return {
            "category": final_category,
            "confidence": final_confidence,
            "method": "pretrained_ensemble",
            "raw_labels": raw_labels,
            "models_used": [r["model"] for r in results],
            "translated": language == "bengali",
            "translated_text": translated_text[:200] + "..." if translated_text and len(translated_text) > 200 else translated_text
        }
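
    # Worked example of the weighted vote above (illustrative numbers, not
    # real model output): if the primary model says hate_speech at 0.90 and
    # the secondary says neutral at 0.80, then
    #   category_scores  = {"hate_speech": 0.90 * 0.6, "neutral": 0.80 * 0.4}
    #                    = {"hate_speech": 0.54, "neutral": 0.32}
    #   final_category   = "hate_speech"
    #   final_confidence = 0.54 / (0.6 + 0.4) = 0.54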

    def classify_with_keywords(self, text: str, language: str) -> Dict:
        """Classify using keyword and pattern matching."""
        text_lower = text.lower()

        def keyword_present(keyword: str) -> bool:
            kw = keyword.lower()
            if language == "english":
                # Word-boundary match so short keywords like "die" don't fire
                # inside unrelated words such as "audience"
                return re.search(r"\b" + re.escape(kw) + r"\b", text_lower) is not None
            # Keep substring matching for Bengali, where suffixes attach to stems
            return kw in text_lower

        detected_hate = [kw for kw in self.hate_keywords.get(language, []) if keyword_present(kw)]
        detected_offensive = [kw for kw in self.offensive_keywords.get(language, []) if keyword_present(kw)]
        hate_count = len(detected_hate)
        offensive_count = len(detected_offensive)

        pattern_matches = []
        matched_patterns = []
        for pattern in self.hate_patterns.get(language, []):
            match = re.search(pattern, text_lower, re.IGNORECASE)
            if match:
                pattern_matches.append(pattern)
                matched_patterns.append(match.group(0))

        # Pattern hits are strong signals; keyword hits nudge confidence upward
        if pattern_matches or hate_count > 0:
            category = "hate_speech"
            base_confidence = 0.90 if pattern_matches else 0.7
            confidence = min(base_confidence + (hate_count * 0.03), 0.98)
        elif offensive_count > 0:
            category = "offensive"
            confidence = min(0.6 + (offensive_count * 0.08), 0.88)
        else:
            category = "neutral"
            confidence = 0.7

        return {
            "category": category,
            "confidence": confidence,
            "method": "keyword_matching",
            "detected_keywords": detected_hate + detected_offensive,
            "hate_count": hate_count,
            "offensive_count": offensive_count,
            "pattern_matches": len(pattern_matches),
            "matched_patterns": matched_patterns[:3]
        }
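

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative; not part of the deployed Space). It
# assumes the model weights referenced above exist on disk and that the
# transformers / deep-translator downloads succeed; the sample text is made up.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    classifier = HateSpeechClassifier()

    sample = "You are a pathetic loser"

    # Keyword matching is synchronous and needs no model downloads
    print(classifier.classify_with_keywords(sample, language="english"))

    async def demo():
        # Custom model (only works if the .pkl files were found at startup)
        print(await classifier.classify_with_custom_model(sample, language="english"))
        # Pretrained ensemble (downloads the HF models on first use)
        print(await classifier.classify_with_pretrained_model(sample, language="english"))

    asyncio.run(demo())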