Loading ivas_processing_scripts/__init__.py +0 −9 Original line number Diff line number Diff line Loading @@ -50,7 +50,6 @@ from ivas_processing_scripts.processing.processing import ( process_item, rename_generated_conditions, reorder_items_list, reverse_process_2, scale_resulting_files, ) from ivas_processing_scripts.utils import DirManager, apply_func_parallel Loading Loading @@ -184,14 +183,6 @@ def main(args): "mp" if cfg.multiprocessing else None, ) # remove preamble and split signals if hasattr(cfg, "preprocessing_2"): reverse_process_2(cfg, logger) # scale individual files if cfg.postprocessing.get("loudness", False): scale_resulting_files(cfg, logger) # rename output with condition name if cfg.condition_in_output_filename: rename_generated_conditions(cfg.output_path) Loading ivas_processing_scripts/processing/chains.py +18 −3 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ from ivas_processing_scripts.processing.ivas import IVAS, validate_sba_fmt from ivas_processing_scripts.processing.postprocessing import Postprocessing from ivas_processing_scripts.processing.preprocessing import Preprocessing from ivas_processing_scripts.processing.preprocessing_2 import Preprocessing2 from ivas_processing_scripts.processing.processing_splitting_scaling import Processing_splitting_scaling from ivas_processing_scripts.utils import get_abs_path, list_audio Loading @@ -54,9 +55,6 @@ def init_processing_chains(cfg: TestConfig) -> None: # other processing chains for cond_name, cond_cfg in cfg.conditions_to_generate.items(): bitrates = cond_cfg.get("bitrates") # TODO we may need to change this to ensure it is only one value for IVAS and a possible list for EVS # condition naming will also need to be checked since we rename to {cond_name}_{bitrate} # this may not be desired if bitrates is not None and len(bitrates) > 1: multiple_bitrates_flag = True else: Loading Loading @@ -214,6 +212,7 @@ def get_processing_chain( # get pre and post processing configurations pre_cfg = getattr(cfg, "preprocessing", {}) post_cfg = cfg.postprocessing pre2_cfg = getattr(cfg, "preprocessing_2", {}) # default to input values if preprocessing was not requested tmp_in_fs = pre_cfg.get("fs", cfg.input.get("fs")) Loading Loading @@ -440,5 +439,21 @@ def get_processing_chain( } ) ) # add splitting and scaling for all conditions chain["processes"].append( Processing_splitting_scaling( { "fs": post_cfg["fs"], "in_fmt": post_cfg["fmt"], "out_fmt": post_cfg["fmt"], # no rendering here "concatenate_input": pre2_cfg.get("concatenate_input", False), "preamble": pre2_cfg.get("preamble", 0), "repeat_signal": pre2_cfg.get("repeat_signal", False), "loudness": post_cfg.get("loudness", None), "loudness_fmt": post_cfg.get("loudness_fmt", None), "tx_condition": tx_condition, } ) ) return chain ivas_processing_scripts/processing/preprocessing_2.py +0 −1 Original line number Diff line number Diff line Loading @@ -65,7 +65,6 @@ class Preprocessing2(Processing): ) # add preamble # also apply preamble to ISM metadata if self.in_fmt.startswith("ISM"): if not self.preamble: Loading ivas_processing_scripts/processing/processing.py +92 −127 Original line number Diff line number Diff line Loading @@ -45,10 +45,10 @@ from ivas_processing_scripts.audiotools import audio from ivas_processing_scripts.audiotools.audiofile import ( concat, read, split, trim, write, ) from ivas_processing_scripts.audiotools.audioarray import window from ivas_processing_scripts.audiotools.convert.__init__ import convert from ivas_processing_scripts.audiotools.metadata import ( add_remove_preamble, Loading Loading @@ -172,60 +172,54 @@ def concat_setup(cfg: TestConfig, chain, logger: logging.Logger): logger.info(f"Splits written to file {cfg.concat_file.with_suffix('.splits.log')}") def concat_teardown(cfg: TestConfig, logger: logging.Logger): if not cfg.splits: raise ValueError("Splitting not possible without split marker") output_format = cfg.postprocessing["fmt"] out_files = [] out_meta = [] def concat_teardown(x, splits, out_fmt, fs, in_fs, logger: logging.Logger): logger.info(f"Splitting output file in directory {cfg.output_path}") if not splits: raise ValueError("Splitting not possible without split marker") logger.debug("Split files") # if sampling rate changed, adjust splits fs_new = float(cfg.postprocessing["fs"]) fs_old = float(cfg.pre2.in_fs) fs_new = float(fs) fs_old = float(in_fs) relative_fs_change = fs_new / fs_old new_splits = [] for split_i in cfg.splits: for split_i in splits: new_splits.append(int(float(split_i) * relative_fs_change)) cfg.splits = new_splits for odir in cfg.out_dirs: path_input = odir / cfg.items_list[0].name out_paths = split( path_input, odir, cfg.split_names, cfg.splits, in_fs=cfg.postprocessing["fs"], ) splits = new_splits logger.debug( f"Resulting split files condition {odir.name}: {', '.join([str(op) for op in out_paths])}" ) out_files.append(out_paths) split_old = 0 split_signals = [] for idx, split in enumerate(splits): # split y = x[split_old:split, :] # split ISM metadata if output_format.startswith("ISM"): for odir in cfg.out_dirs: path_input = odir / cfg.items_list[0].name out_meta_paths = split_meta_in_file( path_input, odir, cfg.split_names, cfg.splits, output_format, meta_files=cfg.metadata_path[0], ) out_meta.append(out_meta_paths) # windowing y = window(y) # remove concatenated file if cfg.delete_tmp: cfg.concat_file.unlink(missing_ok=True) # add signal to list split_signals.append(y) split_old = split # split ISM metadata if out_fmt.startswith("ISM"): # TODO # for odir in cfg.out_dirs: # path_input = odir / cfg.items_list[0].name # out_meta_paths = split_meta_in_file( # path_input, # odir, # cfg.split_names, # cfg.splits, # output_format, # meta_files=cfg.metadata_path[0], # ) # out_meta.append(out_meta_paths) split_meta = None pass else: split_meta = None return out_files, out_meta return split_signals, split_meta def preprocess(cfg, logger): Loading Loading @@ -322,38 +316,6 @@ def preprocess_2(cfg, logger): return def reverse_process_2(cfg, logger): # remove preamble and first half of signal due to repetition if cfg.pre2.preamble or cfg.pre2.repeat_signal: remove_preamble(cfg, logger) # reverse concatenation if cfg.pre2.concatenate_input: # write out the splits, optionally remove file out_paths_splits, out_meta_splits = concat_teardown(cfg, logger) else: # if no concatenation read files from folder out_paths_splits = [] for out_dir in cfg.out_dirs: list_audio_dir = list_audio(out_dir) out_paths_splits.append(list_audio_dir) if cfg.postprocessing["fmt"].startswith("ISM"): out_meta_splits = [] for i, condition in enumerate(out_paths_splits): meta_condition = metadata_search( cfg.out_dirs[i], condition, num_objects=int(cfg.postprocessing["fmt"][-1]), ) out_meta_splits.append(meta_condition) else: out_meta_splits = None cfg.pre2.out_paths_splits = out_paths_splits cfg.pre2.out_meta_splits = out_meta_splits return def process_item( in_file: Union[Path, str], tmp_dir: Union[Path, str], Loading Loading @@ -413,6 +375,7 @@ def process_item( for im in range(len(in_meta)): out_meta.append(out_dir.joinpath(f"{Path(out_file).stem}.wav.{im}.csv")) tx_condition = False # execute each process sequentially, feed output into input of next process for p, (input, output), input_meta in zip( chain, pairwise(processing_paths), processing_paths_meta[:-1] Loading @@ -424,6 +387,12 @@ def process_item( fh.setFormatter(logging.Formatter(LOGGER_FORMAT, datefmt=LOGGER_DATEFMT)) item_logger.addHandler(fh) # set info about tx and loudness try: tx_condition = p.tx_condition except AttributeError: tx_condition = False p.process(input, output, input_meta, item_logger) # copy output and metadata from final process to output file Loading @@ -431,59 +400,55 @@ def process_item( if processing_paths_meta[-1]: for idx, ppm in enumerate(processing_paths_meta[-1]): copyfile(ppm, out_meta[idx]) # if bitstream processing and loudness adjustment copy additional file if tx_condition: out_file_noerror = Path(f"{out_file.with_suffix('')}.noerror.wav") in_file_noerror = Path(f"{processing_paths[-1].with_suffix('')}.noerror.wav") copyfile(in_file_noerror, out_file_noerror) if processing_paths_meta[-1]: # TODO for idx, ppm in enumerate(processing_paths_meta[-1]): copyfile(ppm, out_meta[idx]) def remove_preamble(cfg, logger): # get number of channels from output format num_channels = audio.fromtype(cfg.postprocessing["fmt"]).num_channels for odir in cfg.out_dirs: for item in cfg.items_list: path_input = odir / item.name def remove_preamble(x, out_fmt, fs, repeat_signal, preamble, logger): # remove preamble for ISM metadata if cfg.postprocessing["fmt"].startswith("ISM"): # search for metadata meta_item = metadata_search( odir, [Path(item.name)], num_objects=num_channels ) metadata_array = [] for meta_i in meta_item: metadata_array.append(np.genfromtxt(meta_i, delimiter=",")) # cut first half of the metadata if cfg.pre2.repeat_signal: metadata_array = [m[int(len(m) / 2) :, :] for m in metadata_array] # remove preamble if cfg.pre2.preamble: metadata_array = add_remove_preamble( metadata_array, cfg.pre2.preamble, add=False ) # write csv files write_ISM_metadata_in_file( metadata_array, [path_input], automatic_naming=True ) # read file x, fs = read( path_input, nchannels=num_channels, fs=cfg.postprocessing["fs"] ) if out_fmt.startswith("ISM"): # TODO # # search for metadata # meta_item = metadata_search( # odir, [Path(item.name)], num_objects=num_channels # ) # metadata_array = [] # for meta_i in meta_item: # metadata_array.append(np.genfromtxt(meta_i, delimiter=",")) # # # cut first half of the metadata # if cfg.pre2.repeat_signal: # metadata_array = [m[int(len(m) / 2) :, :] for m in metadata_array] # # # remove preamble # if cfg.pre2.preamble: # metadata_array = add_remove_preamble( # metadata_array, cfg.pre2.preamble, add=False # ) # # # write csv files # write_ISM_metadata_in_file( # metadata_array, [path_input], automatic_naming=True # ) pass # remove first half of signal if cfg.pre2.repeat_signal: if repeat_signal: logger.info("Remove first half of signal") x = x[int(len(x) / 2):, :] # remove preamble if cfg.pre2.preamble: if preamble: logger.info("Remove preamble") x = trim(x, fs, (cfg.pre2.preamble, 0)) x = trim(x, fs, (preamble, 0)) # write file write(path_input, x, fs) return return x def preprocess_background_noise(cfg): Loading ivas_processing_scripts/processing/processing_splitting_scaling.py 0 → 100644 +103 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, # Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., # Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, # Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other # contributors to this repository. All Rights Reserved. # # This software is protected by copyright law and by international treaties. # The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, # Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., # Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, # Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other # contributors to this repository retain full ownership rights in their respective contributions in # the software. This notice grants no license of any kind, including but not limited to patent # license, nor is any license granted by implication, estoppel or otherwise. # # Contributors are required to enter into the IVAS codec Public Collaboration agreement before making # contributions. # # This software is provided "AS IS", without any express or implied warranties. The software is in the # development stage. It is intended exclusively for experts who have experience with such software and # solely for the purpose of inspection. All implied warranties of non-infringement, merchantability # and fitness for a particular purpose are hereby disclaimed and excluded. # # Any dispute, controversy or claim arising under or in relation to providing this software shall be # submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in # accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and # the United Nations Convention on Contracts on the International Sales of Goods. # import logging from pathlib import Path from itertools import repeat from ivas_processing_scripts.processing.processing import Processing, remove_preamble, concat_teardown, \ scale_resulting_files from ivas_processing_scripts.audiotools import audio from ivas_processing_scripts.audiotools.audiofile import write, read class Processing_splitting_scaling(Processing): def __init__(self, attrs: dict): super().__init__(attrs) self.name = "processing_splitting_scaling" def process(self, in_file: Path, out_file: Path, in_meta, logger: logging.Logger): logger.debug(f"Processing splitting scaling configuration : {self.__dict__}") logger.debug(f"Processing splitting scaling {in_file.absolute()}") # get number of channels from output format num_channels = audio.fromtype(self.out_fmt).num_channels # read file x, fs = read(in_file, nchannels=num_channels, fs=self.fs) if self.tx_condition: in_file_noerror = Path(f"{in_file.with_suffix('')}.noerror.wav") x_noerror, _ = read(in_file_noerror, nchannels=num_channels, fs=self.fs) # remove preamble and first half of signal due to repetition if self.preamble or self.repeat_signal: # TODO: ISM metadata x = remove_preamble(x, self.out_fmt, self.fs, self.repeat_signal, self.preamble, logger) if self.tx_condition: # also process noerror signal x_noerror = remove_preamble(x_noerror, self.out_fmt, self.fs, self.repeat_signal, self.preamble, logger) # reverse concatenation if self.concatenate_input: # read out splits file -> start/end, names, sampling rate splits, split_names, split_fs = read_splits_file(in_file) # split file file_splits,meta_splits = concat_teardown(x, splits, self.out_fmt, fs, split_fs, logger) # set new out_files out_files = split_names # TODO if self.tx_condition: file_splits_noerror, meta_splits_noerror = concat_teardown(x, splits, self.out_fmt, fs, split_fs, logger) else: out_files = [out_file] file_splits = [x] meta_splits = repeat(None) if self.tx_condition: file_splits_noerror = None meta_splits_noerror = None # TODO # scale splitted files if self.loudness: # TODO # scale_resulting_files(cfg, logger) pass # write file(s) for of, file_s, meta_s in zip(out_files, file_splits, meta_splits): write(of, file_s, fs) if meta_s: pass # TODO if self.tx_condition: pass # TODO def read_splits_file(in_file): # TODO return None, None, None Loading
ivas_processing_scripts/__init__.py +0 −9 Original line number Diff line number Diff line Loading @@ -50,7 +50,6 @@ from ivas_processing_scripts.processing.processing import ( process_item, rename_generated_conditions, reorder_items_list, reverse_process_2, scale_resulting_files, ) from ivas_processing_scripts.utils import DirManager, apply_func_parallel Loading Loading @@ -184,14 +183,6 @@ def main(args): "mp" if cfg.multiprocessing else None, ) # remove preamble and split signals if hasattr(cfg, "preprocessing_2"): reverse_process_2(cfg, logger) # scale individual files if cfg.postprocessing.get("loudness", False): scale_resulting_files(cfg, logger) # rename output with condition name if cfg.condition_in_output_filename: rename_generated_conditions(cfg.output_path) Loading
ivas_processing_scripts/processing/chains.py +18 −3 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ from ivas_processing_scripts.processing.ivas import IVAS, validate_sba_fmt from ivas_processing_scripts.processing.postprocessing import Postprocessing from ivas_processing_scripts.processing.preprocessing import Preprocessing from ivas_processing_scripts.processing.preprocessing_2 import Preprocessing2 from ivas_processing_scripts.processing.processing_splitting_scaling import Processing_splitting_scaling from ivas_processing_scripts.utils import get_abs_path, list_audio Loading @@ -54,9 +55,6 @@ def init_processing_chains(cfg: TestConfig) -> None: # other processing chains for cond_name, cond_cfg in cfg.conditions_to_generate.items(): bitrates = cond_cfg.get("bitrates") # TODO we may need to change this to ensure it is only one value for IVAS and a possible list for EVS # condition naming will also need to be checked since we rename to {cond_name}_{bitrate} # this may not be desired if bitrates is not None and len(bitrates) > 1: multiple_bitrates_flag = True else: Loading Loading @@ -214,6 +212,7 @@ def get_processing_chain( # get pre and post processing configurations pre_cfg = getattr(cfg, "preprocessing", {}) post_cfg = cfg.postprocessing pre2_cfg = getattr(cfg, "preprocessing_2", {}) # default to input values if preprocessing was not requested tmp_in_fs = pre_cfg.get("fs", cfg.input.get("fs")) Loading Loading @@ -440,5 +439,21 @@ def get_processing_chain( } ) ) # add splitting and scaling for all conditions chain["processes"].append( Processing_splitting_scaling( { "fs": post_cfg["fs"], "in_fmt": post_cfg["fmt"], "out_fmt": post_cfg["fmt"], # no rendering here "concatenate_input": pre2_cfg.get("concatenate_input", False), "preamble": pre2_cfg.get("preamble", 0), "repeat_signal": pre2_cfg.get("repeat_signal", False), "loudness": post_cfg.get("loudness", None), "loudness_fmt": post_cfg.get("loudness_fmt", None), "tx_condition": tx_condition, } ) ) return chain
ivas_processing_scripts/processing/preprocessing_2.py +0 −1 Original line number Diff line number Diff line Loading @@ -65,7 +65,6 @@ class Preprocessing2(Processing): ) # add preamble # also apply preamble to ISM metadata if self.in_fmt.startswith("ISM"): if not self.preamble: Loading
ivas_processing_scripts/processing/processing.py +92 −127 Original line number Diff line number Diff line Loading @@ -45,10 +45,10 @@ from ivas_processing_scripts.audiotools import audio from ivas_processing_scripts.audiotools.audiofile import ( concat, read, split, trim, write, ) from ivas_processing_scripts.audiotools.audioarray import window from ivas_processing_scripts.audiotools.convert.__init__ import convert from ivas_processing_scripts.audiotools.metadata import ( add_remove_preamble, Loading Loading @@ -172,60 +172,54 @@ def concat_setup(cfg: TestConfig, chain, logger: logging.Logger): logger.info(f"Splits written to file {cfg.concat_file.with_suffix('.splits.log')}") def concat_teardown(cfg: TestConfig, logger: logging.Logger): if not cfg.splits: raise ValueError("Splitting not possible without split marker") output_format = cfg.postprocessing["fmt"] out_files = [] out_meta = [] def concat_teardown(x, splits, out_fmt, fs, in_fs, logger: logging.Logger): logger.info(f"Splitting output file in directory {cfg.output_path}") if not splits: raise ValueError("Splitting not possible without split marker") logger.debug("Split files") # if sampling rate changed, adjust splits fs_new = float(cfg.postprocessing["fs"]) fs_old = float(cfg.pre2.in_fs) fs_new = float(fs) fs_old = float(in_fs) relative_fs_change = fs_new / fs_old new_splits = [] for split_i in cfg.splits: for split_i in splits: new_splits.append(int(float(split_i) * relative_fs_change)) cfg.splits = new_splits for odir in cfg.out_dirs: path_input = odir / cfg.items_list[0].name out_paths = split( path_input, odir, cfg.split_names, cfg.splits, in_fs=cfg.postprocessing["fs"], ) splits = new_splits logger.debug( f"Resulting split files condition {odir.name}: {', '.join([str(op) for op in out_paths])}" ) out_files.append(out_paths) split_old = 0 split_signals = [] for idx, split in enumerate(splits): # split y = x[split_old:split, :] # split ISM metadata if output_format.startswith("ISM"): for odir in cfg.out_dirs: path_input = odir / cfg.items_list[0].name out_meta_paths = split_meta_in_file( path_input, odir, cfg.split_names, cfg.splits, output_format, meta_files=cfg.metadata_path[0], ) out_meta.append(out_meta_paths) # windowing y = window(y) # remove concatenated file if cfg.delete_tmp: cfg.concat_file.unlink(missing_ok=True) # add signal to list split_signals.append(y) split_old = split # split ISM metadata if out_fmt.startswith("ISM"): # TODO # for odir in cfg.out_dirs: # path_input = odir / cfg.items_list[0].name # out_meta_paths = split_meta_in_file( # path_input, # odir, # cfg.split_names, # cfg.splits, # output_format, # meta_files=cfg.metadata_path[0], # ) # out_meta.append(out_meta_paths) split_meta = None pass else: split_meta = None return out_files, out_meta return split_signals, split_meta def preprocess(cfg, logger): Loading Loading @@ -322,38 +316,6 @@ def preprocess_2(cfg, logger): return def reverse_process_2(cfg, logger): # remove preamble and first half of signal due to repetition if cfg.pre2.preamble or cfg.pre2.repeat_signal: remove_preamble(cfg, logger) # reverse concatenation if cfg.pre2.concatenate_input: # write out the splits, optionally remove file out_paths_splits, out_meta_splits = concat_teardown(cfg, logger) else: # if no concatenation read files from folder out_paths_splits = [] for out_dir in cfg.out_dirs: list_audio_dir = list_audio(out_dir) out_paths_splits.append(list_audio_dir) if cfg.postprocessing["fmt"].startswith("ISM"): out_meta_splits = [] for i, condition in enumerate(out_paths_splits): meta_condition = metadata_search( cfg.out_dirs[i], condition, num_objects=int(cfg.postprocessing["fmt"][-1]), ) out_meta_splits.append(meta_condition) else: out_meta_splits = None cfg.pre2.out_paths_splits = out_paths_splits cfg.pre2.out_meta_splits = out_meta_splits return def process_item( in_file: Union[Path, str], tmp_dir: Union[Path, str], Loading Loading @@ -413,6 +375,7 @@ def process_item( for im in range(len(in_meta)): out_meta.append(out_dir.joinpath(f"{Path(out_file).stem}.wav.{im}.csv")) tx_condition = False # execute each process sequentially, feed output into input of next process for p, (input, output), input_meta in zip( chain, pairwise(processing_paths), processing_paths_meta[:-1] Loading @@ -424,6 +387,12 @@ def process_item( fh.setFormatter(logging.Formatter(LOGGER_FORMAT, datefmt=LOGGER_DATEFMT)) item_logger.addHandler(fh) # set info about tx and loudness try: tx_condition = p.tx_condition except AttributeError: tx_condition = False p.process(input, output, input_meta, item_logger) # copy output and metadata from final process to output file Loading @@ -431,59 +400,55 @@ def process_item( if processing_paths_meta[-1]: for idx, ppm in enumerate(processing_paths_meta[-1]): copyfile(ppm, out_meta[idx]) # if bitstream processing and loudness adjustment copy additional file if tx_condition: out_file_noerror = Path(f"{out_file.with_suffix('')}.noerror.wav") in_file_noerror = Path(f"{processing_paths[-1].with_suffix('')}.noerror.wav") copyfile(in_file_noerror, out_file_noerror) if processing_paths_meta[-1]: # TODO for idx, ppm in enumerate(processing_paths_meta[-1]): copyfile(ppm, out_meta[idx]) def remove_preamble(cfg, logger): # get number of channels from output format num_channels = audio.fromtype(cfg.postprocessing["fmt"]).num_channels for odir in cfg.out_dirs: for item in cfg.items_list: path_input = odir / item.name def remove_preamble(x, out_fmt, fs, repeat_signal, preamble, logger): # remove preamble for ISM metadata if cfg.postprocessing["fmt"].startswith("ISM"): # search for metadata meta_item = metadata_search( odir, [Path(item.name)], num_objects=num_channels ) metadata_array = [] for meta_i in meta_item: metadata_array.append(np.genfromtxt(meta_i, delimiter=",")) # cut first half of the metadata if cfg.pre2.repeat_signal: metadata_array = [m[int(len(m) / 2) :, :] for m in metadata_array] # remove preamble if cfg.pre2.preamble: metadata_array = add_remove_preamble( metadata_array, cfg.pre2.preamble, add=False ) # write csv files write_ISM_metadata_in_file( metadata_array, [path_input], automatic_naming=True ) # read file x, fs = read( path_input, nchannels=num_channels, fs=cfg.postprocessing["fs"] ) if out_fmt.startswith("ISM"): # TODO # # search for metadata # meta_item = metadata_search( # odir, [Path(item.name)], num_objects=num_channels # ) # metadata_array = [] # for meta_i in meta_item: # metadata_array.append(np.genfromtxt(meta_i, delimiter=",")) # # # cut first half of the metadata # if cfg.pre2.repeat_signal: # metadata_array = [m[int(len(m) / 2) :, :] for m in metadata_array] # # # remove preamble # if cfg.pre2.preamble: # metadata_array = add_remove_preamble( # metadata_array, cfg.pre2.preamble, add=False # ) # # # write csv files # write_ISM_metadata_in_file( # metadata_array, [path_input], automatic_naming=True # ) pass # remove first half of signal if cfg.pre2.repeat_signal: if repeat_signal: logger.info("Remove first half of signal") x = x[int(len(x) / 2):, :] # remove preamble if cfg.pre2.preamble: if preamble: logger.info("Remove preamble") x = trim(x, fs, (cfg.pre2.preamble, 0)) x = trim(x, fs, (preamble, 0)) # write file write(path_input, x, fs) return return x def preprocess_background_noise(cfg): Loading
ivas_processing_scripts/processing/processing_splitting_scaling.py 0 → 100644 +103 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, # Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., # Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, # Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other # contributors to this repository. All Rights Reserved. # # This software is protected by copyright law and by international treaties. # The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, # Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., # Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, # Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other # contributors to this repository retain full ownership rights in their respective contributions in # the software. This notice grants no license of any kind, including but not limited to patent # license, nor is any license granted by implication, estoppel or otherwise. # # Contributors are required to enter into the IVAS codec Public Collaboration agreement before making # contributions. # # This software is provided "AS IS", without any express or implied warranties. The software is in the # development stage. It is intended exclusively for experts who have experience with such software and # solely for the purpose of inspection. All implied warranties of non-infringement, merchantability # and fitness for a particular purpose are hereby disclaimed and excluded. # # Any dispute, controversy or claim arising under or in relation to providing this software shall be # submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in # accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and # the United Nations Convention on Contracts on the International Sales of Goods. # import logging from pathlib import Path from itertools import repeat from ivas_processing_scripts.processing.processing import Processing, remove_preamble, concat_teardown, \ scale_resulting_files from ivas_processing_scripts.audiotools import audio from ivas_processing_scripts.audiotools.audiofile import write, read class Processing_splitting_scaling(Processing): def __init__(self, attrs: dict): super().__init__(attrs) self.name = "processing_splitting_scaling" def process(self, in_file: Path, out_file: Path, in_meta, logger: logging.Logger): logger.debug(f"Processing splitting scaling configuration : {self.__dict__}") logger.debug(f"Processing splitting scaling {in_file.absolute()}") # get number of channels from output format num_channels = audio.fromtype(self.out_fmt).num_channels # read file x, fs = read(in_file, nchannels=num_channels, fs=self.fs) if self.tx_condition: in_file_noerror = Path(f"{in_file.with_suffix('')}.noerror.wav") x_noerror, _ = read(in_file_noerror, nchannels=num_channels, fs=self.fs) # remove preamble and first half of signal due to repetition if self.preamble or self.repeat_signal: # TODO: ISM metadata x = remove_preamble(x, self.out_fmt, self.fs, self.repeat_signal, self.preamble, logger) if self.tx_condition: # also process noerror signal x_noerror = remove_preamble(x_noerror, self.out_fmt, self.fs, self.repeat_signal, self.preamble, logger) # reverse concatenation if self.concatenate_input: # read out splits file -> start/end, names, sampling rate splits, split_names, split_fs = read_splits_file(in_file) # split file file_splits,meta_splits = concat_teardown(x, splits, self.out_fmt, fs, split_fs, logger) # set new out_files out_files = split_names # TODO if self.tx_condition: file_splits_noerror, meta_splits_noerror = concat_teardown(x, splits, self.out_fmt, fs, split_fs, logger) else: out_files = [out_file] file_splits = [x] meta_splits = repeat(None) if self.tx_condition: file_splits_noerror = None meta_splits_noerror = None # TODO # scale splitted files if self.loudness: # TODO # scale_resulting_files(cfg, logger) pass # write file(s) for of, file_s, meta_s in zip(out_files, file_splits, meta_splits): write(of, file_s, fs) if meta_s: pass # TODO if self.tx_condition: pass # TODO def read_splits_file(in_file): # TODO return None, None, None