Buckets:
| """ | |
| OncoAgent LangGraph — SOTA Multi-Agent Orchestration Graph. | |
| Architecture synthesised from: | |
| - Claude Code: deterministic harness + sub-agent delegation | |
| - Hermes Agent: structured tool calling + persistent state | |
| - Corrective RAG: graded retrieval with query rewriting | |
| - Reflexion: generator ↔ critic loop with max iterations | |
| - Model Tiering: Qwen3.5-9B (fast) ↔ Qwen3.6-27B (deep reasoning) | |
| Topology: | |
| Router → Ingestion → Corrective RAG → Specialist ↔ Critic → HITL Gate → Formatter | |
| ↓ | |
| Fallback | |
| Conditional edges: | |
| - Router: routes "insufficient" directly to fallback | |
| - CRAG: routes insufficient docs to fallback | |
| - Critic: loops back to specialist (max 2) or to fallback | |
| - HITL: flags high-acuity cases but currently always proceeds to formatter (a clinician interrupt is planned for production) | |
| """ | |
| import logging | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from .state import AgentState | |
| from .router import router_node | |
| from .nodes import data_ingestion_node | |
| from .corrective_rag import corrective_rag_node | |
| from .specialist import specialist_node | |
| from .critic import critic_node, MAX_CRITIC_ATTEMPTS | |
| from .formatter import formatter_node, fallback_node | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Conditional edge functions | |
| # --------------------------------------------------------------------------- | |
def _route_after_router(state: AgentState) -> str:
    """Pick the node that follows the router.

    Reads ``routing_decision`` from the state (treated as ``"simple"`` when
    the key is absent). Only the value ``"insufficient"`` diverts to the
    fallback node; every other value continues to ingestion.

    Args:
        state: Current LangGraph state.

    Returns:
        ``"fallback"`` or ``"ingestion"``.
    """
    # Guard clause: anything other than "insufficient" (i.e. "simple",
    # "complex", or an unrecognised value) goes through the normal pipeline.
    if state.get("routing_decision", "simple") != "insufficient":
        return "ingestion"

    logger.info("Router → Fallback (insufficient input)")
    return "fallback"
def _route_after_crag(state: AgentState) -> str:
    """Pick the node that follows Corrective RAG.

    Reads ``rag_retrieval_count`` and ``rag_grading_pass_count`` from the
    state (each defaulting to 0 when absent). The fallback node is chosen
    only when *both* counters are zero; otherwise the specialist runs.

    Args:
        state: Current LangGraph state.

    Returns:
        ``"fallback"`` or ``"specialist"``.
    """
    retrieved = state.get("rag_retrieval_count", 0)
    passed_grading = state.get("rag_grading_pass_count", 0)

    # NOTE(review): a state with retrieved documents but zero grading passes
    # still proceeds to the specialist. Confirm that this is intended given
    # the exact meaning of the two counters set by the CRAG node.
    if retrieved != 0 or passed_grading != 0:
        return "specialist"

    logger.info("CRAG → Fallback (no relevant documents)")
    return "fallback"
def _route_after_critic(state: AgentState) -> str:
    """Pick the node that follows the critic.

    Decision table, based on ``critic_verdict`` (default ``"FAIL"``) and
    ``critic_attempts`` (default 0):

    - verdict is ``"PASS"``                         → ``"hitl_gate"``
    - otherwise, attempts < MAX_CRITIC_ATTEMPTS     → ``"specialist"`` (retry)
    - otherwise, attempts >= MAX_CRITIC_ATTEMPTS    → ``"fallback"``

    Args:
        state: Current LangGraph state.

    Returns:
        Name of the next node.
    """
    attempt_no = state.get("critic_attempts", 0)

    if state.get("critic_verdict", "FAIL") == "PASS":
        logger.info("Critic → HITL Gate (PASS on attempt %d)", attempt_no)
        return "hitl_gate"

    # Any verdict other than "PASS" is treated as a failure from here on.
    retries_exhausted = attempt_no >= MAX_CRITIC_ATTEMPTS
    if not retries_exhausted:
        logger.info(
            "Critic → Specialist retry (FAIL, attempt %d/%d)",
            attempt_no, MAX_CRITIC_ATTEMPTS,
        )
        return "specialist"

    logger.warning(
        "Critic → Fallback (FAIL after %d/%d attempts)",
        attempt_no, MAX_CRITIC_ATTEMPTS,
    )
    return "fallback"
def _route_after_hitl(state: AgentState) -> str:
    """Pick the node that follows the HITL gate.

    The state is not inspected: every case, including those flagged as
    high-acuity, currently continues to the formatter. The original author
    notes that a production version would pause for clinician approval
    (via LangGraph's interrupt mechanism) when ``hitl_required`` is set and
    ``hitl_approved`` is not.

    Args:
        state: Current LangGraph state (unused for now).

    Returns:
        Always ``"formatter"``.
    """
    # Single unconditional target until the clinician-approval interrupt
    # is implemented.
    next_node = "formatter"
    return next_node
