Commit 9f6d28e8 authored by Vladimir Malenovsky's avatar Vladimir Malenovsky
Browse files

remove obsolete files

parent f886347b
Loading
Loading
Loading
Loading
Loading
+0 −353
Original line number Diff line number Diff line
#!/usr/bin/env python3

#
#  (C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository. All Rights Reserved.
#
#  This software is protected by copyright law and by international treaties.
#  The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository retain full ownership rights in their respective contributions in
#  the software. This notice grants no license of any kind, including but not limited to patent
#  license, nor is any license granted by implication, estoppel or otherwise.
#
#  Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
#  contributions.
#
#  This software is provided "AS IS", without any express or implied warranties. The software is in the
#  development stage. It is intended exclusively for experts who have experience with such software and
#  solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
#  and fitness for a particular purpose are hereby disclaimed and excluded.
#
#  Any dispute, controversy or claim arising under or in relation to providing this software shall be
#  submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
#  accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
#  the United Nations Convention on Contracts on the International Sales of Goods.
#

import logging
import os
from itertools import groupby, repeat
from math import floor

import numpy as np

from ivas_processing_scripts.audiotools import audio, audiofile, convert
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness
from ivas_processing_scripts.audiotools.wrappers.reverb import reverb_foa, reverb_hoa2
from ivas_processing_scripts.generation import config
from ivas_processing_scripts.utils import apply_func_parallel

SEED_RANDOM_NOISE = 0


# function for converting nd numpy array to strings with 2 decimal digits
def csv_formatdata(data):
    """Lazily yield each row of `data` as a list of strings with 2 decimal digits"""
    two_decimals = "%0.2f"
    yield from ([two_decimals % value for value in record] for record in data)


# function for searching sequences of the same character and replacing them by another string
def replace_char_seq_with_string(str, char_seq, repl_str):
    """Replace every run of identical characters containing `char_seq` by `repl_str`

    The input string is split into runs of consecutive identical characters. A run that
    contains `char_seq` is replaced as a whole by `repl_str`, truncated to the length of
    `char_seq`. All other runs are copied unchanged.
    """
    # the replacement may not be longer than the character sequence it substitutes
    substitute = repl_str[: len(char_seq)]

    # runs of consecutive identical characters
    runs = ("".join(chars) for _, chars in groupby(str))

    # swap each run containing the character sequence for the (truncated) replacement
    return "".join(substitute if char_seq in run else run for run in runs)


def generate_ambi_items(
    cfg: config.TestConfig,
    logger: logging.Logger,
):
    """Generate FOA/HOA2 items from mono items based on scene description

    Every optional setting that is missing from cfg is set to its default value, the
    designator characters in the user-supplied filename prefixes are replaced and
    generate_ambi_scene() is then run for each entry of cfg.scenes.

    :param cfg: test configuration; missing optional settings are added to it in place
    :param logger: logger passed on to generate_ambi_scene()
    """

    # default values of the optional settings, applied only if absent from the configuration
    defaults = {
        # target level
        "loudness": -26,
        # sampling frequency of the source files
        "fs": 48000,
        # sampling frequency of the IR files
        "IR_fs": 48000,
        # pre-amble and post-amble
        "preamble": 0.0,
        "postamble": 0.0,
        # IR path
        "IR_path": os.path.join(os.path.dirname(__file__), "IR"),
        # addition of low-level random noise
        "add_low_level_random_noise": False,
        # binaural rendering (empty string disables it)
        "binaural_path": "",
        # listening lab designator
        "listening_lab": "l",
        # language designator
        "language": "EN",
        # experiment designator
        "exp": "p04",
        # provider
        "provider": "g",
        # multiprocessing
        "multiprocessing": True,
    }
    for key, value in defaults.items():
        if key not in cfg.__dict__:
            setattr(cfg, key, value)

    # filename prefixes and the file designators replaced in them, in order:
    # (sequence of characters to search for, replacement string)
    prefix_designators = {
        # prefix for all input filenames
        "use_input_prefix": (
            ("l", cfg.listening_lab),
            ("LL", cfg.language),
            ("eee", cfg.exp),
        ),
        # prefix for all IR filenames
        "use_IR_prefix": (
            ("p", cfg.provider),
            ("LL", cfg.language),
            ("eee", cfg.exp),
        ),
        # prefix for all output filenames
        "use_output_prefix": (
            ("l", cfg.listening_lab),
            ("eee", cfg.exp),
        ),
    }
    for prefix_name, designators in prefix_designators.items():
        if prefix_name not in cfg.__dict__:
            # no prefix requested: use the empty string (not None) because the prefix is
            # concatenated with the file name in generate_ambi_scene()
            setattr(cfg, prefix_name, "")
            continue

        # replace file designators
        prefix = getattr(cfg, prefix_name)
        for char_seq, repl_str in designators:
            prefix = replace_char_seq_with_string(prefix, char_seq, repl_str)
        setattr(cfg, prefix_name, prefix)

    # process all scenes (in parallel, if multiprocessing is enabled)
    apply_func_parallel(
        generate_ambi_scene,
        zip(cfg.scenes.keys(), cfg.scenes.values(), repeat(cfg), repeat(logger)),
        None,
        "mp" if cfg.multiprocessing else None,
    )

    return


