# AI_Text_Authenticator/metrics/semantic_analysis.py
# DEPENDENCIES
import re
import numpy as np
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from loguru import logger
from collections import Counter
from config.threshold_config import Domain
from metrics.base_metric import BaseMetric
from metrics.base_metric import MetricResult
from models.model_manager import get_model_manager
from sklearn.metrics.pairwise import cosine_similarity
from config.threshold_config import get_threshold_for_domain
class SemanticAnalysisMetric(BaseMetric):
"""
Semantic coherence and consistency analysis
Measures (Aligned with Documentation):
- Semantic similarity between sentences
- Topic consistency across text
- Coherence and logical flow
- Repetition patterns and redundancy
- Contextual consistency
"""
def __init__(self):
super().__init__(name = "semantic_analysis",
description = "Semantic coherence, repetition patterns, and contextual consistency analysis",
)
self.sentence_model = None
def initialize(self) -> bool:
"""
Initialize the semantic analysis metric
"""
try:
logger.info("Initializing semantic analysis metric...")
# Load sentence transformer for semantic embeddings
model_manager = get_model_manager()
self.sentence_model = model_manager.load_model("semantic_primary")
self.is_initialized = True
logger.success("Semantic analysis metric initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize semantic analysis metric: {repr(e)}")
return False
def compute(self, text: str, **kwargs) -> MetricResult:
"""
        Compute semantic analysis measures with full domain-threshold integration
"""
try:
if (not text or (len(text.strip()) < 50)):
return MetricResult(metric_name = self.name,
ai_probability = 0.5,
human_probability = 0.5,
mixed_probability = 0.0,
confidence = 0.1,
error = "Text too short for semantic analysis",
)
# Get domain-specific thresholds
domain = kwargs.get('domain', Domain.GENERAL)
domain_thresholds = get_threshold_for_domain(domain)
semantic_thresholds = domain_thresholds.semantic_analysis
# Calculate comprehensive semantic features
features = self._calculate_semantic_features(text)
# Calculate raw semantic score (0-1 scale)
raw_semantic_score, confidence = self._analyze_semantic_patterns(features)
# Apply domain-specific thresholds to convert raw score to probabilities
ai_prob, human_prob, mixed_prob = self._apply_domain_thresholds(raw_semantic_score, semantic_thresholds, features)
# Apply confidence multiplier from domain thresholds
confidence *= semantic_thresholds.confidence_multiplier
confidence = max(0.0, min(1.0, confidence))
return MetricResult(metric_name = self.name,
ai_probability = ai_prob,
human_probability = human_prob,
mixed_probability = mixed_prob,
confidence = confidence,
details = {**features,
'domain_used' : domain.value,
'ai_threshold' : semantic_thresholds.ai_threshold,
'human_threshold' : semantic_thresholds.human_threshold,
'raw_score' : raw_semantic_score,
},
)
except Exception as e:
logger.error(f"Error in semantic analysis computation: {repr(e)}")
return MetricResult(metric_name = self.name,
ai_probability = 0.5,
human_probability = 0.5,
mixed_probability = 0.0,
confidence = 0.0,
error = str(e),
)
def _apply_domain_thresholds(self, raw_score: float, thresholds: Any, features: Dict[str, Any]) -> tuple:
"""
Apply domain-specific thresholds to convert raw score to probabilities
"""
ai_threshold = thresholds.ai_threshold # e.g., 0.65 for GENERAL, 0.70 for ACADEMIC
human_threshold = thresholds.human_threshold # e.g., 0.35 for GENERAL, 0.30 for ACADEMIC
# Calculate probabilities based on threshold distances
if (raw_score >= ai_threshold):
# Above AI threshold - strongly AI
distance_from_threshold = raw_score - ai_threshold
ai_prob = 0.7 + (distance_from_threshold * 0.3) # 0.7 to 1.0
human_prob = 0.3 - (distance_from_threshold * 0.3) # 0.3 to 0.0
elif (raw_score <= human_threshold):
# Below human threshold - strongly human
distance_from_threshold = human_threshold - raw_score
ai_prob = 0.3 - (distance_from_threshold * 0.3) # 0.3 to 0.0
human_prob = 0.7 + (distance_from_threshold * 0.3) # 0.7 to 1.0
else:
# Between thresholds - uncertain zone
range_width = ai_threshold - human_threshold
if (range_width > 0):
position_in_range = (raw_score - human_threshold) / range_width
ai_prob = 0.3 + (position_in_range * 0.4) # 0.3 to 0.7
human_prob = 0.7 - (position_in_range * 0.4) # 0.7 to 0.3
else:
ai_prob = 0.5
human_prob = 0.5
# Ensure probabilities are valid
ai_prob = max(0.0, min(1.0, ai_prob))
human_prob = max(0.0, min(1.0, human_prob))
# Calculate mixed probability based on semantic variance
mixed_prob = self._calculate_mixed_probability(features)
# Normalize to sum to 1.0
total = ai_prob + human_prob + mixed_prob
if (total > 0):
ai_prob /= total
human_prob /= total
mixed_prob /= total
return ai_prob, human_prob, mixed_prob
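    # Worked example with the illustrative GENERAL thresholds noted above (ai = 0.65, human = 0.35):
    #   raw_score = 0.80 -> distance = 0.15 -> ai_prob = 0.745, human_prob = 0.255 (before mixing/normalization)
    #   raw_score = 0.50 -> position_in_range = 0.50 -> ai_prob = human_prob = 0.50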
def _calculate_semantic_features(self, text: str) -> Dict[str, Any]:
"""
Calculate comprehensive semantic analysis features
"""
# Split text into sentences
sentences = self._split_sentences(text)
if (len(sentences) < 3):
return self._get_default_features()
# Calculate semantic embeddings for all sentences
sentence_embeddings = self._get_sentence_embeddings(sentences)
if sentence_embeddings is None:
return self._get_default_features()
# Calculate semantic similarity matrix
similarity_matrix = cosine_similarity(sentence_embeddings)
# Calculate various semantic metrics
coherence_score = self._calculate_coherence(similarity_matrix)
consistency_score = self._calculate_consistency(similarity_matrix)
repetition_score = self._detect_repetition_patterns(sentences, similarity_matrix)
topic_drift_score = self._calculate_topic_drift(similarity_matrix)
contextual_consistency = self._calculate_contextual_consistency(sentences)
# Chunk-based analysis for whole-text understanding
        chunk_coherence = self._calculate_chunk_coherence(text, chunk_size = 200)
return {"coherence_score" : round(coherence_score, 4),
"consistency_score" : round(consistency_score, 4),
"repetition_score" : round(repetition_score, 4),
"topic_drift_score" : round(topic_drift_score, 4),
"contextual_consistency" : round(contextual_consistency, 4),
"avg_chunk_coherence" : round(np.mean(chunk_coherence) if chunk_coherence else 0.0, 4),
"coherence_variance" : round(np.var(chunk_coherence) if chunk_coherence else 0.0, 4),
"num_sentences" : len(sentences),
"num_chunks_analyzed" : len(chunk_coherence),
}
def _split_sentences(self, text: str) -> List[str]:
"""
Split text into sentences
"""
sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', text)
return [s.strip() for s in sentences if s.strip() and len(s.strip()) > 10]
    def _get_sentence_embeddings(self, sentences: List[str]) -> Optional[np.ndarray]:
"""
Get semantic embeddings for sentences
"""
try:
if not self.sentence_model:
return None
# Filter out very short sentences that might cause issues
valid_sentences = [s for s in sentences if len(s.strip()) > 5]
if not valid_sentences:
return None
# Encode sentences to get embeddings
embeddings = self.sentence_model.encode(valid_sentences)
# Check if embeddings are valid
if ((embeddings is None) or (len(embeddings) == 0)):
return None
return embeddings
except Exception as e:
logger.warning(f"Sentence embedding failed: {repr(e)}")
return None
def _calculate_coherence(self, similarity_matrix: np.ndarray) -> float:
"""
Calculate overall text coherence : Higher coherence = more logically connected sentences
"""
if similarity_matrix.size == 0:
return 0.0
# Calculate average similarity between adjacent sentences
adjacent_similarities = list()
for i in range(len(similarity_matrix) - 1):
adjacent_similarities.append(similarity_matrix[i, i + 1])
if (not adjacent_similarities):
return 0.0
return np.mean(adjacent_similarities)
def _calculate_consistency(self, similarity_matrix: np.ndarray) -> float:
"""
Calculate topic consistency throughout the text : Lower variance in similarities = more consistent
"""
if (similarity_matrix.size == 0):
return 0.0
# Calculate variance of similarities (lower variance = more consistent)
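        # triu_indices_from(k = 1) selects the upper triangle above the diagonal, i.e. each sentence pair exactly once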
all_similarities = similarity_matrix[np.triu_indices_from(similarity_matrix, k=1)]
if (len(all_similarities) == 0):
return 0.0
variance = np.var(all_similarities)
# Convert to consistency score (higher = more consistent)
consistency = 1.0 - min(1.0, variance * 5.0) # Normalize
return max(0.0, consistency)
def _detect_repetition_patterns(self, sentences: List[str], similarity_matrix: np.ndarray) -> float:
"""
Detect repetition patterns in semantic content : AI text sometimes shows more semantic repetition
"""
if (len(sentences) < 5):
return 0.0
# Look for high similarity between non-adjacent sentences
repetition_count = 0
total_comparisons = 0
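        # Non-adjacent pairs (j >= i + 2) number (n - 1)(n - 2) / 2 for n sentences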
for i in range(len(sentences)):
for j in range(i + 2, len(sentences)): # Skip adjacent sentences
# High semantic similarity
if (similarity_matrix[i, j] > 0.8):
repetition_count += 1
total_comparisons += 1
if (total_comparisons == 0):
return 0.0
repetition_score = repetition_count / total_comparisons
# Scale to make differences more noticeable
return min(1.0, repetition_score * 3.0)
def _calculate_topic_drift(self, similarity_matrix: np.ndarray) -> float:
"""
Calculate topic drift throughout the text : Higher drift = less focused content
"""
if (len(similarity_matrix) < 3):
return 0.0
# Calculate similarity between beginning and end sections
start_size = min(3, len(similarity_matrix) // 3)
end_size = min(3, len(similarity_matrix) // 3)
start_indices = list(range(start_size))
end_indices = list(range(len(similarity_matrix) - end_size, len(similarity_matrix)))
cross_similarities = list()
for i in start_indices:
for j in end_indices:
cross_similarities.append(similarity_matrix[i, j])
if not cross_similarities:
return 0.0
avg_cross_similarity = np.mean(cross_similarities)
# Lower similarity between start and end = higher topic drift
topic_drift = 1.0 - avg_cross_similarity
return max(0.0, topic_drift)
def _calculate_contextual_consistency(self, sentences: List[str]) -> float:
"""
Calculate contextual consistency using keyword and entity analysis
"""
if (len(sentences) < 3):
return 0.0
# Simple keyword consistency analysis : Extract meaningful words (nouns, adjectives)
all_words = list()
for sentence in sentences:
words = re.findall(r'\b[a-zA-Z]{4,}\b', sentence.lower())
all_words.extend(words)
if (len(all_words) < 10):
return 0.0
# Calculate how consistently keywords are used across sentences
word_freq = Counter(all_words)
top_keywords = [word for word, count in word_freq.most_common(10) if count > 1]
if not top_keywords:
return 0.0
# Check if top keywords appear consistently across sentences
keyword_presence = list()
for keyword in top_keywords:
sentences_with_keyword = sum(1 for sentence in sentences if keyword in sentence.lower())
presence_ratio = sentences_with_keyword / len(sentences)
keyword_presence.append(presence_ratio)
consistency = np.mean(keyword_presence)
return consistency
def _calculate_chunk_coherence(self, text: str, chunk_size: int = 200) -> List[float]:
"""
Calculate coherence across text chunks for whole-text analysis
"""
        chunk_coherences = list()
words = text.split()
# Create overlapping chunks
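        # A stride of chunk_size // 2 words gives consecutive chunks ~50% overlap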
for i in range(0, len(words), chunk_size // 2):
chunk = ' '.join(words[i:i + chunk_size])
# Minimum chunk size
if (len(chunk) > 50):
chunk_sentences = self._split_sentences(chunk)
if (len(chunk_sentences) >= 2):
embeddings = self._get_sentence_embeddings(chunk_sentences)
if ((embeddings is not None) and (len(embeddings) >= 2)):
similarity_matrix = cosine_similarity(embeddings)
coherence = self._calculate_coherence(similarity_matrix)
                        chunk_coherences.append(coherence)
        # Return per-chunk coherence scores; an empty list lets the caller report zero chunks analyzed
        return chunk_coherences
def _analyze_semantic_patterns(self, features: Dict[str, Any]) -> tuple:
"""
        Analyze semantic patterns to derive the raw semantic score (0-1 scale)
"""
# Check feature validity first
required_features = ['coherence_score', 'consistency_score', 'repetition_score', 'topic_drift_score', 'coherence_variance']
valid_features = [features.get(feat, 0) for feat in required_features if features.get(feat, 0) > 0]
if (len(valid_features) < 3):
# Low confidence if insufficient features
return 0.5, 0.3
        # Initialize the list of AI indicators
ai_indicators = list()
# AI text often has very high coherence (too perfect)
if (features['coherence_score'] > 0.7):
# Suspiciously high coherence
ai_indicators.append(0.8)
elif (features['coherence_score'] > 0.5):
# Moderate coherence
ai_indicators.append(0.5)
else:
# Low coherence - more human-like
ai_indicators.append(0.2)
# Very high consistency suggests AI (unnaturally consistent)
if (features['consistency_score'] > 0.8):
ai_indicators.append(0.9)
elif (features['consistency_score'] > 0.6):
ai_indicators.append(0.6)
else:
ai_indicators.append(0.3)
# High repetition suggests AI
if (features['repetition_score'] > 0.3):
ai_indicators.append(0.7)
elif (features['repetition_score'] > 0.1):
ai_indicators.append(0.4)
else:
ai_indicators.append(0.2)
# Very low topic drift suggests AI (stays too focused)
if (features['topic_drift_score'] < 0.2):
ai_indicators.append(0.8)
elif (features['topic_drift_score'] < 0.4):
ai_indicators.append(0.5)
else:
ai_indicators.append(0.3)
# Low coherence variance across chunks suggests AI
if (features['coherence_variance'] < 0.05):
ai_indicators.append(0.7)
elif (features['coherence_variance'] < 0.1):
ai_indicators.append(0.4)
else:
ai_indicators.append(0.2)
# Calculate raw score and confidence
raw_score = np.mean(ai_indicators) if ai_indicators else 0.5
confidence = 1.0 - (np.std(ai_indicators) / 0.5) if ai_indicators else 0.5
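        # np.std of values in [0, 1] is at most 0.5, so agreement among indicators maps to confidence near 1.0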
confidence = max(0.1, min(0.9, confidence))
return raw_score, confidence
def _calculate_mixed_probability(self, features: Dict[str, Any]) -> float:
"""
Calculate probability of mixed AI/Human content
"""
mixed_indicators = list()
# Moderate coherence values might indicate mixing
if (0.4 <= features['coherence_score'] <= 0.6):
mixed_indicators.append(0.3)
else:
mixed_indicators.append(0.0)
# High coherence variance suggests mixed content
if (features['coherence_variance'] > 0.15):
mixed_indicators.append(0.4)
elif (features['coherence_variance'] > 0.1):
mixed_indicators.append(0.2)
else:
mixed_indicators.append(0.0)
# Inconsistent repetition patterns
if (0.15 <= features['repetition_score'] <= 0.35):
mixed_indicators.append(0.3)
else:
mixed_indicators.append(0.0)
return min(0.3, np.mean(mixed_indicators)) if mixed_indicators else 0.0
def _get_default_features(self) -> Dict[str, Any]:
"""
Return default features when analysis is not possible
"""
return {"coherence_score" : 0.5,
"consistency_score" : 0.5,
"repetition_score" : 0.0,
"topic_drift_score" : 0.5,
"contextual_consistency" : 0.5,
"avg_chunk_coherence" : 0.5,
"coherence_variance" : 0.1,
"num_sentences" : 0,
"num_chunks_analyzed" : 0,
}
def cleanup(self):
"""
Clean up resources
"""
self.sentence_model = None
super().cleanup()
# Export
__all__ = ["SemanticAnalysisMetric"]
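
# Example usage (a minimal sketch; assumes the model manager from
# models.model_manager can serve the "semantic_primary" sentence-transformer
# referenced in initialize(), and `long_text` stands for any input of 50+ characters):
#
#     metric = SemanticAnalysisMetric()
#     if metric.initialize():
#         result = metric.compute(long_text, domain = Domain.GENERAL)
#         print(result.ai_probability, result.human_probability, result.confidence)
#         metric.cleanup()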