# corp-env/scripts/import_generated_examples.py
# Commit 2a98962 (Navigam): feat: add new task definitions and data files for launch readiness scenarios
"""Import Python-generated CORP-ENV examples into verification JSONL.

This is a convenience bridge for files such as:

    data/raw/e1_to_e100_tasks.py
    data/raw/m1_to_m100_tasks.py

The importer looks for either:

1. list/tuple variables containing dictionaries, or
2. generated `CorpTask` subclasses.

For generated task classes, it synthesizes compatible action trajectories for
the current environment tracks (`e1_launch_readiness` and
`m1_budget_reallocation`) while preserving the generated task description as
metadata and prompt text.

Example:

    uv run python scripts/import_generated_examples.py \
        --inputs data/raw/e1_to_e100_tasks.py data/raw/m1_to_m100_tasks.py \
        --output data/raw/e1_m1_examples.jsonl
"""
from __future__ import annotations
import argparse
import importlib.util
import sys
from pathlib import Path
import inspect
import json
from typing import Any, Dict, Iterable, List, Type
# ROOT is the directory one level above the directory that holds this script.
# It is placed at the front of sys.path (once) so the `server` and `scripts`
# imports that follow resolve when this file is executed directly.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from server.tasks.base import CorpTask # noqa: E402
from scripts._trajectory_utils import write_jsonl # noqa: E402
# Maps a short file-name hint to an environment task id. `infer_task_id`
# consults this table when an example dictionary does not name its task
# explicitly, matching the hint against the lower-cased file stem.
TASK_HINTS = {
    "e1": "e1_launch_readiness",
    "m1": "m1_budget_reallocation",
    "h1": "h1_acquisition_defence",
}
def load_module(path: Path) -> Any:
    """Import the Python source file at *path* and return the executed module.

    The module is named after the file stem. It is registered in
    ``sys.modules`` under that name *before* its code runs, as the importlib
    "importing a source file directly" recipe does. Without that registration,
    code that resolves its own module through ``sys.modules`` while the file
    is executing (for example ``@dataclass`` used together with
    ``from __future__ import annotations``) raises during import.

    NOTE: this executes the file's top-level code; only point it at trusted
    files.

    Args:
        path: Location of the Python file to execute.

    Returns:
        The module object after its top-level code has run.

    Raises:
        ValueError: If no import spec or loader can be created for *path*
            (for instance, the file does not have a Python source suffix).
    """
    module_name = path.stem
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ValueError(f"cannot import {path}")
    module = importlib.util.module_from_spec(spec)

    # Remember what (if anything) was registered under this name so a failed
    # import can be rolled back without clobbering an unrelated module.
    missing = object()
    previous = sys.modules.get(module_name, missing)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Never leave a half-initialised module visible to later imports.
        if previous is missing:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return module
def infer_task_id(path: Path, obj: Dict[str, Any]) -> str:
    """Work out which environment task an imported example belongs to.

    A truthy ``"task_id"`` entry in *obj* wins, then a truthy ``"task"``
    entry; either is returned converted to ``str``. Otherwise the lower-cased
    file stem of *path* is checked against ``TASK_HINTS`` in table order: a
    hint applies when the stem starts with it or contains it surrounded by
    underscores.

    Args:
        path: File the example was loaded from.
        obj: The example dictionary.

    Returns:
        The resolved task id, or ``""`` when nothing matches.
    """
    for key in ("task_id", "task"):
        declared = obj.get(key)
        if declared:
            return str(declared)

    stem = path.stem.lower()
    hinted = (
        task_id
        for hint, task_id in TASK_HINTS.items()
        if stem.startswith(hint) or f"_{hint}_" in stem
    )
    return next(hinted, "")
def candidate_examples(module: Any) -> Iterable[Dict[str, Any]]:
    """Yield a shallow copy of every example dictionary exposed by *module*.

    Attributes are inspected in this order: a fixed list of conventional
    names first, then everything reported by ``dir(module)``. Names starting
    with a double underscore are ignored. An attribute qualifies when it is a
    non-empty list or tuple whose items are all dictionaries.

    Each qualifying container is emitted once: a name is never inspected
    twice, and a container bound to several names (e.g.
    ``examples = E1_TASKS``) is recognised by identity so its items are not
    duplicated in the output.

    Args:
        module: Any object with attributes, typically an imported module.

    Yields:
        ``dict`` copies of the items, so callers may mutate them freely.
    """
    preferred_names = (
        "examples",
        "tasks",
        "trajectories",
        "E1_TASKS",
        "M1_TASKS",
        "E1_EXAMPLES",
        "M1_EXAMPLES",
    )
    seen_names = set()
    # Containers already emitted; holding the references keeps the identity
    # comparison below valid for the lifetime of the generator.
    emitted: List[Any] = []
    for name in preferred_names + tuple(dir(module)):
        if name.startswith("__") or name in seen_names:
            continue
        seen_names.add(name)
        value = getattr(module, name, None)
        if not isinstance(value, (list, tuple)) or not value:
            continue
        if not all(isinstance(item, dict) for item in value):
            continue
        if any(value is earlier for earlier in emitted):
            # Same container reached through an alias: skip the repeat.
            continue
        emitted.append(value)
        for item in value:
            yield dict(item)
