"""
Fire-Rescue - Multi-Stage LLM Advisor Agent

Provides advisory recommendations based on world state analysis using a
multi-stage approach: Assessment → Planning → Execution.

The agent only suggests actions; it does not directly control units.
Analysis is AI-driven; deterministic heuristics are used only as a fallback
when an AI call fails.

Uses the HuggingFace Inference Provider API with the openai/gpt-oss-120b model.
"""

import json
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import yaml
from openai import OpenAI

# Load a local .env file if python-dotenv is installed; silently skip otherwise.
try:
    from dotenv import load_dotenv

    env_path = Path(__file__).parent / ".env"
    if env_path.exists():
        load_dotenv(env_path)
except ImportError:
    pass

HF_INFERENCE_BASE_URL = "https://router.huggingface.co/v1"
HF_DEFAULT_MODEL = "openai/gpt-oss-120b"

OPENAI_DEFAULT_MODEL = "gpt-5-mini"
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
OPENAI_API_ENV_VAR = "OPENAI_API_KEY"


def get_hf_token() -> str | None:
    """
    Get the HuggingFace token from the environment.

    Returns:
        HF_TOKEN if available, None otherwise
    """
    return os.getenv("HF_TOKEN")


def get_openai_api_key() -> str | None:
    """
    Get the OpenAI API key from the environment.

    Returns:
        OPENAI_API_KEY if available, None otherwise
    """
    return os.getenv(OPENAI_API_ENV_VAR)


MAX_RECOMMENDATIONS = 15
PROMPT_PLACEHOLDERS = {"{{MAX_RECOMMENDATIONS}}": str(MAX_RECOMMENDATIONS)}


def load_prompts() -> dict:
    """Load prompts from the prompts.yaml configuration file."""
    prompts_path = Path(__file__).parent / "prompts.yaml"
    if prompts_path.exists():
        with open(prompts_path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)
    return {}


def _apply_prompt_placeholders(value):
    """Recursively replace placeholder tokens inside the prompt config."""
    if isinstance(value, str):
        for token, replacement in PROMPT_PLACEHOLDERS.items():
            value = value.replace(token, replacement)
        return value
    if isinstance(value, dict):
        return {k: _apply_prompt_placeholders(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_apply_prompt_placeholders(item) for item in value]
    return value


PROMPTS_CONFIG = _apply_prompt_placeholders(load_prompts())
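
# Placeholder-pass sketch (hypothetical prompt data, not the contents of the
# real prompts.yaml):
#
#     _apply_prompt_placeholders(
#         {"execute": {"system": "Return at most {{MAX_RECOMMENDATIONS}} actions."}}
#     )
#     # -> {"execute": {"system": "Return at most 15 actions."}}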


@dataclass
class Recommendation:
    """A single deployment or move recommendation from the advisor."""
    reason: str
    suggested_unit_type: str
    target_x: int
    target_y: int
    action: str = "deploy"
    source_x: int = -1
    source_y: int = -1

    def to_dict(self) -> dict:
        result = {
            "reason": self.reason,
            "suggested_unit_type": self.suggested_unit_type,
            "target": {"x": self.target_x, "y": self.target_y},
            "action": self.action,
        }
        if self.action == "move":
            result["source"] = {"x": self.source_x, "y": self.source_y}
        return result
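
# Serialization sketch: "move" recommendations carry their source cell,
# "deploy" recommendations do not.
#
#     Recommendation("cover fire", "fire_truck", 4, 5,
#                    action="move", source_x=1, source_y=1).to_dict()
#     # -> {"reason": "cover fire", "suggested_unit_type": "fire_truck",
#     #     "target": {"x": 4, "y": 5}, "action": "move",
#     #     "source": {"x": 1, "y": 1}}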


@dataclass
class AssessmentResult:
    """Result from Stage 1: Assessment - analyzing the current situation."""
    fire_count: int
    high_intensity_fires: list
    building_threats: list
    uncovered_fires: list
    unit_count: int
    max_units: int
    effective_units: list
    ineffective_units: list
    coverage_ratio: float
    threat_level: str
    summary: str
    building_integrity: float

    def to_dict(self) -> dict:
        return {
            "fire_count": self.fire_count,
            "high_intensity_fires": len(self.high_intensity_fires),
            "building_threats": len(self.building_threats),
            "uncovered_fires": len(self.uncovered_fires),
            "unit_count": self.unit_count,
            "max_units": self.max_units,
            "effective_units": len(self.effective_units),
            "ineffective_units": len(self.ineffective_units),
            "coverage_ratio": round(self.coverage_ratio, 2),
            "threat_level": self.threat_level,
            "summary": self.summary,
            "building_integrity": round(self.building_integrity, 2),
        }


@dataclass
class PlanResult:
    """Result from Stage 2: Planning - deciding the strategy."""
    strategy: str
    reasoning: str
    deploy_count: int
    reposition_units: list
    priority_targets: list

    def to_dict(self) -> dict:
        return {
            "strategy": self.strategy,
            "reasoning": self.reasoning,
            "deploy_count": self.deploy_count,
            "reposition_count": len(self.reposition_units),
            "priority_targets": len(self.priority_targets),
        }


@dataclass
class AdvisorResponse:
    """Complete response from the advisor agent."""
    summary: str
    recommendations: list[Recommendation]
    thinking: str = ""
    analysis: str = ""
    priority: str = ""
    raw_response: Optional[str] = None
    error: Optional[str] = None

    assessment: Optional[AssessmentResult] = None
    plan: Optional[PlanResult] = None

    def to_dict(self) -> dict:
        result = {
            "summary": self.summary,
            "recommendations": [r.to_dict() for r in self.recommendations],
            "thinking": self.thinking,
            "analysis": self.analysis,
            "priority": self.priority,
        }
        if self.assessment:
            result["assessment"] = self.assessment.to_dict()
        if self.plan:
            result["plan"] = self.plan.to_dict()
        return result


@dataclass
class CycleSummary:
    """Stage 4 summary for a single advisor cycle."""
    headline: str
    threat_level: str
    key_highlights: list[str]
    risks: list[str]
    next_focus: list[str]

    def to_dict(self) -> dict:
        return {
            "headline": self.headline,
            "threat_level": self.threat_level,
            "key_highlights": self.key_highlights,
            "risks": self.risks,
            "next_focus": self.next_focus,
        }


@dataclass
class AfterActionReport:
    """Structured after-action report summarizing mission outcome."""
    summary: str
    strengths: list[str]
    improvements: list[str]
    next_actions: list[str]
    outcome: str = ""
    error: Optional[str] = None
    charts: dict = field(default_factory=dict)
    player_actions: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "summary": self.summary,
            "strengths": self.strengths,
            "improvements": self.improvements,
            "next_actions": self.next_actions,
            "outcome": self.outcome,
            "error": self.error,
            "charts": self.charts,
            "player_actions": self.player_actions,
        }


class AdvisorAgent:
    """
    Multi-stage LLM-based advisor agent that analyzes world state and
    provides recommendations.

    Stage 1: ASSESS - Analyze the situation
    Stage 2: PLAN - Decide strategy
    Stage 3: EXECUTE - Generate specific actions

    Analysis is performed by the AI model via the HuggingFace Inference
    Provider API; deterministic heuristics step in only when an AI call fails.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        provider: str = "hf",
        base_url: Optional[str] = None,
    ):
        """
        Initialize the advisor agent.

        Args:
            api_key: Optional override for the provider API key
            model: Model to use for inference (defaults depend on provider)
            provider: "hf" for the HuggingFace Router or "openai" for native OpenAI
            base_url: Optional override for the API base URL
        """
        self.provider = (provider or "hf").lower()
        if self.provider not in {"hf", "openai"}:
            self.provider = "hf"

        if self.provider == "openai":
            self.api_key = api_key or get_openai_api_key()
            self.base_url = base_url or OPENAI_BASE_URL
        else:
            self.api_key = api_key or get_hf_token()
            self.base_url = base_url or HF_INFERENCE_BASE_URL

        model_config = PROMPTS_CONFIG.get("model", {})

        # The YAML model default applies only to the HF provider; OpenAI
        # falls back to its own provider default.
        yaml_model = model_config.get("default") if self.provider == "hf" else None
        provider_default_model = OPENAI_DEFAULT_MODEL if self.provider == "openai" else HF_DEFAULT_MODEL
        self.model = model or yaml_model or provider_default_model

        self.temperature = model_config.get("temperature")
        self.max_completion_tokens = model_config.get("max_completion_tokens", 2000)

        if self.api_key:
            provider_label = "OpenAI" if self.provider == "openai" else "HuggingFace Inference"
            print(f"🤖 AI Advisor initialized with {provider_label} API, model: {self.model}")
            self.client = OpenAI(
                api_key=self.api_key,
                base_url=self.base_url,
            )
        else:
            self.client = None
            missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN"
            print(f"⚠️ Warning: Missing {missing_env}. AI analysis will not work.")
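
    # Construction sketch (assumes HF_TOKEN or OPENAI_API_KEY is set in the
    # environment or in the adjacent .env file):
    #
    #     agent = AdvisorAgent()                   # HF router, YAML or built-in default
    #     agent = AdvisorAgent(provider="openai")  # native OpenAI endpoint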

    def _try_repair_json(self, content: str) -> Optional[dict]:
        """
        Attempt to repair truncated JSON by closing open brackets/braces.

        Returns:
            Repaired JSON dict, or None if repair failed
        """
        open_braces = content.count('{') - content.count('}')
        open_brackets = content.count('[') - content.count(']')

        # Detect whether the content ends inside an unterminated string.
        in_string = False
        i = 0
        while i < len(content):
            if content[i] == '"' and (i == 0 or content[i - 1] != '\\'):
                in_string = not in_string
            i += 1

        repaired = content

        # Close the dangling string first, then brackets, then braces.
        if in_string:
            repaired += '"'
        repaired += ']' * open_brackets
        repaired += '}' * open_braces

        try:
            return json.loads(repaired)
        except json.JSONDecodeError:
            # Last resort: trim from the end until some prefix can be closed
            # into valid JSON.
            try:
                for end_pos in range(len(content), 0, -1):
                    partial = content[:end_pos]
                    open_b = partial.count('{') - partial.count('}')
                    open_br = partial.count('[') - partial.count(']')
                    attempt = partial + ']' * open_br + '}' * open_b
                    try:
                        return json.loads(attempt)
                    except json.JSONDecodeError:
                        continue
            except Exception:
                pass
        return None
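
    # Repair sketch: a reply cut off mid-structure gets its open brackets and
    # braces closed before re-parsing.
    #
    #     agent._try_repair_json('{"a": 1, "b": [1, 2')
    #     # -> {'a': 1, 'b': [1, 2]}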

    def _call_llm(self, system_prompt: str, user_message: str) -> Optional[dict]:
        """
        Make an LLM API call through the configured provider and parse the
        JSON response.

        Returns:
            Parsed JSON dict, or None if the call or parsing failed
        """
        if not self.client:
            missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN"
            print(f"Error: No API client available ({missing_env} not set)")
            return None

        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                # OpenAI expects max_completion_tokens; the HF router still
                # uses max_tokens.
                token_param = "max_completion_tokens" if self.provider == "openai" else "max_tokens"
                api_params = {
                    "model": self.model,
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_message},
                    ],
                    "response_format": {"type": "json_object"},
                }
                api_params[token_param] = self.max_completion_tokens

                if self.temperature is not None:
                    api_params["temperature"] = self.temperature

                response = self.client.chat.completions.create(**api_params)

                content = response.choices[0].message.content
                finish_reason = response.choices[0].finish_reason

                if finish_reason == "length":
                    print("⚠️ Warning: Response was truncated (hit token limit)")

                if content:
                    # Strip markdown code fences if the model added them.
                    content = content.strip()
                    if content.startswith("```json"):
                        content = content[7:]
                    if content.startswith("```"):
                        content = content[3:]
                    if content.endswith("```"):
                        content = content[:-3]
                    content = content.strip()

                    try:
                        return json.loads(content)
                    except json.JSONDecodeError as e:
                        print(f"⚠️ JSON parse error: {e}")
                        print(f"   Raw content (first 500 chars): {content[:500]}...")
                        repaired = self._try_repair_json(content)
                        if repaired:
                            print("✅ Successfully repaired truncated JSON")
                            return repaired
                        return None
                return None

            except Exception as e:
                error_str = str(e)
                # Retry on rate limiting; fail fast on anything else.
                if "429" in error_str or "too_many_requests" in error_str.lower():
                    if attempt < max_retries - 1:
                        print(f"⚠️ Rate limited (429), retrying in {retry_delay}s... (attempt {attempt + 1}/{max_retries})")
                        time.sleep(retry_delay)
                        continue
                    else:
                        print(f"❌ Rate limited after {max_retries} attempts")
                        return None
                else:
                    print(f"LLM call error: {e}")
                    return None

        return None
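
    # Tolerance sketch: some providers wrap JSON in markdown fences even when
    # response_format requests a JSON object, so a reply such as
    #
    #     '```json\n{"threat_level": "HIGH"}\n```'
    #
    # is unwrapped above before json.loads and parses to {"threat_level": "HIGH"}.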

    def assess(self, world_state: dict) -> AssessmentResult:
        """Stage 1: Assess the current situation using AI."""
        assess_config = PROMPTS_CONFIG.get("assess", {})
        system = assess_config.get("system", "")
        output_format = assess_config.get("output_format", "")

        fires = world_state.get("fires", [])
        units = world_state.get("units", [])
        buildings = world_state.get("buildings", [])
        building_integrity = world_state.get("building_integrity", 1.0)
        max_units = world_state.get("max_units", 10)

        user_message = f"""Current World State:
- Grid size: {world_state.get("width", 10)}x{world_state.get("height", 10)}
- Building Integrity: {building_integrity:.1%}
- Max Units Allowed: {max_units}

FIRES ({len(fires)} total):
{json.dumps(fires, indent=2)}

UNITS ({len(units)} deployed):
{json.dumps(units, indent=2)}

BUILDINGS ({len(buildings)} total):
{json.dumps(buildings[:20], indent=2)}{" (showing first 20)" if len(buildings) > 20 else ""}

Remember:
- Fire Truck effective range: covers 1 tile outward from its center
- Helicopter effective range: covers 2 tiles outward from its center
- A fire is UNCOVERED if no unit is within range
- A unit is INEFFECTIVE if no fire is within its range

Output format:
{output_format}"""

        full_prompt = system + "\n\n" + output_format
        response = self._call_llm(full_prompt, user_message)

        if not response:
            # AI unavailable: assume the worst and treat every fire as uncovered.
            return AssessmentResult(
                fire_count=len(fires),
                high_intensity_fires=[f for f in fires if f.get("intensity", 0) > 0.7],
                building_threats=[],
                uncovered_fires=fires[:],
                unit_count=len(units),
                max_units=max_units,
                effective_units=[],
                ineffective_units=units[:],
                coverage_ratio=0.0,
                threat_level="HIGH",
                summary="AI assessment unavailable - assuming high threat",
                building_integrity=building_integrity,
            )

        try:
            fire_analysis = response.get("fire_analysis", {})
            unit_analysis = response.get("unit_analysis", {})

            # Map [x, y] position pairs returned by the AI back onto fire dicts.
            uncovered_positions = fire_analysis.get("uncovered_fire_positions", [])
            uncovered_fires = []
            for pos in uncovered_positions:
                if isinstance(pos, list) and len(pos) >= 2:
                    for f in fires:
                        if f["x"] == pos[0] and f["y"] == pos[1]:
                            uncovered_fires.append(f)
                            break

            high_positions = fire_analysis.get("high_intensity_positions", [])
            high_intensity_fires = []
            for pos in high_positions:
                if isinstance(pos, list) and len(pos) >= 2:
                    for f in fires:
                        if f["x"] == pos[0] and f["y"] == pos[1]:
                            high_intensity_fires.append(f)
                            break

            if not high_intensity_fires:
                high_intensity_fires = [f for f in fires if f.get("intensity", 0) > 0.7]

            building_threat_positions = fire_analysis.get("building_threat_positions", [])
            building_threats = []
            for pos in building_threat_positions:
                if isinstance(pos, list) and len(pos) >= 2:
                    for f in fires:
                        if f["x"] == pos[0] and f["y"] == pos[1]:
                            building_threats.append(f)
                            break

            ineffective_positions = unit_analysis.get("ineffective_positions", [])
            ineffective_units = []
            for pos in ineffective_positions:
                if isinstance(pos, list) and len(pos) >= 2:
                    for u in units:
                        if u["x"] == pos[0] and u["y"] == pos[1]:
                            ineffective_units.append(u)
                            break

            # Everything not flagged ineffective counts as effective.
            ineffective_set = set((u["x"], u["y"]) for u in ineffective_units)
            effective_units = [u for u in units if (u["x"], u["y"]) not in ineffective_set]

            return AssessmentResult(
                fire_count=fire_analysis.get("total_fires", len(fires)),
                high_intensity_fires=high_intensity_fires,
                building_threats=building_threats,
                uncovered_fires=uncovered_fires,
                unit_count=len(units),
                max_units=max_units,
                effective_units=effective_units,
                ineffective_units=ineffective_units,
                coverage_ratio=unit_analysis.get("coverage_ratio", 0.0),
                threat_level=response.get("threat_level", "MODERATE"),
                summary=response.get("summary", ""),
                building_integrity=building_integrity,
            )
        except Exception as e:
            print(f"Error parsing AI assessment: {e}")
            return AssessmentResult(
                fire_count=len(fires),
                high_intensity_fires=[],
                building_threats=[],
                uncovered_fires=fires[:],
                unit_count=len(units),
                max_units=max_units,
                effective_units=[],
                ineffective_units=units[:],
                coverage_ratio=0.0,
                threat_level="HIGH",
                summary=f"Assessment parse error: {e}",
                building_integrity=building_integrity,
            )
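
    # Input sketch: the minimal world_state shape these stages read (field
    # names taken from the accessors above):
    #
    #     world_state = {
    #         "width": 10, "height": 10, "max_units": 10,
    #         "building_integrity": 0.9,
    #         "fires": [{"x": 3, "y": 4, "intensity": 0.8}],
    #         "units": [{"x": 0, "y": 0, "type": "fire_truck"}],
    #         "buildings": [{"x": 5, "y": 5}],
    #     }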

    def plan(self, world_state: dict, assessment: AssessmentResult) -> PlanResult:
        """Stage 2: Decide the strategy based on the assessment, using AI."""
        plan_config = PROMPTS_CONFIG.get("plan", {})
        system = plan_config.get("system", "")
        output_format = plan_config.get("output_format", "")

        fires = world_state.get("fires", [])
        available_slots = assessment.max_units - assessment.unit_count

        # Rank fires by intensity, with a large bonus for fires adjacent to a building.
        buildings = world_state.get("buildings", [])
        building_positions = set((b["x"], b["y"]) for b in buildings)

        def fire_priority(f):
            near_building = any(
                abs(f["x"] - bx) <= 1 and abs(f["y"] - by) <= 1
                for bx, by in building_positions
            )
            return f.get("intensity", 0) + (1.0 if near_building else 0)

        priority_targets = sorted(fires, key=fire_priority, reverse=True)

        user_message = f"""Assessment Result:
- Threat Level: {assessment.threat_level}
- Fire Count: {assessment.fire_count}
- High Intensity Fires: {len(assessment.high_intensity_fires)}
- Building Threats: {len(assessment.building_threats)}
- UNCOVERED Fires (no unit in range): {len(assessment.uncovered_fires)}
- Coverage Ratio: {assessment.coverage_ratio:.1%}
- Effective Units: {len(assessment.effective_units)}
- INEFFECTIVE Units (not near any fire): {len(assessment.ineffective_units)}
- Summary: {assessment.summary}

Current Resources:
- Units deployed: {assessment.unit_count} / {assessment.max_units}
- Available slots: {available_slots}

UNCOVERED Fires (PRIORITY - these need coverage!):
{json.dumps([{"x": f["x"], "y": f["y"], "intensity": f["intensity"]} for f in assessment.uncovered_fires[:5]], indent=2)}

INEFFECTIVE Units (SHOULD BE MOVED to cover fires!):
{json.dumps([{"x": u["x"], "y": u["y"], "type": u["type"]} for u in assessment.ineffective_units], indent=2)}

Priority Fires (top 5):
{json.dumps([{"x": f["x"], "y": f["y"], "intensity": f["intensity"]} for f in priority_targets[:5]], indent=2)}

REMEMBER: If there are uncovered fires AND ineffective units, you SHOULD reposition those units!

Output format:
{output_format}"""

        response = self._call_llm(system, user_message)

        if not response:
            # AI unavailable: size the deployment from the coverage gap.
            uncovered_count = len(assessment.uncovered_fires)
            idle_count = len(assessment.ineffective_units)
            fires_after_reposition = max(0, uncovered_count - idle_count)
            building_threats = len(assessment.building_threats)

            if building_threats > 0:
                # Building threats demand at least one unit each.
                smart_deploy = max(building_threats, fires_after_reposition)
            elif uncovered_count <= 2:
                smart_deploy = fires_after_reposition
            elif uncovered_count <= 5:
                smart_deploy = fires_after_reposition + 1
            else:
                smart_deploy = fires_after_reposition + 2

            smart_deploy = min(smart_deploy, available_slots)

            return PlanResult(
                strategy="balanced" if assessment.ineffective_units else "deploy_new",
                reasoning=f"AI planning unavailable - smart fallback: {uncovered_count} uncovered fires, repositioning {idle_count} idle units, deploying {smart_deploy} new",
                deploy_count=smart_deploy,
                reposition_units=assessment.ineffective_units[:],
                priority_targets=priority_targets[:5],
            )

        try:
            # Match [x, y, type] triples from the AI back onto known idle units.
            reposition_data = response.get("units_to_reposition", [])
            reposition_units = []
            for item in reposition_data:
                if isinstance(item, list) and len(item) >= 3:
                    sx, sy, utype = item[0], item[1], item[2]
                    for u in assessment.ineffective_units:
                        if u["x"] == sx and u["y"] == sy:
                            reposition_units.append(u)
                            break

            # If the AI wants repositioning but named no units, move all idle ones.
            if response.get("reposition_needed", False) and not reposition_units:
                reposition_units = assessment.ineffective_units[:]

            priority_indices = response.get("priority_fire_indices", [0, 1, 2])
            selected_targets = []
            for i in priority_indices:
                if isinstance(i, int) and i < len(priority_targets):
                    selected_targets.append(priority_targets[i])

            if not selected_targets:
                selected_targets = priority_targets[:5]

            return PlanResult(
                strategy=response.get("strategy", "balanced"),
                reasoning=response.get("reasoning", ""),
                deploy_count=response.get("deploy_count", 0),
                reposition_units=reposition_units,
                priority_targets=selected_targets,
            )
        except Exception as e:
            print(f"Error parsing AI plan: {e}")
            uncovered_count = len(assessment.uncovered_fires)
            idle_count = len(assessment.ineffective_units)
            fires_after_reposition = max(0, uncovered_count - idle_count)
            building_threats = len(assessment.building_threats)

            if building_threats > 0:
                smart_deploy = max(building_threats, fires_after_reposition)
            elif uncovered_count <= 2:
                smart_deploy = fires_after_reposition
            else:
                smart_deploy = fires_after_reposition + 1

            smart_deploy = min(smart_deploy, available_slots)

            return PlanResult(
                strategy="balanced",
                reasoning=f"Plan parse error: {e} - using smart fallback",
                deploy_count=smart_deploy,
                reposition_units=assessment.ineffective_units[:],
                priority_targets=priority_targets[:5],
            )
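
    # Fallback-sizing sketch: with 6 uncovered fires, 2 idle units and no
    # building threats, fires_after_reposition = max(0, 6 - 2) = 4; since the
    # uncovered count exceeds 5, smart_deploy = 4 + 2 = 6, capped afterwards
    # at the available slots.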

    def execute(
        self,
        world_state: dict,
        assessment: AssessmentResult,
        plan: PlanResult,
    ) -> list[Recommendation]:
        """Stage 3: Generate specific deployment/move recommendations using AI."""
        # Nothing to do when the plan is to stand by.
        if plan.strategy == "monitor":
            return []

        execute_config = PROMPTS_CONFIG.get("execute", {})
        system = execute_config.get("system", "")
        output_format = execute_config.get("output_format", "")

        fires = world_state.get("fires", [])
        units = world_state.get("units", [])
        buildings = world_state.get("buildings", [])
        width = world_state.get("width", 10)
        height = world_state.get("height", 10)

        user_message = f"""Assessment:
- Threat Level: {assessment.threat_level}
- Summary: {assessment.summary}
- Uncovered Fires: {len(assessment.uncovered_fires)}
- Effective Units: {len(assessment.effective_units)}
- Ineffective Units: {len(assessment.ineffective_units)}

Plan:
- Strategy: {plan.strategy}
- Reasoning: {plan.reasoning}
- Deploy Count: {plan.deploy_count}
- Reposition Needed: {len(plan.reposition_units) > 0}

World State:
- Grid: {width}x{height}

UNCOVERED FIRES (PRIORITY TARGETS - these need units!):
{json.dumps([{"x": f["x"], "y": f["y"], "intensity": f["intensity"]} for f in assessment.uncovered_fires[:5]], indent=2)}

INEFFECTIVE UNITS (MOVE THESE to uncovered fires!):
{json.dumps([{"x": u["x"], "y": u["y"], "type": u["type"]} for u in plan.reposition_units[:5]], indent=2)}

All Fire positions: {json.dumps([(f["x"], f["y"], round(f["intensity"], 2)) for f in fires[:15]])}
All Unit positions: {json.dumps([(u["x"], u["y"], u["type"]) for u in units])}
Building positions: {json.dumps([(b["x"], b["y"]) for b in buildings[:15]])}

INSTRUCTIONS:
1. FIRST generate MOVE actions for ineffective units → move them to uncovered fires
2. THEN generate DEPLOY actions if more units needed
3. Max {MAX_RECOMMENDATIONS} recommendations total
4. Remember: deploy ADJACENT to fire (1-2 cells away), not ON the fire

Output format:
{output_format}"""

        response = self._call_llm(system, user_message)

        if not response:
            return self._generate_fallback_recommendations(world_state, assessment, plan)

        try:
            recommendations = []
            raw_recs = response.get("recommendations", [])

            # Occupancy sets used to validate AI-proposed coordinates.
            fire_positions = set((f["x"], f["y"]) for f in fires)
            unit_positions = set((u["x"], u["y"]) for u in units)
            building_positions = set((b["x"], b["y"]) for b in buildings)
            used_positions = set()

            for rec in raw_recs[:MAX_RECOMMENDATIONS]:
                action = rec.get("action", "deploy")
                unit_type = rec.get("unit_type", "fire_truck")
                target = rec.get("target", {})
                target_x = target.get("x", 0)
                target_y = target.get("y", 0)
                reason = rec.get("reason", "AI recommendation")

                # If the AI targeted an occupied cell, nudge to a nearby free one.
                pos = (target_x, target_y)
                if pos in fire_positions or pos in building_positions or pos in used_positions:
                    valid_pos = self._find_deploy_position(
                        target_x, target_y, world_state,
                        exclude_positions=used_positions | unit_positions,
                    )
                    if valid_pos:
                        target_x, target_y = valid_pos
                    else:
                        continue

                used_positions.add((target_x, target_y))

                if action == "move":
                    source = rec.get("source", {})
                    source_x = source.get("x", -1)
                    source_y = source.get("y", -1)

                    # A move must originate from an actual unit.
                    if (source_x, source_y) not in unit_positions:
                        continue

                    recommendations.append(Recommendation(
                        reason=reason,
                        suggested_unit_type=unit_type,
                        target_x=target_x,
                        target_y=target_y,
                        action="move",
                        source_x=source_x,
                        source_y=source_y,
                    ))
                elif action == "remove":
                    position = rec.get("position", {})
                    pos_x = position.get("x", target_x)
                    pos_y = position.get("y", target_y)
                    unit_type = rec.get("unit_type", "fire_truck")

                    # A removal must reference an actual unit.
                    if (pos_x, pos_y) not in unit_positions:
                        continue

                    recommendations.append(Recommendation(
                        reason=reason,
                        suggested_unit_type=unit_type,
                        target_x=pos_x,
                        target_y=pos_y,
                        action="remove",
                    ))
                else:
                    recommendations.append(Recommendation(
                        reason=reason,
                        suggested_unit_type=unit_type,
                        target_x=target_x,
                        target_y=target_y,
                        action="deploy",
                    ))

            return recommendations
        except Exception as e:
            print(f"Error parsing AI execution: {e}")
            return self._generate_fallback_recommendations(world_state, assessment, plan)
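
    # Validation sketch: a recommendation aimed at a burning, built-on, or
    # already-used cell is re-pointed to a nearby free cell through
    # _find_deploy_position, while moves and removals that do not reference
    # an existing unit are dropped instead of being forwarded to the UI.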

    def summarize(
        self,
        world_state: dict,
        assessment: AssessmentResult,
        plan: PlanResult,
        recommendations: list[Recommendation],
        advisor_response: AdvisorResponse,
    ) -> CycleSummary:
        """Stage 4: Summarize the cycle results using AI."""
        summary_config = PROMPTS_CONFIG.get("summary", {})
        system = summary_config.get("system", "")
        output_format = summary_config.get("output_format", "")

        fires = world_state.get("fires", [])
        units = world_state.get("units", [])
        tick = world_state.get("tick", 0)
        status = world_state.get("status", "running")

        rec_blocks = []
        for idx, rec in enumerate(recommendations, 1):
            block = {
                "index": idx,
                "action": rec.action,
                "unit_type": rec.suggested_unit_type,
                "target": {"x": rec.target_x, "y": rec.target_y},
                "source": {"x": rec.source_x, "y": rec.source_y} if rec.action == "move" else None,
                "reason": rec.reason,
            }
            rec_blocks.append(block)

        user_message = f"""Tick: {tick} | Status: {status}
Threat Level: {assessment.threat_level} | Building Integrity: {assessment.building_integrity:.0%}
Fires: {assessment.fire_count} | Uncovered Fires: {len(assessment.uncovered_fires)}
Idle Units: {len(assessment.ineffective_units)} | Total Units: {assessment.unit_count}/{assessment.max_units}

Stage 1 Summary:
{assessment.summary}

Stage 2 Strategy:
- Strategy: {plan.strategy}
- Reasoning: {plan.reasoning}
- Deploy Count: {plan.deploy_count}
- Reposition Units: {len(plan.reposition_units)}

Stage 3 Recommendations:
{json.dumps(rec_blocks[:5], indent=2)}

World Snapshot (first 5 fires / units):
Fires -> {json.dumps(fires[:5], indent=2)}
Units -> {json.dumps(units[:5], indent=2)}

OUTPUT FORMAT:
{output_format}
"""

        response = self._call_llm(system, user_message)
        if not response:
            return CycleSummary(
                headline=advisor_response.summary if advisor_response else "Cycle summary unavailable",
                threat_level=assessment.threat_level,
                key_highlights=[advisor_response.analysis or "Analysis unavailable."],
                risks=["Summary model unavailable."],
                next_focus=["Review building-adjacent fires manually."],
            )

        def _coerce_items(value, fallback):
            if isinstance(value, list):
                cleaned = [str(item).strip() for item in value if str(item).strip()]
                return cleaned or fallback
            if isinstance(value, str) and value.strip():
                return [value.strip()]
            return fallback

        headline = str(response.get("headline", advisor_response.summary if advisor_response else "Cycle summary")).strip()
        threat_level = str(response.get("threat_level", assessment.threat_level or "MODERATE")).strip()
        key_highlights = _coerce_items(response.get("key_highlights"), [advisor_response.analysis or "Highlights unavailable."])
        risks = _coerce_items(response.get("risks"), ["No risks provided."])
        next_focus = _coerce_items(response.get("next_focus"), ["Maintain coverage on building threats."])

        return CycleSummary(
            headline=headline or "Cycle summary",
            threat_level=threat_level or (assessment.threat_level or "MODERATE"),
            key_highlights=key_highlights,
            risks=risks,
            next_focus=next_focus,
        )

    def _generate_fallback_recommendations(
        self,
        world_state: dict,
        assessment: AssessmentResult,
        plan: PlanResult,
    ) -> list[Recommendation]:
        """Generate heuristic recommendations when AI fails: prioritize buildings, deploy efficiently."""
        recommendations = []
        units = world_state.get("units", [])
        buildings = world_state.get("buildings", [])
        unit_positions = set((u["x"], u["y"]) for u in units)
        building_positions = set((b["x"], b["y"]) for b in buildings)
        used_positions = set()

        # A fire threatens a building if it is within Manhattan distance 2.
        def threatens_building(fire):
            for bx, by in building_positions:
                if abs(fire["x"] - bx) + abs(fire["y"] - by) <= 2:
                    return True
            return False

        # Building-threatening fires first, then by descending intensity.
        priority_fires = sorted(
            assessment.uncovered_fires,
            key=lambda f: (-int(threatens_building(f)), -f.get("intensity", 0)),
        )

        # First, reposition idle units onto the highest-priority fires.
        for i, unit in enumerate(plan.reposition_units):
            if i >= len(priority_fires):
                break

            target_fire = priority_fires[i]
            deploy_pos = self._find_deploy_position(
                target_fire["x"], target_fire["y"], world_state,
                exclude_positions=used_positions | (unit_positions - {(unit["x"], unit["y"])}),
            )

            if deploy_pos:
                used_positions.add(deploy_pos)
                is_building_threat = threatens_building(target_fire)
                recommendations.append(Recommendation(
                    reason=f"{'🏢 BUILDING THREAT! ' if is_building_threat else ''}Move to cover fire at ({target_fire['x']}, {target_fire['y']})",
                    suggested_unit_type=unit["type"],
                    target_x=deploy_pos[0],
                    target_y=deploy_pos[1],
                    action="move",
                    source_x=unit["x"],
                    source_y=unit["y"],
                ))

        # Then, deploy new units for whatever remains uncovered.
        available_slots = assessment.max_units - assessment.unit_count
        remaining_fires = priority_fires[len(recommendations):]
        remaining_building_threats = sum(1 for f in remaining_fires if threatens_building(f))

        if remaining_building_threats > 0:
            smart_deploy_count = max(remaining_building_threats, min(len(remaining_fires), available_slots))
        elif len(remaining_fires) <= 2:
            smart_deploy_count = len(remaining_fires)
        elif len(remaining_fires) <= 5:
            smart_deploy_count = min(len(remaining_fires) + 1, available_slots)
        else:
            smart_deploy_count = min(len(remaining_fires), available_slots)

        for i, fire in enumerate(remaining_fires[:smart_deploy_count]):
            deploy_pos = self._find_deploy_position(
                fire["x"], fire["y"], world_state,
                exclude_positions=used_positions | unit_positions,
            )

            if deploy_pos:
                used_positions.add(deploy_pos)
                is_building_threat = threatens_building(fire)
                # Trucks for building threats and hot fires; helicopters otherwise.
                unit_type = "fire_truck" if is_building_threat or fire.get("intensity", 0) > 0.5 else "helicopter"
                recommendations.append(Recommendation(
                    reason=f"{'🏢 BUILDING THREAT! ' if is_building_threat else ''}Deploy to cover fire at ({fire['x']}, {fire['y']})",
                    suggested_unit_type=unit_type,
                    target_x=deploy_pos[0],
                    target_y=deploy_pos[1],
                    action="deploy",
                ))

        return self._prioritize_recommendations(
            recommendations,
            plan.deploy_count,
            smart_deploy_count,
        )

    def _prioritize_recommendations(
        self,
        recommendations: list[Recommendation],
        plan_deploy_target: int,
        smart_deploy_target: int,
        max_actions: int = MAX_RECOMMENDATIONS,
    ) -> list[Recommendation]:
        """
        Ensure we return a balanced mix of move/deploy actions without
        exceeding UI limits.
        """
        if len(recommendations) <= max_actions:
            return recommendations

        deploy_recs = [rec for rec in recommendations if rec.action == "deploy"]
        move_recs = [rec for rec in recommendations if rec.action == "move"]
        other_recs = [rec for rec in recommendations if rec.action not in ("deploy", "move")]

        deploy_priority = 0
        if deploy_recs:
            deploy_priority = max(plan_deploy_target, smart_deploy_target, 1)
        move_priority = len(move_recs)
        other_priority = len(other_recs)

        priority_pairs = []
        if deploy_priority:
            priority_pairs.append(("deploy", deploy_priority))
        if move_priority:
            priority_pairs.append(("move", move_priority))
        if other_priority:
            priority_pairs.append(("other", other_priority))

        if not priority_pairs:
            return recommendations[:max_actions]

        priority_pairs.sort(key=lambda item: item[1], reverse=True)
        ordered_types = [ptype for ptype, _ in priority_pairs]

        # Keep a stable order that still covers every action type.
        for action_type in ("deploy", "move", "other"):
            if action_type not in ordered_types:
                ordered_types.append(action_type)

        pools = {"deploy": deploy_recs, "move": move_recs, "other": other_recs}
        indices = {key: 0 for key in pools}
        selected: list[Recommendation] = []

        # Round-robin across the pools, highest-priority type first.
        while len(selected) < max_actions:
            added = False
            for action_type in ordered_types:
                pool = pools[action_type]
                idx = indices[action_type]
                if idx >= len(pool):
                    continue
                selected.append(pool[idx])
                indices[action_type] += 1
                added = True
                if len(selected) >= max_actions:
                    break
            if not added:
                break

        return selected
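
    # Round-robin sketch: with max_actions = 3, ten deploys and two moves,
    # and deploys outranking moves, the selection interleaves to
    # [deploy_0, move_0, deploy_1] rather than returning only deploys.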

    def generate_after_action_report(self, context: dict) -> AfterActionReport:
        """
        Build an after-action report using the Assessment / Planning /
        Execution transcripts.
        """
        after_action_config = PROMPTS_CONFIG.get("after_action", {})
        system = after_action_config.get("system", "")
        output_format = after_action_config.get("output_format", "")

        outcome = context.get("outcome", "unknown")
        default_summary = context.get("summary_text") or f"Mission outcome: {outcome}"
        report = AfterActionReport(
            summary=default_summary,
            strengths=[],
            improvements=[],
            next_actions=[],
            outcome=outcome,
        )
        report.charts = {
            "metrics": context.get("chart_points") or [],
            "threat_levels": context.get("threat_history") or [],
            "action_density": context.get("action_history") or [],
        }
        report.player_actions = context.get("player_actions_context") or {}

        if not self.client:
            missing_env = OPENAI_API_ENV_VAR if self.provider == "openai" else "HF_TOKEN"
            report.error = f"Missing {missing_env}: unable to generate AI after-action report."
            return report

        def _section(title: str, body: str) -> str:
            if not body:
                return f"{title}\n(no data available)\n"
            return f"{title}\n{body}\n"

        header_lines = [
            f"Mission Outcome: {context.get('outcome_label', outcome)}",
            f"Tick: {context.get('tick', 0)}",
            f"Fires Remaining: {context.get('fires_remaining', 0)}",
            f"Units Active: {context.get('units_active', 0)}",
            f"Building Integrity: {context.get('building_integrity_percent', 'N/A')}",
        ]

        mission_summary = context.get("summary_text", "")
        if mission_summary:
            header_lines.append(f"Mission Summary: {mission_summary}")

        cycle_summaries = context.get("cycle_summaries") or []
        if cycle_summaries:
            summary_lines = []
            for entry in cycle_summaries:
                tick = entry.get("tick", "?")
                headline = entry.get("headline", "No headline")
                threat = entry.get("threat_level", "N/A")
                highlights = entry.get("key_highlights") or []
                risks = entry.get("risks") or []
                next_focus = entry.get("next_focus") or []
                block = [
                    f"- [Tick {tick}] {headline} (Threat: {threat})",
                ]
                if highlights:
                    block.append("  • Highlights: " + "; ".join(highlights))
                if risks:
                    block.append("  • Risks: " + "; ".join(risks))
                if next_focus:
                    block.append("  • Next Focus: " + "; ".join(next_focus))
                summary_lines.append("\n".join(block))
            history_block = "\n".join(summary_lines)
        else:
            history_block = "- No prior cycles captured."

        user_sections = [
            "Mission Status Summary:",
            "\n".join(header_lines),
            "",
            _section("Stage 1 · Assessment", context.get("assessment_md", "")),
            _section("Stage 2 · Planning", context.get("planning_md", "")),
            _section("Stage 3 · Execution", context.get("execution_md", "")),
            _section("Player Manual Actions", context.get("player_actions_md", "")),
            "Historical Cycle Summaries:",
            history_block,
        ]

        if output_format:
            user_sections.append("Please reply strictly using the JSON schema below:")
            user_sections.append(output_format)

        user_message = "\n\n".join(user_sections)

        response = self._call_llm(system or "You are a mission debrief analyst.", user_message)
        if not response:
            report.error = "Failed to retrieve AI response."
            return report

        def _coerce_list(value) -> list[str]:
            if isinstance(value, list):
                return [str(item).strip() for item in value if str(item).strip()]
            if isinstance(value, str) and value.strip():
                return [value.strip()]
            return []

        report.summary = str(response.get("summary", report.summary)).strip() or report.summary
        report.strengths = _coerce_list(response.get("strengths"))
        report.improvements = _coerce_list(response.get("improvements"))
        report.next_actions = _coerce_list(response.get("next_actions"))

        if not (report.strengths or report.improvements or report.next_actions):
            report.error = "AI response did not contain any usable sections."

        return report

    def analyze(self, world_state: dict) -> AdvisorResponse:
        """
        Main entry point: run the multi-stage analysis pipeline.

        Stage 1: Assessment - Analyze the situation
        Stage 2: Planning - Decide strategy
        Stage 3: Execution - Generate specific actions
        """
        assessment = self.assess(world_state)
        plan = self.plan(world_state, assessment)
        recommendations = self.execute(world_state, assessment, plan)

        # Build a human-readable "thinking" trace for the UI.
        thinking_parts = [
            f"🔍 Scanning {assessment.fire_count} active fires...",
        ]
        if assessment.uncovered_fires:
            thinking_parts.append(f"🚨 ALERT: {len(assessment.uncovered_fires)} fire(s) with NO coverage!")
        if assessment.building_threats:
            thinking_parts.append(f"🏢 {len(assessment.building_threats)} fire(s) threatening buildings!")
        if assessment.ineffective_units:
            thinking_parts.append(f"🔄 {len(assessment.ineffective_units)} idle unit(s) should be repositioned")
        thinking_parts.append(f"🎯 Strategy: {plan.strategy.upper()} - {plan.reasoning}")

        priority_emoji = {
            "CRITICAL": "🔴",
            "HIGH": "🟠",
            "MODERATE": "🟡",
            "LOW": "🟢",
        }
        emoji = priority_emoji.get(assessment.threat_level, "⚪")

        if assessment.threat_level == "CRITICAL":
            summary = f"{emoji} CRITICAL: {assessment.summary}. Immediate action required!"
        elif assessment.threat_level == "HIGH":
            summary = f"{emoji} HIGH: {assessment.summary}. Rapid response needed."
        elif assessment.threat_level == "MODERATE":
            summary = f"{emoji} MODERATE: {assessment.summary}. Tactical deployment advised."
        else:
            summary = f"{emoji} LOW: {assessment.summary}. Monitoring situation."

        return AdvisorResponse(
            summary=summary,
            recommendations=recommendations,
            thinking="\n".join(thinking_parts),
            analysis=f"{assessment.fire_count} fires | {assessment.unit_count}/{assessment.max_units} units | {assessment.building_integrity:.0%} building integrity",
            priority=assessment.threat_level,
            assessment=assessment,
            plan=plan,
        )
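
    # Pipeline sketch: one advisory cycle end-to-end (assumes a configured
    # API token and a world_state shaped as in the sketch above assess()):
    #
    #     agent = AdvisorAgent()
    #     response = agent.analyze(world_state)
    #     for rec in response.recommendations:
    #         print(rec.to_dict())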

    def _find_deploy_position(
        self,
        fire_x: int,
        fire_y: int,
        world_state: dict,
        exclude_positions: Optional[set] = None,
    ) -> tuple[int, int] | None:
        """
        Find a valid deployment position adjacent to a fire.

        Units cannot deploy on burning cells, so we find the nearest empty cell.
        """
        if exclude_positions is None:
            exclude_positions = set()

        fires = world_state.get("fires", [])
        units = world_state.get("units", [])
        buildings = world_state.get("buildings", [])
        width = world_state.get("width", 10)
        height = world_state.get("height", 10)

        fire_positions = set((f["x"], f["y"]) for f in fires)
        unit_positions = set((u["x"], u["y"]) for u in units)
        building_positions = set((b["x"], b["y"]) for b in buildings)

        # Search expanding square rings around the fire, nearest first.
        for distance in [1, 2, 3]:
            candidates = []
            for dx in range(-distance, distance + 1):
                for dy in range(-distance, distance + 1):
                    # Only cells on the ring's perimeter.
                    if abs(dx) != distance and abs(dy) != distance:
                        continue

                    nx, ny = fire_x + dx, fire_y + dy

                    # Stay on the grid.
                    if not (0 <= nx < width and 0 <= ny < height):
                        continue

                    # Skip occupied or excluded cells.
                    if (nx, ny) in fire_positions:
                        continue
                    if (nx, ny) in unit_positions:
                        continue
                    if (nx, ny) in building_positions:
                        continue
                    if (nx, ny) in exclude_positions:
                        continue

                    # Prefer cells closest to the fire (Manhattan distance).
                    dist_to_fire = abs(nx - fire_x) + abs(ny - fire_y)
                    candidates.append((nx, ny, dist_to_fire))

            if candidates:
                candidates.sort(key=lambda c: c[2])
                return (candidates[0][0], candidates[0][1])

        return None
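
    # Ring-search sketch: for a fire at (5, 5) on an empty 10x10 grid, the
    # distance-1 ring holds the eight surrounding cells; the orthogonal
    # neighbours (Manhattan distance 1) sort ahead of the diagonals
    # (distance 2), so the stable sort returns (4, 5).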


def _fallback_analyze(self, world_state: dict) -> AdvisorResponse:
    """Fallback method kept for service.py compatibility; delegates to analyze()."""
    return self.analyze(world_state)


# Attach the compatibility shim so callers that expect
# AdvisorAgent._fallback_analyze keep working.
AdvisorAgent._fallback_analyze = _fallback_analyze