Commit dac33e63 authored by Vladimir Malenovsky's avatar Vladimir Malenovsky
Browse files

debug: try running truncate_signal as session-scope function

parent 079c0bc5
Loading
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -1546,3 +1546,15 @@ def ref_postrend_frontend(ref_postrend_path, request) -> Optional[PostRendFronte
    # Fixture teardown
    if postrend is not None:
        postrend._check_run()

def pytest_sessionstart(session):
    session.test_functions = []

def pytest_collection_finish(session):
    # Collect all test function names
    session.test_functions = [item.nodeid for item in session.items]

@pytest.fixture(scope="session")
def collected_tests(request):
    # Access the collected test function names
    return getattr(request.session, "test_functions", [])
 No newline at end of file
+64 −2
Original line number Diff line number Diff line
@@ -33,9 +33,72 @@ the United Nations Convention on Contracts on the International Sales of Goods.
import pytest

from tests.split_rendering.utils import *
import pdb

""" Ambisonics """

@pytest.fixture(scope="session", autouse=True)
def truncate_input_files(collected_tests, request):
    """
    Truncate the test vectors to maximum INPUT_DURATION_SEC seconds
    """
    
    INPUT_FORMATS_FULL_CHAIN_SPLIT_PCM = [entry[0] for entry in full_chain_split_pcm_params]
    INPUT_FORMATS_EXT_SPLIT_PCM = [entry[0] for entry in external_split_pcm_params] 

    TEST_FUNCTIONS_TO_FORMATS = {
        "test_ambisonics_full_chain_split": INPUT_FORMATS_AMBI_SPLIT_REND,
        "test_ambisonics_external_split": INPUT_FORMATS_AMBI_SPLIT_REND,
        "test_multichannel_full_chain_split": INPUT_FORMATS_MC_SPLIT_REND,
        "test_multichannel_external_split": INPUT_FORMATS_MC_SPLIT_REND,
        "test_ism_full_chain_split": INPUT_FORMATS_ISM_SPLIT_REND,
        "test_ism_external_split": INPUT_FORMATS_ISM_SPLIT_REND,
        "test_masa_full_chain_split": INPUT_FORMATS_MASA_SPLIT_REND,
        "test_masa_external_split": INPUT_FORMATS_MASA_SPLIT_REND,
        "test_omasa_full_chain_split": INPUT_FORMATS_OMASA_SPLIT_REND,
        "test_omasa_external_split": INPUT_FORMATS_OMASA_SPLIT_REND,
        "test_osba_full_chain_split": INPUT_FORMATS_OSBA_SPLIT_REND,
        "test_osba_external_split": INPUT_FORMATS_OSBA_SPLIT_REND,
        "test_post_rend_plc": [INPUT_FORMATS_AMBI_SPLIT_REND[-1]],
        "test_full_chain_split_pcm": INPUT_FORMATS_FULL_CHAIN_SPLIT_PCM,
        "test_external_split_pcm": INPUT_FORMATS_EXT_SPLIT_PCM,
        "test_framing_combinations_full_chain_split": ["5_1", "FOA"],
        "test_framing_combinations_external_split": ["5_1"],
    }
    
    pattern = r'::([^\[]+)\['

    # get the list of test functions to run
    functions_to_run = {re.search(pattern, entry).group(1) for entry in collected_tests if re.search(pattern, entry)}
    functions_to_run = list(functions_to_run)

    # get the list of file formats for test functions
    formats = []
    for name in functions_to_run:
        formats.extend(TEST_FUNCTIONS_TO_FORMATS.get(name, []))
    seen = set()
    formats = [fmt for fmt in formats if not (fmt in seen or seen.add(fmt))]    # remove duplicates
    
    logging.info(f"\nCutting input files for the following formats: {', '.join(formats)}\n")

    # pdb.set_trace()
    for in_fmt in formats:
        in_file = FORMAT_TO_FILE_COMPARETEST[in_fmt]
        cut_in_file = in_file.with_stem(in_file.stem + "_cut")

        if not path.exists(cut_in_file):
            data, fs = readfile(in_file)

            if data.ndim == 1:
                data_out = data[: INPUT_DURATION_SEC * fs]
            elif data.ndim == 2:
                data_out = data[: INPUT_DURATION_SEC * fs, :]
            else:
                raise ValueError(f"Cannot truncate data with dimension of {data.ndim}")

            writefile(cut_in_file, data_out, fs)


""" Ambisonics """

@pytest.mark.parametrize("delay_profile", DELAY_PROFILES)
@pytest.mark.parametrize("trajectory", SPLIT_REND_HR_TRAJECTORIES_TO_TEST)
@@ -557,7 +620,6 @@ def test_post_rend_plc(

""" BINAURAL_SPLIT_PCM """


full_chain_split_pcm_params = [
    ("HOA3", "96000", "split_renderer_config_1dof_512k_default"),
    ("7_1_4", "512000", "split_renderer_config_3dofhq_512k_lc3plus"),
+26 −26
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Tuple, Optional
from os import path
import threading
# from filelock import FileLock

import numpy as np
import pytest
@@ -63,7 +63,7 @@ from ..conftest import parse_properties
sys.path.append(SCRIPTS_DIR)
from pyaudio3dtools.audiofile import readfile, writefile

file_lock = threading.Lock()
# file_lock = FileLock("tmp_truncate_signal_token.lock")

def lc3plus_used(test_info, in_fmt, render_config):
    return (
@@ -151,25 +151,25 @@ def check_xfail(
        )


def truncate_signal(
    in_file: Path,
    out_file: Path,
) -> None:
    """
    Truncate the signal in in_file to maximum INPUT_DURATION_SEC seconds,
    and write the truncated signal to out_file
    """
# def truncate_signal(
    # in_file: Path,
    # out_file: Path,
# ) -> None:
    # """
    # Truncate the signal in in_file to maximum INPUT_DURATION_SEC seconds,
    # and write the truncated signal to out_file
    # """
    
    data, fs = readfile(in_file)
    # data, fs = readfile(in_file)

    if data.ndim == 1:
        data_out = data[: INPUT_DURATION_SEC * fs]
    elif data.ndim == 2:
        data_out = data[: INPUT_DURATION_SEC * fs, :]
    else:
        raise ValueError(f"Cannot truncate data with dimension of {data.ndim}")
    # if data.ndim == 1:
        # data_out = data[: INPUT_DURATION_SEC * fs]
    # elif data.ndim == 2:
        # data_out = data[: INPUT_DURATION_SEC * fs, :]
    # else:
        # raise ValueError(f"Cannot truncate data with dimension of {data.ndim}")

    writefile(out_file, data_out, fs)
    # writefile(out_file, data_out, fs)


def run_full_chain_split_rendering(
@@ -249,10 +249,10 @@ def run_full_chain_split_rendering(
        # if in REF or CUT creation mode use the comparetestv
        if test_info.config.option.create_ref or test_info.config.option.create_cut:
            in_file = FORMAT_TO_FILE_COMPARETEST[in_fmt]
            cut_in_file = output_path_base.joinpath(in_file.stem + "_cut" + in_file.suffix) 
            with file_lock:
                if not path.exists(cut_in_file):
                    truncate_signal(in_file, cut_in_file)
            cut_in_file = in_file.with_stem(in_file.stem + "_cut")
            # with file_lock:
                # if not path.exists(cut_in_file):
                    # truncate_signal(in_file, cut_in_file)

            enc_cmd[3] = str(cut_in_file)
        else:
@@ -428,10 +428,10 @@ def run_external_split_rendering(
        # if in REF or CUT creation mode use the comparetestv
        if test_info.config.option.create_ref or test_info.config.option.create_cut:
            in_file = FORMAT_TO_FILE_COMPARETEST[in_fmt]
            cut_in_file = output_path_base.joinpath(in_file.stem + "_cut" + in_file.suffix) 
            with file_lock:
                if not path.exists(cut_in_file):
                    truncate_signal(in_file, cut_in_file)
            cut_in_file = in_file.with_stem(in_file.stem + "_cut")
            # with file_lock:
                # if not path.exists(cut_in_file):
                    # truncate_signal(in_file, cut_in_file)

            split_pre_cmd[6] = str(cut_in_file)
        else: