Loading tests/test_param_file_ltv.py 0 → 100644 +627 −0 Original line number Diff line number Diff line __copyright__ = """ (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository. All Rights Reserved. This software is protected by copyright law and by international treaties. The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository retain full ownership rights in their respective contributions in the software. This notice grants no license of any kind, including but not limited to patent license, nor is any license granted by implication, estoppel or otherwise. Contributors are required to enter into the IVAS codec Public Collaboration agreement before making contributions. This software is provided "AS IS", without any express or implied warranties. The software is in the development stage. It is intended exclusively for experts who have experience with such software and solely for the purpose of inspection. All implied warranties of non-infringement, merchantability and fitness for a particular purpose are hereby disclaimed and excluded. Any dispute, controversy or claim arising under or in relation to providing this software shall be submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and the United Nations Convention on Contracts on the International Sales of Goods. """ __doc__ = """ Execute tests specified via a parameter file. """ import os from pathlib import Path import errno import platform import filecmp from subprocess import run import pytest from tests.cmp_pcm import cmp_pcm from tests.cut_pcm import cut_samples from tests.conftest import EncoderFrontend, DecoderFrontend #from tests.testconfig import PARAM_FILE VALID_DEC_OUTPUT_CONF = [ "MONO", "STEREO", "5_1", "7_1", "5_1_2", "5_1_4", "7_1_4", "FOA", "HOA2", "HOA3", "BINAURAL", "BINAURAL_ROOM_IR", "BINAURAL_ROOM_REVERB", "EXT", ] param_file_test_dict = {} PARAM_FILE = "scripts/config/self_test_ltv.prm" with open(PARAM_FILE, "r", encoding="UTF-8") as fp: data = fp.read() blocks = data.split("\n\n") for block in blocks: tag = "" enc_opts = "" dec_opts = "" sim_opts = "" eid_opts = "" for line in block.split("\n"): if line.startswith("// "): tag = line[3:] if line.startswith("../IVAS_cod "): enc_opts = line[12:] if line.startswith("../IVAS_dec "): dec_opts = line[12:] if line.startswith("networkSimulator_g192 "): sim_opts = line[22:] if line.startswith("eid-xor "): eid_opts = line[8:] if tag == "" or enc_opts == "" or dec_opts == "": # no complete parameter set continue if tag in param_file_test_dict: print("non-unique tag found - ignoring new entry") continue param_file_test_dict[tag] = (enc_opts, dec_opts, sim_opts, eid_opts) def check_and_makedir(dir_path): if not os.path.exists(dir_path): try: os.makedirs(dir_path) except OSError as e: if e.errno != errno.EEXIST: raise # raises the error again def convert_test_string_to_tag(test_string): """ Convert a test string (i.e. the test tag from the parameter file) to a tag string. Example: in: "DFT stereo at 13.2 kbps, 16kHz in, 16kHz out, DTX on, random FEC at 5%" out: "DFT_stereo_at_13_2_kbps_16kHz_in_16kHz_out_DTX_on_random_FEC_at_5_" """ # replace certain characters by "_" or remove them tag_str = "" replace_chars = " %.-()" remove_chars = "," for char in test_string: if char in replace_chars: tag_str += "_" elif char not in remove_chars: tag_str += char # replace double underscore by single one tag_str = "_".join(tag_str.split("__")) return tag_str @pytest.mark.create_ref @pytest.mark.parametrize("test_tag", list(param_file_test_dict.keys())) def test_param_file_tests( dut_encoder_frontend: EncoderFrontend, dut_decoder_frontend: DecoderFrontend, ref_encoder_frontend: EncoderFrontend, ref_decoder_frontend: DecoderFrontend, reference_path, dut_base_path, test_vector_path, update_ref, rootdir, keep_files, test_tag, get_mld, ): enc_opts, dec_opts, sim_opts, eid_opts = param_file_test_dict[test_tag] tag_str = convert_test_string_to_tag(test_tag) # evaluate encoder options enc_split = enc_opts.split() assert len(enc_split) >= 4 # replace "testv/" by test vector path enc_split = [ x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x for x in enc_split ] bitstream_file = enc_split.pop() testv_file = enc_split.pop() fs = enc_split.pop() sampling_rate = int(fs) bitrate = enc_split.pop() sba_br_switching_dtx = 0 if ( not bitrate.isdigit() and "-dtx" in enc_split and "-sba" in enc_split and testv_file.split("/")[1].startswith("stv") ): sba_br_switching_dtx = 1 cut_file = pre_proc_input(testv_file, fs) testv_file = cut_file # bitrate can be a filename: remove leading "../" if bitrate.startswith("../"): bitrate = bitrate[3:] testv_base = testv_file.split("/")[-1] if testv_base.endswith(".pcm"): testv_base = testv_base[:-4] assert bitstream_file == "bit" # in the parameter file, only "bit" is used as bitstream file name # -> construct bitstream filename bitstream_file = f"{testv_base}_{tag_str}.192" encode( dut_encoder_frontend, ref_encoder_frontend, reference_path, dut_base_path, bitrate, sampling_rate, testv_file, bitstream_file, enc_split, update_ref, ) if sba_br_switching_dtx == 1: is_exist = os.path.exists(cut_file) if is_exist: os.remove(cut_file) # check for networkSimulator_g192 command line if sim_opts != "": sim_split = sim_opts.split() assert len(sim_split) == 6, "networkSimulator_g192 expects 6 parameters" # [sim_profile, sim_input, sim_output, sim_trace, sim_nFPP, sim_offset] = sim_split if sim_split[0].startswith(("../")): # remove leading "../" sim_split[0] = sim_split[0][3:] assert sim_split[1] == "bit" # in the parameter file, only "bit" is used as bitstream file name # -> re-use bitstream filename from encoder call sim_split[1] = bitstream_file assert sim_split[2] == "netsimoutput" # in the parameter file, only "netsimoutput" is used as netsim output file name # -> construct netsim output file name netsim_outfile = f"{testv_base}_{tag_str}.netsimout" sim_split[2] = netsim_outfile assert sim_split[3] == "tracefile_sim" # in the parameter file, only "tracefile_sim" is used as trace output file name # -> construct trace output file name netsim_trace_outfile = f"{testv_base}_{tag_str}.netsimtrace" sim_split[3] = netsim_trace_outfile simulate( reference_path, dut_base_path, sim_split, update_ref, rootdir, ) # check for eid-xor command line if eid_opts != "": eid_split = eid_opts.split() assert len(eid_split) >= 3, "eid-xor expects at least 3 parameters" # [..., in_bs, err_pat_bs, out_bs] = eid_split if eid_split[-2].startswith(("../")): # remove leading "../" eid_split[-2] = eid_split[-2][3:] assert eid_split[-3] == "bit" # in the parameter file, only "bit" is used as the input bitstream file name # -> re-use bitstream filename from encoder call eid_split[-3] = bitstream_file assert eid_split[-1] == "bit_error" # in the parameter file, only "bit_error" is used as the output bitstream file name # -> construct netsim output file name eid_xor_outfile = f"{testv_base}_{tag_str}.fer.192" eid_split[-1] = eid_xor_outfile error_insertion( reference_path, dut_base_path, eid_split, update_ref, rootdir, ) # evaluate decoder options dec_split = dec_opts.split() assert len(dec_split) >= 3 # replace "testv/" by test vector path dec_split = [ x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x for x in dec_split ] # remove leading "../" dec_split = [x[3:] if x.startswith("../") else x for x in dec_split] output_file = dec_split.pop() bitstream_file_dec = dec_split.pop() sampling_rate = int(dec_split.pop()) if len(dec_split) > 0: output_config = dec_split.pop() if output_config.upper() not in VALID_DEC_OUTPUT_CONF: if not output_config.endswith(".txt"): # must be EVS tests with additional parameters - put param back dec_split.append(output_config) output_config = "" else: output_config = "" output_config_name = output_config if "/" in output_config: # the output config is a file output_config_name = os.path.splitext(os.path.basename(output_config))[0] tracefile_dec = "" if sim_opts != "": assert bitstream_file_dec == "netsimoutput" # in the parameter file, only "netsimoutput" is used as bitstream file name # -> re-use netsim_outfile bitstream_file = netsim_outfile tracefile_dec = f"{testv_base}_{tag_str}.dectrace" elif eid_opts != "": assert bitstream_file_dec == "bit_error" # in the parameter file, only "bit_error" is used as bitstream input file name # -> re-use eid_xor_outfile bitstream_file = eid_xor_outfile else: assert bitstream_file_dec == "bit" # in the parameter file, only "bit" is used as bitstream file name # -> re-use bitstream filename from encoder call # the output file is not the real output filename # -> construct output filename output_file = f"{testv_base}_{tag_str}.dec.wav" stdout = decode( dut_decoder_frontend, ref_decoder_frontend, reference_path, dut_base_path, output_config, sampling_rate, bitstream_file, output_file, dec_split, update_ref, tracefile_dec, ) if update_ref in [0, 2]: dut_output_file = f"{dut_base_path}/param_file/dec/{output_file}" ref_output_file = f"{reference_path}/param_file/dec/{output_file}" fs = int(sampling_rate) * 1000 output_differs, reason = cmp_pcm( dut_output_file, ref_output_file, output_config, fs, get_mld=get_mld ) md_out_files = get_expected_md_files(ref_output_file, enc_split, output_config) metadata_differs = False for md_file in md_out_files: dut_metadata_file = Path(f"{dut_base_path}/param_file/dec/{md_file}") ref_metadata_file = Path(f"{reference_path}/param_file/dec/{md_file}") try: if not filecmp.cmp(dut_metadata_file, ref_metadata_file): print("Output metadata differs for file: " + md_file) metadata_differs = True except FileNotFoundError: if not dut_metadata_file.exists(): print(f"DUT output metadata missing for expected file: " + md_file) if not ref_metadata_file.exists(): print(f"REF output metadata missing for expected file: " + md_file) metadata_differs = True if output_differs or metadata_differs: msg = "Difference between ref and dut in " if output_differs and metadata_differs: msg += "output and metadata" elif output_differs: msg += "output only" elif metadata_differs: msg += "metadata only" assert False, msg # remove DUT output files when test result is OK (to save disk space) if not keep_files: os.remove(f"{dut_base_path}/param_file/enc/{bitstream_file}") os.remove(f"{dut_base_path}/param_file/dec/{output_file}") if sim_opts != "": os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192") os.remove(f"{dut_base_path}/param_file/enc/{netsim_trace_outfile}") os.remove(f"{dut_base_path}/param_file/dec/{tracefile_dec}") elif eid_opts != "": os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192") def encode( dut_encoder_frontend, ref_encoder_frontend, reference_path, dut_base_path, bitrate, sampling_rate, testv_file, bitstream_file, enc_opts_list, update_ref, ): """ Call REF and/or DUT encoder. """ # directories dut_out_dir = f"{dut_base_path}/param_file/enc" ref_out_dir = f"{reference_path}/param_file/enc" ref_out_file = f"{ref_out_dir}/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{bitstream_file}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): check_and_makedir(ref_out_dir) # call REF encoder ref_encoder_frontend.run( bitrate, sampling_rate, testv_file, ref_out_file, add_option_list=enc_opts_list, ) if update_ref in [0, 2]: check_and_makedir(dut_out_dir) # call DUT encoder dut_encoder_frontend.run( bitrate, sampling_rate, testv_file, dut_out_file, add_option_list=enc_opts_list, ) def pre_proc_input(testv_file, fs): cut_from = "0.0" cut_len = "5.0" cut_gain = "0.004" if "stvFOA" in testv_file: num_channel = "4" elif "stv2OA" in testv_file: num_channel = "9" elif "stv3OA" in testv_file: num_channel = "16" cut_file = testv_file.replace(".wav", num_channel + "chn_" + cut_gain + ".wav") cut_samples( testv_file, cut_file, num_channel, fs + "000", cut_from, cut_len, cut_gain ) return cut_file def simulate( reference_path, dut_base_path, sim_opts_list, update_ref, rootdir, ): """ Call network simulator on REF and/or DUT encoder output. """ # directories dut_out_dir = f"{dut_base_path}/param_file/enc" ref_out_dir = f"{reference_path}/param_file/enc" netsim_infile = sim_opts_list[1] netsim_outfile = sim_opts_list[2] netsim_tracefile = sim_opts_list[3] ref_out_file = f"{ref_out_dir}/{netsim_outfile}" if platform.system() == "Windows": netsim = [ os.path.join( rootdir, "scripts", "tools", "Win32", "networkSimulator_g192.exe" ) ] elif platform.system() in ["Linux", "Darwin"]: netsim = [ os.path.join( rootdir, "scripts", "tools", platform.system(), "networkSimulator_g192" ) ] else: assert False, f"networkSimulator_g192 not available for {platform.system()}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): # call network simulator on REF encoder output cmd_opts = sim_opts_list cmd_opts[1] = f"{ref_out_dir}/{netsim_infile}" cmd_opts[2] = f"{ref_out_dir}/{netsim_outfile}" # ref_out_file cmd_opts[3] = f"{ref_out_dir}/{netsim_tracefile}" run(netsim + cmd_opts, check=False) if update_ref in [0, 2]: # call network simulator on DUT encoder output cmd_opts = sim_opts_list cmd_opts[1] = f"{dut_out_dir}/{netsim_infile}" cmd_opts[2] = f"{dut_out_dir}/{netsim_outfile}" # dut_out_file cmd_opts[3] = f"{dut_out_dir}/{netsim_tracefile}" run(netsim + cmd_opts, check=False) def error_insertion( reference_path, dut_base_path, eid_opts_list, update_ref, rootdir, ): """ Call eid-xor to insert frame erasure on REF and/or DUT encoder output. """ # directories dut_out_dir = f"{dut_base_path}/param_file/enc" ref_out_dir = f"{reference_path}/param_file/enc" eid_xor_infile = eid_opts_list[-3] eid_xor_outfile = eid_opts_list[-1] ref_out_file = f"{ref_out_dir}/{eid_xor_outfile}" if platform.system() == "Windows": eid_xor = [ os.path.join( rootdir, "scripts", "tools", "Win32", "eid-xor.exe" ) ] elif platform.system() in ["Linux", "Darwin"]: eid_xor = [ os.path.join( rootdir, "scripts", "tools", platform.system(), "eid-xor" ) ] else: assert False, f"eid-xor not available for {platform.system()}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): # call eid-xor on REF encoder output cmd_opts = eid_opts_list cmd_opts[-3] = f"{ref_out_dir}/{eid_xor_infile}" cmd_opts[-1] = f"{ref_out_dir}/{eid_xor_outfile}" # ref_out_file run(eid_xor + cmd_opts, check=False) if update_ref in [0, 2]: # call eid-xor on DUT encoder output cmd_opts = eid_opts_list cmd_opts[-3] = f"{dut_out_dir}/{eid_xor_infile}" cmd_opts[-1] = f"{dut_out_dir}/{eid_xor_outfile}" # ref_out_file run(eid_xor + cmd_opts, check=False) def decode( decoder_frontend, ref_decoder_frontend, reference_path, dut_base_path, output_config, sampling_rate, bitstream_file, output_file, dec_opts_list, update_ref, tracefile_dec, ): """ Call REF and/or DUT decoder. """ # directories dut_out_dir = f"{dut_base_path}/param_file/dec" ref_out_dir = f"{reference_path}/param_file/dec" dut_in_file = f"{dut_base_path}/param_file/enc/{bitstream_file}" ref_in_file = f"{reference_path}/param_file/enc/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{output_file}" ref_out_file = f"{ref_out_dir}/{output_file}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): check_and_makedir(ref_out_dir) add_option_list = dec_opts_list if tracefile_dec != "": add_option_list = [ x if x != "tracefile_dec" else f"{ref_out_dir}/{tracefile_dec}" for x in dec_opts_list ] # call REF decoder ref_decoder_frontend.run( output_config, sampling_rate, ref_in_file, ref_out_file, add_option_list=add_option_list, ) stdout = ref_decoder_frontend.stdout if update_ref in [0, 2]: check_and_makedir(dut_out_dir) add_option_list = dec_opts_list if tracefile_dec != "": add_option_list = [ x if x != "tracefile_dec" else f"{dut_out_dir}/{tracefile_dec}" for x in dec_opts_list ] # call DUT decoder decoder_frontend.run( output_config, sampling_rate, dut_in_file, dut_out_file, add_option_list=add_option_list, ) stdout = decoder_frontend.stdout return stdout def get_expected_md_files(ref_output_file, enc_opts, output_config): """ Based on input and output configs, get the filenames of MD files that are expected to being output by the decoder """ if output_config.upper() != "EXT": return list() md_files = list() enc_opts_upper = [o.upper() for o in enc_opts] md_filename = Path(ref_output_file).name if any([o in enc_opts_upper for o in ["-MASA", "-ISM_MASA"]]): # always only one MD file, just add ending md_files.append(md_filename + ".met") for ism_opt in ["-ISM", "-ISM_MASA", "-ISM_SBA"]: # for ism MD, there are three modes that may output MD files # try to find any of them in the encoder options md_tmpl = md_filename + ".{}.csv" try: idx = enc_opts_upper.index(ism_opt) ism_num = int(enc_opts[idx + 1]) md_files.extend([md_tmpl.format(i) for i in range(ism_num)]) break except ValueError: pass return md_files Loading
tests/test_param_file_ltv.py 0 → 100644 +627 −0 Original line number Diff line number Diff line __copyright__ = """ (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository. All Rights Reserved. This software is protected by copyright law and by international treaties. The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository retain full ownership rights in their respective contributions in the software. This notice grants no license of any kind, including but not limited to patent license, nor is any license granted by implication, estoppel or otherwise. Contributors are required to enter into the IVAS codec Public Collaboration agreement before making contributions. This software is provided "AS IS", without any express or implied warranties. The software is in the development stage. It is intended exclusively for experts who have experience with such software and solely for the purpose of inspection. All implied warranties of non-infringement, merchantability and fitness for a particular purpose are hereby disclaimed and excluded. Any dispute, controversy or claim arising under or in relation to providing this software shall be submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and the United Nations Convention on Contracts on the International Sales of Goods. """ __doc__ = """ Execute tests specified via a parameter file. """ import os from pathlib import Path import errno import platform import filecmp from subprocess import run import pytest from tests.cmp_pcm import cmp_pcm from tests.cut_pcm import cut_samples from tests.conftest import EncoderFrontend, DecoderFrontend #from tests.testconfig import PARAM_FILE VALID_DEC_OUTPUT_CONF = [ "MONO", "STEREO", "5_1", "7_1", "5_1_2", "5_1_4", "7_1_4", "FOA", "HOA2", "HOA3", "BINAURAL", "BINAURAL_ROOM_IR", "BINAURAL_ROOM_REVERB", "EXT", ] param_file_test_dict = {} PARAM_FILE = "scripts/config/self_test_ltv.prm" with open(PARAM_FILE, "r", encoding="UTF-8") as fp: data = fp.read() blocks = data.split("\n\n") for block in blocks: tag = "" enc_opts = "" dec_opts = "" sim_opts = "" eid_opts = "" for line in block.split("\n"): if line.startswith("// "): tag = line[3:] if line.startswith("../IVAS_cod "): enc_opts = line[12:] if line.startswith("../IVAS_dec "): dec_opts = line[12:] if line.startswith("networkSimulator_g192 "): sim_opts = line[22:] if line.startswith("eid-xor "): eid_opts = line[8:] if tag == "" or enc_opts == "" or dec_opts == "": # no complete parameter set continue if tag in param_file_test_dict: print("non-unique tag found - ignoring new entry") continue param_file_test_dict[tag] = (enc_opts, dec_opts, sim_opts, eid_opts) def check_and_makedir(dir_path): if not os.path.exists(dir_path): try: os.makedirs(dir_path) except OSError as e: if e.errno != errno.EEXIST: raise # raises the error again def convert_test_string_to_tag(test_string): """ Convert a test string (i.e. the test tag from the parameter file) to a tag string. Example: in: "DFT stereo at 13.2 kbps, 16kHz in, 16kHz out, DTX on, random FEC at 5%" out: "DFT_stereo_at_13_2_kbps_16kHz_in_16kHz_out_DTX_on_random_FEC_at_5_" """ # replace certain characters by "_" or remove them tag_str = "" replace_chars = " %.-()" remove_chars = "," for char in test_string: if char in replace_chars: tag_str += "_" elif char not in remove_chars: tag_str += char # replace double underscore by single one tag_str = "_".join(tag_str.split("__")) return tag_str @pytest.mark.create_ref @pytest.mark.parametrize("test_tag", list(param_file_test_dict.keys())) def test_param_file_tests( dut_encoder_frontend: EncoderFrontend, dut_decoder_frontend: DecoderFrontend, ref_encoder_frontend: EncoderFrontend, ref_decoder_frontend: DecoderFrontend, reference_path, dut_base_path, test_vector_path, update_ref, rootdir, keep_files, test_tag, get_mld, ): enc_opts, dec_opts, sim_opts, eid_opts = param_file_test_dict[test_tag] tag_str = convert_test_string_to_tag(test_tag) # evaluate encoder options enc_split = enc_opts.split() assert len(enc_split) >= 4 # replace "testv/" by test vector path enc_split = [ x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x for x in enc_split ] bitstream_file = enc_split.pop() testv_file = enc_split.pop() fs = enc_split.pop() sampling_rate = int(fs) bitrate = enc_split.pop() sba_br_switching_dtx = 0 if ( not bitrate.isdigit() and "-dtx" in enc_split and "-sba" in enc_split and testv_file.split("/")[1].startswith("stv") ): sba_br_switching_dtx = 1 cut_file = pre_proc_input(testv_file, fs) testv_file = cut_file # bitrate can be a filename: remove leading "../" if bitrate.startswith("../"): bitrate = bitrate[3:] testv_base = testv_file.split("/")[-1] if testv_base.endswith(".pcm"): testv_base = testv_base[:-4] assert bitstream_file == "bit" # in the parameter file, only "bit" is used as bitstream file name # -> construct bitstream filename bitstream_file = f"{testv_base}_{tag_str}.192" encode( dut_encoder_frontend, ref_encoder_frontend, reference_path, dut_base_path, bitrate, sampling_rate, testv_file, bitstream_file, enc_split, update_ref, ) if sba_br_switching_dtx == 1: is_exist = os.path.exists(cut_file) if is_exist: os.remove(cut_file) # check for networkSimulator_g192 command line if sim_opts != "": sim_split = sim_opts.split() assert len(sim_split) == 6, "networkSimulator_g192 expects 6 parameters" # [sim_profile, sim_input, sim_output, sim_trace, sim_nFPP, sim_offset] = sim_split if sim_split[0].startswith(("../")): # remove leading "../" sim_split[0] = sim_split[0][3:] assert sim_split[1] == "bit" # in the parameter file, only "bit" is used as bitstream file name # -> re-use bitstream filename from encoder call sim_split[1] = bitstream_file assert sim_split[2] == "netsimoutput" # in the parameter file, only "netsimoutput" is used as netsim output file name # -> construct netsim output file name netsim_outfile = f"{testv_base}_{tag_str}.netsimout" sim_split[2] = netsim_outfile assert sim_split[3] == "tracefile_sim" # in the parameter file, only "tracefile_sim" is used as trace output file name # -> construct trace output file name netsim_trace_outfile = f"{testv_base}_{tag_str}.netsimtrace" sim_split[3] = netsim_trace_outfile simulate( reference_path, dut_base_path, sim_split, update_ref, rootdir, ) # check for eid-xor command line if eid_opts != "": eid_split = eid_opts.split() assert len(eid_split) >= 3, "eid-xor expects at least 3 parameters" # [..., in_bs, err_pat_bs, out_bs] = eid_split if eid_split[-2].startswith(("../")): # remove leading "../" eid_split[-2] = eid_split[-2][3:] assert eid_split[-3] == "bit" # in the parameter file, only "bit" is used as the input bitstream file name # -> re-use bitstream filename from encoder call eid_split[-3] = bitstream_file assert eid_split[-1] == "bit_error" # in the parameter file, only "bit_error" is used as the output bitstream file name # -> construct netsim output file name eid_xor_outfile = f"{testv_base}_{tag_str}.fer.192" eid_split[-1] = eid_xor_outfile error_insertion( reference_path, dut_base_path, eid_split, update_ref, rootdir, ) # evaluate decoder options dec_split = dec_opts.split() assert len(dec_split) >= 3 # replace "testv/" by test vector path dec_split = [ x.replace("testv", f"{test_vector_path}", 1) if x.startswith("testv/") else x for x in dec_split ] # remove leading "../" dec_split = [x[3:] if x.startswith("../") else x for x in dec_split] output_file = dec_split.pop() bitstream_file_dec = dec_split.pop() sampling_rate = int(dec_split.pop()) if len(dec_split) > 0: output_config = dec_split.pop() if output_config.upper() not in VALID_DEC_OUTPUT_CONF: if not output_config.endswith(".txt"): # must be EVS tests with additional parameters - put param back dec_split.append(output_config) output_config = "" else: output_config = "" output_config_name = output_config if "/" in output_config: # the output config is a file output_config_name = os.path.splitext(os.path.basename(output_config))[0] tracefile_dec = "" if sim_opts != "": assert bitstream_file_dec == "netsimoutput" # in the parameter file, only "netsimoutput" is used as bitstream file name # -> re-use netsim_outfile bitstream_file = netsim_outfile tracefile_dec = f"{testv_base}_{tag_str}.dectrace" elif eid_opts != "": assert bitstream_file_dec == "bit_error" # in the parameter file, only "bit_error" is used as bitstream input file name # -> re-use eid_xor_outfile bitstream_file = eid_xor_outfile else: assert bitstream_file_dec == "bit" # in the parameter file, only "bit" is used as bitstream file name # -> re-use bitstream filename from encoder call # the output file is not the real output filename # -> construct output filename output_file = f"{testv_base}_{tag_str}.dec.wav" stdout = decode( dut_decoder_frontend, ref_decoder_frontend, reference_path, dut_base_path, output_config, sampling_rate, bitstream_file, output_file, dec_split, update_ref, tracefile_dec, ) if update_ref in [0, 2]: dut_output_file = f"{dut_base_path}/param_file/dec/{output_file}" ref_output_file = f"{reference_path}/param_file/dec/{output_file}" fs = int(sampling_rate) * 1000 output_differs, reason = cmp_pcm( dut_output_file, ref_output_file, output_config, fs, get_mld=get_mld ) md_out_files = get_expected_md_files(ref_output_file, enc_split, output_config) metadata_differs = False for md_file in md_out_files: dut_metadata_file = Path(f"{dut_base_path}/param_file/dec/{md_file}") ref_metadata_file = Path(f"{reference_path}/param_file/dec/{md_file}") try: if not filecmp.cmp(dut_metadata_file, ref_metadata_file): print("Output metadata differs for file: " + md_file) metadata_differs = True except FileNotFoundError: if not dut_metadata_file.exists(): print(f"DUT output metadata missing for expected file: " + md_file) if not ref_metadata_file.exists(): print(f"REF output metadata missing for expected file: " + md_file) metadata_differs = True if output_differs or metadata_differs: msg = "Difference between ref and dut in " if output_differs and metadata_differs: msg += "output and metadata" elif output_differs: msg += "output only" elif metadata_differs: msg += "metadata only" assert False, msg # remove DUT output files when test result is OK (to save disk space) if not keep_files: os.remove(f"{dut_base_path}/param_file/enc/{bitstream_file}") os.remove(f"{dut_base_path}/param_file/dec/{output_file}") if sim_opts != "": os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192") os.remove(f"{dut_base_path}/param_file/enc/{netsim_trace_outfile}") os.remove(f"{dut_base_path}/param_file/dec/{tracefile_dec}") elif eid_opts != "": os.remove(f"{dut_base_path}/param_file/enc/{testv_base}_{tag_str}.192") def encode( dut_encoder_frontend, ref_encoder_frontend, reference_path, dut_base_path, bitrate, sampling_rate, testv_file, bitstream_file, enc_opts_list, update_ref, ): """ Call REF and/or DUT encoder. """ # directories dut_out_dir = f"{dut_base_path}/param_file/enc" ref_out_dir = f"{reference_path}/param_file/enc" ref_out_file = f"{ref_out_dir}/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{bitstream_file}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): check_and_makedir(ref_out_dir) # call REF encoder ref_encoder_frontend.run( bitrate, sampling_rate, testv_file, ref_out_file, add_option_list=enc_opts_list, ) if update_ref in [0, 2]: check_and_makedir(dut_out_dir) # call DUT encoder dut_encoder_frontend.run( bitrate, sampling_rate, testv_file, dut_out_file, add_option_list=enc_opts_list, ) def pre_proc_input(testv_file, fs): cut_from = "0.0" cut_len = "5.0" cut_gain = "0.004" if "stvFOA" in testv_file: num_channel = "4" elif "stv2OA" in testv_file: num_channel = "9" elif "stv3OA" in testv_file: num_channel = "16" cut_file = testv_file.replace(".wav", num_channel + "chn_" + cut_gain + ".wav") cut_samples( testv_file, cut_file, num_channel, fs + "000", cut_from, cut_len, cut_gain ) return cut_file def simulate( reference_path, dut_base_path, sim_opts_list, update_ref, rootdir, ): """ Call network simulator on REF and/or DUT encoder output. """ # directories dut_out_dir = f"{dut_base_path}/param_file/enc" ref_out_dir = f"{reference_path}/param_file/enc" netsim_infile = sim_opts_list[1] netsim_outfile = sim_opts_list[2] netsim_tracefile = sim_opts_list[3] ref_out_file = f"{ref_out_dir}/{netsim_outfile}" if platform.system() == "Windows": netsim = [ os.path.join( rootdir, "scripts", "tools", "Win32", "networkSimulator_g192.exe" ) ] elif platform.system() in ["Linux", "Darwin"]: netsim = [ os.path.join( rootdir, "scripts", "tools", platform.system(), "networkSimulator_g192" ) ] else: assert False, f"networkSimulator_g192 not available for {platform.system()}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): # call network simulator on REF encoder output cmd_opts = sim_opts_list cmd_opts[1] = f"{ref_out_dir}/{netsim_infile}" cmd_opts[2] = f"{ref_out_dir}/{netsim_outfile}" # ref_out_file cmd_opts[3] = f"{ref_out_dir}/{netsim_tracefile}" run(netsim + cmd_opts, check=False) if update_ref in [0, 2]: # call network simulator on DUT encoder output cmd_opts = sim_opts_list cmd_opts[1] = f"{dut_out_dir}/{netsim_infile}" cmd_opts[2] = f"{dut_out_dir}/{netsim_outfile}" # dut_out_file cmd_opts[3] = f"{dut_out_dir}/{netsim_tracefile}" run(netsim + cmd_opts, check=False) def error_insertion( reference_path, dut_base_path, eid_opts_list, update_ref, rootdir, ): """ Call eid-xor to insert frame erasure on REF and/or DUT encoder output. """ # directories dut_out_dir = f"{dut_base_path}/param_file/enc" ref_out_dir = f"{reference_path}/param_file/enc" eid_xor_infile = eid_opts_list[-3] eid_xor_outfile = eid_opts_list[-1] ref_out_file = f"{ref_out_dir}/{eid_xor_outfile}" if platform.system() == "Windows": eid_xor = [ os.path.join( rootdir, "scripts", "tools", "Win32", "eid-xor.exe" ) ] elif platform.system() in ["Linux", "Darwin"]: eid_xor = [ os.path.join( rootdir, "scripts", "tools", platform.system(), "eid-xor" ) ] else: assert False, f"eid-xor not available for {platform.system()}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): # call eid-xor on REF encoder output cmd_opts = eid_opts_list cmd_opts[-3] = f"{ref_out_dir}/{eid_xor_infile}" cmd_opts[-1] = f"{ref_out_dir}/{eid_xor_outfile}" # ref_out_file run(eid_xor + cmd_opts, check=False) if update_ref in [0, 2]: # call eid-xor on DUT encoder output cmd_opts = eid_opts_list cmd_opts[-3] = f"{dut_out_dir}/{eid_xor_infile}" cmd_opts[-1] = f"{dut_out_dir}/{eid_xor_outfile}" # ref_out_file run(eid_xor + cmd_opts, check=False) def decode( decoder_frontend, ref_decoder_frontend, reference_path, dut_base_path, output_config, sampling_rate, bitstream_file, output_file, dec_opts_list, update_ref, tracefile_dec, ): """ Call REF and/or DUT decoder. """ # directories dut_out_dir = f"{dut_base_path}/param_file/dec" ref_out_dir = f"{reference_path}/param_file/dec" dut_in_file = f"{dut_base_path}/param_file/enc/{bitstream_file}" ref_in_file = f"{reference_path}/param_file/enc/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{output_file}" ref_out_file = f"{ref_out_dir}/{output_file}" if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): check_and_makedir(ref_out_dir) add_option_list = dec_opts_list if tracefile_dec != "": add_option_list = [ x if x != "tracefile_dec" else f"{ref_out_dir}/{tracefile_dec}" for x in dec_opts_list ] # call REF decoder ref_decoder_frontend.run( output_config, sampling_rate, ref_in_file, ref_out_file, add_option_list=add_option_list, ) stdout = ref_decoder_frontend.stdout if update_ref in [0, 2]: check_and_makedir(dut_out_dir) add_option_list = dec_opts_list if tracefile_dec != "": add_option_list = [ x if x != "tracefile_dec" else f"{dut_out_dir}/{tracefile_dec}" for x in dec_opts_list ] # call DUT decoder decoder_frontend.run( output_config, sampling_rate, dut_in_file, dut_out_file, add_option_list=add_option_list, ) stdout = decoder_frontend.stdout return stdout def get_expected_md_files(ref_output_file, enc_opts, output_config): """ Based on input and output configs, get the filenames of MD files that are expected to being output by the decoder """ if output_config.upper() != "EXT": return list() md_files = list() enc_opts_upper = [o.upper() for o in enc_opts] md_filename = Path(ref_output_file).name if any([o in enc_opts_upper for o in ["-MASA", "-ISM_MASA"]]): # always only one MD file, just add ending md_files.append(md_filename + ".met") for ism_opt in ["-ISM", "-ISM_MASA", "-ISM_SBA"]: # for ism MD, there are three modes that may output MD files # try to find any of them in the encoder options md_tmpl = md_filename + ".{}.csv" try: idx = enc_opts_upper.index(ism_opt) ism_num = int(enc_opts[idx + 1]) md_files.extend([md_tmpl.format(i) for i in range(ism_num)]) break except ValueError: pass return md_files