Commit f8ca8f80 authored by Fabian Müller's avatar Fabian Müller
Browse files

Make thread lock more thread-safe

parent dd407797
Loading
Loading
Loading
Loading
+239 −247
Original line number Diff line number Diff line
@@ -242,7 +242,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):

        """

        self.lock.acquire()
        with self.lock:
            if self.stats:
                if not self.run_encoder:
                    if config["config"]["num_dec_remaining"] == config["config"]["num_dec"]:
@@ -250,7 +250,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                self.stats["num_decs_running"] += 1
                config["config"]["num_dec_remaining"] -= 1
                self.show_progress()
        self.lock.release()
        enc_dec_cmd = config["config"]["cmd"]
        item_base_name = os.path.splitext(os.path.basename(config["enc_file_name"]))[0]
        output_config = config["out_config"]
@@ -347,7 +346,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    )

                self.logger.error(fail_string.format(enc_file_name, dec_file_name))
                self.lock.acquire()
                with self.lock:
                    if self.stats:
                        self.stats["num_dec_errors"] += 1
                        self.show_progress()
@@ -370,7 +369,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                        self.failed_modes["dec"][output_config].append(mode)
                    else:
                        self.failed_modes["dec"][output_config] = [mode]
                self.lock.release()
            else:
                self.logger.info(
                    "Decoding successful for {} to {}".format(
@@ -388,8 +386,8 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    config["enc_file_name"], output_config
                )
            )
        self.lock.acquire()

        with self.lock:
            config["config"]["num_dec_done"] += 1
            if self.stats:
                self.stats["num_decs_finished"] += 1
@@ -401,8 +399,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    self.stats["num_modes_finished"] += 1
                self.show_progress()

        self.lock.release()

    def clean_pcm_directory(self):
        """
        Remove .LOCK files and corresponding intermediate files from the pcm directory
@@ -439,8 +435,8 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
        error = 0
        enc_dec_cmd = deepcopy(config["cmd"])
        # get next item
        config["lock"].acquire()
        self.lock.acquire()
        with config["lock"]:
            with self.lock:
                if self.stats:
                    if config["num_enc_remaining"] == config["num_enc"]:
                        self.stats["num_modes_running"] += 1
@@ -449,9 +445,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    in_file_name = config["item_list"].pop()
                    config["num_enc_remaining"] -= 1
                else:
            config["lock"].release()
                    return
        self.lock.release()

            metadata_file_names = list()

