diff --git a/generate_test.py b/generate_test.py
old mode 100644
new mode 100755
index c187dbc2d76e298c4990f965f7449c297e5ddbb9..141b11063c611a99668dd3346a5ca4fb693284db
--- a/generate_test.py
+++ b/generate_test.py
@@ -49,7 +49,7 @@ IN_FMT_FOR_MASA_EXPS = {
 }
 
 
-def generate_tests(exp_lab_pairs, run_parallel=True, create_cfg_only=False):
+def generate_tests(exp_lab_pairs, create_cfg_only=False):
     """
     Create configs and run them for all given experiment/lab pairs
     """
@@ -64,15 +64,13 @@ def generate_tests(exp_lab_pairs, run_parallel=True, create_cfg_only=False):
         return
 
     args = [Arguments(str(cfg)) for cfg in cfgs]
-    apply_func_parallel(generate_test, zip(args), type="mp" if run_parallel else None)
+    apply_func_parallel(generate_test, zip(args), None)
 
 
 class Arguments:
     def __init__(self, config):
         self.config = config
         self.debug = False
-        # used to overwrite the multiprocessing key in the configs to rather parallelize on category level
-        self.multiprocessing = False
 
 
 def create_experiment_setup(experiment, lab) -> list[Path]:
@@ -170,15 +168,10 @@ if __name__ == "__main__":
         nargs="+",
         help="The combinations of experiment/lab-id that you want to generate, separated by whitespace. Experiment and lab id need to be separated by a comma.",
     )
-    parser.add_argument(
-        "--no_parallel",
-        action="store_true",
-        help="If given, configs will not be run in parallel",
-    )
     parser.add_argument(
         "--create_cfg_only",
         action="store_true",
         help="If given, only create the configs and folder structure without processing items",
     )
     args = parser.parse_args()
-    generate_tests(args.exp_lab_pairs, not args.no_parallel, args.create_cfg_only)
+    generate_tests(args.exp_lab_pairs, args.create_cfg_only)
diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index 15424a3a36d74396fc4bbbcab3025475ace59838..4b508fc75d70e8d07b006b0311d115863d9798f8 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -31,7 +31,10 @@
 #
 
 import logging
-from itertools import repeat
+import sys
+from itertools import product
+from multiprocessing import Pool
+from time import sleep
 
 from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
 from ivas_processing_scripts.constants import (
@@ -47,7 +50,11 @@ from ivas_processing_scripts.processing.processing import (
     process_item,
     reorder_items_list,
 )
-from ivas_processing_scripts.utils import DirManager, apply_func_parallel
+from ivas_processing_scripts.utils import (
+    DirManager,
+    apply_func_parallel,
+    progressbar_update,
+)
 
 
 def logging_init(args, cfg):
@@ -117,7 +124,7 @@ def main(args):
             metadata_str = []
             for o in range(len(metadata[i])):
                 metadata_str.append(str(metadata[i][o]))
-            logger.info(
+            logger.debug(
                 f" ISM metadata files item {cfg.items_list[i]}: {', '.join(metadata_str)}"
             )
 
@@ -147,27 +154,41 @@
     # preprocess 2
     preprocess_2(cfg, logger)
 
-    # run conditions
-    for condition, out_dir, tmp_dir in zip(
-        cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs
+    # assemble a list of all item and condition combinations
+    item_args = list()
+    for (chain, tmp_dir, out_dir), (item, metadata) in product(
+        zip(cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs),
+        zip(cfg.items_list, cfg.metadata_path),
     ):
-        chain = condition["processes"]
+        item_args.append(
+            (item, tmp_dir, out_dir, chain["processes"], logger, metadata)
+        )
 
-        logger.info(f" Generating condition: {condition['name']}")
+    if cfg.multiprocessing:
+        # set up values for progress display and chunksize
+        count = len(item_args)
+        width = 80
 
-        apply_func_parallel(
+        # submit tasks to the pool
+        p = Pool()
+        results = p.starmap_async(
             process_item,
-            zip(
-                cfg.items_list,
-                repeat(tmp_dir),
-                repeat(out_dir),
-                repeat(chain),
-                repeat(logger),
-                cfg.metadata_path,
-            ),
-            None,
-            "mp" if cfg.multiprocessing else None,
+            item_args,
         )
+        # poll progress
+        progressbar_update(0, count, width)
+        while not results.ready():
+            progressbar_update(count - int(results._number_left), count, width)
+            sleep(0.1)
+        progressbar_update(count, count, width)
+        print("\n", flush=True, file=sys.stdout)
+
+        p.close()
+        p.join()
+
+    else:
+        apply_func_parallel(process_item, item_args, None, None, True)
+
 
     # copy configuration to output directory
     cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml"))
diff --git a/ivas_processing_scripts/audiotools/binauralobjectrenderer.py b/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
index efc0bae707cbc905ac0d5ed9bbed075c1ec4f4e9..dda742ba5095ac885f1bdbbce9926d70c38f5f57 100755
--- a/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
+++ b/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
@@ -542,7 +542,7 @@ def binaural_fftconv_framewise(
             repeat(indices_HRIR),
         ),
         None,
-        "mp",
+        None,
         False,
     )
 
@@ -607,7 +607,7 @@ def render_ear(
             repeat(N_HRIR_taps),
         ),
         None,
-        "mt",
+        None,
         False,
     )
 
diff --git a/ivas_processing_scripts/audiotools/convert/objectbased.py b/ivas_processing_scripts/audiotools/convert/objectbased.py
index c6d0f1144768abc513b186d516961e7cf3ce0be4..083c14400c1e31b9e59d133113957a9d0c265879 100755
--- a/ivas_processing_scripts/audiotools/convert/objectbased.py
+++ b/ivas_processing_scripts/audiotools/convert/objectbased.py
@@ -135,7 +135,7 @@ def render_oba_to_binaural(
             repeat(SourcePosition),
         ),
         None,
-        "mt",
+        None,
         False,
     )
 
diff --git a/ivas_processing_scripts/generation/__init__.py b/ivas_processing_scripts/generation/__init__.py
index 8ee7c026d64269a0de0d7988b9df7ea5424a66e4..53892744063538d89de3137b14c0fe3ddd7f1b06 100755
--- a/ivas_processing_scripts/generation/__init__.py
+++ b/ivas_processing_scripts/generation/__init__.py
@@ -31,7 +31,6 @@
 #
 
 import logging
-import os
 
 import yaml
 
diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py
index 29797295cc205b6fb5b2ad726211fba53a3d693e..62cd2aea369d6a8b8774690f3098907734edf0e2 100755
--- a/ivas_processing_scripts/processing/processing.py
+++ b/ivas_processing_scripts/processing/processing.py
@@ -31,10 +31,13 @@
 #
 
 import logging
+import sys
 from abc import ABC, abstractmethod
 from itertools import repeat
+from multiprocessing import Pool
 from pathlib import Path
 from shutil import copyfile
+from time import sleep
 from typing import Iterable, Union
 from warnings import warn
 
@@ -49,7 +52,7 @@ from ivas_processing_scripts.audiotools.metadata import (
 )
 from ivas_processing_scripts.constants import LOGGER_DATEFMT, LOGGER_FORMAT
 from ivas_processing_scripts.processing.config import TestConfig
-from ivas_processing_scripts.utils import apply_func_parallel, list_audio, pairwise
+from ivas_processing_scripts.utils import list_audio, pairwise, progressbar_update
 
 
 class Processing(ABC):
@@ -234,9 +237,12 @@ def preprocess(cfg, logger):
 
     logger.info(f" Generating condition: {preprocessing['name']}")
 
+    # set up values for progress display and chunksize
+    count = len(cfg.items_list)
+    width = 80
+
     # run preprocessing
-    apply_func_parallel(
-        process_item,
+    args = list(
         zip(
             cfg.items_list,
             repeat(cfg.tmp_dirs[0]),
@@ -244,11 +250,25 @@
             repeat(chain),
             repeat(logger),
             cfg.metadata_path,
-        ),
-        None,
-        "mp" if cfg.multiprocessing else None,
+        )
+    )
+    p = Pool()
+    results = p.starmap_async(
+        process_item,
+        args,
     )
+    # poll progress
+    progressbar_update(0, count, width)
+    while not results.ready():
+        progressbar_update(count - int(results._number_left), count, width)
+        sleep(0.1)
+    progressbar_update(count, count, width)
+    print("\n", flush=True, file=sys.stdout)
+
+    p.close()
+    p.join()
+
 
     # update the configuration to use preprocessing outputs as new inputs
     cfg.items_list = list_audio(
         cfg.out_dirs[0], select_list=getattr(cfg, "input_select", None)
@@ -287,9 +307,12 @@
     if chain[0].concatenate_input:
         concat_setup(cfg, chain, logger)
 
+    # set up values for progress display and chunksize
+    count = len(cfg.items_list)
+    width = 80
+
     # run preprocessing 2
-    apply_func_parallel(
-        process_item,
+    args = list(
         zip(
             cfg.items_list,
             repeat(cfg.tmp_dirs[0]),
@@ -297,11 +320,25 @@
             repeat(chain),
             repeat(logger),
             cfg.metadata_path,
-        ),
-        None,
-        "mp" if cfg.multiprocessing else None,
+        )
+    )
+    p = Pool()
+    results = p.starmap_async(
+        process_item,
+        args,
     )
+    # poll progress
+    progressbar_update(0, count, width)
+    while not results.ready():
+        progressbar_update(count - int(results._number_left), count, width)
+        sleep(0.1)
+    progressbar_update(count, count, width)
+    print("\n", flush=True, file=sys.stdout)
+
+    p.close()
+    p.join()
+
 
     # update the configuration to use preprocessing 2 outputs as new inputs
     cfg.items_list = list_audio(
         cfg.out_dirs[0], select_list=getattr(cfg, "input_select", None)
@@ -464,4 +501,3 @@ def preprocess_background_noise(cfg):
         ] = output_audio
 
     return
-
diff --git a/ivas_processing_scripts/utils.py b/ivas_processing_scripts/utils.py
index b46a104e1535e8bdfe4bda5939c6be22f1621c91..1ad9319f7aa3b777cdd119f7ef50ac41cc82f6bd 100755
--- a/ivas_processing_scripts/utils.py
+++ b/ivas_processing_scripts/utils.py
@@ -268,23 +268,24 @@ def pairwise(iter):
     return zip(a, b)
 
 
+def progressbar_update(progress, count, width):
+    fill = int(width * progress / count)
+    print(
+        f"{int(progress/count*100):3d}%{'|'}{'='*fill}{(' '*(width-fill))}{'|'}{progress:4d}/{count:4d}",
+        end="\r",
+        file=sys.stdout,
+        flush=True,
+    )
+
+
 def progressbar(iter: Iterable, width=80):
     """simple unicode progressbar"""
     count = len(iter)
 
-    def update(progress):
-        fill = int(width * progress / count)
-        print(
-            f"{int(progress/count*100):3d}%{'|'}{'='*fill}{(' '*(width-fill))}{'|'}{progress}/{count}",
-            end="\r",
-            file=sys.stdout,
-            flush=True,
-        )
-
-    update(0)
+    progressbar_update(0, count, width)
     for i, item in enumerate(iter):
         yield item
-        update(i + 1)
+        progressbar_update(i + 1, count, width)
 
     print("\n", flush=True, file=sys.stdout)
 
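For reference, here is a small, self-contained sketch (not part of the patch) of the `Pool.starmap_async` progress-polling pattern that the diff introduces in `main()`, `preprocess()` and `preprocess_2()`. The worker `square` and the item list are illustrative placeholders; `progressbar_update` mirrors the helper added to `ivas_processing_scripts/utils.py`, and the polling loop reads the private `AsyncResult._number_left` attribute, as the patch does, to estimate remaining work.

```python
import sys
from multiprocessing import Pool
from time import sleep


def progressbar_update(progress, count, width):
    # redraw a single-line text progress bar on stdout
    fill = int(width * progress / count)
    print(
        f"{int(progress / count * 100):3d}%|{'=' * fill}{' ' * (width - fill)}|{progress:4d}/{count:4d}",
        end="\r",
        file=sys.stdout,
        flush=True,
    )


def square(x):
    # stand-in for real per-item processing
    sleep(0.05)
    return x * x


if __name__ == "__main__":
    # one argument tuple per task, as expected by starmap_async
    items = [(i,) for i in range(200)]
    count, width = len(items), 80

    with Pool() as p:
        results = p.starmap_async(square, items)

        # poll until the pool reports all tasks finished
        progressbar_update(0, count, width)
        while not results.ready():
            # _number_left is a private AsyncResult attribute; the patch relies
            # on it for the remaining-task count, so this sketch does too
            progressbar_update(count - int(results._number_left), count, width)
            sleep(0.1)
        progressbar_update(count, count, width)
        print("\n", flush=True, file=sys.stdout)

        squares = results.get()
```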