open-range / src /open_range /validator /exploitability.py
Lars Talian
fix(runtime): stabilize live admission boot path (#102)
5b99233 unverified
"""Check 2: Exploitability — run golden path end-to-end."""
from __future__ import annotations
import logging
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator._golden_path import execute_step_result
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Golden-path steps whose first command token is one of these names are
# recorded as skipped by ExploitabilityCheck.check instead of being executed.
_META_COMMANDS = {"submit_flag", "submit_evidence", "submit_finding", "auth", "logout"}
def _collapse_whitespace(value: str) -> str:
return " ".join(value.split())
class ExploitabilityCheck:
    """Execute every golden-path step and verify ``expect_in_stdout`` appears.

    Steps whose first command token is listed in ``_META_COMMANDS`` are not
    executed; they are only recorded as skipped.
    """

    def __init__(self, *, require_expectation: bool = True) -> None:
        """Create an exploitability check.

        Args:
            require_expectation: When ``True`` (default), every non-meta golden
                path step must define ``expect_in_stdout``. Missing expectations
                are treated as validation failures. When ``False``, such steps
                are logged as warnings and reported under ``unvalidated_steps``
                without failing the check.
        """
        self.require_expectation = require_expectation

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the snapshot's golden path and report which steps failed.

        A step fails when executing it raises, when it exits with a non-zero
        code, when its expected text is absent from the combined output (also
        after whitespace normalisation), or -- in strict mode -- when it
        defines no expectation at all.

        Args:
            snapshot: Snapshot whose ``golden_path`` steps are executed.
            containers: Container set handed through to ``execute_step_result``.

        Returns:
            A ``CheckResult`` named ``"exploitability"``. ``details`` carries
            ``failed_steps``, ``skipped_steps``, ``unvalidated_steps``,
            ``issues``, ``total_steps`` and ``require_expectation``.
        """
        if not snapshot.golden_path:
            return CheckResult(
                name="exploitability",
                passed=False,
                error="golden path is empty",
            )

        failed_steps: list[dict] = []
        skipped_steps: list[int] = []
        unvalidated_steps: list[int] = []

        for step in snapshot.golden_path:
            # split() without arguments already ignores surrounding
            # whitespace, so one call yields the first token (or nothing).
            tokens = step.command.split()
            cmd_name = tokens[0] if tokens else ""
            if cmd_name in _META_COMMANDS:
                skipped_steps.append(step.step)
                continue

            # Steps without a (truthy) ``host`` attribute run on "attacker".
            host = getattr(step, "host", None) or "attacker"
            try:
                result = await execute_step_result(snapshot, containers, step.command, host=host)
            except Exception as exc:  # noqa: BLE001
                # Keep the traceback reachable for debugging; the check itself
                # deliberately converts the exception into a failed step.
                logger.debug(
                    "exploitability: step %s raised during execution",
                    step.step,
                    exc_info=True,
                )
                failed_steps.append({
                    "step": step.step,
                    # Some exceptions (e.g. a bare TimeoutError) stringify to
                    # "", which would leave the failure unexplained.
                    "error": str(exc) or repr(exc),
                })
                continue

            if result.exit_code != 0:
                failed_steps.append({
                    "step": step.step,
                    "error": f"command failed with exit_code={result.exit_code}",
                    "got_snippet": result.combined_output[:300],
                })
                continue

            output = result.combined_output
            expected = step.expect_in_stdout
            if not expected:
                message = (
                    f"golden path step {step.step} has no expect_in_stdout"
                )
                if self.require_expectation:
                    # Strict mode: a missing expectation is a failed step.
                    failed_steps.append({
                        "step": step.step,
                        "error": message,
                    })
                else:
                    logger.warning(
                        "exploitability: %s — output not validated",
                        message,
                    )
                    unvalidated_steps.append(step.step)
            elif expected not in output and _collapse_whitespace(expected) not in _collapse_whitespace(output):
                # Neither the literal text nor its whitespace-normalised form
                # was found in the output.
                failed_steps.append({
                    "step": step.step,
                    "expected": expected,
                    "got_snippet": output[:300],
                })

        # ``unvalidated_steps`` is only ever filled in lenient mode, where it
        # must not fail the check, so the verdict depends on failures alone.
        passed = not failed_steps

        issues: list[str] = []
        if unvalidated_steps:
            issues.append(
                f"Steps with no expected output validation: {unvalidated_steps}"
            )

        error = "" if passed else f"{len(failed_steps)} golden-path step(s) failed"

        return CheckResult(
            name="exploitability",
            passed=passed,
            details={
                "failed_steps": failed_steps,
                "skipped_steps": skipped_steps,
                "unvalidated_steps": unvalidated_steps,
                "issues": issues,
                "total_steps": len(snapshot.golden_path),
                "require_expectation": self.require_expectation,
            },
            error=error,
        )