Codeseys's picture
Wave 10 — packaging: composer_replication is now pip-installable
ac05fbf
"""claude_code_ingester.py — Claude Code session JSONL → TraceState iterator.
Maps the user's local `~/.claude/projects/<encoded>/<sessionId>.jsonl` files to
the existing `TraceState` schema (state_id + messages + student_action).
Design (per ADR-002):
- One TraceState per assistant TURN (not per tool_use block). Multiple tool_use
blocks in one assistant message belong to a single reasoning step.
- `student_action` = JSON-serialized list of (text + tool_use) blocks of the
assistant message. Teacher gets the message history before this turn and is
asked "what should the assistant do here?". Comparison vs the literal student
action gives our DPO signal.
- `messages` = OpenAI-style history of all records BEFORE this assistant turn.
System + user messages preserved; previous assistant turns flattened to text.
- `thinking` blocks STRIPPED from messages passed to teachers (teachers don't
have access to Claude's reasoning trace) but KEPT in student_action so the
reproduction loop sees what the student actually emitted.
- A synthetic system prompt is injected at messages[0] for trace IDs without one
(most Claude Code sessions don't have one written into the JSONL).
- Subagent traces (filenames starting with `agent-` OR records with
`isSidechain: True`) are SKIPPED in v0.1.
This is the v0.1 ingester. Non-goals:
- Reference-policy logprob precompute (lives in the data collator).
- Error-site detection (separate concern; uses tool_result is_error flag).
- DPO-pair extraction (lives in teacher_replay.extract_dpo_pairs).
"""
from __future__ import annotations
import json
import logging
import re
import sys
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import Any, TypedDict
from composer_replication.teacher_replay import TraceState
logger = logging.getLogger(__name__)
SUPPORTED_VERSIONS = re.compile(r"^2\.\d+\.\d+$")
SYSTEM_PROMPT = (
"You are a senior software engineer working as a coding agent in a terminal "
"environment. You can call tools (Bash, Read, Write, Edit, Grep, etc.) and "
"see their outputs. Reason carefully before each action. When a tool fails, "
"diagnose the cause and adjust."
)
@dataclass
class IngestionStats:
n_records_total: int = 0
n_records_skipped: int = 0
n_states_emitted: int = 0
n_assistant_turns: int = 0
n_tool_use_blocks: int = 0
n_text_blocks: int = 0
skipped_subagent: int = 0
skipped_summary: int = 0
skipped_truncated_lines: int = 0
version_warnings: list[str] | None = None
def __post_init__(self) -> None:
if self.version_warnings is None:
self.version_warnings = []
class ClaudeCodeIngester:
"""Convert one or more Claude Code session JSONL files to TraceState records.
Usage:
ingester = ClaudeCodeIngester()
for state in ingester.ingest(Path("session.jsonl")):
...
stats = ingester.last_stats
"""
def __init__(
self,
*,
system_prompt: str = SYSTEM_PROMPT,
skip_sidechain: bool = True,
strip_thinking: bool = True,
max_history_tokens: int | None = None,
) -> None:
self.system_prompt = system_prompt
self.skip_sidechain = skip_sidechain
self.strip_thinking = strip_thinking
self.max_history_tokens = max_history_tokens
self.last_stats = IngestionStats()
def ingest(self, path: Path) -> Iterator[TraceState]:
"""Yield one TraceState per assistant turn in the given session JSONL."""
self.last_stats = IngestionStats()
stats = self.last_stats
# Skip subagent files by filename convention
if self.skip_sidechain and path.name.startswith("agent-"):
logger.info("Skipping subagent file: %s", path)
stats.skipped_subagent = 1
return
records = list(self._iter_records(path))
# Build a quick lookup of records that ARE assistant turns; everything
# else feeds the message history we hand to teachers.
history: list[dict[str, Any]] = [
{"role": "system", "content": self.system_prompt}
]
state_idx = 0
for rec in records:
stats.n_records_total += 1
rec_type = rec.get("type")
if rec_type == "summary":
stats.skipped_summary += 1
continue
if rec_type in {"attachment", "queue-operation", "file-history-snapshot",
"last-prompt", "system"}:
stats.n_records_skipped += 1
continue
if self.skip_sidechain and rec.get("isSidechain") is True:
stats.skipped_subagent += 1
continue
if rec_type == "user":
msg = rec.get("message", {})
content = msg.get("content")
if isinstance(content, str):
history.append({"role": "user", "content": content})
elif isinstance(content, list):
# Either text blocks (a real human prompt) or tool_result
# blocks (an observation). Both go into history as user
# messages, but we serialize them differently.
flat = self._flatten_user_content(content)
if flat:
history.append({"role": "user", "content": flat})
elif rec_type == "assistant":
msg = rec.get("message", {})
content = msg.get("content")
if not isinstance(content, list):
stats.n_records_skipped += 1
continue
# Build student_action from this assistant message's content
# (KEEPING thinking blocks in student_action — that's the
# actual student emission we'd be RL-training).
student_action = self._serialize_assistant_content(
content, strip_thinking=False,
)
if not student_action:
# Empty assistant turn — skip
stats.n_records_skipped += 1
continue
# Track block counts
for block in content:
if isinstance(block, dict):
bt = block.get("type")
if bt == "tool_use":
stats.n_tool_use_blocks += 1
elif bt == "text":
stats.n_text_blocks += 1
# Build the messages handed to teachers — strip thinking
# blocks if configured.
teacher_history = self._maybe_strip_thinking(history)
state = TraceState(
state_id=f"{path.stem}::{state_idx:04d}",
messages=list(teacher_history), # snapshot
student_action=student_action,
)
yield state
stats.n_states_emitted += 1
state_idx += 1
stats.n_assistant_turns += 1
# Append a flattened version of this assistant turn to history
# for the NEXT teacher call (history grows with each turn).
history.append({
"role": "assistant",
"content": self._serialize_assistant_content(
content, strip_thinking=self.strip_thinking,
),
})
# Validate version field of last seen record (best-effort)
if records:
v = records[-1].get("version")
if v and not SUPPORTED_VERSIONS.match(str(v)):
stats.version_warnings.append(
f"Unrecognized version {v!r} in {path.name} — ingester "
"tested against 2.x.x. Check schema compatibility."
)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _iter_records(self, path: Path) -> Iterator[dict[str, Any]]:
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
self.last_stats.skipped_truncated_lines += 1
logger.debug("Truncated/malformed line in %s: %s", path, e)
continue
def _flatten_user_content(self, content: list[Any]) -> str:
"""Convert a user record's content list to a single string."""
parts: list[str] = []
for block in content:
if not isinstance(block, dict):
continue
bt = block.get("type")
if bt == "text":
txt = block.get("text", "")
if txt:
parts.append(txt)
elif bt == "tool_result":
tc = block.get("content", "")
if isinstance(tc, list):
# Sometimes content is itself a list of blocks
sub = []
for sb in tc:
if isinstance(sb, dict) and sb.get("type") == "text":
sub.append(sb.get("text", ""))
tc = "\n".join(sub)
tu_id = block.get("tool_use_id", "<unknown>")
is_err = block.get("is_error", False)
tag = "[TOOL_RESULT (ERROR)]" if is_err else "[TOOL_RESULT]"
parts.append(f"{tag} (id={tu_id})\n{tc}")
elif bt == "image":
parts.append("[IMAGE OMITTED]")
return "\n\n".join(parts)
def _serialize_assistant_content(
self, content: list[Any], *, strip_thinking: bool,
) -> str:
"""Serialize an assistant message's content list to a string.
Preserves:
text blocks → as-is
thinking blocks → "[THINKING] ..." (or stripped)
tool_use blocks → "[TOOL_USE] name=... input={json}"
"""
parts: list[str] = []
for block in content:
if not isinstance(block, dict):
continue
bt = block.get("type")
if bt == "text":
parts.append(block.get("text", ""))
elif bt == "thinking":
if not strip_thinking:
parts.append(f"[THINKING] {block.get('thinking', '')}")
elif bt == "tool_use":
name = block.get("name", "")
inp = block.get("input", {})
try:
inp_str = json.dumps(inp, separators=(",", ":"))
except (TypeError, ValueError):
inp_str = str(inp)
parts.append(f"[TOOL_USE] name={name} input={inp_str}")
return "\n\n".join(p for p in parts if p)
def _maybe_strip_thinking(self, history: list[dict[str, Any]]) -> list[dict[str, Any]]:
if not self.strip_thinking:
return history
out = []
for msg in history:
if msg["role"] != "assistant":
out.append(msg)
continue
# Strip [THINKING] lines from assistant content
content = msg["content"]
if isinstance(content, str):
lines = content.split("\n\n")
kept = [l for l in lines if not l.strip().startswith("[THINKING]")]
out.append({"role": "assistant", "content": "\n\n".join(kept)})
else:
out.append(msg)
return out
__all__ = ["ClaudeCodeIngester", "IngestionStats", "SYSTEM_PROMPT"]