Spaces:
Sleeping
Sleeping
| """OpenEnv client for interacting with the support queue environment.""" | |
| from __future__ import annotations | |
| from typing import Any, Dict | |
| import requests | |
| from support_queue_env.models import TaskCard, SupportQueueAction, SupportQueueObservation, SupportQueueState | |
| try: | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_client import EnvClient as OpenEnvClient | |
| except Exception: # pragma: no cover - fallback for environments without openenv-core | |
| OpenEnvClient = None | |
| StepResult = None | |
| if OpenEnvClient is not None: | |
| class SupportQueueEnv(OpenEnvClient[SupportQueueAction, SupportQueueObservation, SupportQueueState]): | |
| def __init__(self, base_url: str, **kwargs: Any) -> None: | |
| super().__init__(base_url=base_url, **kwargs) | |
| self.base_url = base_url.rstrip("/") | |
| def _step_payload(self, action: SupportQueueAction) -> Dict[str, Any]: | |
| return action.model_dump() | |
| def _parse_result(self, payload: Dict[str, Any]) -> StepResult[SupportQueueObservation]: | |
| observation = SupportQueueObservation.model_validate(payload.get("observation", {})) | |
| return StepResult( | |
| observation=observation, | |
| reward=payload.get("reward"), | |
| done=payload.get("done", False), | |
| ) | |
| def _parse_state(self, payload: Dict[str, Any]) -> SupportQueueState: | |
| return SupportQueueState.model_validate(payload) | |
| def list_tasks(self) -> list[TaskCard]: | |
| response = requests.get(f"{self.base_url.rstrip('/')}/tasks", timeout=30) | |
| response.raise_for_status() | |
| payload = response.json() | |
| return [TaskCard.model_validate(item) for item in payload["tasks"]] | |
| else: | |
| class _Result: | |
| def __init__(self, payload: dict[str, Any]) -> None: | |
| self.observation = SupportQueueObservation.model_validate(payload["observation"]) | |
| self.reward = float(payload.get("reward") or 0.0) | |
| self.done = bool(payload.get("done")) | |
| class SupportQueueEnv: | |
| def __init__(self, base_url: str, **_: Any) -> None: | |
| self.base_url = base_url.rstrip("/") | |
| async def from_docker_image(cls, image_name: str | None = None) -> "SupportQueueEnv": | |
| _ = image_name | |
| return cls(base_url="http://127.0.0.1:8000") | |
| def list_tasks(self) -> list[TaskCard]: | |
| response = requests.get(f"{self.base_url}/tasks", timeout=30) | |
| response.raise_for_status() | |
| payload = response.json() | |
| return [TaskCard.model_validate(item) for item in payload["tasks"]] | |
| async def reset(self, **kwargs: Any) -> _Result: | |
| response = requests.post(f"{self.base_url}/reset", json=kwargs or {}, timeout=30) | |
| response.raise_for_status() | |
| return _Result(response.json()) | |
| async def step(self, action: SupportQueueAction) -> _Result: | |
| response = requests.post( | |
| f"{self.base_url}/step", | |
| json={"action": action.model_dump()}, | |
| timeout=30, | |
| ) | |
| response.raise_for_status() | |
| return _Result(response.json()) | |
| async def state(self) -> SupportQueueState: | |
| response = requests.get(f"{self.base_url}/state", timeout=30) | |
| response.raise_for_status() | |
| return SupportQueueState.model_validate(response.json()) | |
| async def close(self) -> None: | |
| return None | |