Commit 5e2037e6 authored by Archit Tamarapu's avatar Archit Tamarapu
Browse files

apply changes for preprocessing as well

parent 83991a54
Loading
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ IN_FMT_FOR_MASA_EXPS = {
}


def generate_tests(exp_lab_pairs, run_parallel=True, create_cfg_only=False):
def generate_tests(exp_lab_pairs, create_cfg_only=False):
    """
    Create configs and run them for all given experiment/lab pairs
    """
@@ -71,8 +71,6 @@ class Arguments:
    def __init__(self, config):
        self.config = config
        self.debug = False
        # used to overwrite the multiprocessing key in the configs to rather parallelize on category level
        self.multiprocessing = True


def create_experiment_setup(experiment, lab) -> list[Path]:
+57 −28
Original line number Diff line number Diff line
@@ -33,9 +33,10 @@
import logging
from abc import ABC, abstractmethod
from itertools import repeat
from multiprocessing import Pool
from multiprocessing import Pool, cpu_count
from pathlib import Path
from shutil import copyfile
from time import sleep
from typing import Iterable, Union
from warnings import warn

@@ -50,7 +51,7 @@ from ivas_processing_scripts.audiotools.metadata import (
)
from ivas_processing_scripts.constants import LOGGER_DATEFMT, LOGGER_FORMAT
from ivas_processing_scripts.processing.config import TestConfig
from ivas_processing_scripts.utils import list_audio, pairwise
from ivas_processing_scripts.utils import list_audio, pairwise, progressbar_update


class Processing(ABC):
@@ -235,9 +236,14 @@ def preprocess(cfg, logger):

    logger.info(f"  Generating condition: {preprocessing['name']}")

    # set up values for progress display and chunksize
    count = len(cfg.items_list)
    width = 80
    chunksize = count / cpu_count()

    # run preprocessing
    with Pool() as p:
        p.starmap(
    p = Pool()
    results = p.starmap_async(
        process_item,
        zip(
            cfg.items_list,
@@ -247,9 +253,18 @@ def preprocess(cfg, logger):
            repeat(logger),
            cfg.metadata_path,
        ),
            chunksize=8,
        chunksize=len(cfg.items_list) / cpu_count(),
    )

    # poll progress
    progressbar_update(0, count, width)
    while not results.ready():
        progressbar_update(count - int(results._number_left * chunksize), count, width)
        sleep(0.1)

    p.close()
    p.join()

    # update the configuration to use preprocessing outputs as new inputs
    cfg.items_list = list_audio(
        cfg.out_dirs[0], select_list=getattr(cfg, "input_select", None)
@@ -288,9 +303,14 @@ def preprocess_2(cfg, logger):
    if chain[0].concatenate_input:
        concat_setup(cfg, chain, logger)

    # set up values for progress display and chunksize
    count = len(cfg.items_list)
    width = 80
    chunksize = count / cpu_count()

    # run preprocessing 2
    with Pool() as p:
        p.starmap(
    p = Pool()
    results = p.starmap_async(
        process_item,
        zip(
            cfg.items_list,
@@ -300,9 +320,18 @@ def preprocess_2(cfg, logger):
            repeat(logger),
            cfg.metadata_path,
        ),
            chunksize=8,
        chunksize=len(cfg.items_list) / cpu_count(),
    )

    # poll progress
    progressbar_update(0, count, width)
    while not results.ready():
        progressbar_update(count - int(results._number_left * chunksize), count, width)
        sleep(0.1)

    p.close()
    p.join()

    # update the configuration to use preprocessing 2 outputs as new inputs
    cfg.items_list = list_audio(
        cfg.out_dirs[0], select_list=getattr(cfg, "input_select", None)