Commit 405e164c authored by Jan Kiene's avatar Jan Kiene
Browse files

Merge remote-tracking branch 'origin/main' into ci/calculate-coverage-only-on-ivas-libs

parents 5a0fbf19 80a90880
Loading
Loading
Loading
Loading
Loading
+627 −0
Original line number Diff line number Diff line
__copyright__ = """
(C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
contributors to this repository. All Rights Reserved.

This software is protected by copyright law and by international treaties.
The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
contributors to this repository retain full ownership rights in their respective contributions in
the software. This notice grants no license of any kind, including but not limited to patent
license, nor is any license granted by implication, estoppel or otherwise.

Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
contributions.

This software is provided "AS IS", without any express or implied warranties. The software is in the
development stage. It is intended exclusively for experts who have experience with such software and
solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
and fitness for a particular purpose are hereby disclaimed and excluded.

Any dispute, controversy or claim arising under or in relation to providing this software shall be
submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
the United Nations Convention on Contracts on the International Sales of Goods.
"""

__doc__ = """
Execute tests specified via a parameter file.
"""

import os
from pathlib import Path
import errno
import platform
import filecmp
from subprocess import run
import pytest
from tests.cmp_pcm import cmp_pcm
from tests.cut_pcm import cut_samples
from tests.conftest import EncoderFrontend, DecoderFrontend
#from tests.testconfig import PARAM_FILE


VALID_DEC_OUTPUT_CONF = [
    "MONO",
    "STEREO",
    "5_1",
    "7_1",
    "5_1_2",
    "5_1_4",
    "7_1_4",
    "FOA",
    "HOA2",
    "HOA3",
    "BINAURAL",
    "BINAURAL_ROOM_IR",
    "BINAURAL_ROOM_REVERB",
    "EXT",
]

param_file_test_dict = {}
PARAM_FILE = "scripts/config/self_test_ltv.prm"
with open(PARAM_FILE, "r", encoding="UTF-8") as fp:
    data = fp.read()
    blocks = data.split("\n\n")
    for block in blocks:
        tag = ""
        enc_opts = ""
        dec_opts = ""
        sim_opts = ""
        eid_opts = ""
        for line in block.split("\n"):
            if line.startswith("// "):
                tag = line[3:]
            if line.startswith("../IVAS_cod "):
                enc_opts = line[12:]
            if line.startswith("../IVAS_dec "):
                dec_opts = line[12:]
            if line.startswith("networkSimulator_g192 "):
                sim_opts = line[22:]
            if line.startswith("eid-xor "):
                eid_opts = line[8:]
        if tag == "" or enc_opts == "" or dec_opts == "":
            # no complete parameter set
            continue
        if tag in param_file_test_dict:
            print("non-unique tag found - ignoring new entry")
            continue
        param_file_test_dict[tag] = (enc_opts, dec_opts, sim_opts, eid_opts)


