CCD / app.py
X-iZhang's picture
Update app.py
145e85f verified
import os
# Force CPU-only in this process by hiding CUDA devices (set before importing heavy libs)
os.environ['CUDA_VISIBLE_DEVICES'] = ''
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
import torch
import gradio as gr
import time
# Force CPU device globally by overriding torch.cuda functions
torch.cuda.is_available = lambda: False
torch.cuda.device_count = lambda: 0
# Prevent any CUDA initialization attempts
def _dummy_lazy_init():
pass
torch.cuda._lazy_init = _dummy_lazy_init
torch.cuda.init = _dummy_lazy_init
# Override cuda() method to return self (stay on CPU)
def _cpu_only_cuda(self, *_args, **_kwargs):
# Instead of moving to CUDA, just return self (stay on CPU)
return self
torch.Tensor.cuda = _cpu_only_cuda
torch.nn.Module.cuda = _cpu_only_cuda
# =========================================
# Safe Libra Hook (CPU fallback + dtype fix)
# This hook must run before any heavyweight libra model-loading occurs.
# =========================================
import libra.model.builder as builder
import libra.eval.run_libra as run_libra
# δΏε­˜εŽŸε§‹ε‡½ζ•°οΌˆε¦‚ζžœε­˜εœ¨οΌ‰
_original_load_pretrained_model = getattr(builder, 'load_pretrained_model', None)
def safe_load_pretrained_model(model_path, model_base=None, model_name=None, **kwargs):
print("[INFO] Hook activated: safe_load_pretrained_model()")
# Complete model_name to avoid .lower() on None
if model_name is None:
model_name = model_path
# Force CPU device only (remove conflicting parameters)
kwargs = dict(kwargs)
kwargs['device'] = 'cpu'
# Remove any parameters that might conflict
kwargs.pop('device_map', None)
kwargs.pop('low_cpu_mem_usage', None)
kwargs.pop('torch_dtype', None)
if _original_load_pretrained_model is None:
raise RuntimeError('Original load_pretrained_model not found in builder')
# Call original function with CPU device
print(f"[INFO] Loading model with kwargs: {kwargs}")
tokenizer, model, image_processor, context_len = _original_load_pretrained_model(
model_path, model_base, model_name, **kwargs
)
# Fix tokenizer pad_token_id if it's None (common issue with Llama 3 models)
if tokenizer.pad_token_id is None:
if tokenizer.eos_token_id is not None:
# tokenizer.pad_token_id = tokenizer.eos_token_id
tokenizer.pad_token_id = 128001
print(f'[INFO] Set pad_token_id to eos_token_id: {tokenizer.pad_token_id}')
else:
tokenizer.pad_token_id = 0
print('[INFO] Set pad_token_id to 0 (default)')
# # Also ensure pad_token is set (Llama 3 specific) - CRITICAL for output
# if tokenizer.pad_token is None:
# if tokenizer.eos_token is not None:
# tokenizer.pad_token_id = tokenizer.eos_token
# print(f'[INFO] Set pad_token to eos_token: {tokenizer.pad_token_id}')
# else:
# tokenizer.add_special_tokens({'pad_token': '[PAD]'})
# print('[INFO] Added [PAD] token to tokenizer')
# Set padding_side to left for Llama 3 (prevents empty generation)
if hasattr(tokenizer, 'padding_side'):
tokenizer.padding_side = "left"
print('[INFO] Set tokenizer.padding_side to "left" for proper generation')
# Force all model components to CPU (keep original dtype if possible, fallback to float32)
print('[INFO] Ensuring all components are on CPU...')
try:
# Only convert to float32 if model is in float16 (which is slow on CPU)
current_dtype = next(model.parameters()).dtype
if current_dtype == torch.float16 or current_dtype == torch.bfloat16:
print(f'[INFO] Converting model from {current_dtype} to float32 for CPU compatibility...')
model = model.to(device='cpu', dtype=torch.float32)
else:
print(f'[INFO] Keeping model dtype as {current_dtype} (already CPU-compatible)')
model = model.to(device='cpu')
print('[INFO] Model moved to CPU.')
except Exception as e:
print(f"[WARN] Could not move model to CPU: {e}")
try:
if hasattr(model, 'get_vision_tower'):
vt = model.get_vision_tower()
if vt is not None:
vt_dtype = next(vt.parameters()).dtype
if vt_dtype == torch.float16 or vt_dtype == torch.bfloat16:
vt = vt.to(device='cpu', dtype=torch.float32)
print(f'[INFO] Vision tower converted to float32 for CPU.')
else:
vt = vt.to(device='cpu')
print(f'[INFO] Vision tower moved to CPU (keeping {vt_dtype}).')
except Exception as e:
print(f"[WARN] Could not move vision_tower to CPU: {e}")
try:
if hasattr(model, 'get_model'):
inner_model = model.get_model()
if inner_model is not None:
inner_dtype = next(inner_model.parameters()).dtype
if inner_dtype == torch.float16 or inner_dtype == torch.bfloat16:
inner_model = inner_model.to(device='cpu', dtype=torch.float32)
print(f'[INFO] Inner model converted to float32 for CPU.')
else:
inner_model = inner_model.to(device='cpu')
print(f'[INFO] Inner model moved to CPU (keeping {inner_dtype}).')
except Exception as e:
print(f"[WARN] Could not move inner model to CPU: {e}")
return tokenizer, model, image_processor, context_len
if _original_load_pretrained_model is not None:
builder.load_pretrained_model = safe_load_pretrained_model
# εŒζ—Άζ›Ώζ’ run_libra.load_model
def safe_load_model(model_path, model_base=None, model_name=None):
print('[INFO] Hook activated: safe_load_model()')
if model_name is None:
model_name = model_path
return safe_load_pretrained_model(model_path, model_base, model_name)
run_libra.load_model = safe_load_model
def get_image_tensors_batch_cpu(images, image_processor, model, device='cpu'):
"""
CPU-only version of get_image_tensors_batch.
Keeps the same structure and behaviour as the original function.
"""
import torch
from libra.mm_utils import process_images
batch_size = len(images)
all_processed = []
if isinstance(images, str):
images = [images]
elif not isinstance(images, (list, tuple)):
raise TypeError("images must be a string or a list/tuple of strings")
for i in range(batch_size):
images_i = [images[i]]
# Ensure two images are present
if len(images_i) != 2:
images_i.append(images_i[0])
if hasattr(model, "config") and getattr(model.config, "mm_projector_type", None) == "TAC":
print("Contains only current image. Adding a dummy prior image for TAC.")
processed_images = []
for img_data in images_i:
image_tensor = process_images([img_data], image_processor, model.config)[0]
# βœ… Force to CPU instead of CUDA
image_tensor = image_tensor.to(device=device, non_blocking=False)
processed_images.append(image_tensor)
cur_images = processed_images[0]
prior_images = processed_images[1]
batch_images = torch.stack([cur_images, prior_images])
all_processed.append(batch_images)
# ⚠️ Keep the original structure (dim=1)
batch_images = torch.stack(all_processed, dim=1)
return batch_images
# IMPORTANT: Patch libra functions BEFORE importing CCD
# (because ccd imports these functions during module load)
import libra.eval.run_libra as run_libra_module
# Replace the function in the module BEFORE ccd imports it
run_libra_module.get_image_tensors_batch = get_image_tensors_batch_cpu
# print('[INFO] Replaced get_image_tensors_batch with CPU-only version')
# Now import CCD and hook ccd_utils to force CPU for expert models
import ccd.ccd_utils as ccd_utils_module
ccd_utils_module._DEVICE = torch.device('cpu')
print('[INFO] Forced ccd_utils._DEVICE to CPU')
# Now import CCD module and patch its imported function
import ccd.run_ccd as ccd_run_ccd_module
# Replace the function that ccd.run_ccd imported
ccd_run_ccd_module.get_image_tensors_batch = get_image_tensors_batch_cpu
# print('[INFO] Patched ccd.run_ccd.get_image_tensors_batch')
# Now import the evaluation functions
from ccd import ccd_eval as _original_ccd_eval, run_eval
from libra.eval.run_libra import load_model
# Wrap ccd_eval to ensure all tensors stay on CPU
def ccd_eval_cpu_wrapper(*args, **kwargs):
"""Wrapper to ensure ccd_eval runs on CPU only"""
import warnings
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
result = _original_ccd_eval(*args, **kwargs)
return result
# Replace with wrapped version
ccd_eval = ccd_eval_cpu_wrapper
# =========================================
# Global Configuration
# =========================================
MODEL_CATALOGUE = {
"MAIRA-2": "X-iZhang/libra-maira-2",
"Libra-v1.0-3B (⚑Recommended for CPU)": "X-iZhang/libra-v1.0-3b",
"Libra-v1.0-7B": "X-iZhang/libra-v1.0-7b",
"LLaVA-Med-v1.5": "X-iZhang/libra-llava-med-v1.5-mistral-7b",
"LLaVA-Rad": "X-iZhang/libra-llava-rad",
"Med-CXRGen-F": "X-iZhang/Med-CXRGen-F",
"Med-CXRGen-I": "X-iZhang/Med-CXRGen-I"
}
DEFAULT_MODEL_NAME = "Libra-v1.0-3B (⚑Recommended for CPU)"
_loaded_models = {}
# =========================================
# Environment Setup
# =========================================
def setup_environment():
print("πŸ”Ή Running in CPU-only mode (forced for Hugging Face Spaces)")
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
os.environ['TRANSFORMERS_CACHE'] = './cache'
# Set number of threads for CPU inference
num_threads = min(os.cpu_count() or 4, 8)
torch.set_num_threads(num_threads)
print(f"πŸ”Ή Using {num_threads} CPU threads")
# =========================================
# Model Loader
# =========================================
def load_or_get_model(model_name: str):
"""Load the model based on its display name."""
model_path = MODEL_CATALOGUE[model_name]
print(f"πŸ”Ή Model path resolved: {model_path}")
if model_path in _loaded_models:
print(f"πŸ”Ή Model already loaded: {model_name}")
return _loaded_models[model_path]
print(f"πŸ”Ή Loading model: {model_name} ({model_path}) ...")
print(f"πŸ”Ή This may take 2-5 minutes on CPU, please wait...")
try:
# Clear cache before loading to maximize available memory
import gc
gc.collect()
if hasattr(torch.cuda, 'empty_cache'):
torch.cuda.empty_cache()
with torch.no_grad():
model = load_model(model_path)
_loaded_models[model_path] = model
print(f"βœ… Loaded successfully: {model_name}")
# Clean up after loading
gc.collect()
return model
except Exception as e:
print(f"❌ Error loading model {model_name}: {e}")
import traceback
traceback.print_exc()
raise
# =========================================
# CCD Logic
# =========================================
def generate_ccd_description(
selected_model_name,
current_img,
prompt,
expert_model,
alpha,
beta,
gamma,
use_run_eval,
max_new_tokens,
progress=gr.Progress()
):
"""Generate findings using CCD evaluation."""
if not current_img:
return "⚠️ Please upload or select an example image first."
try:
progress(0, desc="Starting inference...")
print(f"πŸ”Ή Generating description with model: {selected_model_name}")
print(f"πŸ”Ή Parameters: alpha={alpha}, beta={beta}, gamma={gamma}")
print(f"πŸ”Ή Image path: {current_img}")
progress(0.1, desc="Loading model (this may take several minutes on CPU)...")
model = load_or_get_model(selected_model_name)
progress(0.3, desc="Running CCD inference (this may take 5-10 minutes on CPU)...")
print(f"πŸ”Ή Running CCD with {selected_model_name} and expert model {expert_model}...")
# Debug: Print tokenizer info
tokenizer = model[0]
print(f"[DEBUG] Tokenizer pad_token: {tokenizer.pad_token}")
print(f"[DEBUG] Tokenizer pad_token_id: {tokenizer.pad_token_id}")
print(f"[DEBUG] Tokenizer eos_token: {tokenizer.eos_token}")
print(f"[DEBUG] Tokenizer eos_token_id: {tokenizer.eos_token_id}")
print(f"[DEBUG] Tokenizer padding_side: {getattr(tokenizer, 'padding_side', 'NOT SET')}")
print(f"[DEBUG] Input image path: {current_img}")
print(f"[DEBUG] max_new_tokens: {max_new_tokens}")
prompt = "You are a helpful AI Assistant. " + prompt
print(f"[DEBUG] Input prompt: {prompt}")
ccd_output = ccd_eval(
libra_model=model,
image=current_img,
question=prompt,
max_new_tokens=max_new_tokens,
expert_model=expert_model,
alpha=alpha,
beta=beta,
gamma=gamma
)
print(f"[DEBUG] CCD output type: {type(ccd_output)}")
print(f"[DEBUG] CCD output length: {len(ccd_output) if ccd_output else 0}")
print(f"[DEBUG] CCD output content: '{ccd_output}'")
progress(0.8, desc="Processing results...")
if use_run_eval:
progress(0.85, desc="Running baseline comparison...")
baseline_output = run_eval(
libra_model=model,
image=current_img,
question=prompt,
max_new_tokens=max_new_tokens,
num_beams=1
)
progress(1.0, desc="Complete!")
return (
f"### 🩺 CCD Result ({expert_model})\n{ccd_output}\n\n"
f"---\n### βš–οΈ Baseline (run_eval)\n{baseline_output[0]}"
)
progress(1.0, desc="Complete!")
return f"### 🩺 CCD Result ({expert_model})\n{ccd_output}"
except Exception:
import traceback, sys
error_msg = traceback.format_exc()
print("========== CCD ERROR LOG ==========", file=sys.stderr)
print(error_msg, file=sys.stderr)
print("===================================", file=sys.stderr)
return f"❌ Exception Trace:\n```\n{error_msg}\n```"
def safe_generate_ccd_description(
selected_model_name,
current_img,
prompt,
expert_model,
alpha,
beta,
gamma,
use_run_eval,
max_new_tokens
):
"""Wrapper around generate_ccd_description that logs inputs and prints full traceback on error."""
import traceback, sys, time
print("\n=== Gradio callback invoked ===")
print(f"timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"selected_model_name={selected_model_name}")
print(f"current_img={current_img}")
print(f"prompt={prompt}")
print(f"expert_model={expert_model}, alpha={alpha}, beta={beta}, gamma={gamma}, use_run_eval={use_run_eval}, max_new_tokens={max_new_tokens}")
try:
return generate_ccd_description(
selected_model_name,
current_img,
prompt,
expert_model,
alpha,
beta,
gamma,
use_run_eval,
max_new_tokens
)
except Exception as e:
err = traceback.format_exc()
print("========== GRADIO CALLBACK ERROR ==========", file=sys.stderr)
print(err, file=sys.stderr)
print("==========================================", file=sys.stderr)
# Also write the error and inputs to a persistent log file for easier inspection
try:
with open('/workspace/CCD/callback.log', 'a', encoding='utf-8') as f:
f.write('\n=== CALLBACK LOG ENTRY ===\n')
f.write(f"timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"selected_model_name={selected_model_name}\n")
f.write(f"current_img={current_img}\n")
f.write(f"prompt={prompt}\n")
f.write(f"expert_model={expert_model}, alpha={alpha}, beta={beta}, gamma={gamma}, use_run_eval={use_run_eval}, max_new_tokens={max_new_tokens}\n")
f.write('TRACEBACK:\n')
f.write(err + '\n')
f.write('=== END ENTRY ===\n')
except Exception as fe:
print(f"Failed to write callback.log: {fe}", file=sys.stderr)
# Also return a user-friendly error message to the UI with traceback
return f"❌ An internal error occurred. See server logs for details.\n\nTraceback:\n```\n{err}\n```"
# =========================================
# Main Application
# =========================================
def main():
setup_environment()
# Example Image Path
cur_dir = os.path.abspath(os.path.dirname(__file__))
example_path = os.path.abspath(os.path.join(cur_dir, "example.jpg"))
example_exists = os.path.exists(example_path)
# Model reference table
model_table = """
| **Model Name** | **HuggingFace Link** |
|----------------|----------------------|
| **Libra-v1.0-7B** | [X-iZhang/libra-v1.0-7b](https://huggingface.co/X-iZhang/libra-v1.0-7b) |
| **Libra-v1.0-3B** | [X-iZhang/libra-v1.0-3b](https://huggingface.co/X-iZhang/libra-v1.0-3b) |
| **MAIRA-2** | [X-iZhang/libra-maira-2](https://huggingface.co/X-iZhang/libra-maira-2) |
| **LLaVA-Med-v1.5** | [X-iZhang/libra-llava-med-v1.5-mistral-7b](https://huggingface.co/X-iZhang/libra-llava-med-v1.5-mistral-7b) |
| **LLaVA-Rad** | [X-iZhang/libra-llava-rad](https://huggingface.co/X-iZhang/libra-llava-rad) |
| **Med-CXRGen-F** | [X-iZhang/Med-CXRGen-F](https://huggingface.co/X-iZhang/Med-CXRGen-F) |
| **Med-CXRGen-I** | [X-iZhang/Med-CXRGen-I](https://huggingface.co/X-iZhang/Med-CXRGen-I) |
"""
with gr.Blocks(title="πŸ“· Clinical Contrastive Decoding", theme="soft") as demo:
gr.Markdown("""
# πŸ“· CCD: Mitigating Hallucinations in Radiology MLLMs via Clinical Contrastive Decoding
### [Project Page](https://x-izhang.github.io/CCD/) | [Paper](https://arxiv.org/abs/2509.23379) | [Code](https://github.com/X-iZhang/CCD) | [Models](https://huggingface.co/collections/X-iZhang/libra-6772bfccc6079298a0fa5f8d)
**🚨 Performance Warning**
This demo is running on **CPU-only** mode. A single inference may take **25-30 minutes** depending on the model and parameters.
**Recommendations for faster inference:**
- Use smaller models (Libra-v1.0-3B is faster than 7B models) **The model has already been loaded** ⏬
- Please do not attempt to load other models, as this may cause a **runtime error**: "Workload evicted, storage limit exceeded (50G)"
- Reduce `Max New Tokens` to 64-128 (default: 128)
- Disable baseline comparison
- For GPU acceleration, please [run the demo locally](https://github.com/X-iZhang/CCD#gradio-web-interface)
**Note:** If you see "Connection Lost", please wait - the inference is still running. The results will appear when complete.
""")
with gr.Tab("✨ CCD Demo"):
with gr.Row():
# -------- Left Column: Image --------
with gr.Column(scale=1):
gr.Markdown("### Radiology Image (eg. Chest X-ray)")
current_img = gr.Image(label="Radiology Image", type="filepath", interactive=True)
if example_exists:
gr.Examples(
examples=[[example_path]],
inputs=[current_img],
label="Example Image"
)
else:
gr.Markdown(f"⚠️ Example image not found at `{example_path}`")
# -------- Right Column: Controls --------
with gr.Column(scale=1):
gr.Markdown("### Model Selection & Prompt")
selected_model_name = gr.Dropdown(
label="Base Radiology MLLM",
choices=list(MODEL_CATALOGUE.keys()),
value=DEFAULT_MODEL_NAME
)
prompt = gr.Textbox(
label="Question / Prompt",
value="What are the findings in this chest X-ray? Give a detailed description.",
lines=1
)
gr.Markdown("### CCD Parameters")
expert_model = gr.Radio(
label="Expert Model",
choices=["MedSigLip", "DenseNet"],
value="DenseNet"
)
# Notice for MedSigLip access requirements (hidden by default)
medsiglip_message = (
"**Note: The MedSigLip model requires authorization to access.**\n\n"
"To use MedSigLip, please deploy the Gradio Web Interface locally and complete the authentication steps.\n"
"See deployment instructions and how to run locally here: "
"[Gradio Web Interface](https://github.com/X-iZhang/CCD#gradio-web-interface)"
)
medsiglip_notice = gr.Markdown(value="", visible=False)
def _toggle_medsiglip_notice(choice):
if choice == "MedSigLip":
return gr.update(visible=True, value=medsiglip_message)
else:
return gr.update(visible=False, value="")
# Connect radio change to the notice visibility
expert_model.change(fn=_toggle_medsiglip_notice, inputs=[expert_model], outputs=[medsiglip_notice])
with gr.Row():
alpha = gr.Slider(0.0, 1.0, value=0.5, step=0.1, label="Alpha")
beta = gr.Slider(0.0, 1.0, value=0.5, step=0.1, label="Beta")
gamma = gr.Slider(0, 20, value=10, step=1, label="Gamma")
with gr.Accordion("Advanced Options", open=False):
max_new_tokens = gr.Slider(10, 256, value=64, step=1, label="Max New Tokens (lower = faster)")
use_run_eval = gr.Checkbox(label="Compare with baseline (run_eval) [doubles inference time]", value=False)
generate_btn = gr.Button("πŸš€ Generate", variant="primary")
# -------- Output --------
# output = gr.Markdown(label="Output", value="### πŸ“· Results will appear here.πŸ‘‡")
output = gr.Markdown(
value='<h3 style="color:#007BFF;">πŸ“· Results will appear here.πŸ‘‡</h3>',
label="Output"
)
# Switch callback to the safe wrapper
generate_btn.click(
fn=safe_generate_ccd_description,
inputs=[
selected_model_name, current_img, prompt,
expert_model, alpha, beta, gamma,
use_run_eval, max_new_tokens
],
outputs=output
)
# -------- Model Table --------
# gr.Markdown("### 🧠 Supported Models")
# gr.Markdown(model_table)
gr.Markdown("""
### Terms of Use
The service is a research preview intended for non-commercial use only, subject to the model [License](https://github.com/facebookresearch/llama/blob/main/MODEL_CARD.md) of LLaMA.
By accessing or using this demo, you acknowledge and agree to the following:
- **Research & Non-Commercial Purposes**: This demo is provided solely for research and demonstration. It must not be used for commercial activities or profit-driven endeavors.
- **Not Medical Advice**: All generated content is experimental and must not replace professional medical judgment.
- **Content Moderationt**: While we apply basic safety checks, the system may still produce inaccurate or offensive outputs.
- **Responsible Use**: Do not use this demo for any illegal, harmful, hateful, violent, or sexual purposes.
By continuing to use this service, you confirm your acceptance of these terms. If you do not agree, please discontinue use immediately.
""")
# Log that Gradio is starting (helpful when stdout/stderr are captured)
# write startup log to local file in repository (avoid permission issues on Spaces)
try:
os.makedirs('logs', exist_ok=True)
with open('logs/callback.log', 'a', encoding='utf-8') as f:
f.write(f"\n=== GRADIO START ===\nstarted_at: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
except Exception:
pass
# Launch with extended timeout for CPU inference
demo.queue(max_size=10) # Enable queue for better handling of long-running tasks
demo.launch(
max_threads=5, # Limit concurrent requests
show_error=True # Show detailed errors
)
if __name__ == "__main__":
main()