Reinforcement Learning
Transformers
English
post-training
distillation
agentic-coding
composer-2.5
cursor
kimi-k2
grpo
dapo
diloco
openenv
trl
verl
research
methodology
Instructions to use Codeseys/composer-replication-framework with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use Codeseys/composer-replication-framework with Transformers:
```python
# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("Codeseys/composer-replication-framework", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
| """claude_code_ingester.py — Claude Code session JSONL → TraceState iterator. | |
| Maps the user's local `~/.claude/projects/<encoded>/<sessionId>.jsonl` files to | |
| the existing `TraceState` schema (state_id + messages + student_action). | |
| Design (per ADR-002): | |
| - One TraceState per assistant TURN (not per tool_use block). Multiple tool_use | |
| blocks in one assistant message belong to a single reasoning step. | |
| - `student_action` = the assistant message's thinking, text and tool_use | |
| blocks rendered as plain text ("[THINKING] ...", raw text, "[TOOL_USE] | |
| name=... input={json}") and joined by blank lines. Teacher gets the message | |
| history before this turn and is asked "what should the assistant do here?". | |
| Comparing its answer against the literal student action gives our DPO signal. | |
| - `messages` = OpenAI-style history of all records BEFORE this assistant turn. | |
| System + user messages preserved; previous assistant turns flattened to text. | |
| - `thinking` blocks STRIPPED from messages passed to teachers (teachers don't | |
| have access to Claude's reasoning trace) but KEPT in student_action so the | |
| reproduction loop sees what the student actually emitted. | |
| - A synthetic system prompt is always injected at messages[0], and `system`-type | |
| records in the JSONL are skipped (most Claude Code sessions don't write a | |
| system prompt into the JSONL anyway). | |
| - Subagent traces (filenames starting with `agent-` OR records with | |
| `isSidechain: True`) are SKIPPED in v0.1. | |
| This is the v0.1 ingester. Non-goals: | |
| - Reference-policy logprob precompute (lives in the data collator). | |
| - Error-site detection (separate concern; uses tool_result is_error flag). | |
| - DPO-pair extraction (lives in teacher_replay.extract_dpo_pairs). | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import re | |
| import sys | |
| from collections.abc import Iterator | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, TypedDict | |
| from composer_replication.teacher_replay import TraceState | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Accepted values of a record's `version` field. The ingester has only been
# checked against the 2.x.y series; anything else produces a warning.
SUPPORTED_VERSIONS = re.compile(r"^2\.\d+\.\d+$")

# Default synthetic system prompt placed at messages[0] of teacher histories.
SYSTEM_PROMPT = (
    "You are a senior software engineer working as a coding agent in a "
    "terminal environment. You can call tools (Bash, Read, Write, Edit, "
    "Grep, etc.) and see their outputs. Reason carefully before each "
    "action. When a tool fails, diagnose the cause and adjust."
)
| class IngestionStats: | |
| n_records_total: int = 0 | |
| n_records_skipped: int = 0 | |
| n_states_emitted: int = 0 | |
| n_assistant_turns: int = 0 | |
| n_tool_use_blocks: int = 0 | |
| n_text_blocks: int = 0 | |
| skipped_subagent: int = 0 | |
| skipped_summary: int = 0 | |
| skipped_truncated_lines: int = 0 | |
| version_warnings: list[str] | None = None | |
| def __post_init__(self) -> None: | |
| if self.version_warnings is None: | |
| self.version_warnings = [] | |
class ClaudeCodeIngester:
    """Convert one or more Claude Code session JSONL files to TraceState records.

    Usage:
        ingester = ClaudeCodeIngester()
        for state in ingester.ingest(Path("session.jsonl")):
            ...
        stats = ingester.last_stats

    One ``TraceState`` is emitted per non-empty assistant turn. Its
    ``messages`` are the OpenAI-style history preceding that turn, always
    starting with the synthetic system prompt. Its ``student_action`` is the
    serialized assistant content with thinking blocks included.

    NOTE: consecutive states share the same message dict objects. Each state
    gets a fresh *list*, not fresh dicts, so treat ``state.messages`` entries
    as read-only.
    """

    # Record types that never contribute to the conversation history.
    _IGNORED_RECORD_TYPES = frozenset({
        "attachment",
        "queue-operation",
        "file-history-snapshot",
        "last-prompt",
        "system",
    })

    # Placeholder used wherever an image block is dropped from the text.
    _IMAGE_PLACEHOLDER = "[IMAGE OMITTED]"

    def __init__(
        self,
        *,
        system_prompt: str = SYSTEM_PROMPT,
        skip_sidechain: bool = True,
        strip_thinking: bool = True,
        max_history_tokens: int | None = None,
    ) -> None:
        """Configure the ingester.

        Args:
            system_prompt: Text injected as ``messages[0]`` of every state.
            skip_sidechain: Skip ``agent-*`` files and ``isSidechain`` records.
            strip_thinking: Omit thinking blocks from the assistant turns
                placed in teacher-facing history. ``student_action`` always
                keeps them.
            max_history_tokens: Stored on the instance but not applied by
                this class; history is never truncated here.
        """
        self.system_prompt = system_prompt
        self.skip_sidechain = skip_sidechain
        self.strip_thinking = strip_thinking
        self.max_history_tokens = max_history_tokens
        self.last_stats = IngestionStats()

    def ingest(self, path: Path) -> Iterator[TraceState]:
        """Yield one TraceState per assistant turn in the given session JSONL.

        Args:
            path: A Claude Code session ``.jsonl`` file.

        Yields:
            ``TraceState`` objects whose ``state_id`` is
            ``"<file stem>::<NNNN>"``.

        ``self.last_stats`` is replaced by a fresh ``IngestionStats`` when
        iteration starts and is updated as the generator is consumed. The
        schema-version check only runs once the generator is exhausted.
        """
        self.last_stats = IngestionStats()
        stats = self.last_stats

        # Subagent transcripts live in their own files named ``agent-*``.
        if self.skip_sidechain and path.name.startswith("agent-"):
            logger.info("Skipping subagent file: %s", path)
            stats.skipped_subagent = 1
            return

        # Materialize up front so the file is closed before the first yield,
        # even if the caller abandons the generator part-way through.
        records = list(self._iter_records(path))

        history: list[dict[str, Any]] = [
            {"role": "system", "content": self.system_prompt},
        ]
        last_version: Any = None
        state_idx = 0

        for rec in records:
            stats.n_records_total += 1
            if not isinstance(rec, dict):
                # Well-formed JSON that is not an object (e.g. a bare string).
                stats.n_records_skipped += 1
                continue
            # Track the most recent version seen anywhere in the file, so a
            # trailing record without one does not disable the check.
            if rec.get("version"):
                last_version = rec["version"]

            rec_type = rec.get("type")
            if rec_type == "summary":
                stats.skipped_summary += 1
                continue
            if rec_type in self._IGNORED_RECORD_TYPES:
                stats.n_records_skipped += 1
                continue
            if self.skip_sidechain and rec.get("isSidechain") is True:
                stats.skipped_subagent += 1
                continue

            if rec_type == "user":
                content = self._message_content(rec)
                if isinstance(content, list):
                    # Text blocks (a human prompt) and/or tool_result blocks
                    # (observations) are flattened into one user message.
                    content = self._flatten_user_content(content)
                # Empty user content is dropped for both str and list forms.
                if isinstance(content, str) and content:
                    history.append({"role": "user", "content": content})
            elif rec_type == "assistant":
                content = self._message_content(rec)
                if not isinstance(content, list):
                    stats.n_records_skipped += 1
                    continue

                # student_action KEEPS thinking blocks: it is the literal
                # student emission that would be RL-trained.
                student_action = self._serialize_assistant_content(
                    content, strip_thinking=False,
                )
                if not student_action:
                    stats.n_records_skipped += 1
                    continue

                for block in content:
                    if not isinstance(block, dict):
                        continue
                    bt = block.get("type")
                    if bt == "tool_use":
                        stats.n_tool_use_blocks += 1
                    elif bt == "text":
                        stats.n_text_blocks += 1

                # Assistant entries in `history` were serialized with
                # strip_thinking=self.strip_thinking, so a shallow list copy
                # already is the teacher-facing view. Re-scanning the entire
                # history on every turn would be quadratic and redundant.
                yield TraceState(
                    state_id=f"{path.stem}::{state_idx:04d}",
                    messages=list(history),
                    student_action=student_action,
                )
                stats.n_states_emitted += 1
                state_idx += 1
                stats.n_assistant_turns += 1

                # History grows with each turn for the NEXT teacher call.
                history.append({
                    "role": "assistant",
                    "content": self._serialize_assistant_content(
                        content, strip_thinking=self.strip_thinking,
                    ),
                })

        self._check_version(path, last_version, stats)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _check_version(
        self, path: Path, version: Any, stats: IngestionStats,
    ) -> None:
        """Record and log a warning if ``version`` is outside the tested 2.x.x range."""
        if not version or SUPPORTED_VERSIONS.match(str(version)):
            return
        warning = (
            f"Unrecognized version {version!r} in {path.name} — ingester "
            "tested against 2.x.x. Check schema compatibility."
        )
        # Guard against a stats object whose list was never initialised.
        if stats.version_warnings is None:
            stats.version_warnings = []
        stats.version_warnings.append(warning)
        logger.warning("%s", warning)

    def _iter_records(self, path: Path) -> Iterator[Any]:
        """Yield the decoded JSON value of every non-blank line in ``path``.

        Lines that fail to decode (e.g. a half-written final line) are counted
        in ``self.last_stats.skipped_truncated_lines`` and skipped. Undecodable
        bytes are replaced with U+FFFD rather than raising UnicodeDecodeError
        part-way through the file.
        """
        with path.open("r", encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError as e:
                    self.last_stats.skipped_truncated_lines += 1
                    logger.debug("Truncated/malformed line in %s: %s", path, e)
                    continue
                # Yield outside the try so exceptions raised by the consumer
                # are never mistaken for decode errors.
                yield rec

    @staticmethod
    def _message_content(rec: dict[str, Any]) -> Any:
        """Return ``rec["message"]["content"]``, or None if ``message`` is absent or not an object."""
        msg = rec.get("message")
        if not isinstance(msg, dict):
            return None
        return msg.get("content")

    def _flatten_user_content(self, content: list[Any]) -> str:
        """Convert a user record's content list to a single string.

        Block handling:
          - text blocks are kept as-is; empty ones are dropped.
          - tool_result blocks become "[TOOL_RESULT] (id=<tool_use_id>)", or
            "[TOOL_RESULT (ERROR)] ..." when is_error is set, followed by a
            newline and the result body.
          - image blocks become "[IMAGE OMITTED]".
          - Other block types are ignored.

        Rendered blocks are joined with blank lines.
        """
        parts: list[str] = []
        for block in content:
            if not isinstance(block, dict):
                continue
            bt = block.get("type")
            if bt == "text":
                txt = block.get("text")
                if isinstance(txt, str) and txt:
                    parts.append(txt)
            elif bt == "tool_result":
                body = self._tool_result_text(block.get("content"))
                tu_id = block.get("tool_use_id", "<unknown>")
                tag = (
                    "[TOOL_RESULT (ERROR)]"
                    if block.get("is_error", False)
                    else "[TOOL_RESULT]"
                )
                parts.append(f"{tag} (id={tu_id})\n{body}")
            elif bt == "image":
                parts.append(self._IMAGE_PLACEHOLDER)
        return "\n\n".join(parts)

    @classmethod
    def _tool_result_text(cls, body: Any) -> str:
        """Render a tool_result ``content`` field (string, block list, or missing) as text."""
        if body is None:
            return ""
        if isinstance(body, str):
            return body
        if isinstance(body, list):
            # The content can itself be a list of text/image blocks.
            lines: list[str] = []
            for sub in body:
                if not isinstance(sub, dict):
                    continue
                sub_type = sub.get("type")
                if sub_type == "text":
                    text = sub.get("text")
                    lines.append(text if isinstance(text, str) else "")
                elif sub_type == "image":
                    lines.append(cls._IMAGE_PLACEHOLDER)
            return "\n".join(lines)
        return str(body)

    def _serialize_assistant_content(
        self, content: list[Any], *, strip_thinking: bool,
    ) -> str:
        """Serialize an assistant message's content list to a string.

        Preserves:
            text blocks     → as-is
            thinking blocks → "[THINKING] ..." (or stripped; empty ones dropped)
            tool_use blocks → "[TOOL_USE] name=... input={json}"
        Non-empty pieces are joined with blank lines.
        """
        parts: list[str] = []
        for block in content:
            if not isinstance(block, dict):
                continue
            bt = block.get("type")
            if bt == "text":
                text = block.get("text")
                if isinstance(text, str):
                    parts.append(text)
            elif bt == "thinking":
                thought = block.get("thinking") or ""
                # An empty thinking block would otherwise leave a bare
                # "[THINKING] " marker and defeat the empty-turn check.
                if not strip_thinking and thought:
                    parts.append(f"[THINKING] {thought}")
            elif bt == "tool_use":
                name = block.get("name", "")
                inp = block.get("input", {})
                try:
                    inp_str = json.dumps(inp, separators=(",", ":"))
                except (TypeError, ValueError):
                    # Non-JSON-serializable input: fall back to its repr-ish str.
                    inp_str = str(inp)
                parts.append(f"[TOOL_USE] name={name} input={inp_str}")
        return "\n\n".join(p for p in parts if p)

    def _maybe_strip_thinking(self, history: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Return ``history`` with "[THINKING]" paragraphs removed from assistant turns.

        Returns ``history`` itself when ``strip_thinking`` is False. Otherwise
        it returns a new list whose assistant entries with string content are
        fresh dicts. ``ingest`` does not call this, because its history is
        already serialized without thinking when ``strip_thinking`` is set.

        NOTE: content is split on blank lines, so only the first paragraph of
        a multi-paragraph thinking block is removed. Any genuine text
        paragraph beginning with "[THINKING]" is removed as well.
        """
        if not self.strip_thinking:
            return history
        out = []
        for msg in history:
            if msg["role"] != "assistant":
                out.append(msg)
                continue
            content = msg["content"]
            if isinstance(content, str):
                paragraphs = content.split("\n\n")
                kept = [
                    para for para in paragraphs
                    if not para.strip().startswith("[THINKING]")
                ]
                out.append({"role": "assistant", "content": "\n\n".join(kept)})
            else:
                out.append(msg)
        return out
# Names exported by `from claude_code_ingester import *`.
__all__ = [
    "ClaudeCodeIngester",
    "IngestionStats",
    "SYSTEM_PROMPT",
]