# Copyright 2022 David Scripka. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# imports
from multiprocessing.pool import ThreadPool
import os
from functools import partial
from pathlib import Path
import random
from tqdm import tqdm
from typing import List, Tuple
import numpy as np
import torch
from numpy.lib.format import open_memmap
from speechbrain.dataio.dataio import read_audio
from speechbrain.processing.signal_processing import reverberate
import torchaudio
import mutagen
import acoustics


# Load audio clips and structure into clips of the same length
def stack_clips(audio_data, clip_size=16000*2):
    """
    Takes an input list of 1D arrays (of different lengths), concatenates them together,
    and then extracts clips of a uniform size by dividing the combined array.

    Args:
        audio_data (List[ndarray]): A list of 1D numpy arrays to combine and stack
        clip_size (int): The desired total length of the uniform clip size (in samples)

    Returns:
        ndarray: A N by `clip_size` array with the audio data, converted to 16-bit PCM
    """
    # Combine all clips into a single array
    combined_data = np.hstack((audio_data))

    # Get chunks of the specified size, zero-padding the final chunk as needed
    new_examples = []
    for i in range(0, combined_data.shape[0], clip_size):
        chunk = combined_data[i:i+clip_size]
        if chunk.shape[0] != clip_size:
            chunk = np.hstack((chunk, np.zeros(clip_size - chunk.shape[0])))
        new_examples.append(chunk)

    return np.array(new_examples)


def load_audio_clips(files, clip_size=32000):
    """
    Takes the specified audio files and shapes them into an array of N by `clip_size`,
    where N is determined by the length of the audio files and `clip_size` at run time.
    Clips longer than `clip_size` are truncated and extended into the N+1 row.
    Clips shorter than `clip_size` are combined with the previous or next clip
    (except for the last clip in `files`, which is ignored if it is too short).

    Args:
        files (List[str]): A list of filepaths
        clip_size (int): The number of samples (of 16khz audio) for all of the rows in the array

    Returns:
        ndarray: A N by `clip_size` array with the audio data, converted to 16-bit PCM
    """
    # Load audio files, skipping any that cannot be read
    audio_data = []
    for i in files:
        try:
            audio_data.append(read_audio(i))
        except ValueError:
            continue

    # Get shape of output array
    N = sum([i.shape[0] for i in audio_data])//clip_size
    X = np.empty((N, clip_size))

    # Add audio data to rows, carrying any remainder over to the next row
    previous_row_remainder = None
    cnt = 0
    for row in audio_data:
        if previous_row_remainder is not None:
            row = np.hstack((previous_row_remainder, row))
        while row.shape[0] >= clip_size:
            X[cnt, :] = row[0:clip_size]
            row = row[clip_size:]
            cnt += 1
        previous_row_remainder = row if row.size > 0 else None

    # Convert to 16-bit PCM data
    X = (X*32767).astype(np.int16)

    return X
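

# A minimal usage sketch for the two loaders above (not part of the library API). The
# "positive_clips/" directory and output filename are hypothetical placeholders, and the sketch
# assumes 16 khz, single-channel wav files that speechbrain's `read_audio` can load:
#
#     clip_paths = [str(p) for p in Path("positive_clips").glob("*.wav")]
#     clips_2s = load_audio_clips(clip_paths, clip_size=32000)  # N x 32000 array of int16 PCM
#     np.save("positive_clips_2s.npy", clips_2s)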


# Data I/O utils

# Convert clips with sox or ffmpeg
def _convert_clip(input_file, output_file, backend="ffmpeg"):
    if backend == "sox":
        cmd = f"sox \"{input_file}\" -G -r 16000 -c 1 \"{output_file}\""
    elif backend == "ffmpeg":
        cmd = f"ffmpeg -y -i \"{input_file}\" -ar 16000 \"{output_file}\""
    os.system(cmd)
    return None


def convert_clips(input_files, output_files, sr=16000, ncpu=1, backend="ffmpeg"):
    """
    Converts files in parallel with multithreading using Sox or ffmpeg. Intended to only convert
    input audio files into single-channel, 16 khz clips.

    Args:
        input_files (List[str]): A list of paths to input files
        output_files (List[str]): A list of paths to output files, corresponding 1:1 to the input files
        sr (int): The output sample rate of the converted clip
        ncpu (int): The number of CPUs to use for the conversion
        backend (str): The utility to use for conversion, "sox" or "ffmpeg"

    Returns:
        None
    """
    # Setup ThreadPool object
    pool = ThreadPool(processes=ncpu)

    # Set backend for conversion
    f = partial(_convert_clip, backend=backend)

    # Submit jobs
    pool.starmap(f, [(i, j) for i, j in zip(input_files, output_files)])
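

# A minimal usage sketch for `convert_clips` (illustrative only; the directory names are
# hypothetical and ffmpeg must be available on the system PATH):
#
#     src = [str(p) for p in Path("raw_audio").glob("*.mp3")]
#     dst = [os.path.join("converted_audio", Path(p).stem + ".wav") for p in src]
#     convert_clips(src, dst, ncpu=4, backend="ffmpeg")  # writes 16 khz clips to "converted_audio"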


def filter_audio_paths(target_dirs, min_length_secs, max_length_secs, duration_method="size", glob_filter=None):
    """
    Gets the paths of wav files in flat target directories, automatically filtering out files
    below/above the specified length (in seconds). Assumes that all wav files are sampled at 16khz,
    are single channel, and have 16-bit PCM data. Uses `os.scandir` in Python for highly efficient
    file system exploration, and doesn't require loading the files into memory for length estimation.

    Args:
        target_dirs (List[str]): The target directories containing the audio files
        min_length_secs (float): The minimum length in seconds (otherwise the clip is skipped)
        max_length_secs (float): The maximum length in seconds (otherwise the clip is skipped)
        duration_method (str): Whether to use the file size ('size') or header information ('header')
                               to estimate the duration of the audio file. 'size' is generally much
                               faster, but assumes that all files in the target directory are the
                               same type, sample rate, and bitrate. If None, durations are not calculated.
        glob_filter (str): A pathlib glob filter string to select specific files within the target directory

    Returns:
        tuple: A list of strings corresponding to the paths of the wav files that met the length criteria,
               and a list of their durations (in seconds)
    """
    file_paths = []
    durations = []
    for target_dir in target_dirs:
        sizes = []
        dir_paths = []
        if glob_filter:
            dir_paths = [str(i) for i in Path(target_dir).glob(glob_filter)]
            file_paths.extend(dir_paths)
            sizes.extend([os.path.getsize(i) for i in dir_paths])
        else:
            for i in tqdm(os.scandir(target_dir)):
                dir_paths.append(i.path)
                file_paths.append(i.path)
                sizes.append(i.stat().st_size)

        if duration_method == "size":
            durations.extend(estimate_clip_duration(dir_paths, sizes))
        elif duration_method == "header":
            durations.extend([get_clip_duration(i) for i in tqdm(dir_paths)])

    if durations != []:
        filtered = [(i, j) for i, j in zip(file_paths, durations)
                    if j >= min_length_secs and j <= max_length_secs]
        return [i[0] for i in filtered], [i[1] for i in filtered]
    else:
        return file_paths, []
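

# A minimal usage sketch for `filter_audio_paths` (the directory name is a hypothetical
# placeholder); keeps clips between 1 and 30 seconds long, estimating durations from file sizes:
#
#     paths, durations = filter_audio_paths(["background_noise"], 1.0, 30.0,
#                                           duration_method="size", glob_filter="*.wav")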


def estimate_clip_duration(audio_files: list, sizes: list):
    """Estimates the duration of each audio file in a list. Assumes that all of the audio files
    have the same audio format, bit depth, and sample rate.

    Args:
        audio_files (List[str]): A list of audio file paths
        sizes (List[int]): The size of each audio file in bytes

    Returns:
        list: A list of durations (in seconds) for the audio files
    """
    # Calculate a correction factor from the metadata of the first file
    details = mutagen.File(audio_files[0])
    correction = 8*os.path.getsize(audio_files[0]) - details.info.bitrate*details.info.length

    # Estimate duration for all remaining clips from file size only
    durations = []
    for size in sizes:
        durations.append((size*8 - correction)/details.info.bitrate)

    return durations


def estimate_mp3_duration(fpath):
    """Estimates the duration of an MP3 file from metadata and file size. Is only accurate for
    16 khz sample rate audio with a relatively constant bit-rate.

    Args:
        fpath (str): The input path to the MP3 file

    Returns:
        float: The duration of the MP3 file in seconds
    """
    conversion_factors = {
        "16_khz_single_channel": 0.000333318208471784,
        "16_khz_stereo": 0.000333318208471784/2
    }

    duration_seconds = 0
    try:
        md = torchaudio.info(fpath)
    except RuntimeError:
        return duration_seconds

    nbytes = os.path.getsize(fpath)
    if md.num_channels == 1:
        if md.sample_rate == 16000:
            duration_seconds = nbytes*conversion_factors["16_khz_single_channel"]
    elif md.num_channels == 2:
        if md.sample_rate == 16000:
            duration_seconds = nbytes*conversion_factors["16_khz_stereo"]

    return duration_seconds


def get_clip_duration(clip):
    """Gets the duration of an audio clip in seconds from file header information"""
    try:
        metadata = torchaudio.info(clip)
    except RuntimeError:  # skip cases where file metadata can't be read
        return 0

    return metadata.num_frames/metadata.sample_rate


def get_wav_duration_from_filesize(size, nbytes=2):
    """
    Calculates the duration (in seconds) of a WAV file from its size, assuming it contains
    16 khz single-channel audio. The sample width is user specified, and defaults to 2 bytes
    for 16-bit PCM audio.

    Args:
        size (int): The file size in bytes
        nbytes (int): How many bytes for each data point in the audio
                      (e.g., 16-bit is 2, 32-bit is 4, etc.)

    Returns:
        float: The duration of the audio file in seconds
    """
    return (size - 44)/nbytes/16000
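

# Worked example for `get_wav_duration_from_filesize`: a 10-second, 16 khz, 16-bit mono WAV file
# is a 44-byte header plus 10*16000*2 = 320000 data bytes, so
# get_wav_duration_from_filesize(320044) == (320044 - 44)/2/16000 == 10.0 seconds.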


# Data augmentation utility function
def mix_clips_batch(
        foreground_clips: List[str],
        background_clips: List[str],
        combined_size: int,
        labels: List[int] = [],
        batch_size: int = 32,
        snr_low: float = 0,
        snr_high: float = 0,
        start_index: List[int] = [],
        foreground_durations: List[float] = [],
        foreground_truncate_strategy: str = "random",
        rirs: List[str] = [],
        rir_probability: float = 1,
        volume_augmentation: bool = True,
        generated_noise_augmentation: float = 0.0,
        shuffle: bool = True,
        return_sequence_labels: bool = False,
        return_background_clips: bool = False,
        return_background_clips_delay: Tuple[int, int] = (0, 0),
        seed: int = 0
        ):
    """
    Mixes foreground and background clips at a random SNR level in batches.

    References: https://pytorch.org/audio/main/tutorials/audio_data_augmentation_tutorial.html and
    https://speechbrain.readthedocs.io/en/latest/API/speechbrain.processing.speech_augmentation.html#speechbrain.processing.speech_augmentation.AddNoise

    Args:
        foreground_clips (List[str]): A list of paths to the foreground clips
        background_clips (List[str]): A list of paths to the background clips (randomly selected for
                                      each foreground clip)
        combined_size (int): The total length (in samples) of the combined clip. If needed, the
                             background clips are duplicated or truncated to reach this length.
        labels (List[int]): A list of integer labels corresponding 1:1 to the foreground clips. Will be
                            updated as needed with the foreground clips to ensure that mixed clips retain
                            the proper labels.
        batch_size (int): The batch size
        snr_low (float): The low SNR level of the mixing in dB
        snr_high (float): The high SNR level of the mixing in dB
        start_index (List[int]): The starting position (in samples) for the foreground clip to start in
                                 the background clip. If the foreground clip is longer than `combined_size`
                                 when starting at this point, the foreground clip will be truncated according
                                 to the `foreground_truncate_strategy` argument.
        foreground_durations (List[float]): The desired duration of each foreground clip (in seconds)
        foreground_truncate_strategy (str): The method used to truncate the foreground clip, if needed based
                                            on the `start_index`, `foreground_durations`, and `combined_size`
                                            arguments. See the options in the `truncate_clip` method.
        rirs (List[str]): A list of paths to room impulse response (RIR) files to convolve with the clips
                          to simulate different recording environments. A single randomly selected RIR file
                          from the list is applied to the entire batch. If empty (the default), nothing is done.
        rir_probability (float): The probability (between 0 and 1) that the batch will be convolved with a RIR file.
        volume_augmentation (bool): Whether to randomly apply volume augmentation to the clips in the batch.
                                    This simply scales the data of each clip such that the maximum value is
                                    between 0.02 and 1.0 (the floor shouldn't be zero, as beyond a certain
                                    point the audio data is no longer valid).
        generated_noise_augmentation (float): The probability of further mixing the mixed clip with generated
                                              random noise. Will be either "white", "brown", "blue", "pink",
                                              or "violet" noise, mixed at a random SNR between `snr_low`
                                              and `snr_high`.
        return_background_clips (bool): Whether to return the segment of the background clip that was mixed
                                        with each foreground clip in the batch.
        return_background_clips_delay (Tuple[int, int]): The lower and upper bound of a random delay
                                                         (in samples) to apply to the segment of each returned
                                                         background clip mixed with each foreground clip in the
                                                         batch. This is primarily intended to simulate the drift
                                                         between input and output channels in audio devices,
                                                         which means that the mixed audio is never exactly
                                                         aligned with the two source clips.
        shuffle (bool): Whether to shuffle the foreground clips before mixing (default: True)
        return_sequence_labels (bool): Whether to return sequence labels (i.e., frame-level labels) for each
                                       clip based on the start/end positions of the foreground clip.
        seed (int): A random seed

    Returns:
        generator: Returns a generator that yields batches of mixed foreground/background audio, labels,
                   and the background segments used for each audio clip (or None if the
                   `return_background_clips` argument is False)
    """
    # Set random seed, if needed
    if seed:
        np.random.seed(seed)
        random.seed(seed)

    # Check and set start indices, if needed
    if not start_index:
        start_index = [0]*batch_size
    else:
        if min(start_index) < 0:
            raise ValueError("Error! At least one value of the `start_index` argument is <0. Check your inputs.")

    # Make dummy labels, if needed
    if not labels:
        labels = [0]*len(foreground_clips)

    if shuffle:
        p = np.random.permutation(len(foreground_clips))
        foreground_clips = np.array(foreground_clips)[p].tolist()
        start_index = np.array(start_index)[p].tolist()
        labels = np.array(labels)[p].tolist()
        if foreground_durations:
            foreground_durations = np.array(foreground_durations)[p].tolist()

    for i in range(0, len(foreground_clips), batch_size):
        # Load foreground clips/start indices and truncate as needed
        sr = 16000
        start_index_batch = start_index[i:i+batch_size]
        foreground_clips_batch = [read_audio(j) for j in foreground_clips[i:i+batch_size]]
        foreground_clips_batch = [j[0] if len(j.shape) > 1 else j for j in foreground_clips_batch]
        if foreground_durations:
            foreground_clips_batch = [truncate_clip(j, int(k*sr), foreground_truncate_strategy)
                                      for j, k in zip(foreground_clips_batch, foreground_durations[i:i+batch_size])]
        labels_batch = np.array(labels[i:i+batch_size])

        # Load background clips and pad/truncate as needed
        background_clips_batch = [read_audio(j) for j in random.sample(background_clips, batch_size)]
        background_clips_batch = [j[0] if len(j.shape) > 1 else j for j in background_clips_batch]
        background_clips_batch_delayed = []
        delay = np.random.randint(return_background_clips_delay[0], return_background_clips_delay[1] + 1)
        for ndx, background_clip in enumerate(background_clips_batch):
            if background_clip.shape[0] < (combined_size + delay):
                repeated = background_clip.repeat(
                    np.ceil((combined_size + delay)/background_clip.shape[0]).astype(np.int32)
                )
                background_clips_batch[ndx] = repeated[0:combined_size]
                background_clips_batch_delayed.append(repeated[0+delay:combined_size + delay].clone())
            else:
                r = np.random.randint(0, max(1, background_clip.shape[0] - combined_size - delay))
                background_clips_batch[ndx] = background_clip[r:r + combined_size]
                background_clips_batch_delayed.append(background_clip[r+delay:r + combined_size + delay].clone())

        # Mix clips at SNR levels
        snrs_db = np.random.uniform(snr_low, snr_high, batch_size)
        mixed_clips = []
        sequence_labels = []
        for fg, bg, snr, start in zip(foreground_clips_batch, background_clips_batch, snrs_db, start_index_batch):
            if bg.shape[0] != combined_size:
                raise ValueError(f"Background clip has shape {bg.shape}, expected {combined_size} samples!")
            mixed_clip = mix_clip(fg, bg, snr, start)
            sequence_labels.append(get_frame_labels(combined_size, start, start + fg.shape[0]))

            if np.random.random() < generated_noise_augmentation:
                noise_color = ["white", "pink", "blue", "brown", "violet"]
                noise_clip = acoustics.generator.noise(combined_size, color=np.random.choice(noise_color))
                noise_clip = torch.from_numpy(noise_clip/noise_clip.max())
                mixed_clip = mix_clip(mixed_clip, noise_clip, np.random.choice(snrs_db), 0)

            mixed_clips.append(mixed_clip)

        mixed_clips_batch = torch.vstack(mixed_clips)
        sequence_labels_batch = torch.from_numpy(np.vstack(sequence_labels))

        # Apply reverberation to the batch (from a single RIR file)
        if rirs:
            if np.random.random() <= rir_probability:
                rir_waveform, sr = torchaudio.load(random.choice(rirs))
                if rir_waveform.shape[0] > 1:
                    rir_waveform = rir_waveform[random.randint(0, rir_waveform.shape[0]-1), :]
                mixed_clips_batch = reverberate(mixed_clips_batch, rir_waveform, rescale_amp="avg")

        # Apply volume augmentation
        if volume_augmentation:
            volume_levels = np.random.uniform(0.02, 1.0, mixed_clips_batch.shape[0])
            mixed_clips_batch = (volume_levels/mixed_clips_batch.max(axis=1)[0])[..., None]*mixed_clips_batch
        else:
            # Normalize clips only if max value is outside of [-1, 1]
            abs_max, _ = torch.max(
                torch.abs(mixed_clips_batch), dim=1, keepdim=True
            )
            mixed_clips_batch = mixed_clips_batch / abs_max.clamp(min=1.0)

        # Convert to 16-bit PCM audio
        mixed_clips_batch = (mixed_clips_batch.numpy()*32767).astype(np.int16)

        # Remove any clips that are silent (happens rarely when mixing/reverberating)
        error_index = np.where(mixed_clips_batch.max(axis=1) != 0)[0]
        mixed_clips_batch = mixed_clips_batch[error_index]
        labels_batch = labels_batch[error_index]
        sequence_labels_batch = sequence_labels_batch[error_index]

        if not return_background_clips:
            yield mixed_clips_batch, (labels_batch if not return_sequence_labels else sequence_labels_batch), None
        else:
            background_clips_batch_delayed = (torch.vstack(background_clips_batch_delayed).numpy()
                                              * 32767).astype(np.int16)[error_index]
            yield (mixed_clips_batch,
                   labels_batch if not return_sequence_labels else sequence_labels_batch,
                   background_clips_batch_delayed)
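

# A minimal usage sketch for `mix_clips_batch` (illustrative only; `wake_word_paths` and
# `noise_paths` are hypothetical lists of file paths produced elsewhere). Each iteration yields
# a batch of 3-second mixed clips, their labels, and (here) None for the background segments:
#
#     mixer = mix_clips_batch(wake_word_paths, noise_paths, combined_size=16000*3,
#                             labels=[1]*len(wake_word_paths), batch_size=64,
#                             snr_low=-5, snr_high=10)
#     for mixed, batch_labels, _ in mixer:
#         pass  # e.g., compute features and write `mixed` to a training array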


def get_frame_labels(combined_size, start, end, buffer=1):
    # Build frame-level labels for a clip, marking the frames nearest to the start and end
    # positions of the foreground clip (frames of 1280 samples, after an initial 12400-sample offset)
    sequence_label = np.zeros(np.ceil((combined_size-12400)/1280).astype(int))
    frame_positions = np.arange(12400, combined_size, 1280)
    start_frame = np.argmin(abs(frame_positions - start))
    end_frame = np.argmin(abs(frame_positions - end))
    sequence_label[start_frame:start_frame+2] = 1
    sequence_label[end_frame-1:end_frame+1] = 1
    return sequence_label


def mix_clip(fg, bg, snr, start):
    # Scale the foreground clip to the target SNR (in dB) relative to the background clip,
    # then add it to the background starting at the given sample index
    fg_rms, bg_rms = fg.norm(p=2), bg.norm(p=2)
    snr = 10 ** (snr / 20)
    scale = snr * bg_rms / fg_rms
    bg[start:start + fg.shape[0]] = bg[start:start + fg.shape[0]] + scale*fg
    return bg / 2


def truncate_clip(x, max_size, method="truncate_start"):
    """
    Truncates an audio clip with the specified method

    Args:
        x (nd.array): An array of audio data
        max_size (int): The maximum size (in samples)
        method (str): Can be one of four options:
            - "truncate_start": Truncate the start of the clip
            - "truncate_end": Truncate the end of the clip
            - "truncate_both": Truncate both the start and end of the clip
            - "random": Randomly select a segment of the right size from the clip

    Returns:
        nd.array: The truncated audio data
    """
    if x.shape[0] > max_size:
        if method == "truncate_start":
            x = x[x.shape[0] - max_size:]
        if method == "truncate_end":
            x = x[0:max_size]
        if method == "truncate_both":
            n = (x.shape[0] - max_size)//2
            x = x[n:n + max_size]
        if method == "random":
            rn = np.random.randint(0, x.shape[0] - max_size)
            x = x[rn:rn + max_size]

    return x


# Reverberation data augmentation function
def apply_reverb(x, rir_files):
    """
    Applies reverberation to the input audio clips

    Args:
        x (nd.array): A numpy array of shape (batch, audio_samples) containing the audio clips
        rir_files (Union[str, list]): Either a path to an RIR (room impulse response) file or a list of
                                      RIR files. If a list, one file will be randomly chosen to apply to `x`

    Returns:
        nd.array: The reverberated audio clips
    """
    if isinstance(rir_files, str):
        rir_waveform, sr = torchaudio.load(rir_files)
    elif isinstance(rir_files, list):
        rir_waveform, sr = torchaudio.load(random.choice(rir_files))

    # Apply reverberation to the batch (from a single RIR file)
    if rir_waveform.shape[0] > 1:
        rir_waveform = rir_waveform[random.randint(0, rir_waveform.shape[0]-1), :]
    reverbed = reverberate(torch.from_numpy(x), rir_waveform, rescale_amp="avg")

    return reverbed.numpy()
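

# A minimal usage sketch for `truncate_clip` and `apply_reverb` (illustrative only; "example.wav"
# and `rir_paths` are hypothetical placeholders for an input clip and a list of RIR wav files):
#
#     clip = read_audio("example.wav")
#     clip = truncate_clip(clip, max_size=16000, method="random")      # keep a random 1-second segment
#     batch = (np.random.random((8, 16000)).astype(np.float32) - 0.5)  # stand-in audio batch
#     reverbed = apply_reverb(batch, rir_paths)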


# Load batches of data from mmaped numpy arrays
class mmap_batch_generator:
    """
    A generator class designed to dynamically build batches from mmaped numpy arrays.

    The generator will return tuples of (data, labels) with a batch size determined by the
    `n_per_class` initialization argument. When a mmaped numpy array has been fully iterated over,
    it will restart at the zeroth index automatically.
    """
    def __init__(self,
                 data_files: dict,
                 label_files: dict = {},
                 batch_size: int = 128,
                 n_per_class: dict = {},
                 data_transform_funcs: dict = {},
                 label_transform_funcs: dict = {}
                 ):
        """
        Initialize the generator object

        Args:
            data_files (dict): A dictionary of labels (as keys) and on-disk numpy array paths (as values).
                               Keys should be integer strings representing class labels.
            label_files (dict): A dictionary where the keys are the class labels and the values are paths to
                                numpy arrays of per-example labels. These must correspond 1:1 with the rows of
                                the numpy data arrays from the `data_files` argument.
            batch_size (int): The number of samples per batch
            n_per_class (dict): A dictionary with integer string labels (as keys) and the number of examples
                                per batch (as values). If empty (the default), batch sizes for each class will
                                be automatically calculated based on the input array shapes and
                                transformation functions.
            data_transform_funcs (dict): A dictionary of transformation functions to apply to each batch of
                                         per-class data loaded from the mmaped array. For example, with an
                                         array of shape (batch, timesteps, features), if the goal is to halve
                                         the timesteps per example (effectively doubling the size of the batch),
                                         this function could be passed:

                                         lambda x: np.vstack((x[:, 0:timesteps//2, :], x[:, timesteps//2:, :]))

                                         The user should incorporate the effect of any transform on the values
                                         of the `n_per_class` argument accordingly, in order to end up with the
                                         desired total batch size for each iteration of the generator.
            label_transform_funcs (dict): A dictionary of transformation functions to apply to each batch of
                                          labels. For example, strings can be mapped to integers or one-hot
                                          encoded, groups of classes can be merged together into one, etc.
        """
        # inputs
        self.data_files = data_files
        self.label_files = label_files
        self.n_per_class = n_per_class
        self.data_transform_funcs = data_transform_funcs
        self.label_transform_funcs = label_transform_funcs

        # Get array mmaps and store their shapes (label files are loaded fully into memory)
        self.data = {label: np.load(fl, mmap_mode='r') for label, fl in data_files.items()}
        self.labels = {label: np.load(fl) for label, fl in label_files.items()}
        self.data_counter = {label: 0 for label in data_files.keys()}
        self.original_shapes = {label: self.data[label].shape for label in self.data.keys()}
        self.shapes = {label: self.data[label].shape for label in self.data.keys()}

        # # Update effective shape of mmap array based on user-provided transforms (currently broken)
        # for lbl, f in self.data_transform_funcs.items():
        #     dummy_data = np.random.random((1, self.original_shapes[lbl][1], self.original_shapes[lbl][2]))
        #     new_shape = f(dummy_data).shape
        #     self.shapes[lbl] = (new_shape[0]*self.original_shapes[lbl][0], new_shape[1], new_shape[2])

        # Calculate batch sizes, if the user didn't specify them
        scale_factor = 1
        if not self.n_per_class:
            self.n_per_class = {}
            for lbl, shape in self.shapes.items():
                dummy_data = np.random.random((10, self.shapes[lbl][1], self.shapes[lbl][2]))
                if self.data_transform_funcs.get(lbl, None):
                    scale_factor = self.data_transform_funcs.get(lbl, None)(dummy_data).shape[0]/10
                ratio = self.shapes[lbl][0]/sum([i[0] for i in self.shapes.values()])
                self.n_per_class[lbl] = max(1, int(int(batch_size*ratio)/scale_factor))

        # Get estimated batches per epoch, including the effect of any user-provided transforms
        batch_size = sum([val*scale_factor for val in self.n_per_class.values()])
        batches_per_epoch = sum([i[0] for i in self.shapes.values()])//batch_size
        self.batch_per_epoch = batches_per_epoch
        print("Batches/steps per epoch:", batches_per_epoch)

    def __iter__(self):
        return self

    def __next__(self):
        # Build batch
        X, y = [], []
        for label, n in self.n_per_class.items():
            # Restart at zeroth index if an array reaches the end
            if self.data_counter[label] >= self.shapes[label][0]:
                self.data_counter[label] = 0
                # self.data[label] = np.load(self.data_files[label], mmap_mode='r')

            # Get data from mmaped file
            start = self.data_counter[label]
            x = self.data[label][start:start + n]
            self.data_counter[label] += x.shape[0]

            # Transform data
            if self.data_transform_funcs and self.data_transform_funcs.get(label):
                x = self.data_transform_funcs[label](x)

            # Make labels for data (following whatever the current shape of `x` is)
            if self.label_files.get(label, None):
                y_batch = self.labels[label][start:start + n]
            else:
                y_batch = [label]*x.shape[0]

            # Transform labels
            if self.label_transform_funcs and self.label_transform_funcs.get(label):
                y_batch = self.label_transform_funcs[label](y_batch)

            # Add data to batch
            X.append(x)
            y.extend(y_batch)

        return np.vstack(X), np.array(y)


# Function to remove empty rows from the end of a mmap array
def trim_mmap(mmap_path):
    """
    Trims blank rows from the end of a mmaped numpy array by creating a new mmap array without
    the blank rows. Note that a copy is created, and disk usage will briefly double as the function runs.

    Args:
        mmap_path (str): The path to the mmap array file to trim

    Returns:
        None
    """
    # Identify the last non-empty row in the mmaped file
    mmap_file1 = np.load(mmap_path, mmap_mode='r')
    i = -1
    while np.all(mmap_file1[i, :, :] == 0):
        i -= 1
    N_new = mmap_file1.shape[0] + i + 1

    # Create new mmap file and copy over data in batches
    output_file2 = os.path.splitext(mmap_path)[0] + "2.npy"
    mmap_file2 = open_memmap(output_file2, mode='w+', dtype=np.float32,
                             shape=(N_new, mmap_file1.shape[1], mmap_file1.shape[2]))

    for i in tqdm(range(0, N_new, 1024), total=N_new//1024):
        if i + 1024 > N_new:
            mmap_file2[i:N_new] = mmap_file1[i:N_new].copy()
            mmap_file2.flush()
        else:
            mmap_file2[i:i+1024] = mmap_file1[i:i+1024].copy()
            mmap_file2.flush()

    # Remove old mmaped file
    os.remove(mmap_path)

    # Rename new mmap file to match original
    os.rename(output_file2, mmap_path)
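

# A minimal usage sketch tying together `trim_mmap` and `mmap_batch_generator` (illustrative only;
# the .npy paths are hypothetical feature arrays of shape (N, timesteps, features) keyed by class label):
#
#     trim_mmap("positive_features.npy")   # drop unfilled all-zero rows from a pre-allocated array
#     gen = mmap_batch_generator(
#         data_files={"0": "negative_features.npy", "1": "positive_features.npy"},
#         batch_size=128,
#     )
#     X, y = next(gen)  # X: ~(128, timesteps, features), y: one class label per row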