Spaces:
Runtime error
Runtime error
| """Check 7: Task feasibility — golden path references real hosts, evidence targets exist.""" | |
| from __future__ import annotations | |
| from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec | |
| from open_range.validator.graphs import compile_snapshot_graphs | |
| from open_range.validator.path_solvability import build_host_adjacency, has_host_path | |
class TaskFeasibilityCheck:
    """Validate that the task described by a snapshot can actually be carried out.

    The check collects one human-readable issue per problem found:

    1. Every golden-path step runs on a host that exists in the topology
       (a step without a host is attributed to ``"attacker"``).
    2. Every evidence_spec location names a topology host; a location
       without a ``host:`` prefix is attributed to ``"siem"``.
    3. Every exploit-chain step that carries a vuln id references a vuln
       listed in the truth graph.
    4. Every flag is placed on a topology host.
    5. Every ``seed_vuln`` mutation op is reflected in the snapshot: its
       vuln, flag, exploit-chain entry, golden-path exploit step and
       ``submit_flag`` step are present, and ``has_host_path`` reports a
       path from the vuln host to the flag host when the two differ.
    """

    _CHECK_NAME = "task_feasibility"
    _SUBMIT_PREFIX = "submit_flag "

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run every feasibility check against *snapshot*.

        Args:
            snapshot: The snapshot whose topology, golden path, evidence
                spec, truth graph, flags and mutation plan are inspected.
            containers: Not used by this check.

        Returns:
            A ``CheckResult`` named ``task_feasibility``. ``passed`` is True
            only when no issue was found; ``details["issues"]`` lists every
            issue and ``error`` summarises how many there are.
        """
        compiled = compile_snapshot_graphs(snapshot)
        adjacency = build_host_adjacency(snapshot, compiled)

        topo_hosts = self._topology_hosts(snapshot)
        # Fail early: every later check compares against the host set.
        if not topo_hosts:
            message = "Topology has no hosts defined"
            return CheckResult(
                name=self._CHECK_NAME,
                passed=False,
                details={"issues": [message]},
                error=message,
            )

        issues: list[str] = []

        # 1. Golden-path hosts exist in topology.
        for step in snapshot.golden_path:
            host = getattr(step, "host", None) or "attacker"
            if host not in topo_hosts:
                issues.append(f"golden path step {step.step}: host '{host}' not in topology")

        # 2. Evidence targets reference existing hosts.
        for item in snapshot.evidence_spec:
            location = item.location
            host = location.split(":")[0] if ":" in location else "siem"
            if host not in topo_hosts:
                issues.append(f"evidence item '{item.type}' references unknown host '{host}'")

        # 3. Exploit chain vuln IDs exist in truth_graph.
        vuln_by_id = {vuln.id: vuln for vuln in snapshot.truth_graph.vulns}
        for step in snapshot.truth_graph.exploit_chain:
            if step.vuln_id and step.vuln_id not in vuln_by_id:
                issues.append(f"exploit chain references unknown vuln '{step.vuln_id}'")

        # 4. Flag hosts exist in topology.
        for flag in snapshot.flags:
            if flag.host not in topo_hosts:
                issues.append(f"flag '{flag.id}' references unknown host '{flag.host}'")

        # 5. seed_vuln mutations were materialised consistently.
        issues.extend(self._seed_vuln_issues(snapshot, vuln_by_id, adjacency))

        passed = not issues
        return CheckResult(
            name=self._CHECK_NAME,
            passed=passed,
            details={"issues": issues},
            error="" if passed else f"{len(issues)} feasibility issue(s)",
        )

    @staticmethod
    def _topology_hosts(snapshot: SnapshotSpec) -> set[str]:
        """Return the non-empty host names listed under ``topology["hosts"]``.

        Dict entries contribute their ``"name"`` value; any other entry
        contributes ``str(entry)``. Missing, empty or ``None`` names are
        skipped so they cannot mask an otherwise empty topology.
        """
        hosts: set[str] = set()
        for entry in snapshot.topology.get("hosts", []):
            name = entry.get("name", "") if isinstance(entry, dict) else str(entry)
            if name:
                hosts.add(name)
        return hosts

    @classmethod
    def _submitted_flag_values(cls, golden_path) -> set[str]:
        """Return the flag values submitted by ``submit_flag`` golden-path steps.

        The command is stripped once and the prefix is removed from that
        stripped text, so a command with leading whitespace yields the bare
        flag value instead of keeping the ``submit_flag`` prefix.
        """
        values: set[str] = set()
        for step in golden_path:
            command = step.command.strip()
            if command.startswith(cls._SUBMIT_PREFIX):
                values.add(command.removeprefix(cls._SUBMIT_PREFIX).strip())
        return values

    @classmethod
    def _seed_vuln_issues(cls, snapshot: SnapshotSpec, vuln_by_id: dict, adjacency) -> list[str]:
        """Return issues for ``seed_vuln`` mutation ops not reflected in *snapshot*.

        Args:
            snapshot: Snapshot providing the mutation plan, flags, golden
                path and exploit chain.
            vuln_by_id: Truth-graph vulns keyed by their id.
            adjacency: Result of ``build_host_adjacency``; passed through to
                ``has_host_path`` unchanged.

        Returns:
            A list of issue strings; empty when there is no mutation plan or
            every ``seed_vuln`` op is consistent.
        """
        plan = snapshot.mutation_plan
        if plan is None:
            return []

        issues: list[str] = []
        flag_by_value = {flag.value: flag for flag in snapshot.flags}
        submitted_flags = cls._submitted_flag_values(snapshot.golden_path)
        # When several chain steps share a vuln id, the last one wins.
        exploit_steps = {step.vuln_id: step for step in snapshot.truth_graph.exploit_chain}

        for op in plan.ops:
            if op.op_type != "seed_vuln":
                continue

            params = op.params
            vuln_id = str(params.get("instantiated_vuln_id", "")).strip()
            flag_value = str(params.get("instantiated_flag_value", "")).strip()
            exploit_command = str(params.get("instantiated_exploit_command", "")).strip()
            flag_host = str(params.get("instantiated_flag_host", "")).strip()

            # Without the vuln or the flag the remaining checks have nothing
            # to compare against, so report and move to the next op.
            vuln = vuln_by_id.get(vuln_id)
            if vuln is None:
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' did not materialize a vuln id"
                )
                continue

            flag = flag_by_value.get(flag_value)
            if flag is None:
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' did not materialize a flag"
                )
                continue

            if flag_host and flag.host != flag_host:
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' flag host mismatch "
                    f"('{flag.host}' != '{flag_host}')"
                )

            if flag_value not in submitted_flags:
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' missing submit_flag step for "
                    f"'{flag_value}'"
                )

            exploit = exploit_steps.get(vuln_id)
            if exploit is None:
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' missing exploit_chain linkage"
                )
            elif exploit_command and exploit.command != exploit_command:
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' exploit command mismatch"
                )

            if exploit_command and not any(
                exploit_command in step.command for step in snapshot.golden_path
            ):
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' missing golden-path exploit step"
                )

            if flag.host != vuln.host and not has_host_path(vuln.host, flag.host, adjacency):
                issues.append(
                    f"seed_vuln mutation '{op.mutation_id}' flag host '{flag.host}' "
                    f"is unreachable from vuln host '{vuln.host}'"
                )

        return issues