#!/usr/bin/env python3
# Copyright (c) 2025 Delanoe Pirard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
GPU Preprocessing Benchmark

Compares CPU vs GPU preprocessing performance across different image sizes.

Measures, per image size:
- CPU preprocessing time (InputProcessor)
- GPU preprocessing time (GPUInputProcessor)
- Hybrid preprocessing time (CPU resize -> GPU normalize)
- GPU decode time (decoding directly from file paths)
- GPU memory usage (CUDA only)
"""
import os
import shutil
import time
from typing import List, Tuple

import numpy as np
import torch
from PIL import Image

from depth_anything_3.utils.io.input_processor import InputProcessor
from depth_anything_3.utils.io.gpu_input_processor import GPUInputProcessor
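# Usage (no CLI arguments; the path assumes the repo's benchmarks/ layout):
#   python benchmarks/gpu_preprocessing_benchmark.py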
def create_test_files(sizes: List[Tuple[int, int, str]], count: int = 4, temp_dir: str = "temp_bench_imgs") -> Tuple[List[List[str]], str]:
    """Create test image files on disk.

    Args:
        sizes: List of (width, height, description) tuples
        count: Number of images per size
        temp_dir: Directory to save images

    Returns:
        Tuple of (image path batches, one list per size; path to temp directory)
    """
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
os.makedirs(temp_dir)
batches = []
for w, h, _ in sizes:
batch = []
for i in range(count):
img = Image.new("RGB", (w, h), color=(i * 50, 100, 150))
fname = f"{temp_dir}/{w}x{h}_{i}.jpg"
img.save(fname, quality=95, subsampling=0)
batch.append(fname)
batches.append(batch)
return batches, temp_dir
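# Illustrative call (hypothetical size), showing the return shape:
#   create_test_files([(640, 480, "VGA")], count=2)
#   -> ([["temp_bench_imgs/640x480_0.jpg", "temp_bench_imgs/640x480_1.jpg"]],
#       "temp_bench_imgs")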
def benchmark_gpu_decode_files(
processor,
image_paths: List[str],
process_res: int = 504,
warmup_runs: int = 2,
benchmark_runs: int = 10,
num_workers: int = 8,
) -> float:
    """Benchmark GPU decoding directly from file paths.

    Returns:
        Average preprocessing time in seconds.
    """
# Warmup
for _ in range(warmup_runs):
processor(
image=image_paths,
process_res=process_res,
process_res_method="upper_bound_resize",
num_workers=num_workers,
)
# Benchmark
times = []
for _ in range(benchmark_runs):
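        # CUDA kernels launch asynchronously, so synchronize before starting the
        # timer and again before stopping it; otherwise perf_counter would only
        # measure kernel launch overhead, not the device-side work.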
if hasattr(processor, 'device') and processor.device.type == "cuda":
torch.cuda.synchronize()
start = time.perf_counter()
# Pass file paths directly to GPUInputProcessor
tensor, _, _ = processor(
image=image_paths,
process_res=process_res,
process_res_method="upper_bound_resize",
num_workers=num_workers,
)
if hasattr(processor, 'device') and processor.device.type == "cuda":
torch.cuda.synchronize()
elapsed = time.perf_counter() - start
times.append(elapsed)
return np.mean(times)
def create_test_images(sizes: List[Tuple[int, int]], count: int = 4) -> List[List[Image.Image]]:
"""Create test images for each size.
Args:
sizes: List of (width, height) tuples
count: Number of images per size
Returns:
List of image batches, one per size
"""
batches = []
for w, h in sizes:
batch = [Image.new("RGB", (w, h), color=(i * 50, 100, 150)) for i in range(count)]
batches.append(batch)
return batches
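# Illustrative call (hypothetical size), showing the return shape:
#   create_test_images([(640, 480)], count=2) -> [[<PIL Image>, <PIL Image>]]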
def benchmark_hybrid(
processor,
images: List[Image.Image],
process_res: int = 504,
warmup_runs: int = 2,
benchmark_runs: int = 10,
num_workers: int = 8,
    device: torch.device = torch.device("cuda"),
) -> float:
    """Benchmark hybrid preprocessing (CPU resize -> GPU normalize).

    Returns:
        Average preprocessing time in seconds.
    """
# Warmup
for _ in range(warmup_runs):
imgs_cpu, _, _ = processor(
image=images,
process_res=process_res,
process_res_method="upper_bound_resize",
num_workers=num_workers,
perform_normalization=False
)
imgs_gpu = imgs_cpu.to(device, non_blocking=True).float() / 255.0
_ = InputProcessor.normalize_tensor(imgs_gpu, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# Benchmark
times = []
for _ in range(benchmark_runs):
if device.type == "cuda":
torch.cuda.synchronize()
start = time.perf_counter()
# 1. CPU Preprocessing (uint8)
imgs_cpu, _, _ = processor(
image=images,
process_res=process_res,
process_res_method="upper_bound_resize",
num_workers=num_workers,
perform_normalization=False
)
# 2. Transfer + Normalize
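        # non_blocking=True only overlaps the host->device copy with compute
        # when the source tensor lives in pinned (page-locked) memory;
        # otherwise the copy degrades to a synchronous transfer.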
imgs_gpu = imgs_cpu.to(device, non_blocking=True).float() / 255.0
_ = InputProcessor.normalize_tensor(imgs_gpu, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
if device.type == "cuda":
torch.cuda.synchronize()
elapsed = time.perf_counter() - start
times.append(elapsed)
return np.mean(times)
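# A minimal sketch of making the hybrid transfer truly asynchronous (assumes
# the processor returns a CPU uint8 tensor, as with perform_normalization=False
# above):
#   imgs_cpu = imgs_cpu.pin_memory()                   # page-locked staging copy
#   imgs_gpu = imgs_cpu.to(device, non_blocking=True)  # async H2D transfer
#   imgs_gpu = imgs_gpu.float().div_(255.0)            # scale on device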
def benchmark_preprocessing(
processor,
images: List[Image.Image],
process_res: int = 504,
warmup_runs: int = 2,
benchmark_runs: int = 10,
num_workers: int = 8,
) -> float:
"""Benchmark preprocessing performance.
Args:
processor: InputProcessor or GPUInputProcessor instance
images: List of test images
process_res: Processing resolution
warmup_runs: Number of warmup runs to discard
benchmark_runs: Number of benchmark runs to average
num_workers: Number of parallel workers (for CPU processor)
Returns:
Average preprocessing time in seconds
"""
# Warmup
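    # (discarded runs absorb one-time costs: CUDA context creation, kernel
    # autotuning, and allocator growth, which would otherwise skew the mean)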
for _ in range(warmup_runs):
processor(
image=images,
process_res=process_res,
process_res_method="upper_bound_resize",
num_workers=num_workers,
)
# Benchmark
times = []
for _ in range(benchmark_runs):
if hasattr(processor, 'device') and processor.device.type == "cuda":
torch.cuda.synchronize()
start = time.perf_counter()
tensor, _, _ = processor(
image=images,
process_res=process_res,
process_res_method="upper_bound_resize",
num_workers=num_workers,
)
if hasattr(processor, 'device') and processor.device.type == "cuda":
torch.cuda.synchronize()
elapsed = time.perf_counter() - start
times.append(elapsed)
return np.mean(times)
def print_results_table(results: List[dict]):
    """Pretty-print benchmark results as a table."""
print("\n" + "=" * 140)
print("GPU PREPROCESSING BENCHMARK RESULTS")
print("=" * 140)
print(f"{'Image Size':<15} {'CPU Time':<12} {'GPU Time':<12} {'Hybrid Time':<12} {'GPU Decode':<12} {'Best Method':<15}")
print("-" * 140)
for result in results:
size_str = f"{result['width']}x{result['height']}"
cpu_time = f"{result['cpu_time']*1000:.2f} ms"
gpu_time = f"{result['gpu_time']*1000:.2f} ms"
hybrid_time = f"{result['hybrid_time']*1000:.2f} ms"
gpu_decode_time = f"{result['gpu_decode_time']*1000:.2f} ms"
times = [result['cpu_time'], result['gpu_time'], result['hybrid_time'], result['gpu_decode_time']]
labels = ["CPU", "GPU", "Hybrid", "GPU Decode"]
best_idx = np.argmin(times)
best = labels[best_idx]
print(f"{size_str:<15} {cpu_time:<12} {gpu_time:<12} {hybrid_time:<12} {gpu_decode_time:<12} {best:<15}")
print("=" * 140 + "\n")
def main():
"""Run comprehensive benchmark."""
print("\n" + "=" * 100)
print("INITIALIZING GPU PREPROCESSING BENCHMARK")
print("=" * 100)
# Check GPU availability
if torch.cuda.is_available():
device_name = "cuda"
device_info = torch.cuda.get_device_name(0)
        print(f"✓ GPU Device: {device_info}")
        print("✓ GPU preprocessing: ENABLED (NVJPEG + Kornia)")
elif torch.backends.mps.is_available():
device_name = "mps"
device_info = "Apple MPS"
        print(f"✓ GPU Device: {device_info}")
        print("ℹ GPU preprocessing: DISABLED on MPS (CPU is faster on Apple Silicon)")
        print("  → GPUInputProcessor will use CPU path automatically")
        print("  → GPU reserved for model inference (5-10x speedup there)")
else:
        print("✗ No GPU available - benchmark will show CPU vs CPU (no speedup expected)")
device_name = "cpu"
device_info = "CPU only"
device = torch.device(device_name)
# Create processors
cpu_proc = InputProcessor()
gpu_proc = GPUInputProcessor(device=device_name)
    print(f"✓ Processors initialized: CPU vs {device_name.upper()}")
# Test configurations
# Format: (width, height, description)
test_sizes = [
(640, 480, "Small (VGA)"),
(1280, 720, "Medium (HD)"),
(1920, 1080, "Large (Full HD)"),
(3840, 2160, "XLarge (4K)"),
]
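    # Sizes span VGA -> 4K so the table shows where GPU-side preprocessing
    # starts to pay off; it typically wins at the larger resolutions, where
    # per-pixel work dominates launch and transfer overhead.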
process_res = 504
num_images = 4
num_workers = 8
    print(f"✓ Test config: {num_images} images per batch, process_res={process_res}, num_workers={num_workers}")
    print(f"✓ Testing {len(test_sizes)} image sizes: {', '.join([desc for _, _, desc in test_sizes])}")
# Create test images
print("\nGenerating test images (PIL & Files)...")
image_batches_pil = create_test_images([(w, h) for w, h, _ in test_sizes], count=num_images)
image_batches_files, temp_dir = create_test_files(test_sizes, count=num_images)
    print("✓ Test images generated")
# Run benchmarks
print("\nRunning benchmarks (this may take a minute)...\n")
results = []
try:
for (w, h, desc), imgs_pil, imgs_files in zip(test_sizes, image_batches_pil, image_batches_files):
print(f"Benchmarking {desc} ({w}x{h})...", end=" ", flush=True)
cpu_time = benchmark_preprocessing(cpu_proc, imgs_pil, process_res, num_workers=num_workers)
gpu_time = benchmark_preprocessing(gpu_proc, imgs_pil, process_res, num_workers=num_workers)
hybrid_time = benchmark_hybrid(cpu_proc, imgs_pil, process_res, num_workers=num_workers, device=device)
# GPU Decode uses file paths
gpu_decode_time = benchmark_gpu_decode_files(gpu_proc, imgs_files, process_res, num_workers=num_workers)
results.append({
'width': w,
'height': h,
'description': desc,
'cpu_time': cpu_time,
'gpu_time': gpu_time,
'hybrid_time': hybrid_time,
'gpu_decode_time': gpu_decode_time
})
best_time = min(cpu_time, gpu_time, hybrid_time, gpu_decode_time)
if best_time == gpu_decode_time:
win = "GPU Decode"
elif best_time == hybrid_time:
win = "Hybrid"
elif best_time == gpu_time:
win = "GPU"
else:
win = "CPU"
            print(f"✓ Best: {win}")
# Print results table
print_results_table(results)
# Memory info (CUDA only)
if device_name == "cuda":
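            # memory_allocated = bytes held by live tensors; memory_reserved =
            # total held by PyTorch's caching allocator (includes blocks that
            # are freed but not yet returned to the driver).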
print("\nGPU Memory Usage:")
print(f" Allocated: {torch.cuda.memory_allocated(0) / 1024**2:.1f} MB")
print(f" Cached: {torch.cuda.memory_reserved(0) / 1024**2:.1f} MB")
finally:
# Cleanup
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
            print(f"\n✓ Cleaned up temp directory: {temp_dir}")
if __name__ == "__main__":
main()