def generate_ambi_scene(
    scene_name: str, scene: dict, cfg: config.TestConfig, logger: logging.Logger
):
    """Generate one FOA/HOA2 item by convolving mono source files with IRs and mixing them

    Each source file of the scene is convolved with its IR, scaled with the factor returned
    by get_loudness(), shifted behind the first source (all sources but the first one) and
    added to the mix. The mix is written to cfg.output_path and, if cfg.binaural_path is not
    empty, it is also converted to binaural and written there.

    :param scene_name: key of the scene in cfg.scenes (used in the log message only)
    :param scene: scene description; reads "name", "source", "IR" and the optional
        "overlap" (shift time in seconds, default 0.0) and "format" (default "FOA")
    :param cfg: test configuration, see generate_ambi_items() for the settings read here
    :param logger: logger for progress messages
    """
    logger.info(
        f"Processing scene: {scene_name} out of {len(cfg.scenes)} scenes, name: {scene['name']}"
    )

    # source files and IR files of the scene (a single entry is promoted to a 1-element array)
    source_files = np.atleast_1d(scene["source"])
    IR_files = np.atleast_1d(scene["IR"])

    # extract the number of audio sources
    N_sources = len(source_files)

    # read the shift time in seconds
    if "overlap" in scene.keys():
        source_overlap = float(scene["overlap"])
    else:
        source_overlap = 0.0

    # read the ambi format
    if "format" in scene.keys():
        ambi_format = scene["format"]
    else:
        ambi_format = "FOA"

    len_s1 = 0
    y = audio.SceneBasedAudio(ambi_format)
    for i in range(N_sources):
        # parse parameters from the scene description
        source_file = source_files[i]
        IR_file = IR_files[i]

        logger.info(f"Convolving {source_file} with {IR_file}")

        # read source file
        x = audio.fromfile(
            "MONO",
            os.path.join(
                cfg.input_path,
                os.path.dirname(source_file),
                cfg.use_input_prefix + os.path.basename(source_file),
            ),
            fs=cfg.fs,
        )

        # read the IR file
        IR = audio.fromfile(
            ambi_format,
            os.path.join(
                cfg.IR_path,
                os.path.dirname(IR_file),
                cfg.use_IR_prefix + os.path.basename(IR_file),
            ),
            fs=cfg.IR_fs,
        )

        # remember the length of the first (not yet convolved) source file
        if i == 0:
            len_s1 = x.audio.shape[0]

        # convolve with the FOA/HOA2 IR
        if ambi_format == "FOA":
            x = reverb_foa(x, IR)
        elif ambi_format == "HOA2":
            x = reverb_hoa2(x, IR)

        # adjust the level of the convolved signal
        _, scale_factor, _ = get_loudness(x, cfg.loudness, "BINAURAL")
        x.audio *= scale_factor

        # shift the second (and all other) source files (positive shift creates overlap, negative shift creates a gap)
        if i > 0:
            # get the length of the first source file
            N_delay = len_s1

            # add the shift
            N_delay += int(-source_overlap * x.fs)

            # insert all-zero preamble
            pre = np.zeros((N_delay, x.audio.shape[1]))
            x.audio = np.concatenate([pre, x.audio])

        # pad with zeros to ensure that the signal length is a multiple of 20ms
        # NOTE(review): the zeros are inserted at the beginning of the signal, which delays
        # it by up to one frame - confirm that padding at the end was not intended
        N_frame = x.fs / 50
        if len(x.audio) % N_frame != 0:
            N_pad = int(N_frame - len(x.audio) % N_frame)

            # insert all-zero preamble
            pre = np.zeros((N_pad, x.audio.shape[1]))
            x.audio = np.concatenate([pre, x.audio])

        # add source signal to the array of source signals
        y.fs = x.fs
        if y.audio is None:
            y.audio = x.audio.copy()
        else:
            # pad the shorter signal with zeros to have equal length of all source signals
            if x.audio.shape[0] > y.audio.shape[0]:
                y.audio = np.vstack(
                    (
                        y.audio,
                        np.zeros(
                            (
                                x.audio.shape[0] - y.audio.shape[0],
                                y.audio.shape[1],
                            )
                        ),
                    )
                )
            elif y.audio.shape[0] > x.audio.shape[0]:
                x.audio = np.vstack(
                    (
                        x.audio,
                        np.zeros(
                            (
                                y.audio.shape[0] - x.audio.shape[0],
                                x.audio.shape[1],
                            )
                        ),
                    )
                )

            # superimpose
            y.audio += x.audio

    # append pre-amble and post-amble to all sources
    if cfg.preamble != 0.0:
        # ensure that pre-amble is a multiple of 20ms
        N_pre = int(floor(cfg.preamble * 50) / 50 * y.fs)

        # insert all-zero preamble to all sources
        pre = np.zeros((N_pre, y.audio.shape[1]))
        y.audio = np.concatenate([pre, y.audio])

    if cfg.postamble != 0.0:
        # ensure that post-amble is a multiple of 20ms
        N_post = int(floor(cfg.postamble * 50) / 50 * y.fs)

        # append all-zero postamble to all sources
        post = np.zeros((N_post, y.audio.shape[1]))
        y.audio = np.concatenate([y.audio, post])

    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4
        # (the fixed seed makes the noise identical in every call)
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")

        # superimpose
        y.audio += noise

    # a missing output prefix (None) means "no prefix"; concatenating None with the
    # file name would raise a TypeError
    output_prefix = cfg.use_output_prefix or ""

    # write the reverberated audio into output file
    audiofile.write(
        os.path.join(
            cfg.output_path,
            os.path.dirname(scene["name"]),
            output_prefix + os.path.basename(scene["name"]),
        ),
        y.audio,
        y.fs,
    )

    # convert to binaural if option chosen
    if cfg.binaural_path != "":
        binaudio = audio.fromtype("BINAURAL")
        binaudio.fs = y.fs
        convert.format_conversion(y, binaudio)

        # note: the output prefix is not applied to the binaural file name
        audiofile.write(
            os.path.join(cfg.binaural_path, scene["name"]),
            binaudio.audio,
            binaudio.fs,
        )

    return
+0 −356
Original line number Diff line number Diff line
#!/usr/bin/env python3

#
#  (C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository. All Rights Reserved.
#
#  This software is protected by copyright law and by international treaties.
#  The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository retain full ownership rights in their respective contributions in
#  the software. This notice grants no license of any kind, including but not limited to patent
#  license, nor is any license granted by implication, estoppel or otherwise.
#
#  Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
#  contributions.
#
#  This software is provided "AS IS", without any express or implied warranties. The software is in the
#  development stage. It is intended exclusively for experts who have experience with such software and
#  solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
#  and fitness for a particular purpose are hereby disclaimed and excluded.
#
#  Any dispute, controversy or claim arising under or in relation to providing this software shall be
#  submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
#  accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
#  the United Nations Convention on Contracts on the International Sales of Goods.
#

import csv
import logging
import os
from itertools import groupby, repeat
from math import floor

import numpy as np

from ivas_processing_scripts.audiotools import audio, audiofile
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness
from ivas_processing_scripts.generation import config
from ivas_processing_scripts.utils import apply_func_parallel

SEED_RANDOM_NOISE = 0


# function for converting nd numpy array to strings with 2 decimal digits
def csv_formatdata(data):
    """Yield the rows of `data` one by one, each value formatted with 2 decimal digits"""
    template = "%0.2f"
    for record in data:
        formatted = []
        for value in record:
            formatted.append(template % value)
        yield formatted