def check_and_makedir(dir_path):
    if not os.path.exists(dir_path):
        try:
            os.makedirs(dir_path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise  # raises the error again


def convert_test_string_to_tag(test_string):
    """
    Convert a test string (i.e. the test tag from the parameter file) to a tag string.
    Example:
    in:  "DFT stereo at 13.2 kbps, 16kHz in, 16kHz out, DTX on, random FEC at 5%"
    out: "DFT_stereo_at_13_2_kbps_16kHz_in_16kHz_out_DTX_on_random_FEC_at_5_"
    """
    # replace certain characters by "_" or remove them
    tag_str = ""
    replace_chars = " %.-()"
    remove_chars = ","
    for char in test_string:
        if char in replace_chars:
            tag_str += "_"
        elif char not in remove_chars:
            tag_str += char
    # replace double underscore by single one
    tag_str = "_".join(tag_str.split("__"))
    return tag_str


@pytest.mark.create_ref
@pytest.mark.parametrize("test_tag", list(param_file_test_dict.keys()))
def test_param_file_tests(
    dut_encoder_frontend: EncoderFrontend,
    dut_decoder_frontend: DecoderFrontend,
    ref_encoder_frontend: EncoderFrontend,
    ref_decoder_frontend: DecoderFrontend,
    reference_path,
    dut_base_path,
    test_vector_path,
    update_ref,
    rootdir,
    keep_files,
    test_tag,
    get_mld,
):
    enc_opts, dec_opts, sim_opts, eid_opts = param_file_test_dict[test_tag]

    tag_str = convert_test_string_to_tag(test_tag)

    # evaluate encoder options
    enc_split = enc_opts.split()
    assert len(enc_split) >= 4

    # replace "testv/" by test vector path
    enc_split = [
        x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x
        for x in enc_split
    ]

    bitstream_file = enc_split.pop()
    testv_file = enc_split.pop()
    fs = enc_split.pop()
    sampling_rate = int(fs)
    bitrate = enc_split.pop()

    sba_br_switching_dtx = 0
    if (
        not bitrate.isdigit()
        and "-dtx" in enc_split
        and "-sba" in enc_split
        and testv_file.split("/")[1].startswith("stv")
    ):
        sba_br_switching_dtx = 1
        cut_file = pre_proc_input(testv_file, fs)
        testv_file = cut_file

    # bitrate can be a filename: remove leading "../"
    if bitrate.startswith("../"):
        bitrate = bitrate[3:]

    testv_base = testv_file.split("/")[-1]
    if testv_base.endswith(".pcm"):
        testv_base = testv_base[:-4]

    assert bitstream_file == "bit"
    # in the parameter file, only "bit" is used as bitstream file name
    # -> construct bitstream filename
    bitstream_file = f"{testv_base}_{tag_str}.192"

    encode(
        dut_encoder_frontend,
        ref_encoder_frontend,
        reference_path,
        dut_base_path,
        bitrate,
        sampling_rate,
        testv_file,
        bitstream_file,
        enc_split,
        update_ref,
    )
    if sba_br_switching_dtx == 1:
        is_exist = os.path.exists(cut_file)
        if is_exist:
            os.remove(cut_file)
         
    # check for networkSimulator_g192 command line
    if sim_opts != "":
        sim_split = sim_opts.split()
        assert len(sim_split) == 6, "networkSimulator_g192 expects 6 parameters"
        # [sim_profile, sim_input, sim_output, sim_trace, sim_nFPP, sim_offset] = sim_split
        if sim_split[0].startswith(("../")):
            # remove leading "../"
            sim_split[0] = sim_split[0][3:]
        assert sim_split[1] == "bit"
        # in the parameter file, only "bit" is used as bitstream file name
        # -> re-use bitstream filename from encoder call
        sim_split[1] = bitstream_file
        assert sim_split[2] == "netsimoutput"
        # in the parameter file, only "netsimoutput" is used as netsim output file name
        # -> construct netsim output file name
        netsim_outfile = f"{testv_base}_{tag_str}.netsimout"
        sim_split[2] = netsim_outfile
        assert sim_split[3] == "tracefile_sim"
        # in the parameter file, only "tracefile_sim" is used as trace output file name
        # -> construct trace output file name
        netsim_trace_outfile = f"{testv_base}_{tag_str}.netsimtrace"
        sim_split[3] = netsim_trace_outfile
        simulate(
            reference_path,
            dut_base_path,
            sim_split,
            update_ref,
            rootdir,
        )
              
    # check for eid-xor command line
    if eid_opts != "":
        eid_split = eid_opts.split()
        assert len(eid_split) >= 3, "eid-xor expects at least 3 parameters"
        # [..., in_bs, err_pat_bs, out_bs] = eid_split
        if eid_split[-2].startswith(("../")):
            # remove leading "../"
            eid_split[-2] = eid_split[-2][3:]
        assert eid_split[-3] == "bit"
        # in the parameter file, only "bit" is used as the input bitstream file name
        # -> re-use bitstream filename from encoder call
        eid_split[-3] = bitstream_file
        assert eid_split[-1] == "bit_error"
        # in the parameter file, only "bit_error" is used as the output bitstream file name
        # -> construct netsim output file name
        eid_xor_outfile = f"{testv_base}_{tag_str}.fer.192"
        eid_split[-1] = eid_xor_outfile
        error_insertion(
            reference_path,
            dut_base_path,
            eid_split,
            update_ref,
            rootdir,
        )
        
    # evaluate decoder options
    dec_split = dec_opts.split()
    assert len(dec_split) >= 3

    # replace "testv/" by test vector path
    dec_split = [
        x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x
        for x in dec_split
    ]
    # remove leading "../"
    dec_split = [x[3:] if x.startswith("../") else x for x in dec_split]

    output_file = dec_split.pop()
    bitstream_file_dec = dec_split.pop()
    sampling_rate = int(dec_split.pop())
    if len(dec_split) > 0:
        output_config = dec_split.pop()
        if output_config.upper() not in VALID_DEC_OUTPUT_CONF:
            if not output_config.endswith(".txt"):
                # must be EVS tests with additional parameters - put param back
                dec_split.append(output_config)
                output_config = ""
    else:
        output_config = ""

    output_config_name = output_config
    if "/" in output_config:
        # the output config is a file
        output_config_name = os.path.splitext(os.path.basename(output_config))[0]

    tracefile_dec = ""
    if sim_opts != "":
        assert bitstream_file_dec == "netsimoutput"
        # in the parameter file, only "netsimoutput" is used as bitstream file name
        # -> re-use netsim_outfile
        bitstream_file = netsim_outfile
        tracefile_dec = f"{testv_base}_{tag_str}.dectrace"
    elif eid_opts != "":
        assert bitstream_file_dec == "bit_error"
        # in the parameter file, only "bit_error" is used as bitstream input file name
        # -> re-use eid_xor_outfile
        bitstream_file = eid_xor_outfile
    else:
        assert bitstream_file_dec == "bit"
        # in the parameter file, only "bit" is used as bitstream file name
        # -> re-use bitstream filename from encoder call
        

    # the output file is not the real output filename
    # -> construct output filename
    output_file = f"{testv_base}_{tag_str}.dec.wav"

    stdout = decode(
        dut_decoder_frontend,
        ref_decoder_frontend,
        reference_path,
        dut_base_path,
        output_config,
        sampling_rate,
        bitstream_file,
        output_file,
        dec_split,
        update_ref,
        tracefile_dec,
    )
    
    if update_ref in [0, 2]:
        dut_output_file = f"{dut_base_path}/param_file/dec/{output_file}"
        ref_output_file = f"{reference_path}/param_file/dec/{output_file}"
        fs = int(sampling_rate) * 1000
        output_differs, reason = cmp_pcm(
            dut_output_file, ref_output_file, output_config, fs, get_mld=get_mld
        )
        md_out_files = get_expected_md_files(ref_output_file, enc_split, output_config)

        metadata_differs = False
        for md_file in md_out_files:
            dut_metadata_file = Path(f"{dut_base_path}/param_file/dec/{md_file}")
            ref_metadata_file = Path(f"{reference_path}/param_file/dec/{md_file}")
            try:
                if not filecmp.cmp(dut_metadata_file, ref_metadata_file):
                    print("Output metadata differs for file: " + md_file)
                    metadata_differs = True
            except FileNotFoundError:
                if not dut_metadata_file.exists():
                    print(f"DUT output metadata missing for expected file: " + md_file)
                if not ref_metadata_file.exists():
                    print(f"REF output metadata missing for expected file: " + md_file)
                metadata_differs = True

        if output_differs or metadata_differs:
            msg = "Difference between ref and dut in "
            if output_differs and metadata_differs:
                msg += "output and metadata"
            elif output_differs:
                msg += "output only"
            elif metadata_differs:
                msg += "metadata only"

            assert False, msg

        # remove DUT output files when test result is OK (to save disk space)
        if not keep_files:
            os.remove(f"{dut_base_path}/param_file/enc/{bitstream_file}")
            os.remove(f"{dut_base_path}/param_file/dec/{output_file}")
            if sim_opts != "":
                os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192")
                os.remove(f"{dut_base_path}/param_file/enc/{netsim_trace_outfile}")
                os.remove(f"{dut_base_path}/param_file/dec/{tracefile_dec}")
            elif eid_opts != "":
                os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192")



def encode(
    dut_encoder_frontend,
    ref_encoder_frontend,
    reference_path,
    dut_base_path,
    bitrate,
    sampling_rate,
    testv_file,
    bitstream_file,
    enc_opts_list,
    update_ref,
):
    """
    Call REF and/or DUT encoder.
    """
    # directories
    dut_out_dir = f"{dut_base_path}/param_file/enc"
    ref_out_dir = f"{reference_path}/param_file/enc"

    ref_out_file = f"{ref_out_dir}/{bitstream_file}"
    dut_out_file = f"{dut_out_dir}/{bitstream_file}"

    if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file):
        check_and_makedir(ref_out_dir)
        # call REF encoder
        ref_encoder_frontend.run(
            bitrate,
            sampling_rate,
            testv_file,
            ref_out_file,
            add_option_list=enc_opts_list,
        )

    if update_ref in [0, 2]:
        check_and_makedir(dut_out_dir)
        # call DUT encoder
        dut_encoder_frontend.run(
            bitrate,
            sampling_rate,
            testv_file,
            dut_out_file,
            add_option_list=enc_opts_list,
        )


