""" Fire-Rescue - Simulation Core Handles fire spread, unit behavior, and win/lose conditions. """ import random from typing import Optional from models import ( WorldState, Cell, CellType, Unit, UnitType, SimulationStatus, Event ) class SimulationConfig: """Configuration parameters for the simulation.""" # Grid size GRID_WIDTH = 10 GRID_HEIGHT = 10 # Fire spread parameters (balanced for playability) FIRE_SPREAD_CHANCE = 0.08 # Reduced: chance to spread to adjacent cell per tick FIRE_GROWTH_RATE = 0.02 # Reduced: how much fire intensity grows per tick FIRE_MAX_INTENSITY = 1.0 FIRE_DECAY_RATE = 0.01 # Natural decay (very slow) # Damage parameters DAMAGE_PER_TICK = 0.01 # Reduced: damage dealt by fire per tick # Unit parameters (stronger firefighting) FIRE_TRUCK_RANGE = 1 # Square coverage radius (includes 8 neighboring cells) FIRE_TRUCK_POWER = 0.4 # Increased: reduction in fire intensity per action HELICOPTER_RANGE = 2 # Square coverage radius (extends two cells in all directions) HELICOPTER_POWER = 0.25 # Increased: less powerful but wider coverage UNIT_COOLDOWN = 1 # Reduced: ticks between unit actions (faster response) # Win/lose thresholds BUILDING_DAMAGE_THRESHOLD = 0.5 # Fail if building integrity < 50% FOREST_DAMAGE_THRESHOLD = 0.8 # Fail if forest burn > 80% FIRE_SAFE_THRESHOLD = 0.1 # Win if all fires below this intensity class SimulationEngine: """ Core simulation engine that manages world state updates. """ def __init__(self, config: Optional[SimulationConfig] = None): self.config = config or SimulationConfig() self.world: Optional[WorldState] = None def reset( self, seed: Optional[int] = None, fire_count: int = 4, fire_intensity: float = 0.6, building_count: int = 16, max_units: int = 10 ) -> WorldState: """ Reset and initialize a new simulation. Args: seed: Random seed for reproducibility fire_count: Number of initial fire points (1-25) fire_intensity: Initial fire intensity (0.0-1.0) building_count: Number of buildings to place (1-25) max_units: Maximum number of deployable units (1-20) """ self.world = WorldState( width=self.config.GRID_WIDTH, height=self.config.GRID_HEIGHT, tick=0, status=SimulationStatus.RUNNING, max_ticks=200, max_units=max_units ) self.world.initialize_grid( seed=seed, fire_count=fire_count, fire_intensity=fire_intensity, building_count=building_count ) self.world.calculate_metrics() return self.world def step(self) -> WorldState: """Advance simulation by one tick.""" if self.world is None: raise RuntimeError("Simulation not initialized. Call reset() first.") if self.world.status != SimulationStatus.RUNNING: return self.world # 1. Units perform actions FIRST (so deployed units work immediately) self._update_units() # 2. Fire spreads and grows self._update_fire() # 3. Fire causes damage self._update_damage() # 4. Recalculate metrics self.world.calculate_metrics() # 5. Increment tick self.world.tick += 1 # 6. Check win/lose conditions self._check_end_conditions() return self.world def _update_fire(self): """Update fire spread and growth.""" new_fires: list[tuple[int, int, float]] = [] for row in self.world.grid: for cell in row: if cell.fire_intensity > 0: # Fire grows cell.fire_intensity = min( cell.fire_intensity + self.config.FIRE_GROWTH_RATE, self.config.FIRE_MAX_INTENSITY ) # Fire spreads to neighbors for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]: nx, ny = cell.x + dx, cell.y + dy neighbor = self.world.get_cell(nx, ny) if neighbor and neighbor.fire_intensity == 0 and not neighbor.is_destroyed(): # Spread chance based on source fire intensity spread_chance = self.config.FIRE_SPREAD_CHANCE * cell.fire_intensity if random.random() < spread_chance: # Initial intensity based on source new_intensity = cell.fire_intensity * 0.5 new_fires.append((nx, ny, new_intensity)) # Apply new fires for x, y, intensity in new_fires: cell = self.world.get_cell(x, y) if cell and cell.fire_intensity == 0: cell.fire_intensity = intensity def _update_damage(self): """Update damage caused by fire.""" for row in self.world.grid: for cell in row: if cell.fire_intensity > 0 and cell.cell_type != CellType.EMPTY: # Damage proportional to fire intensity damage = self.config.DAMAGE_PER_TICK * cell.fire_intensity cell.damage = min(cell.damage + damage, 1.0) def _update_units(self): """Update unit actions (firefighting).""" for unit in self.world.units: # Decrease cooldown if unit.cooldown > 0: unit.cooldown -= 1 continue # Find cells to extinguish extinguished = False if unit.unit_type == UnitType.FIRE_TRUCK: extinguished = self._fire_truck_action(unit) elif unit.unit_type == UnitType.HELICOPTER: extinguished = self._helicopter_action(unit) if extinguished: unit.cooldown = self.config.UNIT_COOLDOWN def _fire_truck_action(self, unit: Unit) -> bool: """Fire truck extinguishes fires within a square radius (Chebyshev distance).""" targets = [] # Check cells within square range (including diagonals) for dx in range(-self.config.FIRE_TRUCK_RANGE, self.config.FIRE_TRUCK_RANGE + 1): for dy in range(-self.config.FIRE_TRUCK_RANGE, self.config.FIRE_TRUCK_RANGE + 1): cell = self.world.get_cell(unit.x + dx, unit.y + dy) if cell and cell.fire_intensity > 0: targets.append(cell) if not targets: return False # Target highest intensity fire targets.sort(key=lambda c: c.fire_intensity, reverse=True) target = targets[0] # Reduce fire target.fire_intensity = max(0, target.fire_intensity - self.config.FIRE_TRUCK_POWER) return True def _helicopter_action(self, unit: Unit) -> bool: """Helicopter extinguishes fires within a wider square radius.""" affected = False for dx in range(-self.config.HELICOPTER_RANGE, self.config.HELICOPTER_RANGE + 1): for dy in range(-self.config.HELICOPTER_RANGE, self.config.HELICOPTER_RANGE + 1): cell = self.world.get_cell(unit.x + dx, unit.y + dy) if cell and cell.fire_intensity > 0: cell.fire_intensity = max(0, cell.fire_intensity - self.config.HELICOPTER_POWER) affected = True return affected def _check_end_conditions(self): """Check win/lose conditions.""" # Check time limit if self.world.tick >= self.world.max_ticks: self.world.status = SimulationStatus.FAIL self.world.recent_events.append(Event( tick=self.world.tick, event_type="simulation_end", details={"reason": "time_limit_exceeded"} )) return # Check building damage if self.world.building_integrity < (1 - self.config.BUILDING_DAMAGE_THRESHOLD): self.world.status = SimulationStatus.FAIL self.world.recent_events.append(Event( tick=self.world.tick, event_type="simulation_end", details={"reason": "building_destroyed"} )) return # Check forest damage if self.world.forest_burn_ratio > self.config.FOREST_DAMAGE_THRESHOLD: self.world.status = SimulationStatus.FAIL self.world.recent_events.append(Event( tick=self.world.tick, event_type="simulation_end", details={"reason": "forest_destroyed"} )) return # Check if all fires are extinguished fires = self.world.get_fires() if not fires or all(f.intensity < self.config.FIRE_SAFE_THRESHOLD for f in fires): self.world.status = SimulationStatus.SUCCESS self.world.recent_events.append(Event( tick=self.world.tick, event_type="simulation_end", details={"reason": "fire_contained"} )) def deploy_unit( self, unit_type: str, x: int, y: int, source: str = "player" ) -> dict: """Deploy a new unit at the specified position.""" if self.world is None: return {"status": "error", "message": "Simulation not initialized"} if self.world.status != SimulationStatus.RUNNING: return {"status": "error", "message": "Simulation is not running"} # Parse unit type try: utype = UnitType(unit_type) except ValueError: return {"status": "error", "message": f"Invalid unit type: {unit_type}"} # Check position validity first for better error messages if not (0 <= x < self.world.width and 0 <= y < self.world.height): return {"status": "error", "message": f"Position ({x}, {y}) is out of bounds"} cell = self.world.get_cell(x, y) if cell and cell.fire_intensity > 0: return {"status": "error", "message": f"Cannot deploy on burning cell at ({x}, {y})"} if cell and cell.cell_type == CellType.BUILDING: return {"status": "error", "message": f"Cannot deploy on building at ({x}, {y})"} # Check unit limit if len(self.world.units) >= self.world.max_units: return {"status": "error", "message": f"Unit limit reached ({self.world.max_units})"} # Try to add unit unit = self.world.add_unit(utype, x, y, source) if unit is None: return {"status": "error", "message": "Failed to deploy unit"} return { "status": "ok", "unit": unit.to_dict() } def remove_unit_at(self, x: int, y: int) -> dict: """Remove a unit at the specified position.""" if self.world is None: return {"status": "error", "message": "Simulation not initialized"} # Find unit at position unit_to_remove = None for unit in self.world.units: if unit.x == x and unit.y == y: unit_to_remove = unit break if unit_to_remove is None: return {"status": "error", "message": f"No unit at ({x}, {y})"} # Remove the unit self.world.units.remove(unit_to_remove) return { "status": "ok", "message": f"Removed {unit_to_remove.unit_type.value} at ({x}, {y})", "unit": unit_to_remove.to_dict() } def get_state(self) -> dict: """Get current world state as dictionary.""" if self.world is None: return {"status": "error", "message": "Simulation not initialized"} return self.world.to_dict()