MedLLM-Agent / utils.py
Y Phung Nguyen
Run Gemini in thread to avoid timeout
1fc52ea
"""Utility functions for translation, language detection, and formatting"""
import asyncio
import re
from langdetect import detect_langs, LangDetectException
from logger import logger
from client import MCP_AVAILABLE, call_agent
from config import GEMINI_MODEL_LITE
try:
import nest_asyncio
except ImportError:
nest_asyncio = None
def format_prompt_manually(messages: list, tokenizer) -> str:
"""Manually format prompt for models without chat template"""
system_content = ""
user_content = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
system_content = content
elif role == "user":
user_content = content
if system_content:
prompt = f"{system_content}\n\nQuestion: {user_content}\n\nAnswer:"
else:
prompt = f"Question: {user_content}\n\nAnswer:"
return prompt
MIN_TEXT_LENGTH_FOR_DETECTION = 12
LANG_CONFIDENCE_THRESHOLD = 0.8
ASCII_DOMINANCE_THRESHOLD = 0.97
ENGLISH_HINT_RATIO = 0.2
ENGLISH_HINT_WORDS = {
"the", "and", "with", "for", "you", "your", "have", "has", "that", "this",
"pain", "blood", "pressure", "please", "what", "how", "can", "should", "need"
}
def _ascii_ratio(text: str) -> float:
if not text:
return 1.0
ascii_chars = sum(1 for ch in text if ord(ch) < 128)
return ascii_chars / max(len(text), 1)
def _looks_english(text: str) -> bool:
words = re.findall(r"[A-Za-z']+", text.lower())
if not words:
return False
english_hits = sum(1 for word in words if word in ENGLISH_HINT_WORDS)
return english_hits / len(words) >= ENGLISH_HINT_RATIO
def detect_language(text: str) -> str:
"""Detect language of input text with basic confidence heuristics"""
if not text:
return "en"
sample = text.strip()
if not sample:
return "en"
ascii_ratio = _ascii_ratio(sample)
has_non_ascii = ascii_ratio < 1.0
if len(sample) < MIN_TEXT_LENGTH_FOR_DETECTION and not has_non_ascii:
return "en"
try:
detections = detect_langs(sample)
except LangDetectException:
return "en"
except Exception as exc:
logger.debug(f"[LANG-DETECT] Unexpected error, defaulting to English: {exc}")
return "en"
if not detections:
return "en"
top = detections[0]
lang_code = top.lang
confidence = getattr(top, "prob", 0.0)
if confidence < LANG_CONFIDENCE_THRESHOLD:
return "en"
if lang_code == "en":
return "en"
if not has_non_ascii and ascii_ratio >= ASCII_DOMINANCE_THRESHOLD and _looks_english(sample):
logger.info(f"[LANG-DETECT] Overrode {lang_code} due to English heuristics (ascii_ratio={ascii_ratio:.2f})")
return "en"
return lang_code
def format_url_as_domain(url: str) -> str:
"""Format URL as simple domain name (e.g., www.mayoclinic.org)"""
if not url:
return ""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
domain = parsed.netloc or parsed.path
if domain.startswith('www.'):
return domain
elif domain:
return domain
return url
except Exception:
if '://' in url:
domain = url.split('://')[1].split('/')[0]
return domain
return url
async def translate_text_gemini(text: str, target_lang: str = "en", source_lang: str = None) -> str:
"""Translate text using Gemini MCP"""
if source_lang:
user_prompt = f"Translate the following {source_lang} text to {target_lang}. Only provide the translation, no explanations:\n\n{text}"
else:
user_prompt = f"Translate the following text to {target_lang}. Only provide the translation, no explanations:\n\n{text}"
system_prompt = "You are a professional translator. Translate accurately and concisely."
result = await call_agent(
user_prompt=user_prompt,
system_prompt=system_prompt,
model=GEMINI_MODEL_LITE,
temperature=0.2
)
return result.strip()
def translate_text(text: str, target_lang: str = "en", source_lang: str = None) -> str:
"""Translate text using Gemini MCP"""
if not MCP_AVAILABLE:
logger.warning("Gemini MCP not available for translation")
return text
try:
loop = asyncio.get_event_loop()
if loop.is_running():
if nest_asyncio:
translated = nest_asyncio.run(translate_text_gemini(text, target_lang, source_lang))
if translated:
logger.info(f"Translated via Gemini MCP: {translated[:50]}...")
return translated
else:
logger.error("Error in nested async translation: nest_asyncio not available")
else:
translated = loop.run_until_complete(translate_text_gemini(text, target_lang, source_lang))
if translated:
logger.info(f"Translated via Gemini MCP: {translated[:50]}...")
return translated
except Exception as e:
logger.error(f"Gemini MCP translation error: {e}")
return text