""" Fire-Rescue - Multi-Stage LLM Advisor Agent Provides advisory recommendations based on world state analysis using a multi-stage approach: Assessment โ†’ Planning โ†’ Execution. The agent only suggests actions; it does not directly control units. All analysis is performed by the AI - no rule-based fallback. Uses HuggingFace Inference Provider API with openai/gpt-oss-120b model. """ import json import os from dataclasses import dataclass, field from pathlib import Path from typing import Optional import yaml from openai import OpenAI # Load .env file if it exists try: from dotenv import load_dotenv env_path = Path(__file__).parent / ".env" if env_path.exists(): load_dotenv(env_path) except ImportError: pass # dotenv not installed, rely on environment variables # ============================================================================= # HuggingFace Inference Provider Configuration # ============================================================================= # HuggingFace Inference Provider base URL (OpenAI-compatible) HF_INFERENCE_BASE_URL = "https://router.huggingface.co/v1" HF_DEFAULT_MODEL = "openai/gpt-oss-120b" # OpenAI native configuration OPENAI_DEFAULT_MODEL = "gpt-5-mini" OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") OPENAI_API_ENV_VAR = "OPENAI_API_KEY" def get_hf_token() -> str | None: """ Get HuggingFace token from environment variable. Returns: HF_TOKEN if available, None otherwise """ return os.getenv("HF_TOKEN") def get_openai_api_key() -> str | None: """ Get OpenAI API key from environment variable. Returns: OPENAI_API_KEY if available, None otherwise """ return os.getenv(OPENAI_API_ENV_VAR) # ============================================================================= # Stage 3 action cap + Prompt loading # ============================================================================= # Stage 3 UI uses MAX_RECOMMENDATIONS to control how many actions can be rendered MAX_RECOMMENDATIONS = 15 PROMPT_PLACEHOLDERS = {"{{MAX_RECOMMENDATIONS}}": str(MAX_RECOMMENDATIONS)} def load_prompts() -> dict: """Load prompts from prompts.yaml configuration file.""" prompts_path = Path(__file__).parent / "prompts.yaml" if prompts_path.exists(): with open(prompts_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) return {} def _apply_prompt_placeholders(value): """Recursively replace placeholder tokens inside prompt config.""" if isinstance(value, str): for token, replacement in PROMPT_PLACEHOLDERS.items(): value = value.replace(token, replacement) return value if isinstance(value, dict): return {k: _apply_prompt_placeholders(v) for k, v in value.items()} if isinstance(value, list): return [_apply_prompt_placeholders(item) for item in value] return value PROMPTS_CONFIG = _apply_prompt_placeholders(load_prompts()) # ============================================================================= # Data Models # ============================================================================= @dataclass class Recommendation: """A single deployment or move recommendation from the advisor.""" reason: str suggested_unit_type: str target_x: int target_y: int action: str = "deploy" # "deploy", "move", or "remove" source_x: int = -1 # For move action: source position source_y: int = -1 def to_dict(self) -> dict: result = { "reason": self.reason, "suggested_unit_type": self.suggested_unit_type, "target": {"x": self.target_x, "y": self.target_y}, "action": self.action } if self.action == "move": result["source"] = {"x": self.source_x, "y": self.source_y} return result @dataclass class AssessmentResult: """Result from Stage 1: Assessment - analyzing the current situation.""" fire_count: int high_intensity_fires: list # fires > 70% building_threats: list # fires near buildings uncovered_fires: list # fires with no unit in range unit_count: int max_units: int effective_units: list # units in range of fires ineffective_units: list # units not covering any fire coverage_ratio: float # ratio of fires covered by units threat_level: str # CRITICAL / HIGH / MODERATE / LOW summary: str building_integrity: float def to_dict(self) -> dict: return { "fire_count": self.fire_count, "high_intensity_fires": len(self.high_intensity_fires), "building_threats": len(self.building_threats), "uncovered_fires": len(self.uncovered_fires), "unit_count": self.unit_count, "max_units": self.max_units, "effective_units": len(self.effective_units), "ineffective_units": len(self.ineffective_units), "coverage_ratio": round(self.coverage_ratio, 2), "threat_level": self.threat_level, "summary": self.summary, "building_integrity": round(self.building_integrity, 2) } @dataclass class PlanResult: """Result from Stage 2: Planning - deciding the strategy.""" strategy: str # "deploy_new" / "optimize_existing" / "balanced" / "monitor" reasoning: str deploy_count: int # how many new units to deploy reposition_units: list # units to move (list of dicts with source and reason) priority_targets: list # fires to prioritize (sorted by priority) def to_dict(self) -> dict: return { "strategy": self.strategy, "reasoning": self.reasoning, "deploy_count": self.deploy_count, "reposition_count": len(self.reposition_units), "priority_targets": len(self.priority_targets) } @dataclass class AdvisorResponse: """Complete response from the advisor agent.""" summary: str recommendations: list[Recommendation] thinking: str = "" # Chain of thought reasoning analysis: str = "" # Situation analysis priority: str = "" # Priority assessment raw_response: Optional[str] = None error: Optional[str] = None # Multi-stage results assessment: Optional[AssessmentResult] = None plan: Optional[PlanResult] = None def to_dict(self) -> dict: result = { "summary": self.summary, "recommendations": [r.to_dict() for r in self.recommendations], "thinking": self.thinking, "analysis": self.analysis, "priority": self.priority } if self.assessment: result["assessment"] = self.assessment.to_dict() if self.plan: result["plan"] = self.plan.to_dict() return result @dataclass class CycleSummary: """Stage 4 summary for a single advisor cycle.""" headline: str threat_level: str key_highlights: list[str] risks: list[str] next_focus: list[str] def to_dict(self) -> dict: return { "headline": self.headline, "threat_level": self.threat_level, "key_highlights": self.key_highlights, "risks": self.risks, "next_focus": self.next_focus, } @dataclass class AfterActionReport: """Structured after-action report summarizing mission outcome.""" summary: str strengths: list[str] improvements: list[str] next_actions: list[str] outcome: str = "" error: Optional[str] = None charts: dict = field(default_factory=dict) player_actions: dict = field(default_factory=dict) def to_dict(self) -> dict: return { "summary": self.summary, "strengths": self.strengths, "improvements": self.improvements, "next_actions": self.next_actions, "outcome": self.outcome, "error": self.error, "charts": self.charts, "player_actions": self.player_actions, } # ============================================================================= # Advisor Agent # ============================================================================= class AdvisorAgent: """ Multi-stage LLM-based advisor agent that analyzes world state and provides recommendations. Stage 1: ASSESS - Analyze the situation Stage 2: PLAN - Decide strategy Stage 3: EXECUTE - Generate specific actions All analysis is performed by the AI model - no fallback logic. Uses HuggingFace Inference Provider API. """ def __init__( self, api_key: Optional[str] = None, model: Optional[str] = None, provider: str = "hf", base_url: Optional[str] = None, ): """ Initialize the advisor agent. Args: api_key: Optional override for provider API key model: Model to use for inference (defaults depend on provider) provider: "hf" for HuggingFace Router or "openai" for native OpenAI base_url: Optional override for API base URL """ self.provider = (provider or "hf").lower() if self.provider not in {"hf", "openai"}: self.provider = "hf" if self.provider == "openai": self.api_key = api_key or get_openai_api_key() self.base_url = base_url or OPENAI_BASE_URL else: self.api_key = api_key or get_hf_token() self.base_url = base_url or HF_INFERENCE_BASE_URL # Load model config from prompts.yaml model_config = PROMPTS_CONFIG.get("model", {}) # Model priority: explicit param > prompts.yaml > default yaml_model = model_config.get("default") if self.provider == "hf" else None provider_default_model = OPENAI_DEFAULT_MODEL if self.provider == "openai" else HF_DEFAULT_MODEL self.model = model or yaml_model or provider_default_model self.temperature = model_config.get("temperature") # None if not set self.max_completion_tokens = model_config.get("max_completion_tokens", 2000) # Initialize client if self.api_key: provider_label = "OpenAI" if self.provider == "openai" else "HuggingFace Inference" print(f"๐Ÿค– AI Advisor initialized with {provider_label} API, model: {self.model}") self.client = OpenAI( api_key=self.api_key, base_url=self.base_url ) else: self.client = None missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN" print(f"โš ๏ธ Warning: Missing {missing_env}. AI analysis will not work.") # ========================================================================= # JSON Repair Helper # ========================================================================= def _try_repair_json(self, content: str) -> Optional[dict]: """ Attempt to repair truncated JSON by closing open brackets/braces. Returns: Repaired JSON dict, or None if repair failed """ # Count open brackets and braces open_braces = content.count('{') - content.count('}') open_brackets = content.count('[') - content.count(']') # Check if we're in the middle of a string (odd number of unescaped quotes) in_string = False i = 0 while i < len(content): if content[i] == '"' and (i == 0 or content[i-1] != '\\'): in_string = not in_string i += 1 repaired = content # Close any open string if in_string: repaired += '"' # Close brackets and braces in reverse order of opening # This is a simplified repair - close all brackets then all braces repaired += ']' * open_brackets repaired += '}' * open_braces try: return json.loads(repaired) except json.JSONDecodeError: # Try more aggressive repair - find last valid JSON object try: # Try to find a partial valid structure for end_pos in range(len(content), 0, -1): partial = content[:end_pos] open_b = partial.count('{') - partial.count('}') open_br = partial.count('[') - partial.count(']') attempt = partial + ']' * open_br + '}' * open_b try: return json.loads(attempt) except json.JSONDecodeError: continue except Exception: pass return None # ========================================================================= # LLM Call Helper # ========================================================================= def _call_llm(self, system_prompt: str, user_message: str) -> Optional[dict]: """ Make an LLM API call to HuggingFace Inference Provider and parse JSON response. Returns: Parsed JSON dict, or None if failed """ if not self.client: missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN" print(f"Error: No API client available ({missing_env} not set)") return None # Retry logic for rate limiting (429 errors) max_retries = 3 retry_delay = 5 # seconds for attempt in range(max_retries): try: # Build API call parameters for current provider token_param = "max_completion_tokens" if self.provider == "openai" else "max_tokens" api_params = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message} ], "response_format": {"type": "json_object"} } api_params[token_param] = self.max_completion_tokens # Only add temperature if explicitly set if self.temperature is not None: api_params["temperature"] = self.temperature response = self.client.chat.completions.create(**api_params) content = response.choices[0].message.content finish_reason = response.choices[0].finish_reason # Check if response was truncated if finish_reason == "length": print(f"โš ๏ธ Warning: Response was truncated (hit token limit)") if content: # Try to parse JSON from the response # Handle potential markdown code blocks in response content = content.strip() if content.startswith("```json"): content = content[7:] if content.startswith("```"): content = content[3:] if content.endswith("```"): content = content[:-3] content = content.strip() try: return json.loads(content) except json.JSONDecodeError as e: # Try to repair truncated JSON print(f"โš ๏ธ JSON parse error: {e}") print(f" Raw content (first 500 chars): {content[:500]}...") repaired = self._try_repair_json(content) if repaired: print("โœ… Successfully repaired truncated JSON") return repaired return None return None except Exception as e: error_str = str(e) # Check for rate limiting (429) error if "429" in error_str or "too_many_requests" in error_str.lower(): if attempt < max_retries - 1: print(f"โš ๏ธ Rate limited (429), retrying in {retry_delay}s... (attempt {attempt + 1}/{max_retries})") import time time.sleep(retry_delay) continue else: print(f"โŒ Rate limited after {max_retries} attempts") return None else: print(f"LLM call error: {e}") return None return None # Should not reach here # ========================================================================= # Stage 1: Assessment # ========================================================================= def assess(self, world_state: dict) -> AssessmentResult: """ Stage 1: Assess the current situation using AI. """ assess_config = PROMPTS_CONFIG.get("assess", {}) system = assess_config.get("system", "") output_format = assess_config.get("output_format", "") fires = world_state.get("fires", []) units = world_state.get("units", []) buildings = world_state.get("buildings", []) building_integrity = world_state.get("building_integrity", 1.0) max_units = world_state.get("max_units", 10) # Prepare detailed user message user_message = f"""Current World State: - Grid size: {world_state.get("width", 10)}x{world_state.get("height", 10)} - Building Integrity: {building_integrity:.1%} - Max Units Allowed: {max_units} FIRES ({len(fires)} total): {json.dumps(fires, indent=2)} UNITS ({len(units)} deployed): {json.dumps(units, indent=2)} BUILDINGS ({len(buildings)} total): {json.dumps(buildings[:20], indent=2)}{" (showing first 20)" if len(buildings) > 20 else ""} Remember: - Fire Truck effective range: covers 1 tile outward from its center - Helicopter effective range: covers 2 tiles outward from its center - A fire is UNCOVERED if no unit is within range - A unit is INEFFECTIVE if no fire is within its range Output format: {output_format}""" full_prompt = system + "\n\n" + output_format response = self._call_llm(full_prompt, user_message) if not response: # Return minimal assessment if AI fails return AssessmentResult( fire_count=len(fires), high_intensity_fires=[f for f in fires if f.get("intensity", 0) > 0.7], building_threats=[], uncovered_fires=fires[:], # Assume all uncovered if AI fails unit_count=len(units), max_units=max_units, effective_units=[], ineffective_units=units[:], # Assume all ineffective if AI fails coverage_ratio=0.0, threat_level="HIGH", summary="AI assessment unavailable - assuming high threat", building_integrity=building_integrity ) try: fire_analysis = response.get("fire_analysis", {}) unit_analysis = response.get("unit_analysis", {}) # Parse uncovered fire positions uncovered_positions = fire_analysis.get("uncovered_fire_positions", []) uncovered_fires = [] for pos in uncovered_positions: if isinstance(pos, list) and len(pos) >= 2: for f in fires: if f["x"] == pos[0] and f["y"] == pos[1]: uncovered_fires.append(f) break # Parse high intensity positions high_positions = fire_analysis.get("high_intensity_positions", []) high_intensity_fires = [] for pos in high_positions: if isinstance(pos, list) and len(pos) >= 2: for f in fires: if f["x"] == pos[0] and f["y"] == pos[1]: high_intensity_fires.append(f) break # Fallback to intensity check if not high_intensity_fires: high_intensity_fires = [f for f in fires if f.get("intensity", 0) > 0.7] # Parse building threat positions building_threat_positions = fire_analysis.get("building_threat_positions", []) building_threats = [] for pos in building_threat_positions: if isinstance(pos, list) and len(pos) >= 2: for f in fires: if f["x"] == pos[0] and f["y"] == pos[1]: building_threats.append(f) break # Parse ineffective units ineffective_positions = unit_analysis.get("ineffective_positions", []) ineffective_units = [] for pos in ineffective_positions: if isinstance(pos, list) and len(pos) >= 2: for u in units: if u["x"] == pos[0] and u["y"] == pos[1]: ineffective_units.append(u) break # Calculate effective units (all units not in ineffective list) ineffective_set = set((u["x"], u["y"]) for u in ineffective_units) effective_units = [u for u in units if (u["x"], u["y"]) not in ineffective_set] return AssessmentResult( fire_count=fire_analysis.get("total_fires", len(fires)), high_intensity_fires=high_intensity_fires, building_threats=building_threats, uncovered_fires=uncovered_fires, unit_count=len(units), max_units=max_units, effective_units=effective_units, ineffective_units=ineffective_units, coverage_ratio=unit_analysis.get("coverage_ratio", 0.0), threat_level=response.get("threat_level", "MODERATE"), summary=response.get("summary", ""), building_integrity=building_integrity ) except Exception as e: print(f"Error parsing AI assessment: {e}") return AssessmentResult( fire_count=len(fires), high_intensity_fires=[], building_threats=[], uncovered_fires=fires[:], unit_count=len(units), max_units=max_units, effective_units=[], ineffective_units=units[:], coverage_ratio=0.0, threat_level="HIGH", summary=f"Assessment parse error: {e}", building_integrity=building_integrity ) # ========================================================================= # Stage 2: Planning # ========================================================================= def plan(self, world_state: dict, assessment: AssessmentResult) -> PlanResult: """ Stage 2: Decide the strategy based on assessment using AI. """ plan_config = PROMPTS_CONFIG.get("plan", {}) system = plan_config.get("system", "") output_format = plan_config.get("output_format", "") fires = world_state.get("fires", []) available_slots = assessment.max_units - assessment.unit_count # Sort fires by priority for context buildings = world_state.get("buildings", []) building_positions = set((b["x"], b["y"]) for b in buildings) def fire_priority(f): near_building = any( abs(f["x"] - bx) <= 1 and abs(f["y"] - by) <= 1 for bx, by in building_positions ) return f.get("intensity", 0) + (1.0 if near_building else 0) priority_targets = sorted(fires, key=fire_priority, reverse=True) user_message = f"""Assessment Result: - Threat Level: {assessment.threat_level} - Fire Count: {assessment.fire_count} - High Intensity Fires: {len(assessment.high_intensity_fires)} - Building Threats: {len(assessment.building_threats)} - UNCOVERED Fires (no unit in range): {len(assessment.uncovered_fires)} - Coverage Ratio: {assessment.coverage_ratio:.1%} - Effective Units: {len(assessment.effective_units)} - INEFFECTIVE Units (not near any fire): {len(assessment.ineffective_units)} - Summary: {assessment.summary} Current Resources: - Units deployed: {assessment.unit_count} / {assessment.max_units} - Available slots: {available_slots} UNCOVERED Fires (PRIORITY - these need coverage!): {json.dumps([{"x": f["x"], "y": f["y"], "intensity": f["intensity"]} for f in assessment.uncovered_fires[:5]], indent=2)} INEFFECTIVE Units (SHOULD BE MOVED to cover fires!): {json.dumps([{"x": u["x"], "y": u["y"], "type": u["type"]} for u in assessment.ineffective_units], indent=2)} Priority Fires (top 5): {json.dumps([{"x": f["x"], "y": f["y"], "intensity": f["intensity"]} for f in priority_targets[:5]], indent=2)} REMEMBER: If there are uncovered fires AND ineffective units, you SHOULD reposition those units! Output format: {output_format}""" response = self._call_llm(system, user_message) if not response: # Smart fallback: calculate deploy count based on situation uncovered_count = len(assessment.uncovered_fires) idle_count = len(assessment.ineffective_units) fires_after_reposition = max(0, uncovered_count - idle_count) building_threats = len(assessment.building_threats) # Smart deploy calculation if building_threats > 0: # Building emergency! Deploy enough to cover all building threats smart_deploy = max(building_threats, fires_after_reposition) elif uncovered_count <= 2: # Few fires - deploy just enough smart_deploy = fires_after_reposition elif uncovered_count <= 5: # Moderate fires - deploy to cover + small buffer smart_deploy = fires_after_reposition + 1 else: # Many fires - deploy more aggressively smart_deploy = fires_after_reposition + 2 smart_deploy = min(smart_deploy, available_slots) return PlanResult( strategy="balanced" if assessment.ineffective_units else "deploy_new", reasoning=f"AI planning unavailable - smart fallback: {uncovered_count} uncovered fires, repositioning {idle_count} idle units, deploying {smart_deploy} new", deploy_count=smart_deploy, reposition_units=assessment.ineffective_units[:], priority_targets=priority_targets[:5] ) try: # Parse units to reposition reposition_data = response.get("units_to_reposition", []) reposition_units = [] for item in reposition_data: if isinstance(item, list) and len(item) >= 3: sx, sy, utype = item[0], item[1], item[2] for u in assessment.ineffective_units: if u["x"] == sx and u["y"] == sy: reposition_units.append(u) break # If reposition_needed but no specific units, use all ineffective if response.get("reposition_needed", False) and not reposition_units: reposition_units = assessment.ineffective_units[:] # Map priority_fire_indices to actual fires priority_indices = response.get("priority_fire_indices", [0, 1, 2]) selected_targets = [] for i in priority_indices: if isinstance(i, int) and i < len(priority_targets): selected_targets.append(priority_targets[i]) if not selected_targets: selected_targets = priority_targets[:5] return PlanResult( strategy=response.get("strategy", "balanced"), reasoning=response.get("reasoning", ""), deploy_count=response.get("deploy_count", 0), reposition_units=reposition_units, priority_targets=selected_targets ) except Exception as e: print(f"Error parsing AI plan: {e}") # Smart fallback: calculate deploy count based on situation uncovered_count = len(assessment.uncovered_fires) idle_count = len(assessment.ineffective_units) fires_after_reposition = max(0, uncovered_count - idle_count) building_threats = len(assessment.building_threats) # Smart deploy calculation if building_threats > 0: smart_deploy = max(building_threats, fires_after_reposition) elif uncovered_count <= 2: smart_deploy = fires_after_reposition else: smart_deploy = fires_after_reposition + 1 smart_deploy = min(smart_deploy, available_slots) return PlanResult( strategy="balanced", reasoning=f"Plan parse error: {e} - using smart fallback", deploy_count=smart_deploy, reposition_units=assessment.ineffective_units[:], priority_targets=priority_targets[:5] ) # ========================================================================= # Stage 3: Execution # ========================================================================= def execute( self, world_state: dict, assessment: AssessmentResult, plan: PlanResult ) -> list[Recommendation]: """ Stage 3: Generate specific deployment/move recommendations using AI. """ # Skip if strategy is monitor if plan.strategy == "monitor": return [] execute_config = PROMPTS_CONFIG.get("execute", {}) system = execute_config.get("system", "") output_format = execute_config.get("output_format", "") fires = world_state.get("fires", []) units = world_state.get("units", []) buildings = world_state.get("buildings", []) width = world_state.get("width", 10) height = world_state.get("height", 10) user_message = f"""Assessment: - Threat Level: {assessment.threat_level} - Summary: {assessment.summary} - Uncovered Fires: {len(assessment.uncovered_fires)} - Effective Units: {len(assessment.effective_units)} - Ineffective Units: {len(assessment.ineffective_units)} Plan: - Strategy: {plan.strategy} - Reasoning: {plan.reasoning} - Deploy Count: {plan.deploy_count} - Reposition Needed: {len(plan.reposition_units) > 0} World State: - Grid: {width}x{height} UNCOVERED FIRES (PRIORITY TARGETS - these need units!): {json.dumps([{"x": f["x"], "y": f["y"], "intensity": f["intensity"]} for f in assessment.uncovered_fires[:5]], indent=2)} INEFFECTIVE UNITS (MOVE THESE to uncovered fires!): {json.dumps([{"x": u["x"], "y": u["y"], "type": u["type"]} for u in plan.reposition_units[:5]], indent=2)} All Fire positions: {json.dumps([(f["x"], f["y"], round(f["intensity"], 2)) for f in fires[:15]])} All Unit positions: {json.dumps([(u["x"], u["y"], u["type"]) for u in units])} Building positions: {json.dumps([(b["x"], b["y"]) for b in buildings[:15]])} INSTRUCTIONS: 1. FIRST generate MOVE actions for ineffective units โ†’ move them to uncovered fires 2. THEN generate DEPLOY actions if more units needed 3. Max {MAX_RECOMMENDATIONS} recommendations total 4. Remember: deploy ADJACENT to fire (1-2 cells away), not ON the fire Output format: {output_format}""" response = self._call_llm(system, user_message) if not response: # Generate basic recommendations if AI fails return self._generate_fallback_recommendations(world_state, assessment, plan) try: recommendations = [] raw_recs = response.get("recommendations", []) # Get blocked positions fire_positions = set((f["x"], f["y"]) for f in fires) unit_positions = set((u["x"], u["y"]) for u in units) building_positions = set((b["x"], b["y"]) for b in buildings) used_positions = set() for rec in raw_recs[:MAX_RECOMMENDATIONS]: # Limit to UI capacity action = rec.get("action", "deploy") unit_type = rec.get("unit_type", "fire_truck") target = rec.get("target", {}) target_x = target.get("x", 0) target_y = target.get("y", 0) reason = rec.get("reason", "AI recommendation") # Validate target position pos = (target_x, target_y) if pos in fire_positions or pos in building_positions or pos in used_positions: # Try to find valid nearby position valid_pos = self._find_deploy_position( target_x, target_y, world_state, exclude_positions=used_positions | unit_positions ) if valid_pos: target_x, target_y = valid_pos else: continue used_positions.add((target_x, target_y)) if action == "move": source = rec.get("source", {}) source_x = source.get("x", -1) source_y = source.get("y", -1) # Validate source position has a unit if (source_x, source_y) not in unit_positions: continue recommendations.append(Recommendation( reason=reason, suggested_unit_type=unit_type, target_x=target_x, target_y=target_y, action="move", source_x=source_x, source_y=source_y )) elif action == "remove": # Remove action: remove unit at position position = rec.get("position", {}) pos_x = position.get("x", target_x) pos_y = position.get("y", target_y) unit_type = rec.get("unit_type", "fire_truck") # Validate position has a unit if (pos_x, pos_y) not in unit_positions: continue recommendations.append(Recommendation( reason=reason, suggested_unit_type=unit_type, target_x=pos_x, target_y=pos_y, action="remove" )) else: recommendations.append(Recommendation( reason=reason, suggested_unit_type=unit_type, target_x=target_x, target_y=target_y, action="deploy" )) return recommendations except Exception as e: print(f"Error parsing AI execution: {e}") return self._generate_fallback_recommendations(world_state, assessment, plan) def summarize( self, world_state: dict, assessment: AssessmentResult, plan: PlanResult, recommendations: list[Recommendation], advisor_response: AdvisorResponse, ) -> CycleSummary: """Stage 4: Summarize the cycle results using AI.""" summary_config = PROMPTS_CONFIG.get("summary", {}) system = summary_config.get("system", "") output_format = summary_config.get("output_format", "") fires = world_state.get("fires", []) units = world_state.get("units", []) tick = world_state.get("tick", 0) status = world_state.get("status", "running") rec_blocks = [] for idx, rec in enumerate(recommendations, 1): block = { "index": idx, "action": rec.action, "unit_type": rec.suggested_unit_type, "target": {"x": rec.target_x, "y": rec.target_y}, "source": {"x": rec.source_x, "y": rec.source_y} if rec.action == "move" else None, "reason": rec.reason, } rec_blocks.append(block) user_message = f"""Tick: {tick} | Status: {status} Threat Level: {assessment.threat_level} | Building Integrity: {assessment.building_integrity:.0%} Fires: {assessment.fire_count} | Uncovered Fires: {len(assessment.uncovered_fires)} Idle Units: {len(assessment.ineffective_units)} | Total Units: {assessment.unit_count}/{assessment.max_units} Stage 1 Summary: {assessment.summary} Stage 2 Strategy: - Strategy: {plan.strategy} - Reasoning: {plan.reasoning} - Deploy Count: {plan.deploy_count} - Reposition Units: {len(plan.reposition_units)} Stage 3 Recommendations: {json.dumps(rec_blocks[:5], indent=2)} World Snapshot (first 5 fires / units): Fires -> {json.dumps(fires[:5], indent=2)} Units -> {json.dumps(units[:5], indent=2)} OUTPUT FORMAT: {output_format} """ response = self._call_llm(system, user_message) if not response: return CycleSummary( headline=advisor_response.summary if advisor_response else "Cycle summary unavailable", threat_level=assessment.threat_level, key_highlights=[advisor_response.analysis or "Analysis unavailable."], risks=["Summary model unavailable."], next_focus=["Review building-adjacent fires manually."], ) def _coerce_items(value, fallback): if isinstance(value, list): cleaned = [str(item).strip() for item in value if str(item).strip()] return cleaned or fallback if isinstance(value, str) and value.strip(): return [value.strip()] return fallback headline = str(response.get("headline", advisor_response.summary if advisor_response else "Cycle summary")).strip() threat_level = str(response.get("threat_level", assessment.threat_level or "MODERATE")).strip() key_highlights = _coerce_items(response.get("key_highlights"), [advisor_response.analysis or "Highlights unavailable."]) risks = _coerce_items(response.get("risks"), ["No risks provided."]) next_focus = _coerce_items(response.get("next_focus"), ["Maintain coverage on building threats."]) return CycleSummary( headline=headline or "Cycle summary", threat_level=threat_level or (assessment.threat_level or "MODERATE"), key_highlights=key_highlights, risks=risks, next_focus=next_focus, ) def _generate_fallback_recommendations( self, world_state: dict, assessment: AssessmentResult, plan: PlanResult ) -> list[Recommendation]: """Generate SMART recommendations when AI fails - prioritize buildings, deploy efficiently!""" recommendations = [] units = world_state.get("units", []) buildings = world_state.get("buildings", []) unit_positions = set((u["x"], u["y"]) for u in units) building_positions = set((b["x"], b["y"]) for b in buildings) used_positions = set() # Helper: check if fire threatens building def threatens_building(fire): for bx, by in building_positions: if abs(fire["x"] - bx) + abs(fire["y"] - by) <= 2: return True return False # Sort uncovered fires: building threats FIRST, then by intensity priority_fires = sorted( assessment.uncovered_fires, key=lambda f: (-int(threatens_building(f)), -f.get("intensity", 0)) ) # Count building threats building_threat_count = sum(1 for f in priority_fires if threatens_building(f)) # Move ALL ineffective units to priority fires (this is free optimization!) for i, unit in enumerate(plan.reposition_units): if i >= len(priority_fires): break target_fire = priority_fires[i] deploy_pos = self._find_deploy_position( target_fire["x"], target_fire["y"], world_state, exclude_positions=used_positions | unit_positions - {(unit["x"], unit["y"])} ) if deploy_pos: used_positions.add(deploy_pos) is_building_threat = threatens_building(target_fire) recommendations.append(Recommendation( reason=f"{'๐Ÿข BUILDING THREAT! ' if is_building_threat else ''}Move to cover fire at ({target_fire['x']}, {target_fire['y']})", suggested_unit_type=unit["type"], target_x=deploy_pos[0], target_y=deploy_pos[1], action="move", source_x=unit["x"], source_y=unit["y"] )) # SMART deploy calculation: only deploy what we actually need available_slots = assessment.max_units - assessment.unit_count remaining_fires = priority_fires[len(recommendations):] remaining_building_threats = sum(1 for f in remaining_fires if threatens_building(f)) # Calculate smart deploy count if remaining_building_threats > 0: # Building emergency! Deploy enough to cover ALL building threats smart_deploy_count = max(remaining_building_threats, min(len(remaining_fires), available_slots)) elif len(remaining_fires) <= 2: # Few fires - deploy just enough smart_deploy_count = len(remaining_fires) elif len(remaining_fires) <= 5: # Moderate fires - deploy to cover + small buffer smart_deploy_count = min(len(remaining_fires) + 1, available_slots) else: # Many fires - deploy more but not all smart_deploy_count = min(len(remaining_fires), available_slots) # Deploy to remaining uncovered fires (up to smart_deploy_count) for i, fire in enumerate(remaining_fires[:smart_deploy_count]): deploy_pos = self._find_deploy_position( fire["x"], fire["y"], world_state, exclude_positions=used_positions | unit_positions ) if deploy_pos: used_positions.add(deploy_pos) is_building_threat = threatens_building(fire) # Use fire_truck for building threats and high intensity (40% power) unit_type = "fire_truck" if is_building_threat or fire.get("intensity", 0) > 0.5 else "helicopter" recommendations.append(Recommendation( reason=f"{'๐Ÿข BUILDING THREAT! ' if is_building_threat else ''}Deploy to cover fire at ({fire['x']}, {fire['y']})", suggested_unit_type=unit_type, target_x=deploy_pos[0], target_y=deploy_pos[1], action="deploy" )) return self._prioritize_recommendations( recommendations, plan.deploy_count, smart_deploy_count, ) def _prioritize_recommendations( self, recommendations: list[Recommendation], plan_deploy_target: int, smart_deploy_target: int, max_actions: int = MAX_RECOMMENDATIONS, ) -> list[Recommendation]: """ Ensure we return a balanced mix of move/deploy actions without exceeding UI limits. """ if len(recommendations) <= max_actions: return recommendations deploy_recs = [rec for rec in recommendations if rec.action == "deploy"] move_recs = [rec for rec in recommendations if rec.action == "move"] other_recs = [rec for rec in recommendations if rec.action not in ("deploy", "move")] deploy_priority = 0 if deploy_recs: deploy_priority = max(plan_deploy_target, smart_deploy_target, 1) move_priority = len(move_recs) other_priority = len(other_recs) priority_pairs = [] if deploy_priority: priority_pairs.append(("deploy", deploy_priority)) if move_priority: priority_pairs.append(("move", move_priority)) if other_priority: priority_pairs.append(("other", other_priority)) if not priority_pairs: return recommendations[:max_actions] priority_pairs.sort(key=lambda item: item[1], reverse=True) ordered_types = [ptype for ptype, _ in priority_pairs] # Ensure every action type gets a chance once primary priorities are exhausted for action_type in ("deploy", "move", "other"): if action_type not in ordered_types: ordered_types.append(action_type) pools = {"deploy": deploy_recs, "move": move_recs, "other": other_recs} indices = {key: 0 for key in pools} selected: list[Recommendation] = [] while len(selected) < max_actions: added = False for action_type in ordered_types: pool = pools[action_type] idx = indices[action_type] if idx >= len(pool): continue selected.append(pool[idx]) indices[action_type] += 1 added = True if len(selected) >= max_actions: break if not added: break return selected # ========================================================================= # After-Action Report # ========================================================================= def generate_after_action_report(self, context: dict) -> AfterActionReport: """ Build an after-action report using Assessment / Planning / Execution transcripts. """ after_action_config = PROMPTS_CONFIG.get("after_action", {}) system = after_action_config.get("system", "") output_format = after_action_config.get("output_format", "") outcome = context.get("outcome", "unknown") default_summary = context.get("summary_text") or f"Mission outcome: {outcome}" report = AfterActionReport( summary=default_summary, strengths=[], improvements=[], next_actions=[], outcome=outcome, ) report.charts = { "metrics": context.get("chart_points") or [], "threat_levels": context.get("threat_history") or [], "action_density": context.get("action_history") or [], } report.player_actions = context.get("player_actions_context") or {} if not self.client: report.error = "Missing HF_TOKEN โ€“ unable to generate AI after-action report." return report def _section(title: str, body: str) -> str: if not body: return f"{title}\n(no data available)\n" return f"{title}\n{body}\n" header_lines = [ f"Mission Outcome: {context.get('outcome_label', outcome)}", f"Tick: {context.get('tick', 0)}", f"Fires Remaining: {context.get('fires_remaining', 0)}", f"Units Active: {context.get('units_active', 0)}", f"Building Integrity: {context.get('building_integrity_percent', 'N/A')}", ] mission_summary = context.get("summary_text", "") if mission_summary: header_lines.append(f"Mission Summary: {mission_summary}") cycle_summaries = context.get("cycle_summaries") or [] if cycle_summaries: summary_lines = [] for entry in cycle_summaries: tick = entry.get("tick", "?") headline = entry.get("headline", "No headline") threat = entry.get("threat_level", "N/A") highlights = entry.get("key_highlights") or [] risks = entry.get("risks") or [] next_focus = entry.get("next_focus") or [] block = [ f"- [Tick {tick}] {headline} (Threat: {threat})", ] if highlights: block.append(" โ€ข Highlights: " + "; ".join(highlights)) if risks: block.append(" โ€ข Risks: " + "; ".join(risks)) if next_focus: block.append(" โ€ข Next Focus: " + "; ".join(next_focus)) summary_lines.append("\n".join(block)) history_block = "\n".join(summary_lines) else: history_block = "- No prior cycles captured." user_sections = [ "Mission Status Summary:", "\n".join(header_lines), "", _section("Stage 1 ยท Assessment", context.get("assessment_md", "")), _section("Stage 2 ยท Planning", context.get("planning_md", "")), _section("Stage 3 ยท Execution", context.get("execution_md", "")), _section("Player Manual Actions", context.get("player_actions_md", "")), "Historical Cycle Summaries:", history_block, ] if output_format: user_sections.append("Please reply strictly using the JSON schema below:") user_sections.append(output_format) user_message = "\n\n".join(user_sections) response = self._call_llm(system or "You are a mission debrief analyst.", user_message) if not response: report.error = "Failed to retrieve AI response." return report def _coerce_list(value) -> list[str]: if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] if isinstance(value, str) and value.strip(): return [value.strip()] return [] report.summary = str(response.get("summary", report.summary)).strip() or report.summary report.strengths = _coerce_list(response.get("strengths")) report.improvements = _coerce_list(response.get("improvements")) report.next_actions = _coerce_list(response.get("next_actions")) if not (report.strengths or report.improvements or report.next_actions): report.error = "AI response did not contain any usable sections." return report # ========================================================================= # Main Entry Point # ========================================================================= def analyze(self, world_state: dict) -> AdvisorResponse: """ Main entry point: Run multi-stage analysis pipeline. Stage 1: Assessment - Analyze the situation Stage 2: Planning - Decide strategy Stage 3: Execution - Generate specific actions """ # Stage 1: Assessment assessment = self.assess(world_state) # Stage 2: Planning plan = self.plan(world_state, assessment) # Stage 3: Execution recommendations = self.execute(world_state, assessment, plan) # Build thinking summary thinking_parts = [ f"๐Ÿ“Š Scanning {assessment.fire_count} active fires...", ] if assessment.uncovered_fires: thinking_parts.append(f"๐Ÿšจ ALERT: {len(assessment.uncovered_fires)} fire(s) with NO coverage!") if assessment.building_threats: thinking_parts.append(f"๐Ÿข {len(assessment.building_threats)} fire(s) threatening buildings!") if assessment.ineffective_units: thinking_parts.append(f"๐Ÿ”„ {len(assessment.ineffective_units)} idle unit(s) should be repositioned") thinking_parts.append(f"๐ŸŽฏ Strategy: {plan.strategy.upper()} - {plan.reasoning}") # Generate summary based on threat level priority_emoji = { "CRITICAL": "๐Ÿ”ด", "HIGH": "๐ŸŸ ", "MODERATE": "๐ŸŸก", "LOW": "๐ŸŸข" } emoji = priority_emoji.get(assessment.threat_level, "โšช") if assessment.threat_level == "CRITICAL": summary = f"{emoji} CRITICAL: {assessment.summary}. Immediate action required!" elif assessment.threat_level == "HIGH": summary = f"{emoji} HIGH: {assessment.summary}. Rapid response needed." elif assessment.threat_level == "MODERATE": summary = f"{emoji} MODERATE: {assessment.summary}. Tactical deployment advised." else: summary = f"{emoji} LOW: {assessment.summary}. Monitoring situation." return AdvisorResponse( summary=summary, recommendations=recommendations, thinking="\n".join(thinking_parts), analysis=f"{assessment.fire_count} fires | {assessment.unit_count}/{assessment.max_units} units | {assessment.building_integrity:.0%} building integrity", priority=assessment.threat_level, assessment=assessment, plan=plan ) # ========================================================================= # Helper Methods # ========================================================================= def _find_deploy_position( self, fire_x: int, fire_y: int, world_state: dict, exclude_positions: set = None ) -> tuple[int, int] | None: """ Find a valid deployment position adjacent to a fire. Units cannot deploy on burning cells, so we find the nearest empty cell. """ if exclude_positions is None: exclude_positions = set() fires = world_state.get("fires", []) units = world_state.get("units", []) buildings = world_state.get("buildings", []) width = world_state.get("width", 10) height = world_state.get("height", 10) fire_positions = set((f["x"], f["y"]) for f in fires) unit_positions = set((u["x"], u["y"]) for u in units) building_positions = set((b["x"], b["y"]) for b in buildings) # Check positions at increasing distances for distance in [1, 2, 3]: candidates = [] for dx in range(-distance, distance + 1): for dy in range(-distance, distance + 1): if abs(dx) != distance and abs(dy) != distance: continue nx, ny = fire_x + dx, fire_y + dy # Check bounds if not (0 <= nx < width and 0 <= ny < height): continue # Skip invalid positions if (nx, ny) in fire_positions: continue if (nx, ny) in unit_positions: continue if (nx, ny) in building_positions: continue if (nx, ny) in exclude_positions: continue # Valid candidate dist_to_fire = abs(nx - fire_x) + abs(ny - fire_y) candidates.append((nx, ny, dist_to_fire)) if candidates: candidates.sort(key=lambda c: c[2]) return (candidates[0][0], candidates[0][1]) return None # Backward compatibility def _fallback_analyze(self, world_state: dict) -> AdvisorResponse: """Fallback method for service.py compatibility.""" return self.analyze(world_state) # Add method to class AdvisorAgent._fallback_analyze = _fallback_analyze