Commit f8a48b18 authored by Jan Kiene's avatar Jan Kiene
Browse files

Merge branch 'ci/ssnr' into 'main'

[BASOP-CI] Add Segmental SNR computation to pytest tests

See merge request !1635
parents b42586ad 5d3f606e
Loading
Loading
Loading
Loading
Loading
+19 −6
Original line number Diff line number Diff line
@@ -92,11 +92,12 @@ ARROW_DOWN = '<span class="arrowdown">&#11010;</span>'

# expected columns. actual columns are filtered from the incoming data later, this
# is mainly for controlling the order in the output table
COLUMNS = ["testcase", "Result", "MLD", "MAXIMUM ABS DIFF"]
COLUMNS = ["testcase", "Result", "MLD", "MAXIMUM ABS DIFF", "MIN_SSNR"]
COLUMNS_GLOBAL = COLUMNS[:1]
COLUMNS_DIFFERENTIAL = COLUMNS[1:]
COLUMNS_DIFFERENTIAL_NOT_MLD = COLUMNS_DIFFERENTIAL[2:]


def create_subpage(
    html_out,
    csv_out,
@@ -111,11 +112,18 @@ def create_subpage(
    )
    write_out_csv(merged_reports, merged_reports[0].keys(), csv_out)

    table_header_a = "".join([TH_TMPL_GLOBAL.format(c) for c in COLUMNS_GLOBAL] + [TH_TMPL_DIFFERENTIAL.format(c) for c in COLUMNS_DIFFERENTIAL])
    table_header_a = "".join(
        [TH_TMPL_GLOBAL.format(c) for c in COLUMNS_GLOBAL]
        + [TH_TMPL_DIFFERENTIAL.format(c) for c in COLUMNS_DIFFERENTIAL]
    )
    table_header_b = list()
    for c in COLUMNS_DIFFERENTIAL:
        table_header_b.append(TH_TMPL_SECOND_ROW.format(f"Previous Run<br>ID: {id_previous}"))
        table_header_b.append(TH_TMPL_SECOND_ROW.format(f"Current Run<br>ID: {id_current}"))
        table_header_b.append(
            TH_TMPL_SECOND_ROW.format(f"Previous Run<br>ID: {id_previous}")
        )
        table_header_b.append(
            TH_TMPL_SECOND_ROW.format(f"Current Run<br>ID: {id_current}")
        )
    table_header_b = "".join(table_header_b)
    table_body = "\n".join(
        tr_from_row(row, id_current, id_previous) for row in merged_reports
@@ -241,8 +249,13 @@ def merge_and_cleanup_mld_reports(

        return diff

    other_col_pairs = [(f"{col}-{id_previous}", f"{col}-{id_current}") for col in COLUMNS_DIFFERENTIAL_NOT_MLD]
    merged = sorted(merged, key=partial(sort_func, other_col_pairs=other_col_pairs), reverse=True)
    other_col_pairs = [
        (f"{col}-{id_previous}", f"{col}-{id_current}")
        for col in COLUMNS_DIFFERENTIAL_NOT_MLD
    ]
    merged = sorted(
        merged, key=partial(sort_func, other_col_pairs=other_col_pairs), reverse=True
    )

    # remove the unecessary whole path from the testcase names
    for row in merged:
+1 −1
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ from xml.etree import ElementTree
Parse a junit report and create an MLD summary report.
"""

PROPERTIES = ["MLD", "MAXIMUM ABS DIFF"]
PROPERTIES = ["MLD", "MAXIMUM ABS DIFF", "MIN_SSNR"]


# Main routine
+143 −31
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@
"""

import logging
import warnings
import math
import multiprocessing as mp
import platform
@@ -232,6 +233,10 @@ def compare(
    fs: int,
    per_frame: bool = True,
    get_mld: bool = False,
    get_ssnr: bool = False,
    ssnr_thresh_low: float = -np.inf,
    ssnr_thresh_high: float = np.inf,
    apply_thresholds_to_ref_only: bool = False,
) -> dict:
    """Compare two audio arrays

@@ -247,6 +252,18 @@ def compare(
        Compute difference per frame (default True)
    get_mld: bool
        Run MLD tool if there is a difference between the signals (default False)
    get_ssnr: bool
        Compute Segmental SNR between signals
    ssnr_thresh_low: float
        Low threshold for including a segment in the SSNR computation. Per default, both
        reference and test signal power are compared to this threshold, see below
    ssnr_thresh_high: float
        High threshold for including a segment in the SSNR computation. Per default, both
        reference and test signal power are compared to this threshold, see below
    apply_thresholds_to_ref_only: bool
        Set to True to only apply the threshold comparison for the reference signal
        for whether to include a segment in the ssnr computation. Use this to align
        behaviour with the MPEG-D conformance specification.

    Returns
    -------
@@ -266,8 +283,13 @@ def compare(
        "first_diff_pos_sample": -1,
        "first_diff_pos_channel": -1,
        "first_diff_pos_frame": -1,
        "MLD": 0 if get_mld else None,
    }

    if get_mld:
        result["MLD"] = 0
    if get_ssnr:
        result["SSNR"] = np.asarray([np.inf] * ref.shape[1])

    if per_frame:
        result["max_abs_diff_pos_frame"] = 0
        result["nframes_diff"] = 0
@@ -320,7 +342,6 @@ def compare(
            result["nframes_diff_percentage"] = nframes_diff_percentage

        if get_mld:

            mld_max = 0
            toolsdir = Path(__file__).parent.parent.joinpath("tools")
            if platform.system() == "Windows":
@@ -334,8 +355,14 @@ def compare(
                for i in range(nchannels):
                    tmpfile_ref = Path(tmpdir).joinpath(f"ref_ch{i+1}.wav")
                    tmpfile_test = Path(tmpdir).joinpath(f"test_ch{i+1}.wav")
                    r48 = np.clip( resample(ref[:, i].astype(float), fs, 48000), -32768, 32767 ).astype(np.int16) # Convert to float for resample, then to int16 for wavfile.write
                    t48 = np.clip( resample(test[:, i].astype(float), fs, 48000), -32768, 32767 ).astype(np.int16)
                    r48 = np.clip(
                        resample(ref[:, i].astype(float), fs, 48000), -32768, 32767
                    ).astype(
                        np.int16
                    )  # Convert to float for resample, then to int16 for wavfile.write
                    t48 = np.clip(
                        resample(test[:, i].astype(float), fs, 48000), -32768, 32767
                    ).astype(np.int16)
                    wavfile.write(str(tmpfile_ref), 48000, r48)
                    wavfile.write(str(tmpfile_test), 48000, t48)
                    out = subprocess.check_output([mld, tmpfile_ref, tmpfile_test])
@@ -343,6 +370,19 @@ def compare(

            result["MLD"] = mld_max

        if get_ssnr:
            # length of segment is always 20ms
            len_seg = int(0.02 * fs)
            print(len_seg, ref.shape, test.shape)
            result["SSNR"] = ssnr(
                ref,
                test,
                len_seg,
                thresh_low=ssnr_thresh_low,
                thresh_high=ssnr_thresh_high,
                apply_thresholds_to_ref_only=apply_thresholds_to_ref_only,
            )

    return result


@@ -513,3 +553,75 @@ def process_async(files: Iterable, func: Callable, **kwargs):
    for r in results:
        r.get()
    return results


def ssnr(
    ref_sig: np.ndarray,
    test_sig: np.ndarray,
    len_seg: int,
    thresh_low: float = -200,
    thresh_high: float = 0,
    apply_thresholds_to_ref_only: bool = False,
) -> np.ndarray:
    """
    Calculate Segmental SNR for test_sig to ref_sig as defined in ISO/IEC 14496-4
    """
    ss = list()

    ref_sig_norm = ref_sig / -np.iinfo(np.int16).min
    test_sig_norm = test_sig / -np.iinfo(np.int16).min

    # check if diff of signal is zero already, then SNR is infinite, since no noise
    diff_sig_norm = ref_sig_norm - test_sig_norm
    if np.all(diff_sig_norm == 0):
        return np.asarray([np.inf] * ref_sig_norm.shape[1])

    channels_identical_idx = np.sum(np.abs(diff_sig_norm), axis=0) == 0

    denom_add = 10**-13 * len_seg
    segment_counter = np.zeros(ref_sig.shape[1])

    # iterate over test signal too to allow power comparison to threshold
    for ref_seg, diff_seg, test_seg in zip(
        get_framewise(ref_sig_norm, len_seg, zero_pad=True),
        get_framewise(diff_sig_norm, len_seg, zero_pad=True),
        get_framewise(test_sig_norm, len_seg, zero_pad=True),
    ):
        nrg_ref = np.sum(ref_seg**2, axis=0)
        nrg_diff = np.sum(diff_seg**2, axis=0)

        ss_seg = np.log10(1 + nrg_ref / (denom_add + nrg_diff))

        # only sum up segments that fall inside the thresholds
        # add small eps to nrg_ref to prevent RuntimeWarnings from numpy
        ref_power = 10 * np.log10((nrg_ref + 10**-7) / len_seg)
        zero_mask = np.logical_or(ref_power < thresh_low, ref_power > thresh_high)

        # create same mask for test signal
        if not apply_thresholds_to_ref_only:
            nrg_test = np.sum(test_seg**2, axis=0)
            test_power = 10 * np.log10((nrg_test + 10**-7) / len_seg)
            zero_mask_test = np.logical_or(
                test_power < thresh_low, test_power > thresh_high
            )
            zero_mask = np.logical_or(zero_mask, zero_mask_test)

        ss_seg[zero_mask] = 0
        # increase segment counter only for channels that were not zeroed
        segment_counter += np.logical_not(zero_mask)

        ss.append(ss_seg)

    # if the reference signal was outside the thresholds for all segments in a channel, segment_counter is zero
    # for that channel and the division here would trigger a warning. We supress the warning and later
    # set the SSNR for those channels to nan manually instead (overwriting later is simply easier than adding ifs here)
    with warnings.catch_warnings():
        ssnr = np.round(
            10 * np.log10(10 ** (np.sum(ss, axis=0) / segment_counter) - 1), 2
        )
    ssnr[segment_counter == 0] = np.nan

    # this prevents all-zero channels in both signals to be reported as -inf
    ssnr[channels_identical_idx] = np.inf

    return ssnr

scripts/ssnr.py

0 → 100644
+62 −0
Original line number Diff line number Diff line
import argparse
import sys
import pathlib
from pyaudio3dtools import audiofile, audioarray


def main(args):
    ref_sig, fs_ref = audiofile.readfile(args.ref_file)
    test_sig, fs_test = audiofile.readfile(args.test_file)

    if fs_ref != fs_test:
        print("Files need to have same sampling rate!")
        return -1

    len_seg = int(20 * fs_ref / 1000)
    print(len_seg, ref_sig.shape, test_sig.shape)
    ssnr = audioarray.ssnr(
        ref_sig,
        test_sig,
        len_seg,
        args.thresh_low,
        args.thresh_high,
        args.apply_thresholds_on_ref_only,
    )

    for i, s in enumerate(ssnr, start=1):
        print(f"Channel {i}: {s}")

    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("ref_file", type=pathlib.Path, help="Reference signal wav file")
    parser.add_argument(
        "test_file", type=pathlib.Path, help="Signal under test wav file"
    )
    parser.add_argument(
        "--thresh_low",
        type=float,
        default="-inf",
        help="Low threshold for signal power in a segment to be used in the SSNR calculation (default: -inf).\n"
        "Applied to both signals per default (see apply_thresholds_on_ref_only argument).",
    )
    parser.add_argument(
        "--thresh_high",
        type=float,
        default="inf",
        help="High threshold for signal power in a segment to be used in the SSNR calculation (default: +inf).\n"
        "Applied to both signals per default (see apply_thresholds_on_ref_only argument).",
    )
    parser.add_argument(
        "--apply_thresholds_on_ref_only",
        action="store_true",
        default=False,
        help="Use this to apply the thresholding on signal power to the reference signal only.\n"
        "This makes the implementation behaviour conform to the MPEG-D conformance spec.",
    )

    args = parser.parse_args()

    sys.exit(main(args))
+25 −7
Original line number Diff line number Diff line
@@ -13,14 +13,15 @@ import pyivastest


def cmp_pcm(
    file1,
    file2,
    ref_file,
    cmp_file,
    out_config,
    fs,
    get_mld=False,
    allow_differing_lengths=False,
    mld_lim=0,
    abs_tol=0,
    get_ssnr=False,
) -> (int, str):
    """
    Compare 2 PCM files for bitexactness
@@ -39,8 +40,12 @@ def cmp_pcm(
    else:
        nchannels = pyivastest.constants.OC_TO_NCHANNELS[out_config.upper()]

    s1, _ = pyaudio3dtools.audiofile.readfile(file1, nchannels, fs, outdtype=np.int16)
    s2, _ = pyaudio3dtools.audiofile.readfile(file2, nchannels, fs, outdtype=np.int16)
    s1, _ = pyaudio3dtools.audiofile.readfile(
        ref_file, nchannels, fs, outdtype=np.int16
    )
    s2, _ = pyaudio3dtools.audiofile.readfile(
        cmp_file, nchannels, fs, outdtype=np.int16
    )

    # In case of wav input, override the nchannels with the one from the wav header
    nchannels = s1.shape[1]
@@ -62,7 +67,13 @@ def cmp_pcm(
        return 1, reason

    cmp_result = pyaudio3dtools.audioarray.compare(
        s1, s2, fs, per_frame=False, get_mld=get_mld
        s1,
        s2,
        fs,
        per_frame=False,
        get_mld=get_mld,
        get_ssnr=get_ssnr,
        ssnr_thresh_low=-50,
    )

    output_differs = 0
@@ -90,13 +101,20 @@ def cmp_pcm(
        else:
            reason += f" > {mld_lim}"

    if get_ssnr:
        reason += "\n"
        for i, s in enumerate(cmp_result["SSNR"], start=1):
            msg = f"Channel {i} SSNR: {s}"
            reason += msg + "\n"
            print(msg)

    return output_differs, reason


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("file1", type=str)
    parser.add_argument("file2", type=str)
    parser.add_argument("ref_file", type=str)
    parser.add_argument("cmp_file", type=str)
    parser.add_argument(
        "-o",
        "--out_config",
Loading