diff --git a/ivas_processing_scripts/audiotools/audioarray.py b/ivas_processing_scripts/audiotools/audioarray.py index 9278b2402a3cf9a69c7decd6929b2d92b35f8d03..50096ba147be6e75a7dc8d00008ae77760eb3a34 100755 --- a/ivas_processing_scripts/audiotools/audioarray.py +++ b/ivas_processing_scripts/audiotools/audioarray.py @@ -163,12 +163,28 @@ def window( return x -def delay_compensation( +def pad_delay( x: np.ndarray, flt_type: str, fs: Optional[int] = 48000, up: Optional[bool] = False, down: Optional[bool] = False, + before: Optional[bool] = False, +): + # Get the delay in number of samples + d_samples = get_delay_flt_type(flt_type, up, down, before=before) + + # pad by length of delay + y = trim(x, fs, (0, -d_samples), pad_noise=True, samples=True, seed=0) + + return y + + +def delay_compensation( + x: np.ndarray, + flt_type: str, + up: Optional[bool] = False, + down: Optional[bool] = False, ) -> np.ndarray: """ Compensation for a delayed signal @@ -192,21 +208,41 @@ def delay_compensation( Delay compensated test array """ + # Get the delay in number of samples + d_samples = get_delay_flt_type(flt_type, up, down) + + # Delay compensation + if flt_type == "SHQ3" and up: + # delay not divisible by 3 + d_samples += 2 + + x = x[d_samples:, :] + + return x + + +def get_delay_flt_type(flt_type, up, down, before=False): # Get the delay in number of samples if flt_type == "SHQ2" and up: d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ2"]["up"] + if before: + d_samples = int(d_samples / 2) elif flt_type == "SHQ2" and down: d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ2"]["down"] + if before: + d_samples = d_samples * 2 elif flt_type == "SHQ3" and up: d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ3"]["up"] + if before: + d_samples = int(np.ceil(d_samples / 3)) elif flt_type == "SHQ3" and down: d_samples = DELAY_COMPENSATION_FOR_FILTERING["SHQ3"]["down"] + if before: + d_samples = d_samples * 3 else: d_samples = DELAY_COMPENSATION_FOR_FILTERING[flt_type] - # Delay compensation - x = delay(x, fs, -d_samples, samples=True) - return x + return d_samples def delay( diff --git a/ivas_processing_scripts/audiotools/wrappers/filter.py b/ivas_processing_scripts/audiotools/wrappers/filter.py index 426a0f2c0598598943a857a105b72fdc102d55e5..f9734b35c4fbced343766ea3df48627a7b224599 100755 --- a/ivas_processing_scripts/audiotools/wrappers/filter.py +++ b/ivas_processing_scripts/audiotools/wrappers/filter.py @@ -31,7 +31,7 @@ # import re -from copy import copy +from copy import deepcopy from pathlib import Path from tempfile import TemporaryDirectory from typing import Optional @@ -40,7 +40,7 @@ from warnings import warn import numpy as np from ivas_processing_scripts.audiotools.audio import Audio, ChannelBasedAudio -from ivas_processing_scripts.audiotools.audioarray import delay_compensation +from ivas_processing_scripts.audiotools.audioarray import delay_compensation, pad_delay from ivas_processing_scripts.audiotools.audiofile import read, write from ivas_processing_scripts.constants import DEFAULT_CONFIG_BINARIES from ivas_processing_scripts.utils import find_binary, run @@ -214,7 +214,7 @@ def lpfilter_itu( # resample if samplingrate is not supported old_fs = None - tmp = copy(x) + tmp = deepcopy(x) if x.fs != 48000: warn( f"Filter type {flt_type} only supported for 48kHz samplingrate, not for {x.fs}Hz -> resampling" @@ -223,11 +223,14 @@ def lpfilter_itu( tmp.audio = resample_itu(tmp, 48000) tmp.fs = 48000 + # pad to avoid loosing samples due to delay + tmp.audio = pad_delay(tmp.audio, flt_type=flt_type, fs=tmp.fs) + # apply filter y = filter_itu(tmp, flt_type=flt_type, block_size=960) # delay compensation - y = delay_compensation(y, flt_type=flt_type, fs=tmp.fs) + y = delay_compensation(y, flt_type=flt_type) # reverse resampling if old_fs: @@ -256,7 +259,7 @@ def hp50filter_itu( # set filter type and check if sampling rate is supported old_fs = None - tmp = copy(x) + tmp = deepcopy(x) if x.fs == 48000: flt_type = "HP50_48KHZ" elif x.fs == 32000: @@ -277,11 +280,14 @@ def hp50filter_itu( else: skip_channel = None + # pad to avoid loosing samples due to delay + tmp.audio = pad_delay(tmp.audio, flt_type=flt_type, fs=tmp.fs) + # apply filter y = filter_itu(tmp, flt_type=flt_type, skip_channel=skip_channel) # delay compensation - y = delay_compensation(y, flt_type=flt_type, fs=tmp.fs) + y = delay_compensation(y, flt_type=flt_type) # reverse resampling if old_fs: @@ -310,7 +316,7 @@ def kbp20filter_itu( # set filter type and check if sampling rate is supported old_fs = None - tmp = copy(x) + tmp = deepcopy(x) if x.fs == 48000: flt_type = "20KBP" else: @@ -329,11 +335,14 @@ def kbp20filter_itu( else: skip_channel = None + # pad to avoid loosing samples due to delay + tmp.audio = pad_delay(tmp.audio, flt_type=flt_type, fs=tmp.fs) + # apply filter y = filter_itu(tmp, flt_type=flt_type, skip_channel=skip_channel) # delay compensation - y = delay_compensation(y, flt_type=flt_type, fs=tmp.fs) + y = delay_compensation(y, flt_type=flt_type) # reverse resampling if old_fs: @@ -428,21 +437,17 @@ def resample_itu( raise ValueError("Ratio of input and output sampling frequency not supported") # apply filter - y = copy(x) + y = deepcopy(x) for i, flt in enumerate(flt_type): - y.audio = filter_itu(y, flt_type=flt, up=up[i], down=down[i]) - y.audio = delay_compensation( - y.audio, flt_type=flt, fs=y.fs, up=up[i], down=down[i] + # pad to avoid loosing samples due to delay + y.audio = pad_delay( + y.audio, flt_type=flt, fs=y.fs, up=up[i], down=down[i], before=True ) - # if up[i]: - # if flt == "SHQ2": - # y.fs = y.fs * 2 - # elif flt == "SHQ3": - # y.fs = y.fs * 3 - # elif down[i]: - # if flt == "SHQ2": - # y.fs = int(y.fs / 2) - # elif flt == "SHQ3": - # y.fs = int(y.fs / 3) + + # resampling + y.audio = filter_itu(y, flt_type=flt, up=up[i], down=down[i]) + + # delay compensation + y.audio = delay_compensation(y.audio, flt_type=flt, up=up[i], down=down[i]) return y.audio