Commit 9551d34b authored by Anika Treffehn's avatar Anika Treffehn
Browse files

Merge branch '30-padding-before-filtering-and-delay-adjustment-missing' into 'main'

added padding for delay compensation in filtering

See merge request !70
parents a5b512c4 9a452db2
Loading
Loading
Loading
Loading
+40 −4
Original line number Diff line number Diff line
@@ -163,12 +163,28 @@ def window(
    return x


def delay_compensation(
def pad_delay(
    x: np.ndarray,
    flt_type: str,
    fs: Optional[int] = 48000,
    up: Optional[bool] = False,
    down: Optional[bool] = False,
    before: Optional[bool] = False,
):
    # Get the delay in number of samples
    d_samples = get_delay_flt_type(flt_type, up, down, before=before)

    # pad by length of delay
    y = trim(x, fs, (0, -d_samples), pad_noise=True, samples=True, seed=0)

    return y


def delay_compensation(
    x: np.ndarray,
    flt_type: str,
    up: Optional[bool] = False,
    down: Optional[bool] = False,
) -> np.ndarray:
    """
    Compensation for a delayed signal
@@ -192,21 +208,41 @@ def delay_compensation(
        Delay compensated test array
    """

    # Get the delay in number of samples
    d_samples = get_delay_flt_type(flt_type, up, down)

    # Delay compensation
    if flt_type == "SHQ3" and up:
        # delay not divisible by 3
        d_samples += 2

    x = x[d_samples:, :]

    return x


def get_delay_flt_type(flt_type, up, down, before=False):
    # Get the delay in number of samples
    if flt_type == "SHQ2" and up:
        d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ2"]["up"]
        if before:
            d_samples = int(d_samples / 2)
    elif flt_type == "SHQ2" and down:
        d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ2"]["down"]
        if before:
            d_samples = d_samples * 2
    elif flt_type == "SHQ3" and up:
        d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ3"]["up"]
        if before:
            d_samples = int(np.ceil(d_samples / 3))
    elif flt_type == "SHQ3" and down:
        d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ3"]["down"]
        if before:
            d_samples = d_samples * 3
    else:
        d_samples = DELAY_COMPENSATION_FOR_FILTERING[flt_type]
    # Delay compensation
    x = delay(x, fs, -d_samples, samples=True)

    return x
    return d_samples


def delay(
+27 −22
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@
#

import re
from copy import copy
from copy import deepcopy
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional
@@ -40,7 +40,7 @@ from warnings import warn
import numpy as np

from ivas_processing_scripts.audiotools.audio import Audio, ChannelBasedAudio
from ivas_processing_scripts.audiotools.audioarray import delay_compensation
from ivas_processing_scripts.audiotools.audioarray import delay_compensation, pad_delay
from ivas_processing_scripts.audiotools.audiofile import read, write
from ivas_processing_scripts.constants import DEFAULT_CONFIG_BINARIES
from ivas_processing_scripts.utils import find_binary, run
@@ -214,7 +214,7 @@ def lpfilter_itu(

    # resample if samplingrate is not supported
    old_fs = None
    tmp = copy(x)
    tmp = deepcopy(x)
    if x.fs != 48000:
        warn(
            f"Filter type {flt_type} only supported for 48kHz samplingrate, not for {x.fs}Hz -> resampling"
@@ -223,11 +223,14 @@ def lpfilter_itu(
        tmp.audio = resample_itu(tmp, 48000)
        tmp.fs = 48000

    # pad to avoid loosing samples due to delay
    tmp.audio = pad_delay(tmp.audio, flt_type=flt_type, fs=tmp.fs)

    # apply filter
    y = filter_itu(tmp, flt_type=flt_type, block_size=960)

    # delay compensation
    y = delay_compensation(y, flt_type=flt_type, fs=tmp.fs)
    y = delay_compensation(y, flt_type=flt_type)

    # reverse resampling
    if old_fs:
@@ -256,7 +259,7 @@ def hp50filter_itu(

    # set filter type and check if sampling rate is supported
    old_fs = None
    tmp = copy(x)
    tmp = deepcopy(x)
    if x.fs == 48000:
        flt_type = "HP50_48KHZ"
    elif x.fs == 32000:
@@ -277,11 +280,14 @@ def hp50filter_itu(
    else:
        skip_channel = None

    # pad to avoid loosing samples due to delay
    tmp.audio = pad_delay(tmp.audio, flt_type=flt_type, fs=tmp.fs)

    # apply filter
    y = filter_itu(tmp, flt_type=flt_type, skip_channel=skip_channel)

    # delay compensation
    y = delay_compensation(y, flt_type=flt_type, fs=tmp.fs)
    y = delay_compensation(y, flt_type=flt_type)

    # reverse resampling
    if old_fs:
@@ -310,7 +316,7 @@ def kbp20filter_itu(

    # set filter type and check if sampling rate is supported
    old_fs = None
    tmp = copy(x)
    tmp = deepcopy(x)
    if x.fs == 48000:
        flt_type = "20KBP"
    else:
@@ -329,11 +335,14 @@ def kbp20filter_itu(
    else:
        skip_channel = None

    # pad to avoid loosing samples due to delay
    tmp.audio = pad_delay(tmp.audio, flt_type=flt_type, fs=tmp.fs)

    # apply filter
    y = filter_itu(tmp, flt_type=flt_type, skip_channel=skip_channel)

    # delay compensation
    y = delay_compensation(y, flt_type=flt_type, fs=tmp.fs)
    y = delay_compensation(y, flt_type=flt_type)

    # reverse resampling
    if old_fs:
@@ -428,21 +437,17 @@ def resample_itu(
        raise ValueError("Ratio of input and output sampling frequency not supported")

    # apply filter
    y = copy(x)
    y = deepcopy(x)
    for i, flt in enumerate(flt_type):
        y.audio = filter_itu(y, flt_type=flt, up=up[i], down=down[i])
        y.audio = delay_compensation(
            y.audio, flt_type=flt, fs=y.fs, up=up[i], down=down[i]
        # pad to avoid loosing samples due to delay
        y.audio = pad_delay(
            y.audio, flt_type=flt, fs=y.fs, up=up[i], down=down[i], before=True
        )
        # if up[i]:
        #     if flt == "SHQ2":
        #         y.fs = y.fs * 2
        #     elif flt == "SHQ3":
        #         y.fs = y.fs * 3
        # elif down[i]:
        #     if flt == "SHQ2":
        #         y.fs = int(y.fs / 2)
        #     elif flt == "SHQ3":
        #         y.fs = int(y.fs / 3)

        # resampling
        y.audio = filter_itu(y, flt_type=flt, up=up[i], down=down[i])

        # delay compensation
        y.audio = delay_compensation(y.audio, flt_type=flt, up=up[i], down=down[i])

    return y.audio