invoiceguard-code / inference.py
piyush-mk's picture
Upload folder using huggingface_hub
4cd2128 verified
"""
InvoiceGuard -- Baseline Inference Script.
Runs a baseline LLM agent against all canonical tasks and reports per-task
and overall grader scores. Uses the OpenAI API client as required by
the hackathon guidelines.
STDOUT FORMAT (mandatory):
[START] task=<task_name> env=invoice_guard model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
Environment variables (mandatory):
API_BASE_URL -- LLM API endpoint (default: https://api.openai.com/v1)
MODEL_NAME -- model identifier (default: gpt-4.1-mini)
HF_TOKEN -- Your Hugging Face / API key
LOCAL_IMAGE_NAME -- Docker image name (uses from_docker_image when set)
"""
import asyncio
import json
import os
import re
import sys
import time
from typing import List, Optional
from dotenv import load_dotenv
from openai import OpenAI
_MODELS_USING_MAX_COMPLETION_TOKENS = {"gpt-5.4", "gpt-5.4-mini", "gpt-5.4-nano",
"gpt-5", "gpt-5-mini", "gpt-5.1"}
def _token_limit_kwarg(model_name: str, limit: int = 512) -> dict:
"""Return the correct token-limit parameter for the model."""
for prefix in _MODELS_USING_MAX_COMPLETION_TOKENS:
if model_name.startswith(prefix):
return {"max_completion_tokens": limit}
return {"max_tokens": limit}
from models import (
ActionType, DecisionType, ExceptionType, InvoiceGuardAction, TaskID,
)
from tasks import get_task_case, TASK_LIST
load_dotenv()
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4.1-mini")
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") or os.getenv("OPENAI_API_KEY") or ""
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
BENCHMARK = "invoice_guard"
# -- Mandatory stdout logging --------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action} reward={reward:.2f} "
f"done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(
f"[END] success={str(success).lower()} steps={steps} "
f"score={score:.2f} rewards={rewards_str}",
flush=True,
)
# -- System prompt --------------------------------------------------------
SYSTEM_PROMPT = """You are a senior accounts payable analyst. You will be given an invoice case to investigate and resolve.
The environment tells you your goal, available actions, and decision options. Read the goal carefully.
WORKFLOW:
1. Investigate: inspect documents (PO, GRN, vendor profile, policy rules), run comparisons (quantity, price, totals), check for duplicates.
2. Resolve: submit_final_resolution with your decision, exception type, evidence references, and explanation.
Complete a thorough investigation before resolving. Inspect at least: purchase order, goods receipt note, compare quantity, compare price, policy rules, duplicate check, and vendor profile.
RESPONSE FORMAT:
- Respond with ONLY a valid JSON object. No markdown, no commentary.
- Investigation example: {"action_type": "inspect_purchase_order"}
- Resolution example: {"action_type": "submit_final_resolution", "final_decision": "approve_for_payment", "exception_type": "clean_match", "evidence_references": ["inspect_purchase_order", "compare_quantity"], "explanation": "All documents match within tolerance.", "confidence": 0.9}
EXCEPTION TYPE OPTIONS:
- clean_match: all documents match within tolerance
- quantity_mismatch: billed quantity exceeds ordered quantity
- price_mismatch: billed unit price exceeds PO-agreed price
- total_amount_mismatch: invoice subtotal/total math is inconsistent
- partial_receipt: invoice bills more than has been received
- missing_receipt: no valid GRN supports the invoice/period
- duplicate_invoice: same invoice or already-paid duplicate detected
- tax_variance: tax amount differs from policy or expected value
- policy_violation: policy requires hold/escalation despite matching docs
- mixed_discrepancy: multiple material exception types are present
RULES:
- Pay close attention to POLICY findings -- they tell you when escalation is required.
- When multiple issues exist, escalation takes priority over hold.
- Check PO references carefully before concluding an invoice is a duplicate.
- Include all investigation actions you performed in evidence_references.
- Cite specific numbers in your explanation.
- NEVER repeat an action you already took.
- When remaining_steps is 3 or fewer, submit immediately with what you have.
"""
# -- Prompt building ------------------------------------------------------
def build_observation_prompt(obs, is_first: bool = False) -> str:
"""Format observation as a readable prompt for the LLM."""
parts = [
f"Case: {obs.case_id} | Difficulty: {obs.difficulty} | Steps remaining: {obs.remaining_steps}",
f"Invoice: {obs.invoice_summary}",
]
if is_first and obs.goal:
parts.append(f"\n{obs.goal}")
if obs.revealed_documents:
parts.append(f"Documents reviewed: {', '.join(obs.revealed_documents)}")
if obs.findings:
parts.append("Findings:")
for i, f in enumerate(obs.findings, 1):
parts.append(f" {i}. {f}")
if obs.last_action_result:
parts.append(f"Last result: {obs.last_action_result}")
if obs.warnings:
parts.append(f"Warnings: {'; '.join(obs.warnings)}")
if obs.remaining_steps <= 2:
parts.append(
">>> YOU MUST submit_final_resolution NOW. "
"No more investigation. Decide based on what you have. <<<"
)
return "\n".join(parts)
# -- LLM response parsing ------------------------------------------------
_THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)
def strip_think_blocks(text: str) -> str:
"""Remove Qwen3-style <think>...</think> reasoning blocks."""
return _THINK_RE.sub("", text).strip()
def _extract_first_json_object(text: str) -> dict | None:
"""Find the first balanced {...} in text and parse it."""
start = text.find("{")
if start == -1:
return None
depth = 0
in_str = False
escape = False
for i in range(start, len(text)):
c = text[i]
if escape:
escape = False
continue
if c == "\\":
escape = True
continue
if c == '"':
in_str = not in_str
continue
if in_str:
continue
if c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
try:
return json.loads(text[start : i + 1])
except json.JSONDecodeError:
return None
return None
def parse_llm_response(response_text: str) -> dict:
"""Extract a JSON object from the LLM response."""
text = strip_think_blocks(response_text).strip()
if "```json" in text:
text = text.split("```json")[1].split("```")[0].strip()
elif "```" in text:
text = text.split("```")[1].split("```")[0].strip()
try:
return json.loads(text)
except json.JSONDecodeError:
pass
for line in text.split("\n"):
line = line.strip()
if line.startswith("{"):
try:
return json.loads(line)
except json.JSONDecodeError:
continue
obj = _extract_first_json_object(text)
if obj is not None:
return obj
return {"action_type": "summarize_findings"}
def build_action(params: dict) -> InvoiceGuardAction:
"""Build a typed InvoiceGuardAction from parsed LLM output."""
action_type = params.get("action_type", "summarize_findings")
try:
ActionType(action_type)
except ValueError:
action_type = "summarize_findings"
kwargs = {"action_type": action_type}
if params.get("final_decision"):
try:
kwargs["final_decision"] = DecisionType(params["final_decision"])
except ValueError:
pass
if params.get("exception_type"):
try:
kwargs["exception_type"] = ExceptionType(params["exception_type"])
except ValueError:
pass
if params.get("evidence_references"):
kwargs["evidence_references"] = list(params["evidence_references"])
if params.get("explanation"):
kwargs["explanation"] = str(params["explanation"])
if params.get("confidence") is not None:
try:
conf = float(params["confidence"])
if 0.0 <= conf <= 1.0:
kwargs["confidence"] = conf
except (ValueError, TypeError):
pass
return InvoiceGuardAction(**kwargs)
# -- Observation extraction helpers ---------------------------------------
def _obs_from_step_result(result):
"""Extract observation from an EnvClient StepResult, copying reward/done."""
obs = result.observation
obs.reward = result.reward
obs.done = result.done
return obs
# -- Episode runner (local, synchronous) ----------------------------------
def run_episode_local(env, client: OpenAI, task_id: TaskID) -> dict:
"""Run one full episode against the local environment."""
obs = env.reset(task_id=task_id.value)
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
rewards: List[float] = []
steps = 0
score = 0.0
success = False
last_decision = None
last_exception = None
log_start(task=task_id.value, env=BENCHMARK, model=MODEL_NAME)
try:
while not obs.done:
user_msg = build_observation_prompt(obs, is_first=(steps == 0))
messages.append({"role": "user", "content": user_msg})
try:
api_kwargs = {
"model": MODEL_NAME,
"messages": messages,
"temperature": 0.0,
**_token_limit_kwarg(MODEL_NAME),
}
try:
api_kwargs["response_format"] = {"type": "json_object"}
response = client.chat.completions.create(**api_kwargs)
except Exception:
del api_kwargs["response_format"]
response = client.chat.completions.create(**api_kwargs)
assistant_msg = response.choices[0].message.content or ""
except Exception as e:
print(f"[DEBUG] LLM API error: {e}", flush=True)
assistant_msg = '{"action_type": "summarize_findings"}'
messages.append({"role": "assistant", "content": assistant_msg})
params = parse_llm_response(assistant_msg)
if params.get("final_decision"):
last_decision = params["final_decision"]
if params.get("exception_type"):
last_exception = params["exception_type"]
action = build_action(params)
obs = env.step(action)
reward = obs.reward if obs.reward else 0.0
rewards.append(reward)
steps += 1
error_str = None
if obs.last_action_error:
error_str = obs.last_action_result
log_step(
step=steps,
action=action.action_type.value,
reward=reward,
done=obs.done,
error=error_str,
)
grader_data = getattr(obs, "grader_result", None) or obs.metadata.get("grader_result", {})
score = grader_data.get("score", 0.0) if isinstance(grader_data, dict) else 0.0
score = min(max(score, 0.0), 1.0)
success = score >= 0.5
finally:
log_end(success=success, steps=steps, score=score, rewards=rewards)
return {
"task_id": task_id.value,
"steps": steps,
"grader_score": score,
"total_reward": sum(rewards),
"rewards": rewards,
"decision": last_decision,
"exception_type": last_exception,
"grader_breakdown": grader_data,
}
# -- Episode runner (Docker, asynchronous) --------------------------------
async def run_episode_docker(env, client: OpenAI, task_id: TaskID) -> dict:
"""Run one full episode against a Docker-based environment via EnvClient."""
result = await env.reset(task_id=task_id.value)
obs = _obs_from_step_result(result)
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
rewards: List[float] = []
steps = 0
score = 0.0
success = False
last_decision = None
last_exception = None
log_start(task=task_id.value, env=BENCHMARK, model=MODEL_NAME)
try:
while not obs.done:
user_msg = build_observation_prompt(obs, is_first=(steps == 0))
messages.append({"role": "user", "content": user_msg})
try:
api_kwargs = {
"model": MODEL_NAME,
"messages": messages,
"temperature": 0.0,
**_token_limit_kwarg(MODEL_NAME),
}
try:
api_kwargs["response_format"] = {"type": "json_object"}
response = client.chat.completions.create(**api_kwargs)
except Exception:
del api_kwargs["response_format"]
response = client.chat.completions.create(**api_kwargs)
assistant_msg = response.choices[0].message.content or ""
except Exception as e:
print(f"[DEBUG] LLM API error: {e}", flush=True)
assistant_msg = '{"action_type": "summarize_findings"}'
messages.append({"role": "assistant", "content": assistant_msg})
params = parse_llm_response(assistant_msg)
if params.get("final_decision"):
last_decision = params["final_decision"]
if params.get("exception_type"):
last_exception = params["exception_type"]
action = build_action(params)
result = await env.step(action)
obs = _obs_from_step_result(result)
reward = obs.reward if obs.reward else 0.0
rewards.append(reward)
steps += 1
error_str = None
if obs.last_action_error:
error_str = obs.last_action_result
log_step(
step=steps,
action=action.action_type.value,
reward=reward,
done=obs.done,
error=error_str,
)
grader_data = getattr(obs, "grader_result", None) or obs.metadata.get("grader_result", {})
score = grader_data.get("score", 0.0) if isinstance(grader_data, dict) else 0.0
score = min(max(score, 0.0), 1.0)
success = score >= 0.5
finally:
log_end(success=success, steps=steps, score=score, rewards=rewards)
return {
"task_id": task_id.value,
"steps": steps,
"grader_score": score,
"total_reward": sum(rewards),
"rewards": rewards,
"decision": last_decision,
"exception_type": last_exception,
"grader_breakdown": grader_data,
}
# -- Main -----------------------------------------------------------------
def _print_header():
print("=" * 60, flush=True)
print("InvoiceGuard -- Baseline Inference", flush=True)
print("=" * 60, flush=True)
print(f"API Base URL: {API_BASE_URL}", flush=True)
print(f"Model: {MODEL_NAME}", flush=True)
print(f"Tasks: {len(TASK_LIST)}", flush=True)
mode = f"docker ({LOCAL_IMAGE_NAME})" if LOCAL_IMAGE_NAME else "local"
print(f"Mode: {mode}", flush=True)
print(flush=True)
def _print_results(results):
print(flush=True)
print("=" * 60, flush=True)
print("RESULTS SUMMARY", flush=True)
print("=" * 60, flush=True)
scores = [r["grader_score"] for r in results]
for r in results:
print(
f" {r['task_id']:30s} score={r['grader_score']:.4f} "
f"decision={r['decision']}",
flush=True,
)
avg = sum(scores) / len(scores) if scores else 0.0
print(f"\n Average score: {avg:.4f}", flush=True)
print(f" Total tasks: {len(scores)}", flush=True)
print("=" * 60, flush=True)
async def main_docker():
"""Run inference against a Docker container via EnvClient."""
from client import InvoiceGuardEnv
_print_header()
llm_client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
env = await InvoiceGuardEnv.from_docker_image(LOCAL_IMAGE_NAME)
results = []
try:
for task_id in TASK_LIST:
start = time.time()
result = await run_episode_docker(env, llm_client, task_id)
elapsed = time.time() - start
print(
f" >> {task_id.value}: score={result['grader_score']:.4f} "
f"steps={result['steps']} decision={result['decision']} "
f"time={elapsed:.1f}s",
flush=True,
)
results.append(result)
finally:
try:
await env.close()
except Exception as e:
print(f"[DEBUG] env.close() error: {e}", flush=True)
_print_results(results)
def main_local():
"""Run inference directly against the local environment."""
from server.invoice_guard_environment import InvoiceGuardEnvironment
_print_header()
llm_client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
env = InvoiceGuardEnvironment()
results = []
for task_id in TASK_LIST:
start = time.time()
result = run_episode_local(env, llm_client, task_id)
elapsed = time.time() - start
print(
f" >> {task_id.value}: score={result['grader_score']:.4f} "
f"steps={result['steps']} decision={result['decision']} "
f"time={elapsed:.1f}s",
flush=True,
)
results.append(result)
_print_results(results)
if __name__ == "__main__":
if LOCAL_IMAGE_NAME:
asyncio.run(main_docker())
else:
main_local()