def generated_task_classes(module: Any) -> Iterable[Type[CorpTask]]:
    """Yield every class in *module*'s namespace that derives from ``CorpTask``.

    ``CorpTask`` itself is excluded. Candidates are visited in the order of
    ``vars(module)``.

    Args:
        module: The imported module to scan.

    Yields:
        Each ``CorpTask`` subclass found.
    """
    for candidate in vars(module).values():
        if candidate is CorpTask or not inspect.isclass(candidate):
            continue
        try:
            derives_from_task = issubclass(candidate, CorpTask)
        except TypeError:
            # Some class-like objects refuse issubclass(); they are not tasks.
            continue
        if derives_from_task:
            yield candidate
def is_e1_file(path: Path) -> bool:
    """Return True when the file stem of *path* starts with ``e1`` (any case)."""
    stem = path.stem.lower()
    return stem.startswith("e1")
def is_m1_file(path: Path) -> bool:
    """Return True when the file stem of *path* starts with ``m1`` (any case)."""
    stem = path.stem.lower()
    return stem.startswith("m1")
def synthesize_e1_actions(description: str) -> List[Dict[str, Any]]:
    """Build the fixed four-step action trajectory for a launch-readiness task.

    The steps are: delegate to ``qa_engineer`` (the only step that embeds
    *description*), log reasoning, log a decision, and finalize with
    ``"NO_GO"``.

    Args:
        description: Scenario text inserted into the delegation payload.

    Returns:
        A new list of action dictionaries, in execution order.
    """
    ask_qa = {
        "action_type": "delegate",
        "agent_id": "qa_engineer",
        "payload": f"Assess launch readiness for this generated scenario: {description}",
    }
    reasoning = {
        "action_type": "log_reasoning",
        "payload": (
            "Use the QA report as the primary launch gate and decide whether "
            "the release should proceed within the 48 hour window."
        ),
    }
    decision = {
        "action_type": "log_decision",
        "payload": "Finalize based on QA stability, blockers, and launch gate evidence.",
    }
    verdict = {"action_type": "finalize", "payload": "NO_GO"}
    return [ask_qa, reasoning, decision, verdict]
def synthesize_m1_actions(description: str) -> List[Dict[str, Any]]:
    """Build the fixed six-step action trajectory for a budget-reallocation task.

    The steps are: delegate to ``dev_lead``, delegate to ``fpa_manager``, log
    reasoning, log conflict ``c1``, log its resolution, and finalize. The
    conflict, resolution and final payloads are JSON-encoded strings. The
    final payload carries the first 300 characters of *description* under
    ``source_scenario``.

    Args:
        description: Scenario text inserted into both delegation payloads.

    Returns:
        A new list of action dictionaries, in execution order.
    """
    conflict = {
        "id": "c1",
        "summary": "Engineering requirements exceed what finance should approve immediately.",
        "source_agents": ["dev_lead", "fpa_manager"],
    }
    resolution = {
        "conflict_id": "c1",
        "resolution_type": "phased_budget",
        "text": "Approve a capped phase_1 allocation with finance review before expansion.",
    }
    recommendation = {
        "phase_1": "Approve a capped GPU allocation for the highest-priority training runs.",
        "phase_2": "Expand spend only after utilization and finance runway checks are reviewed.",
        "guardrail": "Track budget, cost, spend, cash runway, and burn every week.",
        "source_scenario": description[:300],
    }

    ask_engineering = {
        "action_type": "delegate",
        "agent_id": "dev_lead",
        "payload": f"State the engineering requirement and minimum viable plan for: {description}",
    }
    ask_finance = {
        "action_type": "delegate",
        "agent_id": "fpa_manager",
        "payload": f"State finance constraints, budget limits, runway, and spend guardrails for: {description}",
    }
    reasoning = {
        "action_type": "log_reasoning",
        "payload": (
            "The recommendation must balance engineering urgency against budget, "
            "cost, spend, cash runway, and burn constraints."
        ),
    }
    steps: List[Dict[str, Any]] = [ask_engineering, ask_finance, reasoning]
    steps.append({"action_type": "log_conflict", "payload": json.dumps(conflict)})
    steps.append({"action_type": "log_resolution", "payload": json.dumps(resolution)})
    steps.append({"action_type": "finalize", "payload": json.dumps(recommendation)})
    return steps
