diff --git a/tests/cmp_stats_files.py b/tests/cmp_stats_files.py new file mode 100644 index 0000000000000000000000000000000000000000..bc191c87ee5e1c961c90f55c24dc50e588f19a22 --- /dev/null +++ b/tests/cmp_stats_files.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 + +import argparse +import os +import sys +import json +import numpy as np + + +def str2num(s): + """ + Convert string either to integer or float + """ + try: + return int(s) + except ValueError: + return float(s) + + +def cmp_stats_files( + ref_stats_file, dut_stats_file, min_enc_file_length_diff=0.1, min_enc_stats_diff=0.1 +) -> (int, str): + """ + Compare two .stats files containing encoder statistics (extracted from binary files) + """ + print(f"Comparing {os.path.basename(ref_stats_file)} between Ref and Dut ... \n") + + # open and read the .stats files + with open(ref_stats_file, "r") as f_aux: + ref_stats = json.load(f_aux) + # create dictionary to map "name" to the corresponding dictionaries + ref_stats_names = {d["name"]: d for d in ref_stats} + + with open(dut_stats_file, "r") as f_aux: + dut_stats = json.load(f_aux) + # create dictionary to map "name" to the corresponding dictionaries + dut_stats_names = {d["name"]: d for d in dut_stats} + + # loop over all common aux files + enc_test_result = 0 + enc_test_result_msg = "" + max_total_num_diff = 0 + max_total_num_diff_ratio = 0 + for name in ref_stats_names: + if name in dut_stats_names: + # retrieve the dictionaries + ref_stats_dict = ref_stats_names[name] + dut_stats_dict = dut_stats_names[name] + + msg = f"File {name}" + + # compare the file lengths + result_len_check = 0 + file_length = max(ref_stats_dict["length"], dut_stats_dict["length"]) + if ref_stats_dict["length"] != dut_stats_dict["length"]: + msg += f" has different length between Ref {ref_stats_dict['length']} and DuT {dut_stats_dict['length']}" + + # check if threshold has been exceeded + if ( + abs(ref_stats_dict["length"] - dut_stats_dict["length"]) + / file_length + > min_enc_file_length_diff + ): + result_len_check = 1 + + msg += ", " + + # remove the "name" and "length" keys for further processing + del ref_stats_dict["name"] + del dut_stats_dict["name"] + del ref_stats_dict["length"] + del dut_stats_dict["length"] + + # convert keys and values from string to float + ref_hist = {str2num(i): str2num(j) for i, j in ref_stats_dict.items()} + cut_hist = {str2num(i): str2num(j) for i, j in dut_stats_dict.items()} + delta_ref = set(cut_hist) - set(ref_hist) + delta_cut = set(ref_hist) - set(cut_hist) + + # append missing keys + for item in delta_cut: + cut_hist[item] = 0 + + for item in delta_ref: + ref_hist[item] = 0 + + ref_hist = dict(sorted(ref_hist.items())) + cut_hist = dict(sorted(cut_hist.items())) + + # caculate difference of statistics + diff_hist = {k: cut_hist[k] - ref_hist[k] for k in ref_hist.keys()} + + # calculate the total number of differences + total_num_diff = sum(np.abs(list(diff_hist.values()))) + total_num_diff_ratio = total_num_diff / ( + sum(ref_hist.values()) + sum(cut_hist.values()) + ) + + msg += f"the total number of differences is {total_num_diff} ({(total_num_diff_ratio*100):.2f}%)" + if total_num_diff_ratio > min_enc_stats_diff: + result_diff_check = 1 + msg += "! " + else: + result_diff_check = 0 + msg += ". " + + # check if the maximum difference has been exceeded + if total_num_diff_ratio > max_total_num_diff_ratio: + max_total_num_diff = total_num_diff + max_total_num_diff_ratio = total_num_diff_ratio + + # update test result + if result_len_check or result_diff_check: + enc_test_result = 1 + enc_test_result_msg += msg + + print(msg) + + if enc_test_result and max_total_num_diff > 0: + msg = f"MAXIMUM ENC DIFF: {max_total_num_diff} ({(max_total_num_diff_ratio*100):.2f}%) " + enc_test_result_msg += msg + print(msg) + + return enc_test_result, enc_test_result_msg + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument("ref_stats_file", type=str) + parser.add_argument("dut_stats_file", type=str) + parser.add_argument( + "--min_enc_file_length_diff", + type=float, + default=0.1, + dest="min_enc_file_length_diff", + ) + parser.add_argument( + "--min_enc_stats_diff", type=float, default=0.1, dest="min_enc_stats_diff" + ) + args = parser.parse_args() + + enc_test_result, enc_test_result_msg = cmp_stats_files(**vars(args)) + print(enc_test_result_msg) + + sys.exit(enc_test_result) diff --git a/tests/codec_be_on_mr_nonselection/test_param_file.py b/tests/codec_be_on_mr_nonselection/test_param_file.py index 1feae1de3991c2539e83441c8f732aa684ff73df..9a11b4d6d42be2e143e56dfa2bef0b9b84963640 100644 --- a/tests/codec_be_on_mr_nonselection/test_param_file.py +++ b/tests/codec_be_on_mr_nonselection/test_param_file.py @@ -42,8 +42,15 @@ import pytest import numpy as np from tests.cmp_pcm import cmp_pcm +from tests.cmp_stats_files import cmp_stats_files from tests.conftest import DecoderFrontend, EncoderFrontend, parse_properties from tests.testconfig import PARAM_FILE +from tests.constants import ( + MIN_ENC_FILE_LENGTH_DIFF, + MIN_ENC_STATS_DIFF, + SCRIPTS_DIR, +) + VALID_DEC_OUTPUT_CONF = [ "MONO", @@ -134,6 +141,7 @@ def convert_test_string_to_tag(test_string): def test_param_file_tests( record_property, props_to_record, + encoder_only, decoder_only, dut_encoder_frontend: EncoderFrontend, dut_decoder_frontend: DecoderFrontend, @@ -151,6 +159,7 @@ def test_param_file_tests( get_mld_lim, abs_tol, get_ssnr, + get_enc_stats, get_odg, ): enc_opts, dec_opts, sim_opts, eid_opts = param_file_test_dict[test_tag] @@ -175,7 +184,7 @@ def test_param_file_tests( # bitrate can be a filename: remove leading "../" if bitrate.startswith("../"): - bitrate = bitrate[3:] + bitrate = Path(bitrate[3:]).absolute() testv_base = testv_file.split("/")[-1] if testv_base.endswith(".pcm"): @@ -186,6 +195,9 @@ def test_param_file_tests( # -> construct bitstream filename bitstream_file = f"{testv_base}_{tag_str}.192" + cmp_result_msg = "" + enc_test_result = 0 + if not decoder_only: encode( dut_encoder_frontend, @@ -198,8 +210,42 @@ def test_param_file_tests( bitstream_file, enc_split, update_ref, + get_enc_stats, ) + # compare binary files extracted from the encoder + if update_ref in [0, 2] and get_enc_stats: + print("Comparing encoder files") + print("=======================\n") + + stats_file = bitstream_file.replace(".192", ".stats") + ref_stats_file = f"{reference_path}/param_file/enc/{stats_file}" + dut_stats_file = f"{dut_base_path}/param_file/enc/{stats_file}" + + # compare ref and dut .stats files + enc_test_result, enc_test_result_msg = cmp_stats_files( + ref_stats_file, + dut_stats_file, + min_enc_file_length_diff=MIN_ENC_FILE_LENGTH_DIFF, + min_enc_stats_diff=MIN_ENC_STATS_DIFF, + ) + + print("") + cmp_result_msg += enc_test_result_msg + props = parse_properties(cmp_result_msg, False, props_to_record) + for k, v in props.items(): + record_property(k, v) + + if enc_test_result: + pytest.fail("Too high difference in encoder statistics found.") + else: + # remove DUT stats file when test result is OK (to save disk space) + if not keep_files: + os.remove(dut_stats_file) + + if encoder_only: + return # don't proceed with the decoder if user specified --encoder_only on the command line + # check for networkSimulator_g192 command line if sim_opts != "": sim_split = sim_opts.split() @@ -232,6 +278,7 @@ def test_param_file_tests( ) # check for eid-xor command line + if eid_opts != "": eid_split = eid_opts.split() assert len(eid_split) >= 3, "eid-xor expects at least 3 parameters" @@ -248,6 +295,7 @@ def test_param_file_tests( # -> construct netsim output file name eid_xor_outfile = f"{testv_base}_{tag_str}.fer.192" eid_split[-1] = eid_xor_outfile + error_insertion( reference_path, dut_base_path, @@ -268,6 +316,11 @@ def test_param_file_tests( ] # remove leading "../" dec_split = [x[3:] if x.startswith("../") else x for x in dec_split] + # convert "scripts/" paths into absolute ones + dec_split = [ + str(SCRIPTS_DIR.joinpath(x[8:])) if x.startswith("scripts/") else x + for x in dec_split + ] output_file = dec_split.pop() bitstream_file_dec = dec_split.pop() @@ -330,8 +383,12 @@ def test_param_file_tests( ref_tracefile_dec = f"{reference_path}/param_file/dec/{tracefile_dec}" # check for same RTP sequence number in last line of tracefile - dut_rtp_num_last = np.genfromtxt(dut_tracefile_dec, delimiter=";", usecols=[0])[-1] - ref_rtp_num_last = np.genfromtxt(ref_tracefile_dec, delimiter=";", usecols=[0])[-1] + dut_rtp_num_last = np.genfromtxt( + dut_tracefile_dec, delimiter=";", usecols=[0] + )[-1] + ref_rtp_num_last = np.genfromtxt( + ref_tracefile_dec, delimiter=";", usecols=[0] + )[-1] tracefile_last_rtp_numbers_differ = dut_rtp_num_last != ref_rtp_num_last # same sequence number -> likely no crash, assume length difference is due to difference in TSM @@ -354,7 +411,9 @@ def test_param_file_tests( ) md_out_files = get_expected_md_files(ref_output_file, enc_split, output_config) - props = parse_properties(reason, output_differs, props_to_record) + cmp_result_msg += reason + + props = parse_properties(cmp_result_msg, output_differs, props_to_record) for k, v in props.items(): record_property(k, v) @@ -391,6 +450,9 @@ def test_param_file_tests( msg += "metadata only" pytest.fail(msg) + if enc_test_result: + pytest.fail("Too high difference in encoder statistics found.") + # remove DUT output files when test result is OK (to save disk space) if not keep_files: os.remove(f"{dut_base_path}/param_file/dec/{output_file}") @@ -419,6 +481,7 @@ def encode( bitstream_file, enc_opts_list, update_ref, + get_enc_stats=False, ): """ Call REF and/or DUT encoder. @@ -430,8 +493,17 @@ def encode( ref_out_file = f"{ref_out_dir}/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{bitstream_file}" - if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): + if get_enc_stats: + stats_file = bitstream_file.replace(".192", ".stats") + ref_stats_file = f"{ref_out_dir}/{stats_file}" + dut_stats_file = f"{dut_out_dir}/{stats_file}" + else: + ref_stats_file = None + dut_stats_file = None + + if update_ref in [1, 2] and not os.path.exists(ref_out_file): check_and_makedir(ref_out_dir) + # call REF encoder ref_encoder_frontend.run( bitrate, @@ -439,10 +511,12 @@ def encode( testv_file, ref_out_file, add_option_list=enc_opts_list, + stats_file=ref_stats_file, ) if update_ref in [0, 2]: check_and_makedir(dut_out_dir) + # call DUT encoder dut_encoder_frontend.run( bitrate, @@ -450,6 +524,7 @@ def encode( testv_file, dut_out_file, add_option_list=enc_opts_list, + stats_file=dut_stats_file, ) diff --git a/tests/codec_be_on_mr_nonselection/test_sba.py b/tests/codec_be_on_mr_nonselection/test_sba.py index b3041bba4f184e377f471569ac05917c24b01442..667169d4bce66d0fe5dd51ac1343381b1b90ab34 100644 --- a/tests/codec_be_on_mr_nonselection/test_sba.py +++ b/tests/codec_be_on_mr_nonselection/test_sba.py @@ -35,22 +35,20 @@ __doc__ = """ import errno import os - import pytest from cut_bs import cut_from_start from tests.cmp_pcm import cmp_pcm from tests.conftest import DecoderFrontend, EncoderFrontend from ..conftest import parse_properties -from ..constants import TESTV_DIR +from ..cmp_stats_files import cmp_stats_files +from ..constants import TESTV_DIR, MIN_ENC_FILE_LENGTH_DIFF, MIN_ENC_STATS_DIFF from tests.testconfig import use_ltv -# params tag_list = ["ltvFOA" if use_ltv else "stvFOA"] tag_list_HOA2 = ["ltvHOA2" if use_ltv else "stv2OA"] tag_list_HOA3 = ["ltvHOA3" if use_ltv else "stv3OA"] - tag_list_bw_force = tag_list dtx_set = ["0", "1"] dict_fsample_bw = {"48": "3", "32": "2", "16": "1"} @@ -73,9 +71,7 @@ ivas_br_HOA3 = ["256000", "384000", "512000"] SID_list = [0, 1] sample_rate_list = ["48", "32", "16"] gain_list = [0, 1] - sample_rate_bw_idx_list = [("48", "SWB"), ("48", "WB"), ("32", "WB")] - AbsTol = "0" # params for PLC test @@ -94,7 +90,7 @@ def check_and_makedir(dir_path): @pytest.mark.parametrize("tag", tag_list) -@pytest.mark.parametrize("fs", sample_rate_list) +@pytest.mark.parametrize("sampling_rate", sample_rate_list) def test_pca_enc( record_property, props_to_record, @@ -105,91 +101,101 @@ def test_pca_enc( dut_base_path, ref_encoder_frontend, ref_decoder_frontend, + br_switch_file_path, update_ref, keep_files, tag, - fs, + sampling_rate, get_mld, get_mld_lim, + encoder_only, decoder_only, abs_tol, get_ssnr, get_odg, + get_enc_stats, ): pca = True - ivas_br = "256000" + bitrate = "256000" dtx = "0" + sid = 0 max_bw = "FB" gain_flag = -1 sba_order = "+1" output_config = "FOA" - + cut_testv = True + cut_gain = "1.0" + plc_pattern = None + if "ltv" in tag: - tag = f"ltv{fs}_FOA" + tag = f"ltv{sampling_rate}_FOA" cut_testv = False elif "stv" in tag: - tag = tag + fs + "c" - cut_testv=True + tag = tag + sampling_rate + "c" + cut_testv = True else: - assert 0 + assert 0 if not decoder_only: - # enc sba_enc( dut_encoder_frontend, test_vector_path, ref_encoder_frontend, reference_path, dut_base_path, - None, + br_switch_file_path, tag, - fs, - ivas_br, + sampling_rate, + bitrate, dtx, - None, + sid, max_bw, sba_order, update_ref, gain_flag, - keep_files, + keep_files=keep_files, + cut_gain=cut_gain, cut_testv=cut_testv, pca=pca, + plc_pattern=plc_pattern, + get_enc_stats=get_enc_stats, + ) + + if not encoder_only: + sba_dec( + record_property, + props_to_record, + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + output_config, + update_ref, + gain_flag, + keep_files, + decoder_only, + plc_pattern=plc_pattern, + get_mld=get_mld, + get_mld_lim=get_mld_lim, + pca=pca, + abs_tol=abs_tol, + get_ssnr=get_ssnr, + get_odg=get_odg, ) - # dec - sba_dec( - record_property, - props_to_record, - dut_decoder_frontend, - ref_decoder_frontend, - reference_path, - dut_base_path, - tag, - fs, - ivas_br, - dtx, - None, - max_bw, - output_config, - update_ref, - gain_flag, - keep_files, - decoder_only, - get_mld=get_mld, - get_mld_lim=get_mld_lim, - pca=pca, - abs_tol=abs_tol, - get_ssnr=get_ssnr, - get_odg=get_odg, - ) - - -@pytest.mark.parametrize("ivas_br", ivas_br_FOA) + +@pytest.mark.parametrize("bitrate", ivas_br_FOA) @pytest.mark.parametrize("dtx", dtx_set) @pytest.mark.parametrize("tag", tag_list) -@pytest.mark.parametrize("fs", sample_rate_list) +@pytest.mark.parametrize("sampling_rate", sample_rate_list) @pytest.mark.parametrize("gain_flag", gain_list) -@pytest.mark.parametrize("SID", SID_list) +@pytest.mark.parametrize("sid", SID_list) def test_sba_enc_system( record_property, props_to_record, @@ -203,50 +209,59 @@ def test_sba_enc_system( br_switch_file_path, update_ref, keep_files, - ivas_br, + bitrate, dtx, tag, - fs, + sampling_rate, gain_flag, - SID, + sid, get_mld, get_mld_lim, + encoder_only, decoder_only, abs_tol, get_ssnr, get_odg, + get_enc_stats, ): - if dtx == "1" and ivas_br not in ["13200", "16400", "24400", "32000", "64000"]: + + plc_pattern = None + pca = False + max_bw = "FB" + sba_order = "+1" + output_config = "FOA" + cut_gain = "1.0" + plc_pattern = None + cut_testv = True + + if dtx == "1" and bitrate not in ["13200", "16400", "24400", "32000", "64000"]: # skip high bitrates for DTX until DTX issue is resolved pytest.skip() - if SID == 1: + if sid == 1: if ( - ivas_br not in ["13200", "16400", "64000"] - or fs == "16" + bitrate not in ["13200", "16400", "64000"] + or sampling_rate == "16" or gain_flag == 1 or dtx == "0" ): pytest.skip() else: - if ivas_br in ["13200", "16400"]: + if bitrate in ["13200", "16400"]: pytest.skip() - if ivas_br == "sw_24k4_256k.bin" and gain_flag != 1: + if bitrate == "sw_24k4_256k.bin" and gain_flag != 1: pytest.skip() - if gain_flag == 1 and ivas_br not in ["13200", "16400", "24400", "32000"]: + if gain_flag == 1 and bitrate not in ["13200", "16400", "24400", "32000"]: pytest.skip() if "ltv" in tag: - tag = f"ltv{fs}_FOA" + tag = f"ltv{sampling_rate}_FOA" cut_testv = False elif "stv" in tag: - tag = tag + fs + "c" + tag = tag + sampling_rate + "c" cut_testv = True else: - assert 0 + assert 0 - max_bw = "FB" - sba_order = "+1" - output_config = "FOA" if gain_flag == 1: cut_gain = "16.0" elif dtx == "1": @@ -255,8 +270,7 @@ def test_sba_enc_system( cut_gain = "1.0" if not decoder_only: - # enc - sba_enc( + ref_stats_file, dut_stats_file = sba_enc( dut_encoder_frontend, test_vector_path, ref_encoder_frontend, @@ -264,47 +278,87 @@ def test_sba_enc_system( dut_base_path, br_switch_file_path, tag, - fs, - ivas_br, + sampling_rate, + bitrate, dtx, - SID, + sid, max_bw, sba_order, update_ref, gain_flag, - keep_files, + keep_files=keep_files, cut_gain=cut_gain, cut_testv=cut_testv, + pca=pca, + plc_pattern=plc_pattern, + get_enc_stats=get_enc_stats, + ) + + if update_ref == 0 and get_enc_stats: + print("Comparing encoder files") + print("=======================\n") + + # check if both REF and DUT .stats file exist + if ref_stats_file is None or not os.path.exists(ref_stats_file): + pytest.fail("REF .stats file does not exist or has not been generated!") + if dut_stats_file is None or not os.path.exists(dut_stats_file): + pytest.fail("DUT .stats file does not exist or has not been generated!") + + # compare ref and dut .stats files + enc_test_result, enc_test_result_msg = cmp_stats_files( + ref_stats_file, + dut_stats_file, + min_enc_file_length_diff=MIN_ENC_FILE_LENGTH_DIFF, + min_enc_stats_diff=MIN_ENC_STATS_DIFF, + ) + + print("") + + # TODO: there is cleanup potential here: + # - move filename handling outside of the sba_enc/dec functions into start of this function + # - move file cleanup part out as well + # - move this comparison to the end and the one in sba_dec there as well + if encoder_only: + props = parse_properties( + enc_test_result_msg, enc_test_result != 0, props_to_record + ) + for k, v in props.items(): + record_property(k, v) + + # report compare result + if enc_test_result != 0: + pytest.fail(enc_test_result_msg) + + if not encoder_only: + sba_dec( + record_property, + props_to_record, + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + output_config, + update_ref, + gain_flag, + keep_files, + decoder_only, + plc_pattern=plc_pattern, + get_mld=get_mld, + get_mld_lim=get_mld_lim, + pca=pca, + abs_tol=abs_tol, + get_ssnr=get_ssnr, + get_odg=get_odg, ) - # dec - sba_dec( - record_property, - props_to_record, - dut_decoder_frontend, - ref_decoder_frontend, - reference_path, - dut_base_path, - tag, - fs, - ivas_br, - dtx, - SID, - max_bw, - output_config, - update_ref, - gain_flag, - keep_files, - decoder_only, - get_mld=get_mld, - get_mld_lim=get_mld_lim, - abs_tol=abs_tol, - get_ssnr=get_ssnr, - get_odg=get_odg, - ) - - -@pytest.mark.parametrize("ivas_br", ivas_br_HOA2) + +@pytest.mark.parametrize("bitrate", ivas_br_HOA2) @pytest.mark.parametrize("tag", tag_list_HOA2) def test_spar_hoa2_enc_system( record_property, @@ -316,81 +370,129 @@ def test_spar_hoa2_enc_system( dut_base_path, ref_encoder_frontend, ref_decoder_frontend, + br_switch_file_path, update_ref, keep_files, - ivas_br, + bitrate, tag, get_mld, get_mld_lim, + encoder_only, decoder_only, abs_tol, get_ssnr, get_odg, + get_enc_stats, ): - fs = "48" + sampling_rate = "48" + pca = False + max_bw = "FB" + sba_order = "+2" + output_config = "HOA2" dtx = "0" + sid = 0 gain_flag = -1 + cut_gain = "1.0" + plc_pattern = None + cut_testv = False if "ltv" in tag: - tag = f"ltv{fs}_HOA2" + tag = f"ltv{sampling_rate}_HOA2" elif "stv" in tag: - tag = tag + fs + "c" + tag = tag + sampling_rate + "c" else: - assert 0 - - max_bw = "FB" - sba_order = "+2" - output_config = "HOA2" + assert 0 if not decoder_only: - # enc - sba_enc( + ref_stats_file, dut_stats_file = sba_enc( dut_encoder_frontend, test_vector_path, ref_encoder_frontend, reference_path, dut_base_path, - None, + br_switch_file_path, tag, - fs, - ivas_br, + sampling_rate, + bitrate, dtx, - None, + sid, max_bw, sba_order, update_ref, gain_flag, + keep_files=keep_files, + cut_gain=cut_gain, + cut_testv=cut_testv, + pca=pca, + plc_pattern=plc_pattern, + get_enc_stats=get_enc_stats, + ) + + if update_ref == 0 and get_enc_stats: + print("Comparing encoder files") + print("=======================\n") + + # check if both REF and DUT .stats file exist + if ref_stats_file is None or not os.path.exists(ref_stats_file): + pytest.fail("REF .stats file does not exist or has not been generated!") + if dut_stats_file is None or not os.path.exists(dut_stats_file): + pytest.fail("DUT .stats file does not exist or has not been generated!") + + # compare ref and dut .stats files + enc_test_result, enc_test_result_msg = cmp_stats_files( + ref_stats_file, + dut_stats_file, + min_enc_file_length_diff=MIN_ENC_FILE_LENGTH_DIFF, + min_enc_stats_diff=MIN_ENC_STATS_DIFF, + ) + + print("") + + # TODO: there is cleanup potential here: + # - move filename handling outside of the sba_enc/dec functions into start of this function + # - move file cleanup part out as well + # - move this comparison to the end and the one in sba_dec there as well + if encoder_only: + props = parse_properties( + enc_test_result_msg, enc_test_result != 0, props_to_record + ) + for k, v in props.items(): + record_property(k, v) + + # report compare result + if enc_test_result != 0: + pytest.fail(enc_test_result_msg) + + if not encoder_only: + sba_dec( + record_property, + props_to_record, + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + output_config, + update_ref, + gain_flag, keep_files, + decoder_only, + plc_pattern=plc_pattern, + get_mld=get_mld, + get_mld_lim=get_mld_lim, + pca=pca, + abs_tol=abs_tol, + get_ssnr=get_ssnr, + get_odg=get_odg, ) - # dec - sba_dec( - record_property, - props_to_record, - dut_decoder_frontend, - ref_decoder_frontend, - reference_path, - dut_base_path, - tag, - fs, - ivas_br, - dtx, - None, - max_bw, - output_config, - update_ref, - gain_flag, - keep_files, - decoder_only, - get_mld=get_mld, - get_mld_lim=get_mld_lim, - abs_tol=abs_tol, - get_ssnr=get_ssnr, - get_odg=get_odg, - ) - - -@pytest.mark.parametrize("ivas_br", ivas_br_HOA3) + +@pytest.mark.parametrize("bitrate", ivas_br_HOA3) @pytest.mark.parametrize("tag", tag_list_HOA3) def test_spar_hoa3_enc_system( record_property, @@ -402,81 +504,123 @@ def test_spar_hoa3_enc_system( dut_base_path, ref_encoder_frontend, ref_decoder_frontend, + br_switch_file_path, update_ref, keep_files, - ivas_br, + bitrate, tag, get_mld, get_mld_lim, + encoder_only, decoder_only, abs_tol, get_ssnr, get_odg, + get_enc_stats, ): - fs = "48" + sampling_rate = "48" + pca = False + max_bw = "FB" + sba_order = "+3" + output_config = "HOA3" dtx = "0" + sid = 0 gain_flag = -1 + cut_gain = "1.0" + plc_pattern = None + cut_testv = False if "ltv" in tag: - tag = f"ltv{fs}_HOA3" + tag = f"ltv{sampling_rate}_HOA3" elif "stv" in tag: - tag = tag + fs + "c" + tag = tag + sampling_rate + "c" else: - assert 0 - - max_bw = "FB" - sba_order = "+3" - output_config = "HOA3" + assert 0 if not decoder_only: - # enc - sba_enc( + ref_stats_file, dut_stats_file = sba_enc( dut_encoder_frontend, test_vector_path, ref_encoder_frontend, reference_path, dut_base_path, - None, + br_switch_file_path, tag, - fs, - ivas_br, + sampling_rate, + bitrate, dtx, - None, + sid, max_bw, sba_order, update_ref, gain_flag, + keep_files=keep_files, + cut_gain=cut_gain, + cut_testv=cut_testv, + pca=pca, + plc_pattern=plc_pattern, + get_enc_stats=get_enc_stats, + ) + + if get_enc_stats: + print("Comparing encoder files") + print("=======================\n") + + # compare ref and dut .stats files + enc_test_result, enc_test_result_msg = cmp_stats_files( + ref_stats_file, + dut_stats_file, + min_enc_file_length_diff=MIN_ENC_FILE_LENGTH_DIFF, + min_enc_stats_diff=MIN_ENC_STATS_DIFF, + ) + + print("") + + # TODO: there is cleanup potential here: + # - move filename handling outside of the sba_enc/dec functions into start of this function + # - move file cleanup part out as well + # - move this comparison to the end and the one in sba_dec there as well + if encoder_only: + props = parse_properties( + enc_test_result_msg, enc_test_result != 0, props_to_record + ) + for k, v in props.items(): + record_property(k, v) + + # report compare result + if enc_test_result != 0: + pytest.fail(enc_test_result_msg) + + if not encoder_only: + sba_dec( + record_property, + props_to_record, + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + output_config, + update_ref, + gain_flag, keep_files, + decoder_only, + plc_pattern=plc_pattern, + get_mld=get_mld, + get_mld_lim=get_mld_lim, + pca=pca, + abs_tol=abs_tol, + get_ssnr=get_ssnr, + get_odg=get_odg, ) - # dec - sba_dec( - record_property, - props_to_record, - dut_decoder_frontend, - ref_decoder_frontend, - reference_path, - dut_base_path, - tag, - fs, - ivas_br, - dtx, - None, - max_bw, - output_config, - update_ref, - gain_flag, - keep_files, - decoder_only, - get_mld=get_mld, - get_mld_lim=get_mld_lim, - abs_tol=abs_tol, - get_ssnr=get_ssnr, - get_odg=get_odg, - ) - - -@pytest.mark.parametrize("ivas_br", ivas_br_FOA) + +@pytest.mark.parametrize("bitrate", ivas_br_FOA) @pytest.mark.parametrize("dtx", dtx_set) @pytest.mark.parametrize("tag", tag_list_bw_force) @pytest.mark.parametrize("sample_rate_bw_idx", sample_rate_bw_idx_list) @@ -490,94 +634,145 @@ def test_sba_enc_BWforce_system( dut_base_path, ref_encoder_frontend, ref_decoder_frontend, + br_switch_file_path, update_ref, keep_files, - ivas_br, + bitrate, dtx, tag, sample_rate_bw_idx, get_mld, get_mld_lim, + encoder_only, decoder_only, abs_tol, get_ssnr, get_odg, + get_enc_stats, ): - if dtx == "1" and ivas_br not in ["32000", "64000"]: + sid = 0 + plc_pattern = None + pca = False + gain_flag = -1 + sba_order = "+1" + output_config = "FOA" + cut_gain = "1.0" + cut_testv = False + sampling_rate = sample_rate_bw_idx[0] + max_bw = sample_rate_bw_idx[1] + + if dtx == "1" and bitrate not in ["32000", "64000"]: # skip high bitrates for DTX until DTX issue is resolved pytest.skip() - if ivas_br == "13200" or ivas_br == "16400": + + if bitrate == "13200" or bitrate == "16400": pytest.skip() - if ivas_br == "sw_24k4_256k.bin": + elif bitrate == "sw_24k4_256k.bin": pytest.skip() - fs = sample_rate_bw_idx[0] - bw = sample_rate_bw_idx[1] + if "ltv" in tag: - tag = f"ltv{fs}_FOA" + tag = f"ltv{sampling_rate}_FOA" cut_testv = False elif "stv" in tag: - tag = tag + fs + "c" + tag = tag + sampling_rate + "c" cut_testv = True else: - assert 0 - gain_flag = -1 - sba_order = "+1" - output_config = "FOA" + assert 0 if not decoder_only: - # enc - sba_enc( + ref_stats_file, dut_stats_file = sba_enc( dut_encoder_frontend, test_vector_path, ref_encoder_frontend, reference_path, dut_base_path, - None, + br_switch_file_path, tag, - fs, - ivas_br, + sampling_rate, + bitrate, dtx, - None, - bw, + sid, + max_bw, sba_order, update_ref, gain_flag, - keep_files, + keep_files=keep_files, + cut_gain=cut_gain, cut_testv=cut_testv, + pca=pca, + plc_pattern=plc_pattern, + get_enc_stats=get_enc_stats, + ) + + if update_ref == 0 and get_enc_stats: + print("Comparing encoder files") + print("=======================\n") + + # check if both REF and DUT .stats file exist + if ref_stats_file is None or not os.path.exists(ref_stats_file): + pytest.fail("REF .stats file does not exist or has not been generated!") + if dut_stats_file is None or not os.path.exists(dut_stats_file): + pytest.fail("DUT .stats file does not exist or has not been generated!") + + # compare ref and dut .stats files + enc_test_result, enc_test_result_msg = cmp_stats_files( + ref_stats_file, + dut_stats_file, + min_enc_file_length_diff=MIN_ENC_FILE_LENGTH_DIFF, + min_enc_stats_diff=MIN_ENC_STATS_DIFF, + ) + + print("") + + # TODO: there is cleanup potential here: + # - move filename handling outside of the sba_enc/dec functions into start of this function + # - move file cleanup part out as well + # - move this comparison to the end and the one in sba_dec there as well + if encoder_only: + props = parse_properties( + enc_test_result_msg, enc_test_result != 0, props_to_record + ) + for k, v in props.items(): + record_property(k, v) + + # report compare result + if enc_test_result != 0: + pytest.fail(enc_test_result_msg) + + if not encoder_only: + sba_dec( + record_property, + props_to_record, + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + output_config, + update_ref, + gain_flag, + keep_files, + decoder_only, + plc_pattern=plc_pattern, + get_mld=get_mld, + get_mld_lim=get_mld_lim, + pca=pca, + abs_tol=abs_tol, + get_ssnr=get_ssnr, + get_odg=get_odg, ) - # dec - sba_dec( - record_property, - props_to_record, - dut_decoder_frontend, - ref_decoder_frontend, - reference_path, - dut_base_path, - tag, - fs, - ivas_br, - dtx, - None, - bw, - output_config, - update_ref, - gain_flag, - keep_files, - decoder_only, - get_mld=get_mld, - get_mld_lim=get_mld_lim, - abs_tol=abs_tol, - get_ssnr=get_ssnr, - get_odg=get_odg, - ) - - -@pytest.mark.parametrize("ivas_br", ivas_br_plc) + +@pytest.mark.parametrize("bitrate", ivas_br_plc) @pytest.mark.parametrize("dtx", dtx_set) @pytest.mark.parametrize("tag", tag_list) @pytest.mark.parametrize("plc_pattern", plc_patterns) -@pytest.mark.parametrize("fs", sample_rate_list) +@pytest.mark.parametrize("sampling_rate", sample_rate_list) @pytest.mark.parametrize("gain_flag", gain_list) def test_sba_plc_system( record_property, @@ -592,46 +787,55 @@ def test_sba_plc_system( br_switch_file_path, update_ref, keep_files, - ivas_br, + bitrate, dtx, tag, plc_pattern, - fs, + sampling_rate, gain_flag, get_mld, get_mld_lim, + encoder_only, + decoder_only, abs_tol, get_ssnr, get_odg, + get_enc_stats, ): - SID = 0 - if dtx == "1" and ivas_br not in ["13200", "16400", "24400", "32000", "64000"]: + sid = 0 + pca = False + max_bw = "FB" + sba_order = "+1" + cut_testv = True + output_config = "FOA" + + if dtx == "1" and bitrate not in ["13200", "16400", "24400", "32000", "64000"]: # skip high bitrates for DTX until DTX issue is resolved pytest.skip() - if ivas_br == "13200" or ivas_br == "16400": + + if bitrate == "13200" or bitrate == "16400": if ( dtx == "1" and gain_flag == 0 - and fs != "16" + and sampling_rate != "16" and plc_pattern == "PLperc12mblen5" ): - SID = 1 + sid = 1 else: pytest.skip() - if gain_flag == 1 and ivas_br not in ["13200", "16400", "24400", "32000"]: + + if gain_flag == 1 and bitrate not in ["13200", "16400", "24400", "32000"]: pytest.skip() + if "ltv" in tag: - tag = f"ltv{fs}_FOA" + tag = f"ltv{sampling_rate}_FOA" cut_testv = False elif "stv" in tag: - tag = tag + fs + "c" + tag = tag + sampling_rate + "c" cut_testv = True else: - assert 0 + assert 0 - # added enc call - max_bw = "FB" - sba_order = "+1" if gain_flag == 1: cut_gain = "16.0" elif dtx == "1": @@ -639,60 +843,65 @@ def test_sba_plc_system( else: cut_gain = "1.0" - sba_enc( - dut_encoder_frontend, - test_vector_path, - ref_encoder_frontend, - reference_path, - dut_base_path, - br_switch_file_path, - tag, - fs, - ivas_br, - dtx, - SID, - max_bw, - sba_order, - update_ref, - gain_flag, - keep_files, - cut_gain=cut_gain, - cut_testv=cut_testv, - plc_pattern=plc_pattern, - ) - - # dec - sba_dec( - record_property, - props_to_record, - dut_decoder_frontend, - ref_decoder_frontend, - reference_path, - dut_base_path, - tag, - fs, - ivas_br, - dtx, - SID, - "FB", - "FOA", - update_ref, - gain_flag, - keep_files, - False, - plc_pattern=plc_pattern, - get_mld=get_mld, - get_mld_lim=get_mld_lim, - abs_tol=abs_tol, - get_ssnr=get_ssnr, - get_odg=get_odg, - ) + if not decoder_only: + sba_enc( + dut_encoder_frontend, + test_vector_path, + ref_encoder_frontend, + reference_path, + dut_base_path, + br_switch_file_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + sba_order, + update_ref, + gain_flag, + keep_files=keep_files, + cut_gain=cut_gain, + cut_testv=cut_testv, + pca=pca, + plc_pattern=plc_pattern, + get_enc_stats=get_enc_stats, + ) + + if not encoder_only: + sba_dec( + record_property, + props_to_record, + dut_decoder_frontend, + ref_decoder_frontend, + reference_path, + dut_base_path, + tag, + sampling_rate, + bitrate, + dtx, + sid, + max_bw, + output_config, + update_ref, + gain_flag, + keep_files, + decoder_only, + plc_pattern=plc_pattern, + get_mld=get_mld, + get_mld_lim=get_mld_lim, + pca=pca, + abs_tol=abs_tol, + get_ssnr=get_ssnr, + get_odg=get_odg, + ) ######################################################### -# -------------------- test function -------------------- + + def sba_enc( - encoder_frontend, + dut_encoder_frontend, test_vector_path, ref_encoder_frontend, reference_path, @@ -700,10 +909,10 @@ def sba_enc( br_switch_file_path, tag, sampling_rate, - ivas_br, + bitrate, dtx, - SID, - ivas_max_bw, + sid, + max_bw, sba_order, update_ref, gain_flag, @@ -712,81 +921,92 @@ def sba_enc( cut_testv=False, pca=False, plc_pattern=None, + get_enc_stats=False, ): - # ------------ run cmd ------------ - dut_out_dir = f"{dut_base_path}/sba_bs/pkt" - ref_out_dir = f"{reference_path}/sba_bs/pkt" - check_and_makedir(dut_out_dir) - check_and_makedir(ref_out_dir) + input_path = f"{test_vector_path}/{tag}.wav" + dtx_mode = dtx == "1" - in_extension = ".wav" + dut_pkt_dir = f"{dut_base_path}/sba_bs/pkt" + ref_pkt_dir = f"{reference_path}/sba_bs/pkt" + + check_and_makedir(dut_pkt_dir) + check_and_makedir(ref_pkt_dir) + + tag_out = tag - tag_in = tag # sampling rate to BW mapping bw_idx = dict_fsample_bw[sampling_rate] - if int(dict_bw_idx[ivas_max_bw]) < int(bw_idx): - tag = tag + dict_bw_tag[ivas_max_bw] + if int(dict_bw_idx[max_bw]) < int(bw_idx): + tag_out += dict_bw_tag[max_bw] - tag_out = f"{tag}_ivasbr{ivas_br[:-3]}k_DTX{dtx}" - if ivas_br == "sw_24k4_256k.bin": - ivas_br = f"{br_switch_file_path}/sw_24k4_256k.bin" + tag_out += f"_ivasbr{bitrate[:-3]}k_DTX{dtx}" + if bitrate == "sw_24k4_256k.bin": + bitrate = f"{br_switch_file_path}/sw_24k4_256k.bin" # to avoid conflicting names in case of parallel test execution, differentiate all cases - tag_ext = "" if pca: - tag_ext += "_pca" + tag_out += "_pca" if gain_flag != -1: - tag_ext += f"_Gain{gain_flag}" - if SID == 1: - tag_ext += "_SID" + tag_out += f"_Gain{gain_flag}" + if sid == 1: + tag_out += "_SID" if plc_pattern is not None: - tag_ext += "_" + plc_pattern - dut_pkt_file = f"{dut_out_dir}/{tag_out}{tag_ext}.192" - ref_pkt_file = f"{ref_out_dir}/{tag_out}{tag_ext}.192" - if SID == 1: - dut_pkt_file_cut = f"{dut_out_dir}/{tag_out}{tag_ext}_cut.192" - ref_pkt_file_cut = f"{ref_out_dir}/{tag_out}{tag_ext}_cut.192" - input_path = f"{test_vector_path}/{tag_in}{in_extension}" - dtx_mode = dtx == "1" + tag_out += "_" + plc_pattern + + dut_pkt_file = f"{dut_pkt_dir}/{tag_out}.192" + ref_pkt_file = f"{ref_pkt_dir}/{tag_out}.192" if cut_testv: # use shortened and potentially gain adjusted input PCM file - create if not present # cut input PCM file: currently with mostly fixed (i.e. not test dependant) values if cut_gain == "1.0": - cut_file = f"{test_vector_path}/{tag_in}_cut{in_extension}" + cut_file = f"{test_vector_path}/{tag}_cut.wav" else: - cut_file = f"{test_vector_path}/{tag_in}_cut_{cut_gain}{in_extension}" + cut_file = f"{test_vector_path}/{tag}_cut_{cut_gain}.wav" input_path = cut_file + if get_enc_stats: + ref_stats_file = f"{ref_pkt_dir}/{tag_out}.stats" + dut_stats_file = f"{dut_pkt_dir}/{tag_out}.stats" + else: + ref_stats_file = None + dut_stats_file = None + if update_ref == 1: # call REF encoder ref_encoder_frontend.run( - ivas_br, + bitrate, sampling_rate, input_path, ref_pkt_file, sba_order=sba_order, - max_band=ivas_max_bw, + max_band=max_bw, pca=pca, dtx_mode=dtx_mode, + stats_file=ref_stats_file, ) if update_ref == 0: # call DUT encoder - encoder_frontend.run( - ivas_br, + dut_encoder_frontend.run( + bitrate, sampling_rate, input_path, dut_pkt_file, sba_order=sba_order, - max_band=ivas_max_bw, + max_band=max_bw, pca=pca, dtx_mode=dtx_mode, + stats_file=dut_stats_file, ) - if SID == 1: + if sid == 1: + # cut .pkt files such that they start with SID frame + dut_pkt_file_cut = f"{dut_pkt_dir}/{tag_out}_cut.192" + ref_pkt_file_cut = f"{ref_pkt_dir}/{tag_out}_cut.192" + if update_ref == 1: with open(ref_pkt_file, "rb") as fp_in: with open(ref_pkt_file_cut, "wb") as fp_out: @@ -800,20 +1020,22 @@ def sba_enc( if not keep_files: os.remove(dut_pkt_file) + return ref_stats_file, dut_stats_file + def sba_dec( record_property, props_to_record, - decoder_frontend, + dut_decoder_frontend, ref_decoder_frontend, reference_path, dut_base_path, tag, sampling_rate, - ivas_br, + bitrate, dtx, - SID, - ivas_max_bw, + sid, + max_bw, output_config, update_ref, gain_flag, @@ -827,74 +1049,78 @@ def sba_dec( get_ssnr=False, get_odg=False, ): - # -------- run cmd ------------ + dut_pkt_dir = f"{dut_base_path}/sba_bs/pkt" + ref_pkt_dir = f"{reference_path}/sba_bs/pkt" + dut_out_dir = f"{dut_base_path}/sba_bs/syn" + ref_out_dir = f"{reference_path}/sba_bs/syn" + + check_and_makedir(dut_out_dir) + check_and_makedir(ref_out_dir) + + tag_in = tag + tag_out = tag + # sampling rate to BW mapping bw_idx = dict_fsample_bw[sampling_rate] - if int(dict_bw_idx[ivas_max_bw]) < int(bw_idx): - tag = tag + dict_bw_tag[ivas_max_bw] + if int(dict_bw_idx[max_bw]) < int(bw_idx): + tag_out += dict_bw_tag[max_bw] - tag_out = f"{tag}_ivasbr{ivas_br[:-3]}k_DTX{dtx}" + tag_out += f"_ivasbr{bitrate[:-3]}k_DTX{dtx}" # to avoid conflicting names in case of parallel test execution, differentiate all cases - tag_ext = "" if pca: - tag_ext += "_pca" + tag_out += "_pca" if gain_flag != -1: - tag_ext += f"_Gain{gain_flag}" - if SID == 1: - tag_ext += "_SID" + tag_out += f"_Gain{gain_flag}" + if sid == 1: + tag_out += "_SID" if plc_pattern is not None: - tag_ext += "_" + plc_pattern - if SID == 1: - tag_ext += "_cut" - - dut_out_dir = f"{dut_base_path}/sba_bs/raw" - ref_out_dir = f"{reference_path}/sba_bs/raw" - - dut_in_pkt = f"{dut_base_path}/sba_bs/pkt/{tag_out}{tag_ext}.192" - ref_in_pkt = f"{reference_path}/sba_bs/pkt/{tag_out}{tag_ext}.192" + tag_out += "_" + plc_pattern plc_file = None if plc_pattern is not None: plc_file = f"{TESTV_DIR}/{plc_pattern}.g192" - dut_out_raw = f"{dut_out_dir}/{tag_out}{tag_ext}.wav" - ref_out_raw = f"{ref_out_dir}/{tag_out}{tag_ext}.wav" + if sid == 1: + dut_pkt_file = f"{dut_pkt_dir}/{tag_out}_cut.192" + ref_pkt_file = f"{ref_pkt_dir}/{tag_out}_cut.192" + else: + dut_pkt_file = f"{dut_pkt_dir}/{tag_out}.192" + ref_pkt_file = f"{ref_pkt_dir}/{tag_out}.192" - check_and_makedir(dut_out_dir) - check_and_makedir(ref_out_dir) + dut_out_file = f"{dut_out_dir}/{tag_out}.wav" + ref_out_file = f"{ref_out_dir}/{tag_out}.wav" if update_ref == 1: # call REF decoder ref_decoder_frontend.run( output_config, sampling_rate, - ref_in_pkt, - ref_out_raw, + ref_pkt_file, + ref_out_file, plc_file=plc_file, ) if update_ref == 0: - if plc_file is not None: - dut_in_pkt = dut_in_pkt - elif decoder_only: - dut_in_pkt = ref_in_pkt + if decoder_only: + # DUT .pkt file has not been generated -> use REF .pkt file instead + dut_pkt_file = ref_pkt_file # call DUT decoder - decoder_frontend.run( + dut_decoder_frontend.run( output_config, sampling_rate, - dut_in_pkt, - dut_out_raw, + dut_pkt_file, + dut_out_file, plc_file=plc_file, ) - fs = int(sampling_rate) * 1000 + sampling_rate_Hz = int(sampling_rate) * 1000 cmp_result, reason = cmp_pcm( - dut_out_raw, - ref_out_raw, + dut_out_file, + ref_out_file, output_config, - fs, + sampling_rate_Hz, get_mld=get_mld, mld_lim=get_mld_lim, abs_tol=abs_tol, @@ -902,16 +1128,16 @@ def sba_dec( get_odg=get_odg, ) - props = parse_properties(reason, cmp_result != 0, props_to_record) + text_to_parse = reason + props = parse_properties(text_to_parse, cmp_result != 0, props_to_record) for k, v in props.items(): record_property(k, v) # report compare result if cmp_result != 0: - pytest.fail(reason) + pytest.fail(text_to_parse) # remove DUT output files when test result is OK (to save disk space) - if not keep_files: - os.remove(dut_out_raw) - if not decoder_only and plc_file is None: - os.remove(dut_in_pkt) + # if not keep_files: + # os.remove(dut_out_file) + # os.remove(dut_pkt_file) diff --git a/tests/conftest.py b/tests/conftest.py index 06658dd7e195a1ca5686ad75f10902d70cf62dce..636881ecefa58e5394eb00afb7024e76b9347ebe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,23 +35,37 @@ Pytest customization (configuration and fixtures) for the IVAS codec test suite. import logging import os import re +import json from tests import testconfig import pytest import platform import textwrap from pathlib import Path from subprocess import TimeoutExpired, run +import tempfile from typing import Optional, Union -from .constants import MLD_PATTERN, MAX_DIFF_PATTERN, SSNR_PATTERN, ODG_PATTERN +import numpy as np +from .constants import ( + MLD_PATTERN, + MAX_DIFF_PATTERN, + SSNR_PATTERN, + ENC_AUX_FILES, + ODG_PATTERN, + MLD, + MAX_ABS_DIFF, + SSNR, + ODG, + MAX_ENC_DIFF, + MAX_ENC_DIFF_PATTERN, + SCRIPTS_DIR, +) logger = logging.getLogger(__name__) USE_LOGGER_FOR_DBG = False # current tests do not make use of the logger feature -HERE = Path(__file__).parent -SCRIPTS_DIR = str(HERE.parent.joinpath("scripts").absolute()) import sys -sys.path.append(SCRIPTS_DIR) +sys.path.append(str(SCRIPTS_DIR)) import prepare_combined_format_inputs @@ -180,6 +194,12 @@ def pytest_addoption(parser): help="Compute Segmental SNR (SSNR) between ref and dut output instead of just comparing for bitexactness", ) + parser.addoption( + "--enc_stats", + action="store_true", + help="Activate logging and comparison of statistics from the encoder.", + ) + parser.addoption( "--odg", action="store_true", @@ -219,6 +239,13 @@ def pytest_addoption(parser): default=20, ) + parser.addoption( + "--encoder_only", + help="Only run encoder parts of tests in 'codec_be_on_mr_nonselection'.", + action="store_true", + default=False, + ) + parser.addoption( "--decoder_only", help="Only run decoder parts of tests in 'codec_be_on_mr_nonselection'. Use ref encoder output bitstreams.", @@ -278,6 +305,14 @@ def get_ssnr(request): return request.config.option.ssnr +@pytest.fixture(scope="session", autouse=True) +def get_enc_stats(request) -> bool: + """ + Return value of cmdl param --get_enc_stats + """ + return request.config.getoption("--enc_stats") + + @pytest.fixture(scope="session", autouse=True) def get_odg(request): """ @@ -338,16 +373,90 @@ def dut_encoder_path(request) -> str: class EncoderFrontend: def __init__(self, path, enc_type, timeout=None) -> None: - self._path = path + self._path = Path(path).absolute() self._type = enc_type self.returncode = None self.stdout = None self.stderr = None self.timeout = timeout + def extract_enc_stats( + self, + dbg_tweak_folder, + stats_file, + sampling_rate, + ): + """ + Extract statistics from auxiliary encoder files generated by running the encoder with DEBUG_MODE_INFO. + Write the statistics to a text file + """ + + hist_dicts = [] + + if not os.path.exists(dbg_tweak_folder): + print(f"No statistics have been extracted from the res/ folder to the {stats_file} file!") + else: + for f in ENC_AUX_FILES: + filename = f[0] + dtype = f[1] + fs = int(sampling_rate) * 1000 + if isinstance(f[2], str): + nsamples_per_frame = np.int16(eval(f[2])) + else: + nsamples_per_frame = np.int16(f[2]) + + # aux_files = glob.glob(os.path.join(dbg_tweak_folder, filename + '\.*')) + aux_files = [ + f + for f in os.listdir(dbg_tweak_folder) + if re.search(rf"^{filename}(\..*)?$", f) + ] + # aux_files = [os.path.basename(f) for f in aux_files] + + for aux_file in aux_files: + # extract statistics from the aux file based on histogram of values + print( + f"Extracting statistics from {os.path.basename(aux_file)} ... ", + end="", + ) + + # read the aux file + with open(os.path.join(dbg_tweak_folder, aux_file), "r") as f_aux: + data = np.fromfile(f_aux, dtype=dtype) + + # get file length + data_len = data.shape[0] + + # remove the duplicates of each value per frame + if nsamples_per_frame > 1: + data = data[::nsamples_per_frame] + + # calculate histogram from data + unique_values = np.sort(np.unique(data)) + hist, _ = np.histogram( + data, bins=np.append(unique_values, unique_values[-1] + 10) + ) + + # convert to dict and sort by absolute value of difference + hist_dict = {"name": os.path.basename(aux_file), "length": data_len} + dict_values = { + str(unique_values[i]): str(hist[i]) + for i in range(len(unique_values)) + } + hist_dict.update(dict_values) + hist_dicts.append(hist_dict) + + print("DONE") + + print("") + + with open(stats_file, "w") as f_stats: + # append the statistics to the output file in text format + f_stats.write(json.dumps(hist_dicts, indent=2)) + def run( self, - bitrate: int, + bitrate: Union[int, Path], input_sampling_rate: int, input_path: Path, output_bitstream_path: Path, @@ -358,8 +467,9 @@ class EncoderFrontend: quiet_mode: Optional[bool] = True, add_option_list: Optional[list] = None, run_dir: Optional[Path] = None, + stats_file: Optional[Path] = None, ) -> None: - command = [self._path] + command = [str(self._path)] # add optional parameters if sba_order is not None: @@ -392,13 +502,25 @@ class EncoderFrontend: log_dbg_msg(f"{self._type} encoder command:\n{cmd_str}") try: - result = run( - command, - capture_output=True, - check=False, - timeout=self.timeout, - cwd=run_dir, - ) + with tempfile.TemporaryDirectory() as tmp_dir: + if run_dir is None: + cwd = Path(tmp_dir).absolute() + else: + cwd = Path(run_dir).absolute() + + result = run( + command, + capture_output=True, + check=False, + timeout=self.timeout, + cwd=cwd, + ) + + if stats_file is not None: + self.extract_enc_stats( + cwd.joinpath("res"), stats_file, input_sampling_rate + ) + except TimeoutExpired: pytest.fail(f"{self._type} encoder run timed out after {self.timeout}s.") @@ -519,7 +641,7 @@ def dut_decoder_path(request) -> str: class DecoderFrontend: def __init__(self, path, dec_type, timeout=None, fr=20) -> None: - self._path = path + self._path = str(Path(path).absolute()) self._type = dec_type self.returncode = None self.stdout = None @@ -551,18 +673,18 @@ class DecoderFrontend: system = platform.system() if system == "Windows": - eid_path = "./scripts/tools/Win32/eid-xor.exe" + eid_path = SCRIPTS_DIR.joinpath("tools/Win32/eid-xor.exe") elif system == "Linux": - eid_path = "./scripts/tools/Linux/eid-xor" + eid_path = SCRIPTS_DIR.joinpath("tools/Linux/eid-xor") elif system == "Darwin": - eid_path = "./scripts/tools/Darwin/eid-xor" + eid_path = SCRIPTS_DIR.joinpath("tools/Darwin/eid-xor") else: raise ValueError(f'Wrong system "{system}"!') - if not os.path.isfile(eid_path): + if not eid_path.exists(): raise FileNotFoundError(f"eid-xor binary {eid_path} not found!\n") - eid_command = [eid_path] + eid_command = [str(eid_path)] eid_command.extend(["-fer", "-vbr", "-bs", "g192", "-ep", "g192"]) eid_output_suffix = ".fer" eid_command += [ @@ -573,7 +695,12 @@ class DecoderFrontend: try: if not os.path.exists(str(input_bitstream_path) + eid_output_suffix): - result = run(eid_command, check=True, cwd=run_dir) + with tempfile.TemporaryDirectory() as tmp_dir: + if run_dir is None: + cwd = Path(tmp_dir).absolute() + else: + cwd = Path(run_dir).absolute() + result = run(eid_command, check=True, cwd=cwd) except Exception as e: pytest.fail(f"eid-xor operation failed! - {e}") @@ -584,11 +711,13 @@ class DecoderFrontend: # TODO: centralize this in a utils file if system == "Windows": - netsim_path = "./scripts/tools/Win32/networkSimulator_g192.exe" + netsim_path = SCRIPTS_DIR.joinpath( + "tools/Win32/networkSimulator_g192.exe" + ) elif system == "Linux": - netsim_path = "./scripts/tools/Linux/networkSimulator_g192" + netsim_path = SCRIPTS_DIR.joinpath("tools/Linux/networkSimulator_g192") elif system == "Darwin": - netsim_path = "./scripts/tools/Darwin/networkSimulator_g192" + netsim_path = SCRIPTS_DIR.joinpath("tools/Darwin/networkSimulator_g192") else: raise ValueError(f'Wrong system "{system}"!') @@ -609,7 +738,12 @@ class DecoderFrontend: ] print(netsim_command) try: - run(netsim_command, check=True, cwd=run_dir) + with tempfile.TemporaryDirectory() as tmp_dir: + if run_dir is None: + cwd = Path(tmp_dir).absolute() + else: + cwd = Path(run_dir).absolute() + run(netsim_command, check=True, cwd=cwd) except Exception as e: pytest.fail(f"netsim operation failed! - {e}") @@ -637,13 +771,18 @@ class DecoderFrontend: log_dbg_msg(f"{self._type} decoder command:\n{cmd_str}") try: - result = run( - command, - capture_output=True, - check=False, - timeout=self.timeout, - cwd=run_dir, - ) + with tempfile.TemporaryDirectory() as tmp_dir: + if run_dir is None: + cwd = Path(tmp_dir).absolute() + else: + cwd = Path(run_dir).absolute() + result = run( + command, + capture_output=True, + check=False, + timeout=self.timeout, + cwd=cwd, + ) except TimeoutExpired: pytest.fail(f"{self._type} decoder run timed out after {self.timeout}s.") @@ -822,6 +961,14 @@ def decoder_only(request) -> bool: return request.config.getoption("--decoder_only") +@pytest.fixture(scope="session", autouse=True) +def encoder_only(request) -> bool: + """ + Return value of cmdl param --encoder_only + """ + return request.config.getoption("--encoder_only") + + def pytest_configure(config): config.addinivalue_line("markers", "serial: mark test to run only in serial") if config.option.param_file: @@ -841,14 +988,22 @@ def pytest_configure(config): @pytest.fixture(scope="session") -def props_to_record(request, get_mld, get_ssnr, get_odg) -> str: - props = ["MAXIMUM ABS DIFF"] - if get_mld: - props.append("MLD") - if get_ssnr: - props.append("SSNR") - if get_odg: - props.append("ODG") +def props_to_record( + request, get_mld, get_ssnr, get_odg, get_enc_stats, encoder_only +) -> str: + props = [] + + if get_enc_stats: + props.append(MAX_ENC_DIFF) + + if not encoder_only: + props.append(MAX_ABS_DIFF) + if get_mld: + props.append(MLD) + if get_ssnr: + props.append(SSNR) + if get_odg: + props.append(ODG) return props @@ -861,10 +1016,10 @@ def parse_properties(text_to_parse: str, output_differs: bool, props_to_record: props = dict() for prop in props_to_record: - if prop == "MLD": + if prop == MLD: mld = float(re.search(MLD_PATTERN, text_to_parse).groups(1)[0]) props[prop] = mld - elif prop == "MAXIMUM ABS DIFF": + elif prop == MAX_ABS_DIFF: max_diff = 0 if output_differs: if (match := re.search(MAX_DIFF_PATTERN, text_to_parse)) is not None: @@ -872,18 +1027,26 @@ def parse_properties(text_to_parse: str, output_differs: bool, props_to_record: else: raise MaxDiffPatternNotFound() props[prop] = max_diff - elif prop == "SSNR": + elif prop == SSNR: ssnrs = re.findall(SSNR_PATTERN, text_to_parse) min_ssnr = min(ssnrs) min_ssnr_channel = ssnrs.index(min_ssnr) props["MIN_SSNR"] = min_ssnr props["MIN_SSNR_CHANNEL"] = min_ssnr_channel - elif prop == "ODG": + elif prop == ODG: odgs = re.findall(ODG_PATTERN, text_to_parse) min_odg = min(odgs) min_odg_channel = odgs.index(min_odg) props["MIN_ODG"] = min_odg props["MIN_ODG_CHANNEL"] = min_odg_channel + elif prop == MAX_ENC_DIFF: + search_result = re.search(MAX_ENC_DIFF_PATTERN, text_to_parse) + max_enc_diff = 0 + if search_result: + max_enc_diff, max_enc_diff_ratio = search_result.groups(0) + if max_enc_diff: + max_enc_diff = float(max_enc_diff) + props[MAX_ENC_DIFF] = max_enc_diff return props diff --git a/tests/constants.py b/tests/constants.py index 2f1746e76b33953a0db82cb47cdbfc313d077cce..c2e84e50b71834715df52eaebdfac22c1b15d838 100644 --- a/tests/constants.py +++ b/tests/constants.py @@ -1,12 +1,57 @@ import pathlib +import numpy as np HERE = pathlib.Path(__file__).parent.absolute() SCRIPTS_DIR = HERE.parent.joinpath("scripts") TESTV_DIR = SCRIPTS_DIR.joinpath("testv") -# regex patterns for parsing the output from cmp_pcm -> mainly for BASOP ci + +# Properties to record +MLD = "MLD" +MAX_ABS_DIFF = "MAXIMUM ABS DIFF" +SSNR = "SSNR" +ODG = "ODG" +MAX_ENC_DIFF = "MAXIMUM ENC DIFF" + + +# regex patterns for parsing the output from comparisons -> mainly for BASOP ci MLD_PATTERN = r"MLD: ([\d\.]*)" MAX_DIFF_PATTERN = r"MAXIMUM ABS DIFF: (\d*)" ODG_PATTERN_PQEVALAUDIO = r"Objective Difference Grade: (-*\d*\.\d*)" ODG_PATTERN = r"ODG: (-*\d*\.\d*)" SSNR_PATTERN = r"Channel \d* SSNR: (nan|[+-]*inf|[-*\d\.]*)" +MAX_ENC_DIFF_PATTERN = r"MAXIMUM ENC DIFF: (\d+) \((\d+\.\d+)%\)" + + +# minimum difference between ref and dut encoder file lengths +MIN_ENC_FILE_LENGTH_DIFF = 0.1 +# minimum difference between the statistics of ref and dut encoder files +MIN_ENC_STATS_DIFF = 0.1 + + +# list of encoder filename patterns with their data type and number of samples per frame +# note: instead of specifying the number of samples per frame, you can use a formula incl. 'fs', e.g. 'fs/50' +ENC_AUX_FILES = [ + ["bits_nominal", np.int16, "fs/50"], + ["bwidth", np.int16, "fs/50"], + ["clas", np.int16, "fs/50"], + ["cng_type", np.int16, "fs/50"], + ["coder_type", np.int16, "fs/50"], + ["core", np.int16, "fs/50"], + ["core_brate", np.float32, "fs/50"], + ["count_SWB", np.int16, "fs/50"], + ["count_WB", np.int16, "fs/50"], + ["element_brate", np.float32, "fs/50"], + ["element_mode", np.int16, "fs/50"], + ["extl", np.int16, "fs/50"], + ["extl_brate", np.float32, "fs/50"], + ["ivas_total_brate", np.float32, "fs/50"], + ["L_frame", np.int16, "fs/50"], + ["localVAD", np.int16, "fs/50"], + ["sp_aud_decision0", np.int16, "fs/50"], + ["sp_aud_decision1", np.int16, "fs/50"], + ["sp_aud_decision2", np.int16, "fs/50"], + ["tdm_LRTD_flag", np.int16, "fs/50"], + ["total_brate", np.float32, "fs/50"], + ["vad_flag", np.int16, "fs/50"], +]