|
|
""" |
|
|
Local FastMCP client used by the Gradio simulator service. |
|
|
|
|
|
This module provides a lightweight in-process client that routes every request |
|
|
through the registered FastMCP server instance so the application can rely on |
|
|
actual MCP tool invocations (and capture request/response telemetry) instead |
|
|
of calling helper functions directly. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations

import json
import threading
import time
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any, Callable, Sequence

import anyio

from mcp.types import ContentBlock
|
|
|
|
|
|
|
|
class LocalFastMCPClient:
    """Synchronous helper that forwards calls to a FastMCP server instance.

    Each request is serialized behind a lock and executed on a fresh anyio
    event loop via ``anyio.run``, so the client can be used safely from
    synchronous (e.g. Gradio callback) code.
    """

    def __init__(self, server, log_callback: Callable[[dict[str, Any]], None] | None = None):
        """Wrap *server* and optionally record telemetry for every call.

        Args:
            server: The in-process FastMCP server instance to forward to.
            log_callback: Optional callable that receives a telemetry dict
                (timestamp, tool, arguments, result, duration_ms) after
                each tool invocation.
        """
        self._server = server
        self._log_callback = log_callback
        # Serializes anyio.run invocations: anyio.run spins up a new event
        # loop per call, and concurrent loops from multiple threads would
        # race on the shared server instance.
        self._lock = threading.Lock()

    def list_tools(self) -> Any:
        """Expose server tool metadata (used for debugging/tests)."""

        async def _list_tools():
            return await self._server.list_tools()

        with self._lock:
            return anyio.run(_list_tools)

    def call_tool(self, name: str, **arguments: Any) -> dict[str, Any]:
        """Invoke an MCP tool and return a normalized dict response.

        ``None``-valued keyword arguments are dropped before the call so
        optional tool parameters fall back to their server-side defaults.
        """
        clean_args = {k: v for k, v in arguments.items() if v is not None}
        start = time.perf_counter()

        async def _call():
            return await self._server.call_tool(name, clean_args)

        with self._lock:
            raw_result = anyio.run(_call)

        normalized = self._normalize_result(raw_result)
        self._log(name, clean_args, normalized, start)
        return normalized

    def _normalize_result(self, result: Any) -> dict[str, Any]:
        """Convert FastMCP responses into standard dicts for easier handling."""
        if isinstance(result, dict):
            return result
        # str/bytes are Sequences too, but treating a plain string as a
        # sequence of one-character "blocks" would mangle it; route those
        # through the scalar fallback below instead.
        if isinstance(result, Sequence) and not isinstance(result, (str, bytes, bytearray)):
            parsed = self._maybe_parse_json_from_blocks(result)
            if parsed is not None:
                return parsed
            blocks: list[dict[str, Any]] = []
            for block in result:
                if hasattr(block, "model_dump"):
                    # Covers ContentBlock and any other pydantic model.
                    blocks.append(block.model_dump(mode="json"))
                elif isinstance(block, dict):
                    # Already in plain-dict block form; keep it as-is
                    # rather than stringifying it.
                    blocks.append(block)
                else:
                    blocks.append({"type": "text", "text": str(block)})
            return {"status": "ok", "content": blocks}
        return {"status": "ok", "data": deepcopy(result)}

    def _maybe_parse_json_from_blocks(self, blocks: Sequence[Any]) -> dict[str, Any] | None:
        """If the MCP server returned a single text block containing JSON, parse it.

        Only a JSON *object* is returned (``call_tool`` promises a dict);
        any other payload yields ``None`` so the caller falls back to the
        generic block representation.
        """
        if not blocks:
            return None
        first = blocks[0]
        text = None
        if hasattr(first, "text"):
            # Pydantic text content blocks (e.g. mcp TextContent).
            text = first.text
        elif isinstance(first, dict) and first.get("type") == "text":
            text = first.get("text")
        if not isinstance(text, str):
            return None
        stripped = text.strip()
        # Cheap sniff: only attempt json.loads on object/array-looking text.
        if not stripped or stripped[0] not in "{[":
            return None
        try:
            parsed = json.loads(stripped)
        except json.JSONDecodeError:
            return None
        # A JSON array would violate the dict contract; reject it here.
        return parsed if isinstance(parsed, dict) else None

    def _log(self, name: str, arguments: dict[str, Any], result: dict[str, Any], start: float) -> None:
        """Send invocation metadata to the optional callback."""
        if not self._log_callback:
            return
        duration_ms = round((time.perf_counter() - start) * 1000, 1)
        entry = {
            # Timezone-aware UTC; datetime.utcnow() is deprecated and naive.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tool": name,
            # Deep copies so later mutation by callers cannot corrupt the log.
            "arguments": deepcopy(arguments),
            "result": deepcopy(result),
            "duration_ms": duration_ms,
        }
        self._log_callback(entry)
|
|
|
|
|
|
|
|
|