Commit 253433ef authored by Vladimir Malenovsky's avatar Vladimir Malenovsky
Browse files

add ISAR test

parent 6941acb1
Loading
Loading
Loading
Loading
Loading
+125 −116
Original line number Diff line number Diff line
@@ -789,7 +789,6 @@ def dut_decoder_path(request) -> str:

    return path


class DecoderFrontend:
    def __init__(self, path, dec_type, record_property, timeout=None, fr=20) -> None:
        self._path = str(Path(path).absolute())
@@ -866,9 +865,7 @@ class DecoderFrontend:

            # TODO: centralize this in a utils file
            if system == "Windows":
                netsim_path = SCRIPTS_DIR.joinpath(
                    "tools/Win32/networkSimulator_g192.exe"
                )
                netsim_path = SCRIPTS_DIR.joinpath("tools/Win32/networkSimulator_g192.exe")
            elif system == "Linux":
                netsim_path = SCRIPTS_DIR.joinpath("tools/Linux/networkSimulator_g192")
            elif system == "Darwin":
@@ -1049,7 +1046,6 @@ def ref_decoder_frontend(
        # Fixture teardown
        decoder._check_run()


@pytest.fixture(scope="session")
def test_vector_path(request) -> str:
    """
@@ -1341,93 +1337,87 @@ def get_format_from_enc_opts(enc_opts: str) -> str:
    return format


@pytest.fixture(scope="session")
def dut_postrend_path(request) -> Optional[str]:
    """
    Return path of DUT postrend binary.
    """
    if request.config.option.dut_postrend_path:
        return request.config.option.dut_postrend_path

    if request.config.option.update_ref == "1":
        return None

    here = Path(__file__).parent.resolve()
    system = platform.system()

    if system == "Windows":
        path = here.joinpath("../ISAR_post_rend.exe")
    elif system in ["Darwin", "Linux"]:
        path = here.joinpath("../ISAR_post_rend")
    else:
        raise ValueError(f'Wrong system "{system}"!')

    path = str(path.resolve())

    if not os.path.isfile(path):
        raise FileNotFoundError(f"DUT postrend binary {path} not found!\n!")

    return path


class PostRendFrontend:
    def __init__(self, path, postrend_type, timeout=None, fr=20) -> None:
        self._path = Path(path).absolute()
    def __init__(self, path, postrend_type, record_property, timeout=None, fr=20) -> None:
        self._path = str(Path(path).absolute())
        self._type = postrend_type            
        self.returncode = None
        self.stdout = None
        self.stderr = None
        self.timeout = timeout
        self.fr = fr
        self.record_property = record_property

    def run(
        self,
        output_sampling_rate: int,
        input_path: Path,
        output_path: Path,
        head_trajectory: Path,
        metadata_input_path: Optional[Path] = None,
        input_file: Path,
        input_file_format: str,
        output_file: Path,
        input_sampling_rate: Optional[int] = 48,
        input_metadata: Optional[Path] = None,
        quiet_mode: Optional[bool] = True,
        bfi_file: Optional[Path] = None,
        trajectory_file: Optional[Path] = None,
        prbfi_file: Optional[Path] = None,
        framing: Optional[int] = None,
        add_option_list: Optional[list] = None,
        run_dir: Optional[Path] = None,
    ) -> None:
        command = [str(self._path)]
        command = [self._path]
        
        # add optional parameters
        if quiet_mode:
            command.extend(["-q"])
            
        if bfi_file is not None:
            command.extend(["-prbfi", str(bfi_file)])
        if input_file_format == "BINAURAL_SPLIT_PCM":
            # check if input sampling rate has been provided
            if not input_sampling_rate:
                raise ValueError(f"No input sampling rate specified for PCM input file {input_file}!\n")
            elif input_sampling_rate not in [16, 32, 48]:
                raise ValueError(f"Incorrect input sampling rate specified for PCM input file {input_file}!\n")
                
            command.extend(["-fs", str(input_sampling_rate)])
            
            # check if input metadata file has been provided
            if not input_metadata.exists():
                raise FileNotFoundError(f"The metadata file {input_metadata} for PCM input file {input_file} not found!\n")
                
            command.extend(["-im", str(input_metadata)])
            
        if prbfi_file:
            if not prbfi_file.exists():
                raise FileNotFoundError(f"The file {prbfi_file} not found!\n")
            
            command.extend(["-prbfi", str(prbfi_file)])

        if trajectory_file:
            if not trajectory_file.exists():
                raise FileNotFoundError(f"The file {trajectory_file} not found!\n")
            
            command.extend(["-T", str(trajectory_file)])
            
        if framing:
            if framing not in [5, 10, 20]:
                raise ValueError(f"Incorrect framing size specified {framing}!\n")
                
            command.extend(["-fr", str(framing)])           
            
        if add_option_list is not None:
            command.extend(add_option_list)

        if metadata_input_path is not None:
            # If we have metadata input file, then input format must be PCM
            command.extend(["-if", "BINAURAL_SPLIT_PCM"])
            command.extend(["-im", str(metadata_input_path)])
        else:
            command.extend(["-if", "BINAURAL_SPLIT_CODED"])

        command.extend(
            [
                "-fr",
                str(self.fr),
                "-fs",
                str(output_sampling_rate),
        # add mandatory parameters
        command += [
            "-i",
                str(input_path),
            str(input_file),
            "-if",
            str(input_file_format),
            "-o",
                str(output_path),
                "-T",
                str(head_trajectory),
            str(output_file),
            "-fs",
            "48",               # !!!!! should be eventually removed from the mandatory cmd-line parameters
        ]
        )

        cmd_str = textwrap.indent(" ".join(command), prefix="\t")
        log_dbg_msg(f"{self._type} post-rend command:\n{cmd_str}")
        log_dbg_msg(f"{self._type} post-renderer command:\n{cmd_str}")

        try:
            with tempfile.TemporaryDirectory() as tmp_dir:
@@ -1443,20 +1433,20 @@ class PostRendFrontend:
                    cwd=cwd,
                )
        except TimeoutExpired:
            pytest.fail(f"{self._type} post-rend run timed out after {self.timeout}s.")
            pytest.fail(f"{self._type} post-renderer run timed out after {self.timeout}s.")

        self.returncode = result.returncode
        self.stderr = result.stderr.decode("ascii")
        self.stdout = result.stdout.decode("ascii")
        if self.stdout:
            stdout_str = textwrap.indent(self.stdout, prefix="\t")
            log_dbg_msg(f"{self._type} post-rend stdout:\n{stdout_str}")
            log_dbg_msg(f"{self._type} post-renderer stdout:\n{stdout_str}")
        if self.stderr:
            stderr_str = textwrap.indent(self.stderr, prefix="\t")
            log_dbg_msg(f"{self._type} post-rend stderr:\n{stderr_str}")
            log_dbg_msg(f"{self._type} post-renderer stderr:\n{stderr_str}")
        if self.returncode:
            pytest.fail(
                f"{self._type} post-rend terminated with a non-0 return code: {self.returncode}"
                f"{self._type} post-renderer terminated with a non-0 return code: {self.returncode}"
            )
        if self.stderr and "UndefinedBehaviorSanitizer" in self.stderr:
            pytest.fail("Undefined Behaviour runtime error encountered")
@@ -1465,39 +1455,12 @@ class PostRendFrontend:
        if self.returncode is not None:
            if self.returncode:
                pytest.fail(
                    f"{self._type} post-rend terminated with a non-0 return code: {self.returncode}"
                    f"{self._type} post-renderer terminated with a non-0 return code: {self.returncode}"
                )
        else:
            logger.warning("%s post-rend was set-up, but not run", self._type)
            logger.warning("%s post-renderer was set-up, but not run", self._type)
        # next assert is not OK since stderr contains messages even when decoding was successful
        # assert not self.stderr, self._type + " decoder stderr is not empty"


@pytest.fixture(scope="function")
def dut_postrend_frontend(dut_postrend_path, request) -> Optional[PostRendFrontend]:
    """
    Return a :class:`conftest.PostRendFrontend` instance as DUT for the test session.
    """
    postrend = None

    if dut_postrend_path and request.node.funcargs["out_format"] in [
        "BINAURAL_SPLIT_CODED",
        "BINAURAL_SPLIT_PCM",
    ]:
        timeout = request.config.getoption("--testcase_timeout")
        postrend = PostRendFrontend(
            dut_postrend_path,
            "DUT",
            timeout=timeout,
            fr=request.config.option.dut_fr,
        )

    yield postrend

    # Fixture teardown
    if postrend is not None:
        postrend._check_run()

        # assert not self.stderr, self._type + " post-renderer stderr is not empty"

@pytest.fixture(scope="session")
def ref_postrend_path(request) -> Optional[str]:
@@ -1507,9 +1470,6 @@ def ref_postrend_path(request) -> Optional[str]:
    if request.config.option.ref_postrend_path:
        return request.config.option.ref_postrend_path

    if request.config.option.update_ref == "0":
        return None

    here = Path(__file__).parent.resolve()
    system = platform.system()

@@ -1529,20 +1489,42 @@ def ref_postrend_path(request) -> Optional[str]:


@pytest.fixture(scope="function")
def ref_postrend_frontend(ref_postrend_path, request) -> Optional[PostRendFrontend]:
def ref_postrend_frontend(ref_postrend_path, request, record_property) -> Union[None, PostRendFrontend]:
    """
    Return a :class:`conftest.PostRendFrontend` instance as REF for the test session.
    """
    postrend = None

    if ref_postrend_path and request.node.funcargs["out_format"] in [
        "BINAURAL_SPLIT_CODED",
        "BINAURAL_SPLIT_PCM",
    ]:
    if ref_postrend_path:
        timeout = request.config.getoption("--testcase_timeout")
        postrend = PostRendFrontend(
            ref_postrend_path,
            "REF",
            record_property,
            timeout=timeout,
            fr=request.config.option.dut_fr,
        )

    yield postrend

    # Fixture teardown
    if postrend is not None:
        postrend._check_run()


@pytest.fixture(scope="function")
def dut_postrend_frontend(dut_postrend_path, request, record_property) -> Union[None, PostRendFrontend]:
    """
    Return a :class:`conftest.PostRendFrontend` instance as DUT for the test session.
    """
    postrend = None

    if dut_postrend_path:
        timeout = request.config.getoption("--testcase_timeout")
        postrend = PostRendFrontend(
            dut_postrend_path,
            "DUT",
            record_property,
            timeout=timeout,
            fr=request.config.option.dut_fr,
        )
@@ -1552,3 +1534,30 @@ def ref_postrend_frontend(ref_postrend_path, request) -> Optional[PostRendFronte
    # Fixture teardown
    if postrend is not None:
        postrend._check_run()



@pytest.fixture(scope="session")
def dut_postrend_path(request) -> Optional[str]:
    """
    Return path of DUT postrend binary.
    """
    if request.config.option.dut_postrend_path:
        return request.config.option.dut_postrend_path

    here = Path(__file__).parent.resolve()
    system = platform.system()

    if system == "Windows":
        path = here.joinpath("../ISAR_post_rend.exe")
    elif system in ["Darwin", "Linux"]:
        path = here.joinpath("../ISAR_post_rend")
    else:
        raise ValueError(f'Wrong system "{system}"!')

    path = str(path.resolve())

    if not os.path.isfile(path):
        raise FileNotFoundError(f"DUT postrend binary {path} not found!\n!")

    return path
 No newline at end of file
+102 −16
Original line number Diff line number Diff line
@@ -67,7 +67,7 @@ from pyaudio3dtools.audiofile import readfile
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.append(ROOT_DIR)

from tests.conftest import EncoderFrontend, DecoderFrontend
from tests.conftest import EncoderFrontend, DecoderFrontend, PostRendFrontend


@pytest.mark.parametrize("dtx", [False, True])
@@ -89,6 +89,7 @@ def test_rtp_bitstream_amrwb(
        "MONO",
        dtx,
        framesPerPacket,
        False,
        dut_encoder_frontend,
        dut_decoder_frontend,
    )
@@ -117,6 +118,7 @@ def test_rtp_bitstream_evs(
        "MONO",
        dtx,
        framesPerPacket,
        False,
        dut_encoder_frontend,
        dut_decoder_frontend,
    )
@@ -143,6 +145,7 @@ def test_rtp_bitstream_ivas_nodtx(
        format,
        False,
        framesPerPacket,
        False,
        dut_encoder_frontend,
        dut_decoder_frontend,
    )
@@ -169,11 +172,42 @@ def test_rtp_bitstream_ivas_dtx(
        format,
        True,
        framesPerPacket,
        False,
        dut_encoder_frontend,
        dut_decoder_frontend,
    )


@pytest.mark.parametrize("bitrate", [48000])
@pytest.mark.parametrize("bandwidth", ["FB"])
@pytest.mark.parametrize("format", ["SBA"])
@pytest.mark.parametrize("framesPerPacket", [1])
@pytest.mark.parametrize("splitRendering_format", ["BINAURAL_SPLIT_CODED"])
def test_rtp_bitstream_isar(
    test_info,
    bitrate: int,
    bandwidth: str,
    format: str,
    framesPerPacket: int,
    dut_encoder_frontend: EncoderFrontend,
    dut_decoder_frontend: DecoderFrontend,
    splitRendering_format: str,
    dut_postrend_frontend: PostRendFrontend,
):
    run_rtp_bitstream_tests(
        CODECS.IVAS,
        bitrate,
        bandwidth,
        "OFF",
        format,
        False,
        framesPerPacket,
        dut_encoder_frontend,
        dut_decoder_frontend,
        splitRendering_format,
        dut_postrend_frontend,
    )

def generateRequests(startTs: int, endTs: int) -> dict:
    requests = dict()
    return requests
@@ -556,6 +590,13 @@ class TVARGS:
        self.deviceFile = (
            Path(ROOT_DIR).joinpath("scripts/trajectories/headrot-1.5s.csv").absolute()
        )
        self.preRendHeadRotFile = (
            Path(ROOT_DIR).joinpath("scripts/trajectories/rotate_euler_quaternion_30s_delayed.csv").absolute()
        )
        self.postRendHeadRotFile = (
            Path(ROOT_DIR).joinpath("scripts/trajectories/rotate_euler_quaternion_30s.csv").absolute()
        )
        

    def add(self, fmt: str, inputFile: str, args: list[str] = []):
        inputFile = str(TVARGS.TVROOT.joinpath(inputFile).absolute())
@@ -591,6 +632,8 @@ def run_rtp_bitstream_tests(
    framesPerPacket: int,
    dut_encoder_frontend: EncoderFrontend,
    dut_decoder_frontend: DecoderFrontend,
    splitRendering_format: Optional[str] = None,
    dut_postrend_frontend: Optional[PostRendFrontend] = None,
):
    tvArgs = TVARGS()
    tvArgs.add("MONO", "stv48n.wav")
@@ -635,6 +678,11 @@ def run_rtp_bitstream_tests(
            .joinpath(f"input-{codec}-{bitrate}-{caMode}-{format}-{dtx}.rtpdump")
            .absolute()
        )
        splitOut = (
            Path(tmp_dir)
            .joinpath(f"split-{codec}-{bitrate}-{caMode}-{format}-{dtx}.cod")
            .absolute()
        )
        pcmOut = (
            Path(tmp_dir)
            .joinpath(f"output-{codec}-{bitrate}-{caMode}-{format}-{dtx}.wav")
@@ -666,7 +714,12 @@ def run_rtp_bitstream_tests(

        packer = IvasRtp(numFramesPerPacket=framesPerPacket, codec=codec)

        split_rendering = False
        if codec == CODECS.IVAS:
            if splitRendering_format:
                split_rendering = True
                outMode = splitRendering_format
            else:
                outMode = "STEREO" if format == "STEREO" else "BINAURAL"
            generatedPIData = generatePiData(0, 16000)
        else:
@@ -680,6 +733,7 @@ def run_rtp_bitstream_tests(
            requestsData=generateRequests(0, 1600),
        )

        if not split_rendering:
            dut_decoder_frontend.run(
                output_config=outMode,
                output_sampling_rate=48,
@@ -688,6 +742,23 @@ def run_rtp_bitstream_tests(
                add_option_list=[],
            )
            
        else:
            dut_decoder_frontend.run(
                output_config=outMode,
                output_sampling_rate=48,
                input_bitstream_path=g192Out,
                output_path=splitOut,
                add_option_list=["-T", str(tvArgs.preRendHeadRotFile)],
            )

            dut_postrend_frontend.run(
                input_file=splitOut,
                input_file_format=outMode,
                output_file=pcmOutG192,
                add_option_list=["-T", str(tvArgs.postRendHeadRotFile)],
            )

        if not split_rendering:
            dut_decoder_frontend.run(
                output_config=outMode,
                output_sampling_rate=48,
@@ -695,6 +766,21 @@ def run_rtp_bitstream_tests(
                output_path=pcmOut,
                add_option_list=["-VOIP_HF_ONLY=1", "-PiDataFile", str(piDataOutJson)],
            )
        else:
            dut_decoder_frontend.run(
                output_config=outMode,
                output_sampling_rate=48,
                input_bitstream_path=rtpdumpIn,
                output_path=splitOut,
                add_option_list=["-VOIP_HF_ONLY=1", "-PiDataFile", str(piDataOutJson), "-T", str(tvArgs.preRendHeadRotFile)],
            )
            
            dut_postrend_frontend.run(
                input_file=splitOut,
                input_file_format=outMode,
                output_file=pcmOut,
                add_option_list=["-T", str(tvArgs.postRendHeadRotFile)],
            )

        decAudio, fs = readfile(pcmOut)
        g192Audio, Fs = readfile(pcmOutG192)