File size: 7,065 Bytes
1922dbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15459e9
1922dbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""Magentic-based orchestrator for DeepCritical."""

from collections.abc import AsyncGenerator

import structlog
from agent_framework import (
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticBuilder,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

from src.agents.judge_agent import JudgeAgent
from src.agents.search_agent import SearchAgent
from src.orchestrator import JudgeHandlerProtocol, SearchHandlerProtocol
from src.utils.config import settings
from src.utils.models import AgentEvent, Evidence

logger = structlog.get_logger()


class MagenticOrchestrator:
    """
    Magentic-based orchestrator - same API as Orchestrator.

    Uses Microsoft Agent Framework's MagenticBuilder for multi-agent coordination.
    """

    def __init__(
        self,
        search_handler: SearchHandlerProtocol,
        judge_handler: JudgeHandlerProtocol,
        max_rounds: int = 10,
    ) -> None:
        self._search_handler = search_handler
        self._judge_handler = judge_handler
        self._max_rounds = max_rounds
        self._evidence_store: dict[str, list[Evidence]] = {"current": []}

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """
        Run the Magentic workflow - same API as simple Orchestrator.

        Yields AgentEvent objects for real-time UI updates.
        """
        logger.info("Starting Magentic orchestrator", query=query)

        yield AgentEvent(
            type="started",
            message=f"Starting research (Magentic mode): {query}",
            iteration=0,
        )

        # Create agent wrappers
        search_agent = SearchAgent(self._search_handler, self._evidence_store)
        judge_agent = JudgeAgent(self._judge_handler, self._evidence_store)

        # Build Magentic workflow
        # Note: MagenticBuilder.participants takes named arguments for agent instances
        workflow = (
            MagenticBuilder()
            .participants(
                searcher=search_agent,
                judge=judge_agent,
            )
            .with_standard_manager(
                chat_client=OpenAIChatClient(
                    model_id=settings.openai_model, api_key=settings.openai_api_key
                ),
                max_round_count=self._max_rounds,
                max_stall_count=3,
                max_reset_count=2,
            )
            .build()
        )

        # Task instruction for the manager
        task = f"""Research drug repurposing opportunities for: {query}

Instructions:
1. Use SearcherAgent to find evidence. SEND ONLY A SIMPLE KEYWORD QUERY (e.g. "metformin aging")
   as the instruction. Complex queries fail.
2. Use JudgeAgent to evaluate if evidence is sufficient.
3. If JudgeAgent says "continue", search with refined queries.
4. If JudgeAgent says "synthesize", provide final synthesis
5. Stop when synthesis is ready or max rounds reached

Focus on finding:
- Mechanism of action evidence
- Clinical/preclinical studies
- Specific drug candidates
"""

        iteration = 0
        try:
            # workflow.run_stream returns an async generator of workflow events
            # We use 'await' in the for loop for async generator
            async for event in workflow.run_stream(task):
                if isinstance(event, MagenticOrchestratorMessageEvent):
                    # Manager events (planning, instruction, ledger)
                    # The 'message' attribute might be None if it's just a state change,
                    # check message presence
                    message_text = (
                        event.message.text
                        if event.message and hasattr(event.message, "text")
                        else ""
                    )
                    # kind might be 'plan', 'instruction', etc.
                    kind = getattr(event, "kind", "manager")

                    if message_text:
                        yield AgentEvent(
                            type="judging",
                            message=f"Manager ({kind}): {message_text[:100]}...",
                            iteration=iteration,
                        )

                elif isinstance(event, MagenticAgentMessageEvent):
                    # Complete agent response
                    iteration += 1
                    agent_name = event.agent_id or "unknown"
                    msg_text = (
                        event.message.text
                        if event.message and hasattr(event.message, "text")
                        else ""
                    )

                    if "search" in agent_name.lower():
                        # Check if we found evidence (based on SearchAgent logic)
                        yield AgentEvent(
                            type="search_complete",
                            message=f"Search agent: {msg_text[:100]}...",
                            iteration=iteration,
                        )
                    elif "judge" in agent_name.lower():
                        yield AgentEvent(
                            type="judge_complete",
                            message=f"Judge agent: {msg_text[:100]}...",
                            iteration=iteration,
                        )

                elif isinstance(event, MagenticFinalResultEvent):
                    # Final workflow result
                    final_text = (
                        event.message.text
                        if event.message and hasattr(event.message, "text")
                        else "No result"
                    )
                    yield AgentEvent(
                        type="complete",
                        message=final_text,
                        data={"iterations": iteration},
                        iteration=iteration,
                    )

                elif isinstance(event, MagenticAgentDeltaEvent):
                    # Streaming token chunks from agents (optional "typing" effect)
                    # Only emit if we have actual text content
                    if event.text:
                        yield AgentEvent(
                            type="streaming",
                            message=event.text,
                            data={"agent_id": event.agent_id},
                            iteration=iteration,
                        )

                elif isinstance(event, WorkflowOutputEvent):
                    # Alternative final output event
                    if event.data:
                        yield AgentEvent(
                            type="complete",
                            message=str(event.data),
                            iteration=iteration,
                        )

        except Exception as e:
            logger.error("Magentic workflow failed", error=str(e))
            yield AgentEvent(
                type="error",
                message=f"Workflow error: {e!s}",
                iteration=iteration,
            )