"""Import Python-generated CORP-ENV examples into verification JSONL.

This is a convenience bridge for files such as:

    data/raw/e1_to_e100_tasks.py
    data/raw/m1_to_m100_tasks.py

The importer looks for either:

1. list/tuple variables containing dictionaries, or
2. generated `CorpTask` subclasses.

For generated task classes, it synthesizes compatible action trajectories for
the current environment tracks (`e1_launch_readiness` and
`m1_budget_reallocation`) while preserving the generated task description as
metadata and prompt text.

Example:
    uv run python scripts/import_generated_examples.py \
        --inputs data/raw/e1_to_e100_tasks.py data/raw/m1_to_m100_tasks.py \
        --output data/raw/e1_m1_examples.jsonl
"""

from __future__ import annotations

import argparse
import importlib.util
import inspect
import json
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Type

# Two directory levels above this file; it is put on sys.path so that the
# `server` and `scripts` packages imported below resolve when this file is
# run directly as a script.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# These imports rely on the sys.path entry added above, so they must stay
# below it rather than in the top-of-file import group.
from server.tasks.base import CorpTask  # noqa: E402
from scripts._trajectory_utils import write_jsonl  # noqa: E402


# Short file-name hint -> task id. infer_task_id() falls back to this table
# when an example dictionary carries no explicit "task_id" / "task" value.
TASK_HINTS: Dict[str, str] = {
    "e1": "e1_launch_readiness",
    "m1": "m1_budget_reallocation",
    "h1": "h1_acquisition_defence",
}


def load_module(path: Path) -> Any:
    """Execute the Python file at ``path`` and return it as a module object.

    The module is named after the file stem. Before it executes it is
    registered in ``sys.modules`` (unless that name is already taken), which
    is what the importlib recipe for importing a source file directly does;
    this lets code in the file that resolves its own module through
    ``sys.modules[__name__]`` find it.

    Args:
        path: Location of the Python source file to load.

    Returns:
        The executed module.

    Raises:
        ValueError: If no import spec or loader can be built for ``path``.
        Exception: Whatever the file itself raises while it executes.
    """
    name = path.stem
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ValueError(f"cannot import {path}")
    module = importlib.util.module_from_spec(spec)

    # Never shadow a module that is already imported under the same name.
    registered = name not in sys.modules
    if registered:
        sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind.
        if registered:
            sys.modules.pop(name, None)
        raise
    return module


def infer_task_id(path: Path, obj: Dict[str, Any]) -> str:
    """Return the task id for one example, or ``""`` when none can be found.

    An explicit, truthy ``"task_id"`` (or, failing that, ``"task"``) value on
    the example wins and is returned as a string. Otherwise the lower-cased
    file stem is matched against ``TASK_HINTS``: a hint matches when the stem
    starts with it or contains it surrounded by underscores. Hints are tried
    in the table's insertion order and the first match is used.

    Args:
        path: File the example was loaded from.
        obj: The example dictionary.

    Returns:
        The explicit or inferred task id, or an empty string.
    """
    explicit = obj.get("task_id") or obj.get("task")
    if explicit:
        return str(explicit)

    lowered = path.stem.lower()
    for hint, task_id in TASK_HINTS.items():
        if lowered.startswith(hint) or f"_{hint}_" in lowered:
            return task_id
    return ""


def candidate_examples(module: Any) -> Iterable[Dict[str, Any]]:
    """Yield a shallow copy of every example dictionary found on ``module``.

    An attribute qualifies when it is a non-empty list or tuple whose items
    are all dictionaries. A fixed set of well-known names is inspected first,
    followed by every name from ``dir(module)``; names starting with a double
    underscore are skipped and no name is inspected twice.

    A container reachable through several names (for instance
    ``examples = E1_TASKS``) is emitted only once, so aliases do not produce
    duplicate rows.

    Args:
        module: Any object whose attributes should be scanned, normally a
            module returned by ``load_module``.

    Yields:
        ``dict(item)`` for each dictionary in each qualifying container, so
        callers can add keys without touching the module's own data.
    """
    preferred_names = (
        "examples",
        "tasks",
        "trajectories",
        "E1_TASKS",
        "M1_TASKS",
        "E1_EXAMPLES",
        "M1_EXAMPLES",
    )
    seen_names = set()
    # Keyed by id(); the container is stored as the value so it stays alive
    # and its id cannot be reused by another object during the scan.
    seen_containers: Dict[int, Any] = {}
    for name in preferred_names + tuple(dir(module)):
        if name.startswith("__") or name in seen_names:
            continue
        seen_names.add(name)

        value = getattr(module, name, None)
        if not isinstance(value, (list, tuple)) or not value:
            continue
        if not all(isinstance(item, dict) for item in value):
            continue
        if id(value) in seen_containers:
            continue
        seen_containers[id(value)] = value

        for item in value:
            yield dict(item)


