| """Replay generated examples against the current CORP-ENV implementation. |
| |
| The script keeps only trajectories that parse as `CorpAction`, run through |
| `CorpEnvironment`, terminate with `finalize`, and meet task-specific thresholds. |
| |
| Example: |
| uv run python scripts/verify_examples.py \ |
| --input data/raw/e1_m1_examples.jsonl \ |
| --clean data/processed/e1_m1_clean.jsonl \ |
| --rejected data/processed/e1_m1_rejected.jsonl \ |
| --summary results/example_verification_summary.json |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from collections import Counter, defaultdict |
| from pathlib import Path |
| from typing import Any, Dict, List |
|
|
# ROOT is the parent of the directory that contains this file (two levels up
# from the resolved file path).
ROOT = Path(__file__).resolve().parents[1]
# Prepend ROOT to sys.path once; presumably this is what lets the absolute
# `scripts._trajectory_utils` import below resolve when the file is executed
# directly as a script -- confirm against the repository layout.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
|
|
| from scripts._trajectory_utils import ( |
| extract_actions, |
| read_jsonl, |
| replay_actions, |
| write_jsonl, |
| ) |
|
|
|
|
def verify_one(
    example: Dict[str, Any],
    strict_thresholds: bool,
    strict_json: bool,
    require_stepwise_deliberation: bool,
) -> Dict[str, Any]:
    """Verify a single example and return its verification record.

    The example id is read from ``example_id`` (falling back to ``id``, then
    ``"unknown"``) and the task id from ``task_id`` (falling back to ``task``,
    then ``""``); both are coerced to ``str``.

    A ``"rejected"`` record is returned, in this order of precedence, when:

    * the example carries a truthy ``_load_error`` (used as the reject reason),
    * no task id is present (``missing_task_id``),
    * ``extract_actions`` raises (``action_extraction_failed: ...``),
    * ``replay_actions`` raises (``replay_failed: ...``).

    Otherwise the record is whatever ``replay_actions(...).to_record()``
    returns.

    Args:
        example: One parsed example row.
        strict_thresholds: Forwarded unchanged to ``replay_actions``.
        strict_json: Forwarded unchanged to ``replay_actions``.
        require_stepwise_deliberation: Forwarded unchanged to
            ``replay_actions``.

    Returns:
        A dict describing the outcome for this example.
    """
    ident = str(example.get("example_id") or example.get("id") or "unknown")
    task = str(example.get("task_id") or example.get("task") or "")

    def rejection(reason: Any, actions: Any) -> Dict[str, Any]:
        # Single place that defines the shape (and key order) of a rejection.
        return {
            "example_id": ident,
            "task_id": task,
            "status": "rejected",
            "reject_reason": reason,
            "actions": actions,
        }

    load_error = example.get("_load_error")
    if load_error:
        return rejection(load_error, [])

    if not task:
        return rejection("missing_task_id", [])

    # Any failure while extracting actions rejects the example instead of
    # aborting the whole run.
    try:
        parsed_actions = extract_actions(example)
    except Exception as exc:
        return rejection(f"action_extraction_failed: {exc}", [])

    # Same policy for replay failures, but keep the extracted actions so the
    # rejected record can be inspected later.
    try:
        outcome = replay_actions(
            example_id=ident,
            task_id=task,
            actions=parsed_actions,
            strict_thresholds=strict_thresholds,
            strict_json=strict_json,
            require_stepwise_deliberation=require_stepwise_deliberation,
        )
    except Exception as exc:
        return rejection(f"replay_failed: {exc}", parsed_actions)

    return outcome.to_record()
|
|
|
|
def summarize(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate verification records into a JSON-serializable summary.

    Args:
        records: Verification records. Every record must have a ``status``
            key. ``task_id``, ``reject_reason``, ``terminal_reward``,
            ``verifier_pass_rate``, ``reasoning_steps`` and
            ``phase_progression_ok`` are optional.

    Returns:
        A dict with:

        * ``total``: number of records,
        * ``by_status``: count per ``status`` value,
        * ``by_reject_reason``: count per ``reject_reason`` (records with no
          or an empty reason are counted under ``"clean"``),
        * ``by_task``: per-task status counts; records with a missing or
          empty ``task_id`` are grouped under ``"unknown"``,
        * ``clean_avg_terminal_reward``, ``clean_avg_verifier_pass_rate``,
          ``clean_avg_reasoning_steps``: means over records whose status is
          ``"clean"``, rounded to 6 decimals (``0.0`` when there are none),
        * ``clean_phase_progression_ok_rate``: fraction of clean records with
          a truthy ``phase_progression_ok``, rounded to 6 decimals.

    Raises:
        KeyError: If a record has no ``status`` key.
    """
    by_status = Counter(r["status"] for r in records)
    by_reason = Counter(r.get("reject_reason") or "clean" for r in records)

    by_task: Dict[str, Counter] = defaultdict(Counter)
    for row in records:
        # Use `or` instead of a .get() default: records may carry task_id=""
        # (present but empty), which a default alone would not catch.
        by_task[row.get("task_id") or "unknown"][row["status"]] += 1

    clean = [r for r in records if r["status"] == "clean"]

    def clean_mean(values: List[float]) -> float:
        # Mean over the clean records; defined as 0.0 when there are none.
        if not clean:
            return 0.0
        return round(sum(values) / len(clean), 6)

    def metric(key: str) -> List[float]:
        # A metric that is absent or explicitly None contributes 0.0 rather
        # than raising TypeError from float(None).
        return [float(r.get(key) or 0.0) for r in clean]

    return {
        "total": len(records),
        "by_status": dict(by_status),
        "by_reject_reason": dict(by_reason),
        "by_task": {task: dict(counts) for task, counts in by_task.items()},
        "clean_avg_terminal_reward": clean_mean(metric("terminal_reward")),
        "clean_avg_verifier_pass_rate": clean_mean(metric("verifier_pass_rate")),
        "clean_avg_reasoning_steps": clean_mean(metric("reasoning_steps")),
        "clean_phase_progression_ok_rate": clean_mean(
            [1.0 for r in clean if r.get("phase_progression_ok")]
        ),
    }
|
|
|
|
def main() -> None:
    """Command-line entry point.

    Parses the CLI options, runs ``verify_one`` on every example read from
    ``--input``, writes the clean, rejected and complete record sets as JSONL,
    writes the aggregated summary as JSON, and prints the summary followed by
    the clean and rejected output paths.
    """
    cli = argparse.ArgumentParser(description="Verify generated CORP-ENV examples.")
    cli.add_argument("--input", required=True, help="Raw or normalized examples JSONL.")
    cli.add_argument("--clean", default="data/processed/e1_m1_clean.jsonl")
    cli.add_argument("--rejected", default="data/processed/e1_m1_rejected.jsonl")
    cli.add_argument("--all-records", default="results/example_verification_all.jsonl")
    cli.add_argument("--summary", default="results/example_verification_summary.json")

    # The three boolean switches share the same shape; register them in order.
    switches = (
        (
            "--lenient",
            "Only require replay validity; do not apply task reward/pass thresholds.",
        ),
        (
            "--strict-json",
            "Require strict single-object JSON action format with canonical action keys.",
        ),
        (
            "--require-stepwise-deliberation",
            "Enforce task-specific step-wise deliberation checks (reasoning/conflict/resolution/phases).",
        ),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    opts = cli.parse_args()

    # Thresholds are enforced unless --lenient was given.
    enforce_thresholds = not opts.lenient
    records = []
    for example in read_jsonl(Path(opts.input)):
        records.append(
            verify_one(
                example,
                strict_thresholds=enforce_thresholds,
                strict_json=opts.strict_json,
                require_stepwise_deliberation=opts.require_stepwise_deliberation,
            )
        )

    # Split the records by status in a single pass, preserving input order.
    accepted = []
    refused = []
    for record in records:
        bucket = accepted if record["status"] == "clean" else refused
        bucket.append(record)

    write_jsonl(Path(opts.clean), accepted)
    write_jsonl(Path(opts.rejected), refused)
    write_jsonl(Path(opts.all_records), records)

    rendered = json.dumps(summarize(records), indent=2, ensure_ascii=False)
    summary_file = Path(opts.summary)
    summary_file.parent.mkdir(parents=True, exist_ok=True)
    summary_file.write_text(rendered, encoding="utf-8")

    print(rendered)
    print(f"\nClean examples: {opts.clean}")
    print(f"Rejected examples: {opts.rejected}")
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|