From d700915dcbcc5537d9c77e06b9c7810fc341b3b8 Mon Sep 17 00:00:00 2001
From: Treffehn
Date: Mon, 8 May 2023 12:49:06 +0200
Subject: [PATCH 01/14] tried to run conditions in parallel

---
 ivas_processing_scripts/__init__.py | 74 +++++++++++++++++++++--------
 1 file changed, 54 insertions(+), 20 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index a16309e0..02f9e426 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -147,26 +147,39 @@ def main(args):
         preprocess_2(cfg, logger)

     # run conditions
-    for condition, out_dir, tmp_dir in zip(
-        cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs
-    ):
-        chain = condition["processes"]
-
-        logger.info(f" Generating condition: {condition['name']}")
-
-        apply_func_parallel(
-            process_item,
-            zip(
-                cfg.items_list,
-                repeat(tmp_dir),
-                repeat(out_dir),
-                repeat(chain),
-                repeat(logger),
-                cfg.metadata_path,
-            ),
-            None,
-            "mp" if cfg.multiprocessing else None,
-        )
+    # for condition, out_dir, tmp_dir in zip(
+    #     cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs
+    # ):
+    #     chain = condition["processes"]
+    #
+    #     logger.info(f" Generating condition: {condition['name']}")
+    #
+    #     apply_func_parallel(
+    #         process_item,
+    #         zip(
+    #             cfg.items_list,
+    #             repeat(tmp_dir),
+    #             repeat(out_dir),
+    #             repeat(chain),
+    #             repeat(logger),
+    #             cfg.metadata_path,
+    #         ),
+    #         None,
+    #         "mp" if cfg.multiprocessing else None,
+    #     )
+
+    apply_func_parallel(
+        process_item_parallel,
+        zip(
+            repeat(cfg),
+            cfg.proc_chains,
+            cfg.tmp_dirs,
+            cfg.out_dirs,
+            repeat(logger)
+        ),
+        None,
+        "mp" if cfg.multiprocessing else None,
+    )

     if hasattr(cfg, "preprocessing_2"):
         reverse_process_2(cfg, logger)
@@ -174,3 +187,24 @@ def main(args):
     # copy configuration to output directory
     with open(cfg.output_path.joinpath(f"{cfg.name}.yml"), "w") as f:
         yaml.safe_dump(cfg._yaml_dump, f)
+
+
+def process_item_parallel(cfg, condition, tmp_dir, out_dir, logger):
+
+    chain = condition["processes"]
+
+    logger.info(f" Generating condition: {condition['name']}")
+
+    apply_func_parallel(
+        process_item,
+        zip(
+            cfg.items_list,
+            repeat(tmp_dir),
+            repeat(out_dir),
+            repeat(chain),
+            repeat(logger),
+            cfg.metadata_path,
+        ),
+        None,
+        "mp" if cfg.multiprocessing else None,
+    )
\ No newline at end of file
--
GitLab
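The pattern this first patch introduces — pairing the per-condition values from zip() with the constant arguments wrapped in itertools.repeat(), then handing each argument tuple to a worker — is the core idea the rest of the series keeps refining. A minimal, self-contained sketch of that fan-out using only the standard library (render_condition and its toy arguments are illustrative, not part of the repository):

    from itertools import repeat
    from multiprocessing import Pool


    def render_condition(cfg_name, condition, tmp_dir, out_dir):
        # Stand-in for the per-condition work done by process_item_parallel().
        return f"{cfg_name}: {condition} -> {out_dir}"


    if __name__ == "__main__":
        conditions = ["ref", "c1", "c2"]
        tmp_dirs = ["tmp/ref", "tmp/c1", "tmp/c2"]
        out_dirs = ["out/ref", "out/c1", "out/c2"]

        # zip() supplies the values that change per call, repeat() the ones
        # that stay constant -- mirroring the zip(repeat(cfg), ...) call above.
        arg_tuples = zip(repeat("my_test"), conditions, tmp_dirs, out_dirs)

        with Pool() as pool:
            # starmap() unpacks each tuple into render_condition(*args).
            print(pool.starmap(render_condition, arg_tuples))

Judging by the call sites in these patches, apply_func_parallel wraps essentially this dispatch and selects between multiprocessing ("mp"), multithreading ("mt"), or a serial loop (None).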
From 278766838ab8e39d49085dcdc740d5dfe8504690 Mon Sep 17 00:00:00 2001
From: Treffehn
Date: Mon, 8 May 2023 14:04:18 +0200
Subject: [PATCH 02/14] added different multiprocessing keys

---
 examples/TEMPLATE.yml                |  5 ++++-
 ivas_processing_scripts/__init__.py  | 26 ++------------------------
 ivas_processing_scripts/constants.py |  2 ++
 3 files changed, 8 insertions(+), 25 deletions(-)

diff --git a/examples/TEMPLATE.yml b/examples/TEMPLATE.yml
index 09881a3b..2b90e4dc 100755
--- a/examples/TEMPLATE.yml
+++ b/examples/TEMPLATE.yml
@@ -9,8 +9,11 @@
 ### git commit SHA; default = git rev-parse HEAD
 # git_sha: abc123
-### Whether to use multiprocessing; default = true
+### Whether to use multiprocessing in general; default = true
 # multiprocessing: false
+### Whether to use multiprocessing in main loop for items and/or conditions; default = true
+# multiprocessing_items: false
+# multiprocessing_conditions: false
 ### Deletion of temporary directories containing
 ### intermediate processing files, bitstreams etc.; default = false
 # delete_tmp: true
diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index 02f9e426..f13ee0bf 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -146,28 +146,6 @@ def main(args):
         # preprocess 2
         preprocess_2(cfg, logger)

-    # run conditions
-    # for condition, out_dir, tmp_dir in zip(
-    #     cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs
-    # ):
-    #     chain = condition["processes"]
-    #
-    #     logger.info(f" Generating condition: {condition['name']}")
-    #
-    #     apply_func_parallel(
-    #         process_item,
-    #         zip(
-    #             cfg.items_list,
-    #             repeat(tmp_dir),
-    #             repeat(out_dir),
-    #             repeat(chain),
-    #             repeat(logger),
-    #             cfg.metadata_path,
-    #         ),
-    #         None,
-    #         "mp" if cfg.multiprocessing else None,
-    #     )
-
     apply_func_parallel(
         process_item_parallel,
         zip(
@@ -178,7 +156,7 @@ def main(args):
             repeat(logger)
         ),
         None,
-        "mp" if cfg.multiprocessing else None,
+        "mp" if cfg.multiprocessing_conditions else None,
     )

     if hasattr(cfg, "preprocessing_2"):
@@ -206,5 +184,5 @@ def process_item_parallel(cfg, condition, tmp_dir, out_dir, logger):
             cfg.metadata_path,
         ),
         None,
-        "mp" if cfg.multiprocessing else None,
+        "mp" if cfg.multiprocessing_items else None,
     )
\ No newline at end of file
diff --git a/ivas_processing_scripts/constants.py b/ivas_processing_scripts/constants.py
index f89e8589..d6457f95 100755
--- a/ivas_processing_scripts/constants.py
+++ b/ivas_processing_scripts/constants.py
@@ -58,6 +58,8 @@ DEFAULT_CONFIG = {
     "date": f"{datetime.now().strftime('%Y%m%d_%H.%M.%S')}",
     "git_sha": f"{get_gitsha()}",
     "multiprocessing": True,
+    "multiprocessing_items": True,
+    "multiprocessing_conditions": True,
     "delete_tmp": False,
     "master_seed": 0,
     "metadata_path": None,
--
GitLab

From a45a6b0aade36d700026e00dbaa1c8c8f515b01f Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Mon, 12 Jun 2023 11:17:41 +0200
Subject: [PATCH 03/14] [revert] d700915d and 278766838

---
 examples/TEMPLATE.yml                |  5 +--
 ivas_processing_scripts/__init__.py  | 53 +++++++++++-----------------
 ivas_processing_scripts/constants.py |  2 --
 3 files changed, 22 insertions(+), 38 deletions(-)

diff --git a/examples/TEMPLATE.yml b/examples/TEMPLATE.yml
index 305614af..89415cee 100755
--- a/examples/TEMPLATE.yml
+++ b/examples/TEMPLATE.yml
@@ -9,11 +9,8 @@
 ### git commit SHA; default = git rev-parse HEAD
 # git_sha: abc123
-### Whether to use multiprocessing in general; default = true
+### Whether to use multiprocessing; default = true
 # multiprocessing: false
-### Whether to use multiprocessing in main loop for items and/or conditions; default = true
-# multiprocessing_items: false
-# multiprocessing_conditions: false
 ### Deletion of temporary directories containing
 ### intermediate processing files, bitstreams etc.; default = false
 # delete_tmp: true
diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index d808dd91..15424a3a 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -147,38 +147,27 @@ def main(args):
         # preprocess 2
         preprocess_2(cfg, logger)

-    apply_func_parallel(
-        process_item_parallel,
-        zip(
-            repeat(cfg),
-            cfg.proc_chains,
-            cfg.tmp_dirs,
-            cfg.out_dirs,
-            repeat(logger)
-        ),
-        None,
-        "mp" if cfg.multiprocessing_conditions else None,
-    )
+    # run conditions
+    for condition, out_dir, tmp_dir in zip(
+        cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs
+    ):
+        chain = condition["processes"]
+
+        logger.info(f" Generating condition: {condition['name']}")
+
+        apply_func_parallel(
+            process_item,
+            zip(
+                cfg.items_list,
+                repeat(tmp_dir),
+                repeat(out_dir),
+                repeat(chain),
+                repeat(logger),
+                cfg.metadata_path,
+            ),
+            None,
+            "mp" if cfg.multiprocessing else None,
+        )
     # copy configuration to output directory
     cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml"))
-
-
-def process_item_parallel(cfg, condition, tmp_dir, out_dir, logger):
-
-    chain = condition["processes"]
-
-    logger.info(f" Generating condition: {condition['name']}")
-
-    apply_func_parallel(
-        process_item,
-        zip(
-            cfg.items_list,
-            repeat(tmp_dir),
-            repeat(out_dir),
-            repeat(chain),
-            repeat(logger),
-            cfg.metadata_path,
-        ),
-        None,
-        "mp" if cfg.multiprocessing_items else None,
-    )
\ No newline at end of file
diff --git a/ivas_processing_scripts/constants.py b/ivas_processing_scripts/constants.py
index 190d691d..5cc47609 100755
--- a/ivas_processing_scripts/constants.py
+++ b/ivas_processing_scripts/constants.py
@@ -58,8 +58,6 @@ DEFAULT_CONFIG = {
     "date": f"{datetime.now().strftime('%Y%m%d_%H.%M.%S')}",
     "git_sha": f"{get_gitsha()}",
     "multiprocessing": True,
-    "multiprocessing_items": True,
-    "multiprocessing_conditions": True,
     "use_windows_codec_binaries": False,
     "delete_tmp": False,
     "master_seed": 0,
--
GitLab

From 0a76ac396e3099a2db98f257cfda1156f596e3bc Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Mon, 12 Jun 2023 11:18:49 +0200
Subject: [PATCH 04/14] parallelise conditions instead of items

---
 ivas_processing_scripts/__init__.py | 31 +++++-----------
 .../processing/processing.py        | 37 ++++++++++++++++++-
 2 files changed, 46 insertions(+), 22 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index 15424a3a..ea735648 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -44,6 +44,7 @@ from ivas_processing_scripts.processing.processing import (
     preprocess,
     preprocess_2,
     preprocess_background_noise,
+    process_condition,
     process_item,
     reorder_items_list,
 )
@@ -147,27 +148,15 @@ def main(args):
         # preprocess 2
         preprocess_2(cfg, logger)

-    # run conditions
-    for condition, out_dir, tmp_dir in zip(
-        cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs
-    ):
-        chain = condition["processes"]
-
-        logger.info(f" Generating condition: {condition['name']}")
-
-        apply_func_parallel(
-            process_item,
-            zip(
-                cfg.items_list,
-                repeat(tmp_dir),
-                repeat(out_dir),
-                repeat(chain),
-                repeat(logger),
-                cfg.metadata_path,
-            ),
-            None,
-            "mp" if cfg.multiprocessing else None,
-        )
+    # parallel processing of conditions
+    apply_func_parallel(
+        process_condition,
+        zip(
+            repeat(cfg), cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs, repeat(logger)
+        ),
+        None,
+        "mp" if cfg.multiprocessing else None,
+    )

     # copy configuration to output directory
     cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml"))
diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py
index 29797295..d4f95301 100755
--- a/ivas_processing_scripts/processing/processing.py
+++ b/ivas_processing_scripts/processing/processing.py
@@ -402,6 +402,42 @@ def process_item(
             copyfile(ppm, out_meta[idx])


+def process_condition(
+    cfg: TestConfig,
+    condition: dict,
+    tmp_dir: Union[str, Path],
+    out_dir: Union[str, Path],
+    logger: logging.Logger,
+):
+    chain = condition["processes"]
+
+    logger.info(f" Generating condition: {condition['name']}")
+
+    for item, metadata in zip(cfg.items_list, cfg.metadata_path):
+        process_item(
+            item,
+            tmp_dir,
+            out_dir,
+            chain,
+            logger,
+            metadata,
+        )
+    # apply_func_parallel(
+    #     process_item,
+    #     zip(
+    #         cfg.items_list,
+    #         repeat(tmp_dir),
+    #         repeat(out_dir),
+    #         repeat(chain),
+    #         repeat(logger),
+    #         cfg.metadata_path,
+    #     ),
+    #     None,
+    #     "mp" if cfg.multiprocessing else None,
+    # )
+    #
+
+
 def remove_pre_and_postamble(
     x, out_fmt, fs, repeat_signal, preamble_len_ms, postamble_len_ms, meta, logger
 ):
@@ -464,4 +500,3 @@ def preprocess_background_noise(cfg):
         ] = output_audio

     return
-
--
GitLab

From c3ade900103e44dac1f6b8b2ab2adccdf1e6ef68 Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Mon, 12 Jun 2023 11:35:40 +0200
Subject: [PATCH 05/14] [fix] lint

---
 ivas_processing_scripts/__init__.py            | 1 -
 ivas_processing_scripts/generation/__init__.py | 1 -
 2 files changed, 2 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index ea735648..228950af 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -45,7 +45,6 @@ from ivas_processing_scripts.processing.processing import (
     preprocess_2,
     preprocess_background_noise,
     process_condition,
-    process_item,
     reorder_items_list,
 )
 from ivas_processing_scripts.utils import DirManager, apply_func_parallel
diff --git a/ivas_processing_scripts/generation/__init__.py b/ivas_processing_scripts/generation/__init__.py
index 8ee7c026..53892744 100755
--- a/ivas_processing_scripts/generation/__init__.py
+++ b/ivas_processing_scripts/generation/__init__.py
@@ -31,7 +31,6 @@
 #

 import logging
-import os

 import yaml
--
GitLab

From ec42815c7cab62b4e38573f9226622599f7fa28c Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Mon, 12 Jun 2023 12:41:31 +0200
Subject: [PATCH 06/14] attempt to pick between item and condition
 parallelisation based on test setup

---
 ivas_processing_scripts/__init__.py | 13 ++++--
 .../processing/processing.py        | 46 ++++++++++---------
 2 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index 228950af..eacd0e63 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -45,6 +45,7 @@ from ivas_processing_scripts.processing.processing import (
     preprocess_2,
     preprocess_background_noise,
     process_condition,
+    process_item,
     reorder_items_list,
 )
 from ivas_processing_scripts.utils import DirManager, apply_func_parallel
@@ -147,14 +148,20 @@ def main(args):
         # preprocess 2
         preprocess_2(cfg, logger)

-    # parallel processing of conditions
+    # attempt to parallelise either items or conditions based on test setup
+    parallelise_items = len(cfg.items_list) > len(cfg.proc_chains)
     apply_func_parallel(
         process_condition,
         zip(
-            repeat(cfg), cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs, repeat(logger)
+            repeat(cfg),
+            cfg.proc_chains,
+            cfg.tmp_dirs,
+            cfg.out_dirs,
+            repeat(logger),
+            repeat(parallelise_items),
         ),
         None,
-        "mp" if cfg.multiprocessing else None,
+        "mp" if (not parallelise_items and cfg.multiprocessing) else None,
     )

     # copy configuration to output directory
diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py
index d4f95301..23be6073 100755
--- a/ivas_processing_scripts/processing/processing.py
+++ b/ivas_processing_scripts/processing/processing.py
@@ -408,34 +408,36 @@ def process_condition(
     tmp_dir: Union[str, Path],
     out_dir: Union[str, Path],
     logger: logging.Logger,
+    parallelise_items: bool = False,
 ):
     chain = condition["processes"]

     logger.info(f" Generating condition: {condition['name']}")

-    for item, metadata in zip(cfg.items_list, cfg.metadata_path):
-        process_item(
-            item,
-            tmp_dir,
-            out_dir,
-            chain,
-            logger,
-            metadata,
+    if parallelise_items:
+        apply_func_parallel(
+            process_item,
+            zip(
+                cfg.items_list,
+                repeat(tmp_dir),
+                repeat(out_dir),
+                repeat(chain),
+                repeat(logger),
+                cfg.metadata_path,
+            ),
+            None,
+            "mp" if cfg.multiprocessing else None,
         )
-    # apply_func_parallel(
-    #     process_item,
-    #     zip(
-    #         cfg.items_list,
-    #         repeat(tmp_dir),
-    #         repeat(out_dir),
-    #         repeat(chain),
-    #         repeat(logger),
-    #         cfg.metadata_path,
-    #     ),
-    #     None,
-    #     "mp" if cfg.multiprocessing else None,
-    # )
-    #
+    else:
+        for item, metadata in zip(cfg.items_list, cfg.metadata_path):
+            process_item(
+                item,
+                tmp_dir,
+                out_dir,
+                chain,
+                logger,
+                metadata,
+            )


 def remove_pre_and_postamble(
--
GitLab

From e0a531a22bd3b4e2ca28d77942adbb34139cff07 Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Mon, 12 Jun 2023 12:44:54 +0200
Subject: [PATCH 07/14] [fix] lint again

---
 ivas_processing_scripts/__init__.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index eacd0e63..abeb9f4f 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -45,7 +45,6 @@ from ivas_processing_scripts.processing.processing import (
     preprocess_2,
     preprocess_background_noise,
     process_condition,
-    process_item,
     reorder_items_list,
 )
 from ivas_processing_scripts.utils import DirManager, apply_func_parallel
--
GitLab
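The heuristic in patch 06 — parallelise_items = len(cfg.items_list) > len(cfg.proc_chains) — simply puts the worker pool on whichever axis offers more tasks, since only one level can own the parallelism at a time and the other is looped serially inside each task. A small illustrative sketch of that decision (the function name and numbers are made up for this note):

    def pick_parallel_axis(n_items: int, n_conditions: int) -> str:
        # Parallelise the longer axis; the shorter one runs as a serial loop
        # inside each task, so no nested pools are needed.
        return "items" if n_items > n_conditions else "conditions"


    if __name__ == "__main__":
        print(pick_parallel_axis(n_items=24, n_conditions=6))   # -> items
        print(pick_parallel_axis(n_items=4, n_conditions=12))   # -> conditions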
From 6124103f6abfedb1fd23071d96f054bf2818ea00 Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Tue, 13 Jun 2023 12:40:07 +0200
Subject: [PATCH 08/14] [fix] try to remove inner forks and switch to
 multiprocessing.Pool.starmap() instead of ProcessPoolExecutor.submit()

---
 ivas_processing_scripts/__init__.py           | 32 ++++----
 .../audiotools/binauralobjectrenderer.py      |  8 +-
 .../audiotools/convert/objectbased.py         |  4 +-
 .../processing/processing.py                  | 74 ++++++++-----------
 4 files changed, 54 insertions(+), 64 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index abeb9f4f..f96935e2 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -32,6 +32,7 @@
 import logging
 from itertools import repeat
+from multiprocessing import Pool

 from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
 from ivas_processing_scripts.constants import (
@@ -47,7 +48,7 @@ from ivas_processing_scripts.processing.processing import (
     process_condition,
     reorder_items_list,
 )
-from ivas_processing_scripts.utils import DirManager, apply_func_parallel
+from ivas_processing_scripts.utils import DirManager


 def logging_init(args, cfg):
@@ -117,7 +118,7 @@ def main(args):
                 metadata_str = []
                 for o in range(len(metadata[i])):
                     metadata_str.append(str(metadata[i][o]))
-                logger.info(
+                logger.debug(
                     f" ISM metadata files item {cfg.items_list[i]}: {', '.join(metadata_str)}"
                 )

@@ -148,20 +149,19 @@ def main(args):
         # preprocess 2
         preprocess_2(cfg, logger)

     # attempt to parallelise either items or conditions based on test setup
-    parallelise_items = len(cfg.items_list) > len(cfg.proc_chains)
-    apply_func_parallel(
-        process_condition,
-        zip(
-            repeat(cfg),
-            cfg.proc_chains,
-            cfg.tmp_dirs,
-            cfg.out_dirs,
-            repeat(logger),
-            repeat(parallelise_items),
-        ),
-        None,
-        "mp" if (not parallelise_items and cfg.multiprocessing) else None,
-    )
+    # parallelise_items = len(cfg.items_list) > len(cfg.proc_chains)
+    with Pool() as p:
+        p.starmap(
+            process_condition,
+            zip(
+                repeat(cfg),
+                cfg.proc_chains,
+                cfg.tmp_dirs,
+                cfg.out_dirs,
+                repeat(logger),
+            ),
+            chunksize=8,
+        )

     # copy configuration to output directory
     cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml"))
diff --git a/ivas_processing_scripts/audiotools/binauralobjectrenderer.py b/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
index efc0bae7..8f9d246c 100755
--- a/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
+++ b/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
@@ -542,8 +542,8 @@ def binaural_fftconv_framewise(
             repeat(indices_HRIR),
         ),
         None,
-        "mp",
-        False,
+        None,
+        False
     )

     y = np.stack(result, axis=1)
@@ -607,8 +607,8 @@ def render_ear(
             repeat(N_HRIR_taps),
         ),
         None,
-        "mt",
-        False,
+        None,
+        False
     )

     return np.hstack(result)
diff --git a/ivas_processing_scripts/audiotools/convert/objectbased.py b/ivas_processing_scripts/audiotools/convert/objectbased.py
index c6d0f114..18f3162a 100755
--- a/ivas_processing_scripts/audiotools/convert/objectbased.py
+++ b/ivas_processing_scripts/audiotools/convert/objectbased.py
@@ -135,8 +135,8 @@ def render_oba_to_binaural(
             repeat(SourcePosition),
         ),
         None,
-        "mt",
-        False,
+        None,
+        False
     )

     # sum results over all objects
diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py
index 23be6073..4f045a3f 100755
--- a/ivas_processing_scripts/processing/processing.py
+++ b/ivas_processing_scripts/processing/processing.py
@@ -33,6 +33,7 @@
 import logging
 from abc import ABC, abstractmethod
 from itertools import repeat
+from multiprocessing import Pool
 from pathlib import Path
 from shutil import copyfile
 from typing import Iterable, Union
@@ -49,7 +50,7 @@ from ivas_processing_scripts.audiotools.metadata import (
 )
 from ivas_processing_scripts.constants import LOGGER_DATEFMT, LOGGER_FORMAT
 from ivas_processing_scripts.processing.config import TestConfig
-from ivas_processing_scripts.utils import apply_func_parallel, list_audio, pairwise
+from ivas_processing_scripts.utils import list_audio, pairwise


 class Processing(ABC):
@@ -235,19 +236,19 @@ def preprocess(cfg, logger):
     logger.info(f" Generating condition: {preprocessing['name']}")

     # run preprocessing
-    apply_func_parallel(
-        process_item,
-        zip(
-            cfg.items_list,
-            repeat(cfg.tmp_dirs[0]),
-            repeat(cfg.out_dirs[0]),
-            repeat(chain),
-            repeat(logger),
-            cfg.metadata_path,
-        ),
-        None,
-        "mp" if cfg.multiprocessing else None,
-    )
+    with Pool() as p:
+        p.starmap(
+            process_item,
+            zip(
+                cfg.items_list,
+                repeat(cfg.tmp_dirs[0]),
+                repeat(cfg.out_dirs[0]),
+                repeat(chain),
+                repeat(logger),
+                cfg.metadata_path,
+            ),
+            chunksize=8,
+        )

     # update the configuration to use preprocessing outputs as new inputs
     cfg.items_list = list_audio(
@@ -288,19 +289,19 @@ def preprocess_2(cfg, logger):
         concat_setup(cfg, chain, logger)

     # run preprocessing 2
-    apply_func_parallel(
-        process_item,
-        zip(
-            cfg.items_list,
-            repeat(cfg.tmp_dirs[0]),
-            repeat(cfg.out_dirs[0]),
-            repeat(chain),
-            repeat(logger),
-            cfg.metadata_path,
-        ),
-        None,
-        "mp" if cfg.multiprocessing else None,
-    )
+    with Pool() as p:
+        p.starmap(
+            process_item,
+            zip(
+                cfg.items_list,
+                repeat(cfg.tmp_dirs[0]),
+                repeat(cfg.out_dirs[0]),
+                repeat(chain),
+                repeat(logger),
+                cfg.metadata_path,
+            ),
+            chunksize=8,
+        )

     # update the configuration to use preprocessing 2 outputs as new inputs
     cfg.items_list = list_audio(
@@ -408,14 +409,14 @@ def process_condition(
     tmp_dir: Union[str, Path],
     out_dir: Union[str, Path],
     logger: logging.Logger,
-    parallelise_items: bool = False,
+    # parallelise_items: bool = False,
 ):
     chain = condition["processes"]

     logger.info(f" Generating condition: {condition['name']}")

-    if parallelise_items:
-        apply_func_parallel(
+    with Pool() as p:
+        p.starmap(
             process_item,
             zip(
                 cfg.items_list,
                 repeat(tmp_dir),
                 repeat(out_dir),
                 repeat(chain),
                 repeat(logger),
                 cfg.metadata_path,
             ),
-            None,
-            "mp" if cfg.multiprocessing else None,
+            chunksize=8,
         )
-    else:
-        for item, metadata in zip(cfg.items_list, cfg.metadata_path):
-            process_item(
-                item,
-                tmp_dir,
-                out_dir,
-                chain,
-                logger,
-                metadata,
-            )


 def remove_pre_and_postamble(
--
GitLab

From 2548b4021e58c718f1a37979006c8c1e3494b70a Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Tue, 13 Jun 2023 13:13:34 +0200
Subject: [PATCH 09/14] add change to generate_test.py too

---
 generate_test.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/generate_test.py b/generate_test.py
index c187dbc2..4724c822 100644
--- a/generate_test.py
+++ b/generate_test.py
@@ -31,11 +31,11 @@
 #

 import argparse
+from multiprocessing import Pool
 from pathlib import Path

 from ivas_processing_scripts import config
 from ivas_processing_scripts import main as generate_test
-from ivas_processing_scripts.utils import apply_func_parallel

 HERE = Path(__file__).parent.absolute()
 EXPERIMENTS_P800 = [f"P800-{i}" for i in range(1, 10)]
@@ -64,7 +64,12 @@ def generate_tests(exp_lab_pairs, run_parallel=True, create_cfg_only=False):
         return

     args = [Arguments(str(cfg)) for cfg in cfgs]
-    apply_func_parallel(generate_test, zip(args), type="mp" if run_parallel else None)
+    if run_parallel:
+        with Pool() as p:
+            p.starmap(generate_test, zip(args), chunksize=8)
+    else:
+        map(generate_test, args)
+    # apply_func_parallel(generate_test, zip(args), type="mp" if run_parallel else None)


 class Arguments:
--
GitLab
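Patch 08's subject hints at why the inner "mp"/"mt" forks had to go: worker processes created by multiprocessing.Pool are daemonic and are not allowed to start child processes of their own, so dispatching further parallel work from inside a pooled task fails at runtime. The safe shape, which the following patch adopts, is to flatten the work into one task list and feed it to a single top-level pool — sketched here with placeholder data:

    from itertools import product
    from multiprocessing import Pool


    def process_pair(condition: str, item: str) -> str:
        # Stand-in for process_item(); must be a module-level function so it
        # can be pickled and shipped to the worker processes.
        return f"{condition}/{item}"


    if __name__ == "__main__":
        conditions = ["c1", "c2"]
        items = ["item_a", "item_b", "item_c"]

        # One flat (condition, item) task list instead of a pool per condition.
        tasks = list(product(conditions, items))

        with Pool() as pool:
            results = pool.starmap(process_pair, tasks, chunksize=2)

        print(results)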
From fb531a80f8dcfe0c7e0241d314faf5cfd958e05f Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Tue, 13 Jun 2023 15:42:35 +0200
Subject: [PATCH 10/14] [fix] add a progressbar and dispatch all jobs to the
 same pool

---
 generate_test.py                              | 11 +---
 ivas_processing_scripts/__init__.py           | 56 +++++++++++++------
 .../audiotools/binauralobjectrenderer.py      |  4 +-
 .../audiotools/convert/objectbased.py         |  2 +-
 ivas_processing_scripts/utils.py              | 23 ++++----
 5 files changed, 58 insertions(+), 38 deletions(-)

diff --git a/generate_test.py b/generate_test.py
index 4724c822..ebd403c8 100644
--- a/generate_test.py
+++ b/generate_test.py
@@ -31,11 +31,11 @@
 #

 import argparse
-from multiprocessing import Pool
 from pathlib import Path

 from ivas_processing_scripts import config
 from ivas_processing_scripts import main as generate_test
+from ivas_processing_scripts.utils import apply_func_parallel

 HERE = Path(__file__).parent.absolute()
 EXPERIMENTS_P800 = [f"P800-{i}" for i in range(1, 10)]
@@ -64,12 +64,7 @@ def generate_tests(exp_lab_pairs, run_parallel=True, create_cfg_only=False):
         return

     args = [Arguments(str(cfg)) for cfg in cfgs]
-    if run_parallel:
-        with Pool() as p:
-            p.starmap(generate_test, zip(args), chunksize=8)
-    else:
-        map(generate_test, args)
-    # apply_func_parallel(generate_test, zip(args), type="mp" if run_parallel else None)
+    apply_func_parallel(generate_test, zip(args), None)


 class Arguments:
@@ -77,7 +72,7 @@ class Arguments:
     def __init__(self, config):
         self.config = config
         self.debug = False
         # used to overwrite the multiprocessing key in the configs to rather parallelize on category level
-        self.multiprocessing = False
+        self.multiprocessing = True


 def create_experiment_setup(experiment, lab) -> list[Path]:
diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index f96935e2..40b10c3a 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -31,8 +31,9 @@
 #

 import logging
-from itertools import repeat
+from itertools import product
 from multiprocessing import Pool
+from time import sleep

 from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
 from ivas_processing_scripts.constants import (
@@ -45,10 +46,14 @@ from ivas_processing_scripts.processing.processing import (
     preprocess,
     preprocess_2,
     preprocess_background_noise,
-    process_condition,
+    process_item,
     reorder_items_list,
 )
-from ivas_processing_scripts.utils import DirManager
+from ivas_processing_scripts.utils import (
+    DirManager,
+    apply_func_parallel,
+    progressbar_update,
+)


 def logging_init(args, cfg):
@@ -148,20 +153,39 @@ def main(args):
         # preprocess 2
         preprocess_2(cfg, logger)

-    # attempt to parallelise either items or conditions based on test setup
-    # parallelise_items = len(cfg.items_list) > len(cfg.proc_chains)
-    with Pool() as p:
-        p.starmap(
-            process_condition,
-            zip(
-                repeat(cfg),
-                cfg.proc_chains,
-                cfg.tmp_dirs,
-                cfg.out_dirs,
-                repeat(logger),
-            ),
-            chunksize=8,
-        )
+    # assemble a list of all item and condition combinations
+    item_args = list()
+    for (chain, tmp_dir, out_dir), (item, metadata) in product(
+        zip(cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs),
+        zip(cfg.items_list, cfg.metadata_path),
+    ):
+        item_args.append(
+            (item, tmp_dir, out_dir, chain["processes"], logger, metadata)
+        )
+
+    if cfg.multiprocessing:
+        p = Pool()
+        chunksize = 8
+        results = p.starmap_async(
+            process_item,
+            item_args,
+            chunksize,
+        )
+        width = 80
+        count = len(item_args)
+
+        progressbar_update(0, count, width)
+        while not results.ready():
+            progressbar_update(
+                count - (results._number_left * chunksize), count, width
+            )
+            sleep(0.1)
+
+        p.close()
+        p.join()
+
+    else:
+        apply_func_parallel(process_item, item_args, None, None, True)

     # copy configuration to output directory
     cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml"))
diff --git a/ivas_processing_scripts/audiotools/binauralobjectrenderer.py b/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
index 8f9d246c..dda742ba 100755
--- a/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
+++ b/ivas_processing_scripts/audiotools/binauralobjectrenderer.py
@@ -543,7 +543,7 @@ def binaural_fftconv_framewise(
         ),
         None,
         None,
-        False
+        False,
     )

     y = np.stack(result, axis=1)
@@ -608,7 +608,7 @@ def render_ear(
         ),
         None,
         None,
-        False
+        False,
     )

     return np.hstack(result)
diff --git a/ivas_processing_scripts/audiotools/convert/objectbased.py b/ivas_processing_scripts/audiotools/convert/objectbased.py
index 18f3162a..083c1440 100755
--- a/ivas_processing_scripts/audiotools/convert/objectbased.py
+++ b/ivas_processing_scripts/audiotools/convert/objectbased.py
@@ -136,7 +136,7 @@ def render_oba_to_binaural(
         ),
         None,
         None,
-        False
+        False,
     )

     # sum results over all objects
diff --git a/ivas_processing_scripts/utils.py b/ivas_processing_scripts/utils.py
index b46a104e..8ec5de45 100755
--- a/ivas_processing_scripts/utils.py
+++ b/ivas_processing_scripts/utils.py
@@ -268,23 +268,24 @@ def pairwise(iter):
     return zip(a, b)


+def progressbar_update(progress, count, width):
+    fill = int(width * progress / count)
+    print(
+        f"{int(progress/count*100):3d}%{'|'}{'='*fill}{(' '*(width-fill))}{'|'}{progress}/{count}",
+        end="\r",
+        file=sys.stdout,
+        flush=True,
+    )
+
+
 def progressbar(iter: Iterable, width=80):
     """simple unicode progressbar"""
     count = len(iter)

-    def update(progress):
-        fill = int(width * progress / count)
-        print(
-            f"{int(progress/count*100):3d}%{'|'}{'='*fill}{(' '*(width-fill))}{'|'}{progress}/{count}",
-            end="\r",
-            file=sys.stdout,
-            flush=True,
-        )
-
-    update(0)
+    progressbar_update(0, count, width)
     for i, item in enumerate(iter):
         yield item
-        update(i + 1)
+        progressbar_update(i + 1, count, width)

     print("\n", flush=True, file=sys.stdout)
--
GitLab

From 83991a54156da6a3dd646ae2fce0aae725091221 Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Tue, 13 Jun 2023 15:55:20 +0200
Subject: [PATCH 11/14] cleanup and formatting

---
 ivas_processing_scripts/__init__.py | 14 +++++++++-----
 ivas_processing_scripts/utils.py    |  2 +-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index 40b10c3a..00e1a25a 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -32,7 +32,7 @@
 import logging
 from itertools import product
-from multiprocessing import Pool
+from multiprocessing import Pool, cpu_count
 from time import sleep

 from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
@@ -164,20 +164,24 @@ def main(args):
         )

     if cfg.multiprocessing:
+        # set up values for progress display and chunksize
+        count = len(item_args)
+        width = 80
+        chunksize = count / cpu_count()
+
+        # submit tasks to the pool
         p = Pool()
-        chunksize = 8
         results = p.starmap_async(
             process_item,
             item_args,
             chunksize,
         )
-        width = 80
-        count = len(item_args)

+        # poll progress
         progressbar_update(0, count, width)
         while not results.ready():
             progressbar_update(
-                count - (results._number_left * chunksize), count, width
+                count - int(results._number_left * chunksize), count, width
             )
             sleep(0.1)
diff --git a/ivas_processing_scripts/utils.py b/ivas_processing_scripts/utils.py
index 8ec5de45..1ad9319f 100755
--- a/ivas_processing_scripts/utils.py
+++ b/ivas_processing_scripts/utils.py
@@ -271,7 +271,7 @@ def pairwise(iter):
 def progressbar_update(progress, count, width):
     fill = int(width * progress / count)
     print(
-        f"{int(progress/count*100):3d}%{'|'}{'='*fill}{(' '*(width-fill))}{'|'}{progress}/{count}",
+        f"{int(progress/count*100):3d}%{'|'}{'='*fill}{(' '*(width-fill))}{'|'}{progress:4d}/{count:4d}",
         end="\r",
         file=sys.stdout,
         flush=True,
--
GitLab
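The progress polling added in patches 10 and 11 reads AsyncResult._number_left, a private attribute that counts remaining chunks rather than items (hence the multiplication by chunksize). If relying on private internals is a concern, a public alternative is imap_unordered, which yields each result as it completes; a hedged sketch with placeholder work:

    import time
    from multiprocessing import Pool


    def slow_square(x: int) -> int:
        time.sleep(0.1)  # simulate per-item processing time
        return x * x


    def show_progress(done: int, total: int, width: int = 40) -> None:
        fill = int(width * done / total)
        print(f"\r{done:3d}/{total:3d} |{'=' * fill}{' ' * (width - fill)}|",
              end="", flush=True)


    if __name__ == "__main__":
        items = list(range(20))

        with Pool() as pool:
            done = 0
            show_progress(done, len(items))
            # Each completed task increments the counter; no polling loop and
            # no private AsyncResult attributes are needed.
            for _ in pool.imap_unordered(slow_square, items):
                done += 1
                show_progress(done, len(items))
        print()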
From 5e2037e603e356b099239824018f056f9f955be0 Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Tue, 13 Jun 2023 16:06:30 +0200
Subject: [PATCH 12/14] apply changes for preprocessing as well

---
 generate_test.py                     |  4 +-
 .../processing/processing.py         | 85 +++++++++++++------
 2 files changed, 58 insertions(+), 31 deletions(-)

diff --git a/generate_test.py b/generate_test.py
index ebd403c8..f5490a06 100644
--- a/generate_test.py
+++ b/generate_test.py
@@ -49,7 +49,7 @@ IN_FMT_FOR_MASA_EXPS = {
 }


-def generate_tests(exp_lab_pairs, run_parallel=True, create_cfg_only=False):
+def generate_tests(exp_lab_pairs, create_cfg_only=False):
     """
     Create configs and run them for all given experiment/lab pairs
     """
@@ -71,8 +71,6 @@ class Arguments:
     def __init__(self, config):
         self.config = config
         self.debug = False
-        # used to overwrite the multiprocessing key in the configs to rather parallelize on category level
-        self.multiprocessing = True


 def create_experiment_setup(experiment, lab) -> list[Path]:
diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py
index 4f045a3f..3baf612e 100755
--- a/ivas_processing_scripts/processing/processing.py
+++ b/ivas_processing_scripts/processing/processing.py
@@ -33,9 +33,10 @@
 import logging
 from abc import ABC, abstractmethod
 from itertools import repeat
-from multiprocessing import Pool
+from multiprocessing import Pool, cpu_count
 from pathlib import Path
 from shutil import copyfile
+from time import sleep
 from typing import Iterable, Union
 from warnings import warn
@@ -50,7 +51,7 @@ from ivas_processing_scripts.audiotools.metadata import (
 )
 from ivas_processing_scripts.constants import LOGGER_DATEFMT, LOGGER_FORMAT
 from ivas_processing_scripts.processing.config import TestConfig
-from ivas_processing_scripts.utils import list_audio, pairwise
+from ivas_processing_scripts.utils import list_audio, pairwise, progressbar_update


 class Processing(ABC):
@@ -235,20 +236,34 @@ def preprocess(cfg, logger):

     logger.info(f" Generating condition: {preprocessing['name']}")

+    # set up values for progress display and chunksize
+    count = len(cfg.items_list)
+    width = 80
+    chunksize = count / cpu_count()
+
     # run preprocessing
-    with Pool() as p:
-        p.starmap(
-            process_item,
-            zip(
-                cfg.items_list,
-                repeat(cfg.tmp_dirs[0]),
-                repeat(cfg.out_dirs[0]),
-                repeat(chain),
-                repeat(logger),
-                cfg.metadata_path,
-            ),
-            chunksize=8,
-        )
+    p = Pool()
+    results = p.starmap_async(
+        process_item,
+        zip(
+            cfg.items_list,
+            repeat(cfg.tmp_dirs[0]),
+            repeat(cfg.out_dirs[0]),
+            repeat(chain),
+            repeat(logger),
+            cfg.metadata_path,
+        ),
+        chunksize=len(cfg.items_list) / cpu_count(),
+    )
+
+    # poll progress
+    progressbar_update(0, count, width)
+    while not results.ready():
+        progressbar_update(count - int(results._number_left * chunksize), count, width)
+        sleep(0.1)
+
+    p.close()
+    p.join()

     # update the configuration to use preprocessing outputs as new inputs
     cfg.items_list = list_audio(
@@ -288,19 +303,34 @@ def preprocess_2(cfg, logger):
     if chain[0].concatenate_input:
         concat_setup(cfg, chain, logger)

+    # set up values for progress display and chunksize
+    count = len(cfg.items_list)
+    width = 80
+    chunksize = count / cpu_count()
+
     # run preprocessing 2
-    with Pool() as p:
-        p.starmap(
-            process_item,
-            zip(
-                cfg.items_list,
-                repeat(cfg.tmp_dirs[0]),
-                repeat(cfg.out_dirs[0]),
-                repeat(chain),
-                repeat(logger),
-                cfg.metadata_path,
-            ),
-            chunksize=8,
-        )
+    p = Pool()
+    results = p.starmap_async(
+        process_item,
+        zip(
+            cfg.items_list,
+            repeat(cfg.tmp_dirs[0]),
+            repeat(cfg.out_dirs[0]),
+            repeat(chain),
+            repeat(logger),
+            cfg.metadata_path,
+        ),
+        chunksize=len(cfg.items_list) / cpu_count(),
+    )
+
+    # poll progress
+    progressbar_update(0, count, width)
+    while not results.ready():
+        progressbar_update(count - int(results._number_left * chunksize), count, width)
+        sleep(0.1)
+
+    p.close()
+    p.join()

     # update the configuration to use preprocessing 2 outputs as new inputs
     cfg.items_list = list_audio(
--
GitLab
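After this patch the submit/poll/close/join block appears almost verbatim in preprocess(), preprocess_2() and main(). If that duplication ever becomes a maintenance burden, the boilerplate could be collected into one helper along these lines (a sketch only — starmap_with_progress is not a function in the repository):

    import sys
    from multiprocessing import Pool
    from time import sleep


    def starmap_with_progress(func, arg_tuples, width=80, poll_interval=0.1):
        arg_tuples = list(arg_tuples)
        count = len(arg_tuples)

        with Pool() as pool:
            # chunksize=1 so that _number_left counts individual tasks.
            result = pool.starmap_async(func, arg_tuples, chunksize=1)
            while not result.ready():
                done = count - result._number_left  # private attribute, see note above
                fill = int(width * done / count)
                print(f"\r{int(done / count * 100):3d}%|{'=' * fill}{' ' * (width - fill)}|",
                      end="", file=sys.stdout, flush=True)
                sleep(poll_interval)
            print(f"\r100%|{'=' * width}|", file=sys.stdout, flush=True)
            return result.get()

With such a helper, each call site would reduce to something like results = starmap_with_progress(process_item, item_args).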
From 7113d28772c8c43f433e3777c565e8d3b64ed66d Mon Sep 17 00:00:00 2001
From: Archit Tamarapu
Date: Tue, 13 Jun 2023 16:28:44 +0200
Subject: [PATCH 13/14] remove chunksize parameter

---
 ivas_processing_scripts/__init__.py | 11 ++---
 .../processing/processing.py        | 64 +++++++-------
 2 files changed, 27 insertions(+), 48 deletions(-)

diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py
index 00e1a25a..4b508fc7 100755
--- a/ivas_processing_scripts/__init__.py
+++ b/ivas_processing_scripts/__init__.py
@@ -31,8 +31,9 @@
 #

 import logging
+import sys
 from itertools import product
-from multiprocessing import Pool, cpu_count
+from multiprocessing import Pool
 from time import sleep

 from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
@@ -167,23 +168,21 @@ def main(args):
         # set up values for progress display and chunksize
         count = len(item_args)
         width = 80
-        chunksize = count / cpu_count()

         # submit tasks to the pool
         p = Pool()
         results = p.starmap_async(
             process_item,
             item_args,
-            chunksize,
         )

         # poll progress
         progressbar_update(0, count, width)
         while not results.ready():
-            progressbar_update(
-                count - int(results._number_left * chunksize), count, width
-            )
+            progressbar_update(count - int(results._number_left), count, width)
             sleep(0.1)
+        progressbar_update(count, count, width)
+        print("\n", flush=True, file=sys.stdout)

         p.close()
         p.join()
diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py
index 3baf612e..62cd2aea 100755
--- a/ivas_processing_scripts/processing/processing.py
+++ b/ivas_processing_scripts/processing/processing.py
@@ -31,9 +31,10 @@
 #

 import logging
+import sys
 from abc import ABC, abstractmethod
 from itertools import repeat
-from multiprocessing import Pool, cpu_count
+from multiprocessing import Pool
 from pathlib import Path
 from shutil import copyfile
 from time import sleep
@@ -239,12 +240,9 @@ def preprocess(cfg, logger):
     # set up values for progress display and chunksize
     count = len(cfg.items_list)
     width = 80
-    chunksize = count / cpu_count()

     # run preprocessing
-    p = Pool()
-    results = p.starmap_async(
-        process_item,
+    args = list(
         zip(
             cfg.items_list,
             repeat(cfg.tmp_dirs[0]),
             repeat(cfg.out_dirs[0]),
             repeat(chain),
             repeat(logger),
             cfg.metadata_path,
-        ),
-        chunksize=len(cfg.items_list) / cpu_count(),
+        )
+    )
+    p = Pool()
+    results = p.starmap_async(
+        process_item,
+        args,
     )

     # poll progress
     progressbar_update(0, count, width)
     while not results.ready():
-        progressbar_update(count - int(results._number_left * chunksize), count, width)
+        progressbar_update(count - int(results._number_left), count, width)
         sleep(0.1)
+    progressbar_update(count, count, width)
+    print("\n", flush=True, file=sys.stdout)

     p.close()
     p.join()
@@ -306,12 +310,9 @@ def preprocess_2(cfg, logger):
     # set up values for progress display and chunksize
     count = len(cfg.items_list)
     width = 80
-    chunksize = count / cpu_count()

     # run preprocessing 2
-    p = Pool()
-    results = p.starmap_async(
-        process_item,
+    args = list(
         zip(
             cfg.items_list,
             repeat(cfg.tmp_dirs[0]),
             repeat(cfg.out_dirs[0]),
             repeat(chain),
             repeat(logger),
             cfg.metadata_path,
-        ),
-        chunksize=len(cfg.items_list) / cpu_count(),
+        )
+    )
+    p = Pool()
+    results = p.starmap_async(
+        process_item,
+        args,
     )

     # poll progress
     progressbar_update(0, count, width)
     while not results.ready():
-        progressbar_update(count - int(results._number_left * chunksize), count, width)
+        progressbar_update(count - int(results._number_left), count, width)
         sleep(0.1)
+    progressbar_update(count, count, width)
+    print("\n", flush=True, file=sys.stdout)

     p.close()
     p.join()
@@ -432,33 +439,6 @@ def process_item(
             copyfile(ppm, out_meta[idx])


-def process_condition(
-    cfg: TestConfig,
-    condition: dict,
-    tmp_dir: Union[str, Path],
-    out_dir: Union[str, Path],
-    logger: logging.Logger,
-    # parallelise_items: bool = False,
-):
-    chain = condition["processes"]
-
-    logger.info(f" Generating condition: {condition['name']}")
-
-    with Pool() as p:
-        p.starmap(
-            process_item,
-            zip(
-                cfg.items_list,
-                repeat(tmp_dir),
-                repeat(out_dir),
-                repeat(chain),
-                repeat(logger),
-                cfg.metadata_path,
-            ),
-            chunksize=8,
-        )
-
-
 def remove_pre_and_postamble(
     x, out_fmt, fs, repeat_signal, preamble_len_ms, postamble_len_ms, meta, logger
 ):
--
GitLab
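A plausible reason the chunksize parameter is dropped here: count / cpu_count() is a float, and CPython's Pool feeds the chunksize straight into itertools.islice, which only accepts integers — so the earlier code would raise as soon as the pool tried to split the task list. If an explicit chunk size is ever wanted again, integer ceiling division is the usual fix (sketch with a placeholder worker):

    from math import ceil
    from multiprocessing import Pool, cpu_count


    def work(x: int) -> int:
        return x * x


    if __name__ == "__main__":
        items = list(range(100))

        # chunksize must be a positive int; ceil() avoids the float that
        # len(items) / cpu_count() would produce.
        chunksize = max(1, ceil(len(items) / cpu_count()))

        with Pool() as pool:
            results = pool.map(work, items, chunksize=chunksize)

        print(len(results), chunksize)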
From 8b9bb6ed6c351b7d0776130e02cf512754e3e26c Mon Sep 17 00:00:00 2001
From: Markus Multrus
Date: Tue, 13 Jun 2023 22:18:04 +0200
Subject: [PATCH 14/14] fix call to generate_tests(), remove option
 --no_parallel

---
 generate_test.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
 mode change 100644 => 100755 generate_test.py

diff --git a/generate_test.py b/generate_test.py
old mode 100644
new mode 100755
index f5490a06..141b1106
--- a/generate_test.py
+++ b/generate_test.py
@@ -168,15 +168,10 @@ if __name__ == "__main__":
         nargs="+",
         help="The combinations of experiment/lab-id that you want to generate, separated by whitespace. Experiment and lab id need to be separated by a comma.",
     )
-    parser.add_argument(
-        "--no_parallel",
-        action="store_true",
-        help="If given, configs will not be run in parallel",
-    )
     parser.add_argument(
         "--create_cfg_only",
         action="store_true",
         help="If given, only create the configs and folder structure without processing items",
     )
     args = parser.parse_args()
-    generate_tests(args.exp_lab_pairs, not args.no_parallel, args.create_cfg_only)
+    generate_tests(args.exp_lab_pairs, args.create_cfg_only)
--
GitLab
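With this final patch, parallelism is controlled per generated config (via its multiprocessing key) rather than by a CLI switch. A hypothetical invocation of the updated entry point — the lab id "lab1" is made up for illustration, only the "P800-N" experiment names and the comma-separated pair format come from the script itself:

    # Command line: experiment/lab pairs separated by whitespace,
    # experiment and lab id separated by a comma, e.g.
    #   python generate_test.py P800-1,lab1 P800-2,lab1 --create_cfg_only
    #
    # Equivalent programmatic call:
    from generate_test import generate_tests

    if __name__ == "__main__":
        generate_tests(["P800-1,lab1", "P800-2,lab1"], create_cfg_only=True)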