| import numpy as np | |
| DEFAULT_TRANSIENT_MAX_SECONDS = 0.1 | |
| DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS = 0.2 | |
| DEFAULT_TRANSIENT_WINDOW_SECONDS = 0.01 | |
| DEFAULT_TRANSIENT_SILENCE_THRESHOLD = 0.015 | |
| def _mono(audio_np: np.ndarray) -> np.ndarray: | |
| return audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np | |
| def _window_settings(sample_rate: int, max_transient_seconds: float, context_silence_seconds: float, window_seconds: float) -> tuple[int, int, int]: | |
| window = max(1, int(window_seconds * sample_rate)) | |
| max_noise_windows = max(1, int(np.ceil(max_transient_seconds * sample_rate / window))) | |
| min_silence_windows = max(1, int(np.ceil(context_silence_seconds * sample_rate / window))) | |
| return window, max_noise_windows, min_silence_windows | |
| def _rms_windows(mono: np.ndarray, window: int, frame_count: int, offset: int = 0) -> np.ndarray: | |
| return np.array([np.sqrt(np.mean(mono[offset + i * window : offset + (i + 1) * window].astype(np.float64) ** 2)) for i in range(frame_count)]) | |
| def _debug_print(debug: bool, label: str, message: str) -> None: | |
| if debug: | |
| print(f"[{label}] {message}") | |
| def trim_leading_transient_noise( | |
| audio_np: np.ndarray, | |
| sample_rate: int, | |
| *, | |
| max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS, | |
| context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS, | |
| window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, | |
| threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, | |
| debug: bool = False, | |
| label: str = "Audio Cleaning", | |
| ) -> np.ndarray: | |
| mono = _mono(audio_np) | |
| window, max_noise_windows, min_silence_windows = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds) | |
| frame_count = min(len(mono) // window, max_noise_windows + min_silence_windows) | |
| if frame_count < max_noise_windows + min_silence_windows: | |
| return audio_np | |
| active = _rms_windows(mono, window, frame_count) > threshold | |
| if not active[0]: | |
| return audio_np | |
| for silence_start in range(1, max_noise_windows + 1): | |
| if not active[silence_start : silence_start + min_silence_windows].any(): | |
| trim_end = silence_start * window | |
| _debug_print(debug, label, f"Trimmed leading transient noise ({trim_end / sample_rate:.2f}s)") | |
| return audio_np[trim_end:] | |
| return audio_np | |
| def trim_trailing_transient_noise( | |
| audio_np: np.ndarray, | |
| sample_rate: int, | |
| *, | |
| max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS, | |
| context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS, | |
| window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, | |
| threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, | |
| debug: bool = False, | |
| label: str = "Audio Cleaning", | |
| ) -> np.ndarray: | |
| mono = _mono(audio_np) | |
| window, max_noise_windows, min_silence_windows = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds) | |
| frame_count = min(len(mono) // window, max_noise_windows + min_silence_windows) | |
| if frame_count < max_noise_windows + min_silence_windows: | |
| return audio_np | |
| offset = len(mono) - frame_count * window | |
| active = _rms_windows(mono, window, frame_count, offset=offset) > threshold | |
| if not active[-1]: | |
| return audio_np | |
| for noise_start in range(frame_count - 1, frame_count - max_noise_windows - 1, -1): | |
| if not active[noise_start - min_silence_windows : noise_start].any(): | |
| trim_start = offset + noise_start * window | |
| if trim_start <= 0 or trim_start >= len(audio_np): | |
| return audio_np | |
| _debug_print(debug, label, f"Trimmed trailing transient noise ({(len(audio_np) - trim_start) / sample_rate:.2f}s)") | |
| return audio_np[:trim_start] | |
| return audio_np | |
| def mute_isolated_transient_noise( | |
| audio_np: np.ndarray, | |
| sample_rate: int, | |
| *, | |
| max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS, | |
| context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS, | |
| window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, | |
| threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, | |
| debug: bool = False, | |
| label: str = "Audio Cleaning", | |
| ) -> np.ndarray: | |
| mono = _mono(audio_np) | |
| window, max_noise_windows, min_silence_windows = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds) | |
| frame_count = len(mono) // window | |
| if frame_count < max_noise_windows + 2 * min_silence_windows: | |
| return audio_np | |
| active = _rms_windows(mono, window, frame_count) > threshold | |
| muted = audio_np.copy() | |
| active_start = None | |
| muted_count = 0 | |
| for idx, is_active in enumerate(active): | |
| if is_active and active_start is None: | |
| active_start = idx | |
| elif not is_active and active_start is not None: | |
| if idx - active_start <= max_noise_windows: | |
| prev_start = max(0, active_start - min_silence_windows) | |
| next_end = min(frame_count, idx + min_silence_windows) | |
| if not active[prev_start:active_start].any() and not active[idx:next_end].any() and active_start > 0 and idx < frame_count: | |
| muted[active_start * window : idx * window] = 0 | |
| muted_count += 1 | |
| active_start = None | |
| _debug_print(debug and muted_count > 0, label, f"Muted {muted_count} isolated transient noise segment(s)") | |
| return muted | |
| def trim_leading_noise_before_speech( | |
| audio_np: np.ndarray, | |
| sample_rate: int, | |
| *, | |
| speech_threshold: float = 0.03, | |
| max_leading_seconds: float = 1.0, | |
| keep_silence_seconds: float = 0.1, | |
| window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, | |
| debug: bool = False, | |
| label: str = "Audio Cleaning", | |
| ) -> np.ndarray: | |
| mono = _mono(audio_np) | |
| window = max(1, int(window_seconds * sample_rate)) | |
| frame_count = min(len(mono) // window, max(1, int(max_leading_seconds * sample_rate / window))) | |
| if frame_count == 0: | |
| return audio_np | |
| strong_windows = np.where(_rms_windows(mono, window, frame_count) > speech_threshold)[0] | |
| if len(strong_windows) == 0: | |
| return audio_np | |
| first_speech = int(strong_windows[0]) * window | |
| keep_samples = int(keep_silence_seconds * sample_rate) | |
| trim_end = max(0, first_speech - keep_samples) | |
| if trim_end <= 0: | |
| return audio_np | |
| _debug_print(debug, label, f"Trimmed leading low-level noise before speech ({trim_end / sample_rate:.2f}s)") | |
| return audio_np[trim_end:] | |
| def ensure_trailing_silence(audio_np: np.ndarray, sample_rate: int, min_silence_seconds: float, *, threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD) -> np.ndarray: | |
| if min_silence_seconds <= 0: | |
| return audio_np | |
| mono = _mono(audio_np) | |
| window = max(1, int(DEFAULT_TRANSIENT_WINDOW_SECONDS * sample_rate)) | |
| frame_count = len(mono) // window | |
| if frame_count == 0: | |
| return audio_np | |
| active = _rms_windows(mono, window, frame_count) > threshold | |
| active_windows = np.where(active)[0] | |
| if len(active_windows) == 0: | |
| return audio_np | |
| last_active_end = min(len(audio_np), (int(active_windows[-1]) + 1) * window) | |
| existing_silence = len(audio_np) - last_active_end | |
| target_silence = int(min_silence_seconds * sample_rate) | |
| missing_silence = target_silence - existing_silence | |
| if missing_silence <= 0: | |
| return audio_np | |
| pad_shape = (missing_silence,) if audio_np.ndim == 1 else (missing_silence, audio_np.shape[1]) | |
| return np.concatenate([audio_np, np.zeros(pad_shape, dtype=audio_np.dtype)], axis=0) | |
| def trim_after_silence_boundary( | |
| audio_np: np.ndarray, | |
| sample_rate: int, | |
| earliest_seconds: float, | |
| *, | |
| search_seconds: float = 2.0, | |
| min_silence_seconds: float = 0.18, | |
| keep_silence_seconds: float = 0.12, | |
| window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, | |
| threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, | |
| debug: bool = False, | |
| label: str = "Audio Cleaning", | |
| ) -> np.ndarray: | |
| if earliest_seconds <= 0 or search_seconds <= 0 or len(audio_np) == 0: | |
| return audio_np | |
| mono = _mono(audio_np) | |
| window = max(1, int(window_seconds * sample_rate)) | |
| earliest_sample = max(0, int(earliest_seconds * sample_rate)) | |
| if earliest_sample >= len(mono): | |
| return audio_np | |
| offset = (earliest_sample // window) * window | |
| search_end = min(len(mono), earliest_sample + int(search_seconds * sample_rate)) | |
| frame_count = max(0, (search_end - offset) // window) | |
| min_silence_windows = max(1, int(np.ceil(min_silence_seconds * sample_rate / window))) | |
| if frame_count < min_silence_windows: | |
| return audio_np | |
| active = _rms_windows(mono, window, frame_count, offset=offset) > threshold | |
| for idx in range(0, len(active) - min_silence_windows + 1): | |
| if not active[idx : idx + min_silence_windows].any(): | |
| cut_sample = min(len(audio_np), offset + idx * window + int(keep_silence_seconds * sample_rate)) | |
| if cut_sample <= 0 or cut_sample >= len(audio_np): | |
| return audio_np | |
| _debug_print(debug, label, f"Trimmed chunk tail at silence boundary ({cut_sample / sample_rate:.2f}s)") | |
| return audio_np[:cut_sample] | |
| return audio_np | |
Xet Storage Details
- Size:
- 9.31 kB
- Xet hash:
- e71f5c06b501cb055651be5b71156626ce000599b9f83602bd7fee41ea308b7d
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.