Commit b53319f6 authored by Vladimir Malenovsky's avatar Vladimir Malenovsky
Browse files

add OMASA item generation script

parent 2e95d804
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ from ivas_processing_scripts.generation import (
    process_ism1_items,
    process_ism2_items,
    process_stereo_items,
    generate_omasa_items,
)
from ivas_processing_scripts.utils import create_dir

@@ -103,6 +104,9 @@ def main(args):
    elif "FOA" in cfg.format or "HOA2" in cfg.format:
        # generate FOA/HOA2 items according to scene description
        process_ambi_items.generate_ambi_items(cfg, logger)
    elif "OMASA" in cfg.format:
        # generate OMASA items according to scene description
        generate_omasa_items.generate_omasa_items(cfg, logger)

    # copy configuration to output directory
    with open(cfg.output_path.joinpath(f"{'_'.join(cfg.format)}.yml"), "w") as f:
+413 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3

#
#  (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository. All Rights Reserved.
#
#  This software is protected by copyright law and by international treaties.
#  The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository retain full ownership rights in their respective contributions in
#  the software. This notice grants no license of any kind, including but not limited to patent
#  license, nor is any license granted by implication, estoppel or otherwise.
#
#  Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
#  contributions.
#
#  This software is provided "AS IS", without any express or implied warranties. The software is in the
#  development stage. It is intended exclusively for experts who have experience with such software and
#  solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
#  and fitness for a particular purpose are hereby disclaimed and excluded.
#
#  Any dispute, controversy or claim arising under or in relation to providing this software shall be
#  submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
#  accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
#  the United Nations Convention on Contracts on the International Sales of Goods.
#
import csv
import logging
import os
import pdb  # NOTE(review): kept for the commented-out debugging hooks; remove before release
import sys
from itertools import groupby, repeat
from math import floor
from pathlib import Path

import numpy as np
from sox import file_info

from ivas_processing_scripts.audiotools import audio, audiofile, audioarray
from ivas_processing_scripts.audiotools.convert.omasa import convert_omasa
from ivas_processing_scripts.audiotools.convert.scenebased import render_sba_to_masa
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness, loudness_norm
from ivas_processing_scripts.generation import config
from ivas_processing_scripts.utils import apply_func_parallel

# fixed RNG seed so the optional low-level random noise is reproducible across runs
SEED_RANDOM_NOISE = 0


# function for converting an n-d numpy array to strings with 2 decimal digits
def csv_formatdata(data):
    """Yield each row of *data* with every value rendered as a 2-decimal string."""
    for record in data:
        yield ["%0.2f" % value for value in record]


# function for finding runs of the same character and replacing each matching run with another string
def replace_char_seq_with_string(text, char_seq, repl_str):
    """Replace each run of identical characters containing ``char_seq`` with ``repl_str``.

    The replacement string is truncated to the length of ``char_seq`` so the
    overall character layout of a filename designator prefix is preserved.
    All callers in this module pass arguments positionally; the first
    parameter was renamed from ``str`` to avoid shadowing the builtin.
    """
    # split into groups of consecutive identical characters, e.g. "xLLy" -> ["x", "LL", "y"]
    groups = ["".join(g) for _, g in groupby(text)]

    # limit the length of the replacement string by the length of the character sequence
    repl_str = repl_str[: len(char_seq)]

    # replace every group that contains the character sequence
    return "".join(repl_str if char_seq in g else g for g in groups)
    
# function for appending a string to a filename before the file extension
def append_str_filename(filename, str_to_append):
    """Return the base name of *filename* with *str_to_append* inserted before the extension.

    Note: any directory component of *filename* is dropped (callers in this
    module pass plain basenames).
    """
    p = Path(filename)
    return f"{p.stem}{str_to_append}{p.suffix}"

def generate_omasa_items(
    cfg: config.TestConfig,
    logger: logging.Logger,
):
    """Generate OMASA items with metadata from FOA/HOA2 and ISMn items based on scene description.

    Fills in defaults for configuration options that were not specified,
    substitutes the file-designator placeholders in the optional input/output
    filename prefixes, then renders every scene in ``cfg.scenes`` via
    ``generate_scene()``.
    """
    # fill in defaults for options missing from the config
    # (fixes the copy-pasted "pre-amble and post-amble" comment that previously
    # mislabeled the random-noise flag)
    defaults = {
        "loudness": -26,  # target loudness level
        "fs": 48000,  # sampling rate [Hz]
        "preamble": 0.0,  # pre-amble length [s]
        "postamble": 0.0,  # post-amble length [s]
        "add_low_level_random_noise": False,  # superimpose low-level dither noise
        "listening_lab": "l",  # listening lab designator
        "language": "EN",  # language designator
        "exp": "p07",  # experiment designator
        "provider": "g",  # provider designator
        "multiprocessing": True,  # parallelize scene generation
    }
    for key, value in defaults.items():
        if key not in cfg.__dict__:
            setattr(cfg, key, value)

    # set the prefix for all input filenames
    if "use_input_prefix" not in cfg.__dict__:
        cfg.use_input_prefix = ""
    else:
        # replace file designators ("l" -> lab, "LL" -> language, "eee" -> experiment)
        cfg.use_input_prefix = replace_char_seq_with_string(
            cfg.use_input_prefix, "l", cfg.listening_lab
        )
        cfg.use_input_prefix = replace_char_seq_with_string(
            cfg.use_input_prefix, "LL", cfg.language
        )
        cfg.use_input_prefix = replace_char_seq_with_string(
            cfg.use_input_prefix, "eee", cfg.exp
        )

    # set the prefix for all output filenames
    if "use_output_prefix" not in cfg.__dict__:
        cfg.use_output_prefix = ""
    else:
        # replace file designators ("l" -> lab, "eee" -> experiment)
        cfg.use_output_prefix = replace_char_seq_with_string(
            cfg.use_output_prefix, "l", cfg.listening_lab
        )
        cfg.use_output_prefix = replace_char_seq_with_string(
            cfg.use_output_prefix, "eee", cfg.exp
        )

    # generate all scenes
    # NOTE(review): parallel mode is currently disabled — pass
    # "mp" if cfg.multiprocessing else None to honor the config flag
    apply_func_parallel(
        generate_scene,
        zip(cfg.scenes.keys(), cfg.scenes.values(), repeat(cfg), repeat(logger)),
        None,
        # "mp" if cfg.multiprocessing else None,
        None,
    )

    return


