|
|
|
|
|
import re |
|
|
import torch |
|
|
import numpy as np |
|
|
from typing import Any |
|
|
from typing import Dict |
|
|
from typing import List |
|
|
from loguru import logger |
|
|
from transformers import pipeline |
|
|
from config.threshold_config import Domain |
|
|
from metrics.base_metric import BaseMetric |
|
|
from metrics.base_metric import MetricResult |
|
|
from models.model_manager import get_model_manager |
|
|
from config.threshold_config import get_threshold_for_domain |
|
|
|
|
|
|
|
|
|
|
|
class MultiPerturbationStabilityMetric(BaseMetric): |
|
|
""" |
|
|
Multi-Perturbation Stability Metric (MPSM) |
|
|
|
|
|
A hybrid approach for combining multiple perturbation techniques for robust AI-generated text detection |
|
|
|
|
|
Measures: |
|
|
- Text stability under random perturbations |
|
|
- Likelihood curvature analysis |
|
|
- Masked token prediction analysis |
|
|
|
|
|
Perturbation Methods: |
|
|
- Word deletion & swapping |
|
|
- RoBERTa mask filling |
|
|
- Synonym replacement |
|
|
- Chunk-based stability Analysis |
|
|
""" |
|
|
def __init__(self):
    """
    Register the metric with its base class and prepare empty model slots

    The language model, the mask-filling model and their tokenizers all start as `None`; they are
    only populated by `initialize()`. The torch device is chosen once here, preferring CUDA, then
    MPS, then CPU
    """
    super().__init__(name        = "multi_perturbation_stability",
                     description = "Text stability analysis under multi-perturbations techniques",
                    )

    # Model / tokenizer slots, filled in later by initialize()
    self.gpt_model      = None
    self.gpt_tokenizer  = None
    self.mask_model     = None
    self.mask_tokenizer = None

    # Device preference: CUDA first; MPS is only probed when CUDA is unavailable
    if torch.cuda.is_available():
        device_name = "cuda"

    elif torch.backends.mps.is_available():
        device_name = "mps"

    else:
        device_name = "cpu"

    self.device = torch.device(device_name)
|
|
|
|
|
|
|
|
def initialize(self) -> bool:
    """
    Load the models used by the metric and move them to the selected device

    The "multi_perturbation_base" model is mandatory: if the model manager does not return a
    tuple for it, initialization fails. The "multi_perturbation_mask" model is optional: when it
    is not returned as a tuple only a warning is logged and the metric continues without it

    Returns:
    --------
        { bool } : True once `is_initialized` has been set, False when the base model could not
                   be loaded or any exception was raised along the way
    """
    try:
        logger.info("Initializing MultiPerturbationStability metric...")

        manager   = get_model_manager()

        # Mandatory (model, tokenizer) pair used for likelihood scoring
        base_pair = manager.load_model(model_name = "multi_perturbation_base")

        if not isinstance(base_pair, tuple):
            logger.error("Failed to load GPT-2 model for MultiPerturbationStability")
            return False

        self.gpt_model, self.gpt_tokenizer = base_pair
        self.gpt_model.to(self.device)

        # Optional (model, tokenizer) pair used for mask-filling perturbations
        mask_pair = manager.load_model("multi_perturbation_mask")

        if isinstance(mask_pair, tuple):
            self.mask_model, self.mask_tokenizer = mask_pair
            self.mask_model.to(self.device)

            # Give the mask tokenizer a pad token if it has none
            if self.mask_tokenizer.pad_token is None:
                self.mask_tokenizer.pad_token = self.mask_tokenizer.eos_token or '[PAD]'

        else:
            logger.warning("Failed to load mask model, using GPT-2 only")

        self.is_initialized = True

        logger.success("MultiPerturbationStability metric initialized successfully")
        return True

    except Exception as e:
        logger.error(f"Failed to initialize MultiPerturbationStability metric: {repr(e)}")
        return False