| # --------------------------------------------------------------------------- | |
| # HITL Gate Node | |
| # --------------------------------------------------------------------------- | |
def hitl_gate_node(state: AgentState) -> dict:
    """Classify case acuity and decide whether human review is required.

    Acuity classification, as implemented:
        - high:   stage text contains "IV" AND complexity_score >= 0.6
                  → ``hitl_required`` is True
        - medium: stage text contains "III" OR complexity_score >= 0.4
                  → flagged, but no human review required
        - low:    everything else → no human review required

    Missing or malformed inputs are normalised instead of raising: an absent
    or ``None`` ``extracted_entities`` is treated as empty, an absent, ``None``
    or non-string ``stage`` is converted to text (``"Unknown"`` when empty),
    and an absent, ``None`` or non-numeric ``complexity_score`` counts as 0.0.

    Args:
        state: Current LangGraph state.

    Returns:
        State update with ``acuity_level``, ``hitl_required`` and
        ``hitl_approved`` (the latter is True exactly when no review is
        required).
    """
    # `or {}` also covers the key being present with an explicit None value,
    # which a plain .get(key, default) would pass through unchanged.
    entities = state.get("extracted_entities") or {}

    try:
        complexity = float(state.get("complexity_score") or 0.0)
    except (TypeError, ValueError):
        # A non-numeric score must not crash the gate; treat it as unknown.
        complexity = 0.0

    # str() guards against None / non-string stage values before .upper().
    stage = str(entities.get("stage") or "Unknown").upper()

    # Substring checks, so values such as "IVA" or "STAGE IV" also qualify.
    if "IV" in stage and complexity >= 0.6:
        acuity = "high"
        hitl_required = True
    elif "III" in stage or complexity >= 0.4:
        acuity = "medium"
        hitl_required = False
    else:
        acuity = "low"
        hitl_required = False

    logger.info(
        "HITL Gate: acuity=%s, hitl_required=%s, complexity=%.2f",
        acuity, hitl_required, complexity,
    )

    return {
        "acuity_level": acuity,
        "hitl_required": hitl_required,
        "hitl_approved": not hitl_required,  # Auto-approve non-HITL cases
    }
| # --------------------------------------------------------------------------- | |
| # Graph Builder | |
| # --------------------------------------------------------------------------- | |
def build_oncoagent_graph() -> StateGraph:
    """Assemble and compile the OncoAgent LangGraph state machine.

    Topology:
        START → router → (ingestion | fallback)
        ingestion → corrective_rag → (specialist | fallback)
        specialist → critic → (hitl_gate | specialist | fallback)
        hitl_gate → formatter → END
        fallback → END

    The graph is compiled with a fresh in-memory checkpointer
    (``MemorySaver``) on every call.

    Returns:
        Compiled LangGraph state machine.
    """
    graph = StateGraph(AgentState)

    # --- Nodes ---
    # Names registered here are the strings the routing functions return.
    node_table = (
        ("router", router_node),
        ("ingestion", data_ingestion_node),
        ("corrective_rag", corrective_rag_node),
        ("specialist", specialist_node),
        ("critic", critic_node),
        ("hitl_gate", hitl_gate_node),
        ("formatter", formatter_node),
        ("fallback", fallback_node),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    # --- Edges ---
    graph.set_entry_point("router")

    # router → ingestion | fallback
    graph.add_conditional_edges(
        "router",
        _route_after_router,
        {"ingestion": "ingestion", "fallback": "fallback"},
    )

    # ingestion → corrective_rag (unconditional)
    graph.add_edge("ingestion", "corrective_rag")

    # corrective_rag → specialist | fallback
    graph.add_conditional_edges(
        "corrective_rag",
        _route_after_crag,
        {"specialist": "specialist", "fallback": "fallback"},
    )

    # specialist → critic (unconditional)
    graph.add_edge("specialist", "critic")

    # critic → hitl_gate | specialist (retry) | fallback
    graph.add_conditional_edges(
        "critic",
        _route_after_critic,
        {
            "hitl_gate": "hitl_gate",
            "specialist": "specialist",
            "fallback": "fallback",
        },
    )

    # hitl_gate → formatter (kept conditional so an interrupt branch can be
    # added later without rewiring)
    graph.add_conditional_edges(
        "hitl_gate",
        _route_after_hitl,
        {"formatter": "formatter"},
    )

    # Both terminal nodes end the run.
    graph.add_edge("formatter", END)
    graph.add_edge("fallback", END)

    # NOTE(review): no recursion limit is set here; the checkpointer is the
    # only argument passed to compile(). Presumably callers supply a limit
    # through the run config — confirm.
    checkpointer = MemorySaver()
    compiled = graph.compile(checkpointer=checkpointer)

    logger.info("OncoAgent graph compiled successfully (8 nodes, SOTA topology).")
    return compiled
Xet Storage Details
- Size:
- 8.13 kB
- Xet hash:
- 81575877a6cb64ec75011cf190ca5116777bd5e74b4e7328ec9fd0106ed982dc
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.