From d700915dcbcc5537d9c77e06b9c7810fc341b3b8 Mon Sep 17 00:00:00 2001 From: Treffehn Date: Mon, 8 May 2023 12:49:06 +0200 Subject: [PATCH 1/7] tried to run conditions in parallel --- ivas_processing_scripts/__init__.py | 74 +++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index a16309e0..02f9e426 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -147,26 +147,39 @@ def main(args): preprocess_2(cfg, logger) # run conditions - for condition, out_dir, tmp_dir in zip( - cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs - ): - chain = condition["processes"] - - logger.info(f" Generating condition: {condition['name']}") - - apply_func_parallel( - process_item, - zip( - cfg.items_list, - repeat(tmp_dir), - repeat(out_dir), - repeat(chain), - repeat(logger), - cfg.metadata_path, - ), - None, - "mp" if cfg.multiprocessing else None, - ) + # for condition, out_dir, tmp_dir in zip( + # cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs + # ): + # chain = condition["processes"] + # + # logger.info(f" Generating condition: {condition['name']}") + # + # apply_func_parallel( + # process_item, + # zip( + # cfg.items_list, + # repeat(tmp_dir), + # repeat(out_dir), + # repeat(chain), + # repeat(logger), + # cfg.metadata_path, + # ), + # None, + # "mp" if cfg.multiprocessing else None, + # ) + + apply_func_parallel( + process_item_parallel, + zip( + repeat(cfg), + cfg.proc_chains, + cfg.tmp_dirs, + cfg.out_dirs, + repeat(logger) + ), + None, + "mp" if cfg.multiprocessing else None, + ) if hasattr(cfg, "preprocessing_2"): reverse_process_2(cfg, logger) @@ -174,3 +187,24 @@ def main(args): # copy configuration to output directory with open(cfg.output_path.joinpath(f"{cfg.name}.yml"), "w") as f: yaml.safe_dump(cfg._yaml_dump, f) + + +def process_item_parallel(cfg, condition, tmp_dir, out_dir, logger): + + chain = condition["processes"] + + logger.info(f" Generating condition: {condition['name']}") + + apply_func_parallel( + process_item, + zip( + cfg.items_list, + repeat(tmp_dir), + repeat(out_dir), + repeat(chain), + repeat(logger), + cfg.metadata_path, + ), + None, + "mp" if cfg.multiprocessing else None, + ) \ No newline at end of file -- GitLab From 278766838ab8e39d49085dcdc740d5dfe8504690 Mon Sep 17 00:00:00 2001 From: Treffehn Date: Mon, 8 May 2023 14:04:18 +0200 Subject: [PATCH 2/7] added different multiprocessing keys --- examples/TEMPLATE.yml | 5 ++++- ivas_processing_scripts/__init__.py | 26 ++------------------------ ivas_processing_scripts/constants.py | 2 ++ 3 files changed, 8 insertions(+), 25 deletions(-) diff --git a/examples/TEMPLATE.yml b/examples/TEMPLATE.yml index 09881a3b..2b90e4dc 100755 --- a/examples/TEMPLATE.yml +++ b/examples/TEMPLATE.yml @@ -9,8 +9,11 @@ ### git commit SHA; default = git rev-parse HEAD # git_sha: abc123 -### Whether to use multiprocessing; default = true +### Whether to use multiprocessing in general; default = true # multiprocessing: false +### Whether to use multiprocessing in main loop for items and/or conditions; default = true +# multiprocessing_items: false +# multiprocessing_conditions: false ### Deletion of temporary directories containing ### intermediate processing files, bitstreams etc.; default = false # delete_tmp: true diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index 02f9e426..f13ee0bf 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -146,28 +146,6 @@ def main(args): # preprocess 2 preprocess_2(cfg, logger) - # run conditions - # for condition, out_dir, tmp_dir in zip( - # cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs - # ): - # chain = condition["processes"] - # - # logger.info(f" Generating condition: {condition['name']}") - # - # apply_func_parallel( - # process_item, - # zip( - # cfg.items_list, - # repeat(tmp_dir), - # repeat(out_dir), - # repeat(chain), - # repeat(logger), - # cfg.metadata_path, - # ), - # None, - # "mp" if cfg.multiprocessing else None, - # ) - apply_func_parallel( process_item_parallel, zip( @@ -178,7 +156,7 @@ def main(args): repeat(logger) ), None, - "mp" if cfg.multiprocessing else None, + "mp" if cfg.multiprocessing_conditions else None, ) if hasattr(cfg, "preprocessing_2"): @@ -206,5 +184,5 @@ def process_item_parallel(cfg, condition, tmp_dir, out_dir, logger): cfg.metadata_path, ), None, - "mp" if cfg.multiprocessing else None, + "mp" if cfg.multiprocessing_items else None, ) \ No newline at end of file diff --git a/ivas_processing_scripts/constants.py b/ivas_processing_scripts/constants.py index f89e8589..d6457f95 100755 --- a/ivas_processing_scripts/constants.py +++ b/ivas_processing_scripts/constants.py @@ -58,6 +58,8 @@ DEFAULT_CONFIG = { "date": f"{datetime.now().strftime('%Y%m%d_%H.%M.%S')}", "git_sha": f"{get_gitsha()}", "multiprocessing": True, + "multiprocessing_items": True, + "multiprocessing_conditions": True, "delete_tmp": False, "master_seed": 0, "metadata_path": None, -- GitLab From a45a6b0aade36d700026e00dbaa1c8c8f515b01f Mon Sep 17 00:00:00 2001 From: Archit Tamarapu Date: Mon, 12 Jun 2023 11:17:41 +0200 Subject: [PATCH 3/7] [revert] d700915d and 278766838 --- examples/TEMPLATE.yml | 5 +-- ivas_processing_scripts/__init__.py | 53 +++++++++++----------------- ivas_processing_scripts/constants.py | 2 -- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/examples/TEMPLATE.yml b/examples/TEMPLATE.yml index 305614af..89415cee 100755 --- a/examples/TEMPLATE.yml +++ b/examples/TEMPLATE.yml @@ -9,11 +9,8 @@ ### git commit SHA; default = git rev-parse HEAD # git_sha: abc123 -### Whether to use multiprocessing in general; default = true +### Whether to use multiprocessing; default = true # multiprocessing: false -### Whether to use multiprocessing in main loop for items and/or conditions; default = true -# multiprocessing_items: false -# multiprocessing_conditions: false ### Deletion of temporary directories containing ### intermediate processing files, bitstreams etc.; default = false # delete_tmp: true diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index d808dd91..15424a3a 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -147,38 +147,27 @@ def main(args): # preprocess 2 preprocess_2(cfg, logger) - apply_func_parallel( - process_item_parallel, - zip( - repeat(cfg), - cfg.proc_chains, - cfg.tmp_dirs, - cfg.out_dirs, - repeat(logger) - ), - None, - "mp" if cfg.multiprocessing_conditions else None, - ) + # run conditions + for condition, out_dir, tmp_dir in zip( + cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs + ): + chain = condition["processes"] + + logger.info(f" Generating condition: {condition['name']}") + + apply_func_parallel( + process_item, + zip( + cfg.items_list, + repeat(tmp_dir), + repeat(out_dir), + repeat(chain), + repeat(logger), + cfg.metadata_path, + ), + None, + "mp" if cfg.multiprocessing else None, + ) # copy configuration to output directory cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml")) - -def process_item_parallel(cfg, condition, tmp_dir, out_dir, logger): - - chain = condition["processes"] - - logger.info(f" Generating condition: {condition['name']}") - - apply_func_parallel( - process_item, - zip( - cfg.items_list, - repeat(tmp_dir), - repeat(out_dir), - repeat(chain), - repeat(logger), - cfg.metadata_path, - ), - None, - "mp" if cfg.multiprocessing_items else None, - ) \ No newline at end of file diff --git a/ivas_processing_scripts/constants.py b/ivas_processing_scripts/constants.py index 190d691d..5cc47609 100755 --- a/ivas_processing_scripts/constants.py +++ b/ivas_processing_scripts/constants.py @@ -58,8 +58,6 @@ DEFAULT_CONFIG = { "date": f"{datetime.now().strftime('%Y%m%d_%H.%M.%S')}", "git_sha": f"{get_gitsha()}", "multiprocessing": True, - "multiprocessing_items": True, - "multiprocessing_conditions": True, "use_windows_codec_binaries": False, "delete_tmp": False, "master_seed": 0, -- GitLab From 0a76ac396e3099a2db98f257cfda1156f596e3bc Mon Sep 17 00:00:00 2001 From: Archit Tamarapu Date: Mon, 12 Jun 2023 11:18:49 +0200 Subject: [PATCH 4/7] parallelise conditions instead of items --- ivas_processing_scripts/__init__.py | 31 +++++----------- .../processing/processing.py | 37 ++++++++++++++++++- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index 15424a3a..ea735648 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -44,6 +44,7 @@ from ivas_processing_scripts.processing.processing import ( preprocess, preprocess_2, preprocess_background_noise, + process_condition, process_item, reorder_items_list, ) @@ -147,27 +148,15 @@ def main(args): # preprocess 2 preprocess_2(cfg, logger) - # run conditions - for condition, out_dir, tmp_dir in zip( - cfg.proc_chains, cfg.out_dirs, cfg.tmp_dirs - ): - chain = condition["processes"] - - logger.info(f" Generating condition: {condition['name']}") - - apply_func_parallel( - process_item, - zip( - cfg.items_list, - repeat(tmp_dir), - repeat(out_dir), - repeat(chain), - repeat(logger), - cfg.metadata_path, - ), - None, - "mp" if cfg.multiprocessing else None, - ) + # parallel processing of conditions + apply_func_parallel( + process_condition, + zip( + repeat(cfg), cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs, repeat(logger) + ), + None, + "mp" if cfg.multiprocessing else None, + ) # copy configuration to output directory cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml")) diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py index 29797295..d4f95301 100755 --- a/ivas_processing_scripts/processing/processing.py +++ b/ivas_processing_scripts/processing/processing.py @@ -402,6 +402,42 @@ def process_item( copyfile(ppm, out_meta[idx]) +def process_condition( + cfg: TestConfig, + condition: dict, + tmp_dir: Union[str, Path], + out_dir: Union[str, Path], + logger: logging.Logger, +): + chain = condition["processes"] + + logger.info(f" Generating condition: {condition['name']}") + + for item, metadata in zip(cfg.items_list, cfg.metadata_path): + process_item( + item, + tmp_dir, + out_dir, + chain, + logger, + metadata, + ) + # apply_func_parallel( + # process_item, + # zip( + # cfg.items_list, + # repeat(tmp_dir), + # repeat(out_dir), + # repeat(chain), + # repeat(logger), + # cfg.metadata_path, + # ), + # None, + # "mp" if cfg.multiprocessing else None, + # ) + # + + def remove_pre_and_postamble( x, out_fmt, fs, repeat_signal, preamble_len_ms, postamble_len_ms, meta, logger ): @@ -464,4 +500,3 @@ def preprocess_background_noise(cfg): ] = output_audio return - -- GitLab From c3ade900103e44dac1f6b8b2ab2adccdf1e6ef68 Mon Sep 17 00:00:00 2001 From: Archit Tamarapu Date: Mon, 12 Jun 2023 11:35:40 +0200 Subject: [PATCH 5/7] [fix] lint --- ivas_processing_scripts/__init__.py | 1 - ivas_processing_scripts/generation/__init__.py | 1 - 2 files changed, 2 deletions(-) diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index ea735648..228950af 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -45,7 +45,6 @@ from ivas_processing_scripts.processing.processing import ( preprocess_2, preprocess_background_noise, process_condition, - process_item, reorder_items_list, ) from ivas_processing_scripts.utils import DirManager, apply_func_parallel diff --git a/ivas_processing_scripts/generation/__init__.py b/ivas_processing_scripts/generation/__init__.py index 8ee7c026..53892744 100755 --- a/ivas_processing_scripts/generation/__init__.py +++ b/ivas_processing_scripts/generation/__init__.py @@ -31,7 +31,6 @@ # import logging -import os import yaml -- GitLab From ec42815c7cab62b4e38573f9226622599f7fa28c Mon Sep 17 00:00:00 2001 From: Archit Tamarapu Date: Mon, 12 Jun 2023 12:41:31 +0200 Subject: [PATCH 6/7] attempt to pick between item and condition parallelisation based on test setup --- ivas_processing_scripts/__init__.py | 13 ++++-- .../processing/processing.py | 46 ++++++++++--------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index 228950af..eacd0e63 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -45,6 +45,7 @@ from ivas_processing_scripts.processing.processing import ( preprocess_2, preprocess_background_noise, process_condition, + process_item, reorder_items_list, ) from ivas_processing_scripts.utils import DirManager, apply_func_parallel @@ -147,14 +148,20 @@ def main(args): # preprocess 2 preprocess_2(cfg, logger) - # parallel processing of conditions + # attempt to parallelise either items or conditions based on test setup + parallelise_items = len(cfg.items_list) > len(cfg.proc_chains) apply_func_parallel( process_condition, zip( - repeat(cfg), cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs, repeat(logger) + repeat(cfg), + cfg.proc_chains, + cfg.tmp_dirs, + cfg.out_dirs, + repeat(logger), + repeat(parallelise_items), ), None, - "mp" if cfg.multiprocessing else None, + "mp" if (not parallelise_items and cfg.multiprocessing) else None, ) # copy configuration to output directory diff --git a/ivas_processing_scripts/processing/processing.py b/ivas_processing_scripts/processing/processing.py index d4f95301..23be6073 100755 --- a/ivas_processing_scripts/processing/processing.py +++ b/ivas_processing_scripts/processing/processing.py @@ -408,34 +408,36 @@ def process_condition( tmp_dir: Union[str, Path], out_dir: Union[str, Path], logger: logging.Logger, + parallelise_items: bool = False, ): chain = condition["processes"] logger.info(f" Generating condition: {condition['name']}") - for item, metadata in zip(cfg.items_list, cfg.metadata_path): - process_item( - item, - tmp_dir, - out_dir, - chain, - logger, - metadata, + if parallelise_items: + apply_func_parallel( + process_item, + zip( + cfg.items_list, + repeat(tmp_dir), + repeat(out_dir), + repeat(chain), + repeat(logger), + cfg.metadata_path, + ), + None, + "mp" if cfg.multiprocessing else None, ) - # apply_func_parallel( - # process_item, - # zip( - # cfg.items_list, - # repeat(tmp_dir), - # repeat(out_dir), - # repeat(chain), - # repeat(logger), - # cfg.metadata_path, - # ), - # None, - # "mp" if cfg.multiprocessing else None, - # ) - # + else: + for item, metadata in zip(cfg.items_list, cfg.metadata_path): + process_item( + item, + tmp_dir, + out_dir, + chain, + logger, + metadata, + ) def remove_pre_and_postamble( -- GitLab From e0a531a22bd3b4e2ca28d77942adbb34139cff07 Mon Sep 17 00:00:00 2001 From: Archit Tamarapu Date: Mon, 12 Jun 2023 12:44:54 +0200 Subject: [PATCH 7/7] [fix] lint again --- ivas_processing_scripts/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ivas_processing_scripts/__init__.py b/ivas_processing_scripts/__init__.py index eacd0e63..abeb9f4f 100755 --- a/ivas_processing_scripts/__init__.py +++ b/ivas_processing_scripts/__init__.py @@ -45,7 +45,6 @@ from ivas_processing_scripts.processing.processing import ( preprocess_2, preprocess_background_noise, process_condition, - process_item, reorder_items_list, ) from ivas_processing_scripts.utils import DirManager, apply_func_parallel -- GitLab