def generated_task_classes(module: Any) -> Iterable[Type[CorpTask]]:
    """Yield each ``CorpTask`` subclass found in ``module``'s namespace.

    ``CorpTask`` itself is skipped, and a class bound to more than one name is
    yielded only once. Classes are visited in ``vars(module)`` order.

    Args:
        module: Object whose ``vars()`` should be scanned, normally a module
            returned by ``load_module``.

    Yields:
        Every distinct strict subclass of ``CorpTask``.
    """
    seen_ids = set()
    for value in vars(module).values():
        if not inspect.isclass(value) or value is CorpTask:
            continue
        try:
            is_task = issubclass(value, CorpTask)
        except TypeError:
            # Kept from the original: tolerate classes whose subclass check
            # refuses to answer.
            continue
        if not is_task or id(value) in seen_ids:
            continue
        seen_ids.add(id(value))
        yield value


def is_e1_file(path: Path) -> bool:
    """Return True when the file stem of ``path`` starts with ``e1`` (any case)."""
    return path.stem.lower().startswith("e1")


def is_m1_file(path: Path) -> bool:
    """Return True when the file stem of ``path`` starts with ``m1`` (any case)."""
    return path.stem.lower().startswith("m1")


def synthesize_e1_actions(description: str) -> List[Dict[str, Any]]:
    """Build the fixed action list used for an ``e1_launch_readiness`` example.

    The list has four steps: delegate to ``qa_engineer``, log reasoning, log a
    decision, and finalize with ``NO_GO``. Only the delegate payload varies;
    it embeds ``description``.

    Args:
        description: Scenario text taken from the generated task.

    Returns:
        A new list of action dictionaries.
    """
    return [
        {
            "action_type": "delegate",
            "agent_id": "qa_engineer",
            "payload": f"Assess launch readiness for this generated scenario: {description}",
        },
        {
            "action_type": "log_reasoning",
            "payload": (
                "Use the QA report as the primary launch gate and decide whether "
                "the release should proceed within the 48 hour window."
            ),
        },
        {
            "action_type": "log_decision",
            "payload": "Finalize based on QA stability, blockers, and launch gate evidence.",
        },
        {"action_type": "finalize", "payload": "NO_GO"},
    ]


def synthesize_m1_actions(description: str) -> List[Dict[str, Any]]:
    """Build the fixed action list used for an ``m1_budget_reallocation`` example.

    The list has six steps: delegate to ``dev_lead``, delegate to
    ``fpa_manager``, log reasoning, log conflict ``c1``, log its resolution,
    and finalize. The conflict, resolution and finalize payloads are JSON
    strings. ``description`` is embedded in both delegate payloads, and its
    first 300 characters are stored under ``source_scenario`` in the final
    payload.

    Args:
        description: Scenario text taken from the generated task.

    Returns:
        A new list of action dictionaries.
    """
    final = {
        "phase_1": "Approve a capped GPU allocation for the highest-priority training runs.",
        "phase_2": "Expand spend only after utilization and finance runway checks are reviewed.",
        "guardrail": "Track budget, cost, spend, cash runway, and burn every week.",
        "source_scenario": description[:300],
    }
    conflict = {
        "id": "c1",
        "summary": "Engineering requirements exceed what finance should approve immediately.",
        "source_agents": ["dev_lead", "fpa_manager"],
    }
    resolution = {
        "conflict_id": "c1",
        "resolution_type": "phased_budget",
        "text": "Approve a capped phase_1 allocation with finance review before expansion.",
    }
    return [
        {
            "action_type": "delegate",
            "agent_id": "dev_lead",
            "payload": f"State the engineering requirement and minimum viable plan for: {description}",
        },
        {
            "action_type": "delegate",
            "agent_id": "fpa_manager",
            "payload": f"State finance constraints, budget limits, runway, and spend guardrails for: {description}",
        },
        {
            "action_type": "log_reasoning",
            "payload": (
                "The recommendation must balance engineering urgency against budget, "
                "cost, spend, cash runway, and burn constraints."
            ),
        },
        {"action_type": "log_conflict", "payload": json.dumps(conflict)},
        {"action_type": "log_resolution", "payload": json.dumps(resolution)},
        {"action_type": "finalize", "payload": json.dumps(final)},
    ]