def examples_from_task_classes(path: Path, module: Any) -> List[Dict[str, Any]]:
    """Turn the generated ``CorpTask`` subclasses in *module* into example rows.

    The target task and its synthesized action trajectory are chosen from the
    file name: ``e1*`` files map to ``e1_launch_readiness`` and ``m1*`` files
    to ``m1_budget_reallocation``. Classes from any other file are skipped.
    The class's ``task_id`` attribute (falling back to the class name) and
    ``description`` attribute (falling back to that id) are stringified and
    kept as metadata on the row.

    Args:
        path: File the module was loaded from; drives task selection and the
            ``example_id`` / ``source_file`` fields.
        module: The imported module to scan for task classes.

    Returns:
        One row per accepted class. ``example_id`` uses the class's 1-based
        position among all task classes found, zero-padded to three digits.
    """
    examples: List[Dict[str, Any]] = []
    for position, task_cls in enumerate(generated_task_classes(module), start=1):
        class_name = task_cls.__name__
        generated_task_id = str(getattr(task_cls, "task_id", class_name))
        description = str(getattr(task_cls, "description", generated_task_id))

        if is_e1_file(path):
            task_id, actions = "e1_launch_readiness", synthesize_e1_actions(description)
        elif is_m1_file(path):
            task_id, actions = "m1_budget_reallocation", synthesize_m1_actions(description)
        else:
            # No trajectory synthesizer exists for this file's track.
            continue

        examples.append(
            dict(
                example_id=f"{path.stem}-{position:03d}",
                task_id=task_id,
                source_file=str(path),
                source_kind="generated_task_class",
                source_class=class_name,
                generated_task_id=generated_task_id,
                generated_description=description,
                actions=actions,
            )
        )
    return examples
def import_file(path: Path) -> List[Dict[str, Any]]:
    """Load *path* as a module and collect its examples as row dictionaries.

    Every dictionary yielded by ``candidate_examples`` becomes a row. When
    ``infer_task_id`` resolves a task id, it is written to the row's
    ``task_id``. ``example_id`` and ``source_file`` are filled in only when
    the row does not already have them. If the module yields no dictionaries,
    rows are synthesized from its generated task classes instead.

    Args:
        path: Python file to import.

    Returns:
        The collected rows; may be empty.
    """
    module = load_module(path)
    collected: List[Dict[str, Any]] = []
    for position, example in enumerate(candidate_examples(module), start=1):
        resolved = infer_task_id(path, example)
        if resolved:
            example["task_id"] = resolved
        if "example_id" not in example:
            example["example_id"] = f"{path.stem}-{position:03d}"
        if "source_file" not in example:
            example["source_file"] = str(path)
        collected.append(example)

    if collected:
        return collected
    # No dictionary examples: fall back to generated task classes.
    collected.extend(examples_from_task_classes(path, module))
    return collected
def main() -> None:
    """Command-line entry point: import the given files and write one JSONL.

    Reads ``--inputs`` (one or more paths, required) and ``--output`` from the
    command line, imports each input in order, prints a per-file count, and
    passes all rows to ``write_jsonl``.

    Raises:
        SystemExit: If an input path does not exist, or if no examples were
            found in any input.
    """
    cli = argparse.ArgumentParser(description="Import generated Python examples to JSONL.")
    cli.add_argument("--inputs", nargs="+", required=True)
    cli.add_argument("--output", default="data/raw/e1_m1_examples.jsonl")
    options = cli.parse_args()

    all_rows: List[Dict[str, Any]] = []
    for raw_input in options.inputs:
        source = Path(raw_input)
        if not source.exists():
            raise SystemExit(f"Input not found: {source}")
        batch = import_file(source)
        print(f"{source}: imported {len(batch)} examples")
        all_rows += batch

    if not all_rows:
        raise SystemExit(
            "No examples found. Expected a module-level list of dictionaries "
            "or generated CorpTask subclasses."
        )

    write_jsonl(Path(options.output), all_rows)
    print(f"Wrote {len(all_rows)} examples to {options.output}")
    print("Next: run scripts/verify_examples.py on the JSONL output.")
# Run the importer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()