File size: 8,123 Bytes
2d8433b |
|
"""
core/runner_env.py
Minimal HTTP-based environment client.
- Talks to a single env worker exposing: POST /reset, POST /step, GET /state
Future hooks (commented below) for:
- episode_id, seed on reset
- request_id on step
- custom headers (auth/trace)
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar
import requests
from .client_types import StepResult
from .containers.runtime.uv_provider import UVProvider
from .containers.runtime import LocalDockerProvider
if TYPE_CHECKING:
from .containers.runtime import ContainerProvider
# Type variables the client is parameterized over.
ActT = TypeVar("ActT")  # action type accepted by ``step``
ObsT = TypeVar("ObsT")  # observation type carried by ``StepResult``
# Bound to the client so alternate constructors return the calling subclass.
EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient")


class HTTPEnvClient(ABC, Generic[ActT, ObsT]):
    """Abstract HTTP client for one environment server.

    The client issues ``POST /reset``, ``POST /step`` and ``GET /state``
    against ``base_url``. Subclasses supply the (de)serialization hooks
    ``_step_payload``, ``_parse_result`` and ``_parse_state``.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(
        self,
        base_url: str,
        request_timeout_s: float = 15.0,
        default_headers: Optional[Dict[str, str]] = None,
        provider: Optional["ContainerProvider"] = None,
    ):
        """
        Args:
            base_url: Root URL of the environment server; trailing slashes
                are stripped.
            request_timeout_s: Timeout (seconds) applied to every HTTP call.
            default_headers: Headers sent with every request.
            provider: Optional container provider backing this client. When
                set, :meth:`close` stops its container.
        """
        self._base = base_url.rstrip("/")
        self._timeout = float(request_timeout_s)
        self._http = requests.Session()
        self._headers = default_headers or {}
        self._provider = provider

    # ---------- Context manager support ----------

    def __enter__(self: EnvClientT) -> EnvClientT:
        """Return the client itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Release resources via :meth:`close`; exceptions are not suppressed."""
        self.close()

    # ---------- Alternate constructors ----------

    @staticmethod
    def _wait_ready_or_stop(provider: Any, base_url: str, **wait_kwargs: Any) -> None:
        """Wait for the server behind ``base_url``; stop the container on failure.

        Shared by every constructor so that a container which was started but
        never became ready is not left running.
        """
        try:
            provider.wait_for_ready(base_url, **wait_kwargs)
        except BaseException:
            # BaseException so an interrupted wait (e.g. Ctrl-C) also cleans
            # up; the original error is always re-raised.
            provider.stop_container()
            raise

    @classmethod
    def from_docker_image(
        cls: Type[EnvClientT],
        image: str,
        provider: Optional["ContainerProvider"] = None,
        **kwargs: Any,
    ) -> EnvClientT:
        """
        Create an environment client by spinning up a Docker container locally.

        This is a development utility that:
        1. Starts a Docker container from the specified image
        2. Waits for the server to be ready
        3. Creates and returns a client instance connected to the container

        Note:
            The caller or a higher-level orchestrator manages the container
            lifecycle. The container continues running until it is stopped.
            If the server never becomes ready, the container is stopped
            before the error propagates.

        Args:
            image: Docker image name to run (e.g., "echo-env:latest")
            provider: Container provider to use (defaults to
                ``LocalDockerProvider``)
            **kwargs: Additional arguments passed to
                ``provider.start_container()`` (e.g., env_vars, port)

        Returns:
            An instance of the client class connected to the running container

        Raises:
            Exception: Whatever ``provider.wait_for_ready()`` raises, re-raised
                after the container has been stopped.

        Example:
            >>> from envs.coding_env.client import CodingEnv
            >>> from envs.coding_env.models import CodeAction
            >>>
            >>> # Create environment from image
            >>> env = CodingEnv.from_docker_image("coding-env:latest")
            >>>
            >>> # Create environment with custom env vars
            >>> env = CodingEnv.from_docker_image(
            ...     "coding-env:latest",
            ...     env_vars={"MY_VAR": "value"}
            ... )
            >>>
            >>> # Use the environment
            >>> result = env.reset()
            >>> print(result.observation)
            >>>
            >>> step_result = env.step(CodeAction(code="print('hello')"))
            >>> print(step_result.observation.stdout)
            >>>
            >>> # Cleanup (optional)
            >>> env.close()
        """
        # Use default provider if none provided
        if provider is None:
            provider = LocalDockerProvider()

        # 1. Start container with optional kwargs (e.g., env_vars, port)
        base_url = provider.start_container(image, **kwargs)

        # 2. Wait for server to be ready (stops the container on failure)
        cls._wait_ready_or_stop(provider, base_url)

        # 3. Create and return client instance with provider reference
        return cls(base_url=base_url, provider=provider)

    @classmethod
    def from_hub(
        cls: Type[EnvClientT],
        space_id: str,
        *,
        use_docker: bool = False,
        provider: Optional["ContainerProvider"] = None,
        host: str = "0.0.0.0",
        port: Optional[int] = None,
        reload: bool = False,
        timeout_s: float = 60.0,
        runner: Optional["UVProvider"] = None,
        project_url: Optional[str] = None,
        connect_host: Optional[str] = None,
        extra_env: Optional[Dict[str, str]] = None,
        **provider_kwargs: Any,
    ) -> EnvClientT:
        """Create a client from a Hugging Face Space.

        Two launch modes are supported:

        * ``use_docker=True``: start a container through ``provider``
          (defaults to ``LocalDockerProvider``). The image defaults to
          ``registry.hf.space/<space_id with '/' replaced by '-'>:<tag>``.
        * otherwise: start the Space through ``runner``, or through a new
          ``UVProvider`` built from the arguments below.

        In both modes the started container is stopped again if the server
        does not become ready.

        Args:
            space_id: Space identifier, e.g. ``"user/space"``.
            use_docker: Select the Docker launch mode.
            provider: Container provider for the Docker mode.
            host, reload, project_url, connect_host, extra_env: Forwarded to
                ``UVProvider(...)`` when no ``runner`` is supplied; unused
                otherwise.
            port: Forwarded to ``UVProvider(...)`` and to the runner's
                ``start_container()`` in the non-Docker mode.
            timeout_s: Passed to ``wait_for_ready()``.
            runner: Pre-built runner used instead of a new ``UVProvider``.
            **provider_kwargs: Extra arguments for ``start_container()``.
                Docker mode consumes ``tag`` (default ``"latest"``) and
                ``image``; the non-Docker mode consumes ``env_vars``.

        Returns:
            A client connected to the started server, holding a reference to
            the provider/runner so :meth:`close` can stop it.
        """
        if use_docker:
            if provider is None:
                provider = LocalDockerProvider()
            tag = provider_kwargs.pop("tag", "latest")
            image = provider_kwargs.pop(
                "image",
                f"registry.hf.space/{space_id.replace('/', '-')}:{tag}",
            )
            base_url = provider.start_container(image, **provider_kwargs)
            cls._wait_ready_or_stop(provider, base_url, timeout_s=timeout_s)
            return cls(base_url=base_url, provider=provider)

        # Explicit None check: a supplied runner is always honoured, even if
        # its type happens to define falsy truthiness.
        if runner is not None:
            uv_runner = runner
        else:
            uv_runner = UVProvider(
                space_id=space_id,
                host=host,
                port=port,
                reload=reload,
                project_url=project_url,
                connect_host=connect_host,
                extra_env=extra_env,
            )

        non_docker_kwargs = dict(provider_kwargs)
        env_vars = non_docker_kwargs.pop("env_vars", None)
        base_url = uv_runner.start_container(
            space_id,
            port=port,
            env_vars=env_vars,
            **non_docker_kwargs,
        )
        cls._wait_ready_or_stop(uv_runner, base_url, timeout_s=timeout_s)
        return cls(base_url=base_url, provider=uv_runner)

    # ---------- Subclass hooks ----------

    @abstractmethod
    def _step_payload(self, action: ActT) -> dict:
        """Convert an action to the JSON payload expected by the server."""
        raise NotImplementedError

    @abstractmethod
    def _parse_result(self, payload: dict) -> "StepResult[ObsT]":
        """Convert a JSON response into :class:`StepResult`."""
        raise NotImplementedError

    @abstractmethod
    def _parse_state(self, payload: dict) -> Any:
        """Convert state JSON into a :class:`State` object."""
        raise NotImplementedError

    # ---------- Environment Server Interface Methods ----------

    def reset(self) -> "StepResult[ObsT]":
        """POST an empty JSON body to ``/reset`` and parse the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {}
        # TODO: later:
        # body["seed"] = seed
        # body["episode_id"] = episode_id
        r = self._http.post(
            f"{self._base}/reset",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def step(self, action: ActT) -> "StepResult[ObsT]":
        """POST ``action`` to ``/step`` and parse the response.

        Args:
            action: Action to execute; serialized with ``_step_payload``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {
            "action": self._step_payload(action),
            # NOTE: int() truncates, so a fractional client timeout is sent
            # to the server rounded toward zero.
            "timeout_s": int(self._timeout),
        }
        # TODO: later:
        # body["request_id"] = str(uuid.uuid4())
        # body["episode_id"] = current_episode_id
        r = self._http.post(
            f"{self._base}/step",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def state(self) -> Any:
        """
        Get the current environment state from the server.

        Returns:
            State object with environment state information (e.g.,
            episode_id, step_count)

        Raises:
            requests.HTTPError: If the server answers with an error status.

        Example:
            >>> client = EchoEnv.from_docker_image("echo-env:latest")
            >>> result = client.reset()
            >>> state = client.state()
            >>> print(state.episode_id)
            >>> print(state.step_count)
        """
        r = self._http.get(
            f"{self._base}/state",
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_state(r.json())

    def close(self) -> None:
        """
        Close the environment and clean up resources.

        If this client holds a container provider (e.g. it was created via
        from_docker_image() or from_hub()), the provider is asked to stop its
        container. The underlying HTTP session is then closed, even when
        stopping the container raises.
        """
        try:
            if self._provider is not None:
                self._provider.stop_container()
        finally:
            # getattr: tolerate subclasses whose __init__ never created a
            # session, which worked with close() before this cleanup existed.
            session = getattr(self, "_http", None)
            if session is not None:
                session.close()
|