Commit 7d863503 authored by BOHMRR's avatar BOHMRR
Browse files

Merge branch '42-add-sba-pytest-run-to-mr-pipeline' into 'main'

Resolve  "Add SBA pytest run to MR pipeline"

See merge request !24
parents 0c0f9370 03c50ca3
Loading
Loading
Loading
Loading
+30 −3
Original line number Diff line number Diff line
@@ -146,15 +146,42 @@ self-test-on-merge-request:
    - python3 ./scripts/self_test.py --encref IVAS_cod_ref --decref IVAS_dec_ref --enctest IVAS_cod_test --dectest IVAS_dec_test | tee test_output.txt

    ### analyse test output

    # some helper variables - "|| true" to prevent failures from grep not finding anything
    - non_be_flag=$(echo $CI_MERGE_REQUEST_TITLE | grep -c --ignore-case "\[non[ -]*be\]") || true
    - run_errors=$(cat test_output.txt | grep -c "test conditions had run errors") || true
    - bitexact=$(cat test_output.txt | grep -c "All [0-9]* tests are bitexact") || true
    - EXIT_CODE_NON_BE=123
    - EXIT_CODE_FAIL=1

    - selftest_exit_code=0

    # check for crashes during the test, if any happened, fail the test
    - if cat test_output.txt | grep -c "Run errors were encountered for the following conditions:"; then echo "Codec had run errors"; exit 1; fi
    # check for non bitexact output and fail test if the merge request does not have a non-BE tag
    - if ! cat test_output.txt | grep -c "All [0-9]* tests are bitexact" && ! echo $CI_MERGE_REQUEST_TITLE | grep -c --ignore-case "\[non[ -]*be\]"; then echo "Non-bitexact cases without non-BE tag encountered"; exit 1; fi
    - if [ $run_errors != 0 ] ; then echo "Run errors in self_test.py"; exit $EXIT_CODE_FAIL; fi

    # check for non bitexact output and store exit code to also always run the SBA pytest
    - if [ $bitexact == 0 ] && [ $non_be_flag == 0 ] ; then echo "Non-bitexact cases without non-BE tag encountered"; selftest_exit_code=$EXIT_CODE_FAIL; fi
    - if [ $bitexact == 0 ] && [ $non_be_flag != 0 ]; then echo "Non-bitexact cases with non-BE tag encountered"; selftest_exit_code=$EXIT_CODE_NON_BE; fi

    ### run SBA pytest
    - exit_code=0
    - python3 ./scripts/ivas_pytests/self_test_b.py --encref IVAS_cod_ref --decref IVAS_dec_ref --encdut IVAS_cod_test --decdut IVAS_dec_test || exit_code=$?
    - if [ $exit_code -eq 1 ] && [ $non_be_flag == 0 ]; then echo "pytest run had failures and non-BE flag not present"; exit $EXIT_CODE_FAIL; fi
    - zero_errors=$(cat report-junit.xml | grep -c 'testsuite errors="0"') || true
    - if [ $exit_code -eq 1 ] && [ $zero_errors == 1]; then echo "pytest run had failures, but no errors and non-BE flag present"; exit $EXIT_CODE_NON_BE; fi
    - if [ $exit_code -ne 0 ]; then echo "pytest run had errors"; exit $EXIT_CODE_FAIL; fi;
    # return exit code from selftest if everything went well with the pytest run
    - exit $selftest_exit_code
  allow_failure:
    exit_codes:
      - 123
  artifacts:
    paths:
      - test_output.txt
      - scripts/test/logs
      - scripts/ref/logs
    reports:
      junit: report-junit.xml


# Pull state of a branch on 3GPP repo, push to a mirror repo.

pytest.ini

