File size: 9,274 Bytes
8c486a8
 
 
49d1c75
 
8c486a8
 
 
 
49d1c75
 
 
 
595e190
49d1c75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49d1c75
8c486a8
 
7fedc25
 
 
 
 
 
8c486a8
 
7fedc25
49d1c75
 
7fedc25
 
 
49d1c75
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
595e190
8c486a8
 
 
49d1c75
 
 
 
 
8c486a8
595e190
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
595e190
49d1c75
8c486a8
49d1c75
8c486a8
 
49d1c75
595e190
 
 
 
 
 
8c486a8
595e190
49d1c75
 
8c486a8
49d1c75
8c486a8
595e190
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
49d1c75
 
 
 
 
 
 
 
 
 
 
 
 
 
5848160
595e190
 
 
 
 
 
 
 
49d1c75
 
 
 
 
8c486a8
 
 
 
49d1c75
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""Check 3: Patchability — inverse mutation test.

For each planted vuln: apply its remediation, re-run the golden-path step
that exploits it, and verify the step *fails*.  Then revert via container
restart.
"""

from __future__ import annotations

import logging
import re

from open_range.protocols import CheckResult, ContainerSet, ExploitStep, SnapshotSpec
from open_range.validator._golden_path import execute_step_result

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prefixes / patterns that indicate an executable shell command.
# These are matched with ``str.startswith`` against the stripped remediation
# text, so each entry is a plain string prefix rather than a whole word
# (e.g. "apt" also accepts "apt-get ...", and "/" accepts any absolute path).
_CMD_PREFIXES = (
    "/", "sed", "chmod", "rm", "mv", "cp", "echo", "apt", "pip",
    "patch", "iptables", "mysql", "docker",
)
# Shell operators (pipe, redirection, background/and) anywhere in the text.
# NOTE(review): the character class already matches a single "&", so the
# trailing "&&" alternative never changes the outcome of a search.
_CMD_OPERATORS_RE = re.compile(r"[|>&]|&&")


def _looks_executable(remediation: str) -> bool:
    """Guess whether *remediation* is a shell command rather than prose.

    The text is stripped first.  Blank text is never executable.  Otherwise
    it counts as executable when it begins with one of ``_CMD_PREFIXES`` or
    contains a shell operator matched by ``_CMD_OPERATORS_RE``.
    """
    candidate = remediation.strip()
    if not candidate:
        return False
    # ``str.startswith`` accepts a tuple and tests every prefix in one call.
    has_known_prefix = candidate.startswith(_CMD_PREFIXES)
    return has_known_prefix or _CMD_OPERATORS_RE.search(candidate) is not None


def _find_golden_step(snapshot: SnapshotSpec, chain_step: ExploitStep):
    """Return the golden-path step matching *chain_step*, or ``None``.

    Matching is done purely on command text; linking the chain step to a
    vuln via ``vuln_id`` is the caller's job.

    Strategy:
    1. Containment: the first golden-path step whose command contains the
       exploit-chain command, or is contained in it.
    2. Fallback: the first golden-path step sharing a common substring of
       at least 10 characters with the exploit-chain command.
    3. ``None`` if neither strategy matches, or if the exploit-chain step
       has no command.

    Golden-path steps with an empty (or missing) command are ignored: the
    empty string is a substring of every command, so such a step would
    otherwise be reported as a match for any exploit command.
    """
    command_hint = chain_step.command
    if not command_hint:
        return None

    # Only steps that actually carry a command can be matched and re-run.
    candidates = [gp for gp in snapshot.golden_path if gp.command]

    # Strategy 1: one command contains the other.
    for gp in candidates:
        if command_hint in gp.command or gp.command in command_hint:
            return gp

    # Strategy 2: substring overlap >= 10 chars (impossible for shorter hints).
    if len(command_hint) >= 10:
        for gp in candidates:
            if _longest_common_substring(command_hint, gp.command) >= 10:
                return gp

    return None


def _longest_common_substring(a: str, b: str) -> int:
    """Return the length of the longest common substring between *a* and *b*.

    Returns 0 when either argument is empty.  Uses the classic dynamic
    programme over "longest common suffix ending at (i, j)", keeping only
    the previous row, so the cost is O(len(a) * len(b)) time and
    O(min(len(a), len(b))) extra space.
    """
    if not a or not b:
        return 0

    # Keep the shorter string as the row to minimise memory; the result is
    # symmetric in its arguments.
    if len(b) > len(a):
        a, b = b, a

    best = 0
    prev_row = [0] * (len(b) + 1)
    for ch_a in a:
        # curr_row[j] = length of the common run ending at ch_a and b[j - 1].
        curr_row = [0] * (len(b) + 1)
        for j, ch_b in enumerate(b, start=1):
            if ch_a == ch_b:
                run = prev_row[j - 1] + 1
                curr_row[j] = run
                if run > best:
                    best = run
        prev_row = curr_row
    return best


class PatchabilityCheck:
    """Inverse mutation: patching a vuln must break the corresponding golden-path step."""

    @staticmethod
    async def _restart_best_effort(containers: ContainerSet, host: str) -> None:
        """Revert *host* by restarting it; log instead of raising on failure.

        The restart is deliberately best-effort so one broken container does
        not abort the whole check, but a failure is logged because the
        remediation may then still be in place for later checks.
        """
        try:
            await containers.restart(host)
        except Exception:  # noqa: BLE001
            logger.warning(
                "patchability: failed to restart %s to revert remediation",
                host,
                exc_info=True,
            )

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Apply each vuln's remediation and verify its exploit step now fails.

        For every vuln in ``snapshot.truth_graph.vulns``:

        * a missing or non-executable remediation is recorded as a failure;
        * a vuln with no linked ``exploit_chain`` step is recorded as skipped;
        * otherwise the remediation is run on the vuln's host (default
          ``"attacker"``), every matching golden-path step is re-run, and the
          host is restarted to revert the change.

        A vuln passes only if every matched step ran cleanly (exit code 0,
        no exception, ``expect_in_stdout`` defined) and none of them still
        produced its expected output.

        Returns:
            A ``CheckResult`` named ``"patchability"`` whose
            ``details["vuln_results"]`` holds one dict per vuln.  The check
            fails if there are no vulns, if no vuln could actually be
            retested, or if any vuln was recorded as a failure.
        """
        vulns = snapshot.truth_graph.vulns
        if not vulns:
            return CheckResult(
                name="patchability",
                passed=False,
                error="no vulns in truth_graph",
            )

        results: list[dict] = []
        all_ok = True
        tested_count = 0

        for vuln in vulns:
            # --- Fail if no remediation defined ---
            if not vuln.remediation or not vuln.remediation.strip():
                msg = "no remediation defined"
                logger.warning("patchability: vuln %s has %s — counting as failure", vuln.id, msg)
                results.append({"vuln": vuln.id, "passed": False, "reason": msg})
                all_ok = False
                continue

            # --- Fail non-executable remediation (prose) ---
            if not _looks_executable(vuln.remediation):
                msg = f"remediation is not executable: {vuln.remediation!r}"
                logger.warning("patchability: vuln %s — %s — counting as failure", vuln.id, msg)
                results.append({"vuln": vuln.id, "passed": False, "reason": msg})
                all_ok = False
                continue

            # Find the exploit-chain step(s) that exercise this vuln.
            linked_steps = [
                s for s in snapshot.truth_graph.exploit_chain
                if s.vuln_id == vuln.id
            ]
            if not linked_steps:
                results.append({"vuln": vuln.id, "skipped": "no exploit_chain step linked"})
                continue

            host = vuln.host or "attacker"

            # --- Apply remediation -------------------------------------------------
            try:
                remediation_result = await containers.exec_run(host, vuln.remediation)
            except Exception as exc:  # noqa: BLE001
                results.append({"vuln": vuln.id, "error": f"remediation failed: {exc}"})
                all_ok = False
                # Revert via restart even on remediation failure: the command
                # may have partially applied before raising.
                await self._restart_best_effort(containers, host)
                continue
            if remediation_result.exit_code != 0:
                results.append({
                    "vuln": vuln.id,
                    "passed": False,
                    "reason": (
                        "remediation command failed "
                        f"(exit_code={remediation_result.exit_code})"
                    ),
                    "output_snippet": remediation_result.combined_output[:300],
                })
                all_ok = False
                await self._restart_best_effort(containers, host)
                continue

            # --- Re-run linked golden-path step — must now fail --------------------
            step_still_works = False
            inconclusive_details: list[dict[str, object]] = []
            matched_any = False
            for chain_step in linked_steps:
                gp_step = _find_golden_step(snapshot, chain_step)
                if gp_step is None:
                    continue
                matched_any = True
                # Without an expected marker there is no way to tell whether
                # the exploit still succeeded.
                if not gp_step.expect_in_stdout:
                    inconclusive_details.append({
                        "step": gp_step.step,
                        "reason": "missing expect_in_stdout for retest step",
                    })
                    continue
                try:
                    result = await execute_step_result(
                        snapshot,
                        containers,
                        gp_step.command,
                        host=getattr(gp_step, "host", None) or "attacker",
                    )
                except Exception as exc:  # noqa: BLE001
                    inconclusive_details.append({
                        "step": gp_step.step,
                        "reason": f"retest execution raised: {exc}",
                    })
                    continue
                # A non-zero exit is treated as inconclusive rather than as
                # proof that the patch blocked the exploit.
                if result.exit_code != 0:
                    inconclusive_details.append({
                        "step": gp_step.step,
                        "reason": f"retest command failed (exit_code={result.exit_code})",
                        "output_snippet": result.combined_output[:300],
                    })
                    continue
                if gp_step.expect_in_stdout in result.combined_output:
                    step_still_works = True

            if not matched_any:
                results.append({
                    "vuln": vuln.id,
                    "skipped": "no matching golden-path step found for exploit_chain commands",
                })
                # The remediation was already applied, so still revert.
                await self._restart_best_effort(containers, host)
                continue

            tested_count += 1

            if inconclusive_details:
                results.append({
                    "vuln": vuln.id,
                    "passed": False,
                    "reason": "retest inconclusive after remediation",
                    "details": inconclusive_details,
                })
                all_ok = False
            elif step_still_works:
                results.append({
                    "vuln": vuln.id,
                    "passed": False,
                    "reason": "golden path still succeeds after patch",
                })
                all_ok = False
            else:
                results.append({"vuln": vuln.id, "passed": True})

            # --- Revert via container restart ---
            await self._restart_best_effort(containers, host)

        # --- Don't pass vacuously ---
        if tested_count == 0:
            return CheckResult(
                name="patchability",
                passed=False,
                details={"vuln_results": results},
                error="no vulns had testable remediation",
            )

        return CheckResult(
            name="patchability",
            passed=all_ok,
            details={"vuln_results": results},
            error="" if all_ok else "some vulns remain exploitable after remediation",
        )