Commit b5716a0c authored by TYAGIRIS's avatar TYAGIRIS
Browse files

use wavdiff and remove dependency on 3d tools

parent c3125154
Loading
Loading
Loading
Loading
+166 −4
Original line number Diff line number Diff line
@@ -37,14 +37,153 @@ import numpy as np
import subprocess
import tempfile
import sys
from typing import Optional
from typing import Tuple
from multiprocessing import Process, Value
import shutil
import scipy.io.wavfile as wav
import warnings
import math
import scipy.signal as sig

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))

from pyaudio3dtools.audiofile import readfile, writefile
from pyaudio3dtools.audioarray import resample
#from pyaudio3dtools.audiofile import readfile, writefile
#from pyaudio3dtools.audioarray import resample


def readfile(
    filename: str, nchannels: int = 1, fs: int = 48000, outdtype="float"
) -> Tuple[np.ndarray, int]:
    """Read audio file (.pcm or .wav)

    Parameters
    ----------
    filename: str
        Input file path
    nchannels: Optional[int]
        Number of input channels, required for .pcm otherwise default = 1
    fs: Optional[int]
        Input sampling rate, required for .pcm input file, otherwise default = 48000 (Hz)
    outdtype: Optional[int]
        Data type of output array, python builtin or np.dtype

    Returns
    -------
    x: np array
        audio signal array
    fs: int
        signal sampling frequency

    """
    _, file_extension = os.path.splitext(os.path.basename(filename))

    if file_extension == ".wav":
        fs, data = wav.read(filename)
        if data.dtype == np.int32:
            data = np.interp(
                data,
                (np.iinfo(np.int32).min, np.iinfo(np.int32).max),
                (np.iinfo(np.int16).min, np.iinfo(np.int16).max),
            )
        elif data.dtype == np.float32:
            data = np.interp(
                data,
                (-1, 1),
                (np.iinfo(np.int16).min, np.iinfo(np.int16).max),
            )
        x = np.array(data, dtype=outdtype)
        file_len = x.shape[0]
        if x.ndim == 1:
            # force to be a mtx
            x = np.reshape(x, (file_len, 1))
    elif file_extension == ".pcm" or file_extension == ".raw":
        x = np.fromfile(filename, dtype=np.int16).astype(outdtype)
        signal_len = len(x) // nchannels
        x = x.reshape(signal_len, nchannels)
    else:
        raise ValueError("Wrong input format. Use wav or pcm")

    return x, fs


def writefile(filename: str, x: np.ndarray, fs: int = 48000) -> None:
    """Write audio file (.pcm or .wav)

    Parameters
    ----------
    filename: str
        Output file path (.pcm or .wav)
    x: np array
        Numpy 2D array of dimension: number of samples x number of channels
    fs: Optional[int]
        Output sampling rate, required for .pcm input file, otherwise default = 48000 (Hz)

    Returns
    -------
    None

    """
    _, file_extension = os.path.splitext(os.path.basename(filename))

    clipped_samples = np.sum(
        np.logical_or(x < np.iinfo(np.int16).min, x > np.iinfo(np.int16).max)
    )
    if clipped_samples > 0:
        warnings.warn(f"  Warning: {clipped_samples} samples clipped")
        x = np.clip(x, np.iinfo(np.int16).min, np.iinfo(np.int16).max)

    if file_extension == ".wav":
        x = x.astype(np.int16)
        wav.write(filename, fs, x)
    elif file_extension == ".pcm" or file_extension == ".raw":
        x = x.astype("int16").reshape(-1, 1)
        x.tofile(filename)
    else:
        raise ValueError("Wrong input format. Use wav or pcm")

def resample(x: np.ndarray, in_freq: int, out_freq: int) -> np.ndarray:
    """Resample a multi-channel audio array

    Parameters
    ----------
    x: numpy array
        Input array
    in_fs: int
        Input sampling rate
    out_fs: int
        Output sampling rate

    Returns
    -------
    y:
        Output resampled numpy array

    """

    if in_freq == out_freq or out_freq is None:
        y = x
    else:
        # get gcd of original and deisred frequency
        gcd = math.gcd(in_freq, out_freq)

        # calculate up-sampling factor
        up_factor = int(out_freq / gcd)

        # calculate downsampling factor
        down_factor = int(in_freq / gcd)

        # resample data using polyphase filtering across columns/channels
        if x.ndim == 2:
            y = sig.resample_poly(x[:, 0], up_factor, down_factor)
            y = np.reshape(y, (y.shape[0], 1))
            for k in range(1, x.shape[1]):
                a = sig.resample_poly(x[:, k], up_factor, down_factor)
                a = np.reshape(a, (a.shape[0], 1))
                y = np.append(y, a, axis=1)
        else:
            y = sig.resample_poly(x, up_factor, down_factor)

    return y


class MLDConformance:
@@ -80,6 +219,7 @@ class MLDConformance:
        self.cut_build_path = args.cut_build_path
        self.filter = args.filter
        self.mldbin = os.path.join(self.toolsdir, platform.system(), "mld")
        self.wavdiffbin = os.path.join(self.toolsdir, platform.system(), "wav-diff")
        self.CutBins = dict()
        self.mldcsv = dict()
        self.sampleStats = dict()
@@ -488,6 +628,9 @@ class MLDConformance:
                mldFile = os.path.join(
                    tmpdir, f"{tempfile.gettempprefix()}_ch{ch}_MLD.csv"
                )
                mldFile2 = os.path.join(
                    tmpdir, f"{tempfile.gettempprefix()}_ch{ch}_MLD2.txt"
                )                
                refFileMono = os.path.join(
                    tmpdir, os.path.basename(refFile).replace(".wav", f"_ch{ch}.wav")
                )
@@ -505,7 +648,26 @@ class MLDConformance:
                    dutFileMono,
                ]
                self.process(" ".join(command))
                mldThisChan = np.loadtxt(mldFile, delimiter=" ", dtype=float)

                command = [
                    self.wavdiffbin,
                    "-s",
                    refFileMono,
                    dutFileMono,
                ]
                with open(mldFile2, "w") as fd:
                    c = subprocess.run(
                    " ".join(command), stdout=fd, stderr=subprocess.STDOUT, text=True, shell=True
                    )
                    if c.returncode:
                        with open(self.failedCmdsFile, "a") as f:
                            f.write(command + "\n")
                        self.failedTests.value += 1

                    mldThisChan = np.loadtxt(mldFile2, delimiter=";", dtype=float, skiprows=1)
                    mldThisChan = mldThisChan[:, 2]
                    fd.close()
                mldThisChan2 = np.loadtxt(mldFile, delimiter=" ", dtype=float)
                if ch == 0:
                    mldThisFile = mldThisChan
                else: