| """Biological rule engine β hard and soft constraint checking.
|
|
|
| Hard constraints block action execution entirely.
|
| Soft constraints allow execution but degrade output quality and incur penalties.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| from dataclasses import dataclass
|
| from enum import Enum
|
| from typing import List
|
|
|
| from models import ActionType, ExperimentAction, TOOL_REGISTRY
|
|
|
| from server.simulator.latent_state import FullLatentState
|
|
|
|
|
| class Severity(str, Enum):
|
| HARD = "hard"
|
| SOFT = "soft"
|
|
|
|
|
| @dataclass
|
| class RuleViolation:
|
| rule_id: str
|
| severity: Severity
|
| message: str
|
|
|
|
|
| class RuleEngine:
|
| """Evaluates biological and resource constraints against the current
|
| latent state before each action is applied.
|
| """
|
|
|
| @staticmethod
|
| def _has_analysis_evidence(s: FullLatentState) -> bool:
|
| p = s.progress
|
| return any([
|
| p.cells_clustered,
|
| p.de_performed,
|
| p.trajectories_inferred,
|
| p.pathways_analyzed,
|
| p.networks_inferred,
|
| p.markers_discovered,
|
| p.markers_validated,
|
| ])
|
|
|
| @staticmethod
|
| def _has_marker_evidence(s: FullLatentState) -> bool:
|
| p = s.progress
|
| return p.markers_discovered or p.markers_validated
|
|
|
| @staticmethod
|
| def _has_mechanism_evidence(s: FullLatentState) -> bool:
|
| p = s.progress
|
| return p.pathways_analyzed or p.networks_inferred
|
|
|
| def check(
|
| self, action: ExperimentAction, state: FullLatentState
|
| ) -> List[RuleViolation]:
|
| violations: List[RuleViolation] = []
|
| violations.extend(self._check_prerequisites(action, state))
|
| violations.extend(self._check_resource_constraints(action, state))
|
| violations.extend(self._check_redundancy(action, state))
|
| violations.extend(self._check_causal_validity(action, state))
|
| violations.extend(self._check_tool_compatibility(action, state))
|
| return violations
|
|
|
| def hard_violations(self, violations: List[RuleViolation]) -> List[str]:
|
| return [v.message for v in violations if v.severity == Severity.HARD]
|
|
|
| def soft_violations(self, violations: List[RuleViolation]) -> List[str]:
|
| return [v.message for v in violations if v.severity == Severity.SOFT]
|
|
|
|
|
|
|
| def _check_prerequisites(
|
| self, action: ExperimentAction, s: FullLatentState
|
| ) -> List[RuleViolation]:
|
| vs: List[RuleViolation] = []
|
| at = action.action_type
|
| p = s.progress
|
|
|
| REQUIRES = {
|
| ActionType.PREPARE_LIBRARY: [
|
| ("samples_collected", "Cannot prepare library without collected samples"),
|
| ],
|
| ActionType.SEQUENCE_CELLS: [
|
| ("library_prepared", "Cannot sequence without library preparation"),
|
| ],
|
| ActionType.RUN_QC: [
|
| ("cells_sequenced", "Cannot run QC before sequencing"),
|
| ],
|
| ActionType.FILTER_DATA: [
|
| ("qc_performed", "Cannot filter data before QC"),
|
| ],
|
| ActionType.NORMALIZE_DATA: [
|
| ("data_filtered", "Cannot normalise before filtering"),
|
| ],
|
| ActionType.INTEGRATE_BATCHES: [
|
| ("data_normalized", "Cannot integrate batches before normalisation"),
|
| ],
|
| ActionType.CLUSTER_CELLS: [
|
| ("data_normalized", "Cannot cluster before normalisation"),
|
| ],
|
| ActionType.DIFFERENTIAL_EXPRESSION: [
|
| ("data_normalized", "Cannot run DE before normalisation"),
|
| ],
|
| ActionType.TRAJECTORY_ANALYSIS: [
|
| ("data_normalized", "Cannot infer trajectories before normalisation"),
|
| ],
|
| ActionType.PATHWAY_ENRICHMENT: [
|
| ("de_performed", "Cannot run pathway enrichment without DE results"),
|
| ],
|
| ActionType.REGULATORY_NETWORK_INFERENCE: [
|
| ("data_normalized", "Cannot infer networks before normalisation"),
|
| ],
|
| ActionType.MARKER_SELECTION: [
|
| ("de_performed", "Cannot select markers without DE results"),
|
| ],
|
| ActionType.VALIDATE_MARKER: [
|
| ("markers_discovered", "Cannot validate markers before discovery"),
|
| ],
|
| ActionType.PERTURB_GENE: [
|
| ("samples_collected", "Cannot perturb without samples"),
|
| ],
|
| ActionType.PERTURB_COMPOUND: [
|
| ("samples_collected", "Cannot perturb without samples"),
|
| ],
|
| ActionType.CULTURE_CELLS: [
|
| ("samples_collected", "Cannot culture without samples"),
|
| ],
|
| ActionType.SYNTHESIZE_CONCLUSION: [
|
| ("data_normalized", "Cannot synthesize conclusions before data normalization"),
|
| ],
|
| }
|
|
|
| for flag, msg in REQUIRES.get(at, []):
|
| if not getattr(p, flag, False):
|
| vs.append(RuleViolation(
|
| rule_id=f"prereq_{at.value}_{flag}",
|
| severity=Severity.HARD,
|
| message=msg,
|
| ))
|
| return vs
|
|
|
|
|
|
|
| def _check_resource_constraints(
|
| self, action: ExperimentAction, s: FullLatentState
|
| ) -> List[RuleViolation]:
|
| vs: List[RuleViolation] = []
|
| if s.resources.budget_exhausted:
|
| vs.append(RuleViolation(
|
| rule_id="budget_exhausted",
|
| severity=Severity.HARD,
|
| message="Budget exhausted - no further actions possible",
|
| ))
|
| if s.resources.time_exhausted:
|
| vs.append(RuleViolation(
|
| rule_id="time_exhausted",
|
| severity=Severity.HARD,
|
| message="Time limit reached - no further actions possible",
|
| ))
|
|
|
| remaining = s.resources.budget_remaining
|
| from server.simulator.transition import compute_action_cost
|
| cost, _ = compute_action_cost(action)
|
| if cost > remaining and remaining > 0:
|
| vs.append(RuleViolation(
|
| rule_id="budget_insufficient",
|
| severity=Severity.HARD,
|
| message=f"Action costs ${cost:,.0f} but only ${remaining:,.0f} remains",
|
| ))
|
| return vs
|
|
|
|
|
|
|
| def _check_redundancy(
|
| self, action: ExperimentAction, s: FullLatentState
|
| ) -> List[RuleViolation]:
|
| vs: List[RuleViolation] = []
|
| at = action.action_type
|
| p = s.progress
|
|
|
| REDUNDANT = {
|
| ActionType.COLLECT_SAMPLE: "samples_collected",
|
| ActionType.PREPARE_LIBRARY: "library_prepared",
|
| ActionType.SEQUENCE_CELLS: "cells_sequenced",
|
| ActionType.RUN_QC: "qc_performed",
|
| ActionType.FILTER_DATA: "data_filtered",
|
| ActionType.NORMALIZE_DATA: "data_normalized",
|
| ActionType.CLUSTER_CELLS: "cells_clustered",
|
| ActionType.DIFFERENTIAL_EXPRESSION: "de_performed",
|
| ActionType.TRAJECTORY_ANALYSIS: "trajectories_inferred",
|
| ActionType.PATHWAY_ENRICHMENT: "pathways_analyzed",
|
| ActionType.REGULATORY_NETWORK_INFERENCE: "networks_inferred",
|
| ActionType.MARKER_SELECTION: "markers_discovered",
|
| ActionType.VALIDATE_MARKER: "markers_validated",
|
| ActionType.DESIGN_FOLLOWUP: "followup_designed",
|
| ActionType.REQUEST_SUBAGENT_REVIEW: "subagent_review_requested",
|
| ActionType.SYNTHESIZE_CONCLUSION: "conclusion_reached",
|
| }
|
| flag = REDUNDANT.get(at)
|
| if flag and getattr(p, flag, False):
|
| vs.append(RuleViolation(
|
| rule_id=f"redundant_{at.value}",
|
| severity=Severity.HARD,
|
| message=f"Step '{at.value}' already completed β redundant action blocked",
|
| ))
|
| return vs
|
|
|
|
|
|
|
| def _check_causal_validity(
|
| self, action: ExperimentAction, s: FullLatentState
|
| ) -> List[RuleViolation]:
|
| vs: List[RuleViolation] = []
|
| has_analysis_evidence = self._has_analysis_evidence(s)
|
|
|
| if action.action_type == ActionType.DESIGN_FOLLOWUP:
|
| if not has_analysis_evidence:
|
| vs.append(RuleViolation(
|
| rule_id="premature_followup_design",
|
| severity=Severity.HARD,
|
| message=(
|
| "Follow-up design without prior analysis is blocked; "
|
| "complete wet-lab and computational steps first"
|
| ),
|
| ))
|
|
|
| if action.action_type == ActionType.REQUEST_SUBAGENT_REVIEW:
|
| if not has_analysis_evidence:
|
| vs.append(RuleViolation(
|
| rule_id="premature_subagent_review",
|
| severity=Severity.HARD,
|
| message=(
|
| "Subagent review without prior analysis is blocked; "
|
| "generate evidence first"
|
| ),
|
| ))
|
|
|
| if action.action_type == ActionType.SYNTHESIZE_CONCLUSION:
|
| if not s.progress.de_performed and not s.progress.cells_clustered:
|
| vs.append(RuleViolation(
|
| rule_id="premature_conclusion",
|
| severity=Severity.HARD,
|
| message="Cannot synthesise conclusion without substantive analysis",
|
| ))
|
|
|
| if not self._has_marker_evidence(s):
|
| vs.append(RuleViolation(
|
| rule_id="conclusion_without_marker_evidence",
|
| severity=Severity.HARD,
|
| message="Cannot synthesise conclusion before discovering or validating markers",
|
| ))
|
|
|
| if not self._has_mechanism_evidence(s):
|
| vs.append(RuleViolation(
|
| rule_id="conclusion_without_mechanism_evidence",
|
| severity=Severity.HARD,
|
| message="Cannot synthesise conclusion before inferring pathways or mechanisms",
|
| ))
|
|
|
| claims = action.parameters.get("claims", [])
|
| for claim in claims:
|
| if isinstance(claim, dict) and claim.get("claim_type") == "causal":
|
| if not s.progress.markers_validated and not s.progress.networks_inferred:
|
| vs.append(RuleViolation(
|
| rule_id="unsupported_causal_claim",
|
| severity=Severity.SOFT,
|
| message="Causal claim without validation or network evidence",
|
| ))
|
| break
|
|
|
| if action.action_type == ActionType.PATHWAY_ENRICHMENT:
|
| if not s.progress.de_performed:
|
| vs.append(RuleViolation(
|
| rule_id="pathway_without_de",
|
| severity=Severity.SOFT,
|
| message="Pathway enrichment without DE may yield unreliable results",
|
| ))
|
| return vs
|
|
|
|
|
|
|
| _KNOWN_METHODS = {
|
| "scanpy.pp.calculate_qc_metrics", "scanpy.pp.filter_cells",
|
| "scanpy.pp.filter_genes", "scanpy.pp.normalize_total",
|
| "scanpy.pp.log1p", "scanpy.pp.highly_variable_genes",
|
| "scanpy.pp.neighbors", "scanpy.tl.leiden", "scanpy.tl.louvain",
|
| "scanpy.tl.rank_genes_groups", "scanpy.tl.paga", "scanpy.tl.umap",
|
| "gseapy.prerank", "gseapy.gsea", "10x_chromium", "NovaSeq",
|
| }
|
| _METHOD_TO_TOOL = {
|
| "scanpy.pp.calculate_qc_metrics": "Scanpy",
|
| "scanpy.pp.filter_cells": "Scanpy",
|
| "scanpy.pp.filter_genes": "Scanpy",
|
| "scanpy.pp.normalize_total": "Scanpy",
|
| "scanpy.pp.log1p": "Scanpy",
|
| "scanpy.pp.highly_variable_genes": "Scanpy",
|
| "scanpy.pp.neighbors": "Scanpy",
|
| "scanpy.tl.leiden": "Leiden",
|
| "scanpy.tl.louvain": "Louvain",
|
| "scanpy.tl.rank_genes_groups": "Scanpy",
|
| "scanpy.tl.paga": "PAGA",
|
| "scanpy.tl.umap": "UMAP",
|
| "gseapy.prerank": "Scanpy",
|
| "gseapy.gsea": "Scanpy",
|
| "10x_chromium": "CellRanger",
|
| "NovaSeq": "CellRanger",
|
| }
|
|
|
| def _check_tool_compatibility(
|
| self, action: ExperimentAction, s: FullLatentState
|
| ) -> List[RuleViolation]:
|
| """Warn when the chosen tool is incompatible with the task modality."""
|
| vs: List[RuleViolation] = []
|
| method = action.method
|
| if not method:
|
| return vs
|
|
|
| resolved = self._METHOD_TO_TOOL.get(method, method)
|
| tool_spec = TOOL_REGISTRY.get(resolved)
|
| if tool_spec is None and method not in self._KNOWN_METHODS:
|
| vs.append(RuleViolation(
|
| rule_id="unknown_tool",
|
| severity=Severity.SOFT,
|
| message=f"Tool '{method}' is not in the registry β results may be unreliable",
|
| ))
|
| return vs
|
| if tool_spec is None:
|
| return vs
|
|
|
|
|
|
|
|
|
|
|
| task_modality = getattr(s, "task_modality", None)
|
| if task_modality and tool_spec.modalities:
|
| if task_modality not in tool_spec.modalities:
|
| vs.append(RuleViolation(
|
| rule_id="tool_modality_mismatch",
|
| severity=Severity.SOFT,
|
| message=(
|
| f"Tool '{method}' is designed for "
|
| f"{', '.join(tool_spec.modalities)} but task modality "
|
| f"is '{task_modality}'"
|
| ),
|
| ))
|
|
|
| return vs
|
|
|