Commit cc990927 authored by bayers's avatar bayers
Browse files

added wrapper for the network simulator

parent a6abb38d
Loading
Loading
Loading
Loading
Loading
+169 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3

#
#  (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository. All Rights Reserved.
#
#  This software is protected by copyright law and by international treaties.
#  The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
#  Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
#  Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
#  Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
#  contributors to this repository retain full ownership rights in their respective contributions in
#  the software. This notice grants no license of any kind, including but not limited to patent
#  license, nor is any license granted by implication, estoppel or otherwise.
#
#  Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
#  contributions.
#
#  This software is provided "AS IS", without any express or implied warranties. The software is in the
#  development stage. It is intended exclusively for experts who have experience with such software and
#  solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
#  and fitness for a particular purpose are hereby disclaimed and excluded.
#
#  Any dispute, controversy or claim arising under or in relation to providing this software shall be
#  submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
#  accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
#  the United Nations Convention on Contracts on the International Sales of Goods.
#

from pathlib import Path
from typing import Optional, Union
import os.path

from ivas_processing_scripts.audiotools.wrappers.gen_patt import create_error_pattern
from ivas_processing_scripts.utils import find_binary, run

LIST_JBM_PROFILES = range(12)
ERROR_PATTERNS_DIR = Path(__file__).parent.parent.parent.joinpath("dly_error_profiles")


def validate_network_simulator(
        error_pattern: Optional[Union[Path, str]] = None,
        error_profile: Optional[int] = None,
    ):
    if find_binary("networkSimulator_g192")is None:
        raise FileNotFoundError(
            "The network simulator binary was not found! Please check the configuration."
        )
    if error_pattern is not None:
        if not os.path.exists(os.path.realpath(error_pattern)):
            raise FileNotFoundError(
                f"The network simulator error profile file {error_pattern} was not found! Please check the configuration."
            )
        if error_profile is not None:
            raise ValueError(
                "JBM pattern and JBM profile number are specified for bitstream processing. Can't use both! Please check the configuration."
            )
    elif error_profile is not None:
        if error_profile not in LIST_JBM_PROFILES:
            raise ValueError(
                f"JBM profile number {error_profile} does not exist, should be between {LIST_JBM_PROFILES[0]} and {LIST_JBM_PROFILES[-1]}"
            )


def network_simulator(
    error_pattern: Union[str, Path],
    in_bitstream: Union[str, Path],
    out_bitstream: Union[str, Path],
    n_frames_per_paket: int,
) -> None:
    """
    Wrapper for eid-xor binary to apply error patterns for the bitstream processing

    Parameters
    ----------
    error_pattern: Union[str, Path]
        Path to error pattern file
    in_bitstream: Union[str, Path]
        Path to input bitstream file
    out_bitstream: Union[str, Path]
        Output path for modified bitstream
    """

    # find binary
    binary = find_binary("networkSimulator_g192")

    # check for valid inputs
    if not Path(in_bitstream).is_file():
        raise ValueError(
            f"Input bitstream file {in_bitstream} for bitstream processing does not exist"
        )
    elif not Path(error_pattern).is_file():
        raise ValueError(
            f"Error pattern file {error_pattern} for bitstream processing does not exist"
        )

    # set up command line
    cmd = [
        str(binary),
        error_pattern,
        in_bitstream,
        out_bitstream,
        f"{out_bitstream}_tracefile_sim",
        str(n_frames_per_paket),
        "0"
    ]

    # run command
    run(cmd)

    return


def apply_network_simulator(
    in_bitstream: Union[Path, str],
    out_bitstream: Union[Path, str],
    error_pattern: Optional[Union[Path, str]] = None,
    error_profile: Optional[int] = None,
    n_frames_per_paket: Optional[int] = None,
) -> None:
    """
    Function to apply a jbm error profile to a bitstreaam

    Parameters
    ----------
    in_bitstream: Union[Path, str]
        Path of input bitstream
    out_bitstream: Union[Path, str]
        Path of output bitstream
    error_pattern: Optional[Union[Path, str]]
        Path to existing error pattern
    error_profile: Optional[int]
        Index of existing error pattern
    n_frames_per_paket: Optional[int]
        number of frames per paket
    fs: Optional[int]
        Sampling rate
    """

    if error_pattern is None:
        # create error pattern
        if error_profile is not None:
            if error_profile in LIST_JBM_PROFILES:
                error_pattern = ERROR_PATTERNS_DIR.joinpath(f"dly_error_profile_{error_profile}.dat")
            else:
                raise ValueError(
                    f"JBM profile number {error_profile} does not exist, should be between {LIST_JBM_PROFILES[0]} and {LIST_JBM_PROFILES[-1]}"
                )
        else:
            raise ValueError(
                "Either error pattern or error profile number has to be specified for network simulator bitstream processing"
            )
    elif error_profile is not None:
        raise ValueError(
            "JBM pattern and JBM profile number are specified for bitstream processing. Can't use both"
        )

    if n_frames_per_paket is None:
        n_frames_per_paket = 1
        if error_profile is not None and error_profile == 5:
            n_frames_per_paket = 2

    # apply error pattern
    network_simulator(error_pattern, in_bitstream, out_bitstream, n_frames_per_paket)

    return
