Commit b0229483 authored by Vladimir Malenovsky's avatar Vladimir Malenovsky
Browse files

support RIR convolution for SBA formats

parent 595887f2
Loading
Loading
Loading
Loading
Loading
+8 −13
Original line number Diff line number Diff line
@@ -246,22 +246,18 @@ def generate_MASA_scene(
            # of the reference signal (0-based index)
            if isinstance(scene["shift"][i], str) and "(" in scene["shift"][i]:
                # extract X and i_ref
                match = re.match(
                    r"([+-]?\d*\.?\d+)[\(\[]([+-]?\d+)[\)\]]", scene["shift"][i]
                )
                match = re.match(r"([+-]?\d*\.?\d+)[\(\[]([+-]?\d+)[\)\]]", scene["shift"][i])

                if match:
                    overlap = float(match.group(1))
                    overlap_ref = int(match.group(2))
                else:
                    scene_shift_str = scene["shift"][i]
                    logger.error(
                        f"Unable to parse {scene_shift_str}. The specification of overlap or reference is incorrect!"
                    )
                    logger.error(f"Unable to parse {scene_shift_str}. The specification of overlap or reference is incorrect!")
                    sys.exit(-1)

                # calculate absolute shift of the source signal in seconds
                source_shift = end_position[overlap_ref] - overlap
                source_shift = end_position[overlap_ref] + overlap
        else:
            source_shift = 0.0

@@ -432,15 +428,14 @@ def generate_MASA_scene(
    # trim the output signal if the total duration exceeds X seconds
    if "duration" in cfg.__dict__:
        # convert from seconds to samples (ensure multiple of 20ms)
        duration = int(np.floor(int(cfg.duration * cfg.fs) / frame_len) * frame_len)
        duration = int(
            np.floor(int(cfg.duration * cfg.fs) / frame_len) * frame_len
        )

        # check if the current length of the output signal exceeds the duration
        if len(y_int.audio) > duration:
            y_int.audio = audioarray.trim(
                y_int.audio,
                y_int.fs,
                limits=[0, len(y_int.audio) - duration],
                samples=True,
                y_int.audio, y_int.fs, limits=[0, len(y_int.audio) - duration], samples=True
            )

    # adjust the loudness of the output signal
@@ -525,7 +520,7 @@ def generate_MASA_scene(
        y_int.audio = audioarray.window(y_int.audio, y_int.fs, cfg.fade_in_out * 1000)

    # generate MASA metadata filename (should end with .met)
    y.metadata_file = output_filename.with_suffix(".met")
    y.metadata_file = output_filename.with_suffix(output_filename.suffix + ".met")

    # convert the intermediate SBA output signal to MASA format
    render_sba_to_masa(y_int, y)
+2 −2
Original line number Diff line number Diff line
@@ -474,7 +474,7 @@ def generate_OMASA_scene(

            # add ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
            y_int.metadata_files.insert(
                i - 1, str(output_filename.with_suffix(f".{i - 1}.csv"))
                i - 1, str(output_filename.with_suffix(output_filename.suffix + f".{i - 1}.csv"))
            )

    # append pre-amble and post-amble
@@ -520,7 +520,7 @@ def generate_OMASA_scene(
        y_int.audio = audioarray.window(y_int.audio, y_int.fs, cfg.fade_in_out * 1000)

    # generate and insert MASA metadata filename (should end with .met)
    y.metadata_files.append(str(output_filename.with_suffix(".met")))
    y.metadata_files.append(str(output_filename.with_suffix(output_filename.suffix + ".met")))

    # convert the intermediate OSBA object to OMASA object
    convert_osba(y_int, y)
+1 −1
Original line number Diff line number Diff line
@@ -460,7 +460,7 @@ def generate_OSBA_scene(

            # add ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
            y.metadata_files.insert(
                i - 1, str(output_filename.with_suffix(f".{i - 1}.csv"))
                i - 1, str(output_filename.with_suffix(output_filename.suffix + f".{i - 1}.csv"))
            )

    # append pre-amble and post-amble
+121 −18
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ from pathlib import Path
import numpy as np

from ivas_processing_scripts.audiotools import audio, audioarray, audiofile
from ivas_processing_scripts.audiotools.convert.objectbased import convert_objectbased
from ivas_processing_scripts.audiotools.convert.scenebased import convert_scenebased
from ivas_processing_scripts.audiotools.wrappers.bs1770 import loudness_norm
from ivas_processing_scripts.audiotools.wrappers.reverb import (
@@ -208,13 +209,27 @@ def generate_sba_scene(
        source_file = (
            scene["input"][i] if isinstance(scene["input"], list) else scene["input"]
        )
        IR_file = scene["IR"][i] if isinstance(scene["IR"], list) else scene["IR"]

        # get input filename and IR filename
        input_filename = Path(source_file).parent / (
            cfg.use_input_prefix + Path(source_file).name
        )

        # get input filename and IR filename
        if "IR" in scene.keys():
            IR_file = scene["IR"][i] if isinstance(scene["IR"], list) else scene["IR"]
            IR_filename = Path(IR_file).parent / (cfg.use_IR_prefix + Path(IR_file).name)
        else:
            # read azimuth and elevation information
            source_azi = (
                scene["azimuth"][i]
                if isinstance(scene["azimuth"], list)
                else scene["azimuth"]
            )
            source_ele = (
                scene["elevation"][i]
                if isinstance(scene["elevation"], list)
                else scene["elevation"]
            )

        # read the source shift length (in seconds)
        if "shift" in scene.keys():
@@ -282,9 +297,14 @@ def generate_sba_scene(
        else:
            level = -26

        if "IR" in scene.keys():
            logger.info(
                f"-- Convolving {source_file} with {IR_file} at {level} LKFS with shift of {source_shift_in_seconds} seconds"
            )
        else:
            logger.info(
                f"-- Encoding {source_file} at position(s) {source_azi},{source_ele} at {level} LKFS with shift of {source_shift_in_seconds} seconds"
            )

        # read source file
        x = audio.fromfile("MONO", input_filename)
@@ -301,17 +321,6 @@ def generate_sba_scene(
            x.audio = resampled_audio
            x.fs = cfg.fs

        # read the IR file (!must be in target format!)
        IR = audio.fromfile(cfg.format, IR_filename)

        # convolve MONO source audio with FOA/HOA2/HOA3 IR -> results in FOA/HOA2/HOA3 audio object
        if cfg.format == "FOA":
            x = reverb_foa(x, IR, mode=None)
        elif cfg.format == "HOA2":
            x = reverb_hoa2(x, IR, mode=None)
        elif cfg.format == "HOA3":
            x = reverb_hoa3(x, IR, mode=None)

        # adjust the level of the FOA/HOA2/HOA3 signal
        if level is None:
            # do not change the level of the audio source signal
@@ -331,6 +340,100 @@ def generate_sba_scene(
                    x.audio, x.fs, limits=[0, -N_pad], samples=True
                )

        # get the number of frames (multiple of 20ms)
        N_frames = int(len(x.audio) / frame_len)

        if "IR" in scene.keys():
            # read the IR file (!must be in target format!)
            IR = audio.fromfile(cfg.format, IR_filename)

            # convolve MONO source audio with FOA/HOA2/HOA3 IR -> results in FOA/HOA2/HOA3 audio object
            if cfg.format == "FOA":
                x = reverb_foa(x, IR, mode=None)
            elif cfg.format == "HOA2":
                x = reverb_hoa2(x, IR, mode=None)
            elif cfg.format == "HOA3":
                x = reverb_hoa3(x, IR, mode=None)
        else:
            # convert MONO to ISM1
            x_ism = audio.ObjectBasedAudio("ISM1")  # ISM with 1 channel
            x_ism.fs = cfg.fs
            x_ism.audio = x.audio.copy()

            # convert azimuth information in case of moving object
            if isinstance(source_azi, str):
                if ":" in source_azi:
                    # convert into array (initial_value:step:stop_value)
                    start_str, step_str, stop_str = source_azi.split(":")
                    start = float(eval(start_str))
                    step = float(eval(step_str))
                    stop = float(eval(stop_str))
                    azi = np.arange(start, stop, step)

                    # adjust length to N_frames
                    if len(azi) > N_frames:
                        azi = azi[:N_frames]
                    elif len(azi) < N_frames:
                        azi = np.append(azi, np.full(N_frames - len(azi), azi[-1]))
                else:
                    # replicate static azimuth value N_frames times
                    azi = np.repeat(float(eval(source_azi)), N_frames)
            else:
                # replicate static azimuth value N_frames times
                azi = np.repeat(float(source_azi), N_frames)

            # convert azimuth from 0 .. 360 to -180 .. +180
            azi = (azi + 180) % 360 - 180

            # check if azimuth is from -180 .. +180
            if any(azi > 180) or any(azi < -180):
                logger.error(
                    f"Incorrect value(s) of azimuth: {azi[(azi > 180) | (azi < -180)]}"
                )

            # convert elevation information in case mof moving object
            if isinstance(source_ele, str):
                if ":" in source_ele:
                    # convert into array (initial_value:step:stop_value)
                    start_str, step_str, stop_str = source_ele.split(":")
                    start = float(eval(start_str))
                    step = float(eval(step_str))
                    stop = float(eval(stop_str))
                    ele = np.arange(start, stop, step)

                    # adjust length to N_frames
                    if len(ele) > N_frames:
                        ele = ele[:N_frames]
                    elif len(ele) < N_frames:
                        ele = np.append(ele, np.full(N_frames - len(ele), ele[-1]))

                else:
                    # replicate static elevation value N_frames times
                    ele = np.repeat(float(eval(source_ele)), N_frames)
            else:
                # replicate static elevation value N_frames times
                ele = np.repeat(float(source_ele), N_frames)

            # wrap elevation angle to -90 .. +90
            ele = ((ele + 90) % 180) - 90

            # check if elevation is from -90 .. +90
            if any(ele > 90) or any(ele < -90):
                logger.error(
                    f"Incorrect value(s) of elevation: {ele[(ele > 90) | (ele < -90)]}"
                )

            # generate radius vector with all values equal to 1.0
            rad = np.ones(N_frames)

            # arrange all metadata fields column-wise into a matrix
            x_ism.object_pos.append(np.column_stack((azi, ele, rad)))

            # convert ISM1 object to SBA
            x_sba = audio.SceneBasedAudio(cfg.format)
            convert_objectbased(x_ism, x_sba)
            x = x_sba  # replace x with the SBA object

        # add the convolved FOA/HOA2/HOA3 audio source signal to the output signal
        if y.audio is None:
            # add source signal to the array of all source signals
@@ -338,7 +441,7 @@ def generate_sba_scene(

            if source_shift > 0:
                # insert zeros to the new audio source signal to shift it right
                y.audio = audioarray.trim_meta(
                y.audio = audioarray.trim(
                    y.audio, y.fs, limits=[-source_shift, 0], samples=True
                )
            else: