import argparse
import json
import math
import os
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from tqdm import tqdm

# Publication-oriented styling shared by all figures below.
sns.set_theme(style="whitegrid", context="paper")
plt.rcParams.update({
    "figure.dpi": 300,
    "savefig.dpi": 300,
    "font.size": 11,
    "axes.titlesize": 12,
    "axes.labelsize": 11,
    "font.family": "serif",
    "mathtext.fontset": "cm",
    "axes.unicode_minus": False,
})
def get_longest_common_prefix(str_list: list[str]) -> str:
    """Calculates the longest common prefix for a list of strings."""
    if not str_list:
        return ""
    prefix = str_list[0]
    for s in str_list[1:]:
        while not s.startswith(prefix):
            prefix = prefix[:-1]
            if not prefix:
                return ""
    return prefix
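# Example for get_longest_common_prefix (illustrative strings, not from any report):
#
#   >>> get_longest_common_prefix(["internationale", "internal", "internet"])
#   'intern'
#   >>> get_longest_common_prefix([])
#   ''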
def get_character_entropy(s: str) -> float:
    """Calculates the Shannon entropy (in bits) of a string's characters."""
    if not s:
        return 0.0
    counts = Counter(s)
    total_len = len(s)
    entropy = 0.0
    for count in counts.values():
        p = count / total_len
        entropy -= p * math.log2(p)
    return entropy
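# Examples for get_character_entropy (illustrative values): a single repeated
# symbol carries 0 bits; two equiprobable symbols carry 1 bit per character.
#
#   >>> get_character_entropy("aaaa")
#   0.0
#   >>> get_character_entropy("abab")
#   1.0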
def build_case_record(case: dict, tag: str | None = None) -> dict:
    """Create a lightweight JSON-friendly summary of a collision case."""
    record = {
        "num_raw_variants": case["num_raw_variants"],
        "raw_chunk_variants_preview": [v[:80] for v in case.get("raw_chunk_variants", [])],
        "analysis_plus": case.get("analysis_plus", {}),
    }
    if tag is not None:
        record["tag"] = tag
    return record
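# Expected shape of the collision report consumed by analyze_collision_report
# below, inferred from the fields it accesses; the concrete values here are
# hypothetical:
#
#   [
#     {
#       "num_raw_variants": 3,
#       "raw_chunk_variants": ["chunk a", "chunk b", "chunk c"],
#       "levenshtein_analysis": {"average_distance": 2.5}
#     }
#   ]
#
# "levenshtein_analysis" is optional; Figures 4 and 5 are skipped without it.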
def analyze_collision_report(report_path: str, output_dir: str):
    """Run the full enrichment, selection, and visualization pipeline."""
    if not os.path.exists(report_path):
        print(f"❌ Error: Report file not found at '{report_path}'")
        return

    print(f"🔍 Reading report file: {report_path}")
    with open(report_path, "r", encoding="utf-8") as f:
        all_collisions = json.load(f)

    if not all_collisions:
        print("🎉 No collisions found in the report. Nothing to analyze.")
        return

    print(f"Report contains {len(all_collisions)} colliding token sequences.")
    os.makedirs(output_dir, exist_ok=True)

    print("\n--- 1. Enriching data with LCP and entropy statistics ---")
    enriched_collisions = []
    for collision in tqdm(all_collisions, desc="Analyzing content features"):
        variants = collision["raw_chunk_variants"]
        lcp = get_longest_common_prefix(variants)
        lengths = [len(v) for v in variants]
        avg_len = np.mean(lengths) if lengths else 0
        lcp_ratio = len(lcp) / avg_len if avg_len > 0 else 0.0
        entropies = [get_character_entropy(v) for v in variants]
        collision["analysis_plus"] = {
            "lcp_ratio": float(lcp_ratio),
            "length_stats": {
                "min": int(min(lengths)),
                "max": int(max(lengths)),
                "mean": float(np.mean(lengths)),
                "std": float(np.std(lengths)),
            },
            "entropy_stats": {
                "min": float(min(entropies)),
                "max": float(max(entropies)),
                "mean": float(np.mean(entropies)),
            },
        }
        enriched_collisions.append(collision)
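    # Worked example (hypothetical): for variants ["abc", "abd"], the loop above
    # stores lcp_ratio = 2/3 (LCP "ab" over mean length 3.0), length stats
    # min=max=mean=3 with std 0.0, and entropy log2(3) ≈ 1.585 bits per variant,
    # since each string has three equiprobable characters.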
    # Aggregate per-collision statistics feeding the plots below.
    num_variants_list = [c["num_raw_variants"] for c in enriched_collisions]
    lcp_ratios = [c["analysis_plus"]["lcp_ratio"] for c in enriched_collisions]
    entropy_means = [c["analysis_plus"]["entropy_stats"]["mean"] for c in enriched_collisions]
    print("\n--- 2. Selecting representative collision cases and generating text-based previews ---")

    # Extremes by collision scale.
    max_collision_case = max(enriched_collisions, key=lambda c: c["num_raw_variants"])
    min_collision_case = min(enriched_collisions, key=lambda c: c["num_raw_variants"])

    # Extremes by longest-common-prefix ratio.
    high_lcp_case = max(enriched_collisions, key=lambda c: c["analysis_plus"]["lcp_ratio"])
    low_lcp_case = min(enriched_collisions, key=lambda c: c["analysis_plus"]["lcp_ratio"])

    analysis_summary = {
        "total_colliding_sequences": len(all_collisions),
        "representative_cases": {
            "max_collision": build_case_record(max_collision_case, "Maximum collision scale"),
            "min_collision": build_case_record(min_collision_case, "Minimum collision scale"),
            "high_lcp": build_case_record(high_lcp_case, "Highest LCP ratio"),
            "low_lcp": build_case_record(low_lcp_case, "Lowest LCP ratio"),
        },
    }

    summary_report_path = os.path.join(output_dir, "final_analysis_summary.json")
    with open(summary_report_path, "w", encoding="utf-8") as f:
        json.dump(analysis_summary, f, indent=2, ensure_ascii=False)
    print(f"\n💾 Final structured analysis summary saved to: {summary_report_path}")
    print("\n--- 3. Aggregate visualization of collision patterns ---")

    # Figure 1: distribution of collision scale.
    print("Plotting collision scale histogram (Figure 10.1)...")
    fig1, ax1 = plt.subplots(figsize=(6.2, 4.0))
    # discrete=True centers unit-width bins on the integer variant counts,
    # so no explicit bins array is needed.
    sns.histplot(
        num_variants_list,
        discrete=True,
        shrink=0.8,
        ax=ax1,
    )
    ax1.set_yscale("log")
    ax1.set_xlabel("Raw chunks per compressed segment")
    ax1.set_ylabel("Compressed segments (log scale)")
    ax1.grid(True, which="both", linestyle="--", alpha=0.5)
    fig1.tight_layout()
    save_figure(fig1, output_dir, "1_collision_scale")
    # Figure 2: distribution of LCP ratios.
    print("Plotting LCP ratio histogram (Figure 10.2)...")
    fig2, ax2 = plt.subplots(figsize=(6.2, 4.0))
    sns.histplot(
        lcp_ratios,
        bins=50,
        binrange=(0.0, 1.0),
        kde=False,
        ax=ax2,
    )
    ax2.set_xlabel("LCP ratio")
    ax2.set_ylabel("Compressed segments")
    ax2.set_xlim(0.0, 1.0)
    ax2.grid(True, which="both", linestyle="--", alpha=0.5)
    fig2.tight_layout()
    save_figure(fig2, output_dir, "2_lcp_ratio")
    # Figure 3: joint density of LCP ratio and mean character entropy.
    if len(lcp_ratios) > 1:
        print("Plotting 2D density of LCP ratio vs entropy (Figure 10.3)...")
        fig3, ax3 = plt.subplots(figsize=(6.2, 4.2))
        sns.kdeplot(
            x=lcp_ratios,
            y=entropy_means,
            fill=True,
            thresh=0.01,
            levels=40,
            cmap="mako",
            ax=ax3,
        )
        ax3.set_title("Joint density of LCP ratio and character entropy")
        ax3.set_xlabel("LCP ratio")
        ax3.set_ylabel("Mean character entropy")
        ax3.set_xlim(0.0, 1.0)
        ax3.grid(True, which="both", linestyle="--", alpha=0.4)
        fig3.tight_layout()
        save_figure(fig3, output_dir, "3_lcp_vs_entropy")
    else:
        print("Not enough points to plot a 2D KDE; skipping Figure 10.3.")
    # Figures 4 and 5 are optional: they rely on a precomputed
    # "levenshtein_analysis" block in each report entry.
    try:
        print("Plotting auxiliary edit-distance based figures (optional)...")
        avg_distances = [c["levenshtein_analysis"]["average_distance"] for c in enriched_collisions]

        # Figure 4: average edit distance vs LCP ratio, colored by collision scale.
        fig4, ax4 = plt.subplots(figsize=(6.0, 4.0))
        scatter = ax4.scatter(
            avg_distances,
            lcp_ratios,
            c=num_variants_list,
            cmap="viridis",
            alpha=0.6,
            s=np.log1p(num_variants_list) * 18,
        )
        cbar = fig4.colorbar(scatter, ax=ax4)
        cbar.set_label("Number of raw variants")
        ax4.set_title("Average Levenshtein distance vs LCP ratio")
        ax4.set_xlabel("Average Levenshtein distance")
        ax4.set_ylabel("LCP ratio")
        ax4.grid(True, linestyle="--", alpha=0.4)
        fig4.tight_layout()
        save_figure(fig4, output_dir, "4_distance_vs_lcp_scatter")

        # Figure 5: length variability vs mean entropy, colored by LCP ratio.
        len_stds = [c["analysis_plus"]["length_stats"]["std"] for c in enriched_collisions]
        fig5, ax5 = plt.subplots(figsize=(6.0, 4.0))
        scatter2 = ax5.scatter(
            len_stds,
            entropy_means,
            c=lcp_ratios,
            cmap="plasma",
            alpha=0.7,
            s=np.log1p(num_variants_list) * 18,
        )
        cbar2 = fig5.colorbar(scatter2, ax=ax5)
        cbar2.set_label("LCP ratio")
        ax5.set_title("Length std. deviation vs mean character entropy")
        ax5.set_xlabel("Std. deviation of raw chunk length")
        ax5.set_ylabel("Mean character entropy")
        ax5.set_xscale("log")
        ax5.grid(True, which="both", linestyle="--", alpha=0.4)
        fig5.tight_layout()
        save_figure(fig5, output_dir, "5_length_std_vs_entropy_scatter")
    except KeyError:
        print("Some entries do not contain 'levenshtein_analysis'; skipping auxiliary edit-distance plots.")

    print("\n✅ All analyses complete! Check the output directory for the figures and the summary JSON.")
def save_figure(fig, output_dir: str, filename: str):
    """Save a Matplotlib figure as both PNG and PDF under a common base filename."""
    base = os.path.join(output_dir, filename)
    for ext in ("png", "pdf"):
        fig.savefig(f"{base}.{ext}", bbox_inches="tight")
    plt.close(fig)
    print(f"📁 Saved figure: {base}.png / .pdf")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Perform an in-depth, multi-dimensional, and visual analysis of a token collision report.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "report_json",
        type=str,
        help="Path to the token collision report JSON file generated by the main analyzer.",
    )
    parser.add_argument(
        "-o",
        "--output_dir",
        type=str,
        default="final_deep_analysis",
        help="Output directory for all analysis plots and summaries.",
    )

    args = parser.parse_args()
    analyze_collision_report(args.report_json, args.output_dir)
"""
Example usage:

    pip install numpy matplotlib seaborn tqdm
    python deep_visual_analysis.py analysis_output_token_collision/token_collision_report.json
"""