""" Fire-Rescue MCP Server Provides MCP tools for fire rescue simulation. MCP tools only return DATA - all decisions are made by AI. Core Operations: - reset_scenario: Initialize a new simulation - get_world_state: Get current world state snapshot - deploy_unit: Deploy a firefighting unit - move_unit: Move an existing unit to a new position - step_simulation: Advance simulation by ticks Data Query Tools: - find_idle_units: Get units not covering any fires - find_uncovered_fires: Get fires with no unit coverage - find_building_threats: Get fires near buildings - analyze_coverage: Get comprehensive coverage data """ import sys import threading from pathlib import Path from typing import Optional from mcp.server.fastmcp import FastMCP # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from config import range_text from models import CellType, UnitType from simulation import SimulationEngine, SimulationConfig # Create FastMCP server instance mcp = FastMCP("Fire-Rescue Simulation") # Shared simulation engines keyed by session_id _engines: dict[str, SimulationEngine] = {} _engine_lock = threading.RLock() def attach_engine(engine: SimulationEngine, session_id: str) -> None: """Allow external callers to reuse their engine inside the MCP server.""" if not session_id: raise ValueError("session_id is required to attach engine") with _engine_lock: _engines[session_id] = engine # Unit effective ranges (matching SimulationConfig, Chebyshev/square distance) FIRE_TRUCK_RANGE = 1 # Square coverage radius (includes 8 neighbors) HELICOPTER_RANGE = 2 # Square coverage radius (extends two cells in all directions) FIRE_COUNT_RANGE_TEXT = range_text("fire_count") FIRE_INTENSITY_RANGE_TEXT = range_text("fire_intensity") BUILDING_COUNT_RANGE_TEXT = range_text("building_count") MAX_UNITS_RANGE_TEXT = range_text("max_units") def detach_engine(session_id: str) -> None: """Remove engine mapping for a session (best-effort).""" if not session_id: return with _engine_lock: _engines.pop(session_id, None) def get_engine(session_id: Optional[str]) -> SimulationEngine: """Get the simulation engine for a specific session.""" if not session_id: raise ValueError("session_id is required") with _engine_lock: engine = _engines.get(session_id) if engine is None: raise ValueError(f"No simulation engine attached for session '{session_id}'") return engine def _get_unit_effective_range(unit_type: str) -> int: """Get the effective range for a unit type (for coverage analysis).""" if unit_type == "fire_truck": return FIRE_TRUCK_RANGE elif unit_type == "helicopter": return HELICOPTER_RANGE return 2 # Default def _calculate_distance(x1: int, y1: int, x2: int, y2: int) -> int: """Calculate Chebyshev distance (square radius) between two points.""" return max(abs(x1 - x2), abs(y1 - y2)) def _is_in_range(ux: int, uy: int, fx: int, fy: int, unit_type: str) -> bool: """Check if a fire position is within unit's effective range.""" effective_range = _get_unit_effective_range(unit_type) return _calculate_distance(ux, uy, fx, fy) <= effective_range def generate_emoji_map(engine: SimulationEngine) -> str: """ Generate an emoji-based visualization of the current world state. Legend (matching Gradio UI): - ๐ŸŒฒ Forest (no fire) - ๐Ÿข Building (no fire) - ๐Ÿ”ฅ Fire (intensity >= 10%) - ๐Ÿ’จ Smoke (smoldering, intensity < 10%) - ๐Ÿš’ Fire Truck - ๐Ÿš Helicopter """ if engine.world is None: return "No map available" world = engine.world # Create unit position lookup unit_positions = {} for unit in world.units: key = (unit.x, unit.y) if key not in unit_positions: unit_positions[key] = [] unit_positions[key].append(unit.unit_type.value) # Build the map with coordinates lines = [] # Header with X coordinates header = " " + "".join(f"{x:2}" for x in range(world.width)) lines.append(header) for y in range(world.height): row_chars = [] for x in range(world.width): cell = world.grid[y][x] pos = (x, y) # Priority: Units > Fire > Terrain if pos in unit_positions: if "fire_truck" in unit_positions[pos]: row_chars.append("๐Ÿš’") else: row_chars.append("๐Ÿš") elif cell.fire_intensity > 0: # Show fire intensity (matching Gradio: >=10% = fire, <10% = smoke) if cell.fire_intensity >= 0.1: row_chars.append("๐Ÿ”ฅ") else: row_chars.append("๐Ÿ’จ") else: # Show terrain if cell.cell_type == CellType.BUILDING: row_chars.append("๐Ÿข") elif cell.cell_type == CellType.FOREST: row_chars.append("๐ŸŒฒ") else: row_chars.append("โฌœ") lines.append(f"{y:2} " + "".join(row_chars)) # Add legend (matching Gradio UI) lines.append("") lines.append("Legend: ๐ŸŒฒForest ๐ŸขBuilding ๐Ÿ”ฅFire ๐Ÿ’จSmoke ๐Ÿš’Truck ๐ŸšHeli") return "\n".join(lines) def _resolve_engine(session_id: Optional[str]): """Return (engine, error_dict) tuple for tool handlers.""" try: return get_engine(session_id), None except ValueError as exc: return None, {"status": "error", "message": str(exc)} @mcp.tool() def reset_scenario( seed: Optional[int] = None, fire_count: int = 10, fire_intensity: float = 0.5, building_count: int = 20, max_units: int = 10, session_id: Optional[str] = None, ) -> dict: f""" Reset and initialize a new fire rescue simulation scenario. Args: seed: Random seed for reproducibility (optional) fire_count: Number of initial fire points ({FIRE_COUNT_RANGE_TEXT}, default: 10) fire_intensity: Initial fire intensity ({FIRE_INTENSITY_RANGE_TEXT}, default: 0.5) building_count: Number of buildings to place ({BUILDING_COUNT_RANGE_TEXT}, default: 20) max_units: Maximum deployable units ({MAX_UNITS_RANGE_TEXT}, default: 10) Returns: Status, summary, and emoji map of the initial state """ engine, error = _resolve_engine(session_id) if error: return error world = engine.reset( seed=seed, fire_count=fire_count, fire_intensity=fire_intensity, building_count=building_count, max_units=max_units ) fires = world.get_fires() return { "status": "ok", "tick": world.tick, "grid_size": f"{world.width}x{world.height}", "initial_fires": len(fires), "buildings": len(world.building_positions), "max_units": world.max_units, "max_ticks": world.max_ticks, "emoji_map": generate_emoji_map(engine) } @mcp.tool() def get_world_state(session_id: Optional[str] = None) -> dict: """ Get the current world state snapshot with emoji map visualization. Returns complete state of the simulation including: - Current tick number - Emoji map showing the battlefield visually - Fire locations and intensities - Deployed units and their positions - Building integrity and forest burn ratio - Recent events The emoji_map provides a visual overview: ๐ŸŒฒ Forest | ๐Ÿข Building | ๐Ÿ”ฅ Fire (>=10%) | ๐Ÿ’จ Smoke (<10%) ๐Ÿš’ Fire Truck | ๐Ÿš Helicopter """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } state = engine.get_state() state["emoji_map"] = generate_emoji_map(engine) return state @mcp.tool() def deploy_unit( unit_type: str, x: int, y: int, source: str = "player", session_id: Optional[str] = None, ) -> dict: """ Deploy a firefighting unit at the specified position. Args: unit_type: Type of unit - "fire_truck" or "helicopter" x: X coordinate (0 to grid_width-1) y: Y coordinate (0 to grid_height-1) source: Who initiated the deployment - "player", "player_accept", "auto_accept_ai" Returns: Status and details of the deployed unit """ engine, error = _resolve_engine(session_id) if error: return error return engine.deploy_unit(unit_type, x, y, source) @mcp.tool() def step_simulation(ticks: int = 1, session_id: Optional[str] = None) -> dict: """ Advance the simulation by the specified number of ticks. Args: ticks: Number of ticks to advance (default: 1) Returns: Current world state with emoji map after advancing """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } for _ in range(ticks): engine.step() state = engine.get_state() state["emoji_map"] = generate_emoji_map(engine) return state @mcp.tool() def move_unit( source_x: int, source_y: int, target_x: int, target_y: int, session_id: Optional[str] = None, ) -> dict: """ Move an existing unit from source position to target position. Useful for repositioning idle units to cover uncovered fires. Args: source_x: Current X coordinate of the unit source_y: Current Y coordinate of the unit target_x: New X coordinate to move to target_y: New Y coordinate to move to Returns: Status and details of the move operation """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } # Find unit at source position unit_to_move = None for unit in engine.world.units: if unit.x == source_x and unit.y == source_y: unit_to_move = unit break if unit_to_move is None: return { "status": "error", "message": f"No unit found at source position ({source_x}, {source_y})" } unit_type = unit_to_move.unit_type.value # Remove unit from source remove_result = engine.remove_unit_at(source_x, source_y) if remove_result.get("status") != "ok": return { "status": "error", "message": f"Failed to remove unit: {remove_result.get('message')}" } # Deploy unit at target deploy_result = engine.deploy_unit(unit_type, target_x, target_y, "ai_move") if deploy_result.get("status") != "ok": # Restore unit at original position engine.deploy_unit(unit_type, source_x, source_y, "ai_restore") return { "status": "error", "message": f"Failed to deploy at target: {deploy_result.get('message')}. Unit restored to original position." } return { "status": "ok", "unit_type": unit_type, "source": {"x": source_x, "y": source_y}, "target": {"x": target_x, "y": target_y}, "message": f"Moved {unit_type} from ({source_x}, {source_y}) to ({target_x}, {target_y})" } @mcp.tool() def remove_unit( x: int, y: int, session_id: Optional[str] = None, ) -> dict: """ Remove an existing unit at the specified position. Use this to free up a deployment slot, then deploy_unit to place a new unit elsewhere. Args: x: X coordinate of the unit to remove y: Y coordinate of the unit to remove Returns: Status and details of the removed unit Example use cases: - Remove ineffective truck, then deploy helicopter at better position - Free up deployment slot when unit is no longer needed - Reposition unit: remove_unit + deploy_unit at new location """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } # Find unit at position unit_to_remove = None for unit in engine.world.units: if unit.x == x and unit.y == y: unit_to_remove = unit break if unit_to_remove is None: return { "status": "error", "message": f"No unit found at position ({x}, {y})" } removed_unit_type = unit_to_remove.unit_type.value # Remove the unit remove_result = engine.remove_unit_at(x, y) if remove_result.get("status") != "ok": return { "status": "error", "message": f"Failed to remove unit: {remove_result.get('message')}" } return { "status": "ok", "removed_unit_type": removed_unit_type, "position": {"x": x, "y": y}, "message": f"Removed {removed_unit_type} at ({x}, {y}). You can now deploy a new unit." } @mcp.tool() def find_idle_units(session_id: Optional[str] = None) -> dict: """ Find units that are not covering any fires (idle/ineffective units). An idle unit is one where NO fires exist within its effective range: - Fire Truck effective range: 2 cells - Helicopter effective range: 3 cells Returns: List of idle units and effective units with their positions """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } world = engine.world fires = world.get_fires() fire_positions = [(f.x, f.y, f.intensity) for f in fires] idle_units = [] effective_units = [] for unit in world.units: unit_type = unit.unit_type.value has_fire_in_range = False # Check if any fire is within this unit's effective range for fx, fy, intensity in fire_positions: if _is_in_range(unit.x, unit.y, fx, fy, unit_type): has_fire_in_range = True break unit_info = { "x": unit.x, "y": unit.y, "type": unit_type, "effective_range": _get_unit_effective_range(unit_type) } if has_fire_in_range: effective_units.append(unit_info) else: idle_units.append(unit_info) return { "status": "ok", "idle_units": idle_units, "idle_count": len(idle_units), "effective_units": effective_units, "effective_count": len(effective_units), "total_units": len(world.units) } @mcp.tool() def find_uncovered_fires(session_id: Optional[str] = None) -> dict: """ Find fires that have NO unit coverage. An uncovered fire is one where NO unit is within effective range: - Fire Truck range: 2 cells - Helicopter range: 3 cells Returns: List of uncovered fires with their positions, intensity, and building threat status """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } world = engine.world fires = world.get_fires() units = world.units building_positions = set(world.building_positions) uncovered_fires = [] covered_fires = [] for fire in fires: is_covered = False # Check if any unit covers this fire for unit in units: unit_type = unit.unit_type.value if _is_in_range(unit.x, unit.y, fire.x, fire.y, unit_type): is_covered = True break # Check if fire threatens any building (within 2 cells) threatens_building = False for bx, by in building_positions: if _calculate_distance(fire.x, fire.y, bx, by) <= 2: threatens_building = True break fire_info = { "x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2), "threatens_building": threatens_building } if is_covered: covered_fires.append(fire_info) else: uncovered_fires.append(fire_info) return { "status": "ok", "uncovered_fires": uncovered_fires, "uncovered_count": len(uncovered_fires), "covered_fires": covered_fires, "covered_count": len(covered_fires), "total_fires": len(fires), "coverage_ratio": round(len(covered_fires) / len(fires), 2) if fires else 1.0 } @mcp.tool() def find_building_threats(session_id: Optional[str] = None) -> dict: """ Find fires that are threatening buildings (within 2 cells of any building). Returns: List of building-threatening fires with their positions, threatened buildings, and coverage status """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } world = engine.world fires = world.get_fires() units = world.units building_positions = set(world.building_positions) building_threats = [] for fire in fires: # Check if fire threatens any building threatened_buildings = [] for bx, by in building_positions: dist = _calculate_distance(fire.x, fire.y, bx, by) if dist <= 2: threatened_buildings.append({"x": bx, "y": by, "distance": dist}) if not threatened_buildings: continue # Check if this fire is covered is_covered = False covering_unit = None for unit in units: unit_type = unit.unit_type.value if _is_in_range(unit.x, unit.y, fire.x, fire.y, unit_type): is_covered = True covering_unit = {"x": unit.x, "y": unit.y, "type": unit_type} break building_threats.append({ "fire": {"x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2)}, "threatened_buildings": threatened_buildings, "is_covered": is_covered, "covering_unit": covering_unit }) uncovered_threats = [t for t in building_threats if not t["is_covered"]] return { "status": "ok", "building_threats": building_threats, "total_threats": len(building_threats), "uncovered_threats": len(uncovered_threats), "building_integrity": round(world.building_integrity, 2) } @mcp.tool() def analyze_coverage(session_id: Optional[str] = None) -> dict: """ Get comprehensive coverage analysis data. This tool combines information from multiple analyses: - Idle units (not covering any fire) - Uncovered fires (no unit in range) - Building threats (fires near buildings) - High intensity fires Returns: Comprehensive data about fires, units, and coverage status """ engine, error = _resolve_engine(session_id) if error: return error if engine.world is None: return { "status": "error", "message": "Simulation not initialized. Call reset_scenario first." } world = engine.world fires = world.get_fires() units = world.units building_positions = set(world.building_positions) # Analyze fires fire_analysis = { "total": len(fires), "high_intensity": [], # >70% "building_threats": [], # within 2 cells of building "uncovered": [] # no unit in range } # Analyze units unit_analysis = { "total": len(units), "max_units": world.max_units, "available_slots": world.max_units - len(units), "idle": [], # no fire in range "effective": [] # has fire in range } # Process fires for fire in fires: fire_info = {"x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2)} # High intensity check if fire.intensity > 0.7: fire_analysis["high_intensity"].append(fire_info) # Building threat check for bx, by in building_positions: if _calculate_distance(fire.x, fire.y, bx, by) <= 2: fire_analysis["building_threats"].append(fire_info) break # Coverage check is_covered = False for unit in units: if _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value): is_covered = True break if not is_covered: fire_analysis["uncovered"].append(fire_info) # Process units fire_positions = [(f.x, f.y) for f in fires] for unit in units: unit_info = {"x": unit.x, "y": unit.y, "type": unit.unit_type.value} has_fire = False for fx, fy in fire_positions: if _is_in_range(unit.x, unit.y, fx, fy, unit.unit_type.value): has_fire = True break if has_fire: unit_analysis["effective"].append(unit_info) else: unit_analysis["idle"].append(unit_info) # Calculate coverage ratio coverage_ratio = 1.0 if fires: covered_count = len(fires) - len(fire_analysis["uncovered"]) coverage_ratio = covered_count / len(fires) return { "status": "ok", "building_integrity": round(world.building_integrity, 2), "coverage_ratio": round(coverage_ratio, 2), "fire_analysis": { "total_fires": fire_analysis["total"], "high_intensity_count": len(fire_analysis["high_intensity"]), "high_intensity_fires": fire_analysis["high_intensity"], "building_threat_count": len(fire_analysis["building_threats"]), "building_threat_fires": fire_analysis["building_threats"], "uncovered_count": len(fire_analysis["uncovered"]), "uncovered_fires": fire_analysis["uncovered"] }, "unit_analysis": { "deployed": unit_analysis["total"], "max_units": unit_analysis["max_units"], "available_slots": unit_analysis["available_slots"], "idle_count": len(unit_analysis["idle"]), "idle_units": unit_analysis["idle"], "effective_count": len(unit_analysis["effective"]), "effective_units": unit_analysis["effective"] }, "emoji_map": generate_emoji_map(engine) } def run_server(): """Run the MCP server with stdio transport.""" mcp.run() if __name__ == "__main__": run_server()