def generate_scene(
    scene_name: str, scene: dict, cfg: config.TestConfig, logger: logging.Logger
):
    """Render one OMASA item from a scene description and write it to disk.

    The scene lists one FOA/HOA2 "bed" source plus mono (ISM) object sources.
    The bed is converted to MASA, the mono objects are inserted in front of the
    MASA transport channels, per-object azimuth/elevation metadata is written
    to .csv files, and the combined signal is written as an interleaved .wav
    (plus an optional binaural render).
    """
    logger.info( f"Processing scene {scene_name}:")

    # extract the number of audio sources
    N_sources = len(np.atleast_1d(scene["source"]))
    # every source except the spatial bed is treated as an ISM object
    N_ISMs = N_sources-1

    # initialize output array
    omasa_format = f"ISM{N_ISMs}MASA{cfg.masa_tc}DIR{cfg.masa_dirs}"
    output_filename = os.path.join( cfg.output_path, os.path.dirname(scene["name"]), cfg.use_output_prefix + append_str_filename(os.path.basename(scene["name"]), f"_s{scene_name}_{omasa_format}") )
    y = audio.OMASAAudio(omasa_format)

    # repeat for all source files
    for i in range(N_sources):

        # parse parameters from the scene description (a scalar value applies to all sources)
        source_file = (
            scene["source"][i] if isinstance(scene["source"], list) else scene["source"]
        )
        source_azi = (
            scene["azimuth"][i]
            if isinstance(scene["azimuth"], list)
            else scene["azimuth"]
        )
        source_ele = (
            scene["elevation"][i]
            if isinstance(scene["elevation"], list)
            else scene["elevation"]
        )

        # read the overlap length (defaults to no overlap)
        if "overlap" in scene.keys():
            source_overlap = (
                scene["overlap"][i]
                if isinstance(scene["overlap"], list)
                else scene["overlap"]
            )
        else:
            source_overlap = 0.0

        # read the level (defaults to -26)
        if "level" in scene.keys():
            level = (
                scene["level"][i]
                if isinstance(scene["level"], list)
                else scene["level"]
            )
        else:
            level = -26

        logger.info(f"Encoding {source_file} at position(s) {source_azi},{source_ele}")

        # get the number of channels from the .wav file header
        N_channels = file_info.channels(os.path.join(cfg.input_path, os.path.dirname(source_file), cfg.use_input_prefix + os.path.basename(source_file)))

        # map channel count to an input format name
        if N_channels == 1:
            fmt = "MONO"
        elif N_channels == 2:
            fmt = "STEREO"
        elif N_channels == 4:
            fmt = "FOA"
        elif N_channels == 9:
            fmt = "HOA2"
        elif N_channels == 16:
            fmt = "HOA3"
        else:
            logger.info(f"Error: Input format of the source file with {N_channels} channels is not supported!")
            # NOTE(review): 'sys' is not imported in this module, so this line raises
            # NameError instead of exiting cleanly — add 'import sys' at file level
            sys.exit(-1)

        if fmt in ["FOA", "HOA2"]:
            # generate MASA metadata .met filename (should end with .met)
            y.metadata_files.append(os.path.splitext(output_filename)[0]+".met")
        elif fmt == "MONO":
            # generate ISM metadata .csv filename (should end with .wav.0.csv, .wav.1.csv, ...)
            # NOTE(review): insert at i-1 assumes the FOA/HOA2 bed is listed as the first
            # source, so the ISM .csv entries end up ordered before the MASA .met entry
            y.metadata_files.insert(i-1, f"{output_filename}.{i-1}.csv")

        # read source file
        x = audio.fromfile(
            fmt,
            os.path.join(
                cfg.input_path,
                os.path.dirname(source_file),
                cfg.use_input_prefix + os.path.basename(source_file),
            ),
            fs=cfg.fs,
        )

        # get the number of frames (multiple of 20ms)
        N_frames = int(len(x.audio) / x.fs * 50)
        frame_len = int(x.fs / 50)

        # trim the samples from the end to ensure that the signal length is a multiple of 20ms
        # NOTE(review): the return value of audioarray.cut() is discarded — if cut() is
        # not in-place, this statement has no effect; confirm against audioarray
        audioarray.cut(x.audio, [0, N_frames * frame_len])

        # adjust the level of the source file
        # NOTE(review): FOA/HOA2 is normalized with loudness_format="STEREO" and rms=True —
        # presumably measuring on the first two channels; verify against bs1770 wrapper
        if fmt in ["FOA", "HOA2"]:
            x.audio, _ = loudness_norm(x, level, loudness_format="STEREO", rms=True)
        else:
            x.audio, _ = loudness_norm(x, level, loudness_format="MONO")

        # shift the source signal (positive shift creates overlap, negative shift creates a gap)
        if int(floor(-source_overlap)) != 0:
            x.audio = audioarray.trim(x.audio, x.fs, limits=[-source_overlap, 0])

        # pad with zeros to ensure that the signal length is a multiple of 20ms
        if len(x.audio) % frame_len != 0:
            # pad the source signal
            N_pad = int(frame_len - len(x.audio) % frame_len)
            x.audio = audioarray.trim(x.audio, x.fs, limits=[0, -N_pad], samples=True)

        # convert FOA to MASA
        if fmt in ["FOA", "HOA2"]:
            # NOTE(review): the MASA format is hard-coded as "MASA2DIR1" instead of being
            # derived from cfg.masa_tc / cfg.masa_dirs — confirm this is intended
            x_masa = audio.MetadataAssistedSpatialAudio(f"MASA2DIR1")
            x_masa.metadata_file = y.metadata_files[i]
            render_sba_to_masa(x, x_masa)
            y.audio = x_masa.audio
            y.fs = x.fs
        else:
            # pad ISM signal with zeros to have the same length as the MASA signal
            # NOTE(review): if the ISM signal is longer than the MASA signal, N_pad is
            # negative and the signal gets truncated instead — presumably intended
            N_pad = y.audio.shape[0] - x.audio.shape[0]
            if N_pad != 0:
                x.audio = audioarray.trim(x.audio, x.fs, limits=[0, -N_pad], samples=True)

            # append ISM signal to the OMASA object (ISM comes first !!!)
            y.audio = np.insert(y.audio, [i-1], x.audio, axis=1)

    # append pre-amble and post-amble to all sources
    y.audio = audioarray.trim(y.audio, y.fs, limits=[-cfg.preamble, -cfg.postamble])

    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4 (fixed seed for reproducibility)
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")

        # superimpose
        y.audio += noise

    # generate ISM metadata files
    # NOTE(review): indices 1..N_ISMs assume the bed source occupies index 0 of the
    # azimuth/elevation lists — consistent with the insert(i-1, ...) logic above
    y_meta = None
    for i in range(1, N_ISMs + 1):
        # parse metadata parameters from the scene description
        source_azi = (
            scene["azimuth"][i]
            if isinstance(scene["azimuth"], list)
            else scene["azimuth"]
        )
        source_ele = (
            scene["elevation"][i]
            if isinstance(scene["elevation"], list)
            else scene["elevation"]
        )

        # pdb.set_trace()
        N_frames = int(np.rint((len(y.audio) / y.fs * 50)))

        # read azimuth information and convert to an array
        # NOTE(review): eval() on scene-description strings executes arbitrary code —
        # acceptable only for trusted configuration files
        if isinstance(source_azi, str):
            if ":" in source_azi:
                # start with the initial azimuth value and apply step N_frames times
                source_azi = source_azi.split(":")
                azi = np.arange(
                    float(eval(source_azi[0])),
                    float(eval(source_azi[0])) + N_frames * float(eval(source_azi[1])),
                    float(eval(source_azi[1]))
                )
            else:
                # replicate static azimuth value N_frames times
                azi = np.repeat(float(eval(source_azi)), N_frames)
        else:
            # replicate static azimuth value N_frames times
            azi = np.repeat(float(source_azi), N_frames)

        # convert azimuth from 0 .. 360 to -180 .. +180
        azi = (azi + 180) % 360 - 180

        # check if azimuth is from -180 .. +180
        # NOTE(review): the wrap above maps into [-180, 180), so this check can never fire
        if any(azi > 180) or any(azi < -180):
            logger.error(
                f"Incorrect value(s) of azimuth: {azi[(azi > 180) | (azi < -180)]}"
            )

        # read elevation information and convert to an array
        if isinstance(source_ele, str):
            if ":" in source_ele:
                # convert into array (initial_value:step:stop_value)
                # note: the stop_value value is +-90 degrees depending on the sign of the step
                source_ele = source_ele.split(":")
                ele = np.arange(
                    float(eval(source_ele[0])),
                    np.sign(float(eval(source_ele[1]))) * 90,
                    float(eval(source_ele[1]))
                )[:N_frames]

                # repeat the last elevation value, if array is shorter than N_frames
                if len(ele) < N_frames:
                    ele = np.append(ele, np.full(N_frames - len(ele), ele[-1]))
            else:
                # replicate static elevation value N_frames times
                ele = np.repeat(float(eval(source_ele)), N_frames)
        else:
            # replicate static elevation value N_frames times
            ele = np.repeat(float(source_ele), N_frames)

        # check if elevation is from -90 .. +90
        if any(ele > 90) or any(ele < -90):
            logger.error(
                f"Incorrect value(s) of elevation: {ele[(ele > 90) | (ele < -90)]}"
            )

        # arrange all metadata fields column-wise into a matrix
        x_meta = np.column_stack((azi, ele))

        # write to .csv output metadata file
        with open(
            y.metadata_files[i-1],
            "w",
            newline="",
            encoding="utf-8",
        ) as f:
            # create csv writer
            writer = csv.writer(f)

            # write all rows to the .csv file
            writer.writerows(csv_formatdata(x_meta))

    y.init_metadata()       # this is needed to populate 'y.object_pos[]'

    # write the OMASA output to .wav file in an interleaved format
    audiofile.write( output_filename, y.audio, y.fs )

    # convert to OMASA output to BINAURAL, if option was chosen
    # NOTE(review): assumes cfg.binaural_path is always present in the config — no
    # default is set in generate_omasa_items; confirm against the config schema
    if cfg.binaural_path != "":
        binaudio = audio.fromtype("BINAURAL")
        binaudio.fs = y.fs
        convert_omasa(y, binaudio)
        audiofile.write(
            os.path.join(
                cfg.binaural_path, append_str_filename(os.path.basename(scene["name"]), f"_s{scene_name}_{omasa_format}_BINAURAL") ),
            binaudio.audio,
            binaudio.fs,
        )

    return