File size: 4,460 Bytes
8c486a8
 
 
 
7fedc25
 
8c486a8
595e190
8c486a8
7fedc25
 
307d729
 
8c486a8
5b99233
 
 
 
8c486a8
 
 
6f0f018
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
307d729
7fedc25
8c486a8
307d729
 
 
 
8c486a8
 
595e190
8c486a8
 
 
 
 
 
595e190
 
 
 
 
 
 
 
8c486a8
 
7fedc25
6f0f018
 
7fedc25
6f0f018
 
 
 
 
 
 
 
 
 
 
5b99233
8c486a8
 
 
 
 
 
016a288
 
 
7fedc25
 
 
 
 
72e9079
 
 
016a288
72e9079
 
 
8c486a8
 
 
307d729
 
 
7fedc25
 
307d729
6f0f018
307d729
72e9079
8c486a8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Check 2: Exploitability — run golden path end-to-end."""

from __future__ import annotations

import logging

from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator._golden_path import execute_step_result

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Golden-path steps whose first command token is in this set are not executed
# by ExploitabilityCheck.check; their step numbers are recorded as skipped.
_META_COMMANDS = {"submit_flag", "submit_evidence", "submit_finding", "auth", "logout"}


def _collapse_whitespace(value: str) -> str:
    return " ".join(value.split())


class ExploitabilityCheck:
    """Execute every golden-path step and verify ``expect_in_stdout`` appears.

    Steps whose first command token is listed in ``_META_COMMANDS`` are
    skipped rather than executed. Every other step is run through
    ``execute_step_result`` and is recorded as failed when the call raises,
    exits with a non-zero code, or produces output lacking the expected text.
    """

    def __init__(self, *, require_expectation: bool = True) -> None:
        """Create an exploitability check.

        Args:
            require_expectation: When ``True`` (default), every non-meta golden
                path step must define ``expect_in_stdout``; a step without one
                is recorded as a failed step. When ``False``, such a step is
                only logged as a warning and listed under
                ``unvalidated_steps`` without failing the check.
        """
        self.require_expectation = require_expectation

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the snapshot's golden path and report which steps failed.

        Args:
            snapshot: Snapshot whose ``golden_path`` steps are executed.
            containers: Container set handed to ``execute_step_result``.

        Returns:
            A ``CheckResult`` named ``"exploitability"``. It passes only when
            no step was recorded as failed. ``details`` carries the failed,
            skipped and unvalidated step lists, the ``issues`` notes, the
            total step count and the ``require_expectation`` setting. An
            empty golden path yields an immediate failure.

        Exceptions raised while executing a step are caught and reported as
        failed steps instead of propagating.
        """
        if not snapshot.golden_path:
            return CheckResult(
                name="exploitability",
                passed=False,
                error="golden path is empty",
            )

        failed_steps: list[dict] = []
        skipped_steps: list[int] = []
        unvalidated_steps: list[int] = []
        for step in snapshot.golden_path:
            # split() already ignores surrounding whitespace and returns an
            # empty list for a blank command.
            tokens = step.command.split()
            cmd_name = tokens[0] if tokens else ""
            if cmd_name in _META_COMMANDS:
                skipped_steps.append(step.step)
                continue

            # Steps without a (truthy) host attribute run on the attacker host.
            host = getattr(step, "host", None) or "attacker"
            try:
                result = await execute_step_result(snapshot, containers, step.command, host=host)
            except Exception as exc:  # noqa: BLE001
                # Argument-less exceptions (e.g. a bare TimeoutError())
                # stringify to ""; fall back to repr so the failure stays
                # identifiable in the report.
                failed_steps.append({
                    "step": step.step,
                    "error": str(exc) or repr(exc),
                })
                continue

            if result.exit_code != 0:
                failed_steps.append({
                    "step": step.step,
                    "error": f"command failed with exit_code={result.exit_code}",
                    "got_snippet": result.combined_output[:300],
                })
                continue
            output = result.combined_output

            expected = step.expect_in_stdout
            if not expected:
                message = (
                    f"golden path step {step.step} has no expect_in_stdout"
                )
                if self.require_expectation:
                    failed_steps.append({
                        "step": step.step,
                        "error": message,
                    })
                else:
                    logger.warning(
                        "exploitability: %s — output not validated",
                        message,
                    )
                    unvalidated_steps.append(step.step)
                continue

            # Accept an exact substring match first, then a match that only
            # differs in whitespace layout.
            if expected in output:
                continue
            if _collapse_whitespace(expected) in _collapse_whitespace(output):
                continue
            failed_steps.append({
                "step": step.step,
                "expected": expected,
                "got_snippet": output[:300],
            })

        # In strict mode a missing expectation is already a failed step, so
        # unvalidated_steps is only filled in lenient mode and never affects
        # the verdict.
        passed = not failed_steps
        issues: list[str] = []
        if unvalidated_steps:
            issues.append(
                f"Steps with no expected output validation: {unvalidated_steps}"
            )
        error = "" if passed else f"{len(failed_steps)} golden-path step(s) failed"
        return CheckResult(
            name="exploitability",
            passed=passed,
            details={
                "failed_steps": failed_steps,
                "skipped_steps": skipped_steps,
                "unvalidated_steps": unvalidated_steps,
                "issues": issues,
                "total_steps": len(snapshot.golden_path),
                "require_expectation": self.require_expectation,
            },
            error=error,
        )