Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| import sys | |
| from typing import Any | |
| from openai import OpenAI | |
| from support_ticket_env import BENCHMARK_NAME, DEFAULT_SUCCESS_THRESHOLD, SupportTicketEnv, fallback_action, list_task_ids, parse_action | |
# Credentials and endpoints are read from the process environment. An unset or
# empty API_BASE_URL / MODEL_NAME falls back to the literal default next to it.
API_KEY = os.environ.get("HF_TOKEN")
API_BASE_URL = os.environ.get("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.environ.get("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"

# Optional environment back-ends; build_env() checks LOCAL_IMAGE_NAME first,
# then ENV_BASE_URL.
ENV_BASE_URL = os.environ.get("ENV_BASE_URL")
LOCAL_IMAGE_NAME = os.environ.get("LOCAL_IMAGE_NAME")

# Sampling parameters sent with every chat-completion request.
TEMPERATURE = 0.0
MAX_TOKENS = 220

# Minimum final score for an episode to be reported as a success.
SUCCESS_THRESHOLD = DEFAULT_SUCCESS_THRESHOLD

# System message for the model: lists the accepted action shapes and demands a
# single bare JSON object per reply.
SYSTEM_PROMPT = """You are operating a deterministic customer-support environment.
Choose exactly one tool action at each step and respond with exactly one JSON object.
Valid actions:
- {\"action_type\": \"search_kb\", \"query\": \"...\"}
- {\"action_type\": \"lookup_account\", \"customer_id\": \"...\"}
- {\"action_type\": \"send_reply\", \"message\": \"...\"}
- {\"action_type\": \"issue_refund\", \"amount_cents\": 4900, \"reason_code\": \"duplicate_charge\"}
- {\"action_type\": \"resolve_ticket\", \"resolution_code\": \"password_reset_guidance\"}
- {\"action_type\": \"resolve_ticket\", \"resolution_code\": \"billing_refund_processed\"}
- {\"action_type\": \"escalate_ticket\", \"queue\": \"support_lead\", \"priority\": \"P2\", \"summary\": \"...\"}
- {\"action_type\": \"escalate_ticket\", \"queue\": \"legal_data_incident\", \"priority\": \"P0\", \"summary\": \"...\"}
Do not include markdown, code fences, or explanations."""
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode's log on stdout.

    Args:
        task: Identifier of the task being run.
        env: Name of the environment/benchmark.
        model: Name of the model driving the episode.
    """
    line = f"[START] task={task} env={env} model={model}"
    # Flushed immediately so the line is not held back in a buffer.
    print(line, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    """Print one ``[STEP]`` line describing a single environment step.

    Args:
        step: 1-based step counter.
        action: Already-serialised action text.
        reward: Reward for this step, printed with two decimals.
        done: Whether the episode finished on this step (printed lowercase).
        error: Error text from the last action; ``None`` or an empty string
            is printed as ``null``.
    """
    if error:
        # Keep the record on a single line.
        error_value = error.replace("\n", " ")
    else:
        error_value = "null"
    line = f"[STEP] step={step} action={action} reward={reward:.2f} done={str(done).lower()} error={error_value}"
    print(line, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Print the ``[END]`` line that summarises a finished episode.

    Args:
        success: Whether the episode counted as a success (printed lowercase).
        steps: Number of steps taken.
        score: Final score, printed with two decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals.
    """
    formatted = [f"{reward:.2f}" for reward in rewards]
    rewards_str = ",".join(formatted)
    line = f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}"
    print(line, flush=True)
| def _strip_code_fences(text: str) -> str: | |
| cleaned = text.strip() | |
| if cleaned.startswith("```"): | |
| lines = cleaned.splitlines() | |
| if lines and lines[0].startswith("```"): | |
| lines = lines[1:] | |
| if lines and lines[-1].startswith("```"): | |
| lines = lines[:-1] | |
| cleaned = "\n".join(lines).strip() | |
| return cleaned | |
| def _extract_json_object(text: str) -> dict[str, Any]: | |
| cleaned = _strip_code_fences(text) | |
| start = cleaned.find("{") | |
| end = cleaned.rfind("}") | |
| if start == -1 or end == -1 or end <= start: | |
| raise ValueError("No JSON object found in model response") | |
| return json.loads(cleaned[start : end + 1]) | |
def build_user_prompt(observation: dict[str, Any]) -> str:
    """Build the user message for one step.

    Args:
        observation: JSON-serialisable observation mapping.

    Returns:
        A fixed instruction followed by the observation pretty-printed as
        JSON with a two-space indent.
    """
    instruction = (
        "Choose the next best action for this support ticket. "
        "Keep it valid and deterministic. Observation JSON:\n"
    )
    observation_json = json.dumps(observation, indent=2)
    return instruction + observation_json
