""" core/runner_env.py Minimal HTTP-based environment client. - Talks to a single env worker exposing: POST /reset, POST /step Future hooks (commented below) for: - episode_id, seed on reset - request_id on step - custom headers (auth/trace) """ from __future__ import annotations from abc import ABC, abstractmethod from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar import requests from .client_types import StepResult from .containers.runtime.uv_provider import UVProvider from .containers.runtime import LocalDockerProvider if TYPE_CHECKING: from .containers.runtime import ContainerProvider ActT = TypeVar("ActT") ObsT = TypeVar("ObsT") EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient") class HTTPEnvClient(ABC, Generic[ActT, ObsT]): def __init__( self, base_url: str, request_timeout_s: float = 15.0, default_headers: Optional[Dict[str, str]] = None, provider: Optional["ContainerProvider"] = None, ): self._base = base_url.rstrip("/") self._timeout = float(request_timeout_s) self._http = requests.Session() self._headers = default_headers or {} self._provider = provider @classmethod def from_docker_image( cls: Type[EnvClientT], image: str, provider: Optional["ContainerProvider"] = None, **kwargs: Any, ) -> EnvClientT: """ Create an environment client by spinning up a Docker container locally. This is a development utility that: 1. Starts a Docker container from the specified image 2. Waits for the server to be ready 3. Creates and returns a client instance connected to the container Note: The caller or a higher-level orchestrator manages the container lifecycle. The container continues running until it is stopped. Args: image: Docker image name to run (e.g., "echo-env:latest") provider: Container provider to use (defaults to ``LocalDockerProvider``) **kwargs: Additional arguments passed to ``provider.start_container()`` (e.g., env_vars, port) Returns: An instance of the client class connected to the running container Example: >>> from envs.coding_env.client import CodingEnv >>> from envs.coding_env.models import CodeAction >>> >>> # Create environment from image >>> env = CodingEnv.from_docker_image("coding-env:latest") >>> >>> # Create environment with custom env vars >>> env = CodingEnv.from_docker_image( ... "coding-env:latest", ... env_vars={"MY_VAR": "value"} ... ) >>> >>> # Use the environment >>> result = env.reset() >>> print(result.observation) >>> >>> step_result = env.step(CodeAction(code="print('hello')")) >>> print(step_result.observation.stdout) >>> >>> # Cleanup (optional) >>> env.close() """ # Use default provider if none provided if provider is None: provider = LocalDockerProvider() # 1. Start container with optional kwargs (e.g., env_vars, port) base_url = provider.start_container(image, **kwargs) # 2. Wait for server to be ready provider.wait_for_ready(base_url) # 3. Create and return client instance with provider reference return cls(base_url=base_url, provider=provider) @classmethod def from_hub( cls: Type[EnvClientT], space_id: str, *, use_docker: bool = False, provider: Optional["ContainerProvider"] = None, host: str = "0.0.0.0", port: Optional[int] = None, reload: bool = False, timeout_s: float = 60.0, runner: Optional[UVProvider] = None, project_url: Optional[str] = None, connect_host: Optional[str] = None, extra_env: Optional[Dict[str, str]] = None, **provider_kwargs: Any, ) -> EnvClientT: """Create a client from a Hugging Face Space.""" if use_docker: if provider is None: provider = LocalDockerProvider() tag = provider_kwargs.pop("tag", "latest") image = provider_kwargs.pop( "image", f"registry.hf.space/{space_id.replace('/', '-')}:{tag}", ) base_url = provider.start_container(image, **provider_kwargs) provider.wait_for_ready(base_url, timeout_s=timeout_s) return cls(base_url=base_url, provider=provider) uv_runner = runner or UVProvider( space_id=space_id, host=host, port=port, reload=reload, project_url=project_url, connect_host=connect_host, extra_env=extra_env, ) non_docker_kwargs = dict(provider_kwargs) env_vars = non_docker_kwargs.pop("env_vars", None) base_url = uv_runner.start_container( space_id, port=port, env_vars=env_vars, **non_docker_kwargs, ) try: uv_runner.wait_for_ready(base_url, timeout_s=timeout_s) except Exception: uv_runner.stop_container() raise return cls(base_url=base_url, provider=uv_runner) @abstractmethod def _step_payload(self, action: ActT) -> dict: """Convert an action to the JSON payload expected by the server.""" raise NotImplementedError @abstractmethod def _parse_result(self, payload: dict) -> StepResult[ObsT]: """Convert a JSON response into :class:`StepResult`.""" raise NotImplementedError @abstractmethod def _parse_state(self, payload: dict) -> Any: """Convert state JSON into a :class:`State` object.""" raise NotImplementedError # ---------- Environment Server Interface Methods ---------- def reset(self) -> StepResult[ObsT]: body: Dict[str, Any] = {} # TODO: later: # body["seed"] = seed # body["episode_id"] = episode_id r = self._http.post( f"{self._base}/reset", json=body, headers=self._headers, timeout=self._timeout, ) r.raise_for_status() return self._parse_result(r.json()) def step(self, action: ActT) -> StepResult[ObsT]: body: Dict[str, Any] = { "action": self._step_payload(action), "timeout_s": int(self._timeout), } # TODO: later: # body["request_id"] = str(uuid.uuid4()) # body["episode_id"] = current_episode_id r = self._http.post( f"{self._base}/step", json=body, headers=self._headers, timeout=self._timeout, ) r.raise_for_status() return self._parse_result(r.json()) def state(self) -> Any: """ Get the current environment state from the server. Returns: State object with environment state information (e.g., episode_id, step_count) Example: >>> client = EchoEnv.from_docker_image("echo-env:latest") >>> result = client.reset() >>> state = client.state() >>> print(state.episode_id) >>> print(state.step_count) """ r = self._http.get( f"{self._base}/state", headers=self._headers, timeout=self._timeout, ) r.raise_for_status() return self._parse_state(r.json()) def close(self) -> None: """ Close the environment and clean up resources. If this client was created via from_docker_image(), this will stop and remove the associated container. """ if self._provider is not None: self._provider.stop_container()