+3 −3
Original line number Diff line number Diff line
@@ -227,9 +227,9 @@ def get_processing_chain(
            tx_jbm_cfg = cond_cfg["tx_jbm"]
        elif hasattr(cfg, "tx_jbm"):
            tx_jbm_cfg = {
                "bin": cfg.tx_jbm["bs_proc_bin"],
                "delay_error_profile": cfg.tx_jbm["delay_error_profile"],
                "opts": cfg.tx_jbm["bs_proc_opts"],
                "error_pattern": cfg.tx_jbm.get("error_pattern", None),
                "error_profile": cfg.tx_jbm.get("error_profile", None),
                "n_frames_per_paket": cfg.tx_jbm.get("n_frames_per_paket", 1),
            }
        else:
            tx_jbm_cfg = None
+26 −15
Original line number Diff line number Diff line
@@ -41,6 +41,8 @@ from ivas_processing_scripts.audiotools import audio
from ivas_processing_scripts.audiotools.audiofile import parse_wave_header, read
from ivas_processing_scripts.audiotools.wrappers.eid_xor import (
    create_and_apply_error_pattern,
)from ivas_processing_scripts.audiotools.wrappers.networkSimulator import (
    apply_network_simulator, validate_network_simulator
)
from ivas_processing_scripts.processing.processing import Processing
from ivas_processing_scripts.utils import run
@@ -53,6 +55,8 @@ class IVAS(Processing):
        self.name = "ivas"
        self.in_fmt = audio.fromtype(self.in_fmt)
        self.out_fmt = audio.fromtype(self.out_fmt)
        if not hasattr(self,"dec_opts"):
            self.dec_opts = None

    def _validate(self):
        if not self.cod_bin or not Path(self.cod_bin).exists():
@@ -73,6 +77,11 @@ class IVAS(Processing):
                raise FileNotFoundError(
                    "The IVAS decoder binary was not found! Please check the configuration."
                )
        # TODO: if tx_fer or tx_jbm test for existence of bitstream processing binaries and
        # existence of error pattern files (if given) already here
        if self.tx_jbm is not None:
            validate_network_simulator(self.tx_jbm["error_pattern"],self.tx_jbm["error_profile"])


    def process(
        self, in_file: Path, out_file: Path, in_meta, logger: logging.Logger
@@ -194,22 +203,24 @@ class IVAS(Processing):
        logger: Optional[logging.Logger] = None,
    ) -> Union[Path, str]:
        if self.tx_jbm is not None:
            cmd = [self.tx_jbm["bin"]]
            bs, ext = os.path.splitext(bitstream)
            error_pattern = self.tx_jbm["error_pattern"]
            bitstream_processed = Path(bs + "." + os.path.basename(error_pattern) + ext)
            opts = deepcopy(self.tx_jbm["opts"])
            opts = [
                x.format(
                    error_pattern=error_pattern,
                    bitstream=bitstream,
                    processed_bitstream=bitstream_processed,
            bitstream_processed = Path(f"{bs}_processed{ext}")
            logger.debug(f"Network simulator {bitstream} -> {bitstream_processed}")
            apply_network_simulator(
                bitstream,
                bitstream_processed,
                self.tx_jbm["error_pattern"],
                self.tx_jbm["error_profile"],
                self.tx_jbm["n_frames_per_paket"],
            )
                for x in opts
            ]
            logger.debug(f"JBM network simulator {bitstream} -> {bitstream_processed}")
            cmd.extend(opts)
            run(cmd, logger=logger)
            # add -voip cmdline option to the decoder
            # TODO: tracefile also?
            if self.dec_opts:
                if "-voip" not in self.dec_opts:
                    self.dec_opts.extend(["-voip"])

            else:
                self.dec_opts = ["-voip"]
            return bitstream_processed

        elif self.tx_fer is not None: