"""Check 3: Patchability — inverse mutation test.

For each planted vuln: apply its remediation, re-run the golden-path step
that exploits it, and verify the step *fails*. Then revert via container
restart.
"""
from __future__ import annotations
import logging
import re
from open_range.protocols import CheckResult, ContainerSet, ExploitStep, SnapshotSpec
from open_range.validator._golden_path import execute_step_result
logger = logging.getLogger(__name__)
# Prefixes that indicate an executable shell command. They are compared with
# ``str.startswith``, so each entry also covers longer names that begin with it.
_CMD_PREFIXES = (
    "/", "sed", "chmod", "rm", "mv", "cp", "echo", "apt", "pip",
    "patch", "iptables", "mysql", "docker",
)
# Shell operators (pipe, output redirect, ampersand) that indicate a command
# line. A single "&" in the class already matches "&&", so no separate
# alternative is needed for it.
_CMD_OPERATORS_RE = re.compile(r"[|>&]")
def _looks_executable(remediation: str) -> bool:
    """Heuristically decide whether *remediation* is a shell command.

    Blank or whitespace-only text is never executable. Otherwise the text
    counts as a command when, after trimming, it begins with one of
    ``_CMD_PREFIXES`` or contains a character matched by
    ``_CMD_OPERATORS_RE``.
    """
    candidate = remediation.strip()
    if candidate == "":
        return False
    # ``str.startswith`` accepts the whole tuple of prefixes in one call.
    has_known_prefix = candidate.startswith(_CMD_PREFIXES)
    return has_known_prefix or _CMD_OPERATORS_RE.search(candidate) is not None
def _find_golden_step(snapshot: SnapshotSpec, chain_step: ExploitStep):
    """Return the golden-path step matching *chain_step*, or ``None``.

    Matching is purely textual, based on ``chain_step.command``:

    1. Containment: the first golden-path step whose command contains the
       exploit command, or whose (non-empty) command is contained in it.
    2. Fallback: among steps sharing a common substring of at least 10
       characters with the exploit command, the one with the longest such
       substring (the earliest step wins ties).
    3. ``None`` when the exploit command is empty or nothing matches.
    """
    min_overlap = 10
    command_hint = chain_step.command
    if not command_hint:
        return None

    # Strategy 1: direct containment in either direction.
    for gp in snapshot.golden_path:
        # An empty command is a substring of every string; without this guard
        # a blank golden-path step would "match" any exploit command.
        if not gp.command:
            continue
        if command_hint in gp.command or gp.command in command_hint:
            return gp

    # Strategy 2: longest shared substring, which must reach the threshold.
    if len(command_hint) < min_overlap:
        return None
    best_step = None
    best_overlap = min_overlap - 1
    for gp in snapshot.golden_path:
        overlap = _longest_common_substring(command_hint, gp.command)
        # Strict ">" keeps the earliest step when several tie.
        if overlap > best_overlap:
            best_step, best_overlap = gp, overlap
    return best_step
def _longest_common_substring(a: str, b: str) -> int:
    """Return the length of the longest common substring between *a* and *b*.

    Returns 0 when either string is empty. Uses a rolling-row dynamic program,
    so the cost is O(len(a) * len(b)) even for highly repetitive inputs.
    """
    if not a or not b:
        return 0
    # Keep the shorter string on the inner axis so the row buffers stay small.
    if len(b) > len(a):
        a, b = b, a
    width = len(b) + 1
    best = 0
    # prev[j] is the length of the common run ending at the previous character
    # of ``a`` and at b[j - 1]; index 0 is a zero sentinel.
    prev = [0] * width
    for ch_a in a:
        curr = [0] * width
        for j, ch_b in enumerate(b, start=1):
            if ch_a == ch_b:
                run = prev[j - 1] + 1
                curr[j] = run
                if run > best:
                    best = run
        prev = curr
    return best
