Commit 35e1cc00 authored by Vinit Veera's avatar Vinit Veera
Browse files

Extended the pre-existing function to pad and align the audio to 20ms.

parent 3e8c1d3d
Loading
Loading
Loading
Loading
+6 −3
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ from ivas_processing_scripts.constants import (
)
from ivas_processing_scripts.processing import chains, config
from ivas_processing_scripts.processing.processing import (
    compare_wav_lengths,
    multiple_of_frame_size,
    preprocess,
    preprocess_2,
@@ -96,6 +97,9 @@ def main(args):
        # set up logging
        logger = logging_init(args, cfg)

        # checking if audio is a multiple of frame size
        multiple_of_frame_size(cfg, logger)

        # Re-ordering items based on concatenation order
        if hasattr(cfg, "preprocessing_2"):
            if (
@@ -128,9 +132,6 @@ def main(args):

        cfg.metadata_path = metadata

        # checking if audio is a multiple of frame size
        multiple_of_frame_size(cfg)

        # run preprocessing only once
        if hasattr(cfg, "preprocessing"):
            # save process info for background noise
@@ -180,6 +181,8 @@ def main(args):
        if cfg.condition_in_output_filename:
            rename_generated_conditions(cfg.output_path)

        compare_wav_lengths(cfg.input_path, cfg.output_path, logger)

    # copy configuration to output directory
    with open(cfg.output_path.joinpath(f"{cfg.name}.yml"), "w") as f:
        yaml.safe_dump(cfg._yaml_dump, f)
+71 −4
Original line number Diff line number Diff line
@@ -319,6 +319,9 @@ def preprocess_2(cfg, logger):
    cfg.tmp_dirs = cfg.tmp_dirs[1:]
    cfg.out_dirs = cfg.out_dirs[1:]

    # Copy the concatenated file to the 20ms_aligned_files folder
    copyfile(cfg.concat_file, cfg.input_path / cfg.concat_file.name)

    return


@@ -519,10 +522,13 @@ def preprocess_background_noise(cfg):

def multiple_of_frame_size(
    cfg: TestConfig,
    logger: logging.Logger,
    frame_size_in_ms: Optional[int] = 20,
) -> np.ndarray:
    """
    Warn/Exit if the audio isn't a multiple of the frame size
    This function checks if the list of multi channel audio files is a multiple of frame size.
    If a file isn't a multiple, the function pads it to the next integer multiple of the frame size and writes the file to an output directory.
    It also copies the already aligned files to the output directory.

    Parameters
    ----------
@@ -534,7 +540,14 @@ def multiple_of_frame_size(
    # get the number of channels from the input format
    input_format = cfg.input["fmt"]
    num_channels = audio.fromtype(input_format).num_channels
    for item in cfg.items_list:

    # Create output directory if it doesn't exist
    output_dir = cfg.output_path / "20ms_aligned_files"
    output_dir.mkdir(exist_ok=True)

    # padded_audio_files_list = []

    for i, item in enumerate(cfg.items_list):
        # read the audio file
        if "fs" in cfg.input:
            sampling_rate = cfg.input["fs"]
@@ -555,15 +568,42 @@ def multiple_of_frame_size(
            )
        # warn if audio length not a multiple of frame length
        frame_length_samples = (frame_size_in_ms / 1000) * fs
        if n_samples_x % frame_length_samples != 0:
        remainder = n_samples_x % frame_length_samples
        if remainder != 0:
            # Calculate number of samples needed for padding
            padding_samples = int(frame_length_samples - remainder)

            if input_format.startswith("ISM") or input_format.startswith("MASA"):
                raise ValueError(
                    f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms)."
                )
            else:
                warn(
                    f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms)."
                    f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms). Padding to the nearest integer multiple."
                )
                logger.info(
                    f"The length ({n_samples_x} samples) of audio ({item.name}) is not a multiple of frame length (20 ms). Padding to the nearest integer multiple."
                )
                # Create the zero padding and append it to the audio
                padding_array = np.zeros((padding_samples, n_chan_x))
                padded_data = np.vstack((x, padding_array))
                # Write padded data to output directory
                write(output_dir / item.name, padded_data, fs)
        else:
            copyfile(item, output_dir / item.name)

        # Update audio file path in list
        cfg.items_list[i] = output_dir / item.name

    # Check if all files are present in output directory
    all_files_present = all(
        [(output_dir / item.name).exists() for audio_file in cfg.items_list]
    )
    if not all_files_present:
        raise Exception("Not all files are present in the output directory")

    # Make the output path as the new input path
    cfg.input_path = output_dir


def rename_generated_conditions(output_path: Path):
@@ -583,3 +623,30 @@ def rename_generated_conditions(output_path: Path):
            for file_path in subdirectory.iterdir():
                new_filename = f"{file_path.stem}.{subdirectory.name}{file_path.suffix}"
                file_path.rename(subdirectory / new_filename)


def compare_wav_lengths(input_path: Path, output_path: Path, logger: logging.Logger):
    """
    Log every output .wav file whose number of samples differs from its input file.

    Every sub-directory of the output directory whose name starts with "c" is scanned
    for .wav files. Each of them is matched to the input file of the same name. If no
    such input exists and the output stem ends with ".<sub-directory name>" (the naming
    produced by rename_generated_conditions), that suffix is stripped before matching.
    Output files without a matching input are reported with a warning and skipped, so
    that this purely diagnostic check never aborts a finished processing run.

    Parameters
    ----------
    input_path: Path
        Path to input directory
    output_path: Path
        Path to output directory
    logger: logging.Logger
        Logger that receives the length-mismatch and missing-input messages
    """
    for subdir in output_path.iterdir():
        if not (subdir.is_dir() and subdir.name.startswith("c")):
            continue

        # Suffix that rename_generated_conditions appends to the file stem
        condition_tag = f".{subdir.name}"

        for output_file in subdir.glob("*.wav"):
            input_file = input_path / output_file.name

            # Fall back to the name the file had before the condition was appended
            if not input_file.exists() and output_file.stem.endswith(condition_tag):
                original_stem = output_file.stem[: -len(condition_tag)]
                input_file = input_path / f"{original_stem}{output_file.suffix}"

            if not input_file.exists():
                logger.warning(
                    f"No input file found for the output condition {subdir.name} {output_file.name}, skipping the length comparison."
                )
                continue

            input_array, _ = read(str(input_file))
            output_array, _ = read(str(output_file))

            # Axis 0 holds the samples; only the lengths are compared
            input_length = input_array.shape[0]
            output_length = output_array.shape[0]
            if input_length != output_length:
                logger.info(
                    f"The {input_file.name} has {input_length} samples and the output condition {subdir.name} {output_file.name} has {output_length} samples and the difference between the two is {input_length - output_length} samples.\n"
                )