| import os, shutil, sys, time | |
| # Global variables to track download progress | |
| _start_time = None | |
| _last_time = None | |
| _last_downloaded = 0 | |
| _speed_history = [] | |
| _update_interval = 0.5 # Update speed every 0.5 seconds | |
| def progress_hook(block_num, block_size, total_size, filename=None): | |
| """ | |
| Simple progress bar hook for urlretrieve | |
| Args: | |
| block_num: Number of blocks downloaded so far | |
| block_size: Size of each block in bytes | |
| total_size: Total size of the file in bytes | |
| filename: Name of the file being downloaded (optional) | |
| """ | |
| global _start_time, _last_time, _last_downloaded, _speed_history, _update_interval | |
| current_time = time.time() | |
| downloaded = block_num * block_size | |
| # Initialize timing on first call | |
| if _start_time is None or block_num == 0: | |
| _start_time = current_time | |
| _last_time = current_time | |
| _last_downloaded = 0 | |
| _speed_history = [] | |
| # Calculate download speed only at specified intervals | |
| speed = 0 | |
| if current_time - _last_time >= _update_interval: | |
| if _last_time > 0: | |
| current_speed = (downloaded - _last_downloaded) / (current_time - _last_time) | |
| _speed_history.append(current_speed) | |
| # Keep only last 5 speed measurements for smoothing | |
| if len(_speed_history) > 5: | |
| _speed_history.pop(0) | |
| # Average the recent speeds for smoother display | |
| speed = sum(_speed_history) / len(_speed_history) | |
| _last_time = current_time | |
| _last_downloaded = downloaded | |
| elif _speed_history: | |
| # Use the last calculated average speed | |
| speed = sum(_speed_history) / len(_speed_history) | |
| # Format file sizes and speed | |
| def format_bytes(bytes_val): | |
| for unit in ['B', 'KB', 'MB', 'GB']: | |
| if bytes_val < 1024: | |
| return f"{bytes_val:.1f}{unit}" | |
| bytes_val /= 1024 | |
| return f"{bytes_val:.1f}TB" | |
| file_display = filename if filename else "Unknown file" | |
| if total_size <= 0: | |
| # If total size is unknown, show downloaded bytes | |
| speed_str = f" @ {format_bytes(speed)}/s" if speed > 0 else "" | |
| line = f"\r{file_display}: {format_bytes(downloaded)}{speed_str}" | |
| # Clear any trailing characters by padding with spaces | |
| sys.stdout.write(line.ljust(80)) | |
| sys.stdout.flush() | |
| return | |
| downloaded = block_num * block_size | |
| percent = min(100, (downloaded / total_size) * 100) | |
| # Create progress bar (40 characters wide to leave room for other info) | |
| bar_length = 40 | |
| filled = int(bar_length * percent / 100) | |
| bar = '█' * filled + '░' * (bar_length - filled) | |
| # Format file sizes and speed | |
| def format_bytes(bytes_val): | |
| for unit in ['B', 'KB', 'MB', 'GB']: | |
| if bytes_val < 1024: | |
| return f"{bytes_val:.1f}{unit}" | |
| bytes_val /= 1024 | |
| return f"{bytes_val:.1f}TB" | |
| speed_str = f" @ {format_bytes(speed)}/s" if speed > 0 else "" | |
| # Display progress with filename first | |
| line = f"\r{file_display}: [{bar}] {percent:.1f}% ({format_bytes(downloaded)}/{format_bytes(total_size)}){speed_str}" | |
| # Clear any trailing characters by padding with spaces | |
| sys.stdout.write(line.ljust(100)) | |
| sys.stdout.flush() | |
| # Print newline when complete | |
| if percent >= 100: | |
| print() | |
| # Wrapper function to include filename in progress hook | |
| def create_progress_hook(filename): | |
| """Creates a progress hook with the filename included""" | |
| global _start_time, _last_time, _last_downloaded, _speed_history | |
| # Reset timing variables for new download | |
| _start_time = None | |
| _last_time = None | |
| _last_downloaded = 0 | |
| _speed_history = [] | |
| def hook(block_num, block_size, total_size): | |
| return progress_hook(block_num, block_size, total_size, filename) | |
| return hook | |
| def process_files_def(repoId=None, sourceFolderList=None, fileList=None, targetFolderList=None): | |
| from huggingface_hub import hf_hub_download, snapshot_download | |
| from shared.utils import files_locator as fl | |
| if targetFolderList is None: | |
| targetFolderList = [None] * len(sourceFolderList) | |
| for targetFolder, sourceFolder, files in zip(targetFolderList, sourceFolderList, fileList): | |
| if targetFolder is not None and len(targetFolder) == 0: | |
| targetFolder = None | |
| explicit_target = targetFolder if targetFolder is not None else (sourceFolder if len(sourceFolder) > 0 else None) | |
| targetRoot = fl.get_smart_download_root(explicit_target) | |
| local_dir = os.path.join(targetRoot, targetFolder) if targetFolder is not None else targetRoot | |
| if len(files) == 0: | |
| if fl.locate_folder(sourceFolder if targetFolder is None else os.path.join(targetFolder, sourceFolder), error_if_none=False) is None: | |
| snapshot_download(repo_id=repoId, allow_patterns=sourceFolder + "/*", local_dir=local_dir) | |
| else: | |
| for onefile in files: | |
| if len(sourceFolder) > 0: | |
| if fl.locate_file((sourceFolder + "/" + onefile) if targetFolder is None else os.path.join(targetFolder, sourceFolder, onefile), error_if_none=False) is None: | |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=local_dir, subfolder=sourceFolder) | |
| else: | |
| if fl.locate_file(onefile if targetFolder is None else os.path.join(targetFolder, onefile), error_if_none=False) is None: | |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=local_dir) | |
| def _download_relpath(source_folder, filename, target_folder=None): | |
| source_folder = "" if source_folder is None else source_folder | |
| if target_folder is not None and len(target_folder) == 0: | |
| target_folder = None | |
| if target_folder is None: | |
| return os.path.join(source_folder, filename) if len(source_folder) > 0 else filename | |
| return os.path.join(target_folder, source_folder, filename) if len(source_folder) > 0 else os.path.join(target_folder, filename) | |
| def download_def_missing_files(download_def): | |
| from shared.utils import files_locator as fl | |
| if download_def is None: | |
| return [] | |
| if isinstance(download_def, list): | |
| missing = [] | |
| for one_def in download_def: | |
| missing.extend(download_def_missing_files(one_def)) | |
| return missing | |
| source_folders = download_def.get("sourceFolderList", []) | |
| file_lists = download_def.get("fileList", []) | |
| target_folders = download_def.get("targetFolderList") | |
| if target_folders is None: | |
| target_folders = [None] * len(source_folders) | |
| missing = [] | |
| for source_folder, files, target_folder in zip(source_folders, file_lists, target_folders): | |
| if len(files) == 0: | |
| rel_folder = _download_relpath(source_folder, "", target_folder).rstrip("\\/") | |
| if fl.locate_folder(rel_folder, error_if_none=False) is None: | |
| missing.append(rel_folder) | |
| continue | |
| for filename in files: | |
| rel_path = _download_relpath(source_folder, filename, target_folder) | |
| if fl.locate_file(rel_path, error_if_none=False) is None: | |
| missing.append(rel_path) | |
| return missing | |
| def send_download_status(send_cmd=None, status_text=None): | |
| if send_cmd is not None and status_text: | |
| send_cmd("status", status_text) | |
| def process_files_def_if_needed(download_def, send_cmd=None, status_text=None): | |
| if download_def is None or len(download_def_missing_files(download_def)) == 0: | |
| return False | |
| send_download_status(send_cmd, status_text) | |
| if isinstance(download_def, list): | |
| for one_def in download_def: | |
| process_files_def(**one_def) | |
| else: | |
| process_files_def(**download_def) | |
| return True | |
| def query_audio_background_replacement_download_def(): | |
| return { | |
| "repoId": "DeepBeepMeep/Wan2.1", | |
| "sourceFolderList": ["roformer"], | |
| "fileList": [["model_bs_roformer_ep_317_sdr_12.9755.ckpt", "model_bs_roformer_ep_317_sdr_12.9755.yaml", "download_checks.json"]], | |
| } | |
| def download_audio_background_replacement(send_cmd=None, status_text="Downloading audio background replacement model files..."): | |
| return process_files_def_if_needed(query_audio_background_replacement_download_def(), send_cmd=send_cmd, status_text=status_text) | |
| def query_speaker_separator_download_def(): | |
| return [ | |
| { | |
| "repoId": "DeepBeepMeep/Wan2.1", | |
| "sourceFolderList": ["pyannote"], | |
| "fileList": [["pyannote_model_wespeaker-voxceleb-resnet34-LM.bin", "pytorch_model_segmentation-3.0.bin"]], | |
| }, | |
| { | |
| "repoId": "DeepBeepMeep/LTX-2", | |
| "sourceFolderList": ["sherpa"], | |
| "fileList": [["sherpa-onnx-pyannote-segmentation-3-0/model.onnx", "3dspeaker_speech_eres2net_base_sv_zh-cn_3dspeaker_16k.onnx"]], | |
| }, | |
| ] | |
| def download_speaker_separator(send_cmd=None, status_text="Downloading speaker separator model files..."): | |
| return process_files_def_if_needed(query_speaker_separator_download_def(), send_cmd=send_cmd, status_text=status_text) | |
| def process_download_defs(download_defs): | |
| if isinstance(download_defs, dict): | |
| process_files_def(**download_defs) | |
| return | |
| for download_def in download_defs or []: | |
| if download_def is not None: | |
| process_files_def(**download_def) | |
| def download_file(url, filename): | |
| from huggingface_hub import hf_hub_download | |
| from shared.utils import files_locator as fl | |
| url = url.split("|")[0] | |
| if url.startswith("https://huggingface.co/") and "/resolve/main/" in url: | |
| base_dir = os.path.dirname(filename) | |
| url = url[len("https://huggingface.co/"):] | |
| url_parts = url.split("/resolve/main/") | |
| repoId = url_parts[0] | |
| onefile = os.path.basename(url_parts[-1]) | |
| sourceFolder = os.path.dirname(url_parts[-1]) | |
| if len(sourceFolder) == 0: | |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=fl.get_download_location() if len(base_dir) == 0 else base_dir) | |
| else: | |
| tgt = fl.get_download_location() if len(base_dir) == 0 else base_dir | |
| if not os.path.exists(tgt): | |
| os.makedirs(tgt) | |
| temp_dir_path = os.path.join(tgt, f"_temp{time.time()}") | |
| temp_full_path = os.path.join(temp_dir_path, sourceFolder) | |
| if not os.path.exists(temp_full_path): | |
| os.makedirs(temp_full_path) | |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=temp_dir_path, subfolder=sourceFolder) | |
| shutil.move(os.path.join(temp_full_path, onefile), tgt) | |
| shutil.rmtree(temp_dir_path) | |
| else: | |
| from urllib.request import urlretrieve | |
| urlretrieve(url, filename, create_progress_hook(filename)) | |
Xet Storage Details
- Size:
- 11 kB
- Xet hash:
- 2d74e4b1d2f05666945fdbf68fad2e6da82e37f2668c7efb29c9c5fbb24f751f
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.