"""Check 2: Exploitability — run golden path end-to-end.""" from __future__ import annotations import logging from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec from open_range.validator._golden_path import execute_step_result logger = logging.getLogger(__name__) _META_COMMANDS = {"submit_flag", "submit_evidence", "submit_finding", "auth", "logout"} def _collapse_whitespace(value: str) -> str: return " ".join(value.split()) class ExploitabilityCheck: """Execute every golden-path step and verify ``expect_in_stdout`` appears.""" def __init__(self, *, require_expectation: bool = True) -> None: """Create an exploitability check. Args: require_expectation: When ``True`` (default), every non-meta golden path step must define ``expect_in_stdout``. Missing expectations are treated as validation failures. """ self.require_expectation = require_expectation async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult: if not snapshot.golden_path: return CheckResult( name="exploitability", passed=False, error="golden path is empty", ) failed_steps: list[dict] = [] skipped_steps: list[int] = [] unvalidated_steps: list[int] = [] for step in snapshot.golden_path: cmd_name = step.command.strip().split()[0] if step.command.strip() else "" if cmd_name in _META_COMMANDS: skipped_steps.append(step.step) continue host = getattr(step, "host", None) or "attacker" try: result = await execute_step_result(snapshot, containers, step.command, host=host) except Exception as exc: # noqa: BLE001 failed_steps.append({ "step": step.step, "error": str(exc), }) continue if result.exit_code != 0: failed_steps.append({ "step": step.step, "error": f"command failed with exit_code={result.exit_code}", "got_snippet": result.combined_output[:300], }) continue output = result.combined_output expected = step.expect_in_stdout if not expected: message = ( f"golden path step {step.step} has no expect_in_stdout" ) if self.require_expectation: failed_steps.append({ "step": step.step, "error": message, }) else: logger.warning( "exploitability: %s — output not validated", message, ) unvalidated_steps.append(step.step) elif expected not in output and _collapse_whitespace(expected) not in _collapse_whitespace(output): failed_steps.append({ "step": step.step, "expected": expected, "got_snippet": output[:300], }) passed = len(failed_steps) == 0 and ( not self.require_expectation or len(unvalidated_steps) == 0 ) issues: list[str] = [] if unvalidated_steps: issues.append( f"Steps with no expected output validation: {unvalidated_steps}" ) error_parts: list[str] = [] if failed_steps: error_parts.append(f"{len(failed_steps)} golden-path step(s) failed") if self.require_expectation and unvalidated_steps: error_parts.append( f"{len(unvalidated_steps)} golden-path step(s) missing expect_in_stdout" ) return CheckResult( name="exploitability", passed=passed, details={ "failed_steps": failed_steps, "skipped_steps": skipped_steps, "unvalidated_steps": unvalidated_steps, "issues": issues, "total_steps": len(snapshot.golden_path), "require_expectation": self.require_expectation, }, error="" if passed else "; ".join(error_parts), )