"""Check 3: Patchability — inverse mutation test. For each planted vuln: apply its remediation, re-run the golden-path step that exploits it, and verify the step *fails*. Then revert via container restart. """ from __future__ import annotations import logging import re from open_range.protocols import CheckResult, ContainerSet, ExploitStep, SnapshotSpec from open_range.validator._golden_path import execute_step_result logger = logging.getLogger(__name__) # Prefixes / patterns that indicate an executable shell command. _CMD_PREFIXES = ( "/", "sed", "chmod", "rm", "mv", "cp", "echo", "apt", "pip", "patch", "iptables", "mysql", "docker", ) _CMD_OPERATORS_RE = re.compile(r"[|>&]|&&") def _looks_executable(remediation: str) -> bool: """Heuristic: return True if *remediation* looks like a shell command.""" stripped = remediation.strip() if not stripped: return False for prefix in _CMD_PREFIXES: if stripped.startswith(prefix): return True if _CMD_OPERATORS_RE.search(stripped): return True return False def _find_golden_step(snapshot: SnapshotSpec, chain_step: ExploitStep): """Return the golden-path step matching *chain_step*. Strategy: 1. Match via exploit_chain vuln_id → golden_path step whose command contains the exploit_chain command (exact vuln_id linkage). 2. Fall back to substring match requiring >= 10 char overlap. 3. Return None if no match. """ command_hint = chain_step.command # Strategy 1: find golden-path step that contains the exploit command if command_hint: for gp in snapshot.golden_path: if command_hint in gp.command or gp.command in command_hint: return gp # Strategy 2: substring overlap >= 10 chars if command_hint and len(command_hint) >= 10: for gp in snapshot.golden_path: # Check if any 10-char substring of command_hint appears in gp.command overlap = _longest_common_substring(command_hint, gp.command) if overlap >= 10: return gp return None def _longest_common_substring(a: str, b: str) -> int: """Return the length of the longest common substring between *a* and *b*.""" if not a or not b: return 0 # Use a simple O(n*m) approach, bounded since these are short command strings. max_len = 0 for i in range(len(a)): for j in range(len(b)): k = 0 while i + k < len(a) and j + k < len(b) and a[i + k] == b[j + k]: k += 1 if k > max_len: max_len = k return max_len class PatchabilityCheck: """Inverse mutation: patching a vuln must break the corresponding golden-path step.""" async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult: vulns = snapshot.truth_graph.vulns if not vulns: return CheckResult( name="patchability", passed=False, error="no vulns in truth_graph", ) results: list[dict] = [] all_ok = True tested_count = 0 for vuln in vulns: # --- Fail if no remediation defined --- if not vuln.remediation or not vuln.remediation.strip(): msg = "no remediation defined" logger.warning("patchability: vuln %s has %s — counting as failure", vuln.id, msg) results.append({"vuln": vuln.id, "passed": False, "reason": msg}) all_ok = False continue # --- Fail non-executable remediation (prose) --- if not _looks_executable(vuln.remediation): msg = f"remediation is not executable: {vuln.remediation!r}" logger.warning("patchability: vuln %s — %s — counting as failure", vuln.id, msg) results.append({"vuln": vuln.id, "passed": False, "reason": msg}) all_ok = False continue # Find the golden-path step(s) that exercise this vuln. linked_steps = [ s for s in snapshot.truth_graph.exploit_chain if s.vuln_id == vuln.id ] if not linked_steps: results.append({"vuln": vuln.id, "skipped": "no exploit_chain step linked"}) continue host = vuln.host or "attacker" # --- Apply remediation ------------------------------------------------- try: remediation_result = await containers.exec_run(host, vuln.remediation) except Exception as exc: # noqa: BLE001 results.append({"vuln": vuln.id, "error": f"remediation failed: {exc}"}) all_ok = False # Revert via restart even on remediation failure try: await containers.restart(host) except Exception: # noqa: BLE001 pass continue if remediation_result.exit_code != 0: results.append({ "vuln": vuln.id, "passed": False, "reason": ( "remediation command failed " f"(exit_code={remediation_result.exit_code})" ), "output_snippet": remediation_result.combined_output[:300], }) all_ok = False try: await containers.restart(host) except Exception: # noqa: BLE001 pass continue # --- Re-run linked golden-path step — must now fail -------------------- step_still_works = False inconclusive_details: list[dict[str, object]] = [] matched_any = False for chain_step in linked_steps: gp_step = _find_golden_step(snapshot, chain_step) if gp_step is None: continue matched_any = True if not gp_step.expect_in_stdout: inconclusive_details.append({ "step": gp_step.step, "reason": "missing expect_in_stdout for retest step", }) continue try: result = await execute_step_result( snapshot, containers, gp_step.command, host=getattr(gp_step, "host", None) or "attacker", ) except Exception as exc: # noqa: BLE001 inconclusive_details.append({ "step": gp_step.step, "reason": f"retest execution raised: {exc}", }) continue if result.exit_code != 0: inconclusive_details.append({ "step": gp_step.step, "reason": f"retest command failed (exit_code={result.exit_code})", "output_snippet": result.combined_output[:300], }) continue if gp_step.expect_in_stdout in result.combined_output: step_still_works = True if not matched_any: results.append({ "vuln": vuln.id, "skipped": "no matching golden-path step found for exploit_chain commands", }) # Revert via restart try: await containers.restart(host) except Exception: # noqa: BLE001 pass continue tested_count += 1 if inconclusive_details: results.append({ "vuln": vuln.id, "passed": False, "reason": "retest inconclusive after remediation", "details": inconclusive_details, }) all_ok = False elif step_still_works: results.append({ "vuln": vuln.id, "passed": False, "reason": "golden path still succeeds after patch", }) all_ok = False else: results.append({"vuln": vuln.id, "passed": True}) # --- Revert via container restart --- try: await containers.restart(host) except Exception: # noqa: BLE001 pass # best-effort # --- Don't pass vacuously --- if tested_count == 0: return CheckResult( name="patchability", passed=False, details={"vuln_results": results}, error="no vulns had testable remediation", ) return CheckResult( name="patchability", passed=all_ok, details={"vuln_results": results}, error="" if all_ok else "some vulns remain exploitable after remediation", )