| import json |
| import base64 |
| import argparse |
| import os |
| import re |
| import sys |
|
|
| |
|
|
def vread(buf: bytes, i: int):
    """Read one little-endian base-128 varint from *buf* starting at *i*.

    Each byte contributes its low 7 bits; a byte with the high bit clear
    terminates the value. Returns ``(value, next_index)``.

    Raises:
        IndexError: if the buffer ends before the varint terminates.
    """
    value = 0
    shift = 0
    while i < len(buf):
        byte = buf[i]
        i += 1
        value |= (byte & 0x7F) << shift
        # High bit clear -> this was the final byte of the varint.
        if not (byte & 0x80):
            return value, i
        shift += 7
    raise IndexError("Buffer exhausted during vread")
|
|
def decompress_windows_starts_lens(b64_stream: str) -> tuple[list[int], list[int]]:
    """Decode the base64-encoded varint stream describing compression windows.

    The stream is a sequence of ``(gap, size)`` varint pairs: each window
    starts ``gap`` positions after the end of the previous window and spans
    ``size`` positions.

    Args:
        b64_stream: base64 text containing the packed varint pairs.

    Returns:
        ``(starts, lens)`` lists of equal length; ``([], [])`` on any
        decoding error (a diagnostic line is printed in that case).
    """
    try:
        buf = base64.b64decode(b64_stream)
        i = 0
        cursor = 0
        starts: list[int] = []
        lens: list[int] = []
        while i < len(buf):
            gap, i = vread(buf, i)
            size, i = vread(buf, i)
            start = cursor + gap
            starts.append(start)
            lens.append(size)
            # Next gap is measured from the end of this window.
            cursor = start + size
        return starts, lens
    # b64decode raises binascii.Error, a ValueError subclass; catching
    # ValueError avoids depending on the non-public `base64.binascii`
    # attribute that the original code reached into.
    except (ValueError, IndexError) as e:
        print(f" [解码窗口时出错: {e}]")
        return [], []
|
|
def packed_bytes_to_pseudo(b: bytes) -> list[int]:
    """Unpack a little-endian bitstream into 9-bit pseudo-token values.

    The bytes form one contiguous LSB-first bitstream; every complete
    9-bit group becomes one integer in 0..511. Trailing bits that do not
    fill a full group are discarded.
    """
    # Reading bytes LSB-first is exactly a little-endian integer, so the
    # k-th 9-bit group sits at bit offset 9*k of that integer.
    stream = int.from_bytes(b, "little")
    complete_groups = (len(b) * 8) // 9
    return [(stream >> (9 * k)) & 0x1FF for k in range(complete_groups)]
|
|
| |
|
|
def parse_parameters_from_path(path_name: str) -> dict:
    """Extract key/value parameters encoded in a directory name.

    The basename is split on ``_``. Tokens of the form ``key-value`` map
    ``key`` to ``value`` (both lowercased); tokens of the form ``key123``
    map ``key`` (lowercased) to the digit string. Other tokens are ignored.
    """
    result: dict = {}
    folder = os.path.basename(os.path.normpath(path_name))
    for token in folder.split('_'):
        if '-' in token:
            key, _, value = token.partition('-')
            result[key.lower()] = value.lower()
        else:
            m = re.match(r'([a-zA-Z]+)(\d+)', token)
            if m:
                result[m.group(1).lower()] = m.group(2)
    return result
|
|
def construct_compression_key(params: dict) -> str:
    """Build the JSON field name of the compressed payload from parsed params.

    Defaults: ``ow`` is ``'20'``; the three boolean flags default to False
    and render as Python-style ``'True'``/``'False'`` in the key.
    """
    def flag(name: str) -> str:
        # Only the exact lowercase string 'true' counts as enabled.
        return 'True' if params.get(name, 'false') == 'true' else 'False'

    window = params.get('ow', '20')
    return (
        f"m1_ac_ow{window}"
        f"_escapefb-{flag('escapefb')}"
        f"_iterative-{flag('iterative')}"
        f"_forcepadding-{flag('forcepadding')}"
    )
