Spaces:
Running
Running
| """Magentic-based orchestrator using ChatAgent pattern.""" | |
| from collections.abc import AsyncGenerator | |
| from typing import TYPE_CHECKING, Any | |
| import structlog | |
| from agent_framework import ( | |
| MagenticAgentDeltaEvent, | |
| MagenticAgentMessageEvent, | |
| MagenticBuilder, | |
| MagenticFinalResultEvent, | |
| MagenticOrchestratorMessageEvent, | |
| WorkflowOutputEvent, | |
| ) | |
| from agent_framework.openai import OpenAIChatClient | |
| from src.agents.magentic_agents import ( | |
| create_hypothesis_agent, | |
| create_judge_agent, | |
| create_report_agent, | |
| create_search_agent, | |
| ) | |
| from src.agents.state import init_magentic_state | |
| from src.utils.config import settings | |
| from src.utils.exceptions import ConfigurationError | |
| from src.utils.models import AgentEvent | |
| if TYPE_CHECKING: | |
| from src.services.embeddings import EmbeddingService | |
| logger = structlog.get_logger() | |
| class MagenticOrchestrator: | |
| """ | |
| Magentic-based orchestrator using ChatAgent pattern. | |
| Each agent has an internal LLM that understands natural language | |
| instructions from the manager and can call tools appropriately. | |
| """ | |
| def __init__( | |
| self, | |
| max_rounds: int = 10, | |
| chat_client: OpenAIChatClient | None = None, | |
| ) -> None: | |
| """Initialize orchestrator. | |
| Args: | |
| max_rounds: Maximum coordination rounds | |
| chat_client: Optional shared chat client for agents | |
| """ | |
| if not settings.openai_api_key: | |
| raise ConfigurationError( | |
| "Magentic mode requires OPENAI_API_KEY. Set the key or use mode='simple'." | |
| ) | |
| self._max_rounds = max_rounds | |
| self._chat_client = chat_client | |
| def _init_embedding_service(self) -> "EmbeddingService | None": | |
| """Initialize embedding service if available.""" | |
| try: | |
| from src.services.embeddings import get_embedding_service | |
| service = get_embedding_service() | |
| logger.info("Embedding service enabled") | |
| return service | |
| except ImportError: | |
| logger.info("Embedding service not available (dependencies missing)") | |
| except Exception as e: | |
| logger.warning("Failed to initialize embedding service", error=str(e)) | |
| return None | |
| def _build_workflow(self) -> Any: | |
| """Build the Magentic workflow with ChatAgent participants.""" | |
| # Create agents with internal LLMs | |
| search_agent = create_search_agent(self._chat_client) | |
| judge_agent = create_judge_agent(self._chat_client) | |
| hypothesis_agent = create_hypothesis_agent(self._chat_client) | |
| report_agent = create_report_agent(self._chat_client) | |
| # Manager chat client (orchestrates the agents) | |
| manager_client = OpenAIChatClient( | |
| model_id=settings.openai_model, # Use configured model | |
| api_key=settings.openai_api_key, | |
| ) | |
| return ( | |
| MagenticBuilder() | |
| .participants( | |
| searcher=search_agent, | |
| hypothesizer=hypothesis_agent, | |
| judge=judge_agent, | |
| reporter=report_agent, | |
| ) | |
| .with_standard_manager( | |
| chat_client=manager_client, | |
| max_round_count=self._max_rounds, | |
| max_stall_count=3, | |
| max_reset_count=2, | |
| ) | |
| .build() | |
| ) | |
| async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]: | |
| """ | |
| Run the Magentic workflow. | |
| Args: | |
| query: User's research question | |
| Yields: | |
| AgentEvent objects for real-time UI updates | |
| """ | |
| logger.info("Starting Magentic orchestrator", query=query) | |
| yield AgentEvent( | |
| type="started", | |
| message=f"Starting research (Magentic mode): {query}", | |
| iteration=0, | |
| ) | |
| # Initialize context state | |
| embedding_service = self._init_embedding_service() | |
| init_magentic_state(embedding_service) | |
| workflow = self._build_workflow() | |
| task = f"""Research drug repurposing opportunities for: {query} | |
| Workflow: | |
| 1. SearchAgent: Find evidence from PubMed, ClinicalTrials.gov, and bioRxiv | |
| 2. HypothesisAgent: Generate mechanistic hypotheses (Drug -> Target -> Pathway -> Effect) | |
| 3. JudgeAgent: Evaluate if evidence is sufficient | |
| 4. If insufficient -> SearchAgent refines search based on gaps | |
| 5. If sufficient -> ReportAgent synthesizes final report | |
| Focus on: | |
| - Identifying specific molecular targets | |
| - Understanding mechanism of action | |
| - Finding clinical evidence supporting hypotheses | |
| The final output should be a structured research report.""" | |
| iteration = 0 | |
| try: | |
| async for event in workflow.run_stream(task): | |
| agent_event = self._process_event(event, iteration) | |
| if agent_event: | |
| if isinstance(event, MagenticAgentMessageEvent): | |
| iteration += 1 | |
| yield agent_event | |
| except Exception as e: | |
| logger.error("Magentic workflow failed", error=str(e)) | |
| yield AgentEvent( | |
| type="error", | |
| message=f"Workflow error: {e!s}", | |
| iteration=iteration, | |
| ) | |
| def _process_event(self, event: Any, iteration: int) -> AgentEvent | None: | |
| """Process workflow event into AgentEvent.""" | |
| if isinstance(event, MagenticOrchestratorMessageEvent): | |
| text = event.message.text if event.message else "" | |
| if text: | |
| return AgentEvent( | |
| type="judging", | |
| message=f"Manager ({event.kind}): {text[:200]}...", | |
| iteration=iteration, | |
| ) | |
| elif isinstance(event, MagenticAgentMessageEvent): | |
| agent_name = event.agent_id or "unknown" | |
| text = event.message.text if event.message else "" | |
| event_type = "judging" | |
| if "search" in agent_name.lower(): | |
| event_type = "search_complete" | |
| elif "judge" in agent_name.lower(): | |
| event_type = "judge_complete" | |
| elif "hypothes" in agent_name.lower(): | |
| event_type = "hypothesizing" | |
| elif "report" in agent_name.lower(): | |
| event_type = "synthesizing" | |
| return AgentEvent( | |
| type=event_type, # type: ignore[arg-type] | |
| message=f"{agent_name}: {text[:200]}...", | |
| iteration=iteration + 1, | |
| ) | |
| elif isinstance(event, MagenticFinalResultEvent): | |
| text = event.message.text if event.message else "No result" | |
| return AgentEvent( | |
| type="complete", | |
| message=text, | |
| data={"iterations": iteration}, | |
| iteration=iteration, | |
| ) | |
| elif isinstance(event, MagenticAgentDeltaEvent): | |
| if event.text: | |
| return AgentEvent( | |
| type="streaming", | |
| message=event.text, | |
| data={"agent_id": event.agent_id}, | |
| iteration=iteration, | |
| ) | |
| elif isinstance(event, WorkflowOutputEvent): | |
| if event.data: | |
| return AgentEvent( | |
| type="complete", | |
| message=str(event.data), | |
| iteration=iteration, | |
| ) | |
| return None | |