Commit 800d5656 authored by Vladimir Malenovsky
Browse files

update all item generation scripts for all formats

parent a240ad89
Loading
Loading
Loading
Loading
Loading
+97 −135
Original line number Diff line number Diff line
@@ -34,12 +34,15 @@ import logging
import os
from itertools import groupby, repeat
from math import floor
from pathlib import Path

import numpy as np

from ivas_processing_scripts.audiotools import audio, audiofile, convert
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness
from ivas_processing_scripts.audiotools.wrappers.reverb import reverb_foa, reverb_hoa2
from ivas_processing_scripts.audiotools import audio, audioarray, audiofile, convert
from ivas_processing_scripts.audiotools.convert.objectbased import convert_objectbased
from ivas_processing_scripts.audiotools.convert.scenebased import convert_scenebased
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness, loudness_norm
from ivas_processing_scripts.audiotools.wrappers.reverb import reverb_foa, reverb_hoa2, reverb_hoa3
from ivas_processing_scripts.generation import config
from ivas_processing_scripts.utils import apply_func_parallel

@@ -76,7 +79,7 @@ def generate_ambi_items(
    cfg: config.TestConfig,
    logger: logging.Logger,
):
    """Generate FOA/HOA2 items from mono items based on scene description"""
    """Generate FOA/HOA2/HOA3 items from mono items based on scene description"""

    # set the target level
    if "loudness" not in cfg.__dict__:
@@ -97,18 +100,10 @@ def generate_ambi_items(
    if "postamble" not in cfg.__dict__:
        cfg.postamble = 0.0

    # set the IR path
    if "IR_path" not in cfg.__dict__:
        cfg.IR_path = os.path.join(os.path.dirname(__file__), "IR")

    # set the low-level random noise option
    if "add_low_level_random_noise" not in cfg.__dict__:
        cfg.add_low_level_random_noise = False

    # setup binaural rendering
    if "binaural_path" not in cfg.__dict__:
        cfg.binaural_path = ""

    # set the listening lab designator
    if "listening_lab" not in cfg.__dict__:
        cfg.listening_lab = "l"
@@ -157,7 +152,7 @@ def generate_ambi_items(

    # set the prefix for all output filenames
    if "use_output_prefix" not in cfg.__dict__:
        cfg.use_output_prefix = None
        cfg.use_output_prefix = ""
    else:
        # replace file designators
        cfg.use_output_prefix = replace_char_seq_with_string(
@@ -169,13 +164,13 @@ def generate_ambi_items(

    # set multiprocessing
    if "multiprocessing" not in cfg.__dict__:
        cfg.multiprocessing = True
        cfg.multiprocessing = False

    apply_func_parallel(
        generate_ambi_scene,
        zip(cfg.scenes.keys(), cfg.scenes.values(), repeat(cfg), repeat(logger)),
        None,
        "mp" if cfg.multiprocessing else None,
        type = "mp" if cfg.multiprocessing else None,
        show_progress = None,
    )

    return
@@ -184,140 +179,114 @@ def generate_ambi_items(
def generate_ambi_scene(
    scene_name: str, scene: dict, cfg: config.TestConfig, logger: logging.Logger
):
    """
    Processes a single scene to generate FOA/HOA2/HOA3 item.

    Args:
        scene_name (str): The name of the scene being processed.
        scene (dict): A dictionary containing scene description, including source files, azimuth, elevation, and other parameters.
        cfg (config.TestConfig): Configuration object containing settings for processing, such as input/output paths, sampling rate, and loudness levels.
        logger (logging.Logger): Logger instance for logging information and errors.

    Expected Behavior:
        - Reads mono audio source files and processes them based on the scene description.
        - Writes the processed FOA/HOA2/HOA3 audio to the output file.
    """
    
    logger.info(
        f"Processing scene: {scene_name} out of {len(cfg.scenes)} scenes, name: {scene['name']}"
        f"Processing scene: {scene_name} out of {len(cfg.scenes)} scenes, output: {scene['output']}"
    )

    # extract the number of audio sources
    N_sources = len(np.atleast_1d(scene["source"]))
    N_inputs = len(np.atleast_1d(scene["input"]))

    # read the overlap length
    if "overlap" in scene.keys():
        source_overlap = float(scene["overlap"])
    else:
        source_overlap = 0.0
    # initialize output dirs
    output_filename = Path(scene["output"]).parent / (cfg.use_output_prefix + Path(scene["output"]).name)

    # read the ambi format
    if "format" in scene.keys():
        ambi_format = scene["format"]
    else:
        ambi_format = "FOA"
    dir_path = output_filename.parent
    if dir_path and not dir_path.exists():
        dir_path.mkdir(parents=True, exist_ok=True)

    # initialize output audio object
    y = audio.SceneBasedAudio(cfg.format)

    for i in range(N_inputs):

    len_s1 = 0
    y = audio.SceneBasedAudio(ambi_format)
    for i in range(N_sources):
        # parse parameters from the scene description
        source_file = np.atleast_1d(scene["source"])[i]
        source_file = np.atleast_1d(scene["input"])[i]
        IR_file = np.atleast_1d(scene["IR"])[i]

        # read the shift length
        if "shift" in scene.keys():
            source_shift = (
                scene["shift"][i]
                if isinstance(scene["shift"], list)
                else scene["shift"]
            )
        else:
            source_shift = 0.0

        # read the level
        if "level" in scene.keys():
            level = (
                scene["level"][i]
                if isinstance(scene["level"], list)
                else scene["level"]
            )
        else:
            level = -26

        logger.info(f"Convolving {source_file} with {IR_file}")

        # read source file
        x = audio.fromfile(
            "MONO",
            os.path.join(
                cfg.input_path,
                os.path.dirname(source_file),
                cfg.use_input_prefix + os.path.basename(source_file),
            ),
            fs=cfg.fs,
        )
        # get input filename and IR filename
        input_filename = Path(source_file).parent / (cfg.use_input_prefix + Path(source_file).name)
        IR_filename = Path(IR_file).parent / (cfg.use_IR_prefix + Path(IR_file).name)   

        # read the IR file
        IR = audio.fromfile(
            ambi_format,
            os.path.join(
                cfg.IR_path,
                os.path.dirname(IR_file),
                cfg.use_IR_prefix + os.path.basename(IR_file),
            ),
            fs=cfg.IR_fs,
        )
        # read source file
        x = audio.fromfile( "MONO", input_filename, fs=cfg.fs )

        if i == 0:
            len_s1 = x.audio.shape[0]
        # read the IR file (!must be in target format!)
        IR = audio.fromfile( cfg.format, IR_filename, fs=cfg.IR_fs )  

        # convolve with the FOA/HOA2 IR
        if ambi_format == "FOA":
        # convolve with the FOA/HOA2/HOA3 IR
        if cfg.format == "FOA":
            x = reverb_foa(x, IR)
        elif ambi_format == "HOA2":
        elif cfg.format == "HOA2":
            x = reverb_hoa2(x, IR)
        elif cfg.format == "HOA3":
            x = reverb_hoa3(x, IR)

        # adjust the level of the foa signal
        _, scale_factor, _ = get_loudness(x, cfg.loudness, "BINAURAL")
        x.audio *= scale_factor

        # shift the second (and all other) source files (positive shift creates overlap, negative shift creates a gap)
        if i > 0:
            # get the length of the first source file
            N_delay = len_s1
        # adjust the level of the target signal
        x.audio, _ = loudness_norm(x, level, loudness_format="STEREO")

            # add the shift
            N_delay += int(-source_overlap * x.fs)
        # shift the source signal (positive shift creates overlap, negative shift creates a gap)
        if int(floor(-source_shift)) != 0:
            x.audio = audioarray.trim(x.audio, x.fs, limits=[-source_shift, 0])

            # insert all-zero preamble
            pre = np.zeros((N_delay, x.audio.shape[1]))
            x.audio = np.concatenate([pre, x.audio])
        # get the number of frames (multiple of 20ms)
        frame_len = int(x.fs / 50)
        N_frames = int(len(x.audio) / frame_len)

        # pad with zeros to ensure that the signal length is a multiple of 20ms
        N_frame = x.fs / 50
        if len(x.audio) % N_frame != 0:
            N_pad = int(N_frame - len(x.audio) % N_frame)
        if len(x.audio) % frame_len != 0:
            N_pad = int(frame_len - len(x.audio) % frame_len)
            x.audio = audioarray.trim(x.audio, x.fs, limits=[0, -N_pad], samples=True)

            # insert all-zero preamble
            pre = np.zeros((N_pad, x.audio.shape[1]))
            x.audio = np.concatenate([pre, x.audio])

        # add source signal to the array of source signals
        y.fs = x.fs
        if y.audio is None:
            # add source signal to the array of all source signals
            y.audio = x.audio.copy()
            y.fs = x.fs
        else:
            # pad with zeros to have equal length of all source signals
            if x.audio.shape[0] > y.audio.shape[0]:
                y.audio = np.vstack(
                    (
                        y.audio,
                        np.zeros(
                            (
                                x.audio.shape[0] - y.audio.shape[0],
                                y.audio.shape[1],
                            )
                        ),
                    )
                )
            elif y.audio.shape[0] > x.audio.shape[0]:
                x.audio = np.vstack(
                    (
                        x.audio,
                        np.zeros(
                            (
                                y.audio.shape[0] - x.audio.shape[0],
                                x.audio.shape[1],
                            )
                        ),
                    )
                )
            # adjust the signal length (trim from the end or pad with zeros) to align its length with the previous signal(s)
            N_pad = y.audio.shape[0] - x.audio.shape[0]
            if N_pad != 0:
                x.audio = audioarray.trim(x.audio, x.fs, limits=[0, -N_pad], samples=True)
            
            # superimpose
            y.audio += x.audio

    # append pre-amble and post-amble to all sources
    if cfg.preamble != 0.0:
        # ensure that pre-amble is a multiple of 20ms
        N_pre = int(floor(cfg.preamble * 50) / 50 * y.fs)

        # insert all-zero preamble to all sources
        pre = np.zeros((N_pre, y.audio.shape[1]))
        y.audio = np.concatenate([pre, y.audio])

    if cfg.postamble != 0.0:
        # ensure that post-amble is a multiple of 20ms
        N_post = int(floor(cfg.postamble * 50) / 50 * y.fs)

        # append all-zero postamble to all sources
        post = np.zeros((N_post, y.audio.shape[1]))
        y.audio = np.concatenate([y.audio, post])
    y.audio = audioarray.trim(y.audio, y.fs, limits=[-cfg.preamble, -cfg.postamble])

    # add random noise
    if cfg.add_low_level_random_noise:
@@ -328,26 +297,19 @@ def generate_ambi_scene(
        # superimpose
        y.audio += noise

    # write the reverberated audio into output file
    audiofile.write(
        os.path.join(
            cfg.output_path,
            os.path.dirname(scene["name"]),
            cfg.use_output_prefix + os.path.basename(scene["name"]),
        ),
        y.audio,
        y.fs,
    )
    # write the FOA/HOA2/HOA3 audio into output file
    audiofile.write( output_filename, y.audio, y.fs )

    # convert to binaural if option chosen
    if cfg.binaural_path != "":
    # convert to BINAURAL, if option was chosen
    if cfg.binaural_output:
        binaudio = audio.fromtype("BINAURAL")
        binaudio.fs = y.fs
        convert.format_conversion(y, binaudio)
        convert_scenebased(y, binaudio)
        binaural_output_filename = output_filename.with_name(output_filename.stem + "_BINAURAL" + output_filename.suffix)
        audiofile.write(
            os.path.join(cfg.binaural_path, scene["name"]),
            binaural_output_filename,
            binaudio.audio,
            binaudio.fs,
        )

        logger.info(f"Written BINAURAL output to: {binaural_output_filename}")
    return
+46 −50
Original line number Diff line number Diff line
@@ -32,12 +32,10 @@

import csv
import logging
import os
import sys
from itertools import groupby, repeat
from math import floor
from pathlib import Path
from sox import file_info

import numpy as np

from ivas_processing_scripts.audiotools import audio, audiofile, audioarray
@@ -78,7 +76,8 @@ def replace_char_seq_with_string(str, char_seq, repl_str):
# function for appending string to a filename before file extension    
def append_str_filename(filename, str_to_append):
    p = Path(filename)
  return "{0}{2}{1}".format(p.stem, p.suffix, str_to_append)
    # Combine the stem, the string to append, and the suffix
    return p.parent / (p.stem + str_to_append + p.suffix)

def generate_omasa_items(
    cfg: config.TestConfig,
@@ -153,7 +152,7 @@ def generate_omasa_items(
        cfg.multiprocessing = False

    apply_func_parallel(
        generate_scene,
        generate_OMASA_scene,
        zip(cfg.scenes.keys(), cfg.scenes.values(), repeat(cfg), repeat(logger)),
        type = "mp" if cfg.multiprocessing else None,
        show_progress = None,
@@ -162,7 +161,7 @@ def generate_omasa_items(
    return


def generate_scene(
def generate_OMASA_scene(
    scene_name: str, scene: dict, cfg: config.TestConfig, logger: logging.Logger
):
    """
@@ -180,29 +179,33 @@ def generate_scene(
        - Writes the processed audio and metadata to output files.
        - Handles various audio formats (e.g., MONO, FOA, HOA2) and applies transformations like loudness normalization, trimming, and padding.
    """
    logger.info( f"Processing scene {scene_name}:")

    logger.info(
        f"Processing {scene_name} out of {len(cfg.scenes)} scenes, output: {scene['output']}"
    )
    
    # extract the number of audio sources
    N_sources = len(np.atleast_1d(scene["source"]))
    N_ISMs = N_sources-1
    N_inputs = len(np.atleast_1d(scene["input"]))
    N_ISMs = N_inputs-1

    # initialize output dirs
    # get output filename
    omasa_format = f"ISM{N_ISMs}MASA{cfg.masa_tc}DIR{cfg.masa_dirs}"
    output_filename = os.path.join( cfg.output_path, os.path.dirname(scene["name"]), cfg.use_output_prefix + append_str_filename(os.path.basename(scene["name"]), f"_s{scene_name}_{omasa_format}") )
    output_filename = Path(scene["output"]).parent / (cfg.use_output_prefix + Path(scene["output"]).name)

    dir_path = os.path.dirname(output_filename)
    if dir_path and not os.path.exists(dir_path):
        os.makedirs(dir_path, exist_ok=True)
    # initialize output dirs
    dir_path = output_filename.parent
    if dir_path and not dir_path.exists():
        dir_path.mkdir(parents=True, exist_ok=True)

    # initialize output OMASA object
    y = audio.OMASAAudio(omasa_format)

    # repeat for all source files
    for i in range(N_sources):
    for i in range(N_inputs):

        # parse parameters from the scene description
        source_file = (
            scene["source"][i] if isinstance(scene["source"], list) else scene["source"]
            scene["input"][i] if isinstance(scene["input"], list) else scene["input"]
        )
        source_azi = (
            scene["azimuth"][i]
@@ -215,15 +218,15 @@ def generate_scene(
            else scene["elevation"]
        )

        # read the overlap length
        if "overlap" in scene.keys():
            source_overlap = (
                scene["overlap"][i]
                if isinstance(scene["overlap"], list)
                else scene["overlap"]
        # read the shift length
        if "shift" in scene.keys():
            source_shift = (
                scene["shift"][i]
                if isinstance(scene["shift"], list)
                else scene["shift"]
            )
        else:
            source_overlap = 0.0
            source_shift = 0.0
            
        # read the level
        if "level" in scene.keys():
@@ -237,8 +240,12 @@ def generate_scene(

        logger.info(f"Encoding {source_file} at position(s) {source_azi},{source_ele}")

        # get input filename
        input_filename = Path(source_file).parent / (cfg.use_input_prefix + Path(source_file).name)
   
        # get the number of channels from the .wav file header
        N_channels = file_info.channels(os.path.join(cfg.input_path, os.path.dirname(source_file), cfg.use_input_prefix + os.path.basename(source_file)))
        wav_header = audiofile.parse_wave_header(input_filename)
        N_channels = wav_header['channels']

        if N_channels == 1:
            fmt = "MONO"
@@ -256,28 +263,13 @@ def generate_scene(
    
        if fmt in ["FOA", "HOA2", "HOA3"]:
            # generate MASA metadata .met filename (should end with .met)
            y.metadata_files.append(os.path.splitext(output_filename)[0]+".met")
            y.metadata_files.append(output_filename.with_suffix(".met"))
        elif fmt == "MONO":
            # generate ISM metadata .csv filename (should end with .wav..0.csv, .wav.1.csv, ...)
            y.metadata_files.insert(i-1, f"{output_filename}.{i-1}.csv")
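            # generate ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
            # NOTE(review): Path.with_suffix() replaces the existing ".wav" suffix, so the call below appears to yield "<stem>.0.csv" rather than "<stem>.wav.0.csv" - confirm intent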
            y.metadata_files.insert(i-1, output_filename.with_suffix(f".{i-1}.csv"))
    
        # read source file
        x = audio.fromfile(
            fmt,
            os.path.join(
                cfg.input_path,
                os.path.dirname(source_file),
                cfg.use_input_prefix + os.path.basename(source_file),
            ),
            fs=cfg.fs,
        )

        # get the number of frames (multiple of 20ms)
        N_frames = int(len(x.audio) / x.fs * 50)
        frame_len = int(x.fs / 50)

        # trim the samples from the end to ensure that the signal length is a multiple of 20ms
        audioarray.cut(x.audio, [0, N_frames * frame_len])
        x = audio.fromfile( fmt, input_filename, fs=cfg.fs )

        # adjust the level of the source file
        if fmt in ["FOA", "HOA2", "HOA3"]:
@@ -286,8 +278,12 @@ def generate_scene(
            x.audio, _ = loudness_norm(x, level, loudness_format="MONO")
        
        # shift the source signal (positive shift creates overlap, negative shift creates a gap)
        if int(floor(-source_overlap)) != 0:
            x.audio = audioarray.trim(x.audio, x.fs, limits=[-source_overlap, 0])
        if int(floor(-source_shift)) != 0:
            x.audio = audioarray.trim(x.audio, x.fs, limits=[-source_shift, 0])

        # get the number of frames (multiple of 20ms)
        frame_len = int(x.fs / 50)
        N_frames = int(len(x.audio) / frame_len)
            
        # pad with zeros to ensure that the signal length is a multiple of 20ms
        if len(x.audio) % frame_len != 0:
@@ -416,13 +412,13 @@ def generate_scene(
    audiofile.write( output_filename, y.audio, y.fs )
    
    # convert the OMASA output to BINAURAL, if option was chosen
    if cfg.binaural_path != "":
    if cfg.binaural_output:
        binaudio = audio.fromtype("BINAURAL")
        binaudio.fs = y.fs
        convert_omasa(y, binaudio)
        binaural_output_filename = output_filename.with_name(output_filename.stem + "_BINAURAL" + output_filename.suffix)
        audiofile.write(
            os.path.join(
                cfg.binaural_path, append_str_filename(os.path.basename(scene["name"]), f"_s{scene_name}_{omasa_format}_BINAURAL") ),
            binaural_output_filename,
            binaudio.audio,
            binaudio.fs,
        )
+85 −113

File changed and moved.

Preview size limit exceeded, changes collapsed.

Loading