# function for searching sequences of the same character and replacing them by another string
def replace_char_seq_with_string(str, char_seq, repl_str):
    """Substitute `repl_str` for each run of identical characters that contains `char_seq`

    `repl_str` is cut to at most len(char_seq) characters before it is inserted. Runs of
    identical characters that do not contain `char_seq` are kept as they are.
    """
    # never insert more characters than the character sequence itself has
    clipped = repl_str[: len(char_seq)]

    pieces = []
    for _, members in groupby(str):
        # one group of consecutive identical letters
        run = "".join(members)

        # replace the group if the sequence of characters occurs in it
        pieces.append(clipped if char_seq in run else run)

    return "".join(pieces)


def generate_ism1_items(
    cfg: config.TestConfig,
    logger: logging.Logger,
):
    """Generate ISM1 items with metadata from mono items based on scene description

    Every optional setting that is missing from cfg is set to its default value, the
    designator characters in the user-supplied filename prefixes are replaced and
    generate_ism1_scene() is then run for each entry of cfg.scenes.

    :param cfg: test configuration; missing optional settings are added to it in place
    :param logger: logger passed on to generate_ism1_scene()
    """

    # default values of the optional settings, applied only if absent from the configuration
    defaults = {
        # target level
        "loudness": -26,
        # sampling frequency of the source files
        "fs": 48000,
        # pre-amble and post-amble
        "preamble": 0.0,
        "postamble": 0.0,
        # addition of low-level random noise
        "add_low_level_random_noise": False,
        # listening lab designator
        "listening_lab": "l",
        # language designator
        "language": "EN",
        # experiment designator
        "exp": "p06",
        # provider
        "provider": "g",
        # multiprocessing
        "multiprocessing": True,
    }
    for key, value in defaults.items():
        if key not in cfg.__dict__:
            setattr(cfg, key, value)

    # filename prefixes and the file designators replaced in them, in order:
    # (sequence of characters to search for, replacement string)
    prefix_designators = {
        # prefix for all input filenames
        "use_input_prefix": (
            ("l", cfg.listening_lab),
            ("LL", cfg.language),
            ("eee", cfg.exp),
        ),
        # prefix for all output filenames
        "use_output_prefix": (
            ("l", cfg.listening_lab),
            ("eee", cfg.exp),
        ),
    }
    for prefix_name, designators in prefix_designators.items():
        if prefix_name not in cfg.__dict__:
            # no prefix requested: use the empty string (not None) because the prefix is
            # concatenated with the file name in generate_ism1_scene()
            setattr(cfg, prefix_name, "")
            continue

        # replace file designators
        prefix = getattr(cfg, prefix_name)
        for char_seq, repl_str in designators:
            prefix = replace_char_seq_with_string(prefix, char_seq, repl_str)
        setattr(cfg, prefix_name, prefix)

    # process all scenes (in parallel, if multiprocessing is enabled)
    apply_func_parallel(
        generate_ism1_scene,
        zip(cfg.scenes.keys(), cfg.scenes.values(), repeat(cfg), repeat(logger)),
        None,
        "mp" if cfg.multiprocessing else None,
    )

    return


