# customer-support-openenv / inference.py
# Dar3devil's picture
# Sync inference
# 4320bcf verified
from __future__ import annotations
import json
import os
import sys
from typing import Any
from openai import OpenAI
from support_ticket_env import BENCHMARK_NAME, DEFAULT_SUCCESS_THRESHOLD, SupportTicketEnv, fallback_action, list_task_ids, parse_action
# --- Runtime configuration, read from the environment at import time ---
# HF_TOKEN is used as the API key for the OpenAI-compatible endpoint. When it
# is unset, the __main__ entry point builds no client at all.
API_KEY = os.getenv("HF_TOKEN")
# An unset *or empty* variable falls back to the default because of `or`.
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
# Both are optional; build_env() checks LOCAL_IMAGE_NAME first, then
# ENV_BASE_URL, and otherwise constructs SupportTicketEnv directly.
ENV_BASE_URL = os.getenv("ENV_BASE_URL")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# Sampling parameters passed to the chat-completion request in choose_action().
TEMPERATURE = 0.0
MAX_TOKENS = 220
# Minimum final score for an episode to be reported as success=true.
SUCCESS_THRESHOLD = DEFAULT_SUCCESS_THRESHOLD
# System message sent with every model request. It lists the JSON action
# shapes the model may answer with. The backslash-escaped quotes are redundant
# inside a triple-quoted string and evaluate to plain double quotes.
SYSTEM_PROMPT = """You are operating a deterministic customer-support environment.
Choose exactly one tool action at each step and respond with exactly one JSON object.
Valid actions:
- {\"action_type\": \"search_kb\", \"query\": \"...\"}
- {\"action_type\": \"lookup_account\", \"customer_id\": \"...\"}
- {\"action_type\": \"send_reply\", \"message\": \"...\"}
- {\"action_type\": \"issue_refund\", \"amount_cents\": 4900, \"reason_code\": \"duplicate_charge\"}
- {\"action_type\": \"resolve_ticket\", \"resolution_code\": \"password_reset_guidance\"}
- {\"action_type\": \"resolve_ticket\", \"resolution_code\": \"billing_refund_processed\"}
- {\"action_type\": \"escalate_ticket\", \"queue\": \"support_lead\", \"priority\": \"P2\", \"summary\": \"...\"}
- {\"action_type\": \"escalate_ticket\", \"queue\": \"legal_data_incident\", \"priority\": \"P0\", \"summary\": \"...\"}
Do not include markdown, code fences, or explanations."""
def log_start(task: str, env: str, model: str) -> None:
    """Write the ``[START]`` record for an episode to stdout.

    Args:
        task: Identifier of the task being run.
        env: Name of the environment/benchmark.
        model: Name of the model driving the episode.
    """
    record = f"[START] task={task} env={env} model={model}"
    # Flush immediately so the record is visible even if the run later dies.
    print(record, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    """Write one ``[STEP]`` record to stdout.

    Args:
        step: 1-based index of the step within the episode.
        action: Already-serialized action text, printed verbatim.
        reward: Step reward, printed with two decimals.
        done: Whether the episode ended on this step (printed lowercase).
        error: Error text for the step; ``None`` or an empty string is
            printed as ``null``. Newlines are replaced by spaces so the
            record stays on a single line.
    """
    if error:
        error_text = error.replace("\n", " ")
    else:
        error_text = "null"
    record = (
        f"[STEP] step={step} action={action} reward={reward:.2f} done={str(done).lower()} error={error_text}"
    )
    print(record, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Write the closing ``[END]`` record for an episode to stdout.

    Args:
        success: Whether the episode counts as a success (printed lowercase).
        steps: Number of steps taken.
        score: Final score, printed with two decimals.
        rewards: Per-step rewards, printed as a comma-separated list with two
            decimals each (empty list prints nothing after ``rewards=``).
    """
    formatted_rewards = [format(value, ".2f") for value in rewards]
    rewards_str = ",".join(formatted_rewards)
    success_text = str(success).lower()
    record = f"[END] success={success_text} steps={steps} score={score:.2f} rewards={rewards_str}"
    print(record, flush=True)
def _strip_code_fences(text: str) -> str:
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.splitlines()
if lines and lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].startswith("```"):
lines = lines[:-1]
cleaned = "\n".join(lines).strip()
return cleaned
def _extract_json_object(text: str) -> dict[str, Any]:
    """Parse the first JSON object embedded in a model response.

    Code fences are removed first. Decoding then starts at the first ``{`` and
    stops at the end of that object, so trailing commentary (even commentary
    that itself contains braces or a second object) no longer breaks parsing.
    Any response that parsed when slicing from the first ``{`` to the last
    ``}`` still yields the same dictionary.

    Args:
        text: Raw model output.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the text contains no ``{`` followed by a ``}``.
        json.JSONDecodeError: If the text at the first ``{`` is not valid JSON
            (a subclass of ``ValueError``).
    """
    cleaned = _strip_code_fences(text)
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end == -1 or end <= start:
        raise ValueError("No JSON object found in model response")
    # raw_decode stops after the first complete JSON value and ignores
    # whatever follows, unlike json.loads which rejects trailing data.
    parsed, _ = json.JSONDecoder().raw_decode(cleaned[start:])
    return parsed
def build_user_prompt(observation: dict[str, Any]) -> str:
    """Build the user message for one step.

    Args:
        observation: JSON-serializable observation dictionary.

    Returns:
        A fixed instruction followed by the observation rendered as JSON with
        two-space indentation.
    """
    preamble = (
        "Choose the next best action for this support ticket. "
        "Keep it valid and deterministic. Observation JSON:\n"
    )
    rendered = json.dumps(observation, indent=2)
    return preamble + rendered
def choose_action(client: OpenAI | None, observation) -> Any:
    """Pick the next action, preferring the model and falling back to a script.

    The scripted action from ``fallback_action`` is always computed first. It
    is returned directly when no client is available, and also whenever the
    model request, JSON extraction, or ``parse_action`` raises.

    Args:
        client: OpenAI-compatible client, or ``None`` to skip the model.
        observation: Current observation; it must provide
            ``model_dump(mode="json")``.

    Returns:
        The action produced by ``parse_action``, or the scripted fallback.
    """
    scripted = fallback_action(observation)
    if client is None:
        return scripted

    try:
        user_prompt = build_user_prompt(observation.model_dump(mode="json"))
        conversation = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ]
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=conversation,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        raw_reply = response.choices[0].message.content
        reply = (raw_reply or "").strip()  # content may be None
        payload = _extract_json_object(reply)
        return parse_action(payload)
    except Exception as exc:  # pragma: no cover - depends on external endpoint
        # Deliberately broad: any failure on the model path degrades to the
        # scripted policy instead of aborting the episode.
        print(f"[DEBUG] Falling back to scripted policy: {exc}", file=sys.stderr, flush=True)
    return scripted
def build_env(task_id: str) -> SupportTicketEnv:
    """Construct the environment for *task_id* from the module configuration.

    Selection order:
      1. ``LOCAL_IMAGE_NAME`` set -> ``SupportTicketEnv.from_docker_image``.
      2. ``ENV_BASE_URL`` set -> ``SupportTicketEnv.from_env``.
      3. otherwise -> ``SupportTicketEnv`` constructed directly.

    Args:
        task_id: Identifier of the task to load.

    Returns:
        The environment instance for the task.
    """
    if LOCAL_IMAGE_NAME:
        env = SupportTicketEnv.from_docker_image(image_name=LOCAL_IMAGE_NAME, task_id=task_id)
    elif ENV_BASE_URL:
        env = SupportTicketEnv.from_env(repo_id=BENCHMARK_NAME, base_url=ENV_BASE_URL, task_id=task_id)
    else:
        env = SupportTicketEnv(task_id=task_id)
    return env
def clamp_score(score: float) -> float:
    """Limit *score* to the closed interval [0.0, 1.0].

    Values below 0.0 become 0.0, values above 1.0 become 1.0, and anything
    in between is returned unchanged.
    """
    if score < 0.0:
        return 0.0
    if score > 1.0:
        return 1.0
    return score
def run_episode(task_id: str, client: OpenAI | None) -> None:
    """Run one task to completion and emit its [START]/[STEP]/[END] records.

    The environment is reset, then actions from ``choose_action`` are applied
    until the result reports ``done``. The final score is read from
    ``result.info["score"]`` (default 0.0), clamped to [0, 1] and compared to
    ``SUCCESS_THRESHOLD``.

    The environment is always closed and the ``[END]`` record is always
    written once ``[START]`` has been logged, even if the episode or
    ``env.close()`` raises. Exceptions still propagate to the caller.

    Args:
        task_id: Identifier of the task to run.
        client: OpenAI-compatible client, or ``None`` for the scripted policy.
    """
    env = build_env(task_id)
    rewards: list[float] = []
    steps_taken = 0
    final_score = 0.0
    success = False
    log_start(task=task_id, env=BENCHMARK_NAME, model=MODEL_NAME)
    try:
        result = env.reset(task_id)
        while not result.done:
            action = choose_action(client, result.observation)
            result = env.step(action)
            steps_taken += 1
            # NOTE(review): a missing (None) reward is treated as 0.0 so the
            # ":.2f" formatting in the log helpers cannot fail; confirm whether
            # the environment can actually return None here.
            reward = float(result.reward or 0.0)
            rewards.append(reward)
            action_str = json.dumps(action.model_dump(mode="json"), separators=(",", ":"))
            log_step(
                step=steps_taken,
                action=action_str,
                reward=reward,
                done=result.done,
                error=result.observation.last_action_error,
            )
        final_score = clamp_score(float(result.info.get("score", 0.0)))
        success = final_score >= SUCCESS_THRESHOLD
    finally:
        # Nested so a failing close() cannot suppress the [END] record.
        try:
            env.close()
        finally:
            log_end(success=success, steps=steps_taken, score=final_score, rewards=rewards)
if __name__ == "__main__":
    # Without an API key no client is built and every episode runs with
    # client=None (choose_action then returns the scripted fallback).
    if API_KEY:
        client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    else:
        client = None
    # Run every task id reported by list_task_ids(), one after another.
    for task_id in list_task_ids():
        run_episode(task_id, client)