|
|
| |
|
|
def debug_line(args: argparse.Namespace) -> None:
    """Diagnose one line of the compressed dataset end-to-end.

    Locates line ``args.line_number`` (1-based, counted globally across all
    ``.jsonl`` files under ``args.input_dir``), then walks through the three
    decoding steps — key construction (A), window decoding (B), payload
    unpacking (C) — printing intermediate results, and finally compares the
    window count against the compressed-token count to pinpoint the
    length-mismatch failure. All output goes to stdout; returns None.
    """
    print(f"--- 开始调试: 文件夹 '{args.input_dir}', 行号 {args.line_number} ---")

    # --- Locate the target line across all .jsonl files ---
    target_line = args.line_number
    current_line_count = 0
    line_content = None

    # Deterministic file order so a global line number is reproducible.
    jsonl_files = sorted([os.path.join(r, f) for r, _, fs in os.walk(args.input_dir) for f in fs if f.endswith('.jsonl')])
    if not jsonl_files:
        print(f"❌ 错误: 文件夹中没有 .jsonl 文件。")
        return

    for file_path in jsonl_files:
        # errors='ignore': tolerate stray undecodable bytes in data files.
        # NOTE(review): no explicit encoding — presumably UTF-8; confirm.
        with open(file_path, 'r', errors='ignore') as f:
            for line in f:
                current_line_count += 1
                if current_line_count == target_line:
                    line_content = line
                    print(f"✅ 找到了第 {target_line} 行,位于文件: {file_path}")
                    break
        if line_content:
            break

    if not line_content:
        print(f"❌ 错误: 未能找到第 {target_line} 行 (总共扫描了 {current_line_count} 行)。")
        return

    # --- Step A: derive the compression key from the folder name and
    # verify it is a field of the JSON record ---
    params = parse_parameters_from_path(args.input_dir)
    compression_key = construct_compression_key(params)
    print(f"\n[步骤 A] 构建的压缩 Key: '{compression_key}'")

    try:
        data = json.loads(line_content)
        print(" -> JSON 加载成功。")
        if compression_key not in data:
            print(f" -> ❌ 错误: 构建的 Key 不在 JSON 对象中!")
            print(f" JSON 中的可用 Keys: {list(data.keys())}")
            return
        print(" -> ✅ Key 匹配成功!")
    except json.JSONDecodeError as e:
        print(f"❌ 错误: JSON 解码失败: {e}")
        return

    # --- Step B: decode the window (start, length) pairs ---
    print("\n[步骤 B] 解码 'windows_starts_lens_b64'")
    b64_windows = data.get('windows_starts_lens_b64', '')
    print(f" -> 输入的 Base64 (前64字节): '{b64_windows[:64]}...'")
    starts, lens = decompress_windows_starts_lens(b64_windows)
    print(f" -> 解码结果: 共有 {len(starts)} 个窗口。")
    if starts:
        print(f" -> 前 5 个窗口 (start, length): {list(zip(starts, lens))[:5]}")

    # --- Step C: unpack the compressed payload into 9-bit elements and
    # keep only values >= 256 (the pseudo/compression tokens) ---
    print(f"\n[步骤 C] 解码压缩数据字段 '{compression_key}'")
    b64_compressed = data.get(compression_key, '')
    print(f" -> 输入的 Base64 (前64字节): '{b64_compressed[:64]}...'")
    try:
        decoded_bytes = base64.b64decode(b64_compressed)
        print(f" -> Base64 解码后的字节长度: {len(decoded_bytes)}")

        mixed_pseudo_bytes = packed_bytes_to_pseudo(decoded_bytes)
        print(f" -> `packed_bytes_to_pseudo` 输出的总元素数量: {len(mixed_pseudo_bytes)}")
        print(f" -> 前 20 个元素: {mixed_pseudo_bytes[:20]}")

        # Values 0-255 are literal bytes; >= 256 are compression tokens.
        pseudo_tokens = [t for t in mixed_pseudo_bytes if t >= 256]
        print(f" -> 过滤后 (>= 256) 的压缩 Token 数量: {len(pseudo_tokens)}")
        print(f" -> 前 20 个压缩 Token: {pseudo_tokens[:20]}")

    except Exception as e:
        # Broad catch is deliberate: this is a diagnostic tool and any
        # failure here should be reported rather than crash the run.
        print(f" -> ❌ 在此步骤中发生错误: {e}")
        return

    # --- Final verdict: one window per compression token is the expected
    # invariant; a mismatch explains downstream parse failures ---
    print("\n" + "="*20 + " 最终诊断 " + "="*20)
    print(f"窗口数量 (来自 windows_starts_lens_b64): {len(starts)}")
    print(f"压缩 Token 数量 (来自 {compression_key}): {len(pseudo_tokens)}")
    if len(starts) == len(pseudo_tokens):
        print("\n🟢 结论: 长度匹配!之前的脚本可能存在其他问题。")
    else:
        print("\n🔴 结论: 长度不匹配!这是导致100%失败的根本原因。")
        print(" 这表明数据生成逻辑与我们的解析逻辑之间存在根本性的不一致。")
        print(" 可能的原因:")
        print(" 1. `windows_starts_lens_b64` 可能不包含所有压缩块的信息(例如,跳过了某些小块)。")
        print(" 2. 最终的压缩流中可能包含了一些不对应于 `windows` 的特殊符号。")
        print(" 3. `packed_bytes_to_pseudo` 的行为可能比我们想象的更复杂。")
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: pick one global line number out of a .jsonl folder
    # and dump step-by-step decoding diagnostics for it.
    cli = argparse.ArgumentParser(
        description="调试单行压缩数据,以找出长度不匹配的根本原因。",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    cli.add_argument("input_dir", type=str, help="包含 .jsonl 数据文件的输入文件夹路径。")
    cli.add_argument("--line_number", type=int, required=True, help="要检查的具体行号 (从1开始)。")
    debug_line(cli.parse_args())
|
|
|
|