diff --git a/tests/test_param_file_ltv.py b/tests/test_param_file_ltv.py new file mode 100644 index 0000000000000000000000000000000000000000..055cd2415471b1aa5dd7e8c723783fb8b93cd127 --- /dev/null +++ b/tests/test_param_file_ltv.py @@ -0,0 +1,627 @@ +__copyright__ = """ +(C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, +Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., +Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, +Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other +contributors to this repository. All Rights Reserved. + +This software is protected by copyright law and by international treaties. +The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, +Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., +Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, +Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other +contributors to this repository retain full ownership rights in their respective contributions in +the software. This notice grants no license of any kind, including but not limited to patent +license, nor is any license granted by implication, estoppel or otherwise. + +Contributors are required to enter into the IVAS codec Public Collaboration agreement before making +contributions. + +This software is provided "AS IS", without any express or implied warranties. The software is in the +development stage. It is intended exclusively for experts who have experience with such software and +solely for the purpose of inspection. All implied warranties of non-infringement, merchantability +and fitness for a particular purpose are hereby disclaimed and excluded. + +Any dispute, controversy or claim arising under or in relation to providing this software shall be +submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in +accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and +the United Nations Convention on Contracts on the International Sales of Goods. +""" + +__doc__ = """ +Execute tests specified via a parameter file. +""" + +import os +from pathlib import Path +import errno +import platform +import filecmp +from subprocess import run +import pytest +from tests.cmp_pcm import cmp_pcm +from tests.cut_pcm import cut_samples +from tests.conftest import EncoderFrontend, DecoderFrontend +#from tests.testconfig import PARAM_FILE + + +VALID_DEC_OUTPUT_CONF = [ + "MONO", + "STEREO", + "5_1", + "7_1", + "5_1_2", + "5_1_4", + "7_1_4", + "FOA", + "HOA2", + "HOA3", + "BINAURAL", + "BINAURAL_ROOM_IR", + "BINAURAL_ROOM_REVERB", + "EXT", +] + +param_file_test_dict = {} +PARAM_FILE = "scripts/config/self_test_ltv.prm" +with open(PARAM_FILE, "r", encoding="UTF-8") as fp: + data = fp.read() + blocks = data.split("\n\n") + for block in blocks: + tag = "" + enc_opts = "" + dec_opts = "" + sim_opts = "" + eid_opts = "" + for line in block.split("\n"): + if line.startswith("// "): + tag = line[3:] + if line.startswith("../IVAS_cod "): + enc_opts = line[12:] + if line.startswith("../IVAS_dec "): + dec_opts = line[12:] + if line.startswith("networkSimulator_g192 "): + sim_opts = line[22:] + if line.startswith("eid-xor "): + eid_opts = line[8:] + if tag == "" or enc_opts == "" or dec_opts == "": + # no complete parameter set + continue + if tag in param_file_test_dict: + print("non-unique tag found - ignoring new entry") + continue + param_file_test_dict[tag] = (enc_opts, dec_opts, sim_opts, eid_opts) + + +def check_and_makedir(dir_path): + if not os.path.exists(dir_path): + try: + os.makedirs(dir_path) + except OSError as e: + if e.errno != errno.EEXIST: + raise # raises the error again + + +def convert_test_string_to_tag(test_string): + """ + Convert a test string (i.e. the test tag from the parameter file) to a tag string. + Example: + in: "DFT stereo at 13.2 kbps, 16kHz in, 16kHz out, DTX on, random FEC at 5%" + out: "DFT_stereo_at_13_2_kbps_16kHz_in_16kHz_out_DTX_on_random_FEC_at_5_" + """ + # replace certain characters by "_" or remove them + tag_str = "" + replace_chars = " %.-()" + remove_chars = "," + for char in test_string: + if char in replace_chars: + tag_str += "_" + elif char not in remove_chars: + tag_str += char + # replace double underscore by single one + tag_str = "_".join(tag_str.split("__")) + return tag_str + + +@pytest.mark.create_ref +@pytest.mark.parametrize("test_tag", list(param_file_test_dict.keys())) +def test_param_file_tests( + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, + ref_encoder_frontend: EncoderFrontend, + ref_decoder_frontend: DecoderFrontend, + reference_path, + dut_base_path, + test_vector_path, + update_ref, + rootdir, + keep_files, + test_tag, + get_mld, +): + enc_opts, dec_opts, sim_opts, eid_opts = param_file_test_dict[test_tag] + + tag_str = convert_test_string_to_tag(test_tag) + + # evaluate encoder options + enc_split = enc_opts.split() + assert len(enc_split) >= 4 + + # replace "testv/" by test vector path + enc_split = [ + x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x + for x in enc_split + ] + + bitstream_file = enc_split.pop() + testv_file = enc_split.pop() + fs = enc_split.pop() + sampling_rate = int(fs) + bitrate = enc_split.pop() + + sba_br_switching_dtx = 0 + if ( + not bitrate.isdigit() + and "-dtx" in enc_split + and "-sba" in enc_split + and testv_file.split("/")[1].startswith("stv") + ): + sba_br_switching_dtx = 1 + cut_file = pre_proc_input(testv_file, fs) + testv_file = cut_file + + # bitrate can be a filename: remove leading "../" + if bitrate.startswith("../"): + bitrate = bitrate[3:] + + testv_base = testv_file.split("/")[-1] + if testv_base.endswith(".pcm"): + testv_base = testv_base[:-4] + + assert bitstream_file == "bit" + # in the parameter file, only "bit" is used as bitstream file name + # -> construct bitstream filename + bitstream_file = f"{testv_base}_{tag_str}.192" + + encode( + dut_encoder_frontend, + ref_encoder_frontend, + reference_path, + dut_base_path, + bitrate, + sampling_rate, + testv_file, + bitstream_file, + enc_split, + update_ref, + ) + if sba_br_switching_dtx == 1: + is_exist = os.path.exists(cut_file) + if is_exist: + os.remove(cut_file) + + # check for networkSimulator_g192 command line + if sim_opts != "": + sim_split = sim_opts.split() + assert len(sim_split) == 6, "networkSimulator_g192 expects 6 parameters" + # [sim_profile, sim_input, sim_output, sim_trace, sim_nFPP, sim_offset] = sim_split + if sim_split[0].startswith(("../")): + # remove leading "../" + sim_split[0] = sim_split[0][3:] + assert sim_split[1] == "bit" + # in the parameter file, only "bit" is used as bitstream file name + # -> re-use bitstream filename from encoder call + sim_split[1] = bitstream_file + assert sim_split[2] == "netsimoutput" + # in the parameter file, only "netsimoutput" is used as netsim output file name + # -> construct netsim output file name + netsim_outfile = f"{testv_base}_{tag_str}.netsimout" + sim_split[2] = netsim_outfile + assert sim_split[3] == "tracefile_sim" + # in the parameter file, only "tracefile_sim" is used as trace output file name + # -> construct trace output file name + netsim_trace_outfile = f"{testv_base}_{tag_str}.netsimtrace" + sim_split[3] = netsim_trace_outfile + simulate( + reference_path, + dut_base_path, + sim_split, + update_ref, + rootdir, + ) + + # check for eid-xor command line + if eid_opts != "": + eid_split = eid_opts.split() + assert len(eid_split) >= 3, "eid-xor expects at least 3 parameters" + # [..., in_bs, err_pat_bs, out_bs] = eid_split + if eid_split[-2].startswith(("../")): + # remove leading "../" + eid_split[-2] = eid_split[-2][3:] + assert eid_split[-3] == "bit" + # in the parameter file, only "bit" is used as the input bitstream file name + # -> re-use bitstream filename from encoder call + eid_split[-3] = bitstream_file + assert eid_split[-1] == "bit_error" + # in the parameter file, only "bit_error" is used as the output bitstream file name + # -> construct netsim output file name + eid_xor_outfile = f"{testv_base}_{tag_str}.fer.192" + eid_split[-1] = eid_xor_outfile + error_insertion( + reference_path, + dut_base_path, + eid_split, + update_ref, + rootdir, + ) + + # evaluate decoder options + dec_split = dec_opts.split() + assert len(dec_split) >= 3 + + # replace "testv/" by test vector path + dec_split = [ + x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x + for x in dec_split + ] + # remove leading "../" + dec_split = [x[3:] if x.startswith("../") else x for x in dec_split] + + output_file = dec_split.pop() + bitstream_file_dec = dec_split.pop() + sampling_rate = int(dec_split.pop()) + if len(dec_split) > 0: + output_config = dec_split.pop() + if output_config.upper() not in VALID_DEC_OUTPUT_CONF: + if not output_config.endswith(".txt"): + # must be EVS tests with additional parameters - put param back + dec_split.append(output_config) + output_config = "" + else: + output_config = "" + + output_config_name = output_config + if "/" in output_config: + # the output config is a file + output_config_name = os.path.splitext(os.path.basename(output_config))[0] + + tracefile_dec = "" + if sim_opts != "": + assert bitstream_file_dec == "netsimoutput" + # in the parameter file, only "netsimoutput" is used as bitstream file name + # -> re-use netsim_outfile + bitstream_file = netsim_outfile + tracefile_dec = f"{testv_base}_{tag_str}.dectrace" + elif eid_opts != "": + assert bitstream_file_dec == "bit_error" + # in the parameter file, only "bit_error" is used as bitstream input file name + # -> re-use eid_xor_outfile + bitstream_file = eid_xor_outfile + else: + assert bitstream_file_dec == "bit" + # in the parameter file, only "bit" is used as bitstream file name + # -> re-use bitstream filename from encoder call + + + # the output file is not the real output filename + # -> construct output filename + output_file = f"{testv_base}_{tag_str}.dec.wav" + + stdout = decode( + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + output_config, + sampling_rate, + bitstream_file, + output_file, + dec_split, + update_ref, + tracefile_dec, + ) + + if update_ref in [0, 2]: + dut_output_file = f"{dut_base_path}/param_file/dec/{output_file}" + ref_output_file = f"{reference_path}/param_file/dec/{output_file}" + fs = int(sampling_rate) * 1000 + output_differs, reason = cmp_pcm( + dut_output_file, ref_output_file, output_config, fs, get_mld=get_mld + ) + md_out_files = get_expected_md_files(ref_output_file, enc_split, output_config) + + metadata_differs = False + for md_file in md_out_files: + dut_metadata_file = Path(f"{dut_base_path}/param_file/dec/{md_file}") + ref_metadata_file = Path(f"{reference_path}/param_file/dec/{md_file}") + try: + if not filecmp.cmp(dut_metadata_file, ref_metadata_file): + print("Output metadata differs for file: " + md_file) + metadata_differs = True + except FileNotFoundError: + if not dut_metadata_file.exists(): + print(f"DUT output metadata missing for expected file: " + md_file) + if not ref_metadata_file.exists(): + print(f"REF output metadata missing for expected file: " + md_file) + metadata_differs = True + + if output_differs or metadata_differs: + msg = "Difference between ref and dut in " + if output_differs and metadata_differs: + msg += "output and metadata" + elif output_differs: + msg += "output only" + elif metadata_differs: + msg += "metadata only" + + assert False, msg + + # remove DUT output files when test result is OK (to save disk space) + if not keep_files: + os.remove(f"{dut_base_path}/param_file/enc/{bitstream_file}") + os.remove(f"{dut_base_path}/param_file/dec/{output_file}") + if sim_opts != "": + os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192") + os.remove(f"{dut_base_path}/param_file/enc/{netsim_trace_outfile}") + os.remove(f"{dut_base_path}/param_file/dec/{tracefile_dec}") + elif eid_opts != "": + os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192") + + + +def encode( + dut_encoder_frontend, + ref_encoder_frontend, + reference_path, + dut_base_path, + bitrate, + sampling_rate, + testv_file, + bitstream_file, + enc_opts_list, + update_ref, +): + """ + Call REF and/or DUT encoder. + """ + # directories + dut_out_dir = f"{dut_base_path}/param_file/enc" + ref_out_dir = f"{reference_path}/param_file/enc" + + ref_out_file = f"{ref_out_dir}/{bitstream_file}" + dut_out_file = f"{dut_out_dir}/{bitstream_file}" + + if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): + check_and_makedir(ref_out_dir) + # call REF encoder + ref_encoder_frontend.run( + bitrate, + sampling_rate, + testv_file, + ref_out_file, + add_option_list=enc_opts_list, + ) + + if update_ref in [0, 2]: + check_and_makedir(dut_out_dir) + # call DUT encoder + dut_encoder_frontend.run( + bitrate, + sampling_rate, + testv_file, + dut_out_file, + add_option_list=enc_opts_list, + ) + + +def pre_proc_input(testv_file, fs): + cut_from = "0.0" + cut_len = "5.0" + cut_gain = "0.004" + if "stvFOA" in testv_file: + num_channel = "4" + elif "stv2OA" in testv_file: + num_channel = "9" + elif "stv3OA" in testv_file: + num_channel = "16" + cut_file = testv_file.replace(".wav", num_channel + "chn_" + cut_gain + ".wav") + cut_samples( + testv_file, cut_file, num_channel, fs + "000", cut_from, cut_len, cut_gain + ) + return cut_file + + +def simulate( + reference_path, + dut_base_path, + sim_opts_list, + update_ref, + rootdir, +): + """ + Call network simulator on REF and/or DUT encoder output. + """ + # directories + dut_out_dir = f"{dut_base_path}/param_file/enc" + ref_out_dir = f"{reference_path}/param_file/enc" + + netsim_infile = sim_opts_list[1] + netsim_outfile = sim_opts_list[2] + netsim_tracefile = sim_opts_list[3] + ref_out_file = f"{ref_out_dir}/{netsim_outfile}" + + if platform.system() == "Windows": + netsim = [ + os.path.join( + rootdir, "scripts", "tools", "Win32", "networkSimulator_g192.exe" + ) + ] + elif platform.system() in ["Linux", "Darwin"]: + netsim = [ + os.path.join( + rootdir, "scripts", "tools", platform.system(), "networkSimulator_g192" + ) + ] + else: + assert False, f"networkSimulator_g192 not available for {platform.system()}" + + if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): + # call network simulator on REF encoder output + cmd_opts = sim_opts_list + cmd_opts[1] = f"{ref_out_dir}/{netsim_infile}" + cmd_opts[2] = f"{ref_out_dir}/{netsim_outfile}" # ref_out_file + cmd_opts[3] = f"{ref_out_dir}/{netsim_tracefile}" + run(netsim + cmd_opts, check=False) + + if update_ref in [0, 2]: + # call network simulator on DUT encoder output + cmd_opts = sim_opts_list + cmd_opts[1] = f"{dut_out_dir}/{netsim_infile}" + cmd_opts[2] = f"{dut_out_dir}/{netsim_outfile}" # dut_out_file + cmd_opts[3] = f"{dut_out_dir}/{netsim_tracefile}" + run(netsim + cmd_opts, check=False) + +def error_insertion( + reference_path, + dut_base_path, + eid_opts_list, + update_ref, + rootdir, +): + """ + Call eid-xor to insert frame erasure on REF and/or DUT encoder output. + """ + + # directories + dut_out_dir = f"{dut_base_path}/param_file/enc" + ref_out_dir = f"{reference_path}/param_file/enc" + + eid_xor_infile = eid_opts_list[-3] + eid_xor_outfile = eid_opts_list[-1] + ref_out_file = f"{ref_out_dir}/{eid_xor_outfile}" + + if platform.system() == "Windows": + eid_xor = [ + os.path.join( + rootdir, "scripts", "tools", "Win32", "eid-xor.exe" + ) + ] + elif platform.system() in ["Linux", "Darwin"]: + eid_xor = [ + os.path.join( + rootdir, "scripts", "tools", platform.system(), "eid-xor" + ) + ] + else: + assert False, f"eid-xor not available for {platform.system()}" + + + if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): + # call eid-xor on REF encoder output + cmd_opts = eid_opts_list + cmd_opts[-3] = f"{ref_out_dir}/{eid_xor_infile}" + cmd_opts[-1] = f"{ref_out_dir}/{eid_xor_outfile}" # ref_out_file + run(eid_xor + cmd_opts, check=False) + + if update_ref in [0, 2]: + # call eid-xor on DUT encoder output + cmd_opts = eid_opts_list + cmd_opts[-3] = f"{dut_out_dir}/{eid_xor_infile}" + cmd_opts[-1] = f"{dut_out_dir}/{eid_xor_outfile}" # ref_out_file + run(eid_xor + cmd_opts, check=False) + +def decode( + decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + output_config, + sampling_rate, + bitstream_file, + output_file, + dec_opts_list, + update_ref, + tracefile_dec, +): + """ + Call REF and/or DUT decoder. + """ + # directories + dut_out_dir = f"{dut_base_path}/param_file/dec" + ref_out_dir = f"{reference_path}/param_file/dec" + + dut_in_file = f"{dut_base_path}/param_file/enc/{bitstream_file}" + ref_in_file = f"{reference_path}/param_file/enc/{bitstream_file}" + dut_out_file = f"{dut_out_dir}/{output_file}" + ref_out_file = f"{ref_out_dir}/{output_file}" + + if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): + check_and_makedir(ref_out_dir) + add_option_list = dec_opts_list + if tracefile_dec != "": + add_option_list = [ + x if x != "tracefile_dec" else f"{ref_out_dir}/{tracefile_dec}" + for x in dec_opts_list + ] + # call REF decoder + ref_decoder_frontend.run( + output_config, + sampling_rate, + ref_in_file, + ref_out_file, + add_option_list=add_option_list, + ) + + stdout = ref_decoder_frontend.stdout + + if update_ref in [0, 2]: + check_and_makedir(dut_out_dir) + add_option_list = dec_opts_list + if tracefile_dec != "": + add_option_list = [ + x if x != "tracefile_dec" else f"{dut_out_dir}/{tracefile_dec}" + for x in dec_opts_list + ] + # call DUT decoder + decoder_frontend.run( + output_config, + sampling_rate, + dut_in_file, + dut_out_file, + add_option_list=add_option_list, + ) + + stdout = decoder_frontend.stdout + + return stdout + + +def get_expected_md_files(ref_output_file, enc_opts, output_config): + """ + Based on input and output configs, get the filenames of MD files that are expected to being output by the decoder + """ + + if output_config.upper() != "EXT": + return list() + + md_files = list() + enc_opts_upper = [o.upper() for o in enc_opts] + md_filename = Path(ref_output_file).name + + if any([o in enc_opts_upper for o in ["-MASA", "-ISM_MASA"]]): + # always only one MD file, just add ending + md_files.append(md_filename + ".met") + + for ism_opt in ["-ISM", "-ISM_MASA", "-ISM_SBA"]: + # for ism MD, there are three modes that may output MD files + # try to find any of them in the encoder options + md_tmpl = md_filename + ".{}.csv" + try: + idx = enc_opts_upper.index(ism_opt) + ism_num = int(enc_opts[idx + 1]) + md_files.extend([md_tmpl.format(i) for i in range(ism_num)]) + break + except ValueError: + pass + + return md_files