# DeepCritical/src/orchestrator_magentic.py
"""Magentic-based orchestrator using ChatAgent pattern."""
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any
import structlog
from agent_framework import (
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticFinalResultEvent,
MagenticOrchestratorMessageEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
from src.agents.magentic_agents import (
create_hypothesis_agent,
create_judge_agent,
create_report_agent,
create_search_agent,
)
from src.agents.state import init_magentic_state
from src.utils.config import settings
from src.utils.exceptions import ConfigurationError
from src.utils.models import AgentEvent
if TYPE_CHECKING:
from src.services.embeddings import EmbeddingService
logger = structlog.get_logger()
class MagenticOrchestrator:
"""
    Magentic-based orchestrator using the ChatAgent pattern.
Each agent has an internal LLM that understands natural language
instructions from the manager and can call tools appropriately.
"""
def __init__(
self,
max_rounds: int = 10,
chat_client: OpenAIChatClient | None = None,
) -> None:
"""Initialize orchestrator.
Args:
max_rounds: Maximum coordination rounds
chat_client: Optional shared chat client for agents
"""
if not settings.openai_api_key:
raise ConfigurationError(
"Magentic mode requires OPENAI_API_KEY. Set the key or use mode='simple'."
)
self._max_rounds = max_rounds
self._chat_client = chat_client
def _init_embedding_service(self) -> "EmbeddingService | None":
"""Initialize embedding service if available."""
try:
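            # Import lazily so the orchestrator still works when the optional
            # embedding dependencies are not installed.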
from src.services.embeddings import get_embedding_service
service = get_embedding_service()
logger.info("Embedding service enabled")
return service
except ImportError:
logger.info("Embedding service not available (dependencies missing)")
except Exception as e:
logger.warning("Failed to initialize embedding service", error=str(e))
return None
def _build_workflow(self) -> Any:
"""Build the Magentic workflow with ChatAgent participants."""
# Create agents with internal LLMs
search_agent = create_search_agent(self._chat_client)
judge_agent = create_judge_agent(self._chat_client)
hypothesis_agent = create_hypothesis_agent(self._chat_client)
report_agent = create_report_agent(self._chat_client)
# Manager chat client (orchestrates the agents)
manager_client = OpenAIChatClient(
model_id=settings.openai_model, # Use configured model
api_key=settings.openai_api_key,
)
return (
MagenticBuilder()
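            # Participant names given here are how the manager refers to each
            # agent when delegating steps of its plan.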
.participants(
searcher=search_agent,
hypothesizer=hypothesis_agent,
judge=judge_agent,
reporter=report_agent,
)
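            # The standard manager plans, delegates, and tracks progress; the
            # stall/reset limits bound how many unproductive rounds it tolerates.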
.with_standard_manager(
chat_client=manager_client,
max_round_count=self._max_rounds,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
"""
Run the Magentic workflow.
Args:
query: User's research question
Yields:
AgentEvent objects for real-time UI updates
"""
logger.info("Starting Magentic orchestrator", query=query)
yield AgentEvent(
type="started",
message=f"Starting research (Magentic mode): {query}",
iteration=0,
)
# Initialize context state
embedding_service = self._init_embedding_service()
init_magentic_state(embedding_service)
workflow = self._build_workflow()
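        # The task prompt spells out the expected agent loop so the manager can
        # delegate search, hypothesis generation, judging, and reporting.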
task = f"""Research drug repurposing opportunities for: {query}
Workflow:
1. SearchAgent: Find evidence from PubMed, ClinicalTrials.gov, and bioRxiv
2. HypothesisAgent: Generate mechanistic hypotheses (Drug -> Target -> Pathway -> Effect)
3. JudgeAgent: Evaluate if evidence is sufficient
4. If insufficient -> SearchAgent refines search based on gaps
5. If sufficient -> ReportAgent synthesizes final report
Focus on:
- Identifying specific molecular targets
- Understanding mechanism of action
- Finding clinical evidence supporting hypotheses
The final output should be a structured research report."""
iteration = 0
try:
async for event in workflow.run_stream(task):
agent_event = self._process_event(event, iteration)
if agent_event:
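                    # Count an iteration only when an agent finishes a complete
                    # message; manager chatter and streaming deltas do not count.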
if isinstance(event, MagenticAgentMessageEvent):
iteration += 1
yield agent_event
except Exception as e:
logger.error("Magentic workflow failed", error=str(e))
yield AgentEvent(
type="error",
message=f"Workflow error: {e!s}",
iteration=iteration,
)
def _process_event(self, event: Any, iteration: int) -> AgentEvent | None:
"""Process workflow event into AgentEvent."""
if isinstance(event, MagenticOrchestratorMessageEvent):
text = event.message.text if event.message else ""
if text:
return AgentEvent(
type="judging",
message=f"Manager ({event.kind}): {text[:200]}...",
iteration=iteration,
)
elif isinstance(event, MagenticAgentMessageEvent):
agent_name = event.agent_id or "unknown"
text = event.message.text if event.message else ""
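            # Infer the UI event type from the participant name; anything
            # unrecognized falls back to "judging".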
event_type = "judging"
if "search" in agent_name.lower():
event_type = "search_complete"
elif "judge" in agent_name.lower():
event_type = "judge_complete"
elif "hypothes" in agent_name.lower():
event_type = "hypothesizing"
elif "report" in agent_name.lower():
event_type = "synthesizing"
return AgentEvent(
type=event_type, # type: ignore[arg-type]
message=f"{agent_name}: {text[:200]}...",
iteration=iteration + 1,
)
elif isinstance(event, MagenticFinalResultEvent):
text = event.message.text if event.message else "No result"
return AgentEvent(
type="complete",
message=text,
data={"iterations": iteration},
iteration=iteration,
)
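        # Incremental token deltas streamed while an agent is still generating.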
elif isinstance(event, MagenticAgentDeltaEvent):
if event.text:
return AgentEvent(
type="streaming",
message=event.text,
data={"agent_id": event.agent_id},
iteration=iteration,
)
elif isinstance(event, WorkflowOutputEvent):
if event.data:
return AgentEvent(
type="complete",
message=str(event.data),
iteration=iteration,
)
return None
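

# Minimal usage sketch showing how the event stream could be consumed from a
# script. Assumes OPENAI_API_KEY is configured via settings; the query below is
# only an illustrative example.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        orchestrator = MagenticOrchestrator(max_rounds=5)
        async for event in orchestrator.run("metformin repurposing in glioblastoma"):
            print(f"[{event.type}] {event.message[:120]}")

    asyncio.run(_demo())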