def generate_ism1_scene(
    scene_name: str, scene: dict, cfg: config.TestConfig, logger: logging.Logger
):
    """Generate one ISM1 item (mono audio stream and metadata file) from a scene description

    All source files of the scene are scaled with the factor returned by get_loudness(),
    shifted behind the signal accumulated so far (all sources but the first one) and
    superimposed into a single mono stream. The stream is written to cfg.output_path together
    with a ".0.csv" file holding one "azimuth,elevation" row per 20ms frame.

    :param scene_name: key of the scene in cfg.scenes (used in the log message only)
    :param scene: scene description; reads "name", "source", "azimuth", "elevation" and the
        optional "overlap" (shift time in seconds, default 0.0). Azimuth and elevation are
        either numbers or strings; a string of the form "initial_value:step" describes a
        value changing by step in every frame
    :param cfg: test configuration, see generate_ism1_items() for the settings read here
    :param logger: logger for progress and error messages
    """
    logger.info(
        f"Processing {scene_name} out of {len(cfg.scenes)} scenes, name: {scene['name']}"
    )

    # extract the number of audio sources
    N_sources = len(np.atleast_1d(scene["source"]))

    # initialize output array
    y = audio.ChannelBasedAudio("MONO")

    # read the shift time in seconds
    if "overlap" in scene.keys():
        source_overlap = float(scene["overlap"])
    else:
        source_overlap = 0.0

    logger.info(
        f"Encoding {scene['source']} at position(s) {scene['azimuth']},{scene['elevation']}"
    )

    # repeat for all source files
    for i in range(N_sources):
        # parse parameters from the scene description
        source_file = (
            scene["source"][i] if isinstance(scene["source"], list) else scene["source"]
        )

        # read source file
        x = audio.fromfile(
            "MONO",
            os.path.join(
                cfg.input_path,
                os.path.dirname(source_file),
                cfg.use_input_prefix + os.path.basename(source_file),
            ),
            fs=cfg.fs,
        )

        # get the number of frames (multiple of 20ms)
        N_frames = int(len(x.audio) / x.fs * 50)
        frame_len = int(x.fs / 50)

        # trim the samples from the end to ensure that the signal length is a multiple of 20ms
        x.audio = x.audio[: N_frames * frame_len]

        # adjust the level of the source file
        _, scale_factor, _ = get_loudness(x, cfg.loudness, "MONO")
        x.audio *= scale_factor

        # shift the second (and all other) source files (positive shift creates overlap, negative shift creates a gap)
        if i > 0:
            # get the length of the signal accumulated so far
            N_delay = len(y.audio)

            # add the shift value (ensure that the shift is a multiple of 20ms)
            N_delay += int(floor(-source_overlap * 50) / 50 * x.fs)

            # insert all-zero signal
            pre = np.zeros((N_delay, 1))
            x.audio = np.concatenate([pre, x.audio])

        # pad with zeros to ensure that the signal length is a multiple of 20ms
        if len(x.audio) % frame_len != 0:
            # pad the source signal
            N_pad = int(frame_len - len(x.audio) % frame_len)
            post = np.zeros((N_pad, 1))
            x.audio = np.concatenate([x.audio, post])

        # superimpose all source signals together
        y.fs = x.fs
        if y.audio is None:
            y.audio = x.audio.copy()
        else:
            # pad the shorter of the two signals with zeros to have equal length
            # (resizing y to the length of x would cut off the end of y whenever the
            # shifted source signal is shorter than the signal accumulated so far)
            N_diff = x.audio.shape[0] - y.audio.shape[0]
            if N_diff > 0:
                y.audio = np.concatenate(
                    [y.audio, np.zeros((N_diff, y.audio.shape[1]))]
                )
            elif N_diff < 0:
                x.audio = np.concatenate(
                    [x.audio, np.zeros((-N_diff, x.audio.shape[1]))]
                )

            y.audio += x.audio

    # append pre-amble and post-amble to all sources
    if cfg.preamble != 0.0:
        # ensure that pre-amble is a multiple of 20ms
        N_pre = int(floor(cfg.preamble * 50) / 50 * y.fs)

        # insert all-zero preamble to all sources
        pre = np.zeros((N_pre, y.audio.shape[1]))
        y.audio = np.concatenate([pre, y.audio])

    if cfg.postamble != 0.0:
        # ensure that post-amble is a multiple of 20ms
        N_post = int(floor(cfg.postamble * 50) / 50 * y.fs)

        # append all-zero postamble to all sources
        post = np.zeros((N_post, y.audio.shape[1]))
        y.audio = np.concatenate([y.audio, post])

    # add random noise
    if cfg.add_low_level_random_noise:
        # create uniformly distributed noise between -4 and 4
        # (the fixed seed makes the noise identical in every call)
        np.random.seed(SEED_RANDOM_NOISE)
        noise = np.random.randint(low=-4, high=5, size=y.audio.shape).astype("float")

        # superimpose
        y.audio += noise

    # process azimuth and elevation
    # NOTE: string values are passed to eval(), which executes arbitrary Python expressions;
    # scene descriptions must therefore come from a trusted source only
    source_azi = scene["azimuth"]
    source_ele = scene["elevation"]

    # number of 20ms frames of the output signal = number of metadata rows
    N_frames = int(len(y.audio) / y.fs * 50)

    # read azimuth information and convert to an array
    if isinstance(source_azi, str):
        if ":" in source_azi:
            # start with the initial azimuth value and apply step N_frames times
            # (computing start + n * step yields exactly N_frames values, whereas the
            # length of np.arange() is not reliable for non-integer steps)
            azi_start, azi_step = (float(eval(v)) for v in source_azi.split(":")[:2])
            azi = azi_start + azi_step * np.arange(N_frames)
        else:
            # replicate static azimuth value N_frames times
            azi = np.repeat(float(eval(source_azi)), N_frames)
    else:
        # replicate static azimuth value N_frames times
        azi = np.repeat(float(source_azi), N_frames)

    # convert azimuth from 0 .. 360 to -180 .. +180
    azi = (azi + 180) % 360 - 180

    # check, if azimuth is from -180 .. +180
    if any(azi > 180) or any(azi < -180):
        logger.error(
            f"Incorrect value(s) of azimuth: {azi[(azi > 180) | (azi < -180)]}"
        )

    # read elevation information and convert to an array
    if isinstance(source_ele, str):
        if ":" in source_ele:
            # convert into array (initial_value:step:stop_value)
            # note: the stop_value value is +-90 degrees depending on the sign of the step
            ele_start, ele_step = (float(eval(v)) for v in source_ele.split(":")[:2])
            ele = np.arange(ele_start, np.sign(ele_step) * 90, ele_step)[:N_frames]

            if len(ele) == 0:
                # the initial value is already at or beyond the stop value, keep it
                ele = np.full(N_frames, ele_start)
            elif len(ele) < N_frames:
                # repeat the last elevation value, if array is shorter than N_frames
                ele = np.append(ele, np.full(N_frames - len(ele), ele[-1]))
        else:
            # replicate static elevation value N_frames times
            ele = np.repeat(float(eval(source_ele)), N_frames)
    else:
        # replicate static elevation value N_frames times
        ele = np.repeat(float(source_ele), N_frames)

    # check if elevation is from -90 .. +90
    if any(ele > 90) or any(ele < -90):
        logger.error(
            f"Incorrect value(s) of elevation: {ele[(ele > 90) | (ele < -90)]}"
        )

    # arrange all metadata fields column-wise into a matrix
    y_meta = np.column_stack((azi, ele))

    # a missing output prefix (None) means "no prefix"; concatenating None with the
    # file name would raise a TypeError
    output_prefix = cfg.use_output_prefix or ""

    # write ISM audio stream to the output file
    audiofile.write(
        os.path.join(
            cfg.output_path,
            os.path.dirname(scene["name"]),
            output_prefix + os.path.basename(scene["name"]),
        ),
        y.audio,
        y.fs,
    )

    # write ISM metadata to the output file in .0.csv format
    csv_filename = os.path.join(
        cfg.output_path,
        os.path.dirname(scene["name"]),
        output_prefix + os.path.basename(scene["name"]) + ".0.csv",
    )

    with open(
        csv_filename,
        "w",
        newline="",
        encoding="utf-8",
    ) as f:
        # create csv writer
        writer = csv.writer(f)

        # write all rows to the .csv file
        writer.writerows(csv_formatdata(y_meta))

    return
+0 −400

File deleted.

Preview size limit exceeded, changes collapsed.

+0 −327

File deleted.

Preview size limit exceeded, changes collapsed.