File size: 8,123 Bytes
2d8433b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
"""
core/runner_env.py
Minimal HTTP-based environment client.
- Talks to a single env worker exposing: POST /reset, POST /step, GET /state
Future hooks (commented below) for:
- episode_id, seed on reset
- request_id on step
- custom headers (auth/trace)
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar
import requests
from .client_types import StepResult
from .containers.runtime.uv_provider import UVProvider
from .containers.runtime import LocalDockerProvider
if TYPE_CHECKING:
from .containers.runtime import ContainerProvider
# Type parameters: the action type a client sends, the observation type it
# gets back, and the concrete client subclass returned by the factories.
ActT = TypeVar("ActT")
ObsT = TypeVar("ObsT")
EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient")


class HTTPEnvClient(ABC, Generic[ActT, ObsT]):
    """Abstract HTTP client for a single environment server.

    The client issues ``POST /reset``, ``POST /step`` and ``GET /state``
    against ``base_url``. Subclasses supply the (de)serialisation hooks
    ``_step_payload``, ``_parse_result`` and ``_parse_state``.

    Args:
        base_url: Root URL of the environment server; trailing slashes
            are stripped.
        request_timeout_s: Timeout, in seconds, applied to every HTTP
            request made by this client.
        default_headers: Headers sent with every request. The mapping is
            stored as given (not copied); ``None`` means no extra headers.
        provider: Optional provider that owns the backing container or
            process. When set, :meth:`close` calls its ``stop_container()``.
    """

    def __init__(
        self,
        base_url: str,
        request_timeout_s: float = 15.0,
        default_headers: Optional[Dict[str, str]] = None,
        provider: Optional["ContainerProvider"] = None,
    ):
        self._base = base_url.rstrip("/")
        self._timeout = float(request_timeout_s)
        self._http = requests.Session()
        self._headers = default_headers or {}
        self._provider = provider

    @staticmethod
    def _wait_ready_or_stop(provider: Any, base_url: str, **wait_kwargs: Any) -> None:
        """Wait for the server at ``base_url``; stop the container on failure.

        If ``provider.wait_for_ready`` raises, no client has been handed to
        the caller yet, so nothing else could stop the freshly started
        container. It is stopped here and the original exception re-raised.
        """
        try:
            provider.wait_for_ready(base_url, **wait_kwargs)
        except Exception:
            provider.stop_container()
            raise

    @classmethod
    def from_docker_image(
        cls: Type[EnvClientT],
        image: str,
        provider: Optional["ContainerProvider"] = None,
        **kwargs: Any,
    ) -> EnvClientT:
        """
        Create an environment client by spinning up a Docker container locally.

        This is a development utility that:
        1. Starts a Docker container from the specified image
        2. Waits for the server to be ready
        3. Creates and returns a client instance connected to the container

        Note:
            Once a client has been returned, the caller or a higher-level
            orchestrator manages the container lifecycle; the container
            keeps running until it is stopped (e.g. via ``close()``).
            If the readiness wait fails, the container is stopped before
            the exception is re-raised.

        Args:
            image: Docker image name to run (e.g., "echo-env:latest")
            provider: Container provider to use (defaults to
                ``LocalDockerProvider``)
            **kwargs: Additional arguments passed to
                ``provider.start_container()`` (e.g., env_vars, port)

        Returns:
            An instance of the client class connected to the running container

        Raises:
            Exception: Whatever ``provider.wait_for_ready()`` raises; it is
                propagated unchanged after the container has been stopped.

        Example:
            >>> from envs.coding_env.client import CodingEnv
            >>> from envs.coding_env.models import CodeAction
            >>>
            >>> # Create environment from image
            >>> env = CodingEnv.from_docker_image("coding-env:latest")
            >>>
            >>> # Create environment with custom env vars
            >>> env = CodingEnv.from_docker_image(
            ...     "coding-env:latest",
            ...     env_vars={"MY_VAR": "value"}
            ... )
            >>>
            >>> # Use the environment
            >>> result = env.reset()
            >>> print(result.observation)
            >>>
            >>> step_result = env.step(CodeAction(code="print('hello')"))
            >>> print(step_result.observation.stdout)
            >>>
            >>> # Cleanup (optional)
            >>> env.close()
        """
        # Use default provider if none provided
        if provider is None:
            provider = LocalDockerProvider()

        # 1. Start container with optional kwargs (e.g., env_vars, port)
        base_url = provider.start_container(image, **kwargs)

        # 2. Wait for server to be ready (stops the container on failure)
        cls._wait_ready_or_stop(provider, base_url)

        # 3. Create and return client instance with provider reference
        return cls(base_url=base_url, provider=provider)

    @classmethod
    def from_hub(
        cls: Type[EnvClientT],
        space_id: str,
        *,
        use_docker: bool = False,
        provider: Optional["ContainerProvider"] = None,
        host: str = "0.0.0.0",
        port: Optional[int] = None,
        reload: bool = False,
        timeout_s: float = 60.0,
        runner: Optional["UVProvider"] = None,
        project_url: Optional[str] = None,
        connect_host: Optional[str] = None,
        extra_env: Optional[Dict[str, str]] = None,
        **provider_kwargs: Any,
    ) -> EnvClientT:
        """Create a client from a Hugging Face Space.

        Two launch modes are supported:

        * ``use_docker=True``: start a container via ``provider`` (default
          ``LocalDockerProvider``). The image defaults to
          ``registry.hf.space/<space_id with '/' replaced by '-'>:<tag>``.
        * ``use_docker=False``: start the Space through ``runner``, or
          through a new ``UVProvider`` built from the arguments below.

        In both modes, if the readiness wait fails the started
        container/process is stopped and the exception is re-raised.

        Args:
            space_id: Space identifier, e.g. ``"user/space"``.
            use_docker: Select the Docker launch mode.
            provider: Container provider for the Docker mode.
            host, port, reload, project_url, connect_host, extra_env:
                Passed to the ``UVProvider`` constructor when ``runner`` is
                not given; unused in Docker mode. ``port`` is additionally
                passed to ``start_container()`` in the non-Docker mode.
            timeout_s: Passed as ``timeout_s`` to ``wait_for_ready()``.
            runner: Pre-built ``UVProvider`` to use instead of creating one.
            **provider_kwargs: Docker mode: ``tag`` (default ``"latest"``)
                and ``image`` are consumed here, the rest is forwarded to
                ``provider.start_container()``. Non-Docker mode:
                ``env_vars`` is extracted and passed explicitly, the rest
                is forwarded to ``start_container()``.

        Returns:
            A client instance bound to the started server, holding the
            provider/runner so that ``close()`` can stop it.
        """
        if use_docker:
            if provider is None:
                provider = LocalDockerProvider()
            tag = provider_kwargs.pop("tag", "latest")
            image = provider_kwargs.pop(
                "image",
                f"registry.hf.space/{space_id.replace('/', '-')}:{tag}",
            )
            base_url = provider.start_container(image, **provider_kwargs)
            cls._wait_ready_or_stop(provider, base_url, timeout_s=timeout_s)
            return cls(base_url=base_url, provider=provider)

        uv_runner = runner or UVProvider(
            space_id=space_id,
            host=host,
            port=port,
            reload=reload,
            project_url=project_url,
            connect_host=connect_host,
            extra_env=extra_env,
        )
        non_docker_kwargs = dict(provider_kwargs)
        env_vars = non_docker_kwargs.pop("env_vars", None)
        base_url = uv_runner.start_container(
            space_id,
            port=port,
            env_vars=env_vars,
            **non_docker_kwargs,
        )
        cls._wait_ready_or_stop(uv_runner, base_url, timeout_s=timeout_s)
        return cls(base_url=base_url, provider=uv_runner)

    @abstractmethod
    def _step_payload(self, action: ActT) -> dict:
        """Convert an action to the JSON payload expected by the server."""
        raise NotImplementedError

    @abstractmethod
    def _parse_result(self, payload: dict) -> "StepResult[ObsT]":
        """Convert a JSON response into :class:`StepResult`."""
        raise NotImplementedError

    @abstractmethod
    def _parse_state(self, payload: dict) -> Any:
        """Convert state JSON into a :class:`State` object."""
        raise NotImplementedError

    # ---------- Environment Server Interface Methods ----------
    def reset(self) -> "StepResult[ObsT]":
        """POST an empty JSON body to ``/reset`` and parse the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {}
        # TODO: later:
        # body["seed"] = seed
        # body["episode_id"] = episode_id
        r = self._http.post(
            f"{self._base}/reset",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def step(self, action: ActT) -> "StepResult[ObsT]":
        """POST ``action`` to ``/step`` and parse the response.

        The body carries the subclass-serialised action under ``"action"``
        and the client timeout, truncated to an integer, under
        ``"timeout_s"``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {
            "action": self._step_payload(action),
            "timeout_s": int(self._timeout),
        }
        # TODO: later:
        # body["request_id"] = str(uuid.uuid4())
        # body["episode_id"] = current_episode_id
        r = self._http.post(
            f"{self._base}/step",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def state(self) -> Any:
        """
        Get the current environment state from the server.

        Returns:
            State object with environment state information (e.g.,
            episode_id, step_count)

        Raises:
            requests.HTTPError: If the server answers with an error status.

        Example:
            >>> client = EchoEnv.from_docker_image("echo-env:latest")
            >>> result = client.reset()
            >>> state = client.state()
            >>> print(state.episode_id)
            >>> print(state.step_count)
        """
        r = self._http.get(
            f"{self._base}/state",
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_state(r.json())

    def close(self) -> None:
        """
        Close the environment and clean up resources.

        If this client holds a provider (e.g. it was created via
        from_docker_image() or from_hub()), the provider's
        ``stop_container()`` is called. The underlying HTTP session is
        always closed, even when stopping the container raises.
        """
        try:
            if self._provider is not None:
                self._provider.stop_container()
        finally:
            # Release pooled connections held by the requests session.
            self._http.close()
|