Loading tests/codec_be_on_mr_nonselection/test_param_file.py +257 −67 Original line number Diff line number Diff line Loading @@ -44,12 +44,13 @@ import glob import tempfile import numpy as np import shutil import json from tests.cmp_pcm import cmp_pcm from tests.cmp_bin_files import cmp_bin_files from tests.conftest import DecoderFrontend, EncoderFrontend from tests.testconfig import PARAM_FILE from ..constants import MLD_PATTERN, MAX_DIFF_PATTERN, ENC_AUX_FILES, MAX_ENC_DIFF_PATTERN from ..constants import MLD_PATTERN, MAX_DIFF_PATTERN, ENC_AUX_FILES, MAX_ENC_DIFF_PATTERN, MIN_ENC_AUX_FILE_DIFF_THR, MIN_ENC_AUX_FILE_LENGTH_DIFF import pdb Loading Loading @@ -129,11 +130,20 @@ def convert_test_string_to_tag(test_string): tag_str = "_".join(tag_str.split("__")) return tag_str def num(s): """ Convert string either to integer or float """ try: return int(s) except ValueError: return float(s) @pytest.mark.create_ref @pytest.mark.parametrize("test_tag", list(param_file_test_dict.keys())) def test_param_file_tests( record_property, encoder_only, decoder_only, dut_encoder_frontend: EncoderFrontend, dut_decoder_frontend: DecoderFrontend, Loading Loading @@ -184,15 +194,6 @@ def test_param_file_tests( bitstream_file = f"{testv_base}_{tag_str}.192" if not decoder_only: if update_ref == 2: # generate temp subdir for encoder output files ref_enc_res_dir = os.path.join(os.path.split(ref_encoder_frontend._path)[0], 'res') check_and_makedir(ref_enc_res_dir) ref_enc_dbg_dir = tempfile.TemporaryDirectory(dir=ref_enc_res_dir) dut_enc_res_dir = os.path.join(os.path.split(dut_encoder_frontend._path)[0], 'res') check_and_makedir(dut_enc_res_dir) dut_enc_dbg_dir = tempfile.TemporaryDirectory(dir=dut_enc_res_dir) encode( dut_encoder_frontend, Loading @@ -205,71 +206,174 @@ def test_param_file_tests( bitstream_file, enc_split, update_ref, ref_enc_dbg_dir = ref_enc_dbg_dir.name, dut_enc_dbg_dir = dut_enc_dbg_dir.name, encoder_only, ) # compare binary files extracted from the encoder if update_ref == 2: if encoder_only: print ("Comparing encoder auxiliary files") print ("=================================\n") msg = "" max_enc_diff = 0 ref_out_dir = f"{reference_path}/param_file/enc" stats_file = bitstream_file.replace(".192", ".stats") ref_stats_file = f"{reference_path}/param_file/enc/{stats_file}" dut_stats_file = f"{dut_base_path}/param_file/enc/{stats_file}" # pdb.set_trace() # open and read the .stats file with open(ref_stats_file, "r") as f_aux: ref_stats = json.load(f_aux) # create dictionary to map "name" to the corresponding dictionaries ref_stats_names = {d["name"]: d for d in ref_stats} with open(dut_stats_file, "r") as f_aux: dut_stats = json.load(f_aux) # create dictionary to map "name" to the corresponding dictionaries dut_stats_names = {d["name"]: d for d in dut_stats} # loop over all common aux files enc_test_result = 0 for f in ENC_AUX_FILES: filename = f[0] dtype = f[1] fs = int(sampling_rate) * 1000 if isinstance(f[2], str): nsamples_per_frame = np.int16(eval(f[2])) enc_test_result_msg = "" max_enc_diff = 0 for name in ref_stats_names: if name in dut_stats_names: # retrieve the dictionaries ref_stats_dict = ref_stats_names[name] dut_stats_dict = dut_stats_names[name] # ref_stats_dict = ref_stats_names["core_brate"] # dut_stats_dict = dut_stats_names["total_brate"] msg = f"File {name}" # compare the file lengths result_len_check = 0 file_length = max(ref_stats_dict["length"], dut_stats_dict["length"]) if ref_stats_dict["length"] != dut_stats_dict["length"]: msg += f" has different length between Ref {ref_stats_dict['length']} and DuT {dut_stats_dict['length']}" # check if threshold has been exceeded if abs(ref_stats_dict["length"] - dut_stats_dict["length"]) / file_length > MIN_ENC_AUX_FILE_LENGTH_DIFF: result_len_check = 1 msg += f", " # remove the "name" and "length" keys for further processing del ref_stats_dict["name"] del dut_stats_dict["name"] del ref_stats_dict["length"] del dut_stats_dict["length"] # convert keys and values from string to float ref_hist = {num(i) : num(j) for i,j in ref_stats_dict.items()} cut_hist = {num(i) : num(j) for i,j in dut_stats_dict.items()} delta_ref = set(cut_hist) - set(ref_hist) delta_cut = set(ref_hist) - set(cut_hist) # append missing keys for item in delta_cut: cut_hist[item] = 0 for item in delta_ref: ref_hist[item] = 0 ref_hist = dict(sorted(ref_hist.items())) cut_hist = dict(sorted(cut_hist.items())) # caculate difference of statistics diff_hist = { k : cut_hist[k] - ref_hist[k] for k in ref_hist.keys()} # calculate the total number of differences total_num_diff = sum(np.abs(list(diff_hist.values()))) total_num_diff_ratio = total_num_diff / (sum(ref_hist.values()) + sum(cut_hist.values())) msg += f"the total number of differences is {total_num_diff} ({(total_num_diff_ratio*100):.2f}%)" if total_num_diff_ratio > MIN_ENC_AUX_FILE_DIFF_THR: result_diff_check = 1 msg += "! " else: nsamples_per_frame = np.int16(f[2]) result_diff_check = 0 msg += ". " ref_aux_files = glob.glob(os.path.join(ref_enc_dbg_dir.name, filename + '*')) ref_aux_files = [os.path.basename(f) for f in ref_aux_files] dut_aux_files = glob.glob(os.path.join(dut_enc_dbg_dir.name, filename + '*')) dut_aux_files = [os.path.basename(f) for f in dut_aux_files] common_aux_files = [f for f in ref_aux_files if f in dut_aux_files] for aux_file in common_aux_files: # compare the contents of each binary file extracted from the encoder based on histogram output_differs, reason = cmp_bin_files( os.path.join(ref_enc_dbg_dir.name, aux_file), os.path.join(dut_enc_dbg_dir.name, aux_file), dtype=dtype, nsamples_per_frame=nsamples_per_frame, len_check=1, min_diff_thr=0.1, ) # check if the maximum difference has been exceeded if total_num_diff_ratio > max_enc_diff: max_enc_diff = total_num_diff_ratio if output_differs: # update test result if result_len_check or result_diff_check: enc_test_result = 1 if msg: msg += ", " msg += reason enc_test_result_msg += msg # check if the maximum difference has been exceeded search_result = re.search(MAX_ENC_DIFF_PATTERN, reason) if search_result: diff = search_result.groups(1)[0] diff = float(diff) if diff > max_enc_diff: max_enc_diff = diff print(msg) print ("") # remove encoder output files to save disk space if ref_enc_dbg_dir: shutil.rmtree(ref_enc_dbg_dir.name) if dut_enc_dbg_dir: shutil.rmtree(dut_enc_dbg_dir.name) if enc_test_result: record_property("MAXIMUM ENC DIFF", max_enc_diff) pytest.fail(msg) pytest.fail(enc_test_result_msg) # msg = "" # max_enc_diff = 0 # enc_test_result = 0 # for f in ENC_AUX_FILES: # filename = f[0] # dtype = f[1] # fs = int(sampling_rate) * 1000 # if isinstance(f[2], str): # nsamples_per_frame = np.int16(eval(f[2])) # else: # nsamples_per_frame = np.int16(f[2]) # ref_aux_files = glob.glob(os.path.join(ref_enc_dbg_dir.name, filename + '*')) # ref_aux_files = [os.path.basename(f) for f in ref_aux_files] # dut_aux_files = glob.glob(os.path.join(dut_enc_dbg_dir.name, filename + '*')) # dut_aux_files = [os.path.basename(f) for f in dut_aux_files] # common_aux_files = [f for f in ref_aux_files if f in dut_aux_files] # for aux_file in common_aux_files: # # compare the contents of each binary file extracted from the encoder based on histogram # output_differs, reason = cmp_bin_files( # os.path.join(ref_enc_dbg_dir.name, aux_file), # os.path.join(dut_enc_dbg_dir.name, aux_file), # dtype=dtype, # nsamples_per_frame=nsamples_per_frame, # len_check=1, # min_diff_thr=0.1, # ) # if output_differs: # enc_test_result = 1 # if msg: # msg += ", " # msg += reason # # check if the maximum difference has been exceeded # search_result = re.search(MAX_ENC_DIFF_PATTERN, reason) # if search_result: # diff = search_result.groups(1)[0] # diff = float(diff) # if diff > max_enc_diff: # max_enc_diff = diff # print ("") # # remove encoder output files to save disk space # if ref_enc_dbg_dir: # shutil.rmtree(ref_enc_dbg_dir.name) # if dut_enc_dbg_dir: # shutil.rmtree(dut_enc_dbg_dir.name) # if enc_test_result: # record_property("MAXIMUM ENC DIFF", max_enc_diff) # pytest.fail(msg) if encoder_only: return # check for networkSimulator_g192 command line if sim_opts != "": Loading Loading @@ -481,8 +585,7 @@ def encode( bitstream_file, enc_opts_list, update_ref, ref_enc_dbg_dir, dut_enc_dbg_dir, encoder_only, ): """ Call REF and/or DUT encoder. Loading @@ -494,30 +597,117 @@ def encode( ref_out_file = f"{ref_out_dir}/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{bitstream_file}" if update_ref == 1 or update_ref == 2 and ( not os.path.exists(ref_out_file) or ref_enc_dbg_dir ): if encoder_only: stats_file = bitstream_file.replace(".192", ".stats") ref_stats_file = f"{ref_out_dir}/{stats_file}" dut_stats_file = f"{dut_out_dir}/{stats_file}" if ( update_ref in [1, 2] and not os.path.exists(ref_out_file) ) or encoder_only: check_and_makedir(ref_out_dir) # generate dbg tweak subfolder under /res to store encoder output files ref_dbg_tweak_folder = os.path.join(os.path.split(ref_encoder_frontend._path)[0], 'res') check_and_makedir(ref_dbg_tweak_folder) ref_dbg_tweak_folder = tempfile.TemporaryDirectory(dir=ref_dbg_tweak_folder) # call REF encoder ref_encoder_frontend.run( bitrate, sampling_rate, testv_file, ref_out_file, encoder_only=encoder_only, add_option_list=enc_opts_list, enc_dbg_dir=ref_enc_dbg_dir dbg_tweak_folder=Path(ref_dbg_tweak_folder.name), ) if update_ref in [0, 2]: # generate ref encoder stats if encoder_only: extract_enc_stats(Path(ref_dbg_tweak_folder.name), ref_stats_file, sampling_rate) ref_dbg_tweak_folder.cleanup() if update_ref in [0, 2] or encoder_only: check_and_makedir(dut_out_dir) # generate dbg tweak subfolder under /res to store encoder output files dut_dbg_tweak_folder = os.path.join(os.path.split(dut_encoder_frontend._path)[0], 'res') check_and_makedir(dut_dbg_tweak_folder) dut_dbg_tweak_folder = tempfile.TemporaryDirectory(dir=dut_dbg_tweak_folder) # call DUT encoder dut_encoder_frontend.run( bitrate, sampling_rate, testv_file, dut_out_file, encoder_only=encoder_only, add_option_list=enc_opts_list, enc_dbg_dir=dut_enc_dbg_dir dbg_tweak_folder=Path(dut_dbg_tweak_folder.name), ) # generate dut encoder stats if encoder_only: extract_enc_stats(Path(dut_dbg_tweak_folder.name), dut_stats_file, sampling_rate) dut_dbg_tweak_folder.cleanup() def extract_enc_stats ( dbg_tweak_folder, stats_file, sampling_rate, ): """ Extract statistics from auxiliary encoder files generated by running the encoder with DEBUG_MODE_INFO. Write the statistics to a text file """ hist_dicts = [] for f in ENC_AUX_FILES: filename = f[0] dtype = f[1] fs = int(sampling_rate) * 1000 if isinstance(f[2], str): nsamples_per_frame = np.int16(eval(f[2])) else: nsamples_per_frame = np.int16(f[2]) # aux_files = glob.glob(os.path.join(dbg_tweak_folder, filename + '\.*')) aux_files = [f for f in os.listdir(dbg_tweak_folder) if re.search(rf'^{filename}(\..*)?$', f)] # aux_files = [os.path.basename(f) for f in aux_files] for aux_file in aux_files: # extract statistics from the aux file based on histogram of values print(f"Extracting statistics from {os.path.basename(aux_file)} ... ", end="") # read the aux file with open(os.path.join(dbg_tweak_folder, aux_file), "r") as f_aux: data = np.fromfile(f_aux, dtype=dtype) # get file length data_len = data.shape[0] # remove the duplicates of each value per frame if nsamples_per_frame > 1: data = data[::nsamples_per_frame] # calculate histogram from data unique_values = np.sort(np.unique(data)) hist, _ = np.histogram(data, bins=np.append(unique_values, unique_values[-1] + 10)) # convert to dict and sort by absolute value of difference hist_dict = {"name" : os.path.basename(aux_file), "length" : data_len} dict_values = {str(unique_values[i]): str(hist[i]) for i in range(len(unique_values))} hist_dict.update(dict_values) hist_dicts.append(hist_dict) print(f"DONE") print("") with open(stats_file, "w") as f_stats: # append the statistics to the output file in text format f_stats.write(json.dumps(hist_dicts, indent=2)) def simulate( reference_path, Loading tests/constants.py +3 −1 Original line number Diff line number Diff line Loading @@ -5,6 +5,8 @@ MLD_PATTERN = r"MLD: ([\d\.]*)" MAX_DIFF_PATTERN = r"MAXIMUM ABS DIFF: (\d*)" MAX_ENC_DIFF_PATTERN = r"total number of differences is \d+ \((\d+\.\d+)%\)" MIN_ENC_AUX_FILE_DIFF_THR = 0.1 # minimum ratio of total number of differences in encoder aux file MIN_ENC_AUX_FILE_LENGTH_DIFF = 0.1 # minimum difference of encoder aux file length # list of encoder filename patterns with their data type and number of samples per frame # note: instead of specifying the number of samples per frame, you can use a formula incl. 'fs', e.g. 'fs/50' Loading @@ -15,7 +17,7 @@ ENC_AUX_FILES = [ ['cng_type', np.int16, 'fs/50'], ['coder_type', np.int16, 'fs/50'], ['core', np.int16, 'fs/50'], ['core_brate', np.float32, 640], ['core_brate', np.float32, 'fs/50'], ['count_SWB', np.int16, 'fs/50'], ['count_WB', np.int16, 'fs/50'], ['element_brate', np.float32, 'fs/50'], Loading Loading
tests/codec_be_on_mr_nonselection/test_param_file.py +257 −67 Original line number Diff line number Diff line Loading @@ -44,12 +44,13 @@ import glob import tempfile import numpy as np import shutil import json from tests.cmp_pcm import cmp_pcm from tests.cmp_bin_files import cmp_bin_files from tests.conftest import DecoderFrontend, EncoderFrontend from tests.testconfig import PARAM_FILE from ..constants import MLD_PATTERN, MAX_DIFF_PATTERN, ENC_AUX_FILES, MAX_ENC_DIFF_PATTERN from ..constants import MLD_PATTERN, MAX_DIFF_PATTERN, ENC_AUX_FILES, MAX_ENC_DIFF_PATTERN, MIN_ENC_AUX_FILE_DIFF_THR, MIN_ENC_AUX_FILE_LENGTH_DIFF import pdb Loading Loading @@ -129,11 +130,20 @@ def convert_test_string_to_tag(test_string): tag_str = "_".join(tag_str.split("__")) return tag_str def num(s): """ Convert string either to integer or float """ try: return int(s) except ValueError: return float(s) @pytest.mark.create_ref @pytest.mark.parametrize("test_tag", list(param_file_test_dict.keys())) def test_param_file_tests( record_property, encoder_only, decoder_only, dut_encoder_frontend: EncoderFrontend, dut_decoder_frontend: DecoderFrontend, Loading Loading @@ -184,15 +194,6 @@ def test_param_file_tests( bitstream_file = f"{testv_base}_{tag_str}.192" if not decoder_only: if update_ref == 2: # generate temp subdir for encoder output files ref_enc_res_dir = os.path.join(os.path.split(ref_encoder_frontend._path)[0], 'res') check_and_makedir(ref_enc_res_dir) ref_enc_dbg_dir = tempfile.TemporaryDirectory(dir=ref_enc_res_dir) dut_enc_res_dir = os.path.join(os.path.split(dut_encoder_frontend._path)[0], 'res') check_and_makedir(dut_enc_res_dir) dut_enc_dbg_dir = tempfile.TemporaryDirectory(dir=dut_enc_res_dir) encode( dut_encoder_frontend, Loading @@ -205,71 +206,174 @@ def test_param_file_tests( bitstream_file, enc_split, update_ref, ref_enc_dbg_dir = ref_enc_dbg_dir.name, dut_enc_dbg_dir = dut_enc_dbg_dir.name, encoder_only, ) # compare binary files extracted from the encoder if update_ref == 2: if encoder_only: print ("Comparing encoder auxiliary files") print ("=================================\n") msg = "" max_enc_diff = 0 ref_out_dir = f"{reference_path}/param_file/enc" stats_file = bitstream_file.replace(".192", ".stats") ref_stats_file = f"{reference_path}/param_file/enc/{stats_file}" dut_stats_file = f"{dut_base_path}/param_file/enc/{stats_file}" # pdb.set_trace() # open and read the .stats file with open(ref_stats_file, "r") as f_aux: ref_stats = json.load(f_aux) # create dictionary to map "name" to the corresponding dictionaries ref_stats_names = {d["name"]: d for d in ref_stats} with open(dut_stats_file, "r") as f_aux: dut_stats = json.load(f_aux) # create dictionary to map "name" to the corresponding dictionaries dut_stats_names = {d["name"]: d for d in dut_stats} # loop over all common aux files enc_test_result = 0 for f in ENC_AUX_FILES: filename = f[0] dtype = f[1] fs = int(sampling_rate) * 1000 if isinstance(f[2], str): nsamples_per_frame = np.int16(eval(f[2])) enc_test_result_msg = "" max_enc_diff = 0 for name in ref_stats_names: if name in dut_stats_names: # retrieve the dictionaries ref_stats_dict = ref_stats_names[name] dut_stats_dict = dut_stats_names[name] # ref_stats_dict = ref_stats_names["core_brate"] # dut_stats_dict = dut_stats_names["total_brate"] msg = f"File {name}" # compare the file lengths result_len_check = 0 file_length = max(ref_stats_dict["length"], dut_stats_dict["length"]) if ref_stats_dict["length"] != dut_stats_dict["length"]: msg += f" has different length between Ref {ref_stats_dict['length']} and DuT {dut_stats_dict['length']}" # check if threshold has been exceeded if abs(ref_stats_dict["length"] - dut_stats_dict["length"]) / file_length > MIN_ENC_AUX_FILE_LENGTH_DIFF: result_len_check = 1 msg += f", " # remove the "name" and "length" keys for further processing del ref_stats_dict["name"] del dut_stats_dict["name"] del ref_stats_dict["length"] del dut_stats_dict["length"] # convert keys and values from string to float ref_hist = {num(i) : num(j) for i,j in ref_stats_dict.items()} cut_hist = {num(i) : num(j) for i,j in dut_stats_dict.items()} delta_ref = set(cut_hist) - set(ref_hist) delta_cut = set(ref_hist) - set(cut_hist) # append missing keys for item in delta_cut: cut_hist[item] = 0 for item in delta_ref: ref_hist[item] = 0 ref_hist = dict(sorted(ref_hist.items())) cut_hist = dict(sorted(cut_hist.items())) # caculate difference of statistics diff_hist = { k : cut_hist[k] - ref_hist[k] for k in ref_hist.keys()} # calculate the total number of differences total_num_diff = sum(np.abs(list(diff_hist.values()))) total_num_diff_ratio = total_num_diff / (sum(ref_hist.values()) + sum(cut_hist.values())) msg += f"the total number of differences is {total_num_diff} ({(total_num_diff_ratio*100):.2f}%)" if total_num_diff_ratio > MIN_ENC_AUX_FILE_DIFF_THR: result_diff_check = 1 msg += "! " else: nsamples_per_frame = np.int16(f[2]) result_diff_check = 0 msg += ". " ref_aux_files = glob.glob(os.path.join(ref_enc_dbg_dir.name, filename + '*')) ref_aux_files = [os.path.basename(f) for f in ref_aux_files] dut_aux_files = glob.glob(os.path.join(dut_enc_dbg_dir.name, filename + '*')) dut_aux_files = [os.path.basename(f) for f in dut_aux_files] common_aux_files = [f for f in ref_aux_files if f in dut_aux_files] for aux_file in common_aux_files: # compare the contents of each binary file extracted from the encoder based on histogram output_differs, reason = cmp_bin_files( os.path.join(ref_enc_dbg_dir.name, aux_file), os.path.join(dut_enc_dbg_dir.name, aux_file), dtype=dtype, nsamples_per_frame=nsamples_per_frame, len_check=1, min_diff_thr=0.1, ) # check if the maximum difference has been exceeded if total_num_diff_ratio > max_enc_diff: max_enc_diff = total_num_diff_ratio if output_differs: # update test result if result_len_check or result_diff_check: enc_test_result = 1 if msg: msg += ", " msg += reason enc_test_result_msg += msg # check if the maximum difference has been exceeded search_result = re.search(MAX_ENC_DIFF_PATTERN, reason) if search_result: diff = search_result.groups(1)[0] diff = float(diff) if diff > max_enc_diff: max_enc_diff = diff print(msg) print ("") # remove encoder output files to save disk space if ref_enc_dbg_dir: shutil.rmtree(ref_enc_dbg_dir.name) if dut_enc_dbg_dir: shutil.rmtree(dut_enc_dbg_dir.name) if enc_test_result: record_property("MAXIMUM ENC DIFF", max_enc_diff) pytest.fail(msg) pytest.fail(enc_test_result_msg) # msg = "" # max_enc_diff = 0 # enc_test_result = 0 # for f in ENC_AUX_FILES: # filename = f[0] # dtype = f[1] # fs = int(sampling_rate) * 1000 # if isinstance(f[2], str): # nsamples_per_frame = np.int16(eval(f[2])) # else: # nsamples_per_frame = np.int16(f[2]) # ref_aux_files = glob.glob(os.path.join(ref_enc_dbg_dir.name, filename + '*')) # ref_aux_files = [os.path.basename(f) for f in ref_aux_files] # dut_aux_files = glob.glob(os.path.join(dut_enc_dbg_dir.name, filename + '*')) # dut_aux_files = [os.path.basename(f) for f in dut_aux_files] # common_aux_files = [f for f in ref_aux_files if f in dut_aux_files] # for aux_file in common_aux_files: # # compare the contents of each binary file extracted from the encoder based on histogram # output_differs, reason = cmp_bin_files( # os.path.join(ref_enc_dbg_dir.name, aux_file), # os.path.join(dut_enc_dbg_dir.name, aux_file), # dtype=dtype, # nsamples_per_frame=nsamples_per_frame, # len_check=1, # min_diff_thr=0.1, # ) # if output_differs: # enc_test_result = 1 # if msg: # msg += ", " # msg += reason # # check if the maximum difference has been exceeded # search_result = re.search(MAX_ENC_DIFF_PATTERN, reason) # if search_result: # diff = search_result.groups(1)[0] # diff = float(diff) # if diff > max_enc_diff: # max_enc_diff = diff # print ("") # # remove encoder output files to save disk space # if ref_enc_dbg_dir: # shutil.rmtree(ref_enc_dbg_dir.name) # if dut_enc_dbg_dir: # shutil.rmtree(dut_enc_dbg_dir.name) # if enc_test_result: # record_property("MAXIMUM ENC DIFF", max_enc_diff) # pytest.fail(msg) if encoder_only: return # check for networkSimulator_g192 command line if sim_opts != "": Loading Loading @@ -481,8 +585,7 @@ def encode( bitstream_file, enc_opts_list, update_ref, ref_enc_dbg_dir, dut_enc_dbg_dir, encoder_only, ): """ Call REF and/or DUT encoder. Loading @@ -494,30 +597,117 @@ def encode( ref_out_file = f"{ref_out_dir}/{bitstream_file}" dut_out_file = f"{dut_out_dir}/{bitstream_file}" if update_ref == 1 or update_ref == 2 and ( not os.path.exists(ref_out_file) or ref_enc_dbg_dir ): if encoder_only: stats_file = bitstream_file.replace(".192", ".stats") ref_stats_file = f"{ref_out_dir}/{stats_file}" dut_stats_file = f"{dut_out_dir}/{stats_file}" if ( update_ref in [1, 2] and not os.path.exists(ref_out_file) ) or encoder_only: check_and_makedir(ref_out_dir) # generate dbg tweak subfolder under /res to store encoder output files ref_dbg_tweak_folder = os.path.join(os.path.split(ref_encoder_frontend._path)[0], 'res') check_and_makedir(ref_dbg_tweak_folder) ref_dbg_tweak_folder = tempfile.TemporaryDirectory(dir=ref_dbg_tweak_folder) # call REF encoder ref_encoder_frontend.run( bitrate, sampling_rate, testv_file, ref_out_file, encoder_only=encoder_only, add_option_list=enc_opts_list, enc_dbg_dir=ref_enc_dbg_dir dbg_tweak_folder=Path(ref_dbg_tweak_folder.name), ) if update_ref in [0, 2]: # generate ref encoder stats if encoder_only: extract_enc_stats(Path(ref_dbg_tweak_folder.name), ref_stats_file, sampling_rate) ref_dbg_tweak_folder.cleanup() if update_ref in [0, 2] or encoder_only: check_and_makedir(dut_out_dir) # generate dbg tweak subfolder under /res to store encoder output files dut_dbg_tweak_folder = os.path.join(os.path.split(dut_encoder_frontend._path)[0], 'res') check_and_makedir(dut_dbg_tweak_folder) dut_dbg_tweak_folder = tempfile.TemporaryDirectory(dir=dut_dbg_tweak_folder) # call DUT encoder dut_encoder_frontend.run( bitrate, sampling_rate, testv_file, dut_out_file, encoder_only=encoder_only, add_option_list=enc_opts_list, enc_dbg_dir=dut_enc_dbg_dir dbg_tweak_folder=Path(dut_dbg_tweak_folder.name), ) # generate dut encoder stats if encoder_only: extract_enc_stats(Path(dut_dbg_tweak_folder.name), dut_stats_file, sampling_rate) dut_dbg_tweak_folder.cleanup() def extract_enc_stats ( dbg_tweak_folder, stats_file, sampling_rate, ): """ Extract statistics from auxiliary encoder files generated by running the encoder with DEBUG_MODE_INFO. Write the statistics to a text file """ hist_dicts = [] for f in ENC_AUX_FILES: filename = f[0] dtype = f[1] fs = int(sampling_rate) * 1000 if isinstance(f[2], str): nsamples_per_frame = np.int16(eval(f[2])) else: nsamples_per_frame = np.int16(f[2]) # aux_files = glob.glob(os.path.join(dbg_tweak_folder, filename + '\.*')) aux_files = [f for f in os.listdir(dbg_tweak_folder) if re.search(rf'^{filename}(\..*)?$', f)] # aux_files = [os.path.basename(f) for f in aux_files] for aux_file in aux_files: # extract statistics from the aux file based on histogram of values print(f"Extracting statistics from {os.path.basename(aux_file)} ... ", end="") # read the aux file with open(os.path.join(dbg_tweak_folder, aux_file), "r") as f_aux: data = np.fromfile(f_aux, dtype=dtype) # get file length data_len = data.shape[0] # remove the duplicates of each value per frame if nsamples_per_frame > 1: data = data[::nsamples_per_frame] # calculate histogram from data unique_values = np.sort(np.unique(data)) hist, _ = np.histogram(data, bins=np.append(unique_values, unique_values[-1] + 10)) # convert to dict and sort by absolute value of difference hist_dict = {"name" : os.path.basename(aux_file), "length" : data_len} dict_values = {str(unique_values[i]): str(hist[i]) for i in range(len(unique_values))} hist_dict.update(dict_values) hist_dicts.append(hist_dict) print(f"DONE") print("") with open(stats_file, "w") as f_stats: # append the statistics to the output file in text format f_stats.write(json.dumps(hist_dicts, indent=2)) def simulate( reference_path, Loading
tests/constants.py +3 −1 Original line number Diff line number Diff line Loading @@ -5,6 +5,8 @@ MLD_PATTERN = r"MLD: ([\d\.]*)" MAX_DIFF_PATTERN = r"MAXIMUM ABS DIFF: (\d*)" MAX_ENC_DIFF_PATTERN = r"total number of differences is \d+ \((\d+\.\d+)%\)" MIN_ENC_AUX_FILE_DIFF_THR = 0.1 # minimum ratio of total number of differences in encoder aux file MIN_ENC_AUX_FILE_LENGTH_DIFF = 0.1 # minimum difference of encoder aux file length # list of encoder filename patterns with their data type and number of samples per frame # note: instead of specifying the number of samples per frame, you can use a formula incl. 'fs', e.g. 'fs/50' Loading @@ -15,7 +17,7 @@ ENC_AUX_FILES = [ ['cng_type', np.int16, 'fs/50'], ['coder_type', np.int16, 'fs/50'], ['core', np.int16, 'fs/50'], ['core_brate', np.float32, 640], ['core_brate', np.float32, 'fs/50'], ['count_SWB', np.int16, 'fs/50'], ['count_WB', np.int16, 'fs/50'], ['element_brate', np.float32, 'fs/50'], Loading