def pre_proc_input(testv_file, fs):
    cut_from = "0.0"
    cut_len = "5.0"
    cut_gain = "0.004"
    if "stvFOA" in testv_file:
        num_channel = "4"
    elif "stv2OA" in testv_file:
        num_channel = "9"
    elif "stv3OA" in testv_file:
        num_channel = "16"
    cut_file = testv_file.replace(".wav", num_channel + "chn_" + cut_gain + ".wav")
    cut_samples(
        testv_file, cut_file, num_channel, fs + "000", cut_from, cut_len, cut_gain
    )
    return cut_file


def simulate(
    reference_path,
    dut_base_path,
    sim_opts_list,
    update_ref,
    rootdir,
):
    """
    Call network simulator on REF and/or DUT encoder output.
    """
    # directories
    dut_out_dir = f"{dut_base_path}/param_file/enc"
    ref_out_dir = f"{reference_path}/param_file/enc"

    netsim_infile = sim_opts_list[1]
    netsim_outfile = sim_opts_list[2]
    netsim_tracefile = sim_opts_list[3]
    ref_out_file = f"{ref_out_dir}/{netsim_outfile}"

    if platform.system() == "Windows":
        netsim = [
            os.path.join(
                rootdir, "scripts", "tools", "Win32", "networkSimulator_g192.exe"
            )
        ]
    elif platform.system() in ["Linux", "Darwin"]:
        netsim = [
            os.path.join(
                rootdir, "scripts", "tools", platform.system(), "networkSimulator_g192"
            )
        ]
    else:
        assert False, f"networkSimulator_g192 not available for {platform.system()}"

    if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file):
        # call network simulator on REF encoder output
        cmd_opts = sim_opts_list
        cmd_opts[1] = f"{ref_out_dir}/{netsim_infile}"
        cmd_opts[2] = f"{ref_out_dir}/{netsim_outfile}"  # ref_out_file
        cmd_opts[3] = f"{ref_out_dir}/{netsim_tracefile}"
        run(netsim + cmd_opts, check=False)

    if update_ref in [0, 2]:
        # call network simulator on DUT encoder output
        cmd_opts = sim_opts_list
        cmd_opts[1] = f"{dut_out_dir}/{netsim_infile}"
        cmd_opts[2] = f"{dut_out_dir}/{netsim_outfile}"  # dut_out_file
        cmd_opts[3] = f"{dut_out_dir}/{netsim_tracefile}"
        run(netsim + cmd_opts, check=False)
        