0 → 100644
+8 −0
Original line number Diff line number Diff line
# pytest.ini
# note: per convention, this file is placed in the root directory of the repository
[pytest]
addopts = -ra --tb=short --basetemp=./tmp -v
junit_family=xunit1
log_file_level = DEBUG
log_format = %(asctime)s %(levelname)s %(message)s
log_date_format = %Y-%m-%d %H:%M:%S
+12 −8
Original line number Diff line number Diff line
@@ -206,11 +206,13 @@ class EncoderFrontend:
        if self.stderr:
            stderr_str = textwrap.indent(self.stderr, prefix="\t")
            log_dbg_msg(f"{self._type} encoder stderr:\n{stderr_str}")
        assert self.returncode == 0, self._type + " encoder terminated with a non-0 return code"
        if self.returncode:
            pytest.fail(f"{self._type} encoder terminated with a non-0 return code: {self.returncode}")

    def _check_run(self):
        if self.returncode is not None:
            if self.returncode:
            assert self.returncode == 0, self._type + " encoder terminated with a non-0 return code"
                pytest.fail(f"{self._type} encoder terminated with a non-0 return code: {self.returncode}")
        else:
            logger.warning("%s encoder was set-up, but not run", self._type)
        # next assert is not OK since stderr contains messages even when encoding was successful
@@ -223,7 +225,7 @@ def dut_encoder_frontend(dut_encoder_path) -> EncoderFrontend:
    yield encoder

    # Fixture teardown
#    encoder._check_run()
    encoder._check_run()


@pytest.fixture(scope="session")
@@ -322,11 +324,13 @@ class DecoderFrontend:
        if self.stderr:
            stderr_str = textwrap.indent(self.stderr, prefix="\t")
            log_dbg_msg(f"{self._type} decoder stderr:\n{stderr_str}")
        assert self.returncode == 0, self._type + " decoder terminated with a non-0 return code"
        if self.returncode:
            pytest.fail(f"{self._type} decoder terminated with a non-0 return code: {self.returncode}")

    def _check_run(self):
        if self.returncode is not None:
            if self.returncode:
            assert self.returncode == 0, self._type + " decoder terminated with a non-0 return code"
                pytest.fail(f"{self._type} decoder terminated with a non-0 return code: {self.returncode}")
        else:
            logger.warning("%s decoder was set-up, but not run", self._type)
        # next assert is not OK since stderr contains messages even when decoding was successful
@@ -339,7 +343,7 @@ def dut_decoder_frontend(dut_decoder_path) -> DecoderFrontend:
    yield decoder

    # Fixture teardown
#    decoder._check_run()
    decoder._check_run()


@pytest.fixture(scope="session")
+12 −141
Original line number Diff line number Diff line
@@ -42,15 +42,9 @@ import os
import sys
import argparse
import subprocess
import shutil
import platform
from pathlib import Path

TRUNK_SVN_PATH = "https://svnext02.iis.fraunhofer.de/ivas_dev/trunk"

SVNUSER = os.getenv("SVNUSER")
SVNPASSWD = os.getenv("SVNPASSWD")

BIN_EXT = ".exe" if platform.system() == "Windows" else ""
HERE = Path(__file__).parent.resolve()
DEFAULT_ENCODER_DUT = str(HERE.joinpath(f"../../IVAS_cod{BIN_EXT}").resolve())
@@ -58,54 +52,11 @@ DEFAULT_DECODER_DUT = str(HERE.joinpath(f"../../IVAS_dec{BIN_EXT}").resolve())
DEFAULT_ENCODER_REF = str(HERE.joinpath(f"../../IVAS_cod_ref{BIN_EXT}").resolve())
DEFAULT_DECODER_REF = str(HERE.joinpath(f"../../IVAS_dec_ref{BIN_EXT}").resolve())
CREND_UNITTEST_REF = str(HERE.joinpath(f"tests/unit_tests/crend/IVAS_crend_unit_test_ref{BIN_EXT}").resolve())
DATA_DIR = str(HERE.joinpath("../../../data").resolve())
TEST_VECTOR_DIR = str(HERE.joinpath("testv").resolve())
TEST_VECTOR_DIR = str(HERE.joinpath("../testv").resolve())
REFERENCE_DIR = str(HERE.joinpath("ref").resolve())
DUT_BASE_DIR = str(HERE.joinpath("dut").resolve())