|
|
|
|
|
|
|
|
def compute(self, text: str, **kwargs) -> MetricResult:
    """
    Run the full MultiPerturbationStability analysis for one text

    Arguments:
    ----------
        text     { str } : Text to analyse; fewer than 100 characters after stripping yields a
                           neutral result with confidence 0.1

        **kwargs         : `domain` (defaults to `Domain.GENERAL`) selects the threshold set;
                           a truthy `skip_expensive` short-circuits to a neutral result with
                           confidence 0.3

    Returns:
    --------
        { MetricResult } : Probabilities, confidence and feature details; on any exception a
                           neutral result with confidence 0.0 and the error message
    """
    def _neutral_result(confidence: float, error: str) -> MetricResult:
        # Shared shape of every "no verdict" answer: 50/50 split, no mixed mass
        return MetricResult(metric_name       = self.name,
                            ai_probability    = 0.5,
                            human_probability = 0.5,
                            mixed_probability = 0.0,
                            confidence        = confidence,
                            error             = error,
                           )

    try:
        if (not text) or (len(text.strip()) < 100):
            return _neutral_result(0.1, "Text too short for MultiPerturbationStability analysis")

        # Resolve the domain-specific threshold set for this metric
        domain     = kwargs.get('domain', Domain.GENERAL)
        thresholds = get_threshold_for_domain(domain).multi_perturbation_stability

        if kwargs.get('skip_expensive', False):
            logger.info("Skipping MultiPerturbationStability due to computational constraints")

            return _neutral_result(0.3, "Skipped for performance")

        # Features -> raw score -> domain-calibrated probabilities
        features                        = self._calculate_stability_features(text)
        raw_score, confidence           = self._analyze_stability_patterns(features)
        ai_prob, human_prob, mixed_prob = self._apply_domain_thresholds(raw_score, thresholds, features)

        # Scale confidence by the domain multiplier and keep it inside [0, 1]
        confidence = confidence * thresholds.confidence_multiplier
        confidence = max(0.0, min(1.0, confidence))

        details = dict(features)
        details.update({'domain_used'     : domain.value,
                        'ai_threshold'    : thresholds.ai_threshold,
                        'human_threshold' : thresholds.human_threshold,
                        'raw_score'       : raw_score,
                       })

        return MetricResult(metric_name       = self.name,
                            ai_probability    = ai_prob,
                            human_probability = human_prob,
                            mixed_probability = mixed_prob,
                            confidence        = confidence,
                            details           = details,
                           )

    except Exception as e:
        logger.error(f"Error in MultiPerturbationStability computation: {repr(e)}")

        return _neutral_result(0.0, str(e))
