arkai2025's picture
feat(mcp): integrate local FastMCP client with logging and improve service management and add display cache
c7829ce
"""
Fire-Rescue - Multi-Stage LLM Advisor Agent
Provides advisory recommendations based on world state analysis using a
multi-stage approach: Assessment → Planning → Execution.
The agent only suggests actions; it does not directly control units.
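Analysis is performed by the AI; simple heuristic fallbacks are used only when
a model call fails or its reply cannot be parsed.
Uses the HuggingFace Inference Provider API (default model openai/gpt-oss-120b)
or, optionally, the native OpenAI API.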
"""
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import yaml
from openai import OpenAI
# Optionally load a .env file that sits next to this module. python-dotenv is
# not a hard requirement: without it, settings come only from the process
# environment.
try:
    from dotenv import load_dotenv
except ImportError:
    pass  # dotenv not installed, rely on environment variables
else:
    env_path = Path(__file__).parent / ".env"
    if env_path.exists():
        load_dotenv(env_path)
# =============================================================================
# LLM provider configuration
# =============================================================================
# HuggingFace Inference Providers: OpenAI-compatible router endpoint + model.
HF_INFERENCE_BASE_URL = "https://router.huggingface.co/v1"
HF_DEFAULT_MODEL = "openai/gpt-oss-120b"
# Native OpenAI: default model, env var holding the key, and a base URL that
# can be overridden through the OPENAI_BASE_URL environment variable.
OPENAI_DEFAULT_MODEL = "gpt-5-mini"
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
OPENAI_API_ENV_VAR = "OPENAI_API_KEY"
def get_hf_token() -> str | None:
"""
Get HuggingFace token from environment variable.
Returns:
HF_TOKEN if available, None otherwise
"""
return os.getenv("HF_TOKEN")
def get_openai_api_key() -> str | None:
    """
    Read the OpenAI API key from the environment.

    The variable name is taken from OPENAI_API_ENV_VAR.

    Returns:
        The key, or None when the variable is not set.
    """
    key = os.environ.get(OPENAI_API_ENV_VAR)
    return key
# =============================================================================
# Stage 3 action cap + Prompt loading
# =============================================================================
# Upper bound on Stage 3 actions; the UI renders at most this many.
MAX_RECOMMENDATIONS = 15
# Template tokens substituted into every string loaded from prompts.yaml.
PROMPT_PLACEHOLDERS = {
    "{{MAX_RECOMMENDATIONS}}": f"{MAX_RECOMMENDATIONS}",
}
def load_prompts() -> dict:
    """
    Load prompt configuration from ``prompts.yaml`` next to this module.

    Returns:
        The parsed top-level mapping. An empty dict is returned when the file
        is missing, empty, or its top level is not a mapping. An empty YAML
        document makes ``yaml.safe_load`` return None, and callers use
        ``.get`` on the result, so anything other than a dict is normalized
        to ``{}`` here.
    """
    prompts_path = Path(__file__).parent / "prompts.yaml"
    if not prompts_path.exists():
        return {}
    with open(prompts_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return data if isinstance(data, dict) else {}
def _apply_prompt_placeholders(value):
"""Recursively replace placeholder tokens inside prompt config."""
if isinstance(value, str):
for token, replacement in PROMPT_PLACEHOLDERS.items():
value = value.replace(token, replacement)
return value
if isinstance(value, dict):
return {k: _apply_prompt_placeholders(v) for k, v in value.items()}
if isinstance(value, list):
return [_apply_prompt_placeholders(item) for item in value]
return value
# Prompt configuration with placeholder tokens already expanded. ``or {}``
# keeps this a dict even if load_prompts() yields None (e.g. an empty
# prompts.yaml), because callers look values up with ``PROMPTS_CONFIG.get``.
PROMPTS_CONFIG = _apply_prompt_placeholders(load_prompts() or {})
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class Recommendation:
    """One advisor suggestion: deploy a new unit, move one, or remove one."""
    reason: str
    suggested_unit_type: str
    target_x: int
    target_y: int
    action: str = "deploy"  # one of "deploy", "move", "remove"
    source_x: int = -1  # origin cell; only serialized when action == "move"
    source_y: int = -1

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict; ``source`` is added only for moves."""
        payload = dict(
            reason=self.reason,
            suggested_unit_type=self.suggested_unit_type,
            target={"x": self.target_x, "y": self.target_y},
            action=self.action,
        )
        if self.action != "move":
            return payload
        payload["source"] = {"x": self.source_x, "y": self.source_y}
        return payload
@dataclass
class AssessmentResult:
    """Stage 1 output: the advisor's reading of the current fire situation."""
    fire_count: int
    high_intensity_fires: list  # fires with intensity above 70%
    building_threats: list  # fires close to buildings
    uncovered_fires: list  # fires that no unit currently reaches
    unit_count: int
    max_units: int
    effective_units: list  # units with at least one fire in range
    ineffective_units: list  # units with no fire in range
    coverage_ratio: float  # share of fires covered by some unit
    threat_level: str  # CRITICAL / HIGH / MODERATE / LOW
    summary: str
    building_integrity: float

    def to_dict(self) -> dict:
        """Serialize; list fields become counts and ratios are rounded to 2 places."""
        count = len
        return dict(
            fire_count=self.fire_count,
            high_intensity_fires=count(self.high_intensity_fires),
            building_threats=count(self.building_threats),
            uncovered_fires=count(self.uncovered_fires),
            unit_count=self.unit_count,
            max_units=self.max_units,
            effective_units=count(self.effective_units),
            ineffective_units=count(self.ineffective_units),
            coverage_ratio=round(self.coverage_ratio, 2),
            threat_level=self.threat_level,
            summary=self.summary,
            building_integrity=round(self.building_integrity, 2),
        )
@dataclass
class PlanResult:
    """Stage 2 output: the strategy chosen for this advisor cycle."""
    strategy: str  # "deploy_new" / "optimize_existing" / "balanced" / "monitor"
    reasoning: str
    deploy_count: int  # number of new units to deploy
    reposition_units: list  # unit dicts (from the world state) that should be moved
    priority_targets: list  # fire dicts, highest priority first

    def to_dict(self) -> dict:
        """Serialize; the unit and target lists are reported as counts."""
        summary = {
            "strategy": self.strategy,
            "reasoning": self.reasoning,
            "deploy_count": self.deploy_count,
        }
        summary["reposition_count"] = len(self.reposition_units)
        summary["priority_targets"] = len(self.priority_targets)
        return summary
@dataclass
class AdvisorResponse:
    """Everything the advisor produced for one cycle."""
    summary: str
    recommendations: list[Recommendation]
    thinking: str = ""  # chain-of-thought reasoning text
    analysis: str = ""  # situation analysis text
    priority: str = ""  # priority assessment text
    raw_response: Optional[str] = None
    error: Optional[str] = None
    # Results of the individual stages, when they ran
    assessment: Optional[AssessmentResult] = None
    plan: Optional[PlanResult] = None

    def to_dict(self) -> dict:
        """
        Serialize to a JSON-friendly dict.

        ``raw_response`` and ``error`` are not included; stage results are
        added only when present.
        """
        data = {
            "summary": self.summary,
            "recommendations": [rec.to_dict() for rec in self.recommendations],
            "thinking": self.thinking,
            "analysis": self.analysis,
            "priority": self.priority,
        }
        stages = {"assessment": self.assessment, "plan": self.plan}
        data.update({name: stage.to_dict() for name, stage in stages.items() if stage})
        return data
@dataclass
class CycleSummary:
    """Stage 4 output: a short digest of a single advisor cycle."""
    headline: str
    threat_level: str
    key_highlights: list[str]
    risks: list[str]
    next_focus: list[str]

    def to_dict(self) -> dict:
        """Serialize every field as-is (lists are returned by reference)."""
        keys = ("headline", "threat_level", "key_highlights", "risks", "next_focus")
        return {key: getattr(self, key) for key in keys}
@dataclass
class AfterActionReport:
    """Structured after-action report summarizing a mission's outcome."""
    summary: str
    strengths: list[str]
    improvements: list[str]
    next_actions: list[str]
    outcome: str = ""
    error: Optional[str] = None
    charts: dict = field(default_factory=dict)  # fresh dict per instance
    player_actions: dict = field(default_factory=dict)  # fresh dict per instance

    def to_dict(self) -> dict:
        """Serialize every field as-is (containers are returned by reference)."""
        keys = (
            "summary",
            "strengths",
            "improvements",
            "next_actions",
            "outcome",
            "error",
            "charts",
            "player_actions",
        )
        return {key: getattr(self, key) for key in keys}
# =============================================================================
# Advisor Agent
# =============================================================================
class AdvisorAgent:
"""
Multi-stage LLM-based advisor agent that analyzes world state and provides recommendations.
Stage 1: ASSESS - Analyze the situation
Stage 2: PLAN - Decide strategy
Stage 3: EXECUTE - Generate specific actions
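Analysis is performed by the AI model; each stage falls back to simple
heuristics only when the model call fails or its reply cannot be parsed.
Uses the HuggingFace Inference Provider API by default, or native OpenAI.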
"""
def __init__(
    self,
    api_key: Optional[str] = None,
    model: Optional[str] = None,
    provider: str = "hf",
    base_url: Optional[str] = None,
):
    """
    Initialize the advisor agent.

    Args:
        api_key: Optional override for the provider API key. When omitted the
            key is read from HF_TOKEN (provider "hf") or the OPENAI_API_ENV_VAR
            variable (provider "openai").
        model: Model to use for inference. Priority: this argument, then
            ``model.default`` from prompts.yaml (HF provider only), then the
            provider's built-in default.
        provider: "hf" for the HuggingFace Router or "openai" for native
            OpenAI. Any other value falls back to "hf".
        base_url: Optional override for the API base URL.

    Without an API key ``self.client`` is None and LLM calls return None.
    """
    self.provider = (provider or "hf").lower()
    if self.provider not in {"hf", "openai"}:
        self.provider = "hf"

    if self.provider == "openai":
        self.api_key = api_key or get_openai_api_key()
        self.base_url = base_url or OPENAI_BASE_URL
    else:
        self.api_key = api_key or get_hf_token()
        self.base_url = base_url or HF_INFERENCE_BASE_URL

    # Model config from prompts.yaml. A bare ``model:`` key parses as None,
    # and a missing/empty prompts file may leave PROMPTS_CONFIG falsy, so
    # normalize anything that is not a mapping to {} before calling .get.
    model_config = (PROMPTS_CONFIG or {}).get("model")
    if not isinstance(model_config, dict):
        model_config = {}

    # Model priority: explicit param > prompts.yaml (HF only) > provider default
    yaml_model = model_config.get("default") if self.provider == "hf" else None
    provider_default_model = OPENAI_DEFAULT_MODEL if self.provider == "openai" else HF_DEFAULT_MODEL
    self.model = model or yaml_model or provider_default_model
    self.temperature = model_config.get("temperature")  # None if not set
    self.max_completion_tokens = model_config.get("max_completion_tokens", 2000)

    # Initialize client
    if self.api_key:
        provider_label = "OpenAI" if self.provider == "openai" else "HuggingFace Inference"
        print(f"πŸ€– AI Advisor initialized with {provider_label} API, model: {self.model}")
        self.client = OpenAI(
            api_key=self.api_key,
            base_url=self.base_url
        )
    else:
        self.client = None
        missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN"
        print(f"⚠️ Warning: Missing {missing_env}. AI analysis will not work.")
# =========================================================================
# JSON Repair Helper
# =========================================================================
def _try_repair_json(self, content: str) -> Optional[dict]:
"""
Attempt to repair truncated JSON by closing open brackets/braces.
Returns:
Repaired JSON dict, or None if repair failed
"""
# Count open brackets and braces
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
# Check if we're in the middle of a string (odd number of unescaped quotes)
in_string = False
i = 0
while i < len(content):
if content[i] == '"' and (i == 0 or content[i-1] != '\\'):
in_string = not in_string
i += 1
repaired = content
# Close any open string
if in_string:
repaired += '"'
# Close brackets and braces in reverse order of opening
# This is a simplified repair - close all brackets then all braces
repaired += ']' * open_brackets
repaired += '}' * open_braces
try:
return json.loads(repaired)
except json.JSONDecodeError:
# Try more aggressive repair - find last valid JSON object
try:
# Try to find a partial valid structure
for end_pos in range(len(content), 0, -1):
partial = content[:end_pos]
open_b = partial.count('{') - partial.count('}')
open_br = partial.count('[') - partial.count(']')
attempt = partial + ']' * open_br + '}' * open_b
try:
return json.loads(attempt)
except json.JSONDecodeError:
continue
except Exception:
pass
return None
# =========================================================================
# LLM Call Helper
# =========================================================================
def _call_llm(self, system_prompt: str, user_message: str) -> Optional[dict]:
"""
Make an LLM API call to HuggingFace Inference Provider and parse JSON response.
Returns:
Parsed JSON dict, or None if failed
"""
if not self.client:
missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN"
print(f"Error: No API client available ({missing_env} not set)")
return None
# Retry logic for rate limiting (429 errors)
max_retries = 3
retry_delay = 5 # seconds
for attempt in range(max_retries):
try:
# Build API call parameters for current provider
token_param = "max_completion_tokens" if self.provider == "openai" else "max_tokens"
api_params = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message}
],
"response_format": {"type": "json_object"}
}
api_params[token_param] = self.max_completion_tokens
# Only add temperature if explicitly set
if self.temperature is not None:
api_params["temperature"] = self.temperature
response = self.client.chat.completions.create(**api_params)
content = response.choices[0].message.content
finish_reason = response.choices[0].finish_reason
# Check if response was truncated
if finish_reason == "length":
print(f"⚠️ Warning: Response was truncated (hit token limit)")
if content:
# Try to parse JSON from the response
# Handle potential markdown code blocks in response
content = content.strip()
if content.startswith("```json"):
content = content[7:]
if content.startswith("```"):
content = content[3:]
if content.endswith("```"):
content = content[:-3]
content = content.strip()
try:
return json.loads(content)
except json.JSONDecodeError as e:
# Try to repair truncated JSON
print(f"⚠️ JSON parse error: {e}")
print(f" Raw content (first 500 chars): {content[:500]}...")
repaired = self._try_repair_json(content)
if repaired:
print("βœ… Successfully repaired truncated JSON")
return repaired
return None
return None
except Exception as e:
error_str = str(e)
# Check for rate limiting (429) error
if "429" in error_str or "too_many_requests" in error_str.lower():
if attempt < max_retries - 1:
print(f"⚠️ Rate limited (429), retrying in {retry_delay}s... (attempt {attempt + 1}/{max_retries})")
import time
time.sleep(retry_delay)
continue
else:
print(f"❌ Rate limited after {max_retries} attempts")
return None
else:
print(f"LLM call error: {e}")
return None
return None # Should not reach here
# =========================================================================
# Stage 1: Assessment
# =========================================================================
def assess(self, world_state: dict) -> AssessmentResult:
    """
    Stage 1: Assess the current situation using AI.

    Sends the fires, the units and up to 20 buildings to the model. The
    positions it reports (uncovered, high-intensity and building-threat
    fires, plus ineffective units) are mapped back onto the dicts in
    ``world_state``.

    Args:
        world_state: Snapshot dict. Reads ``fires``, ``units``, ``buildings``,
            ``building_integrity``, ``max_units``, ``width`` and ``height``.

    Returns:
        An AssessmentResult. If the model call fails or its reply cannot be
        interpreted, a pessimistic result is returned instead: all fires
        uncovered, all units ineffective, threat level HIGH, and
        high-intensity fires taken from the > 0.7 intensity threshold.
    """
    assess_config = PROMPTS_CONFIG.get("assess", {})
    system = assess_config.get("system", "")
    output_format = assess_config.get("output_format", "")
    fires = world_state.get("fires", [])
    units = world_state.get("units", [])
    buildings = world_state.get("buildings", [])
    building_integrity = world_state.get("building_integrity", 1.0)
    max_units = world_state.get("max_units", 10)

    # Prepare detailed user message
    user_message = f"""Current World State:
- Grid size: {world_state.get("width", 10)}x{world_state.get("height", 10)}
- Building Integrity: {building_integrity:.1%}
- Max Units Allowed: {max_units}
FIRES ({len(fires)} total):
{json.dumps(fires, indent=2)}
UNITS ({len(units)} deployed):
{json.dumps(units, indent=2)}
BUILDINGS ({len(buildings)} total):
{json.dumps(buildings[:20], indent=2)}{" (showing first 20)" if len(buildings) > 20 else ""}
Remember:
- Fire Truck effective range: covers 1 tile outward from its center
- Helicopter effective range: covers 2 tiles outward from its center
- A fire is UNCOVERED if no unit is within range
- A unit is INEFFECTIVE if no fire is within its range
Output format:
{output_format}"""
    # Note: output_format is sent both here (system) and in the user message.
    full_prompt = system + "\n\n" + output_format
    response = self._call_llm(full_prompt, user_message)

    def _threshold_high_intensity() -> list:
        return [f for f in fires if f.get("intensity", 0) > 0.7]

    def _pessimistic(summary: str) -> AssessmentResult:
        # Worst-case view used whenever the AI result is unavailable/unusable.
        return AssessmentResult(
            fire_count=len(fires),
            high_intensity_fires=_threshold_high_intensity(),
            building_threats=[],
            uncovered_fires=fires[:],  # Assume all uncovered if AI fails
            unit_count=len(units),
            max_units=max_units,
            effective_units=[],
            ineffective_units=units[:],  # Assume all ineffective if AI fails
            coverage_ratio=0.0,
            threat_level="HIGH",
            summary=summary,
            building_integrity=building_integrity
        )

    def _match_positions(positions, candidates) -> list:
        # Map [x, y] pairs from the model onto the first candidate at that
        # cell. Malformed entries, unknown cells and repeats are ignored.
        matched = []
        for pos in positions or []:
            if not (isinstance(pos, list) and len(pos) >= 2):
                continue
            for item in candidates:
                if item["x"] == pos[0] and item["y"] == pos[1]:
                    if not any(item is seen for seen in matched):
                        matched.append(item)
                    break
        return matched

    if not response:
        return _pessimistic("AI assessment unavailable - assuming high threat")

    try:
        # ``or {}`` also covers sections the model returned as null.
        fire_analysis = response.get("fire_analysis") or {}
        unit_analysis = response.get("unit_analysis") or {}

        uncovered_fires = _match_positions(fire_analysis.get("uncovered_fire_positions"), fires)
        high_intensity_fires = _match_positions(fire_analysis.get("high_intensity_positions"), fires)
        if not high_intensity_fires:
            # Fallback to intensity check
            high_intensity_fires = _threshold_high_intensity()
        building_threats = _match_positions(fire_analysis.get("building_threat_positions"), fires)
        ineffective_units = _match_positions(unit_analysis.get("ineffective_positions"), units)

        # Effective units are all units not reported as ineffective.
        ineffective_cells = {(u["x"], u["y"]) for u in ineffective_units}
        effective_units = [u for u in units if (u["x"], u["y"]) not in ineffective_cells]

        # Coerce to float: later stages format this with ``:.1%`` and
        # ``round()``, which fail on strings/None from the model.
        coverage_ratio = float(unit_analysis.get("coverage_ratio") or 0.0)

        return AssessmentResult(
            fire_count=fire_analysis.get("total_fires", len(fires)),
            high_intensity_fires=high_intensity_fires,
            building_threats=building_threats,
            uncovered_fires=uncovered_fires,
            unit_count=len(units),
            max_units=max_units,
            effective_units=effective_units,
            ineffective_units=ineffective_units,
            coverage_ratio=coverage_ratio,
            threat_level=str(response.get("threat_level") or "MODERATE"),
            summary=str(response.get("summary") or ""),
            building_integrity=building_integrity
        )
    except Exception as e:
        print(f"Error parsing AI assessment: {e}")
        return _pessimistic(f"Assessment parse error: {e}")
# =========================================================================
# Stage 2: Planning
# =========================================================================
def plan(self, world_state: dict, assessment: AssessmentResult) -> PlanResult:
    """
    Stage 2: Decide the strategy based on the assessment using AI.

    Args:
        world_state: Snapshot dict. Reads ``fires`` and ``buildings``.
        assessment: Stage 1 result.

    Returns:
        A PlanResult whose ``deploy_count`` is clamped to
        ``[0, available_slots]``. If the model call fails or its reply cannot
        be interpreted, a heuristic plan is returned instead. That plan
        repositions every ineffective unit and targets the five
        highest-priority fires.
    """
    plan_config = PROMPTS_CONFIG.get("plan", {})
    system = plan_config.get("system", "")
    output_format = plan_config.get("output_format", "")
    fires = world_state.get("fires", [])
    # Clamp at 0 so an over-capacity deployment never produces a negative count.
    available_slots = max(0, assessment.max_units - assessment.unit_count)

    # Sort fires by priority for context: intensity plus a +1.0 bonus when
    # any building is within one cell in both x and y.
    buildings = world_state.get("buildings", [])
    building_positions = {(b["x"], b["y"]) for b in buildings}

    def fire_priority(f):
        near_building = any(
            abs(f["x"] - bx) <= 1 and abs(f["y"] - by) <= 1
            for bx, by in building_positions
        )
        return f.get("intensity", 0) + (1.0 if near_building else 0)

    priority_targets = sorted(fires, key=fire_priority, reverse=True)

    def smart_deploy_count() -> int:
        # Heuristic used when the AI plan is unusable. Deploy enough to
        # cover fires that repositioned idle units cannot reach, plus a buffer
        # that grows with the number of uncovered fires. When buildings are
        # threatened, deploy at least one unit per threat.
        uncovered_count = len(assessment.uncovered_fires)
        fires_after_reposition = max(0, uncovered_count - len(assessment.ineffective_units))
        building_threats = len(assessment.building_threats)
        if building_threats > 0:
            needed = max(building_threats, fires_after_reposition)
        elif uncovered_count <= 2:
            needed = fires_after_reposition
        elif uncovered_count <= 5:
            needed = fires_after_reposition + 1
        else:
            needed = fires_after_reposition + 2
        return min(needed, available_slots)

    user_message = f"""Assessment Result:
- Threat Level: {assessment.threat_level}
- Fire Count: {assessment.fire_count}
- High Intensity Fires: {len(assessment.high_intensity_fires)}
- Building Threats: {len(assessment.building_threats)}
- UNCOVERED Fires (no unit in range): {len(assessment.uncovered_fires)}
- Coverage Ratio: {assessment.coverage_ratio:.1%}
- Effective Units: {len(assessment.effective_units)}
- INEFFECTIVE Units (not near any fire): {len(assessment.ineffective_units)}
- Summary: {assessment.summary}
Current Resources:
- Units deployed: {assessment.unit_count} / {assessment.max_units}
- Available slots: {available_slots}
UNCOVERED Fires (PRIORITY - these need coverage!):
{json.dumps([{"x": f["x"], "y": f["y"], "intensity": f.get("intensity", 0)} for f in assessment.uncovered_fires[:5]], indent=2)}
INEFFECTIVE Units (SHOULD BE MOVED to cover fires!):
{json.dumps([{"x": u["x"], "y": u["y"], "type": u["type"]} for u in assessment.ineffective_units], indent=2)}
Priority Fires (top 5):
{json.dumps([{"x": f["x"], "y": f["y"], "intensity": f.get("intensity", 0)} for f in priority_targets[:5]], indent=2)}
REMEMBER: If there are uncovered fires AND ineffective units, you SHOULD reposition those units!
Output format:
{output_format}"""
    response = self._call_llm(system, user_message)

    if not response:
        smart_deploy = smart_deploy_count()
        uncovered_count = len(assessment.uncovered_fires)
        idle_count = len(assessment.ineffective_units)
        return PlanResult(
            strategy="balanced" if assessment.ineffective_units else "deploy_new",
            reasoning=f"AI planning unavailable - smart fallback: {uncovered_count} uncovered fires, repositioning {idle_count} idle units, deploying {smart_deploy} new",
            deploy_count=smart_deploy,
            reposition_units=assessment.ineffective_units[:],
            priority_targets=priority_targets[:5]
        )

    try:
        # Parse units to reposition: [x, y, type] entries matched against
        # the ineffective units; each unit is listed at most once.
        reposition_units = []
        for item in response.get("units_to_reposition") or []:
            if isinstance(item, list) and len(item) >= 3:
                sx, sy = item[0], item[1]
                for u in assessment.ineffective_units:
                    if u["x"] == sx and u["y"] == sy:
                        if not any(u is seen for seen in reposition_units):
                            reposition_units.append(u)
                        break
        # If reposition_needed but no specific units, use all ineffective
        if response.get("reposition_needed", False) and not reposition_units:
            reposition_units = assessment.ineffective_units[:]

        # Map priority_fire_indices to actual fires. Only non-negative,
        # in-range ints are accepted: a negative index would silently pick
        # from the end, and bools are ints in Python.
        selected_targets = []
        for i in response.get("priority_fire_indices", [0, 1, 2]) or []:
            if isinstance(i, int) and not isinstance(i, bool) and 0 <= i < len(priority_targets):
                target = priority_targets[i]
                if not any(target is seen for seen in selected_targets):
                    selected_targets.append(target)
        if not selected_targets:
            selected_targets = priority_targets[:5]

        # The model may return a string, null, negative or over-capacity value.
        deploy_count = int(response.get("deploy_count") or 0)
        deploy_count = max(0, min(deploy_count, available_slots))

        return PlanResult(
            strategy=str(response.get("strategy") or "balanced"),
            reasoning=str(response.get("reasoning") or ""),
            deploy_count=deploy_count,
            reposition_units=reposition_units,
            priority_targets=selected_targets
        )
    except Exception as e:
        print(f"Error parsing AI plan: {e}")
        return PlanResult(
            strategy="balanced",
            reasoning=f"Plan parse error: {e} - using smart fallback",
            deploy_count=smart_deploy_count(),
            reposition_units=assessment.ineffective_units[:],
            priority_targets=priority_targets[:5]
        )
# =========================================================================
# Stage 3: Execution
# =========================================================================
def execute(
    self,
    world_state: dict,
    assessment: AssessmentResult,
    plan: PlanResult
) -> list[Recommendation]:
    """
    Stage 3: Generate specific deployment/move/remove recommendations using AI.

    Model output is validated before it is returned:

    * at most MAX_RECOMMENDATIONS entries are considered;
    * ``remove`` must name a cell that currently holds a unit;
    * ``move`` must name a source cell that currently holds a unit;
    * each unit is moved or removed at most once per cycle;
    * a ``deploy``/``move`` target that sits on a fire, a building, an
      existing unit, or a cell already claimed by an earlier recommendation
      is relocated via ``_find_deploy_position``, and the recommendation is
      dropped when no alternative is found;
    * any other action value is treated as ``deploy``.

    Args:
        world_state: Snapshot dict. Reads ``fires``, ``units``, ``buildings``,
            ``width`` and ``height``.
        assessment: Stage 1 result.
        plan: Stage 2 result.

    Returns:
        Validated recommendations, or ``[]`` when the plan strategy is
        "monitor". If the model call fails or its reply cannot be processed,
        ``_generate_fallback_recommendations`` is used instead.
    """
    # Skip if strategy is monitor
    if plan.strategy == "monitor":
        return []

    execute_config = PROMPTS_CONFIG.get("execute", {})
    system = execute_config.get("system", "")
    output_format = execute_config.get("output_format", "")
    fires = world_state.get("fires", [])
    units = world_state.get("units", [])
    buildings = world_state.get("buildings", [])
    width = world_state.get("width", 10)
    height = world_state.get("height", 10)

    user_message = f"""Assessment:
- Threat Level: {assessment.threat_level}
- Summary: {assessment.summary}
- Uncovered Fires: {len(assessment.uncovered_fires)}
- Effective Units: {len(assessment.effective_units)}
- Ineffective Units: {len(assessment.ineffective_units)}
Plan:
- Strategy: {plan.strategy}
- Reasoning: {plan.reasoning}
- Deploy Count: {plan.deploy_count}
- Reposition Needed: {len(plan.reposition_units) > 0}
World State:
- Grid: {width}x{height}
UNCOVERED FIRES (PRIORITY TARGETS - these need units!):
{json.dumps([{"x": f["x"], "y": f["y"], "intensity": f.get("intensity", 0)} for f in assessment.uncovered_fires[:5]], indent=2)}
INEFFECTIVE UNITS (MOVE THESE to uncovered fires!):
{json.dumps([{"x": u["x"], "y": u["y"], "type": u["type"]} for u in plan.reposition_units[:5]], indent=2)}
All Fire positions: {json.dumps([(f["x"], f["y"], round(f.get("intensity", 0), 2)) for f in fires[:15]])}
All Unit positions: {json.dumps([(u["x"], u["y"], u["type"]) for u in units])}
Building positions: {json.dumps([(b["x"], b["y"]) for b in buildings[:15]])}
INSTRUCTIONS:
1. FIRST generate MOVE actions for ineffective units β†’ move them to uncovered fires
2. THEN generate DEPLOY actions if more units needed
3. Max {MAX_RECOMMENDATIONS} recommendations total
4. Remember: deploy ADJACENT to fire (1-2 cells away), not ON the fire
Output format:
{output_format}"""
    response = self._call_llm(system, user_message)
    if not response:
        # Generate basic recommendations if AI fails
        return self._generate_fallback_recommendations(world_state, assessment, plan)

    try:
        fire_positions = {(f["x"], f["y"]) for f in fires}
        unit_positions = {(u["x"], u["y"]) for u in units}
        building_positions = {(b["x"], b["y"]) for b in buildings}
        used_positions = set()  # target cells claimed by accepted recommendations
        handled_units = set()  # unit cells already moved/removed this cycle

        def as_dict(value) -> dict:
            return value if isinstance(value, dict) else {}

        def is_blocked(cell) -> bool:
            return (
                cell in fire_positions
                or cell in building_positions
                or cell in unit_positions
                or cell in used_positions
            )

        recommendations = []
        raw_recs = response.get("recommendations") or []
        for rec in raw_recs[:MAX_RECOMMENDATIONS]:  # Limit to UI capacity
            if not isinstance(rec, dict):
                continue
            action = rec.get("action", "deploy")
            unit_type = rec.get("unit_type", "fire_truck")
            reason = rec.get("reason", "AI recommendation")
            target = as_dict(rec.get("target"))
            target_x = target.get("x", 0)
            target_y = target.get("y", 0)

            if action == "remove":
                # Removal acts on an existing unit, so the cell must not be
                # relocated or reserved like a placement target.
                position = as_dict(rec.get("position"))
                cell = (position.get("x", target_x), position.get("y", target_y))
                if cell not in unit_positions or cell in handled_units:
                    continue
                handled_units.add(cell)
                recommendations.append(Recommendation(
                    reason=reason,
                    suggested_unit_type=unit_type,
                    target_x=cell[0],
                    target_y=cell[1],
                    action="remove"
                ))
                continue

            source_cell = None
            if action == "move":
                # Validate the source first so a bogus move does not reserve
                # a target cell that a later recommendation could use.
                source = as_dict(rec.get("source"))
                source_cell = (source.get("x", -1), source.get("y", -1))
                if source_cell not in unit_positions or source_cell in handled_units:
                    continue

            if is_blocked((target_x, target_y)):
                # Try to find valid nearby position
                valid_pos = self._find_deploy_position(
                    target_x, target_y, world_state,
                    exclude_positions=used_positions | unit_positions
                )
                if not valid_pos:
                    continue
                target_x, target_y = valid_pos
            used_positions.add((target_x, target_y))

            if source_cell is not None:
                handled_units.add(source_cell)
                recommendations.append(Recommendation(
                    reason=reason,
                    suggested_unit_type=unit_type,
                    target_x=target_x,
                    target_y=target_y,
                    action="move",
                    source_x=source_cell[0],
                    source_y=source_cell[1]
                ))
            else:
                recommendations.append(Recommendation(
                    reason=reason,
                    suggested_unit_type=unit_type,
                    target_x=target_x,
                    target_y=target_y,
                    action="deploy"
                ))
        return recommendations
    except Exception as e:
        print(f"Error parsing AI execution: {e}")
        return self._generate_fallback_recommendations(world_state, assessment, plan)
def summarize(
    self,
    world_state: dict,
    assessment: AssessmentResult,
    plan: PlanResult,
    recommendations: list[Recommendation],
    advisor_response: AdvisorResponse,
) -> CycleSummary:
    """
    Stage 4: Summarize the cycle results using AI.

    Args:
        world_state: Snapshot dict. Reads ``fires``, ``units``, ``tick`` and
            ``status``.
        assessment: Stage 1 result.
        plan: Stage 2 result.
        recommendations: Stage 3 output; only the first five are sent.
        advisor_response: Combined advisor output. It may be None, in which
            case its summary/analysis are treated as empty.

    Returns:
        A CycleSummary. When the model call fails, or does not return a JSON
        object, a canned summary is built from the assessment and
        advisor_response. Missing, null or blank fields in the model reply
        fall back to defaults instead of becoming the string "None".
    """
    summary_config = PROMPTS_CONFIG.get("summary", {})
    system = summary_config.get("system", "")
    output_format = summary_config.get("output_format", "")
    fires = world_state.get("fires", [])
    units = world_state.get("units", [])
    tick = world_state.get("tick", 0)
    status = world_state.get("status", "running")

    rec_blocks = [
        {
            "index": idx,
            "action": rec.action,
            "unit_type": rec.suggested_unit_type,
            "target": {"x": rec.target_x, "y": rec.target_y},
            "source": {"x": rec.source_x, "y": rec.source_y} if rec.action == "move" else None,
            "reason": rec.reason,
        }
        for idx, rec in enumerate(recommendations, 1)
    ]

    user_message = f"""Tick: {tick} | Status: {status}
Threat Level: {assessment.threat_level} | Building Integrity: {assessment.building_integrity:.0%}
Fires: {assessment.fire_count} | Uncovered Fires: {len(assessment.uncovered_fires)}
Idle Units: {len(assessment.ineffective_units)} | Total Units: {assessment.unit_count}/{assessment.max_units}
Stage 1 Summary:
{assessment.summary}
Stage 2 Strategy:
- Strategy: {plan.strategy}
- Reasoning: {plan.reasoning}
- Deploy Count: {plan.deploy_count}
- Reposition Units: {len(plan.reposition_units)}
Stage 3 Recommendations:
{json.dumps(rec_blocks[:5], indent=2)}
World Snapshot (first 5 fires / units):
Fires -> {json.dumps(fires[:5], indent=2)}
Units -> {json.dumps(units[:5], indent=2)}
OUTPUT FORMAT:
{output_format}
"""
    response = self._call_llm(system, user_message)

    # advisor_response may be None; read its fields defensively everywhere.
    advisor_summary = advisor_response.summary if advisor_response else ""
    advisor_analysis = advisor_response.analysis if advisor_response else ""

    if not response or not isinstance(response, dict):
        return CycleSummary(
            headline=advisor_summary if advisor_response else "Cycle summary unavailable",
            threat_level=assessment.threat_level,
            key_highlights=[advisor_analysis or "Analysis unavailable."],
            risks=["Summary model unavailable."],
            next_focus=["Review building-adjacent fires manually."],
        )

    def _coerce_items(value, fallback):
        # Accept a list of items or a single string; drop blank entries.
        if isinstance(value, list):
            cleaned = [text for text in (str(item).strip() for item in value) if text]
            return cleaned or fallback
        if isinstance(value, str) and value.strip():
            return [value.strip()]
        return fallback

    def _coerce_text(value, fallback: str) -> str:
        # None/blank values fall back instead of becoming the literal "None".
        text = "" if value is None else str(value).strip()
        return text or fallback

    default_threat = assessment.threat_level or "MODERATE"
    headline = _coerce_text(
        response.get("headline"),
        _coerce_text(advisor_summary, "Cycle summary"),
    )
    threat_level = _coerce_text(response.get("threat_level"), default_threat)
    key_highlights = _coerce_items(response.get("key_highlights"), [advisor_analysis or "Highlights unavailable."])
    risks = _coerce_items(response.get("risks"), ["No risks provided."])
    next_focus = _coerce_items(response.get("next_focus"), ["Maintain coverage on building threats."])

    return CycleSummary(
        headline=headline,
        threat_level=threat_level,
        key_highlights=key_highlights,
        risks=risks,
        next_focus=next_focus,
    )
def _generate_fallback_recommendations(
    self,
    world_state: dict,
    assessment: AssessmentResult,
    plan: PlanResult
) -> list[Recommendation]:
    """
    Generate rule-based recommendations when the AI execution stage fails.

    Uncovered fires are ranked with building threats first (a building lies
    within Manhattan distance 2 of the fire), then by descending intensity.

    1. Each unit in ``plan.reposition_units`` is paired with the next ranked
       fire and, if a free nearby cell exists, gets a "move" action.
    2. Every fire NOT covered by a successful move is eligible for a "deploy"
       action. The number of deploys is capped by the free unit slots
       (``max_units - unit_count``, clamped at zero).

    Args:
        world_state: Snapshot with "units" and "buildings" lists (items carry
            "x"/"y"; units also "type"); also forwarded to
            ``_find_deploy_position``.
        assessment: Stage 1 result; reads ``uncovered_fires``, ``max_units``
            and ``unit_count``.
        plan: Stage 2 result; reads ``reposition_units`` and ``deploy_count``.

    Returns:
        Recommendations balanced and capped by ``_prioritize_recommendations``.
    """
    recommendations = []
    units = world_state.get("units", [])
    buildings = world_state.get("buildings", [])
    unit_positions = {(u["x"], u["y"]) for u in units}
    building_positions = {(b["x"], b["y"]) for b in buildings}
    used_positions = set()  # cells already promised to an earlier recommendation

    def threatens_building(fire):
        # A fire threatens a building when one lies within Manhattan distance 2.
        return any(
            abs(fire["x"] - bx) + abs(fire["y"] - by) <= 2
            for bx, by in building_positions
        )

    # Pair each uncovered fire with its building-threat flag (computed once),
    # then rank: building threats first, then highest intensity first.
    # sorted() is stable, so equal keys keep the assessment's order.
    ranked_fires = sorted(
        ((fire, threatens_building(fire)) for fire in assessment.uncovered_fires),
        key=lambda pair: (-int(pair[1]), -pair[0].get("intensity", 0)),
    )

    # Step 1: move ineffective units onto the top-ranked fires.
    covered = set()  # indices into ranked_fires that a successful move covers
    for index, (unit, (fire, is_building_threat)) in enumerate(
        zip(plan.reposition_units, ranked_fires)
    ):
        own_position = (unit["x"], unit["y"])
        deploy_pos = self._find_deploy_position(
            fire["x"], fire["y"], world_state,
            # Other units block a cell; the moving unit's own cell does not.
            exclude_positions=used_positions | (unit_positions - {own_position}),
        )
        if not deploy_pos:
            continue
        used_positions.add(deploy_pos)
        covered.add(index)
        recommendations.append(Recommendation(
            reason=f"{'🏒 BUILDING THREAT! ' if is_building_threat else ''}Move to cover fire at ({fire['x']}, {fire['y']})",
            suggested_unit_type=unit["type"],
            target_x=deploy_pos[0],
            target_y=deploy_pos[1],
            action="move",
            source_x=unit["x"],
            source_y=unit["y"]
        ))

    # Step 2: decide how many new units to deploy. Only fires that no move
    # covered remain (a fire whose move found no cell stays eligible instead
    # of being skipped while a covered fire gets a duplicate deploy).
    remaining_fires = [
        pair for index, pair in enumerate(ranked_fires) if index not in covered
    ]
    remaining_building_threats = sum(1 for _, threat in remaining_fires if threat)
    # Never plan more deploys than free slots (and never a negative count,
    # which would otherwise turn the slice below into "all but the last k").
    available_slots = max(0, assessment.max_units - assessment.unit_count)

    if remaining_building_threats == 0 and 3 <= len(remaining_fires) <= 5:
        # Moderate, building-safe situation: target coverage plus a small buffer.
        smart_deploy_count = min(len(remaining_fires) + 1, available_slots)
    else:
        # Building emergencies, few fires, or many fires: cover as many
        # remaining fires as the free slots allow.
        smart_deploy_count = min(len(remaining_fires), available_slots)

    # Step 3: deploy to the remaining fires in rank order.
    for fire, is_building_threat in remaining_fires[:smart_deploy_count]:
        deploy_pos = self._find_deploy_position(
            fire["x"], fire["y"], world_state,
            exclude_positions=used_positions | unit_positions,
        )
        if not deploy_pos:
            continue
        used_positions.add(deploy_pos)
        # Fire trucks for building threats and intense fires (> 0.5);
        # helicopters for the rest.
        unit_type = "fire_truck" if is_building_threat or fire.get("intensity", 0) > 0.5 else "helicopter"
        recommendations.append(Recommendation(
            reason=f"{'🏒 BUILDING THREAT! ' if is_building_threat else ''}Deploy to cover fire at ({fire['x']}, {fire['y']})",
            suggested_unit_type=unit_type,
            target_x=deploy_pos[0],
            target_y=deploy_pos[1],
            action="deploy"
        ))

    return self._prioritize_recommendations(
        recommendations,
        plan.deploy_count,
        smart_deploy_count,
    )
def _prioritize_recommendations(
    self,
    recommendations: list[Recommendation],
    plan_deploy_target: int,
    smart_deploy_target: int,
    max_actions: int = MAX_RECOMMENDATIONS,
) -> list[Recommendation]:
    """
    Trim recommendations to ``max_actions`` while keeping a mix of action types.

    A list that already fits is returned as-is (the same object). Otherwise
    the recommendations are split into "deploy", "move" and "other" buckets,
    and each bucket keeps its original order.

    The non-empty buckets are then ranked by weight:
    - deploys weigh ``max(plan_deploy_target, smart_deploy_target, 1)``;
    - moves and others weigh their own bucket size;
    - ties keep the deploy -> move -> other order.

    Items are taken round-robin in rank order (the first of every bucket,
    then the second of every bucket, ...) until ``max_actions`` are collected.
    """
    if len(recommendations) <= max_actions:
        return recommendations

    buckets = {"deploy": [], "move": [], "other": []}
    for rec in recommendations:
        bucket_name = rec.action if rec.action in ("deploy", "move") else "other"
        buckets[bucket_name].append(rec)

    weights = {
        "deploy": max(plan_deploy_target, smart_deploy_target, 1) if buckets["deploy"] else 0,
        "move": len(buckets["move"]),
        "other": len(buckets["other"]),
    }
    # sorted() stays stable with reverse=True, so equal weights keep the
    # deploy -> move -> other order. Empty buckets are left out entirely.
    rank = sorted(
        (name for name in ("deploy", "move", "other") if weights[name]),
        key=weights.__getitem__,
        reverse=True,
    )
    if not rank:
        return recommendations[:max_actions]

    picked = []
    depth = 0
    while len(picked) < max_actions:
        # One item from every bucket that still has an entry at this depth.
        layer = [buckets[name][depth] for name in rank if depth < len(buckets[name])]
        if not layer:
            break
        picked.extend(layer[: max_actions - len(picked)])
        depth += 1
    return picked
# =========================================================================
# After-Action Report
# =========================================================================
def generate_after_action_report(self, context: dict) -> AfterActionReport:
    """
    Build an after-action report using Assessment / Planning / Execution transcripts.

    The report is pre-filled from ``context`` before the LLM is consulted
    (summary, outcome, chart series and player-action context), so callers
    always get a usable object. When the LLM cannot be used, ``report.error``
    explains why and the AI-generated sections stay empty.

    Args:
        context: Mission data. Keys read here are:
            - ``outcome``, ``outcome_label`` and ``summary_text``;
            - ``tick``, ``fires_remaining``, ``units_active`` and
              ``building_integrity_percent``;
            - ``chart_points``, ``threat_history`` and ``action_history``;
            - ``player_actions_context`` and ``player_actions_md``;
            - ``assessment_md``, ``planning_md`` and ``execution_md``;
            - ``cycle_summaries``: a list of dicts with ``tick``,
              ``headline``, ``threat_level``, ``key_highlights``, ``risks``
              and ``next_focus``.

    Returns:
        AfterActionReport whose ``summary``, ``strengths``, ``improvements``
        and ``next_actions`` come from the LLM JSON response when available.
    """
    after_action_config = PROMPTS_CONFIG.get("after_action", {})
    system = after_action_config.get("system", "")
    output_format = after_action_config.get("output_format", "")

    outcome = context.get("outcome", "unknown")
    default_summary = context.get("summary_text") or f"Mission outcome: {outcome}"
    report = AfterActionReport(
        summary=default_summary,
        strengths=[],
        improvements=[],
        next_actions=[],
        outcome=outcome,
    )
    report.charts = {
        "metrics": context.get("chart_points") or [],
        "threat_levels": context.get("threat_history") or [],
        "action_density": context.get("action_history") or [],
    }
    report.player_actions = context.get("player_actions_context") or {}

    if not self.client:
        report.error = "Missing HF_TOKEN β€“ unable to generate AI after-action report."
        return report

    def _section(title: str, body: str) -> str:
        # Keep every section in the prompt, even when there is nothing to show.
        if not body:
            return f"{title}\n(no data available)\n"
        return f"{title}\n{body}\n"

    def _join(items) -> str:
        # str() guards against non-string entries, which str.join rejects.
        return "; ".join(str(item) for item in items)

    header_lines = [
        f"Mission Outcome: {context.get('outcome_label', outcome)}",
        f"Tick: {context.get('tick', 0)}",
        f"Fires Remaining: {context.get('fires_remaining', 0)}",
        f"Units Active: {context.get('units_active', 0)}",
        f"Building Integrity: {context.get('building_integrity_percent', 'N/A')}",
    ]
    mission_summary = context.get("summary_text", "")
    if mission_summary:
        header_lines.append(f"Mission Summary: {mission_summary}")

    cycle_summaries = context.get("cycle_summaries") or []
    if cycle_summaries:
        summary_lines = []
        for entry in cycle_summaries:
            tick = entry.get("tick", "?")
            headline = entry.get("headline", "No headline")
            threat = entry.get("threat_level", "N/A")
            highlights = entry.get("key_highlights") or []
            risks = entry.get("risks") or []
            next_focus = entry.get("next_focus") or []
            block = [
                f"- [Tick {tick}] {headline} (Threat: {threat})",
            ]
            if highlights:
                block.append(" β€’ Highlights: " + _join(highlights))
            if risks:
                block.append(" β€’ Risks: " + _join(risks))
            if next_focus:
                block.append(" β€’ Next Focus: " + _join(next_focus))
            summary_lines.append("\n".join(block))
        history_block = "\n".join(summary_lines)
    else:
        history_block = "- No prior cycles captured."

    user_sections = [
        "Mission Status Summary:",
        "\n".join(header_lines),
        "",
        _section("Stage 1 Β· Assessment", context.get("assessment_md", "")),
        _section("Stage 2 Β· Planning", context.get("planning_md", "")),
        _section("Stage 3 Β· Execution", context.get("execution_md", "")),
        _section("Player Manual Actions", context.get("player_actions_md", "")),
        "Historical Cycle Summaries:",
        history_block,
    ]
    if output_format:
        user_sections.append("Please reply strictly using the JSON schema below:")
        user_sections.append(output_format)

    user_message = "\n\n".join(user_sections)
    response = self._call_llm(system or "You are a mission debrief analyst.", user_message)
    if not response:
        report.error = "Failed to retrieve AI response."
        return report
    if not isinstance(response, dict):
        # The schema asks for a JSON object; anything else (e.g. a bare list)
        # would crash on .get() below.
        report.error = "AI response was not a JSON object."
        return report

    def _coerce_list(value) -> list[str]:
        # Accept a list (dropping None/blank items) or a single non-empty string.
        if isinstance(value, list):
            return [
                str(item).strip()
                for item in value
                if item is not None and str(item).strip()
            ]
        if isinstance(value, str) and value.strip():
            return [value.strip()]
        return []

    # A null/missing summary keeps the default instead of becoming "None".
    raw_summary = response.get("summary")
    if raw_summary is not None:
        report.summary = str(raw_summary).strip() or report.summary
    report.strengths = _coerce_list(response.get("strengths"))
    report.improvements = _coerce_list(response.get("improvements"))
    report.next_actions = _coerce_list(response.get("next_actions"))

    if not (report.strengths or report.improvements or report.next_actions):
        report.error = "AI response did not contain any usable sections."
    return report
# =========================================================================
# Main Entry Point
# =========================================================================
def analyze(self, world_state: dict) -> AdvisorResponse:
    """
    Run the full advisory pipeline for one world snapshot.

    The stages run in order, each feeding the next:
        1. ``assess``  -> situation assessment
        2. ``plan``    -> strategy chosen from the assessment
        3. ``execute`` -> concrete recommendations for that plan
    The stage outputs are then condensed into a "thinking" trace (one line
    per notable finding) and a threat-level headline.

    Args:
        world_state: Current simulation snapshot, passed unchanged to each stage.

    Returns:
        AdvisorResponse containing:
        - the headline summary and the recommendations;
        - the thinking trace and a one-line analysis;
        - the priority (the assessment's threat level);
        - the raw assessment and plan.
    """
    assessment = self.assess(world_state)
    plan = self.plan(world_state, assessment)
    recommendations = self.execute(world_state, assessment, plan)

    # Thinking trace: the scan line always, then one line per non-empty finding,
    # and finally the chosen strategy.
    trace = [f"πŸ“Š Scanning {assessment.fire_count} active fires..."]
    if assessment.uncovered_fires:
        trace.append(f"πŸš¨ ALERT: {len(assessment.uncovered_fires)} fire(s) with NO coverage!")
    if assessment.building_threats:
        trace.append(f"🏒 {len(assessment.building_threats)} fire(s) threatening buildings!")
    if assessment.ineffective_units:
        trace.append(f"πŸ”„ {len(assessment.ineffective_units)} idle unit(s) should be repositioned")
    trace.append(f"🎯 Strategy: {plan.strategy.upper()} - {plan.reasoning}")

    level = assessment.threat_level
    badge = {
        "CRITICAL": "πŸ”΄",
        "HIGH": "🟠",
        "MODERATE": "🟑",
        "LOW": "🟒"
    }.get(level, "βšͺ")

    # Any level other than CRITICAL / HIGH / MODERATE is reported as LOW.
    headline_by_level = {
        "CRITICAL": f"{badge} CRITICAL: {assessment.summary}. Immediate action required!",
        "HIGH": f"{badge} HIGH: {assessment.summary}. Rapid response needed.",
        "MODERATE": f"{badge} MODERATE: {assessment.summary}. Tactical deployment advised.",
    }
    headline = headline_by_level.get(
        level,
        f"{badge} LOW: {assessment.summary}. Monitoring situation.",
    )

    return AdvisorResponse(
        summary=headline,
        recommendations=recommendations,
        thinking="\n".join(trace),
        analysis=f"{assessment.fire_count} fires | {assessment.unit_count}/{assessment.max_units} units | {assessment.building_integrity:.0%} building integrity",
        priority=level,
        assessment=assessment,
        plan=plan
    )
# =========================================================================
# Helper Methods
# =========================================================================
def _find_deploy_position(
self,
fire_x: int,
fire_y: int,
world_state: dict,
exclude_positions: set = None
) -> tuple[int, int] | None:
"""
Find a valid deployment position adjacent to a fire.
Units cannot deploy on burning cells, so we find the nearest empty cell.
"""
if exclude_positions is None:
exclude_positions = set()
fires = world_state.get("fires", [])
units = world_state.get("units", [])
buildings = world_state.get("buildings", [])
width = world_state.get("width", 10)
height = world_state.get("height", 10)
fire_positions = set((f["x"], f["y"]) for f in fires)
unit_positions = set((u["x"], u["y"]) for u in units)
building_positions = set((b["x"], b["y"]) for b in buildings)
# Check positions at increasing distances
for distance in [1, 2, 3]:
candidates = []
for dx in range(-distance, distance + 1):
for dy in range(-distance, distance + 1):
if abs(dx) != distance and abs(dy) != distance:
continue
nx, ny = fire_x + dx, fire_y + dy
# Check bounds
if not (0 <= nx < width and 0 <= ny < height):
continue
# Skip invalid positions
if (nx, ny) in fire_positions:
continue
if (nx, ny) in unit_positions:
continue
if (nx, ny) in building_positions:
continue
if (nx, ny) in exclude_positions:
continue
# Valid candidate
dist_to_fire = abs(nx - fire_x) + abs(ny - fire_y)
candidates.append((nx, ny, dist_to_fire))
if candidates:
candidates.sort(key=lambda c: c[2])
return (candidates[0][0], candidates[0][1])
return None
# Backward compatibility
def _fallback_analyze(self, world_state: dict) -> AdvisorResponse:
    """Legacy entry point kept for service.py compatibility; delegates to ``analyze``."""
    return self.analyze(world_state)


# Expose the legacy shim as a method on AdvisorAgent so older callers still resolve it.
setattr(AdvisorAgent, "_fallback_analyze", _fallback_analyze)