def get_svn_trunk_revision():
    """
    Get the SVN HEAD revision number of the trunk folder.
    """
    command = ["svn"]
    if SVNUSER:
        command.extend(["--username", SVNUSER])
    if SVNPASSWD:
        command.extend(["--password", SVNPASSWD])
    command.extend(["info", TRUNK_SVN_PATH])
    result = subprocess.run(command, capture_output=True, check=True)
    for line in result.stdout.decode("ascii").splitlines():
        if line.startswith("Revision:"):
            return int(line.split()[1])
    assert 0, "unable to determine SVN trunk revision"


def get_svn_local_revision():
    """
    Get the SVN revision number of the local (trunk) folder.
    """
    command = ["svn", "info", "."]
    result = subprocess.run(command, capture_output=True, check=True)
    for line in result.stdout.decode("ascii").splitlines():
        if line.startswith("Revision:"):
            return int(line.split()[1])
    return 0  # unable to determine local SVN revision


def svn_export_trunk(revision, out_folder):
    """
    Export the SVN trunk folder in a specific revision.
    """
    command = ["svn"]
    if SVNUSER:
        command.extend(["--username", SVNUSER])
    if SVNPASSWD:
        command.extend(["--password", SVNPASSWD])
    command.extend(["export", "-q", "-r", str(revision), TRUNK_SVN_PATH, out_folder])
    subprocess.run(command, check=True)


def build_enc_and_dec(src_dir):
    """
    Build the encoder and decoder binaries.
@@ -146,40 +97,6 @@ def build_crend_unittest(src_dir):
        subprocess.run(command, check=True)


def build_ref_binaries(ref_decoder_path, ref_encoder_path, ref_version):
    """
    Build the REF binaries.
    """
    print("Building the REF binaries")
    if ref_version is None:
        ref_version = get_svn_local_revision()
        if ref_version == 0:
            ref_version = get_svn_trunk_revision()

    if ref_version == 0:
        # special REF binaries generation: copy DUT binaries
        print("- copy DUT binaries -> REF binaries")
        dut_src_dir = str(HERE.joinpath("../..").resolve())
        shutil.copy(f"{dut_src_dir}/IVAS_cod{BIN_EXT}", ref_encoder_path)
        shutil.copy(f"{dut_src_dir}/IVAS_dec{BIN_EXT}", ref_decoder_path)
        shutil.copy(f"{dut_src_dir}/scripts/ivas_pytests/tests/unit_tests/crend/IVAS_crend_unit_test{BIN_EXT}", CREND_UNITTEST_REF)
        return

    # is a local directory with the reference version available?
    ref_src_dir = str(HERE.joinpath(f"../../../ref_src/{ref_version}").resolve())
    if not os.path.exists(ref_src_dir):
        print(f"ref src dir {ref_src_dir} does not exist")
        svn_export_trunk(ref_version, ref_src_dir)
    build_enc_and_dec(ref_src_dir)
    print(f"copy: {ref_src_dir}/IVAS_cod{BIN_EXT} -> {ref_encoder_path}")
    shutil.copy(f"{ref_src_dir}/IVAS_cod{BIN_EXT}", ref_encoder_path)
    print(f"copy: {ref_src_dir}/IVAS_dec{BIN_EXT} -> {ref_decoder_path}")
    shutil.copy(f"{ref_src_dir}/IVAS_dec{BIN_EXT}", ref_decoder_path)
    build_crend_unittest(ref_src_dir)
    print(f"copy: {ref_src_dir}/scripts/ivas_pytests/tests/unit_tests/crend/IVAS_crend_unit_test{BIN_EXT} -> {CREND_UNITTEST_REF}")
    shutil.copy(f"{ref_src_dir}/scripts/ivas_pytests/tests/unit_tests/crend/IVAS_crend_unit_test{BIN_EXT}", CREND_UNITTEST_REF)


def build_dut_binaries():
    """
    Build the DUT binaries.
@@ -190,60 +107,12 @@ def build_dut_binaries():
    build_crend_unittest(dut_src_dir)


