Loading ivas_processing_scripts/processing/processing.py +67 −5 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ import numpy as np from ivas_processing_scripts.audiotools import audio from ivas_processing_scripts.audiotools.audioarray import window from ivas_processing_scripts.audiotools.audiofile import concat, read, trim from ivas_processing_scripts.audiotools.audiofile import concat, read, trim, write from ivas_processing_scripts.audiotools.constants import IVAS_FRAME_LEN_MS from ivas_processing_scripts.audiotools.convert.__init__ import convert from ivas_processing_scripts.audiotools.metadata import ( Loading Loading @@ -177,6 +177,16 @@ def concat_teardown(x, splits, out_fmt, fs, in_fs, meta, logger: logging.Logger) new_splits.append(int(float(split_i) * relative_fs_change)) splits = new_splits # check if last split ending coincides with last sample of signal if splits[-1] > len(x): raise ValueError( f"Last split index {splits[-1]} is larger than the signal length {len(x)}" ) elif splits[-1] < len(x): warn( f"Last split index {splits[-1]} is smaller that the signal length {len(x)}" ) split_old = 0 split_signals = [] split_meta = [] Loading Loading @@ -451,7 +461,9 @@ def multiple_of_frame_size( frame_size_in_ms: Optional[int] = 20, ) -> np.ndarray: """ Warn/Exit if audio if it isn't a multiple of frame size This function checks if the list of multi channel audio files is a multiple of frame size. If the file isn't a multiple then the function pads it to the next integer of frame size and writes the file to an output directory. It also copies the already aligned files to the output directory. Parameters ---------- Loading @@ -463,7 +475,18 @@ def multiple_of_frame_size( # get the number of channels from the input format input_format = cfg.input["fmt"] num_channels = audio.fromtype(input_format).num_channels for item in cfg.items_list: # Create output directory output_dir = cfg.output_path / "20ms_aligned_files" try: output_dir.mkdir(exist_ok=False) except FileExistsError: raise ValueError( "Folder for 20ms aligned files already exists. Please move or delete folder" ) # iterate over input files for i, item in enumerate(cfg.items_list): # read the audio file if "fs" in cfg.input: sampling_rate = cfg.input["fs"] Loading @@ -473,7 +496,11 @@ def multiple_of_frame_size( elif item.suffix == ".wav": x, fs = read(item) sampling_rate = fs else: raise ValueError(f"Unsupported input file type {item.suffix}") n_samples_x, n_chan_x = x.shape # check for number of channels and sampling rate if fs != sampling_rate: raise ValueError( f"Sampling rate of the file ({fs}) does NOT match with that ({sampling_rate}) specified in the config yaml." Loading @@ -482,14 +509,49 @@ def multiple_of_frame_size( raise ValueError( f"The number of channels in the file ({n_chan_x}) do NOT match with those of format ({num_channels}, {input_format}) specified in the config yaml." ) # warn if audio length not a multiple of frame length frame_length_samples = (frame_size_in_ms / 1000) * fs if n_samples_x % frame_length_samples != 0: remainder = n_samples_x % frame_length_samples if remainder != 0: # Calculate number of samples needed for padding padding_samples = int(frame_length_samples - remainder) if input_format.startswith("ISM") or input_format.startswith("MASA"): raise ValueError( f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms)." ) else: warn( f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms)." f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms). Padding to the nearest integer multiple." ) # Create and append zeros padded_data = trim(x, sampling_rate, (0, -padding_samples), pad_noise=True, samples=True) # Write padded data to output directory write(output_dir / item.name, padded_data, fs) else: copyfile(item, output_dir / item.name) # Update audio file path in list cfg.items_list[i] = output_dir / item.name # Copy metadata and update path if input_format.startswith("ISM"): for j in range(int(cfg.input["fmt"][3])): copyfile( cfg.metadata_path[i][j], output_dir / cfg.metadata_path[i][j].name ) cfg.metadata_path[i][j] = output_dir / cfg.metadata_path[i][j].name elif input_format.startswith("MASA"): raise ValueError("MASA as input format not implemented yet") # Check if all files are present in output directory all_files_present = all( [(output_dir / audio_file.name).exists() for audio_file in cfg.items_list] ) if not all_files_present: raise Exception("Not all files are present in the output directory") # Make the output path as the new input path cfg.input_path = output_dir ivas_processing_scripts/processing/processing_splitting_scaling.py +14 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import logging import re from itertools import repeat from pathlib import Path from warnings import warn import numpy as np Loading Loading @@ -239,6 +240,19 @@ class Processing_splitting_scaling(Processing): out_meta = repeat(None) else: # check length of output signals input_aligned_file = ( in_file.parent.parent / "20ms_aligned_files" / f"{Path(in_file.stem).stem}.wav" ) input_aligned_array, _ = read(input_aligned_file) if (len_inp := len(input_aligned_array)) != (len_out := len(x)): warn( f"For file {out_file} the length is {len_out} and does not match the (frame aligned) input length {len_inp}." ) # set output values out_files = [out_file] file_splits = [x] if isinstance(audio.fromtype(self.out_fmt), audio.ObjectBasedAudio): Loading Loading
ivas_processing_scripts/processing/processing.py +67 −5 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ import numpy as np from ivas_processing_scripts.audiotools import audio from ivas_processing_scripts.audiotools.audioarray import window from ivas_processing_scripts.audiotools.audiofile import concat, read, trim from ivas_processing_scripts.audiotools.audiofile import concat, read, trim, write from ivas_processing_scripts.audiotools.constants import IVAS_FRAME_LEN_MS from ivas_processing_scripts.audiotools.convert.__init__ import convert from ivas_processing_scripts.audiotools.metadata import ( Loading Loading @@ -177,6 +177,16 @@ def concat_teardown(x, splits, out_fmt, fs, in_fs, meta, logger: logging.Logger) new_splits.append(int(float(split_i) * relative_fs_change)) splits = new_splits # check if last split ending coincides with last sample of signal if splits[-1] > len(x): raise ValueError( f"Last split index {splits[-1]} is larger than the signal length {len(x)}" ) elif splits[-1] < len(x): warn( f"Last split index {splits[-1]} is smaller that the signal length {len(x)}" ) split_old = 0 split_signals = [] split_meta = [] Loading Loading @@ -451,7 +461,9 @@ def multiple_of_frame_size( frame_size_in_ms: Optional[int] = 20, ) -> np.ndarray: """ Warn/Exit if audio if it isn't a multiple of frame size This function checks if the list of multi channel audio files is a multiple of frame size. If the file isn't a multiple then the function pads it to the next integer of frame size and writes the file to an output directory. It also copies the already aligned files to the output directory. Parameters ---------- Loading @@ -463,7 +475,18 @@ def multiple_of_frame_size( # get the number of channels from the input format input_format = cfg.input["fmt"] num_channels = audio.fromtype(input_format).num_channels for item in cfg.items_list: # Create output directory output_dir = cfg.output_path / "20ms_aligned_files" try: output_dir.mkdir(exist_ok=False) except FileExistsError: raise ValueError( "Folder for 20ms aligned files already exists. Please move or delete folder" ) # iterate over input files for i, item in enumerate(cfg.items_list): # read the audio file if "fs" in cfg.input: sampling_rate = cfg.input["fs"] Loading @@ -473,7 +496,11 @@ def multiple_of_frame_size( elif item.suffix == ".wav": x, fs = read(item) sampling_rate = fs else: raise ValueError(f"Unsupported input file type {item.suffix}") n_samples_x, n_chan_x = x.shape # check for number of channels and sampling rate if fs != sampling_rate: raise ValueError( f"Sampling rate of the file ({fs}) does NOT match with that ({sampling_rate}) specified in the config yaml." Loading @@ -482,14 +509,49 @@ def multiple_of_frame_size( raise ValueError( f"The number of channels in the file ({n_chan_x}) do NOT match with those of format ({num_channels}, {input_format}) specified in the config yaml." ) # warn if audio length not a multiple of frame length frame_length_samples = (frame_size_in_ms / 1000) * fs if n_samples_x % frame_length_samples != 0: remainder = n_samples_x % frame_length_samples if remainder != 0: # Calculate number of samples needed for padding padding_samples = int(frame_length_samples - remainder) if input_format.startswith("ISM") or input_format.startswith("MASA"): raise ValueError( f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms)." ) else: warn( f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms)." f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms). Padding to the nearest integer multiple." ) # Create and append zeros padded_data = trim(x, sampling_rate, (0, -padding_samples), pad_noise=True, samples=True) # Write padded data to output directory write(output_dir / item.name, padded_data, fs) else: copyfile(item, output_dir / item.name) # Update audio file path in list cfg.items_list[i] = output_dir / item.name # Copy metadata and update path if input_format.startswith("ISM"): for j in range(int(cfg.input["fmt"][3])): copyfile( cfg.metadata_path[i][j], output_dir / cfg.metadata_path[i][j].name ) cfg.metadata_path[i][j] = output_dir / cfg.metadata_path[i][j].name elif input_format.startswith("MASA"): raise ValueError("MASA as input format not implemented yet") # Check if all files are present in output directory all_files_present = all( [(output_dir / audio_file.name).exists() for audio_file in cfg.items_list] ) if not all_files_present: raise Exception("Not all files are present in the output directory") # Make the output path as the new input path cfg.input_path = output_dir
ivas_processing_scripts/processing/processing_splitting_scaling.py +14 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import logging import re from itertools import repeat from pathlib import Path from warnings import warn import numpy as np Loading Loading @@ -239,6 +240,19 @@ class Processing_splitting_scaling(Processing): out_meta = repeat(None) else: # check length of output signals input_aligned_file = ( in_file.parent.parent / "20ms_aligned_files" / f"{Path(in_file.stem).stem}.wav" ) input_aligned_array, _ = read(input_aligned_file) if (len_inp := len(input_aligned_array)) != (len_out := len(x)): warn( f"For file {out_file} the length is {len_out} and does not match the (frame aligned) input length {len_inp}." ) # set output values out_files = [out_file] file_splits = [x] if isinstance(audio.fromtype(self.out_fmt), audio.ObjectBasedAudio): Loading