Spaces:
Running
on
Zero
Running
on
Zero
| """Web search functions""" | |
| import json | |
| import asyncio | |
| import concurrent.futures | |
| from logger import logger | |
| from client import MCP_AVAILABLE, get_mcp_session, get_cached_mcp_tools, call_agent | |
| from config import GEMINI_MODEL | |
| try: | |
| import nest_asyncio | |
| except ImportError: | |
| nest_asyncio = None | |
| from supervisor import MAX_SEARCH_STRATEGIES | |
| async def search_web_mcp_tool(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: | |
| """Search web using MCP web search tool (e.g., DuckDuckGo MCP server)""" | |
| if not MCP_AVAILABLE: | |
| return [] | |
| try: | |
| tools = await get_cached_mcp_tools() | |
| if not tools: | |
| return [] | |
| search_tool = None | |
| for tool in tools: | |
| tool_name_lower = tool.name.lower() | |
| if any(keyword in tool_name_lower for keyword in ["search", "duckduckgo", "ddg", "web"]): | |
| search_tool = tool | |
| logger.info(f"Found web search MCP tool: {tool.name}") | |
| break | |
| if not search_tool: | |
| tools = await get_cached_mcp_tools(force_refresh=True) | |
| for tool in tools: | |
| tool_name_lower = tool.name.lower() | |
| if any(keyword in tool_name_lower for keyword in ["search", "duckduckgo", "ddg", "web"]): | |
| search_tool = tool | |
| logger.info(f"Found web search MCP tool after refresh: {tool.name}") | |
| break | |
| if search_tool: | |
| try: | |
| session = await get_mcp_session() | |
| if session is None: | |
| return [] | |
| result = await session.call_tool( | |
| search_tool.name, | |
| arguments={"query": query, "max_results": max_results} | |
| ) | |
| web_content = [] | |
| if hasattr(result, 'content') and result.content: | |
| for item in result.content: | |
| if hasattr(item, 'text'): | |
| try: | |
| data = json.loads(item.text) | |
| if isinstance(data, list): | |
| for entry in data[:max_results]: | |
| web_content.append({ | |
| 'title': entry.get('title', ''), | |
| 'url': entry.get('url', entry.get('href', '')), | |
| 'content': entry.get('body', entry.get('snippet', entry.get('content', ''))) | |
| }) | |
| elif isinstance(data, dict): | |
| if 'results' in data: | |
| for entry in data['results'][:max_results]: | |
| web_content.append({ | |
| 'title': entry.get('title', ''), | |
| 'url': entry.get('url', entry.get('href', '')), | |
| 'content': entry.get('body', entry.get('snippet', entry.get('content', ''))) | |
| }) | |
| else: | |
| web_content.append({ | |
| 'title': data.get('title', ''), | |
| 'url': data.get('url', data.get('href', '')), | |
| 'content': data.get('body', data.get('snippet', data.get('content', ''))) | |
| }) | |
| except json.JSONDecodeError: | |
| web_content.append({ | |
| 'title': '', | |
| 'url': '', | |
| 'content': item.text[:1000] | |
| }) | |
| if web_content: | |
| return web_content | |
| except Exception as e: | |
| logger.error(f"Error calling web search MCP tool: {e}") | |
| else: | |
| logger.debug("No MCP web search tool discovered in current catalog") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Web search MCP tool error: {e}") | |
| return [] | |
| async def search_web_mcp(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: | |
| """Search web using MCP tools - tries web search MCP tool first, then falls back to direct search""" | |
| results = await search_web_mcp_tool(query, max_results) | |
| if results: | |
| logger.info(f"✅ Web search via MCP tool: found {len(results)} results") | |
| return results | |
| logger.info("ℹ️ [Direct API] No web search MCP tool found, using direct DuckDuckGo search (results will be summarized with Gemini MCP)") | |
| return search_web_fallback(query, max_results) | |
| def search_web_fallback(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: | |
| """Fallback web search using DuckDuckGo directly (when MCP is not available)""" | |
| logger.info(f"🔍 [Direct API] Performing web search using DuckDuckGo API for: {query[:100]}...") | |
| try: | |
| from ddgs import DDGS | |
| import requests | |
| from bs4 import BeautifulSoup | |
| except ImportError: | |
| logger.error("Fallback dependencies (ddgs, requests, beautifulsoup4) not available") | |
| return [] | |
| try: | |
| with DDGS() as ddgs: | |
| results = list(ddgs.text(query, max_results=max_results)) | |
| web_content = [] | |
| for result in results: | |
| try: | |
| url = result.get('href', '') | |
| title = result.get('title', '') | |
| snippet = result.get('body', '') | |
| try: | |
| response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'}) | |
| if response.status_code == 200: | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| for script in soup(["script", "style"]): | |
| script.decompose() | |
| text = soup.get_text() | |
| lines = (line.strip() for line in text.splitlines()) | |
| chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
| text = ' '.join(chunk for chunk in chunks if chunk) | |
| if len(text) > 1000: | |
| text = text[:1000] + "..." | |
| web_content.append({ | |
| 'title': title, | |
| 'url': url, | |
| 'content': snippet + "\n" + text[:500] if text else snippet | |
| }) | |
| else: | |
| web_content.append({ | |
| 'title': title, | |
| 'url': url, | |
| 'content': snippet | |
| }) | |
| except: | |
| web_content.append({ | |
| 'title': title, | |
| 'url': url, | |
| 'content': snippet | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error processing search result: {e}") | |
| continue | |
| logger.info(f"✅ [Direct API] Web search completed: {len(web_content)} results") | |
| return web_content | |
| except Exception as e: | |
| logger.error(f"❌ [Direct API] Web search error: {e}") | |
| return [] | |
| def search_web(query: str, max_results: int = MAX_SEARCH_STRATEGIES) -> list: | |
| """Search web using MCP tools (synchronous wrapper) - prioritizes MCP over direct ddgs""" | |
| if MCP_AVAILABLE: | |
| try: | |
| try: | |
| loop = asyncio.get_event_loop() | |
| except RuntimeError: | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| if loop.is_running(): | |
| if nest_asyncio: | |
| results = nest_asyncio.run(search_web_mcp(query, max_results)) | |
| if results: | |
| return results | |
| else: | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| future = executor.submit(asyncio.run, search_web_mcp(query, max_results)) | |
| results = future.result(timeout=30) | |
| if results: | |
| return results | |
| else: | |
| results = loop.run_until_complete(search_web_mcp(query, max_results)) | |
| if results: | |
| return results | |
| except Exception as e: | |
| logger.error(f"Error running async MCP search: {e}") | |
| logger.info("ℹ️ [Direct API] Falling back to direct DuckDuckGo search (MCP unavailable or returned no results)") | |
| return search_web_fallback(query, max_results) | |
| async def summarize_web_content_gemini(content_list: list, query: str) -> str: | |
| """Summarize web search results using Gemini MCP""" | |
| combined_content = "\n\n".join([f"Source: {item['title']}\n{item['content']}" for item in content_list[:3]]) | |
| user_prompt = f"""Summarize the following web search results related to the query: "{query}" | |
| Extract key medical information, facts, and insights. Be concise and focus on reliable information. | |
| Search Results: | |
| {combined_content} | |
| Summary:""" | |
| system_prompt = "You are a medical information summarizer. Extract and summarize key medical facts accurately. Provide the summary directly without conversational prefixes like 'Here is...', 'This is...', or 'To summarize...'. Start with the actual content immediately." | |
| result = await call_agent( | |
| user_prompt=user_prompt, | |
| system_prompt=system_prompt, | |
| model=GEMINI_MODEL, | |
| temperature=0.5 | |
| ) | |
| return result.strip() | |
| def summarize_web_content(content_list: list, query: str) -> str: | |
| """Summarize web search results using Gemini MCP""" | |
| if not MCP_AVAILABLE: | |
| logger.warning("Gemini MCP not available for summarization") | |
| if content_list: | |
| return content_list[0].get('content', '')[:500] | |
| return "" | |
| try: | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| if nest_asyncio: | |
| summary = nest_asyncio.run(summarize_web_content_gemini(content_list, query)) | |
| if summary: | |
| return summary | |
| else: | |
| logger.error("Error in nested async summarization: nest_asyncio not available") | |
| else: | |
| summary = loop.run_until_complete(summarize_web_content_gemini(content_list, query)) | |
| if summary: | |
| return summary | |
| except Exception as e: | |
| logger.error(f"Gemini MCP summarization error: {e}") | |
| if content_list: | |
| return content_list[0].get('content', '')[:500] | |
| return "" | |