def setup_test_vector_directory():
    """
    Setup the test vector directory.

    Copy test vectors from data directory.
    Current assumption: data directory is available and parallel to trunk.
    Later, after some cleanup in the data folder, we can consider to do
    "svn export" of the test vectors folder.
    """
    os.makedirs(TEST_VECTOR_DIR)
    os.makedirs(f"{TEST_VECTOR_DIR}/spar_foa_bs/enc")
    os.makedirs(f"{TEST_VECTOR_DIR}/spar_foa_bs/dec")
    os.makedirs(f"{TEST_VECTOR_DIR}/crend/tests/crend")
    os.makedirs(f"{TEST_VECTOR_DIR}/crend/tests/reverb")
    os.makedirs(f"{TEST_VECTOR_DIR}/crend/tests/proximity")
    os.makedirs(f"{TEST_VECTOR_DIR}/crend/tests/ingest")
    os.makedirs(f"{TEST_VECTOR_DIR}/crend/tests/otr")
    systest_subfolder = ["spar_foa_bs/enc/in", "spar_foa_bs/dec/plc_file"]
    for folder in systest_subfolder:
        shutil.copytree(f"{DATA_DIR}/ivas_pytests/system_tests/{folder}", f"{TEST_VECTOR_DIR}/{folder}")
    unittest_subfolder = [
        "crend/tests/crend/in_wav",
        "crend/tests/crend/in_new_wav",
        "crend/tests/crend/in_csv_wav",
        "crend/tests/reverb/in_wav",
        "crend/tests/proximity/in_wav",
        "crend/tests/proximity/bitstream",
        "crend/tests/ingest/in_wav",
        "crend/tests/custom/in_wav",
        "crend/tests/hr/in_wav",
        "crend/tests/SOFA",
        "crend/tests/default-SOFA",
        "crend/tests/custom_SOFA",
        "crend/tests/hrtf_ref",
        "crend/tests/csv",
        "crend/tests/render_config",
    ]
    for folder in unittest_subfolder:
        shutil.copytree(f"{DATA_DIR}/ivas_pytests/unit_tests/{folder}", f"{TEST_VECTOR_DIR}/{folder}")


def main(argv):
    # check for python >= 3.7
    if sys.version_info[0] < 3 or sys.version_info[1] < 7:
        sys.exit("This script is written for Python >= 3.7. Found: " + platform.python_version())

    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        "--rref",
        action="store",
        type=int,
        default=None,
        help="SVN revision for the reference (default: HEAD)",
    )
    parser.add_argument(
        "--create_only",
        action="store_true",
@@ -294,14 +163,11 @@ def main(argv):
        else:
            decref_path = DEFAULT_DECODER_REF
        if not os.path.exists(encref_path) or not os.path.exists(decref_path):
            build_ref_binaries(decref_path, encref_path, args.rref)
            sys.exit("Reference binaries do not exist.")

    # check for test vectors
    if os.path.exists(TEST_VECTOR_DIR):
        print(f"Using existing test vector directory {TEST_VECTOR_DIR}")
    else:
        print(f"Creating test vector directory {TEST_VECTOR_DIR}")
        setup_test_vector_directory()
    if not os.path.exists(TEST_VECTOR_DIR):
        sys.exit(f"Test vector directory {TEST_VECTOR_DIR} does not exist.")

    # check for references
    if os.path.exists(REFERENCE_DIR):
@@ -328,6 +194,10 @@ def main(argv):
            encref_path,
            "--ref_decoder_path",
            decref_path,
            "--dut_encoder_path",
            encdut_path,
            "--dut_decoder_path",
            decdut_path,
        ]
        # work-around in unit tests via environment variable
        # TESTVECTOR_PATH_REL_GROUPB: to specify the test vector directory relative to ivas_pytests folder
@@ -336,10 +206,10 @@ def main(argv):
        my_env["TESTVECTOR_PATH_REL_GROUPB"] = "testv/"
        my_env["TESTVECTOR_PATH_REL_TRUNK"] = "/scripts/ivas_pytests/testv/"  # leading "/" is important
        my_env["CREND_UNIT_TEST_BIN"] = CREND_UNITTEST_REF
        print("pytest command line to be executed from trunk folder:")
        print("pytest command line to be executed from project root folder:")
        print(" ".join(base_cmd + ["-m", "create_ref", "-n", args.numprocesses]))
        subprocess.run(base_cmd + ["-m", "create_ref", "-n", args.numprocesses], check=False, env=my_env)
        print("pytest command line to be executed from trunk folder:")
        print("pytest command line to be executed from project root folder:")
        print(" ".join(base_cmd + ["-m", "create_ref_serial"]))
        subprocess.run(base_cmd + ["-m", "create_ref_serial"], check=False, env=my_env)