def error_insertion(
    reference_path,
    dut_base_path,
    eid_opts_list,
    update_ref,
    rootdir,
):
    """
    Call eid-xor to insert frame erasure on REF and/or DUT encoder output.
    """
    
    # directories
    dut_out_dir = f"{dut_base_path}/param_file/enc"
    ref_out_dir = f"{reference_path}/param_file/enc"

    eid_xor_infile = eid_opts_list[-3]
    eid_xor_outfile = eid_opts_list[-1]
    ref_out_file = f"{ref_out_dir}/{eid_xor_outfile}"

    if platform.system() == "Windows":
        eid_xor = [
            os.path.join(
                rootdir, "scripts", "tools", "Win32", "eid-xor.exe"
            )
        ]
    elif platform.system() in ["Linux", "Darwin"]:
        eid_xor = [
            os.path.join(
                rootdir, "scripts", "tools", platform.system(), "eid-xor"
            )
        ]
    else:
        assert False, f"eid-xor not available for {platform.system()}"


    if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file):
        # call eid-xor on REF encoder output
        cmd_opts = eid_opts_list
        cmd_opts[-3] = f"{ref_out_dir}/{eid_xor_infile}"
        cmd_opts[-1] = f"{ref_out_dir}/{eid_xor_outfile}"  # ref_out_file
        run(eid_xor + cmd_opts, check=False)
        
    if update_ref in [0, 2]:
        # call eid-xor on DUT encoder output
        cmd_opts = eid_opts_list
        cmd_opts[-3] = f"{dut_out_dir}/{eid_xor_infile}"
        cmd_opts[-1] = f"{dut_out_dir}/{eid_xor_outfile}"  # ref_out_file
        run(eid_xor + cmd_opts, check=False)