|
|
|
|
|
|
|
|
def _apply_domain_thresholds(self, raw_score: float, thresholds: Any, features: Dict[str, Any]) -> tuple: |
|
|
""" |
|
|
Apply domain-specific thresholds to convert raw score to probabilities |
|
|
""" |
|
|
ai_threshold = thresholds.ai_threshold |
|
|
human_threshold = thresholds.human_threshold |
|
|
|
|
|
|
|
|
if (raw_score >= ai_threshold): |
|
|
|
|
|
distance_from_threshold = raw_score - ai_threshold |
|
|
ai_prob = 0.7 + (distance_from_threshold * 0.3) |
|
|
human_prob = 0.3 - (distance_from_threshold * 0.3) |
|
|
|
|
|
elif (raw_score <= human_threshold): |
|
|
|
|
|
distance_from_threshold = human_threshold - raw_score |
|
|
ai_prob = 0.3 - (distance_from_threshold * 0.3) |
|
|
human_prob = 0.7 + (distance_from_threshold * 0.3) |
|
|
|
|
|
else: |
|
|
|
|
|
range_width = ai_threshold - human_threshold |
|
|
|
|
|
if (range_width > 0): |
|
|
position_in_range = (raw_score - human_threshold) / range_width |
|
|
ai_prob = 0.3 + (position_in_range * 0.4) |
|
|
human_prob = 0.7 - (position_in_range * 0.4) |
|
|
|
|
|
else: |
|
|
ai_prob = 0.5 |
|
|
human_prob = 0.5 |
|
|
|
|
|
|
|
|
ai_prob = max(0.0, min(1.0, ai_prob)) |
|
|
human_prob = max(0.0, min(1.0, human_prob)) |
|
|
|
|
|
|
|
|
mixed_prob = self._calculate_mixed_probability(features) |
|
|
|
|
|
|
|
|
total = ai_prob + human_prob + mixed_prob |
|
|
|
|
|
if (total > 0): |
|
|
ai_prob /= total |
|
|
human_prob /= total |
|
|
mixed_prob /= total |
|
|
|
|
|
return ai_prob, human_prob, mixed_prob |
|
|
|
|
|
|
|
|
def _calculate_stability_features(self, text: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Calculate comprehensive MultiPerturbationStability features |
|
|
""" |
|
|
if not self.gpt_model or not self.gpt_tokenizer: |
|
|
return self._get_default_features() |
|
|
|
|
|
try: |
|
|
|
|
|
processed_text = self._preprocess_text_for_analysis(text) |
|
|
|
|
|
|
|
|
original_likelihood = self._calculate_likelihood(processed_text) |
|
|
|
|
|
|
|
|
perturbations = self._generate_perturbations(processed_text, num_perturbations = 5) |
|
|
perturbed_likelihoods = list() |
|
|
|
|
|
for perturbed_text in perturbations: |
|
|
if (perturbed_text and (perturbed_text != processed_text)): |
|
|
likelihood = self._calculate_likelihood(perturbed_text) |
|
|
|
|
|
if (likelihood > 0): |
|
|
perturbed_likelihoods.append(likelihood) |
|
|
|
|
|
|
|
|
if perturbed_likelihoods: |
|
|
stability_score = self._calculate_stability_score(original_likelihood, perturbed_likelihoods) |
|
|
curvature_score = self._calculate_curvature_score(original_likelihood, perturbed_likelihoods) |
|
|
variance_score = np.var(perturbed_likelihoods) if len(perturbed_likelihoods) > 1 else 0.0 |
|
|
avg_perturbed_likelihood = np.mean(perturbed_likelihoods) |
|
|
|
|
|
else: |
|
|
stability_score = 0.5 |
|
|
curvature_score = 0.5 |
|
|
variance_score = 0.1 |
|
|
avg_perturbed_likelihood = original_likelihood |
|
|
|
|
|
|
|
|
likelihood_ratio = original_likelihood / avg_perturbed_likelihood if avg_perturbed_likelihood > 0 else 1.0 |
|
|
|
|
|
|
|
|
chunk_stabilities = self._calculate_chunk_stability(processed_text, chunk_size=150) |
|
|
stability_variance = np.var(chunk_stabilities) if chunk_stabilities else 0.0 |
|
|
avg_chunk_stability = np.mean(chunk_stabilities) if chunk_stabilities else stability_score |
|
|
|
|
|
|
|
|
normalized_stability = min(1.0, max(0.0, stability_score)) |
|
|
normalized_curvature = min(1.0, max(0.0, curvature_score)) |
|
|
normalized_likelihood_ratio = min(2.0, likelihood_ratio) / 2.0 |
|
|
|
|
|
return {"original_likelihood" : round(original_likelihood, 4), |
|
|
"avg_perturbed_likelihood" : round(avg_perturbed_likelihood, 4), |
|
|
"likelihood_ratio" : round(likelihood_ratio, 4), |
|
|
"normalized_likelihood_ratio" : round(normalized_likelihood_ratio, 4), |
|
|
"stability_score" : round(normalized_stability, 4), |
|
|
"curvature_score" : round(normalized_curvature, 4), |
|
|
"perturbation_variance" : round(variance_score, 4), |
|
|
"avg_chunk_stability" : round(avg_chunk_stability, 4), |
|
|
"stability_variance" : round(stability_variance, 4), |
|
|
"num_perturbations" : len(perturbations), |
|
|
"num_valid_perturbations" : len(perturbed_likelihoods), |
|
|
"num_chunks_analyzed" : len(chunk_stabilities), |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"MultiPerturbationStability feature calculation failed: {repr(e)}") |
|
|
return self._get_default_features() |
|
|
|
|
|
|
|
|
def _calculate_likelihood(self, text: str) -> float: |
|
|
""" |
|
|
Calculate log-likelihood of text using GPT-2 with robust error handling |
|
|
""" |
|
|
try: |
|
|
|
|
|
if (len(text.strip()) < 10): |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
tokenizer = self._configure_tokenizer_padding(self.gpt_tokenizer) |
|
|
|
|
|
|
|
|
encodings = tokenizer(text, |
|
|
return_tensors = 'pt', |
|
|
truncation = True, |
|
|
max_length = 512, |
|
|
padding = True, |
|
|
return_attention_mask = True, |
|
|
) |
|
|
|
|
|
input_ids = encodings.input_ids.to(self.device) |
|
|
attention_mask = encodings.attention_mask.to(self.device) |
|
|
|
|
|
|
|
|
if ((input_ids.numel() == 0) or (input_ids.size(1) < 5)): |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = self.gpt_model(input_ids, |
|
|
attention_mask = attention_mask, |
|
|
labels = input_ids, |
|
|
) |
|
|
|
|
|
loss = outputs.loss |
|
|
|
|
|
|
|
|
log_likelihood = -loss.item() |
|
|
|
|
|
|
|
|
if (abs(log_likelihood) > 100): |
|
|
logger.warning(f"Extreme likelihood value detected: {log_likelihood}") |
|
|
return 0.0 |
|
|
|
|
|
return log_likelihood |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Likelihood calculation failed: {repr(e)}") |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
def _generate_perturbations(self, text: str, num_perturbations: int = 5) -> List[str]:
    """
    Generate perturbed versions of the text with robust error handling

    Strategies are tried in a fixed order, each one only topping up what is still missing:
    random word deletion, adjacent word swapping, mask filling (only when a mask model and
    tokenizer are loaded), dictionary synonym replacement, and finally the simple fallbacks
    when nothing else produced a result. The collected candidates are de-duplicated and
    re-validated before being returned

    Arguments:
    ----------
        text              { str } : Text to perturb (it is first normalised by `_preprocess_text_for_perturbation`)

        num_perturbations { int } : Maximum number of perturbations to return

    Returns:
    --------
        { list } : At most `num_perturbations` distinct perturbed strings; `[processed_text]` when
                   the text has fewer than 3 words, and `[text]` when generation raises
    """
    perturbations = list()

    try:
        # Normalise the text once; every strategy below works on the same word list
        processed_text = self._preprocess_text_for_perturbation(text)
        words          = processed_text.split()

        # Nothing sensible to perturb
        if (len(words) < 3):
            return [processed_text]

        # Strategy 1: random word deletion (up to 3 variants, each dropping ~10% of the words, at least one)
        if (len(words) > 5):
            for _ in range(min(3, num_perturbations)):
                try:
                    delete_count    = max(1, len(words) // 10)
                    indices_to_keep = np.random.choice(len(words), len(words) - delete_count, replace = False)

                    # Sorting the kept indices preserves the original word order
                    perturbed_words = [words[i] for i in sorted(indices_to_keep)]
                    perturbed_text  = ' '.join(perturbed_words)

                    if (self._is_valid_perturbation(perturbed_text, processed_text)):
                        perturbations.append(perturbed_text)

                except Exception as e:
                    logger.debug(f"Word deletion perturbation failed: {e}")
                    continue

        # Strategy 2: swap one random pair of adjacent words (up to 2 variants)
        if (len(words) > 4) and (len(perturbations) < num_perturbations):
            for _ in range(min(2, num_perturbations - len(perturbations))):
                try:
                    perturbed_words = words.copy()

                    # np.random.randint excludes its upper bound, so swap_pos is at most len - 3 and
                    # the final adjacent pair is never swapped - NOTE(review): confirm this is intended
                    if (len(perturbed_words) >= 3):
                        swap_pos                                                    = np.random.randint(0, len(perturbed_words) - 2)
                        perturbed_words[swap_pos], perturbed_words[swap_pos + 1] = perturbed_words[swap_pos + 1], perturbed_words[swap_pos]

                    perturbed_text = ' '.join(perturbed_words)

                    if (self._is_valid_perturbation(perturbed_text, processed_text)):
                        perturbations.append(perturbed_text)

                except Exception as e:
                    logger.debug(f"Word swapping perturbation failed: {e}")
                    continue

        # Strategy 3: mask filling, only available when the optional mask model is loaded
        if (self.mask_model and self.mask_tokenizer and (len(words) > 4) and len(perturbations) < num_perturbations):

            try:
                roberta_perturbations = self._generate_roberta_masked_perturbations(processed_text,
                                                                                    words,
                                                                                    num_perturbations - len(perturbations))
                perturbations.extend(roberta_perturbations)

            except Exception as e:
                logger.warning(f"RoBERTa masked perturbation failed: {repr(e)}")

        # Strategy 4: dictionary-based synonym replacement for whatever is still missing
        if (len(perturbations) < num_perturbations):
            try:
                synonym_perturbations = self._generate_synonym_perturbations(processed_text,
                                                                             words,
                                                                             num_perturbations - len(perturbations))
                perturbations.extend(synonym_perturbations)

            except Exception as e:
                logger.debug(f"Synonym replacement failed: {e}")

        # Last resort: simple structural / casing variants
        if not perturbations:

            fallback_perturbations = self._generate_fallback_perturbations(processed_text, words)
            perturbations.extend(fallback_perturbations)

        # De-duplicate while keeping the generation order, and re-validate every candidate
        unique_perturbations = list()

        for p in perturbations:
            if (p and (p != processed_text) and (p not in unique_perturbations) and (self._is_valid_perturbation(p, processed_text))):
                unique_perturbations.append(p)

        return unique_perturbations[:num_perturbations]

    except Exception as e:
        logger.warning(f"Perturbation generation failed: {repr(e)}")

        # NOTE(review): this returns the unperturbed input as the only "perturbation"; callers must filter it out
        return [text]
|
|
|
|
|
|
|
|
def _generate_roberta_masked_perturbations(self, text: str, words: List[str], max_perturbations: int) -> List[str]:
    """
    Generate perturbations using RoBERTa mask filling

    A random subset of candidate word positions is masked one at a time; for each, the mask
    model's three most probable fillers are tried in order and the first one that yields a valid
    perturbation replaces the original word

    Arguments:
    ----------
        text              { str }  : The text the `words` list was split from (used for validation)

        words             { list } : Whitespace-split words of `text`

        max_perturbations { int }  : Maximum number of perturbations to produce

    Returns:
    --------
        { list } : Perturbed strings (possibly empty); failures are logged, never raised
    """
    perturbations = list()

    try:

        # NOTE(review): the mask string is hard-coded while the lookup below uses
        # `self.mask_tokenizer.mask_token_id`; a tokenizer with a different mask string would
        # never produce a match - confirm the loaded mask model uses "<mask>"
        roberta_mask_token = "<mask>"

        # Prefer purely alphabetic words longer than 3 characters, skipping a few stop words
        candidate_positions = [i for i, word in enumerate(words) if (len(word) > 3) and word.isalpha() and word.lower() not in ['the', 'and', 'but', 'for', 'with']]

        # Relax the filter when nothing qualified
        if not candidate_positions:
            candidate_positions = [i for i, word in enumerate(words) if len(word) > 2]

        if not candidate_positions:
            return perturbations

        # Sample up to twice as many positions as needed, since some attempts will be rejected
        attempts         = min(max_perturbations * 2, len(candidate_positions))
        positions_to_try = np.random.choice(candidate_positions, min(attempts, len(candidate_positions)), replace=False)

        for pos in positions_to_try:
            if (len(perturbations) >= max_perturbations):
                break

            try:

                # Build the masked sentence for this position
                masked_words      = words.copy()
                original_word     = masked_words[pos]
                masked_words[pos] = roberta_mask_token
                masked_text       = ' '.join(masked_words)

                # Make sure the sentence ends with terminal punctuation
                if not masked_text.endswith(('.', '!', '?')):
                    masked_text += '.'

                # Inputs are truncated to at most 128 tokens
                inputs = self.mask_tokenizer(masked_text,
                                             return_tensors = "pt",
                                             truncation     = True,
                                             max_length     = min(128, self.mask_tokenizer.model_max_length),
                                             padding        = True,
                                            )

                # Move every tensor to the metric's device
                inputs = {k: v.to(self.device) for k, v in inputs.items()}

                with torch.no_grad():
                    outputs     = self.mask_model(**inputs)
                    predictions = outputs.logits

                # Locate the mask inside the encoded ids
                mask_token_index = torch.where(inputs["input_ids"][0] == self.mask_tokenizer.mask_token_id)[0]

                # No mask token in the encoded ids (for example after truncation): skip this position
                if (len(mask_token_index) == 0):
                    continue

                mask_token_index = mask_token_index[0]

                # Three most probable fillers for the masked slot
                probs      = torch.nn.functional.softmax(predictions[0, mask_token_index], dim = -1)
                top_tokens = torch.topk(probs, 3, dim = -1)

                for token_id in top_tokens.indices:
                    predicted_token = self.mask_tokenizer.decode(token_id).strip()

                    # Strip tokenizer markers and surrounding punctuation
                    predicted_token = self._clean_roberta_token(predicted_token)

                    if (predicted_token and (predicted_token != original_word) and (len(predicted_token) > 1)):

                        # Substitute the filler into the original (unmasked) word list
                        new_words      = words.copy()
                        new_words[pos] = predicted_token
                        new_text       = ' '.join(new_words)

                        if (self._is_valid_perturbation(new_text, text)):
                            perturbations.append(new_text)

                            # One perturbation per position: stop at the first valid filler
                            break

            except Exception as e:
                logger.debug(f"RoBERTa mask filling failed for position {pos}: {e}")
                continue

    except Exception as e:
        logger.warning(f"RoBERTa masked perturbations failed: {e}")

    return perturbations
|
|
|
|
|
|
|
|
def _generate_synonym_perturbations(self, text: str, words: List[str], max_perturbations: int) -> List[str]: |
|
|
""" |
|
|
Simple synonym replacement as fallback |
|
|
""" |
|
|
perturbations = list() |
|
|
|
|
|
try: |
|
|
|
|
|
synonym_dict = {'good' : ['great', 'excellent', 'fine', 'nice'], |
|
|
'bad' : ['poor', 'terrible', 'awful', 'horrible'], |
|
|
'big' : ['large', 'huge', 'enormous', 'massive'], |
|
|
'small' : ['tiny', 'little', 'miniature', 'compact'], |
|
|
'fast' : ['quick', 'rapid', 'speedy', 'brisk'], |
|
|
'slow' : ['sluggish', 'leisurely', 'gradual', 'unhurried'], |
|
|
} |
|
|
|
|
|
|
|
|
replaceable_positions = [i for i, word in enumerate(words) if word.lower() in synonym_dict] |
|
|
|
|
|
if not replaceable_positions: |
|
|
return perturbations |
|
|
|
|
|
positions_to_try = np.random.choice(replaceable_positions, min(max_perturbations, len(replaceable_positions)), replace = False) |
|
|
|
|
|
for pos in positions_to_try: |
|
|
original_word = words[pos].lower() |
|
|
synonyms = synonym_dict.get(original_word, []) |
|
|
|
|
|
if synonyms: |
|
|
synonym = np.random.choice(synonyms) |
|
|
new_words = words.copy() |
|
|
new_words[pos] = synonym |
|
|
new_text = ' '.join(new_words) |
|
|
|
|
|
if (self._is_valid_perturbation(new_text, text)): |
|
|
perturbations.append(new_text) |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Synonym replacement failed: {e}") |
|
|
|
|
|
return perturbations |
|
|
|
|
|
|
|
|
def _generate_fallback_perturbations(self, text: str, words: List[str]) -> List[str]: |
|
|
""" |
|
|
Generate fallback perturbations when other methods fail |
|
|
""" |
|
|
perturbations = list() |
|
|
|
|
|
try: |
|
|
|
|
|
if (len(words) > 3): |
|
|
perturbations.append(' '.join(words[1:-1])) |
|
|
|
|
|
|
|
|
elif (len(words) > 1): |
|
|
perturbations.append(' '.join(words[1:])) |
|
|
|
|
|
|
|
|
if text: |
|
|
perturbations.append(text.lower()) |
|
|
perturbations.append(text.capitalize()) |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Fallback perturbation failed: {e}") |
|
|
|
|
|
return [p for p in perturbations if p and p != text][:3] |
|
|
|
|
|
|
|
|
def _calculate_stability_score(self, original_likelihood: float, perturbed_likelihoods: List[float]) -> float: |
|
|
""" |
|
|
Calculate text stability score under perturbations : AI text tends to be less stable (larger likelihood drops) |
|
|
""" |
|
|
if ((not perturbed_likelihoods) or (original_likelihood <= 0)): |
|
|
return 0.5 |
|
|
|
|
|
|
|
|
likelihood_drops = [(original_likelihood - pl) / original_likelihood for pl in perturbed_likelihoods] |
|
|
avg_drop = np.mean(likelihood_drops) if likelihood_drops else 0.0 |
|
|
|
|
|
|
|
|
stability_score = min(1.0, avg_drop / 0.5) |
|
|
|
|
|
return stability_score |
|
|
|
|
|
|
|
|
def _calculate_curvature_score(self, original_likelihood: float, perturbed_likelihoods: List[float]) -> float: |
|
|
""" |
|
|
Calculate likelihood curvature score : AI text often has different curvature properties |
|
|
""" |
|
|
if ((not perturbed_likelihoods) or (original_likelihood <= 0)): |
|
|
return 0.5 |
|
|
|
|
|
|
|
|
likelihood_changes = [abs(original_likelihood - pl) for pl in perturbed_likelihoods] |
|
|
change_variance = np.var(likelihood_changes) if len(likelihood_changes) > 1 else 0.0 |
|
|
|
|
|
|
|
|
curvature_score = min(1.0, change_variance * 10.0) |
|
|
|
|
|
return curvature_score |
|
|
|
|
|
|
|
|
def _calculate_chunk_stability(self, text: str, chunk_size: int = 150) -> List[float]: |
|
|
""" |
|
|
Calculate stability across text chunks for whole-text analysis |
|
|
""" |
|
|
stabilities = list() |
|
|
words = text.split() |
|
|
|
|
|
|
|
|
for i in range(0, len(words), chunk_size // 2): |
|
|
chunk = ' '.join(words[i:i + chunk_size]) |
|
|
|
|
|
if (len(chunk) > 50): |
|
|
try: |
|
|
chunk_likelihood = self._calculate_likelihood(chunk) |
|
|
|
|
|
if (chunk_likelihood > 0): |
|
|
|
|
|
chunk_words = chunk.split() |
|
|
|
|
|
if (len(chunk_words) > 5): |
|
|
|
|
|
delete_count = max(1, len(chunk_words) // 10) |
|
|
indices_to_keep = np.random.choice(len(chunk_words), len(chunk_words) - delete_count, replace=False) |
|
|
perturbed_chunk = ' '.join([chunk_words[i] for i in sorted(indices_to_keep)]) |
|
|
|
|
|
perturbed_likelihood = self._calculate_likelihood(perturbed_chunk) |
|
|
|
|
|
if (perturbed_likelihood > 0): |
|
|
stability = (chunk_likelihood - perturbed_likelihood) / chunk_likelihood |
|
|
stabilities.append(min(1.0, max(0.0, stability))) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
return stabilities |
|
|
|
|
|
|
|
|
def _analyze_stability_patterns(self, features: Dict[str, Any]) -> tuple: |
|
|
""" |
|
|
Analyze MultiPerturbationStability patterns to determine RAW MultiPerturbationStability score (0-1 scale) : Higher score = more AI-like |
|
|
""" |
|
|
|
|
|
required_features = ['stability_score', 'curvature_score', 'normalized_likelihood_ratio', 'stability_variance', 'perturbation_variance'] |
|
|
|
|
|
valid_features = [features.get(feat, 0) for feat in required_features if features.get(feat, 0) > 0] |
|
|
|
|
|
if (len(valid_features) < 3): |
|
|
|
|
|
return 0.5, 0.3 |
|
|
|
|
|
|
|
|
|
|
|
ai_indicators = list() |
|
|
|
|
|
|
|
|
if (features['stability_score'] > 0.6): |
|
|
ai_indicators.append(0.8) |
|
|
|
|
|
elif (features['stability_score'] > 0.3): |
|
|
ai_indicators.append(0.5) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
if (features['curvature_score'] > 0.7): |
|
|
ai_indicators.append(0.7) |
|
|
|
|
|
elif (features['curvature_score'] > 0.4): |
|
|
ai_indicators.append(0.4) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
if (features['normalized_likelihood_ratio'] > 0.8): |
|
|
ai_indicators.append(0.9) |
|
|
|
|
|
elif (features['normalized_likelihood_ratio'] > 0.6): |
|
|
ai_indicators.append(0.6) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.3) |
|
|
|
|
|
|
|
|
if (features['stability_variance'] < 0.05): |
|
|
ai_indicators.append(0.7) |
|
|
|
|
|
elif (features['stability_variance'] < 0.1): |
|
|
ai_indicators.append(0.4) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
if (features['perturbation_variance'] > 0.1): |
|
|
ai_indicators.append(0.6) |
|
|
|
|
|
elif (features['perturbation_variance'] > 0.05): |
|
|
ai_indicators.append(0.4) |
|
|
|
|
|
else: |
|
|
ai_indicators.append(0.2) |
|
|
|
|
|
|
|
|
raw_score = np.mean(ai_indicators) if ai_indicators else 0.5 |
|
|
confidence = 1.0 - (np.std(ai_indicators) / 0.5) if ai_indicators else 0.5 |
|
|
confidence = max(0.1, min(0.9, confidence)) |
|
|
|
|
|
return raw_score, confidence |
|
|
|
|
|
|
|
|
def _calculate_mixed_probability(self, features: Dict[str, Any]) -> float: |
|
|
""" |
|
|
Calculate probability of mixed AI/Human content |
|
|
""" |
|
|
mixed_indicators = list() |
|
|
|
|
|
|
|
|
if (0.35 <= features['stability_score'] <= 0.55): |
|
|
mixed_indicators.append(0.3) |
|
|
|
|
|
else: |
|
|
mixed_indicators.append(0.0) |
|
|
|
|
|
|
|
|
if (features['stability_variance'] > 0.15): |
|
|
mixed_indicators.append(0.4) |
|
|
|
|
|
elif (features['stability_variance'] > 0.1): |
|
|
mixed_indicators.append(0.2) |
|
|
|
|
|
else: |
|
|
mixed_indicators.append(0.0) |
|
|
|
|
|
|
|
|
if (0.5 <= features['normalized_likelihood_ratio'] <= 0.8): |
|
|
mixed_indicators.append(0.3) |
|
|
|
|
|
else: |
|
|
mixed_indicators.append(0.0) |
|
|
|
|
|
return min(0.3, np.mean(mixed_indicators)) if mixed_indicators else 0.0 |
|
|
|
|
|
|
|
|
def _get_default_features(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Return default features when analysis is not possible |
|
|
""" |
|
|
return {"original_likelihood" : 2.0, |
|
|
"avg_perturbed_likelihood" : 1.8, |
|
|
"likelihood_ratio" : 1.1, |
|
|
"normalized_likelihood_ratio" : 0.55, |
|
|
"stability_score" : 0.5, |
|
|
"curvature_score" : 0.5, |
|
|
"perturbation_variance" : 0.05, |
|
|
"avg_chunk_stability" : 0.5, |
|
|
"stability_variance" : 0.1, |
|
|
"num_perturbations" : 0, |
|
|
"num_valid_perturbations" : 0, |
|
|
"num_chunks_analyzed" : 0, |
|
|
} |
|
|
|
|
|
|
|
|
def _preprocess_text_for_analysis(self, text: str) -> str: |
|
|
""" |
|
|
Preprocess text for MultiPerturbationStability analysis |
|
|
""" |
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
|
|
|
text = ' '.join(text.split()) |
|
|
|
|
|
|
|
|
if len(text) > 2000: |
|
|
text = text[:2000] + "..." |
|
|
|
|
|
return text |
|
|
|
|
|
|
|
|
def _preprocess_text_for_perturbation(self, text: str) -> str: |
|
|
""" |
|
|
Preprocess text specifically for perturbation generation |
|
|
""" |
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
|
|
|
text = ' '.join(text.split()) |
|
|
|
|
|
|
|
|
if not text.endswith(('.', '!', '?')): |
|
|
text += '.' |
|
|
|
|
|
|
|
|
if (len(text) > 1000): |
|
|
sentences = text.split('. ') |
|
|
if len(sentences) > 1: |
|
|
|
|
|
text = '. '.join(sentences[:3]) + '.' |
|
|
|
|
|
else: |
|
|
text = text[:1000] |
|
|
|
|
|
return text |
|
|
|
|
|
|
|
|
def _configure_tokenizer_padding(self, tokenizer) -> Any: |
|
|
""" |
|
|
Configure tokenizer for proper padding |
|
|
""" |
|
|
if tokenizer.pad_token is None: |
|
|
if tokenizer.eos_token is not None: |
|
|
tokenizer.pad_token = tokenizer.eos_token |
|
|
|
|
|
else: |
|
|
tokenizer.add_special_tokens({'pad_token': '[PAD]'}) |
|
|
|
|
|
tokenizer.padding_side = "left" |
|
|
|
|
|
return tokenizer |
|
|
|
|
|
|
|
|
def _clean_roberta_token(self, token: str) -> str: |
|
|
""" |
|
|
Clean tokens from RoBERTa tokenizer |
|
|
""" |
|
|
if not token: |
|
|
return "" |
|
|
|
|
|
|
|
|
token = token.replace('Ġ', ' ') |
|
|
token = token.replace('</s>', '') |
|
|
token = token.replace('<s>', '') |
|
|
token = token.replace('<pad>', '') |
|
|
|
|
|
|
|
|
token = token.strip(' .,!?;:"\'') |
|
|
|
|
|
return token |
|
|
|
|
|
|
|
|
def _is_valid_perturbation(self, perturbed_text: str, original_text: str) -> bool: |
|
|
""" |
|
|
Check if a perturbation is valid |
|
|
""" |
|
|
|
|
|
return (perturbed_text and |
|
|
len(perturbed_text.strip()) > 10 and |
|
|
perturbed_text != original_text and |
|
|
len(perturbed_text) > len(original_text) * 0.5) |
|
|
|
|
|
|
|
|
def cleanup(self):
    """
    Drop the references to both models and both tokenizers, then run the base-class cleanup
    """
    for attribute in ("gpt_model", "gpt_tokenizer", "mask_model", "mask_tokenizer"):
        setattr(self, attribute, None)

    super().cleanup()
|
|
|
|
|
|
|
|
|
|
|
__all__ = ["MultiPerturbationStabilityMetric"] |