Commit 582e619e authored by Vladimir Malenovsky's avatar Vladimir Malenovsky
Browse files

refactoring of the script to use the trim_meta() function

parent 07b19e4b
Loading
Loading
Loading
Loading
Loading
+149 −108
Original line number Diff line number Diff line
@@ -31,12 +31,11 @@
#
import logging
from itertools import groupby, repeat
from math import floor
from pathlib import Path

import numpy as np

from ivas_processing_scripts.audiotools import audio, audioarray, audiofile
from ivas_processing_scripts.audiotools import audio, audioarray, audiofile, metadata
from ivas_processing_scripts.audiotools.convert.objectbased import convert_objectbased
from ivas_processing_scripts.audiotools.wrappers.bs1770 import loudness_norm
from ivas_processing_scripts.generation import config
@@ -196,126 +195,108 @@ def generate_ismN_scene(

    # initialize output ISM object
    y = audio.ObjectBasedAudio(ism_format)
    y.fs = cfg.fs

    # set the frame length
    frame_len = int(cfg.fs / 50)

    # repeat for all source files
    offset = 0
    for i in range(N_inputs):
        # parse parameters from the scene description
        # read input filename
        source_file = (
            scene["input"][i] if isinstance(scene["input"], list) else scene["input"]
        )

        input_filename = Path(source_file).parent / (
            cfg.use_input_prefix + Path(source_file).name
        )

        # read azimuth and elevation information
        if "azimuth" in scene.keys():
            source_azi = (
            scene["azimuth"][i]
            if isinstance(scene["azimuth"], list)
            else scene["azimuth"]
                scene["azimuth"][i] if isinstance(scene["azimuth"], list) else scene["azimuth"]
            )
        else:
            source_azi = 0.0

        if "elevation" in scene.keys():
            source_ele = (
            scene["elevation"][i]
            if isinstance(scene["elevation"], list)
            else scene["elevation"]
                scene["elevation"][i] if isinstance(scene["elevation"], list) else scene["elevation"]
            )
        else:
            source_ele = 0.0

        # read the overlap length
        # read the source shift length (in seconds)
        if "shift" in scene.keys():
            source_shift = (
                scene["shift"][i]
                if isinstance(scene["shift"], list)
                else scene["shift"]
                scene["shift"][i] if isinstance(scene["shift"], list) else scene["shift"]
            )
        else:
            source_shift = 0.0

        # convert the shift to samples and ensure it is a multiple of 20ms
        source_shift = source_shift * cfg.fs
        if source_shift >= 0:
            source_shift = int(np.floor(source_shift / frame_len) * frame_len)
        else:
            source_shift = int(np.ceil(source_shift / frame_len) * frame_len)

        # read the level
        if "level" in scene.keys():
            level = (
                scene["level"][i]
                if isinstance(scene["level"], list)
                else scene["level"]
                scene["level"][i] if isinstance(scene["level"], list) else scene["level"]
            )
        else:
            level = -26

        logger.info(f"Encoding {source_file} at position(s) {source_azi},{source_ele}")

        # get input filename
        input_filename = Path(source_file).parent / (
            cfg.use_input_prefix + Path(source_file).name
        )

        # generate ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
        y.metadata_files.insert(i, str(output_filename.with_suffix(f".{i}.csv")))

        # read source file
        x = audio.fromfile("MONO", input_filename, fs=cfg.fs)

        # adjust the level of the source file
        x.audio, _ = loudness_norm(x, level, loudness_format="MONO")

        # shift the source signal (positive shift creates overlap, negative shift creates a gap)
        if int(floor(-source_shift)) != 0:
            x.audio = audioarray.trim(x.audio, x.fs, limits=[-source_shift, 0])

        # get the number of frames (multiple of 20ms)
        frame_len = int(x.fs / 50)
        N_frames = int(len(x.audio) / frame_len)
        x = audio.fromtype("ISM1")
        x.audio, x.fs = audiofile.read(input_filename)

        # resample to the target fs if necessary
        if x.fs != cfg.fs:
            logger.warning(
                f"Warning: Sample rate of the audio source is {x.fs} Hz and needs to be resampled to {cfg.fs}!"
            )
            resampled_audio = audioarray.resample(x.audio, x.fs, cfg.fs)
            x.audio = resampled_audio
            x.fs = cfg.fs

        # adjust the level of the audio source file (need to convert to MONO first)
        x_temp = audio.ChannelBasedAudio("MONO")  # create a temporary mono audio object
        x_temp.audio = x.audio.copy()
        x_temp.fs = x.fs
        x_temp.audio, _ = loudness_norm(x_temp, level, loudness_format="MONO")
        x.audio = x_temp.audio

        # ensure the length of the audio source signal is a multiple of 20ms
        if len(x.audio) % frame_len != 0:
            # pad with zeros to ensure that the signal length is a multiple of 20ms
            if len(x.audio) % frame_len != 0:
                N_pad = int(frame_len - len(x.audio) % frame_len)
                x.audio = audioarray.trim(x.audio, x.fs, limits=[0, -N_pad], samples=True)

        if y.audio is None:
            # add source signal to the array of all source signals
            y.audio = x.audio.copy()
            y.fs = x.fs
        else:
            # pad ISM signal with zeros to have the same length as the MASA signal
            N_pad = y.audio.shape[0] - x.audio.shape[0]
            if N_pad != 0:
                x.audio = audioarray.trim(
                    x.audio, x.fs, limits=[0, -N_pad], samples=True
                )

            # append ISM signal to the ISM object
            y.audio = np.append(y.audio, x.audio, axis=1)

    # append pre-amble and post-amble to all sources
    y.audio = audioarray.trim(y.audio, y.fs, limits=[-cfg.preamble, -cfg.postamble])

    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")

        # superimpose
        y.audio += noise

    # generate ISM metadata
    for i in range(N_inputs):
        # parse metadata parameters from the scene description
        source_azi = (
            scene["azimuth"][i]
            if isinstance(scene["azimuth"], list)
            else scene["azimuth"]
        )
        source_ele = (
            scene["elevation"][i]
            if isinstance(scene["elevation"], list)
            else scene["elevation"]
        )

        N_frames = int(np.rint((len(y.audio) / y.fs * 50)))
        # get the number of frames (multiple of 20ms)
        N_frames = int(len(x.audio) / frame_len)

        # read azimuth information and convert to an array
        # convert azimuth information in case of moving object
        if isinstance(source_azi, str):
            if ":" in source_azi:
                # start with the initial azimuth value and apply step N_frames times
                source_azi = source_azi.split(":")
                azi = np.arange(
                    float(eval(source_azi[0])),
                    float(eval(source_azi[0])) + N_frames * float(eval(source_azi[1])),
                    float(eval(source_azi[1])),
                )
                # convert into array (initial_value:step:stop_value)
                start_str, step_str, stop_str = source_azi.split(":")
                start = float(eval(start_str))
                step = float(eval(step_str))
                stop = float(eval(stop_str))
                azi = np.arange(start, stop, step)

                # adjust length to N_frames
                if len(azi) > N_frames:
                    azi = azi[:N_frames]
                elif len(azi) < N_frames:
                    azi = np.append(azi, np.full(N_frames - len(azi), azi[-1]))
            else:
                # replicate static azimuth value N_frames times
                azi = np.repeat(float(eval(source_azi)), N_frames)
@@ -332,21 +313,22 @@ def generate_ismN_scene(
                f"Incorrect value(s) of azimuth: {azi[(azi > 180) | (azi < -180)]}"
            )

        # read elevation information and convert to an array
        # convert elevation information in case of a moving object
        if isinstance(source_ele, str):
            if ":" in source_ele:
                # convert into array (initial_value:step:stop_value)
                # note: the stop_value value is +-90 degrees depending on the sign of the step
                source_ele = source_ele.split(":")
                ele = np.arange(
                    float(eval(source_ele[0])),
                    np.sign(float(eval(source_ele[1]))) * 90,
                    float(eval(source_ele[1])),
                )[:N_frames]

                # repeat the last elevation value, if array is shorter than N_frames
                if len(ele) < N_frames:
                start_str, step_str, stop_str = source_ele.split(":")
                start = float(eval(start_str))
                step = float(eval(step_str))
                stop = float(eval(stop_str))
                ele = np.arange(start, stop, step)

                # adjust length to N_frames
                if len(ele) > N_frames:
                    ele = ele[:N_frames]
                elif len(ele) < N_frames:
                    ele = np.append(ele, np.full(N_frames - len(ele), ele[-1]))

            else:
                # replicate static elevation value N_frames times
                ele = np.repeat(float(eval(source_ele)), N_frames)
@@ -354,24 +336,83 @@ def generate_ismN_scene(
            # replicate static elevation value N_frames times
            ele = np.repeat(float(source_ele), N_frames)

        # wrap elevation angle to -90 .. +90
        ele = ((ele + 90) % 180) - 90

        # check if elevation is from -90 .. +90
        if any(ele > 90) or any(ele < -90):
            logger.error(
                f"Incorrect value(s) of elevation: {ele[(ele > 90) | (ele < -90)]}"
            )

        # generate radius vector with all values equal to 1.0
        rad = np.ones(N_frames)

        # arrange all metadata fields column-wise into a matrix
        x_meta = np.column_stack((azi, ele))
        x.object_pos.append(np.column_stack((azi, ele, rad)))

        # copy new audio source signal to the ISMn object
        if y.audio is None:
            # add the first audio source signal to the array of all source signals
            y.audio = x.audio.copy()
            y.object_pos = x.object_pos.copy()
            y.fs = x.fs
            # if source_shift < 0:
            #     # insert zeros to the new audio source signal to shift it right
            #     metadata.trim_meta(y, limits=[source_shift, 0], samples=True)
            offset = source_shift
        else:
            # shift the beginning of the audio source signal
            delta_offset = source_shift - offset
            if delta_offset > 0:
                # insert zeros to the previous ISM signal(s) to shift them right
                metadata.trim_meta(y, limits=[-delta_offset, 0], samples=True)
                offset = source_shift
            else:
                # insert zeros to the new audio source signal to shift it right
                metadata.trim_meta(x, limits=[delta_offset, 0], samples=True)

            # adjust the length of the audio source signal
            delta_length = len(x.audio) - len(y.audio)
            if delta_length > 0:
                # pad zeros to the previous ISM signal(s)
                metadata.trim_meta(y, limits=[0, -delta_length], samples=True)
            else:
                # pad zeros to the new audio source signal
                metadata.trim_meta(x, limits=[0, delta_length], samples=True)

            y.audio = np.append(y.audio, x.audio, axis=1)
            y.object_pos.extend(x.object_pos)

        # add ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
        y.metadata_files.insert(i, str(output_filename.with_suffix(f".{i}.csv")))

        # write to .csv output metadata file
        np.savetxt(y.metadata_files[i - 1], x_meta, fmt="%0.2f", delimiter=",", encoding="utf-8")
    # append pre-amble and post-amble
    metadata.trim_meta(y, limits=[-cfg.preamble * 1000, -cfg.postamble * 1000])

    y.init_metadata()  # this is needed to populate 'y.object_pos[]'
    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")
        y.audio += noise

    # adjust the length of the output signal
    if "duration" in cfg.__dict__:
        # trim the output signal such that the total duration is X seconds
        duration = int(cfg.duration * cfg.fs)  # convert to samples
    else:
        # do not change the length of the audio signal
        duration = len(y.audio)
    duration = int(np.floor(duration / frame_len) * frame_len)  # ensure multiple of 20ms
    if len(y.audio) != duration:
        metadata.trim_meta(y, limits=[0, len(y.audio) - duration], samples=True)

    # write the OMASA output to .wav file in an interleaved format
    # write the ISMn output to .wav file in an interleaved format and ISM metadata in .csv files
    audiofile.write(output_filename, y.audio, y.fs)
    metadata.write_ISM_metadata_in_file(y.object_pos, y.metadata_files)

    # convert to ISM output to BINAURAL, if option was chosen
    # convert to BINAURAL, if option was chosen
    if cfg.binaural_output:
        binaudio = audio.fromtype("BINAURAL")
        binaudio.fs = y.fs