Commit 113872d0 authored by Anika Treffehn's avatar Anika Treffehn
Browse files

at least some ISM support

parent a4848584
Loading
Loading
Loading
Loading
+35 −47
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ from ivas_processing_scripts.audiotools.metadata import (
from ivas_processing_scripts.constants import LOGGER_DATEFMT, LOGGER_FORMAT
from ivas_processing_scripts.processing.config import TestConfig
from ivas_processing_scripts.utils import apply_func_parallel, list_audio, pairwise
from ivas_processing_scripts.audiotools.constants import IVAS_FRAME_LEN_MS


class Processing(ABC):
@@ -168,7 +169,7 @@ def concat_setup(cfg: TestConfig, chain, logger: logging.Logger):
    logger.info(f"Splits written to file {cfg.concat_file.with_suffix('.splits.log')}")


def concat_teardown(x, splits, out_fmt, fs, in_fs, logger: logging.Logger):
def concat_teardown(x, splits, out_fmt, fs, in_fs, meta, logger: logging.Logger):

    if not splits:
        raise ValueError("Splitting not possible without split marker")
@@ -186,6 +187,7 @@ def concat_teardown(x, splits, out_fmt, fs, in_fs, logger: logging.Logger):

    split_old = 0
    split_signals = []
    split_meta = []
    for idx, split in enumerate(splits):
        # split
        y = x[split_old:split, :]
@@ -196,26 +198,26 @@ def concat_teardown(x, splits, out_fmt, fs, in_fs, logger: logging.Logger):
        # add signal to list
        split_signals.append(y)

        split_old = split

        # split ISM metadata
    if out_fmt.startswith("ISM"):  # TODO
        # for odir in cfg.out_dirs:
        #     path_input = odir / cfg.items_list[0].name
        #     out_meta_paths = split_meta_in_file(
        #         path_input,
        #         odir,
        #         cfg.split_names,
        #         cfg.splits,
        #         output_format,
        #         meta_files=cfg.metadata_path[0],
        #     )
        #     out_meta.append(out_meta_paths)
        split_meta = repeat(None)
        pass
        if out_fmt.startswith("ISM"):
            split_meta_object = []
            for obj_meta in meta:
                # compute number of frames per split
                split_old_frames = int(split_old / IVAS_FRAME_LEN_MS / fs * 1000)
                split_frames = int(split / IVAS_FRAME_LEN_MS / fs * 1000)

                # split
                obj_meta = obj_meta[split_old_frames:split_frames, :]

                # add signal to list
                split_meta_object.append(obj_meta)

            split_meta.append(split_meta_object)
        else:
            split_meta = repeat(None)

        split_old = split

    return split_signals, split_meta


@@ -408,47 +410,33 @@ def process_item(
                    copyfile(ppm, out_meta[idx])


def remove_preamble(x, out_fmt, fs, repeat_signal, preamble, logger):
def remove_preamble(x, out_fmt, fs, repeat_signal, preamble, meta, logger):

    # remove preamble for ISM metadata
    if out_fmt.startswith("ISM"):  # TODO
        # # search for metadata
        # meta_item = metadata_search(
        #     odir, [Path(item.name)], num_objects=num_channels
        # )
        # metadata_array = []
        # for meta_i in meta_item:
        #     metadata_array.append(np.genfromtxt(meta_i, delimiter=","))
        #
        # # cut first half of the metadata
        # if cfg.pre2.repeat_signal:
        #     metadata_array = [m[int(len(m) / 2) :, :] for m in metadata_array]
        #
        # # remove preamble
        # if cfg.pre2.preamble:
        #     metadata_array = add_remove_preamble(
        #         metadata_array, cfg.pre2.preamble, add=False
        #     )
        #
        # # write csv files
        # write_ISM_metadata_in_file(
        #     metadata_array, [path_input], automatic_naming=True
        # )
        pass
    if out_fmt.startswith("ISM"):
        # cut first half of the metadata
        if repeat_signal:
            meta = [m[int(len(m) / 2):, :] for m in meta]

        # remove preamble
        if preamble:
            meta = add_remove_preamble(
                meta, preamble, add=False
            )

    # remove first half of signal
    if repeat_signal:
        if logger:
            logger.info("Remove first half of signal")
            logger.debug("Remove first half of signal")
        x = x[int(len(x) / 2):, :]

    # remove preamble
    if preamble:
        if logger:
            logger.info("Remove preamble")
            logger.debug("Remove preamble")
        x = trim(x, fs, (preamble, 0))

    return x
    return x, meta


def preprocess_background_noise(cfg):
+30 −17
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ import logging
from pathlib import Path
from itertools import repeat
import re
import numpy as np

from ivas_processing_scripts.processing.processing import Processing, remove_preamble, concat_teardown
from ivas_processing_scripts.audiotools import audio
@@ -59,20 +60,25 @@ class Processing_splitting_scaling(Processing):
            in_file_noerror = Path(f"{in_file.with_suffix('')}.noerror.wav")
            out_file_noerror = Path(f"{out_file.with_suffix('')}.noerror.wav")
            x_noerror, _ = read(in_file_noerror, nchannels=num_channels, fs=self.fs)
        # read metadata
        meta_arrays = []
        for meta in in_meta:
            meta_arrays.append(np.genfromtxt(meta, delimiter=","))

        # cut preamble and split file
        out_files, file_splits, meta_splits = self.revert_preamble_concatenation(x, fs, in_file, out_file, False, logger)
        out_files, file_splits, out_meta, meta_splits = self.revert_preamble_concatenation(x, fs, in_file, out_file, meta_arrays, False, logger)
        if self.tx_condition:
            out_files_noerror, file_splits_noerror, meta_splits_noerror = self.revert_preamble_concatenation(x_noerror, fs, in_file_noerror, out_file_noerror, True)
            in_meta_noerror = None  # TODO
            out_files_noerror, file_splits_noerror, meta_splits_noerror = self.revert_preamble_concatenation(x_noerror, fs, in_file_noerror, out_file_noerror, in_meta_noerror, True)

        # scale splitted files
        if self.loudness:
            if self.tx_condition:
                # do special scaling -> measure noerror loudness and apply scaling to signal with error
                scaling_splits = measure_loudness(file_splits_noerror, self.out_fmt, fs, self.loudness, self.loudness_fmt, logger)
                scaling_splits = measure_loudness(file_splits_noerror, self.out_fmt, fs, self.loudness, self.loudness_fmt, meta_splits, logger)
                file_splits = [f*loud for f, loud in zip(file_splits, scaling_splits)]
            else:
                file_splits = adjust_loudness(file_splits, self.out_fmt, fs, self.loudness, self.loudness_fmt, logger)
                file_splits = adjust_loudness(file_splits, self.out_fmt, fs, self.loudness, self.loudness_fmt, meta_splits, logger)

        # derive output folder names
        out_out_files = []
@@ -86,7 +92,7 @@ class Processing_splitting_scaling(Processing):
                f_out = Path(str(f).replace(tmp_name, out_name).replace(".processing_splitting_scaling.", "."))
            out_out_files.append(f_out)
            if meta_splits:
                # TODO
                # TODO ISM
                pass

        # write file(s) in tmp and output folder
@@ -94,13 +100,12 @@ class Processing_splitting_scaling(Processing):
            write(of, file_s, fs)
            write(oof, file_s, fs)
            if meta_s:
                pass  # TODO
                pass  # TODO ISM

    def revert_preamble_concatenation(self, x, fs, in_file, out_file, noerror=False, logger=None):
    def revert_preamble_concatenation(self, x, fs, in_file, out_file, in_meta, noerror=False, logger=None):
        # remove preamble and first half of signal due to repetition
        if self.preamble or self.repeat_signal:
            # TODO: ISM metadata
            x = remove_preamble(x, self.out_fmt, self.fs, self.repeat_signal, self.preamble, logger)
            x, in_meta = remove_preamble(x, self.out_fmt, self.fs, self.repeat_signal, self.preamble, in_meta, logger)

        # reverse concatenation
        if self.concatenate_input:
@@ -112,21 +117,25 @@ class Processing_splitting_scaling(Processing):
                splits_info_file = Path(
                    f"{in_file.parent.parent.joinpath('tmp_preprocessing_2').joinpath(Path(in_file.stem).stem)}.splits.log")
            splits, split_names, split_fs = read_splits_file(splits_info_file)

            # split file
            file_splits, meta_splits = concat_teardown(x, splits, self.out_fmt, fs, split_fs, logger)
            file_splits, meta_splits = concat_teardown(x, splits, self.out_fmt, fs, split_fs, in_meta, logger)

            # set new out_files
            out_files = [in_file.parent.joinpath(sn).with_suffix(f".{self.name}.wav") for sn in split_names]
            out_meta = [in_file.parent.joinpath(sn).with_suffix(f".{self.name}.wav") for sn in split_names]  # TODO ISM
            if self.tx_condition:
                file_splits_noerror, meta_splits_noerror = concat_teardown(x, splits, self.out_fmt, fs, split_fs, logger)
        else:
            out_files = [out_file]
            file_splits = [x]
            meta_splits = repeat(None)
            meta_splits = repeat(None)  # TODO
            out_meta = repeat(None)  # TODO
            if self.tx_condition:
                file_splits_noerror = None
                file_splits_noerror = None  # TODO
                meta_splits_noerror = None  # TODO

        return out_files, file_splits, meta_splits
        return out_files, file_splits, out_meta, meta_splits


def read_splits_file(splits_file):
@@ -138,21 +147,25 @@ def read_splits_file(splits_file):
    return splits, names, fs


def adjust_loudness(file_splits, out_fmt, fs, loudness, loudness_fmt, logger=None):
def adjust_loudness(file_splits, out_fmt, fs, loudness, loudness_fmt, meta, logger=None):
    scaled_signals = []
    for f in file_splits:
    for f, m in zip(file_splits, meta):
        audio_object = audio.fromarray(fmt=out_fmt, x=f, fs=fs)
        if isinstance(audio_object, audio.ObjectBasedAudio):
            audio_object.object_pos = m
        scaled_signal, _ = loudness_norm(audio_object, loudness, loudness_fmt, logger=logger)
        scaled_signals.append(scaled_signal)
    return scaled_signals


def measure_loudness(file_splits, out_fmt, fs, loudness, loudness_fmt, logger):
def measure_loudness(file_splits, out_fmt, fs, loudness, loudness_fmt, meta, logger):
    if logger:
        logger.debug("Apply special scaling for bitstream error conditions")
    scaling_splits = []
    for f in file_splits:
    for f, m in zip(file_splits, meta):
        audio_object = audio.fromarray(fmt=out_fmt, x=f, fs=fs)
        if isinstance(audio_object, audio.ObjectBasedAudio):
            audio_object.object_pos = m
        _, scale_factor = loudness_norm(audio_object, loudness, loudness_fmt, logger=logger)
        scaling_splits.append(scale_factor)
    return scaling_splits