def decode(
    decoder_frontend,
    ref_decoder_frontend,
    reference_path,
    dut_base_path,
    output_config,
    sampling_rate,
    bitstream_file,
    output_file,
    dec_opts_list,
    update_ref,
    tracefile_dec,
):
    """
    Call REF and/or DUT decoder.
    """
    # directories
    dut_out_dir = f"{dut_base_path}/param_file/dec"
    ref_out_dir = f"{reference_path}/param_file/dec"

    dut_in_file = f"{dut_base_path}/param_file/enc/{bitstream_file}"
    ref_in_file = f"{reference_path}/param_file/enc/{bitstream_file}"
    dut_out_file = f"{dut_out_dir}/{output_file}"
    ref_out_file = f"{ref_out_dir}/{output_file}"

    if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file):
        check_and_makedir(ref_out_dir)
        add_option_list = dec_opts_list
        if tracefile_dec != "":
            add_option_list = [
                x if x != "tracefile_dec" else f"{ref_out_dir}/{tracefile_dec}"
                for x in dec_opts_list
            ]
        # call REF decoder
        ref_decoder_frontend.run(
            output_config,
            sampling_rate,
            ref_in_file,
            ref_out_file,
            add_option_list=add_option_list,
        )

        stdout = ref_decoder_frontend.stdout

    if update_ref in [0, 2]:
        check_and_makedir(dut_out_dir)
        add_option_list = dec_opts_list
        if tracefile_dec != "":
            add_option_list = [
                x if x != "tracefile_dec" else f"{dut_out_dir}/{tracefile_dec}"
                for x in dec_opts_list
            ]
        # call DUT decoder
        decoder_frontend.run(
            output_config,
            sampling_rate,
            dut_in_file,
            dut_out_file,
            add_option_list=add_option_list,
        )

        stdout = decoder_frontend.stdout

    return stdout


def get_expected_md_files(ref_output_file, enc_opts, output_config):
    """
    Based on input and output configs, get the filenames of MD files that are expected to being output by the decoder
    """

    if output_config.upper() != "EXT":
        return list()

    md_files = list()
    enc_opts_upper = [o.upper() for o in enc_opts]
    md_filename = Path(ref_output_file).name

    if any([o in enc_opts_upper for o in ["-MASA", "-ISM_MASA"]]):
        # always only one MD file, just add ending
        md_files.append(md_filename + ".met")

    for ism_opt in ["-ISM", "-ISM_MASA", "-ISM_SBA"]:
        # for ism MD, there are three modes that may output MD files
        # try to find any of them in the encoder options
        md_tmpl = md_filename + ".{}.csv"
        try:
            idx = enc_opts_upper.index(ism_opt)
            ism_num = int(enc_opts[idx + 1])
            md_files.extend([md_tmpl.format(i) for i in range(ism_num)])
            break
        except ValueError:
            pass

    return md_files