def examples_from_task_classes(path: Path, module: Any) -> List[Dict[str, Any]]:
    """Build one example row per generated task class found in ``module``.

    The file name decides the track: stems starting with ``e1`` produce
    ``e1_launch_readiness`` rows, stems starting with ``m1`` produce
    ``m1_budget_reallocation`` rows, and any other file produces no rows.
    Each row carries synthesized actions plus the class's own ``task_id`` and
    ``description`` (falling back to the class name and to the generated task
    id respectively) as metadata.

    Args:
        path: File the module was loaded from.
        module: The loaded module to scan for task classes.

    Returns:
        The rows, numbered from 1 in class discovery order.
    """
    # The track depends only on the file name, so decide it once up front.
    if is_e1_file(path):
        task_id = "e1_launch_readiness"
        synthesize = synthesize_e1_actions
    elif is_m1_file(path):
        task_id = "m1_budget_reallocation"
        synthesize = synthesize_m1_actions
    else:
        return []

    rows: List[Dict[str, Any]] = []
    for idx, cls in enumerate(generated_task_classes(module), start=1):
        generated_task_id = str(getattr(cls, "task_id", cls.__name__))
        description = str(getattr(cls, "description", generated_task_id))
        rows.append(
            {
                "example_id": f"{path.stem}-{idx:03d}",
                "task_id": task_id,
                "source_file": str(path),
                "source_kind": "generated_task_class",
                "source_class": cls.__name__,
                "generated_task_id": generated_task_id,
                "generated_description": description,
                "actions": synthesize(description),
            }
        )
    return rows


def import_file(path: Path) -> List[Dict[str, Any]]:
    """Load ``path`` and return the example rows it provides.

    Module-level lists or tuples of dictionaries take precedence. Each such
    dictionary gets its ``task_id`` overwritten when one can be inferred, and
    gets ``example_id`` and ``source_file`` filled in only when missing.
    Generated task classes are consulted only when no dictionary examples
    were found.

    Args:
        path: Python file to import.

    Returns:
        The rows found in the file; empty when it offers neither source.
    """
    module = load_module(path)

    rows: List[Dict[str, Any]] = []
    for idx, obj in enumerate(candidate_examples(module), start=1):
        task_id = infer_task_id(path, obj)
        if task_id:
            obj["task_id"] = task_id
        obj.setdefault("example_id", f"{path.stem}-{idx:03d}")
        obj.setdefault("source_file", str(path))
        rows.append(obj)

    if not rows:
        rows.extend(examples_from_task_classes(path, module))
    return rows


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Command-line entry point: import every input file and write one JSONL.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read the process command line, as before; passing a list allows
            the importer to be driven programmatically.

    Raises:
        SystemExit: If an input path is missing or is not a regular file, or
            if no examples were found in any input.
    """
    parser = argparse.ArgumentParser(description="Import generated Python examples to JSONL.")
    parser.add_argument("--inputs", nargs="+", required=True)
    parser.add_argument("--output", default="data/raw/e1_m1_examples.jsonl")
    args = parser.parse_args(argv)

    rows: List[Dict[str, Any]] = []
    for input_path in args.inputs:
        path = Path(input_path)
        if not path.exists():
            raise SystemExit(f"Input not found: {path}")
        # A directory would otherwise fail later with an import traceback.
        if not path.is_file():
            raise SystemExit(f"Input is not a file: {path}")
        imported = import_file(path)
        print(f"{path}: imported {len(imported)} examples")
        rows.extend(imported)

    if not rows:
        raise SystemExit(
            "No examples found. Expected a module-level list of dictionaries "
            "or generated CorpTask subclasses."
        )

    write_jsonl(Path(args.output), rows)
    print(f"Wrote {len(rows)} examples to {args.output}")
    print("Next: run scripts/verify_examples.py on the JSONL output.")


# Run the importer only when this file is executed as a script.
if __name__ == "__main__":
    main()