Commit 7113d287 authored by Archit Tamarapu
Browse files

remove chunksize parameter

parent 5e2037e6
Loading
Loading
Loading
Loading
Loading
+5 −6
Original line number Diff line number Diff line
@@ -31,8 +31,9 @@
#

import logging
import sys
from itertools import product
from multiprocessing import Pool, cpu_count
from multiprocessing import Pool
from time import sleep

from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
@@ -167,23 +168,21 @@ def main(args):
            # set up values for progress display and chunksize
            count = len(item_args)
            width = 80
            chunksize = count / cpu_count()

            # submit tasks to the pool
            p = Pool()
            results = p.starmap_async(
                process_item,
                item_args,
                chunksize,
            )

            # poll progress
            progressbar_update(0, count, width)
            while not results.ready():
                progressbar_update(
                    count - int(results._number_left * chunksize), count, width
                )
                progressbar_update(count - int(results._number_left), count, width)
                sleep(0.1)
            progressbar_update(count, count, width)
            print("\n", flush=True, file=sys.stdout)

            p.close()
            p.join()
+22 −42
Original line number Diff line number Diff line
@@ -31,9 +31,10 @@
#

import logging
import sys
from abc import ABC, abstractmethod
from itertools import repeat
from multiprocessing import Pool, cpu_count
from multiprocessing import Pool
from pathlib import Path
from shutil import copyfile
from time import sleep
@@ -239,12 +240,9 @@ def preprocess(cfg, logger):
    # set up values for progress display and chunksize
    count = len(cfg.items_list)
    width = 80
    chunksize = count / cpu_count()

    # run preprocessing
    p = Pool()
    results = p.starmap_async(
        process_item,
    args = list(
        zip(
            cfg.items_list,
            repeat(cfg.tmp_dirs[0]),
@@ -252,15 +250,21 @@ def preprocess(cfg, logger):
            repeat(chain),
            repeat(logger),
            cfg.metadata_path,
        ),
        chunksize=len(cfg.items_list) / cpu_count(),
        )
    )
    p = Pool()
    results = p.starmap_async(
        process_item,
        args,
    )

    # poll progress
    progressbar_update(0, count, width)
    while not results.ready():
        progressbar_update(count - int(results._number_left * chunksize), count, width)
        progressbar_update(count - int(results._number_left), count, width)
        sleep(0.1)
    progressbar_update(count, count, width)
    print("\n", flush=True, file=sys.stdout)

    p.close()
    p.join()
@@ -306,12 +310,9 @@ def preprocess_2(cfg, logger):
    # set up values for progress display and chunksize
    count = len(cfg.items_list)
    width = 80
    chunksize = count / cpu_count()

    # run preprocessing 2
    p = Pool()
    results = p.starmap_async(
        process_item,
    args = list(
        zip(
            cfg.items_list,
            repeat(cfg.tmp_dirs[0]),
@@ -319,15 +320,21 @@ def preprocess_2(cfg, logger):
            repeat(chain),
            repeat(logger),
            cfg.metadata_path,
        ),
        chunksize=len(cfg.items_list) / cpu_count(),
        )
    )
    p = Pool()
    results = p.starmap_async(
        process_item,
        args,
    )

    # poll progress
    progressbar_update(0, count, width)
    while not results.ready():
        progressbar_update(count - int(results._number_left * chunksize), count, width)
        progressbar_update(count - int(results._number_left), count, width)
        sleep(0.1)
    progressbar_update(count, count, width)
    print("\n", flush=True, file=sys.stdout)

    p.close()
    p.join()
@@ -432,33 +439,6 @@ def process_item(
                copyfile(ppm, out_meta[idx])


def process_condition(
    cfg: TestConfig,
    condition: dict,
    tmp_dir: Union[str, Path],
    out_dir: Union[str, Path],
    logger: logging.Logger,
    # parallelise_items: bool = False,
):
    """Run one condition's processing chain over every item in parallel.

    The chain is read from ``condition["processes"]`` and the condition's
    name is logged at INFO level. Each entry of ``cfg.items_list`` is then
    paired with the entry of ``cfg.metadata_path`` at the same position and
    handed to ``process_item`` through a default-sized ``Pool``.

    Parameters
    ----------
    cfg
        Test configuration; ``items_list`` and ``metadata_path`` are read.
    condition
        Mapping that must provide the keys ``"processes"`` and ``"name"``.
    tmp_dir
        Passed unchanged to every ``process_item`` call.
    out_dir
        Passed unchanged to every ``process_item`` call.
    logger
        Used for the status message and forwarded to every worker call.

    Returns
    -------
    None
        The values returned by ``process_item`` are discarded.
    """
    processing_chain = condition["processes"]

    logger.info(f"  Generating condition: {condition['name']}")

    with Pool() as pool:
        # zip() ends with its shortest input, so the two per-item sequences
        # decide how many tasks exist; the repeat() iterators are endless.
        per_item_args = zip(
            cfg.items_list,
            repeat(tmp_dir),
            repeat(out_dir),
            repeat(processing_chain),
            repeat(logger),
            cfg.metadata_path,
        )
        # Blocks until all items are processed; tasks go out in batches of 8.
        pool.starmap(process_item, per_item_args, chunksize=8)


def remove_pre_and_postamble(
    x, out_fmt, fs, repeat_signal, preamble_len_ms, postamble_len_ms, meta, logger
):