Commit 6a473f30 authored by Archit Tamarapu
Browse files

[fix] multiprocessing setting in config not respected in some processing stages

parent ddd3ae98
Loading
Loading
Loading
Loading
Loading
+3 −27
Original line number Diff line number Diff line
@@ -190,34 +190,10 @@ def main(args):
                (item, tmp_dir, out_dir, chain["processes"], logger, metadata)
            )

        if cfg.multiprocessing:
            # set up values for progress display and chunksize
            count = len(item_args)
            width = 80

            # submit tasks to the pool
            p = Pool()
            results = p.starmap_async(
                process_item,
                item_args,
        apply_func_parallel(
            process_item, item_args, None, "mp" if cfg.multiprocessing else None, True
        )

            # poll progress
            progressbar_update(0, count, width)
            while not results.ready():
                progressbar_update(count - int(results._number_left), count, width)
                spinner()
                sleep(0.1)
            progressbar_update(count, count, width)
            print("", flush=True, file=sys.stdout)
            results.get()

            p.close()
            p.join()

        else:
            apply_func_parallel(process_item, item_args, None, None, True)

    # copy configuration to output directory
    cfg.to_file(cfg.output_path.joinpath(f"{cfg.name}.yml"))

+6 −36
Original line number Diff line number Diff line
@@ -52,11 +52,11 @@ from ivas_processing_scripts.audiotools.metadata import (
)
from ivas_processing_scripts.constants import LOGGER_DATEFMT, LOGGER_FORMAT
from ivas_processing_scripts.processing.config import TestConfig
from ivas_processing_scripts.processing.tx import get_timescaled_splits
from ivas_processing_scripts.utils import (
    apply_func_parallel,
    list_audio,
    pairwise,
    progressbar_update,
    spinner,
)


@@ -323,25 +323,10 @@ def preprocess(cfg, logger):
            cfg.metadata_path,
        )
    )
    p = Pool()
    results = p.starmap_async(
        process_item,
        args,
    apply_func_parallel(
        process_item, args, None, "mp" if cfg.multiprocessing else None, True
    )

    # poll progress
    progressbar_update(0, count, width)
    while not results.ready():
        progressbar_update(count - int(results._number_left), count, width)
        spinner()
        sleep(0.1)
    progressbar_update(count, count, width)
    print("", flush=True, file=sys.stdout)
    results.get()

    p.close()
    p.join()

    # update the configuration to use preprocessing outputs as new inputs
    cfg.items_list = list_audio(
        cfg.out_dirs[0], select_list=getattr(cfg, "input_select", None)
@@ -419,25 +404,10 @@ def preprocess_2(cfg, logger):
            cfg.metadata_path,
        )
    )
    p = Pool()
    results = p.starmap_async(
        process_item,
        args,
    apply_func_parallel(
        process_item, args, None, "mp" if cfg.multiprocessing else None, True
    )

    # poll progress
    progressbar_update(0, count, width)
    while not results.ready():
        progressbar_update(count - int(results._number_left), count, width)
        spinner()
        sleep(0.1)
    progressbar_update(count, count, width)
    print("", flush=True, file=sys.stdout)
    results.get()

    p.close()
    p.join()

    # update the configuration to use preprocessing 2 outputs as new inputs
    cfg.items_list = list_audio(
        cfg.out_dirs[0], select_list=getattr(cfg, "input_select", None)