Commit 661bc864 authored by Vladimir Malenovsky
Browse files

Refactoring to use the trim_meta() function

parent 8134591d
Loading
Loading
Loading
Loading
+159 −106
Original line number | Diff line number | Diff line
@@ -38,7 +38,7 @@ from pathlib import Path

import numpy as np

from ivas_processing_scripts.audiotools import audio, audioarray, audiofile
from ivas_processing_scripts.audiotools import audio, audioarray, audiofile, metadata
from ivas_processing_scripts.audiotools.convert.osba import convert_osba
from ivas_processing_scripts.audiotools.wrappers.bs1770 import loudness_norm
from ivas_processing_scripts.generation import config
@@ -195,13 +195,25 @@ def generate_OSBA_scene(

    # initialize output OSBA object
    y = audio.OSBAAudio(osba_format)
    y.fs = cfg.fs

    # set the frame length
    frame_len = int(cfg.fs / 50)

    # repeat for all source files
    offset = 0
    for i in range(N_inputs):
        # parse parameters from the scene description
        source_file = (
            scene["input"][i] if isinstance(scene["input"], list) else scene["input"]
        )

        # get input filename
        input_filename = Path(source_file).parent / (
            cfg.use_input_prefix + Path(source_file).name
        )

        # read azimuth and elevation information
        source_azi = (
            scene["azimuth"][i]
            if isinstance(scene["azimuth"], list)
@@ -223,6 +235,13 @@ def generate_OSBA_scene(
        else:
            source_shift = 0.0

        # convert overlap to samples and ensure it is a multiple of 20ms
        source_shift = source_shift * cfg.fs
        if source_shift >= 0:
            source_shift = int(np.floor(source_shift / frame_len) * frame_len)
        else:
            source_shift = int(np.ceil(source_shift / frame_len) * frame_len)

        # read the level
        if "level" in scene.keys():
            level = (
@@ -260,86 +279,56 @@ def generate_OSBA_scene(
            )
            sys.exit(-1)

        if fmt == "MONO":
            # generate ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
            y.metadata_files.insert(i - 1, f"{output_filename}.{i-1}.csv")

        # read source file
        x = audio.fromfile(fmt, input_filename, fs=cfg.fs)

        # resample to the target fs if necessary
        if x.fs != cfg.fs:
            logger.warning(
                f"Warning: Sample rate of the audio source is {x.fs} Hz and needs to be resampled to {cfg.fs}!"
            )
            resampled_audio = audioarray.resample(x.audio, x.fs, cfg.fs)
            x.audio = resampled_audio
            x.fs = cfg.fs

        # adjust the level of the source file
        if fmt in ["FOA", "HOA2", "HOA3"]:
            x.audio, _ = loudness_norm(x, level, loudness_format="STEREO", rms=True)
        else:
            x.audio, _ = loudness_norm(x, level, loudness_format="MONO")

        # shift the source signal (positive shift creates overlap, negative shift creates a gap)
        if int(floor(-source_shift)) != 0:
            x.audio = audioarray.trim(x.audio, x.fs, limits=[-source_shift, 0])

        # get the number of frames (multiple of 20ms)
        frame_len = int(x.fs / 50)
        N_frames = int(len(x.audio) / frame_len)

        # ensure the length of the audio source signal is a multiple of 20ms
        if len(x.audio) % frame_len != 0:
            # pad with zeros to ensure that the signal length is a multiple of 20ms
            if len(x.audio) % frame_len != 0:
            # pad the source signal
                N_pad = int(frame_len - len(x.audio) % frame_len)
                x.audio = audioarray.trim(x.audio, x.fs, limits=[0, -N_pad], samples=True)

        if fmt in ["FOA", "HOA2", "HOA3"]:
            # copy FOA/HOA2/HOA3 signal to the OSBA oject
            y.audio = x.audio
            y.fs = x.fs
        else:
            # pad ISM signal with zeros to have the same length as the SBA signal
            N_pad = y.audio.shape[0] - x.audio.shape[0]
            if N_pad != 0:
                x.audio = audioarray.trim(
                    x.audio, x.fs, limits=[0, -N_pad], samples=True
                )

            # append ISM signal to the OSBA object (ISM comes first !!!)
            y.audio = np.insert(y.audio, [i - 1], x.audio, axis=1)

    # append pre-amble and post-amble to all sources
    y.audio = audioarray.trim(y.audio, y.fs, limits=[-cfg.preamble, -cfg.postamble])

    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")

        # superimpose
        y.audio += noise

    # generate ISM metadata files
    for i in range(1, N_ISMs + 1):
        # parse metadata parameters from the scene description
        source_azi = (
            scene["azimuth"][i]
            if isinstance(scene["azimuth"], list)
            else scene["azimuth"]
        )
        source_ele = (
            scene["elevation"][i]
            if isinstance(scene["elevation"], list)
            else scene["elevation"]
        )
        # get the number of frames (multiple of 20ms)
        N_frames = int(len(x.audio) / frame_len)

        N_frames = int(np.rint((len(y.audio) / y.fs * 50)))
        # convert the input audio source signal to ISM
        if fmt == "MONO":
            # convert MONO to ISM1
            x_ism = audio.ObjectBasedAudio("ISM1")  # ISM with 1 channel
            x_ism.fs = cfg.fs
            x_ism.audio = x.audio.copy()

        # read azimuth information and convert to an array
            # convert azimuth information in case of moving object
            if isinstance(source_azi, str):
                if ":" in source_azi:
                # start with the initial azimuth value and apply step N_frames times
                source_azi = source_azi.split(":")
                azi = np.arange(
                    float(eval(source_azi[0])),
                    float(eval(source_azi[0])) + N_frames * float(eval(source_azi[1])),
                    float(eval(source_azi[1])),
                )
                    # convert into array (initial_value:step:stop_value)
                    start_str, step_str, stop_str = source_azi.split(":")
                    start = float(eval(start_str))
                    step = float(eval(step_str))
                    stop = float(eval(stop_str))
                    azi = np.arange(start, stop, step)

                    # adjust length to N_frames
                    if len(azi) > N_frames:
                        azi = azi[:N_frames]
                    elif len(azi) < N_frames:
                        azi = np.append(azi, np.full(N_frames - len(azi), azi[-1]))
                else:
                    # replicate static azimuth value N_frames times
                    azi = np.repeat(float(eval(source_azi)), N_frames)
@@ -356,21 +345,22 @@ def generate_OSBA_scene(
                    f"Incorrect value(s) of azimuth: {azi[(azi > 180) | (azi < -180)]}"
                )

        # read elevation information and convert to an array
            # convert elevation information in case mof moving object
            if isinstance(source_ele, str):
                if ":" in source_ele:
                    # convert into array (initial_value:step:stop_value)
                # note: the stop_value value is +-90 degrees depending on the sign of the step
                source_ele = source_ele.split(":")
                ele = np.arange(
                    float(eval(source_ele[0])),
                    np.sign(float(eval(source_ele[1]))) * 90,
                    float(eval(source_ele[1])),
                )[:N_frames]

                # repeat the last elevation value, if array is shorter than N_frames
                if len(ele) < N_frames:
                    start_str, step_str, stop_str = source_ele.split(":")
                    start = float(eval(start_str))
                    step = float(eval(step_str))
                    stop = float(eval(stop_str))
                    ele = np.arange(start, stop, step)

                    # adjust length to N_frames
                    if len(ele) > N_frames:
                        ele = ele[:N_frames]
                    elif len(ele) < N_frames:
                        ele = np.append(ele, np.full(N_frames - len(ele), ele[-1]))

                else:
                    # replicate static elevation value N_frames times
                    ele = np.repeat(float(eval(source_ele)), N_frames)
@@ -378,22 +368,87 @@ def generate_OSBA_scene(
                # replicate static elevation value N_frames times
                ele = np.repeat(float(source_ele), N_frames)

            # wrap elevation angle to -90 .. +90
            ele = ((ele + 90) % 180) - 90

            # check if elevation is from -90 .. +90
            if any(ele > 90) or any(ele < -90):
                logger.error(
                    f"Incorrect value(s) of elevation: {ele[(ele > 90) | (ele < -90)]}"
                )

            # generate radius vector with all values equal to 1.0
            rad = np.ones(N_frames)

            # arrange all metadata fields column-wise into a matrix
        x_meta = np.column_stack((azi, ele))
            x_ism.object_pos.append(np.column_stack((azi, ele, rad)))

            x = x_ism  # replace x with the ISM object

        # copy new audio source signal to the OSBA object
        if y.audio is None:
            # add the first audio source signal (should be FOA/HOA2/HOA3) to the array of all source signals
            y.audio = x.audio.copy()

            if fmt == "MONO":
                # if ISM, append object position to the OSBA object
                y.object_pos = x.object_pos.copy()

            # if source_shift < 0:
            #     # insert zeros to the new audio source signal to shift it right
            #     metadata.trim_meta(y, limits=[source_shift, 0], samples=True)
            offset = source_shift
        else:
            # shift the beginning of the audio source signal
            delta_offset = source_shift - offset
            if delta_offset > 0:
                # insert zeros to the previous ISM signal(s) to shift them right
                metadata.trim_meta(y, limits=[-delta_offset, 0], samples=True)
                offset = source_shift
            else:
                # insert zeros to the new audio source signal to shift it right
                metadata.trim_meta(x, limits=[delta_offset, 0], samples=True)

            # adjust the length of the audio source signal
            delta_length = len(x.audio) - len(y.audio)
            if delta_length > 0:
                # pad zeros to the previous ISM signal(s)
                metadata.trim_meta(y, limits=[0, -delta_length], samples=True)
            else:
                # pad zeros to the new audio source signal
                metadata.trim_meta(x, limits=[0, delta_length], samples=True)

            # append ISM signal to the OMASA object (ISM comes first !!!)
            y.audio = np.insert(y.audio, [i - 1], x.audio, axis=1)
            y.object_pos.extend(x.object_pos)

            # add ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
            y.metadata_files.insert(i - 1, str(output_filename.with_suffix(f".{i - 1}.csv")))

    # append pre-amble and post-amble
    metadata.trim_meta(y, limits=[-cfg.preamble * 1000, -cfg.postamble * 1000])

        # write to .csv output metadata file
        np.savetxt(y.metadata_files[i - 1], x_meta, fmt="%0.2f", delimiter=",", encoding="utf-8")
    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")
        y.audio += noise

    y.init_metadata()  # this is needed to populate 'y.object_pos[]'
    # adjust the length of the output signal
    if "duration" in cfg.__dict__:
        # trim the output signal such that the total duration is X seconds
        duration = int(cfg.duration * cfg.fs)  # convert to samples
    else:
        # do not change the length of the audio signal
        duration = len(y.audio)
    duration = int(np.floor(duration / frame_len) * frame_len)  # ensure multiple of 20ms
    if len(y.audio) != duration:
        metadata.trim_meta(y, limits=[0, len(y.audio) - duration], samples=True)

    # write the OSBA output to .wav file in an interleaved format
    # write the OSBA audio output to .wav file in an interleaved format and ISM metadata in .csv files
    audiofile.write(output_filename, y.audio, y.fs)
    metadata.write_ISM_metadata_in_file(y.object_pos, y.metadata_files)

    # convert the OSBA output to BINAURAL, if option was chosen
    if cfg.binaural_output:
@@ -408,5 +463,3 @@ def generate_OSBA_scene(
            binaudio.audio,
            binaudio.fs,
        )

    return