"""claude_code_ingester.py — Claude Code session JSONL → TraceState iterator. Maps the user's local `~/.claude/projects//.jsonl` files to the existing `TraceState` schema (state_id + messages + student_action). Design (per ADR-002): - One TraceState per assistant TURN (not per tool_use block). Multiple tool_use blocks in one assistant message belong to a single reasoning step. - `student_action` = JSON-serialized list of (text + tool_use) blocks of the assistant message. Teacher gets the message history before this turn and is asked "what should the assistant do here?". Comparison vs the literal student action gives our DPO signal. - `messages` = OpenAI-style history of all records BEFORE this assistant turn. System + user messages preserved; previous assistant turns flattened to text. - `thinking` blocks STRIPPED from messages passed to teachers (teachers don't have access to Claude's reasoning trace) but KEPT in student_action so the reproduction loop sees what the student actually emitted. - A synthetic system prompt is injected at messages[0] for trace IDs without one (most Claude Code sessions don't have one written into the JSONL). - Subagent traces (filenames starting with `agent-` OR records with `isSidechain: True`) are SKIPPED in v0.1. This is the v0.1 ingester. Non-goals: - Reference-policy logprob precompute (lives in the data collator). - Error-site detection (separate concern; uses tool_result is_error flag). - DPO-pair extraction (lives in teacher_replay.extract_dpo_pairs). """ from __future__ import annotations import json import logging import re import sys from collections.abc import Iterator from dataclasses import dataclass from pathlib import Path from typing import Any, TypedDict from composer_replication.teacher_replay import TraceState logger = logging.getLogger(__name__) SUPPORTED_VERSIONS = re.compile(r"^2\.\d+\.\d+$") SYSTEM_PROMPT = ( "You are a senior software engineer working as a coding agent in a terminal " "environment. You can call tools (Bash, Read, Write, Edit, Grep, etc.) and " "see their outputs. Reason carefully before each action. When a tool fails, " "diagnose the cause and adjust." ) @dataclass class IngestionStats: n_records_total: int = 0 n_records_skipped: int = 0 n_states_emitted: int = 0 n_assistant_turns: int = 0 n_tool_use_blocks: int = 0 n_text_blocks: int = 0 skipped_subagent: int = 0 skipped_summary: int = 0 skipped_truncated_lines: int = 0 version_warnings: list[str] | None = None def __post_init__(self) -> None: if self.version_warnings is None: self.version_warnings = [] class ClaudeCodeIngester: """Convert one or more Claude Code session JSONL files to TraceState records. Usage: ingester = ClaudeCodeIngester() for state in ingester.ingest(Path("session.jsonl")): ... stats = ingester.last_stats """ def __init__( self, *, system_prompt: str = SYSTEM_PROMPT, skip_sidechain: bool = True, strip_thinking: bool = True, max_history_tokens: int | None = None, ) -> None: self.system_prompt = system_prompt self.skip_sidechain = skip_sidechain self.strip_thinking = strip_thinking self.max_history_tokens = max_history_tokens self.last_stats = IngestionStats() def ingest(self, path: Path) -> Iterator[TraceState]: """Yield one TraceState per assistant turn in the given session JSONL.""" self.last_stats = IngestionStats() stats = self.last_stats # Skip subagent files by filename convention if self.skip_sidechain and path.name.startswith("agent-"): logger.info("Skipping subagent file: %s", path) stats.skipped_subagent = 1 return records = list(self._iter_records(path)) # Build a quick lookup of records that ARE assistant turns; everything # else feeds the message history we hand to teachers. history: list[dict[str, Any]] = [ {"role": "system", "content": self.system_prompt} ] state_idx = 0 for rec in records: stats.n_records_total += 1 rec_type = rec.get("type") if rec_type == "summary": stats.skipped_summary += 1 continue if rec_type in {"attachment", "queue-operation", "file-history-snapshot", "last-prompt", "system"}: stats.n_records_skipped += 1 continue if self.skip_sidechain and rec.get("isSidechain") is True: stats.skipped_subagent += 1 continue if rec_type == "user": msg = rec.get("message", {}) content = msg.get("content") if isinstance(content, str): history.append({"role": "user", "content": content}) elif isinstance(content, list): # Either text blocks (a real human prompt) or tool_result # blocks (an observation). Both go into history as user # messages, but we serialize them differently. flat = self._flatten_user_content(content) if flat: history.append({"role": "user", "content": flat}) elif rec_type == "assistant": msg = rec.get("message", {}) content = msg.get("content") if not isinstance(content, list): stats.n_records_skipped += 1 continue # Build student_action from this assistant message's content # (KEEPING thinking blocks in student_action — that's the # actual student emission we'd be RL-training). student_action = self._serialize_assistant_content( content, strip_thinking=False, ) if not student_action: # Empty assistant turn — skip stats.n_records_skipped += 1 continue # Track block counts for block in content: if isinstance(block, dict): bt = block.get("type") if bt == "tool_use": stats.n_tool_use_blocks += 1 elif bt == "text": stats.n_text_blocks += 1 # Build the messages handed to teachers — strip thinking # blocks if configured. teacher_history = self._maybe_strip_thinking(history) state = TraceState( state_id=f"{path.stem}::{state_idx:04d}", messages=list(teacher_history), # snapshot student_action=student_action, ) yield state stats.n_states_emitted += 1 state_idx += 1 stats.n_assistant_turns += 1 # Append a flattened version of this assistant turn to history # for the NEXT teacher call (history grows with each turn). history.append({ "role": "assistant", "content": self._serialize_assistant_content( content, strip_thinking=self.strip_thinking, ), }) # Validate version field of last seen record (best-effort) if records: v = records[-1].get("version") if v and not SUPPORTED_VERSIONS.match(str(v)): stats.version_warnings.append( f"Unrecognized version {v!r} in {path.name} — ingester " "tested against 2.x.x. Check schema compatibility." ) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _iter_records(self, path: Path) -> Iterator[dict[str, Any]]: with path.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: yield json.loads(line) except json.JSONDecodeError as e: self.last_stats.skipped_truncated_lines += 1 logger.debug("Truncated/malformed line in %s: %s", path, e) continue def _flatten_user_content(self, content: list[Any]) -> str: """Convert a user record's content list to a single string.""" parts: list[str] = [] for block in content: if not isinstance(block, dict): continue bt = block.get("type") if bt == "text": txt = block.get("text", "") if txt: parts.append(txt) elif bt == "tool_result": tc = block.get("content", "") if isinstance(tc, list): # Sometimes content is itself a list of blocks sub = [] for sb in tc: if isinstance(sb, dict) and sb.get("type") == "text": sub.append(sb.get("text", "")) tc = "\n".join(sub) tu_id = block.get("tool_use_id", "") is_err = block.get("is_error", False) tag = "[TOOL_RESULT (ERROR)]" if is_err else "[TOOL_RESULT]" parts.append(f"{tag} (id={tu_id})\n{tc}") elif bt == "image": parts.append("[IMAGE OMITTED]") return "\n\n".join(parts) def _serialize_assistant_content( self, content: list[Any], *, strip_thinking: bool, ) -> str: """Serialize an assistant message's content list to a string. Preserves: text blocks → as-is thinking blocks → "[THINKING] ..." (or stripped) tool_use blocks → "[TOOL_USE] name=... input={json}" """ parts: list[str] = [] for block in content: if not isinstance(block, dict): continue bt = block.get("type") if bt == "text": parts.append(block.get("text", "")) elif bt == "thinking": if not strip_thinking: parts.append(f"[THINKING] {block.get('thinking', '')}") elif bt == "tool_use": name = block.get("name", "") inp = block.get("input", {}) try: inp_str = json.dumps(inp, separators=(",", ":")) except (TypeError, ValueError): inp_str = str(inp) parts.append(f"[TOOL_USE] name={name} input={inp_str}") return "\n\n".join(p for p in parts if p) def _maybe_strip_thinking(self, history: list[dict[str, Any]]) -> list[dict[str, Any]]: if not self.strip_thinking: return history out = [] for msg in history: if msg["role"] != "assistant": out.append(msg) continue # Strip [THINKING] lines from assistant content content = msg["content"] if isinstance(content, str): lines = content.split("\n\n") kept = [l for l in lines if not l.strip().startswith("[THINKING]")] out.append({"role": "assistant", "content": "\n\n".join(kept)}) else: out.append(msg) return out __all__ = ["ClaudeCodeIngester", "IngestionStats", "SYSTEM_PROMPT"]