Daankular's picture
download
raw
11 kB
import os, shutil, sys, time
# Global variables to track download progress
_start_time = None
_last_time = None
_last_downloaded = 0
_speed_history = []
_update_interval = 0.5 # Update speed every 0.5 seconds
def progress_hook(block_num, block_size, total_size, filename=None):
"""
Simple progress bar hook for urlretrieve
Args:
block_num: Number of blocks downloaded so far
block_size: Size of each block in bytes
total_size: Total size of the file in bytes
filename: Name of the file being downloaded (optional)
"""
global _start_time, _last_time, _last_downloaded, _speed_history, _update_interval
current_time = time.time()
downloaded = block_num * block_size
# Initialize timing on first call
if _start_time is None or block_num == 0:
_start_time = current_time
_last_time = current_time
_last_downloaded = 0
_speed_history = []
# Calculate download speed only at specified intervals
speed = 0
if current_time - _last_time >= _update_interval:
if _last_time > 0:
current_speed = (downloaded - _last_downloaded) / (current_time - _last_time)
_speed_history.append(current_speed)
# Keep only last 5 speed measurements for smoothing
if len(_speed_history) > 5:
_speed_history.pop(0)
# Average the recent speeds for smoother display
speed = sum(_speed_history) / len(_speed_history)
_last_time = current_time
_last_downloaded = downloaded
elif _speed_history:
# Use the last calculated average speed
speed = sum(_speed_history) / len(_speed_history)
# Format file sizes and speed
def format_bytes(bytes_val):
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes_val < 1024:
return f"{bytes_val:.1f}{unit}"
bytes_val /= 1024
return f"{bytes_val:.1f}TB"
file_display = filename if filename else "Unknown file"
if total_size <= 0:
# If total size is unknown, show downloaded bytes
speed_str = f" @ {format_bytes(speed)}/s" if speed > 0 else ""
line = f"\r{file_display}: {format_bytes(downloaded)}{speed_str}"
# Clear any trailing characters by padding with spaces
sys.stdout.write(line.ljust(80))
sys.stdout.flush()
return
downloaded = block_num * block_size
percent = min(100, (downloaded / total_size) * 100)
# Create progress bar (40 characters wide to leave room for other info)
bar_length = 40
filled = int(bar_length * percent / 100)
bar = '█' * filled + '░' * (bar_length - filled)
# Format file sizes and speed
def format_bytes(bytes_val):
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes_val < 1024:
return f"{bytes_val:.1f}{unit}"
bytes_val /= 1024
return f"{bytes_val:.1f}TB"
speed_str = f" @ {format_bytes(speed)}/s" if speed > 0 else ""
# Display progress with filename first
line = f"\r{file_display}: [{bar}] {percent:.1f}% ({format_bytes(downloaded)}/{format_bytes(total_size)}){speed_str}"
# Clear any trailing characters by padding with spaces
sys.stdout.write(line.ljust(100))
sys.stdout.flush()
# Print newline when complete
if percent >= 100:
print()
# Wrapper function to include filename in progress hook
def create_progress_hook(filename):
"""Creates a progress hook with the filename included"""
global _start_time, _last_time, _last_downloaded, _speed_history
# Reset timing variables for new download
_start_time = None
_last_time = None
_last_downloaded = 0
_speed_history = []
def hook(block_num, block_size, total_size):
return progress_hook(block_num, block_size, total_size, filename)
return hook
def process_files_def(repoId=None, sourceFolderList=None, fileList=None, targetFolderList=None):
from huggingface_hub import hf_hub_download, snapshot_download
from shared.utils import files_locator as fl
if targetFolderList is None:
targetFolderList = [None] * len(sourceFolderList)
for targetFolder, sourceFolder, files in zip(targetFolderList, sourceFolderList, fileList):
if targetFolder is not None and len(targetFolder) == 0:
targetFolder = None
explicit_target = targetFolder if targetFolder is not None else (sourceFolder if len(sourceFolder) > 0 else None)
targetRoot = fl.get_smart_download_root(explicit_target)
local_dir = os.path.join(targetRoot, targetFolder) if targetFolder is not None else targetRoot
if len(files) == 0:
if fl.locate_folder(sourceFolder if targetFolder is None else os.path.join(targetFolder, sourceFolder), error_if_none=False) is None:
snapshot_download(repo_id=repoId, allow_patterns=sourceFolder + "/*", local_dir=local_dir)
else:
for onefile in files:
if len(sourceFolder) > 0:
if fl.locate_file((sourceFolder + "/" + onefile) if targetFolder is None else os.path.join(targetFolder, sourceFolder, onefile), error_if_none=False) is None:
hf_hub_download(repo_id=repoId, filename=onefile, local_dir=local_dir, subfolder=sourceFolder)
else:
if fl.locate_file(onefile if targetFolder is None else os.path.join(targetFolder, onefile), error_if_none=False) is None:
hf_hub_download(repo_id=repoId, filename=onefile, local_dir=local_dir)
def _download_relpath(source_folder, filename, target_folder=None):
source_folder = "" if source_folder is None else source_folder
if target_folder is not None and len(target_folder) == 0:
target_folder = None
if target_folder is None:
return os.path.join(source_folder, filename) if len(source_folder) > 0 else filename
return os.path.join(target_folder, source_folder, filename) if len(source_folder) > 0 else os.path.join(target_folder, filename)
def download_def_missing_files(download_def):
from shared.utils import files_locator as fl
if download_def is None:
return []
if isinstance(download_def, list):
missing = []
for one_def in download_def:
missing.extend(download_def_missing_files(one_def))
return missing
source_folders = download_def.get("sourceFolderList", [])
file_lists = download_def.get("fileList", [])
target_folders = download_def.get("targetFolderList")
if target_folders is None:
target_folders = [None] * len(source_folders)
missing = []
for source_folder, files, target_folder in zip(source_folders, file_lists, target_folders):
if len(files) == 0:
rel_folder = _download_relpath(source_folder, "", target_folder).rstrip("\\/")
if fl.locate_folder(rel_folder, error_if_none=False) is None:
missing.append(rel_folder)
continue
for filename in files:
rel_path = _download_relpath(source_folder, filename, target_folder)
if fl.locate_file(rel_path, error_if_none=False) is None:
missing.append(rel_path)
return missing
def send_download_status(send_cmd=None, status_text=None):
if send_cmd is not None and status_text:
send_cmd("status", status_text)
def process_files_def_if_needed(download_def, send_cmd=None, status_text=None):
if download_def is None or len(download_def_missing_files(download_def)) == 0:
return False
send_download_status(send_cmd, status_text)
if isinstance(download_def, list):
for one_def in download_def:
process_files_def(**one_def)
else:
process_files_def(**download_def)
return True
def query_audio_background_replacement_download_def():
return {
"repoId": "DeepBeepMeep/Wan2.1",
"sourceFolderList": ["roformer"],
"fileList": [["model_bs_roformer_ep_317_sdr_12.9755.ckpt", "model_bs_roformer_ep_317_sdr_12.9755.yaml", "download_checks.json"]],
}
def download_audio_background_replacement(send_cmd=None, status_text="Downloading audio background replacement model files..."):
return process_files_def_if_needed(query_audio_background_replacement_download_def(), send_cmd=send_cmd, status_text=status_text)
def query_speaker_separator_download_def():
return [
{
"repoId": "DeepBeepMeep/Wan2.1",
"sourceFolderList": ["pyannote"],
"fileList": [["pyannote_model_wespeaker-voxceleb-resnet34-LM.bin", "pytorch_model_segmentation-3.0.bin"]],
},
{
"repoId": "DeepBeepMeep/LTX-2",
"sourceFolderList": ["sherpa"],
"fileList": [["sherpa-onnx-pyannote-segmentation-3-0/model.onnx", "3dspeaker_speech_eres2net_base_sv_zh-cn_3dspeaker_16k.onnx"]],
},
]
def download_speaker_separator(send_cmd=None, status_text="Downloading speaker separator model files..."):
return process_files_def_if_needed(query_speaker_separator_download_def(), send_cmd=send_cmd, status_text=status_text)
def process_download_defs(download_defs):
if isinstance(download_defs, dict):
process_files_def(**download_defs)
return
for download_def in download_defs or []:
if download_def is not None:
process_files_def(**download_def)
def download_file(url, filename):
from huggingface_hub import hf_hub_download
from shared.utils import files_locator as fl
url = url.split("|")[0]
if url.startswith("https://huggingface.co/") and "/resolve/main/" in url:
base_dir = os.path.dirname(filename)
url = url[len("https://huggingface.co/"):]
url_parts = url.split("/resolve/main/")
repoId = url_parts[0]
onefile = os.path.basename(url_parts[-1])
sourceFolder = os.path.dirname(url_parts[-1])
if len(sourceFolder) == 0:
hf_hub_download(repo_id=repoId, filename=onefile, local_dir=fl.get_download_location() if len(base_dir) == 0 else base_dir)
else:
tgt = fl.get_download_location() if len(base_dir) == 0 else base_dir
if not os.path.exists(tgt):
os.makedirs(tgt)
temp_dir_path = os.path.join(tgt, f"_temp{time.time()}")
temp_full_path = os.path.join(temp_dir_path, sourceFolder)
if not os.path.exists(temp_full_path):
os.makedirs(temp_full_path)
hf_hub_download(repo_id=repoId, filename=onefile, local_dir=temp_dir_path, subfolder=sourceFolder)
shutil.move(os.path.join(temp_full_path, onefile), tgt)
shutil.rmtree(temp_dir_path)
else:
from urllib.request import urlretrieve
urlretrieve(url, filename, create_progress_hook(filename))

Xet Storage Details

Size:
11 kB
·
Xet hash:
2d74e4b1d2f05666945fdbf68fad2e6da82e37f2668c7efb29c9c5fbb24f751f

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.