@@ -460,12 +454,11 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                metadata_file_names = in_file_name[1:]
                in_file_name = in_file_name[0]
            self.logger.info("Encoding Mode {} input file {}".format(mode, in_file_name))
        config["lock"].release()
        self.lock.acquire()

        with self.lock:
            if self.stats:
                self.stats["num_encs_running"] += 1
                self.show_progress()
        self.lock.release()
        pcm_name_lock = None
        item_base_name = os.path.splitext(os.path.basename(in_file_name))[0]
        enc_file_name = os.path.join(
@@ -524,14 +517,21 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                pcm_name = os.path.join(self.dir_name, "pcm", pcm_base_name)
                pcm_name_lock = "".join([pcm_name, ".LOCK"])
                pcm_info_file = f"{pcm_name}.json"
                self.lock.acquire()

                create_input_file = False
                pcm_ready = False

                with self.lock:
                    if not os.path.exists(pcm_name_lock):
                        if not os.path.exists(pcm_name):
                        # create the lock
                        fp = open(pcm_name_lock, "w")
                        fp.close()
                        self.lock.release()
                            # create lock file
                            with open(pcm_name_lock, "w"):
                                pass

                            create_input_file = True

                if create_input_file:

                    self.logger.info(
                        "Creating input pcm for item {} at sample rate {}: {}".format(
                            item_base_name, sample_rate_in, pcm_name
@@ -626,7 +626,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    pcm_log.flush()
                    pcm_log.close()
                    # remove file lock
                        self.lock.acquire()
                    with self.lock:
                        os.remove(pcm_name_lock)
                        # os.remove(pcm_name_res_tmp)
                        if do_limit_duration and cut_len_samples < in_len:
@@ -634,26 +634,24 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                        self.logger.info(
                            "PCM file {} successfully created!".format(pcm_name)
                        )
                        self.lock.release()
                    else:
                        self.lock.release()
                else:

                    # PCM is now ready, no need to wait for anybody
                    pcm_ready = True

                # wait for pcm to be ready
                    self.lock.release()
                    not_ready = 1
                    while not_ready:
                while not pcm_ready:
                    time.sleep(1)
                        self.lock.acquire()

                    with self.lock:
                        if not os.path.exists(pcm_name_lock):
                            not_ready = 0
                        self.lock.release()
                            pcm_ready = True

            else:
                pcm_name = in_file_name
                pcm_info_file = f"{pcm_name}.json"

            # get input format dictionary for the input file
            self.lock.acquire()
            with self.lock:
                if pcm_name not in self.pcm_info:
                    if os.path.exists(pcm_info_file):
                        with open(pcm_info_file, "r") as pcmifp:
@@ -666,7 +664,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                            in_format = "HOA3"
                        in_format_dict = spformat.get_format_dict(in_format)
                    self.pcm_info.update({pcm_name: in_format_dict})
            self.lock.release()

            # build the encoder commandline
            enc_options = enc_dec_cmd["encmodeoption"]
@@ -815,7 +812,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):

        except Exception as exc:
            # make sure we do not have a deadlock...
            self.lock.acquire()
            with self.lock:
                if pcm_name_lock:

                    if os.path.exists(pcm_name_lock):
@@ -836,11 +833,10 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    "Traceback: {}".format(traceback.format_tb(exc.__traceback__)),
                    logging.ERROR,
                )
            self.lock.release()
            enc_file_name_dec = enc_file_name
            error = 1

        self.lock.acquire()
        with self.lock:
            if self.stats:
                config["num_enc_done"] += 1
                if config["num_enc_done"] == config["num_enc"]:
@@ -853,16 +849,15 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                self.stats["num_encs_finished"] += 1
                self.stats["num_encs_running"] -= 1
                self.show_progress()
        self.lock.release()

        if error != 0:
            fail_string = "Encoding failed for {}"
            fail_string = "error " + str(error) + ": Encoding failed for {}"
            if error == RET_CODE_TIMEOUT_EXP:
                fail_string = (
                    fail_string + f" due to timeout after {self.timeout} seconds"
                )

            self.lock.acquire()
            with self.lock:
                if self.stats:
                    self.stats["num_enc_errors"] += 1
                    self.show_progress()
@@ -875,7 +870,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                    ]
                )
                self.failed_modes["enc"].append(mode)
            self.lock.release()
            self.logger.error(fail_string.format(enc_file_name))
        else:
            self.logger.info("Encoding successful for {}".format(enc_file_name))
@@ -1169,7 +1163,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
        return missing_mode_flat_dict

    def bs_processing_thread(self, bs_entry):
        self.lock.acquire()
        with self.lock:
            self.bs_processing_queue["stats"]["num_bs_running"] += 1
            line = "Bit stream processing: {}/{} ({} running)".format(
                self.bs_processing_queue["stats"]["num_bs_done"],
@@ -1178,7 +1172,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
            )
            self.logger.progress(line)
            enc_file_name = bs_entry["item_list"].pop()
        self.lock.release()
        bs_in_file = enc_file_name
        proc_chain = deepcopy(bs_entry["proc_chain"])
        suffices = self.get_bs_processing_suffices(bs_entry)
@@ -1239,7 +1232,7 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
            self.logger.error(
                "Traceback: {}".format(traceback.format_tb(exc.__traceback__))
            )
        self.lock.acquire()
        with self.lock:
            self.bs_processing_queue["stats"]["num_bs_running"] -= 1
            self.bs_processing_queue["stats"]["num_bs_done"] += 1
            line = "Bit stream processing: {}/{} ({} running)".format(
@@ -1248,7 +1241,6 @@ class IvasModeRunner(IvasModeCollector.IvasModeCollector):
                self.bs_processing_queue["stats"]["num_bs_running"],
            )
            self.logger.progress(line)
        self.lock.release()

    def run_bs_processing_queue(self):
        self.logger.console("Postprocessing of bit streams ...", logging.INFO)