Buckets:

MaximoLopezChenlo's picture
download
raw
8.13 kB
"""
OncoAgent LangGraph — SOTA Multi-Agent Orchestration Graph.
Architecture synthesised from:
- Claude Code: deterministic harness + sub-agent delegation
- Hermes Agent: structured tool calling + persistent state
- Corrective RAG: graded retrieval with query rewriting
- Reflexion: generator ↔ critic loop with max iterations
- Model Tiering: Qwen3.5-9B (fast) ↔ Qwen3.6-27B (deep reasoning)
Topology:
Router → Ingestion → Corrective RAG → Specialist ↔ Critic → HITL Gate → Formatter
Fallback
Conditional edges:
- Router: routes "insufficient" directly to fallback
- CRAG: routes insufficient docs to fallback
- Critic: loops back to specialist (max 2) or to fallback
- HITL: routes high-acuity to interrupt, others to formatter
"""
import logging
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from .state import AgentState
from .router import router_node
from .nodes import data_ingestion_node
from .corrective_rag import corrective_rag_node
from .specialist import specialist_node
from .critic import critic_node, MAX_CRITIC_ATTEMPTS
from .formatter import formatter_node, fallback_node
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Conditional edge functions
# ---------------------------------------------------------------------------
def _route_after_router(state: AgentState) -> str:
"""Route based on the router's complexity classification.
Returns:
Node name to transition to.
"""
decision = state.get("routing_decision", "simple")
if decision == "insufficient":
logger.info("Router → Fallback (insufficient input)")
return "fallback"
# Both "simple" and "complex" proceed to ingestion
return "ingestion"
def _route_after_crag(state: AgentState) -> str:
"""Route based on CRAG retrieval results.
If insufficient relevant documents were found (even after rewrites),
route directly to fallback.
Returns:
Node name to transition to.
"""
graded_count = state.get("rag_grading_pass_count", 0)
retrieval_count = state.get("rag_retrieval_count", 0)
if retrieval_count == 0 and graded_count == 0:
logger.info("CRAG → Fallback (no relevant documents)")
return "fallback"
return "specialist"
def _route_after_critic(state: AgentState) -> str:
"""Route based on the critic's verdict and attempt count.
- PASS → proceed to HITL gate
- FAIL + attempts < max → loop back to specialist
- FAIL + attempts >= max → fallback
Returns:
Node name to transition to.
"""
verdict = state.get("critic_verdict", "FAIL")
attempts = state.get("critic_attempts", 0)
if verdict == "PASS":
logger.info("Critic → HITL Gate (PASS on attempt %d)", attempts)
return "hitl_gate"
if attempts >= MAX_CRITIC_ATTEMPTS:
logger.warning(
"Critic → Fallback (FAIL after %d/%d attempts)",
attempts, MAX_CRITIC_ATTEMPTS,
)
return "fallback"
logger.info(
"Critic → Specialist retry (FAIL, attempt %d/%d)",
attempts, MAX_CRITIC_ATTEMPTS,
)
return "specialist"
def _route_after_hitl(state: AgentState) -> str:
"""Route based on acuity level and HITL requirements.
For the hackathon, high-acuity cases are flagged but auto-proceed.
In production, this would use LangGraph's interrupt() for real
clinician approval.
Returns:
Node name to transition to.
"""
# For now, always proceed to formatter
# In production: if hitl_required and not hitl_approved → interrupt
return "formatter"
# ---------------------------------------------------------------------------
# HITL Gate Node
# ---------------------------------------------------------------------------
def hitl_gate_node(state: AgentState) -> dict:
"""Determine if the case requires Human-in-the-Loop approval.
Acuity classification:
- high: Stage IV + rare mutations → requires clinician review
- medium: Stage III or complex → flagged but auto-proceeds
- low: Standard cases → auto-proceeds
Args:
state: Current LangGraph state.
Returns:
State update with acuity_level, hitl_required, hitl_approved.
"""
entities = state.get("extracted_entities", {})
complexity = state.get("complexity_score", 0.0)
stage = entities.get("stage", "Unknown").upper()
# Determine acuity
if "IV" in stage and complexity >= 0.6:
acuity = "high"
hitl_required = True
elif "III" in stage or complexity >= 0.4:
acuity = "medium"
hitl_required = False
else:
acuity = "low"
hitl_required = False
logger.info(
"HITL Gate: acuity=%s, hitl_required=%s, complexity=%.2f",
acuity, hitl_required, complexity,
)
return {
"acuity_level": acuity,
"hitl_required": hitl_required,
"hitl_approved": not hitl_required, # Auto-approve non-HITL cases
}
# ---------------------------------------------------------------------------
# Graph Builder
# ---------------------------------------------------------------------------
def build_oncoagent_graph() -> StateGraph:
"""Build the SOTA OncoAgent LangGraph state machine.
Topology:
START → router → (ingestion | fallback)
corrective_rag → (specialist | fallback)
specialist ↔ critic (max 2 loops)
hitl_gate → formatter → END
fallback → END
Returns:
Compiled LangGraph state machine.
"""
workflow = StateGraph(AgentState)
# --- Define Nodes ---
workflow.add_node("router", router_node)
workflow.add_node("ingestion", data_ingestion_node)
workflow.add_node("corrective_rag", corrective_rag_node)
workflow.add_node("specialist", specialist_node)
workflow.add_node("critic", critic_node)
workflow.add_node("hitl_gate", hitl_gate_node)
workflow.add_node("formatter", formatter_node)
workflow.add_node("fallback", fallback_node)
# --- Define Edges ---
# Entry point
workflow.set_entry_point("router")
# Router → Ingestion or Fallback (conditional)
workflow.add_conditional_edges(
"router",
_route_after_router,
{
"ingestion": "ingestion",
"fallback": "fallback",
},
)
# Ingestion → Corrective RAG (always)
workflow.add_edge("ingestion", "corrective_rag")
# Corrective RAG → Specialist or Fallback (conditional)
workflow.add_conditional_edges(
"corrective_rag",
_route_after_crag,
{
"specialist": "specialist",
"fallback": "fallback",
},
)
# Specialist → Critic (always)
workflow.add_edge("specialist", "critic")
# Critic → HITL Gate, Specialist (retry), or Fallback (conditional)
workflow.add_conditional_edges(
"critic",
_route_after_critic,
{
"hitl_gate": "hitl_gate",
"specialist": "specialist",
"fallback": "fallback",
},
)
# HITL Gate → Formatter (conditional, future: interrupt for clinician)
workflow.add_conditional_edges(
"hitl_gate",
_route_after_hitl,
{
"formatter": "formatter",
},
)
# Terminal edges
workflow.add_edge("formatter", END)
workflow.add_edge("fallback", END)
# Compile with recursion limit (Rule #20: strict limit for loops)
memory = MemorySaver()
compiled = workflow.compile(
checkpointer=memory,
)
logger.info("OncoAgent graph compiled successfully (8 nodes, SOTA topology).")
return compiled

Xet Storage Details

Size:
8.13 kB
·
Xet hash:
81575877a6cb64ec75011cf190ca5116777bd5e74b4e7328ec9fd0106ed982dc

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.