arkai2025's picture
feat(mcp): integrate local FastMCP client with logging and improve service management and add display cache
c7829ce
"""
Local FastMCP client used by the Gradio simulator service.
This module provides a lightweight in-process client that routes every request
through the registered FastMCP server instance so the application can rely on
actual MCP tool invocations (and capture request/response telemetry) instead
of calling helper functions directly.
"""
from __future__ import annotations
import json
import threading
import time
from copy import deepcopy
from datetime import datetime
from typing import Any, Callable, Sequence
import anyio
from mcp.types import ContentBlock
class LocalFastMCPClient:
"""Synchronous helper that forwards calls to a FastMCP server instance."""
def __init__(self, server, log_callback: Callable[[dict[str, Any]], None] | None = None):
self._server = server
self._log_callback = log_callback
self._lock = threading.Lock()
def list_tools(self) -> Any:
"""Expose server tool metadata (used for debugging/tests)."""
async def _list_tools():
return await self._server.list_tools()
with self._lock:
return anyio.run(_list_tools)
def call_tool(self, name: str, **arguments: Any) -> dict[str, Any]:
"""Invoke an MCP tool and return a normalized dict response."""
clean_args = {k: v for k, v in arguments.items() if v is not None}
start = time.perf_counter()
async def _call():
return await self._server.call_tool(name, clean_args)
with self._lock:
raw_result = anyio.run(_call)
normalized = self._normalize_result(raw_result)
self._log(name, clean_args, normalized, start)
return normalized
# --------------------------------------------------------------------- #
# Internal utilities
# --------------------------------------------------------------------- #
def _normalize_result(self, result: Any) -> dict[str, Any]:
"""Convert FastMCP responses into standard dicts for easier handling."""
if isinstance(result, dict):
return result
if isinstance(result, Sequence):
parsed = self._maybe_parse_json_from_blocks(result)
if parsed is not None:
return parsed
blocks: list[dict[str, Any]] = []
for block in result:
if isinstance(block, ContentBlock):
blocks.append(block.model_dump(mode="json"))
elif hasattr(block, "model_dump"):
blocks.append(block.model_dump(mode="json"))
else:
blocks.append({"type": "text", "text": str(block)})
return {"status": "ok", "content": blocks}
return {"status": "ok", "data": deepcopy(result)}
def _maybe_parse_json_from_blocks(self, blocks: Sequence[Any]) -> dict[str, Any] | None:
"""If the MCP server returned a single text block containing JSON, parse it."""
if not blocks:
return None
first = blocks[0]
text = None
if isinstance(first, ContentBlock) and getattr(first, "type", None) == "text":
text = first.model_dump().get("text")
elif hasattr(first, "text"):
text = getattr(first, "text")
elif isinstance(first, dict) and first.get("type") == "text":
text = first.get("text")
if text is None:
return None
stripped = text.strip()
if not stripped or stripped[0] not in "{[":
return None
try:
return json.loads(stripped)
except json.JSONDecodeError:
return None
def _log(self, name: str, arguments: dict[str, Any], result: dict[str, Any], start: float) -> None:
"""Send invocation metadata to the optional callback."""
if not self._log_callback:
return
duration_ms = round((time.perf_counter() - start) * 1000, 1)
entry = {
"timestamp": datetime.utcnow().isoformat(),
"tool": name,
"arguments": deepcopy(arguments),
"result": deepcopy(result),
"duration_ms": duration_ms,
}
self._log_callback(entry)