|
|
"""
core/runner_env.py

Minimal HTTP-based environment client.

- Talks to a single env worker exposing: POST /reset, POST /step, GET /state

Future hooks (planned) for:

- episode_id, seed on reset
- request_id on step
- custom headers (auth/trace)
"""
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from abc import ABC, abstractmethod |
|
|
from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar |
|
|
|
|
|
import requests |
|
|
|
|
|
from .client_types import StepResult |
|
|
from .containers.runtime.uv_provider import UVProvider |
|
|
from .containers.runtime import LocalDockerProvider |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from .containers.runtime import ContainerProvider |
|
|
|
|
|
ActT = TypeVar("ActT")  # action type accepted by ``step``
ObsT = TypeVar("ObsT")  # observation type carried by ``StepResult``
EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient")  # concrete client subclass


class HTTPEnvClient(ABC, Generic[ActT, ObsT]):
    """Abstract HTTP client for a single environment server.

    The client issues ``POST /reset``, ``POST /step`` and ``GET /state``
    against ``base_url``. Subclasses supply the (de)serialization hooks
    ``_step_payload``, ``_parse_result`` and ``_parse_state``.

    The client can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(
        self,
        base_url: str,
        request_timeout_s: float = 15.0,
        default_headers: Optional[Dict[str, str]] = None,
        provider: Optional["ContainerProvider"] = None,
    ):
        """Create a client bound to ``base_url``.

        Args:
            base_url: Root URL of the environment server; trailing slashes
                are stripped.
            request_timeout_s: Timeout passed to every HTTP request.
            default_headers: Headers sent with every request.
            provider: Optional provider whose ``stop_container()`` is called
                by :meth:`close`.
        """
        self._base = base_url.rstrip("/")
        self._timeout = float(request_timeout_s)
        self._http = requests.Session()
        # Copy so that later mutation of the caller's dict cannot silently
        # change the headers this client sends.
        self._headers = dict(default_headers) if default_headers else {}
        self._provider = provider

    @staticmethod
    def _wait_or_stop(
        provider: "ContainerProvider",
        base_url: str,
        **wait_kwargs: Any,
    ) -> None:
        """Wait for the server to become ready; stop it if that fails.

        Any exception raised by ``provider.wait_for_ready`` is re-raised
        after ``provider.stop_container()`` has been called, so a server that
        never comes up is not left running.
        """
        try:
            provider.wait_for_ready(base_url, **wait_kwargs)
        except Exception:
            provider.stop_container()
            raise

    @classmethod
    def from_docker_image(
        cls: Type[EnvClientT],
        image: str,
        provider: Optional["ContainerProvider"] = None,
        **kwargs: Any,
    ) -> EnvClientT:
        """
        Create an environment client by spinning up a Docker container locally.

        This is a development utility that:
        1. Starts a Docker container from the specified image
        2. Waits for the server to be ready
        3. Creates and returns a client instance connected to the container

        Note:
            The caller or a higher-level orchestrator manages the container
            lifecycle. The container continues running until it is stopped
            (for example via ``close()``). If the readiness wait fails, the
            container is stopped before the error propagates.

        Args:
            image: Docker image name to run (e.g., "echo-env:latest")
            provider: Container provider to use (defaults to
                ``LocalDockerProvider``)
            **kwargs: Additional arguments passed to
                ``provider.start_container()`` (e.g., env_vars, port)

        Returns:
            An instance of the client class connected to the running container

        Raises:
            Exception: Whatever ``provider.wait_for_ready()`` raises when the
                server does not become ready.

        Example:
            >>> from envs.coding_env.client import CodingEnv
            >>> from envs.coding_env.models import CodeAction
            >>>
            >>> # Create environment from image
            >>> env = CodingEnv.from_docker_image("coding-env:latest")
            >>>
            >>> # Create environment with custom env vars
            >>> env = CodingEnv.from_docker_image(
            ...     "coding-env:latest",
            ...     env_vars={"MY_VAR": "value"}
            ... )
            >>>
            >>> # Use the environment
            >>> result = env.reset()
            >>> print(result.observation)
            >>>
            >>> step_result = env.step(CodeAction(code="print('hello')"))
            >>> print(step_result.observation.stdout)
            >>>
            >>> # Cleanup (optional)
            >>> env.close()
        """
        if provider is None:
            provider = LocalDockerProvider()

        base_url = provider.start_container(image, **kwargs)
        # Stop the freshly started container if it never becomes ready,
        # mirroring the cleanup done on the uv path of ``from_hub``.
        cls._wait_or_stop(provider, base_url)

        return cls(base_url=base_url, provider=provider)

    @classmethod
    def from_hub(
        cls: Type[EnvClientT],
        space_id: str,
        *,
        use_docker: bool = False,
        provider: Optional["ContainerProvider"] = None,
        host: str = "0.0.0.0",
        port: Optional[int] = None,
        reload: bool = False,
        timeout_s: float = 60.0,
        runner: Optional["UVProvider"] = None,
        project_url: Optional[str] = None,
        connect_host: Optional[str] = None,
        extra_env: Optional[Dict[str, str]] = None,
        **provider_kwargs: Any,
    ) -> EnvClientT:
        """Create a client from a Hugging Face Space.

        Two launch paths are supported:

        * ``use_docker=True``: start a container through ``provider``
          (default ``LocalDockerProvider``). The image defaults to
          ``registry.hf.space/<space_id with '/' replaced by '-'>:<tag>``;
          ``image`` and ``tag`` (default ``"latest"``) may be supplied via
          ``provider_kwargs`` and the remaining ``provider_kwargs`` are
          forwarded to ``provider.start_container()``.
        * otherwise: start the Space through ``runner``, or through a new
          ``UVProvider`` built from ``space_id``, ``host``, ``port``,
          ``reload``, ``project_url``, ``connect_host`` and ``extra_env``
          when ``runner`` is not given.

        Args:
            space_id: Identifier of the Space.
            use_docker: Select the Docker path instead of the uv path.
            provider: Container provider for the Docker path.
            host: Forwarded to ``UVProvider`` (uv path, no ``runner`` only).
            port: Forwarded to ``UVProvider`` and to its
                ``start_container()`` call (uv path).
            reload: Forwarded to ``UVProvider`` (uv path, no ``runner`` only).
            timeout_s: Readiness timeout passed to ``wait_for_ready()``.
            runner: Pre-built ``UVProvider`` to use on the uv path.
            project_url: Forwarded to ``UVProvider``.
            connect_host: Forwarded to ``UVProvider``.
            extra_env: Forwarded to ``UVProvider``.
            **provider_kwargs: Extra arguments for ``start_container()``;
                on the uv path ``env_vars`` is extracted and passed
                explicitly.

        Returns:
            A client instance connected to the started server.

        Raises:
            Exception: Whatever ``wait_for_ready()`` raises; the started
                container/process is stopped first on both paths.
        """
        if use_docker:
            if provider is None:
                provider = LocalDockerProvider()

            tag = provider_kwargs.pop("tag", "latest")
            image = provider_kwargs.pop(
                "image",
                f"registry.hf.space/{space_id.replace('/', '-')}:{tag}",
            )

            # NOTE(review): ``port`` and ``extra_env`` are not forwarded on
            # this path; only ``provider_kwargs`` reach start_container().
            # Confirm whether that is intentional.
            base_url = provider.start_container(image, **provider_kwargs)
            cls._wait_or_stop(provider, base_url, timeout_s=timeout_s)
            return cls(base_url=base_url, provider=provider)

        uv_runner = runner or UVProvider(
            space_id=space_id,
            host=host,
            port=port,
            reload=reload,
            project_url=project_url,
            connect_host=connect_host,
            extra_env=extra_env,
        )

        non_docker_kwargs = dict(provider_kwargs)
        env_vars = non_docker_kwargs.pop("env_vars", None)

        base_url = uv_runner.start_container(
            space_id,
            port=port,
            env_vars=env_vars,
            **non_docker_kwargs,
        )
        cls._wait_or_stop(uv_runner, base_url, timeout_s=timeout_s)

        return cls(base_url=base_url, provider=uv_runner)

    @abstractmethod
    def _step_payload(self, action: ActT) -> dict:
        """Convert an action to the JSON payload expected by the server."""
        raise NotImplementedError

    @abstractmethod
    def _parse_result(self, payload: dict) -> "StepResult[ObsT]":
        """Convert a JSON response into :class:`StepResult`."""
        raise NotImplementedError

    @abstractmethod
    def _parse_state(self, payload: dict) -> Any:
        """Convert state JSON into a :class:`State` object."""
        raise NotImplementedError

    def reset(self) -> "StepResult[ObsT]":
        """POST an empty JSON body to ``/reset`` and parse the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {}

        r = self._http.post(
            f"{self._base}/reset",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def step(self, action: ActT) -> "StepResult[ObsT]":
        """POST ``action`` to ``/step`` and parse the response.

        The request body contains the serialized action under ``"action"``
        and the client timeout, converted to ``int``, under ``"timeout_s"``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {
            "action": self._step_payload(action),
            # NOTE(review): int() truncates fractional timeouts
            # (e.g. 0.5 -> 0); confirm the server expects whole seconds.
            "timeout_s": int(self._timeout),
        }

        r = self._http.post(
            f"{self._base}/step",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def state(self) -> Any:
        """
        Get the current environment state from the server.

        Returns:
            State object with environment state information (e.g.,
            episode_id, step_count)

        Raises:
            requests.HTTPError: If the server answers with an error status.

        Example:
            >>> client = EchoEnv.from_docker_image("echo-env:latest")
            >>> result = client.reset()
            >>> state = client.state()
            >>> print(state.episode_id)
            >>> print(state.step_count)
        """
        r = self._http.get(
            f"{self._base}/state",
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_state(r.json())

    def close(self) -> None:
        """
        Close the environment and clean up resources.

        Closes the underlying HTTP session. If this client holds a provider
        (as clients created via ``from_docker_image()`` or ``from_hub()``
        do), its ``stop_container()`` is called as well.
        """
        try:
            self._http.close()
        finally:
            # Stop the container even if closing the session failed.
            if self._provider is not None:
                self._provider.stop_container()

    def __enter__(self: EnvClientT) -> EnvClientT:
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Call :meth:`close` when the ``with`` block exits."""
        self.close()
|
|
|