@@ -366,9 +236,10 @@ def main(argv):
        encdut_path,
        "--dut_decoder_path",
        decdut_path,
        "--junit-xml=report-junit.xml",
    ]
    # print pytest commandline
    print("pytest command line to be executed from trunk folder:")
    print("pytest command line to be executed from project root folder:")
    print(" ".join(cmd))
    result = subprocess.run(cmd, check=False)
    return result.returncode
+113 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3

__license__ = """
(C) 2022 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
contributors to this repository. All Rights Reserved.

This software is protected by copyright law and by international treaties.
The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
contributors to this repository retain full ownership rights in their respective contributions in
the software. This notice grants no license of any kind, including but not limited to patent
license, nor is any license granted by implication, estoppel or otherwise.

Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
contributions.

This software is provided "AS IS", without any express or implied warranties. The software is in the
development stage. It is intended exclusively for experts who have experience with such software and
solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
and fitness for a particular purpose are hereby disclaimed and excluded.

Any dispute, controversy or claim arising under or in relation to providing this software shall be
submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
the United Nations Convention on Contracts on the International Sales of Goods.
"""

__doc__ = """
Script to cut samples from a 16-bit PCM file.

USAGE : cut_pcm.py  in_file_pcm  out_file_pcm  num_channels  sample_rate  start  duration  [gain]
in_file_pcm:  input PCM file
out_file_pcm: output PCM file
num_channels: number of channels
sample_rate:  sample rate in Hz
start:        first sample from input file in seconds
duration:     duration in seconds
gain:         optional gain value to apply to the copied samples
"""

import sys
import platform


def usage():
    print(__doc__)
    return 1


def cut_samples(in_file, out_file, num_channels, sample_rate, start, duration, gain="1.0"):
    """
    Function to cut samples from a 16-bit PCM file.
    """

    # check for python >= 3.7
    if sys.version_info[0] < 3 or sys.version_info[1] < 7:
        sys.exit("This script is written for Python >= 3.7. Found: " + platform.python_version())

    # all input parameters are strings - convert some
    num_ch = int(num_channels)
    fs = int(sample_rate)
    start_sec = float(start)
    dur_sec = float(duration)
    gain_f = float(gain)

    with open(in_file, "rb") as fid_in:
        fid_in.seek(0, 2)
        num_in_samples = fid_in.tell() / 2
        num_in_samples_ch = num_in_samples / num_ch
        num_samples_to_skip = int(start_sec * fs)
        dur_samples = dur_sec * fs
        if num_samples_to_skip + dur_samples > num_in_samples_ch:
            sys.exit(
                f"requested too many samples ({num_samples_to_skip}+{dur_samples})"
                + f" - input is too short ({num_in_samples_ch})"
            )
        num_bytes_to_skip = num_samples_to_skip * num_ch * 2
        num_bytes_to_copy = dur_samples * num_ch * 2
        fid_in.seek(num_bytes_to_skip, 0)
        num_clip = 0
        with open(out_file, "wb") as fid_out:
            bytes_written = 0
            while bytes_written < num_bytes_to_copy:
                data = fid_in.read(2)
                val = int.from_bytes(data, byteorder="little", signed=True)
                val = int(val * gain_f)
                if val > 32767:
                    val = 32767
                    num_clip += 1
                if val < -32768:
                    val = -32768
                    num_clip += 1
                data = val.to_bytes(2, byteorder="little", signed=True)
                written = fid_out.write(data)
                assert written == 2, f"Error writing data: {written} != {2}"
                bytes_written += 2
        if num_clip:
            print(f"{num_clip} output samples have been clipped.")


def main(argv):
    if len(argv) < 7:
        return usage()
    return cut_samples(*argv[1:])


if __name__ == "__main__":
    sys.exit(main(sys.argv))
Loading