| |
| |
| |
| |
| |
|
|
| """ |
| REPL Environment clients. |
| |
| `REPLEnv` is the standard async OpenEnv client for remote/server-backed usage. |
| Use `async with` / `await` directly, or call `.sync()` for synchronous code. |
| |
| This module intentionally contains only the remote OpenEnv client. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Any |
|
|
| try: |
| from openenv.core.client_types import StepResult |
| from openenv.core.env_client import EnvClient |
|
|
| from .models import CodeBlockResult, REPLAction, REPLObservation, REPLState |
| except ImportError: |
| from models import CodeBlockResult, REPLAction, REPLObservation, REPLState |
| from openenv.core.client_types import StepResult |
| from openenv.core.env_client import EnvClient |
|
|
|
|
| class REPLEnv(EnvClient[REPLAction, REPLObservation, REPLState]): |
| """ |
| Async client for the remote REPL environment. |
| |
| Use this client when connecting to a running OpenEnv server over WebSocket. |
| For synchronous code, call `.sync()` on an instance. |
| |
| Example: |
| >>> async with REPLEnv(base_url="http://localhost:8000") as env: |
| ... result = await env.reset(context="Hello World", task_prompt="Count chars") |
| ... result = await env.execute("count = len(context)") |
| ... result = await env.execute("print(f'FINAL({count})')") |
| ... print(result.done) |
| |
| >>> with REPLEnv(base_url="http://localhost:8000").sync() as env: |
| ... result = env.reset(context="Hello World", task_prompt="Count chars") |
| ... result = env.execute("count = len(context)") |
| ... result = env.execute("print(f'FINAL({count})')") |
| ... print(result.done) |
| """ |
|
|
| def _step_payload(self, action: REPLAction) -> dict[str, Any]: |
| return { |
| "code": action.code, |
| "is_final": action.is_final, |
| "final_answer": action.final_answer, |
| } |
|
|
| def _parse_result(self, payload: dict[str, Any]) -> StepResult[REPLObservation]: |
| obs_data = payload.get("observation", {}) |
| result_data = obs_data.get("result", {}) |
|
|
| observation = REPLObservation( |
| result=CodeBlockResult( |
| stdout=result_data.get("stdout", ""), |
| stderr=result_data.get("stderr", ""), |
| locals_snapshot=result_data.get("locals_snapshot", {}), |
| execution_time=result_data.get("execution_time", 0.0), |
| success=result_data.get("success", True), |
| exception=result_data.get("exception"), |
| ), |
| context_preview=obs_data.get("context_preview"), |
| context_length=obs_data.get("context_length", 0), |
| available_variables=obs_data.get("available_variables", []), |
| iteration=obs_data.get("iteration", 0), |
| max_iterations=obs_data.get("max_iterations", 30), |
| done=payload.get("done", False), |
| reward=payload.get("reward"), |
| metadata=obs_data.get("metadata", {}), |
| ) |
|
|
| return StepResult( |
| observation=observation, |
| reward=payload.get("reward"), |
| done=payload.get("done", False), |
| ) |
|
|
| def _parse_state(self, payload: dict[str, Any]) -> REPLState: |
| return REPLState( |
| episode_id=payload.get("episode_id"), |
| step_count=payload.get("step_count", 0), |
| context=payload.get("context"), |
| task_prompt=payload.get("task_prompt"), |
| iteration=payload.get("iteration", 0), |
| max_iterations=payload.get("max_iterations", 30), |
| namespace_keys=payload.get("namespace_keys", []), |
| final_answer=payload.get("final_answer"), |
| total_execution_time=payload.get("total_execution_time", 0.0), |
| ) |
|
|
| async def execute(self, code: str) -> StepResult[REPLObservation]: |
| """Execute Python code in the REPL.""" |
| return await self.step(REPLAction(code=code)) |
|
|
| async def submit_final_answer(self, answer: str) -> StepResult[REPLObservation]: |
| """Submit a final answer and terminate the episode.""" |
| return await self.step(REPLAction(code="", is_final=True, final_answer=answer)) |
|
|
| async def get_variable(self, name: str) -> StepResult[REPLObservation]: |
| """Retrieve and print a variable from the REPL namespace.""" |
| return await self.execute(f"print(repr({name}))") |
|
|
| async def list_variables(self) -> list[str]: |
| """Return the current REPL namespace keys.""" |
| return (await self.state()).namespace_keys |
|
|