def choose_action(client: OpenAI | None, observation) -> Any:
    """Pick the next action, preferring the model and falling back to the script.

    Args:
        client: Chat-completions client, or ``None`` to skip the model.
        observation: Current observation; it is passed to ``fallback_action``
            and serialised with ``model_dump(mode="json")`` for the prompt.

    Returns:
        The action parsed from the model's reply, or the result of
        ``fallback_action(observation)`` when there is no client or anything
        in the model path raises.
    """
    # Computed up front so it is ready for both the no-client and error paths.
    fallback = fallback_action(observation)
    if client is None:
        return fallback

    try:
        observation_payload = observation.model_dump(mode="json")
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": build_user_prompt(observation_payload)},
        ]
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        # A missing message body is treated as an empty reply.
        reply_text = completion.choices[0].message.content or ""
        payload = _extract_json_object(reply_text.strip())
        return parse_action(payload)
    except Exception as exc:  # pragma: no cover - depends on external endpoint
        print(f"[DEBUG] Falling back to scripted policy: {exc}", file=sys.stderr, flush=True)
        return fallback
def build_env(task_id: str) -> SupportTicketEnv:
    """Create the environment for *task_id* according to module configuration.

    Precedence: ``LOCAL_IMAGE_NAME`` (via ``from_docker_image``), then
    ``ENV_BASE_URL`` (via ``from_env``), otherwise a directly constructed
    ``SupportTicketEnv``.

    Args:
        task_id: Task identifier forwarded to whichever constructor is used.

    Returns:
        The environment instance.
    """
    if LOCAL_IMAGE_NAME:
        env = SupportTicketEnv.from_docker_image(image_name=LOCAL_IMAGE_NAME, task_id=task_id)
    elif ENV_BASE_URL:
        env = SupportTicketEnv.from_env(repo_id=BENCHMARK_NAME, base_url=ENV_BASE_URL, task_id=task_id)
    else:
        env = SupportTicketEnv(task_id=task_id)
    return env
def clamp_score(score: float) -> float:
    """Clamp *score* to the closed interval ``[0.0, 1.0]``.

    Values already inside the interval are returned unchanged.
    """
    if score < 0.0:
        return 0.0
    if score > 1.0:
        return 1.0
    return score
def run_episode(task_id: str, client: OpenAI | None) -> None:
    """Run one episode of *task_id* and log it as START / STEP... / END lines.

    The environment is reset, then stepped with actions from
    ``choose_action`` until the result reports ``done``. The final score is
    read from ``result.info["score"]`` (default 0.0), clamped to [0, 1], and
    compared against ``SUCCESS_THRESHOLD``.

    Args:
        task_id: Task to run.
        client: Chat-completions client, or ``None`` to use only the
            fallback policy.

    Raises:
        Exception: Anything raised by the environment propagates to the
            caller; the environment is still closed and the ``[END]`` line
            is still emitted first.
    """
    env = build_env(task_id)
    rewards: list[float] = []
    steps_taken = 0
    final_score = 0.0
    success = False
    log_start(task=task_id, env=BENCHMARK_NAME, model=MODEL_NAME)
    try:
        result = env.reset(task_id)
        while not result.done:
            action = choose_action(client, result.observation)
            result = env.step(action)
            steps_taken += 1
            rewards.append(result.reward)
            # Compact separators keep the action on one short log line.
            action_str = json.dumps(action.model_dump(mode="json"), separators=(",", ":"))
            log_step(
                step=steps_taken,
                action=action_str,
                reward=result.reward,
                done=result.done,
                error=result.observation.last_action_error,
            )
        final_score = clamp_score(float(result.info.get("score", 0.0)))
        success = final_score >= SUCCESS_THRESHOLD
    finally:
        # Nested so that a failing close() cannot suppress the [END] record;
        # the close() error still propagates after the line is written.
        try:
            env.close()
        finally:
            log_end(success=success, steps=steps_taken, score=final_score, rewards=rewards)
if __name__ == "__main__":
    # No HF_TOKEN means no model client; choose_action() then returns the
    # fallback action for every step.
    client = None
    if API_KEY:
        client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

    # Run every registered task once, in the order list_task_ids() yields them.
    for task_id in list_task_ids():
        run_episode(task_id, client)