class PatchabilityCheck:
    """Inverse mutation: patching a vuln must break the corresponding golden-path step.

    For every vuln in the snapshot's truth graph the check runs the vuln's
    remediation command on its host, re-runs the golden-path step(s) linked to
    it through the exploit chain, and expects each step's ``expect_in_stdout``
    marker to be absent. The host container is restarted afterwards to revert
    the patch.
    """

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the patchability check over every vuln in *snapshot*.

        Returns a ``CheckResult`` named ``"patchability"``. It fails when the
        truth graph has no vulns, when no vuln could actually be retested, or
        when any vuln lacks an executable remediation, its remediation command
        fails, its retest is inconclusive, or its golden-path step still
        succeeds after patching. When vulns exist, per-vuln outcomes are
        reported under ``details["vuln_results"]``.
        """
        vulns = snapshot.truth_graph.vulns
        if not vulns:
            return CheckResult(
                name="patchability",
                passed=False,
                error="no vulns in truth_graph",
            )

        results: list[dict] = []
        all_ok = True
        tested_count = 0

        for vuln in vulns:
            # --- Fail if no remediation defined ---
            if not vuln.remediation or not vuln.remediation.strip():
                msg = "no remediation defined"
                logger.warning("patchability: vuln %s has %s — counting as failure", vuln.id, msg)
                results.append({"vuln": vuln.id, "passed": False, "reason": msg})
                all_ok = False
                continue

            # --- Fail non-executable remediation (prose) ---
            if not _looks_executable(vuln.remediation):
                msg = f"remediation is not executable: {vuln.remediation!r}"
                logger.warning("patchability: vuln %s — %s — counting as failure", vuln.id, msg)
                results.append({"vuln": vuln.id, "passed": False, "reason": msg})
                all_ok = False
                continue

            # Find the exploit-chain step(s) that exercise this vuln.
            linked_steps = [
                s for s in snapshot.truth_graph.exploit_chain
                if s.vuln_id == vuln.id
            ]
            if not linked_steps:
                # Nothing was patched, so there is nothing to revert.
                results.append({"vuln": vuln.id, "skipped": "no exploit_chain step linked"})
                continue

            host = vuln.host or "attacker"
            try:
                entry, failed, tested = await self._patch_and_retest(
                    snapshot, containers, vuln, linked_steps, host,
                )
            finally:
                # Always revert, even if patching or retesting raised, so a
                # leftover patch cannot influence the remaining vulns.
                await self._revert(containers, host, vuln.id)

            results.append(entry)
            if failed:
                all_ok = False
            if tested:
                tested_count += 1

        # --- Don't pass vacuously ---
        if tested_count == 0:
            return CheckResult(
                name="patchability",
                passed=False,
                details={"vuln_results": results},
                error="no vulns had testable remediation",
            )
        return CheckResult(
            name="patchability",
            passed=all_ok,
            details={"vuln_results": results},
            error="" if all_ok else "some vulns remain exploitable after remediation",
        )

    async def _patch_and_retest(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet,
        vuln,
        linked_steps: list[ExploitStep],
        host: str,
    ) -> tuple[dict, bool, bool]:
        """Apply *vuln*'s remediation on *host* and re-run its golden-path steps.

        Returns ``(entry, failed, tested)``: the result dictionary for this
        vuln, whether it counts as a failure of the overall check, and whether
        a retest was actually attempted. Reverting the patch is left to the
        caller.
        """
        # --- Apply remediation ---
        try:
            remediation_result = await containers.exec_run(host, vuln.remediation)
        except Exception as exc:  # noqa: BLE001
            return {"vuln": vuln.id, "error": f"remediation failed: {exc}"}, True, False

        if remediation_result.exit_code != 0:
            entry = {
                "vuln": vuln.id,
                "passed": False,
                "reason": (
                    "remediation command failed "
                    f"(exit_code={remediation_result.exit_code})"
                ),
                "output_snippet": remediation_result.combined_output[:300],
            }
            return entry, True, False

        # --- Re-run linked golden-path step(s) — must now fail ---
        step_still_works = False
        inconclusive_details: list[dict[str, object]] = []
        matched_any = False
        for chain_step in linked_steps:
            gp_step = _find_golden_step(snapshot, chain_step)
            if gp_step is None:
                continue
            matched_any = True

            # Without an expected marker there is no way to tell whether the
            # exploit still works.
            if not gp_step.expect_in_stdout:
                inconclusive_details.append({
                    "step": gp_step.step,
                    "reason": "missing expect_in_stdout for retest step",
                })
                continue

            try:
                result = await execute_step_result(
                    snapshot,
                    containers,
                    gp_step.command,
                    host=getattr(gp_step, "host", None) or "attacker",
                )
            except Exception as exc:  # noqa: BLE001
                inconclusive_details.append({
                    "step": gp_step.step,
                    "reason": f"retest execution raised: {exc}",
                })
                continue

            if result.exit_code != 0:
                inconclusive_details.append({
                    "step": gp_step.step,
                    "reason": f"retest command failed (exit_code={result.exit_code})",
                    "output_snippet": result.combined_output[:300],
                })
                continue

            if gp_step.expect_in_stdout in result.combined_output:
                step_still_works = True

        if not matched_any:
            entry = {
                "vuln": vuln.id,
                "skipped": "no matching golden-path step found for exploit_chain commands",
            }
            return entry, False, False

        # An inconclusive retest takes precedence over a "still works" verdict.
        if inconclusive_details:
            entry = {
                "vuln": vuln.id,
                "passed": False,
                "reason": "retest inconclusive after remediation",
                "details": inconclusive_details,
            }
            return entry, True, True

        if step_still_works:
            entry = {
                "vuln": vuln.id,
                "passed": False,
                "reason": "golden path still succeeds after patch",
            }
            return entry, True, True

        return {"vuln": vuln.id, "passed": True}, False, True

    @staticmethod
    async def _revert(containers: ContainerSet, host: str, vuln_id: object) -> None:
        """Restart *host* to undo a remediation; best-effort, never raises ``Exception``."""
        try:
            await containers.restart(host)
        except Exception:  # noqa: BLE001 - revert stays best-effort
            # Do not fail the check, but leave a trace: a container that did
            # not restart may still carry the patch into later retests.
            logger.warning(
                "patchability: failed to restart %s after testing vuln %s",
                host,
                vuln_id,
                exc_info=True,
            )
|