Commit ec42815c authored by Archit Tamarapu's avatar Archit Tamarapu
Browse files

attempt to pick between item and condition parallelisation based on test setup

parent c3ade900
Loading
Loading
Loading
Loading
Loading
+10 −3
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ from ivas_processing_scripts.processing.processing import (
    preprocess_2,
    preprocess_background_noise,
    process_condition,
    process_item,
    reorder_items_list,
)
from ivas_processing_scripts.utils import DirManager, apply_func_parallel
@@ -147,14 +148,20 @@ def main(args):
            # preprocess 2
            preprocess_2(cfg, logger)

        # parallel processing of conditions
        # attempt to parallelise either items or conditions based on test setup
        parallelise_items = len(cfg.items_list) > len(cfg.proc_chains)
        apply_func_parallel(
            process_condition,
            zip(
                repeat(cfg), cfg.proc_chains, cfg.tmp_dirs, cfg.out_dirs, repeat(logger)
                repeat(cfg),
                cfg.proc_chains,
                cfg.tmp_dirs,
                cfg.out_dirs,
                repeat(logger),
                repeat(parallelise_items),
            ),
            None,
            "mp" if cfg.multiprocessing else None,
            "mp" if (not parallelise_items and cfg.multiprocessing) else None,
        )

    # copy configuration to output directory
+24 −22
Original line number Diff line number Diff line
@@ -408,11 +408,27 @@ def process_condition(
    tmp_dir: Union[str, Path],
    out_dir: Union[str, Path],
    logger: logging.Logger,
    parallelise_items: bool = False,
):
    chain = condition["processes"]

    logger.info(f"  Generating condition: {condition['name']}")

    if parallelise_items:
        apply_func_parallel(
            process_item,
            zip(
                cfg.items_list,
                repeat(tmp_dir),
                repeat(out_dir),
                repeat(chain),
                repeat(logger),
                cfg.metadata_path,
            ),
            None,
            "mp" if cfg.multiprocessing else None,
        )
    else:
        for item, metadata in zip(cfg.items_list, cfg.metadata_path):
            process_item(
                item,
@@ -422,20 +438,6 @@ def process_condition(
                logger,
                metadata,
            )
    # apply_func_parallel(
    #     process_item,
    #     zip(
    #         cfg.items_list,
    #         repeat(tmp_dir),
    #         repeat(out_dir),
    #         repeat(chain),
    #         repeat(logger),
    #         cfg.metadata_path,
    #     ),
    #     None,
    #     "mp" if cfg.multiprocessing else None,
    # )
    #


def remove_pre_and_postamble(