Commit ca4da5df authored by Anika Treffehn's avatar Anika Treffehn
Browse files

Merge branch '70-omasa-and-osba-support-missing' into 'main'

Resolve "OMASA and OSBA support missing"

See merge request !144
parents c94eac1f 83e6f94a
Loading
Loading
Loading
Loading
+0 −7
Original line number Diff line number Diff line
@@ -419,8 +419,6 @@ conditions_to_generate:
          # fs: 48000
          ### Additional commandline options; default = null
          # opts: ["-q", "-no_delay_cmp"]
	  ### Option to use SBA format of lower or same order (planar also possible) for SBA input formats
      # sba_fmt: "PLANARFOA"

  ### IVAS condition ###############################
  c07:
@@ -448,8 +446,6 @@ conditions_to_generate:
          # fs: 48000
          ### Additional commandline options; default = null
          # opts: ["-q", "-no_delay_cmp"]
	  ### Option to use SBA format of lower or same order (planar also possible) for SBA input formats
      # sba_fmt: "PLANARFOA"
            
  ### EVS condition ################################
  c08:
@@ -473,8 +469,6 @@ conditions_to_generate:
          bin: ~/git/ivas-codec/EVS_dec
          ### Decoder output sampling rate; default = null (same as input)
          # fs: 48000
	  ### Option to use SBA format of lower or same order (planar also possible) for SBA input formats
      # sba_fmt: "PLANARFOA"
```

</details>
@@ -547,7 +541,6 @@ This configuration has to match the channel configuration. If the provided list
For the encoding stage `cod` and the decoding stage `dec`, the path to the IVAS_cod and IVAS_dec binaries can be specified under the key `bin`.
Additionally, resampling can be applied by using the key `fs` followed by the desired sampling rate.
The general bitstream processing configuration can be locally overwritten for each EVS and IVAS condition with the key `tx`.
For IVAS and EVS conditions, the `sba_fmt` key is available for SBA input formats to specify an SBA format of the same or lower order than the input.
The additional key `evs_lfe_9k6bps_nb` is only available for EVS conditions and ensures a bitrate of 9.6 kbps and narrowband processing of the LFE channel(s).
#### IVAS
The configuration of the IVAS condition is similar to the EVS condition. However, only one bitrate for all channels (and metadata) can be specified.
+0 −6
Original line number Diff line number Diff line
@@ -243,8 +243,6 @@ conditions_to_generate:
      ### Bitstream options
      # tx:
          ### For possible arguments see overall bitstream modification
      ### Option to use SBA format of lower or same order (planar also possible) for SBA input formats
      # sba_fmt: "PLANARFOA"

  ### IVAS condition ###############################
  c07:
@@ -275,8 +273,6 @@ conditions_to_generate:
      ### Bitstream options
      # tx:
          ### For possible arguments see overall bitstream modification
      ### Option to use SBA format of lower or same order (planar also possible) for SBA input formats
      # sba_fmt: "PLANARFOA"
            
  ### EVS condition ################################
  c08:
@@ -305,8 +301,6 @@ conditions_to_generate:
      ### Bitstream options
      # tx:
          ### For possible arguments see overall bitstream modification
      ### Option to use SBA format of lower or same order (planar also possible) for SBA input formats
      # sba_fmt: "PLANARFOA"

################################################
### Post-processing
+26 −6
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ from itertools import product
from multiprocessing import Pool
from time import sleep

from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata
from ivas_processing_scripts.audiotools.metadata import check_ISM_metadata, check_MASA_metadata
from ivas_processing_scripts.constants import (
    LOGGER_DATEFMT,
    LOGGER_FORMAT,
@@ -112,24 +112,44 @@ def main(args):
                    cfg.items_list, cfg.preprocessing_2["concatenation_order"]
                )

        metadata = [[]] * len(cfg.items_list)
        # check for ISM metadata
        if cfg.input["fmt"].startswith("ISM"):
            metadata = check_ISM_metadata(
            metadata_ISM = check_ISM_metadata(
                cfg.metadata_path,
                num_objects=int(cfg.input["fmt"][3]),
                num_items=len(cfg.items_list),
                item_names=cfg.items_list,
            )
            # print info about found and used metadata files
            for i in range(len(metadata)):
            for i in range(len(metadata_ISM)):
                metadata_str = []
                for o in range(len(metadata[i])):
                    metadata_str.append(str(metadata[i][o]))
                for o in range(len(metadata_ISM[i])):
                    metadata_str.append(str(metadata_ISM[i][o]))
                logger.debug(
                    f"  ISM metadata files item {cfg.items_list[i]}: {', '.join(metadata_str)}"
                )
            metadata = metadata_ISM

        else:
        # check for MASA metadata
        if "MASA" in cfg.input["fmt"]:
            metadata_MASA = check_MASA_metadata(
                cfg.metadata_path,
                num_items=len(cfg.items_list),
                item_names=cfg.items_list,
            )
            # print info about found and used metadata files
            for i in range(len(metadata_MASA)):
                metadata_str = []
                for o in range(len(metadata_MASA[i])):
                    metadata_str.append(str(metadata_MASA[i][o]))
                logger.debug(
                    f"  MASA metadata file item {cfg.items_list[i]}: {', '.join(metadata_str)}"
                )
            for i, meta in enumerate(metadata):
                meta.extend(metadata_MASA[i])

        if not cfg.input["fmt"].startswith("ISM") and not "MASA" in cfg.input["fmt"]:
            metadata = [None] * len(cfg.items_list)

        cfg.metadata_path = metadata
+229 −8
Original line number Diff line number Diff line
@@ -48,6 +48,8 @@ from ivas_processing_scripts.audiotools.constants import (
    NUMBER_COLUMNS_ISM_METADATA,
    OBJECT_BASED_AUDIO_FORMATS,
    SCENE_BASED_AUDIO_FORMATS,
    OMASA_AUDIO_FORMATS,
    OSBA_AUDIO_FORMATS,
)

from .EFAP import wrap_angles
@@ -216,18 +218,23 @@ class MetadataAssistedSpatialAudio(Audio):
            raise ValueError(
                f"Unsupported metadata assisted spatial audio format {name}"
            )
        self.metadata_files = []
        self.metadata_file = None

    @classmethod
    def _from_file(
        cls,
        name: str,
        filename: Path,
        metadata_files: list[str],
        metadata_file: Union[str, list],
        fs: Optional[int] = None,
    ) -> "MetadataAssistedSpatialAudio":
        obj = super()._from_file(name, filename, fs)
        obj.metadata_file = Path(metadata_files[0])
        if isinstance(metadata_file, list):
            if len(metadata_file) > 1:
                warn("Only first metadata file used. Additional metadata ignored for MASA")
            obj.metadata_file = Path(metadata_file[0])
        else:
            obj.metadata_file = Path(metadata_file)
        return obj

    @classmethod
@@ -235,11 +242,11 @@ class MetadataAssistedSpatialAudio(Audio):
        cls,
        name: str,
        filename: Path,
        metadata_files: list[str],
        metadata_file: str,
        fs: Optional[int] = None,
    ) -> "MetadataAssistedSpatialAudio":
        obj = super()._from_file(name, filename, fs)
        obj.metadata_file = Path(metadata_files[0])
        obj.metadata_file = Path(metadata_file)
        return obj


@@ -353,6 +360,12 @@ class SceneBasedAudio(Audio):
            name = "HOA2"
        elif name == "SBA3":
            name = "HOA3"
        elif name == "PLANARSBA1":
            name = "PLANARFOA"
        elif name == "PLANARSBA2":
            name = "PLANARHOA2"
        elif name == "PLANARSBA3":
            name = "PLANARHOA3"

        super().__init__(name)
        try:
@@ -376,6 +389,210 @@ class SceneBasedAudio(Audio):
        return super()._from_filelist(name, filename, fs)


class OMASAAudio(Audio):
    """Sub-class for combined OMASA format

    The entries of the matching ``OMASA_AUDIO_FORMATS`` record are copied onto
    the instance. ``metadata_files`` holds the ISM metadata files first,
    followed by the MASA metadata file; ``object_pos`` holds one metadata
    array per ISM object and is filled by ``init_metadata``.
    """

    def __init__(self, name: str):
        super().__init__(name)
        try:
            self.__dict__.update(OMASA_AUDIO_FORMATS[name.upper()])
        except KeyError:
            raise ValueError(f"Unsupported OMASA audio format {name}")
        self.object_pos = []
        self.metadata_files = []  # first ISM metadata followed by masa metadata

    @classmethod
    def _from_file(
        cls,
        name: str,
        filename: Union[str, Path],
        metadata_files: list[Union[str, Path]],
        fs: Optional[int] = None,
    ) -> "OMASAAudio":
        """Create an OMASA object from an audio file and its metadata files

        Parameters
        ----------
        name: str
            OMASA format name
        filename: Union[str, Path]
            Audio file to load
        metadata_files: list[Union[str, Path]]
            ISM metadata files followed by the MASA metadata file; if None,
            the ISM metadata files are searched next to the audio file
        fs: Optional[int]
            Sampling rate forwarded to the parent loader

        Raises
        ------
        ValueError
            If an expected metadata file is missing or the number of metadata
            files does not match the format
        """
        obj = super()._from_file(name, filename, fs)
        if metadata_files is not None:
            obj.metadata_files = [Path(f) for f in metadata_files]
        else:
            # the signature allows plain strings, with_suffix() needs a Path
            filename = Path(filename)
            # search for metadata with naming scheme: name.(wav, pcm).(0-3).csv
            for obj_idx in range(obj.num_ism_channels):
                file_name_meta = filename.with_suffix(
                    f"{filename.suffix}.{obj_idx}.csv"
                )
                if file_name_meta.is_file():
                    obj.metadata_files.append(file_name_meta)
                else:
                    raise ValueError(f"Metadata file {file_name_meta} not found.")
            warn(
                f"No metadata files specified: The following files were found and used: \n {*obj.metadata_files,}"
            )
            # NOTE(review): only ISM metadata is discovered here, but
            # init_metadata() expects one additional (MASA) entry, so this
            # path currently always ends in the mismatch error below.
            # TODO: confirm the MASA metadata naming scheme and add it here.

        obj.init_metadata()
        return obj

    @classmethod
    def _from_filelist(
        cls,
        name: str,
        filename: Path,
        metadata_files: list[Union[str, Path]],
        fs: Optional[int] = None,
    ) -> "OMASAAudio":
        """Create an OMASA object via the parent file-list loader and read its metadata"""
        obj = super()._from_filelist(name, filename, fs)
        obj.metadata_files = [Path(f) for f in metadata_files]
        obj.init_metadata()
        return obj

    def init_metadata(self):
        """Read the ISM metadata files into ``object_pos``

        Only the first ``num_ism_channels`` entries of ``metadata_files`` are
        parsed; the trailing MASA metadata file is counted but not read.

        Raises
        ------
        ValueError
            If the number of metadata files is not ``num_ism_channels + 1`` or
            a metadata file has an invalid number of columns
        """
        # check if number of metadata files matches format
        if self.num_ism_channels != len(self.metadata_files) - 1:
            raise ValueError(
                f"Mismatch between number of ism channels [{self.num_ism_channels}], and metadata [{len(self.metadata_files)}]. Note: metadata should also include masa metadata file"
            )

        # number of frames covered by the audio signal
        num_frames = int(
            np.ceil(self.audio.shape[0] / (self.fs * IVAS_FRAME_LEN_MS / 1000))
        )

        self.object_pos = []
        # only read ISM metadata, not MASA metadata
        for f in self.metadata_files[: self.num_ism_channels]:
            self.object_pos.append(self._read_ism_metadata(f, num_frames))

    @staticmethod
    def _read_ism_metadata(metadata_file, num_frames: int) -> np.ndarray:
        """Load one ISM metadata csv file and fit it to ``num_frames`` rows"""
        pos = np.genfromtxt(metadata_file, delimiter=",")

        # genfromtxt squeezes its result: a file with a single line yields a
        # 1-D array, which is restored to one row (one frame) here
        if pos.ndim < 2:
            pos = pos.reshape(1, -1)

        # check if metadata has right number of columns
        num_columns = pos.shape[1]
        if num_columns < 2:
            raise ValueError(
                "Metadata incomplete. Columns are missing. Azimuth and elevation are mandatory."
            )
        elif num_columns > NUMBER_COLUMNS_ISM_METADATA:
            raise ValueError("Too many columns in metadata")

        # pad metadata to max number of columns
        if num_columns < NUMBER_COLUMNS_ISM_METADATA:
            pos = np.hstack(
                [pos, np.array(pos.shape[0] * [DEFAULT_ISM_METADATA[num_columns:]])]
            )

        # check if metadata is longer than file -> cut off
        if num_frames < pos.shape[0]:
            pos = pos[:num_frames]
        # check if metadata is shorter than file -> loop
        elif num_frames > pos.shape[0]:
            pos_loop = np.zeros((num_frames, pos.shape[1]))
            pos_loop[: pos.shape[0]] = pos
            # NOTE(review): only the first two columns are repeated, the
            # remaining columns stay zero for the looped frames - confirm
            # that this is intended
            for idx in range(pos.shape[0], num_frames):
                pos_loop[idx, :2] = pos[idx % pos.shape[0], :2]
            pos = pos_loop

        # wrap metadata to target value range
        for j in range(num_frames):
            pos[j, 0], pos[j, 1] = wrap_angles(pos[j, 0], pos[j, 1], clip_ele=True)

        return pos


class OSBAAudio(Audio):
    """Sub-class for OSBA audio

    The entries of the matching ``OSBA_AUDIO_FORMATS`` record are copied onto
    the instance. ``metadata_files`` holds one ISM metadata file per object;
    ``object_pos`` holds one metadata array per ISM object and is filled by
    ``init_metadata``. ``ambi_order`` is derived from the number of channels
    that remain after subtracting the ISM channels.
    """

    def __init__(self, name: str):
        super().__init__(name)
        try:
            self.__dict__.update(OSBA_AUDIO_FORMATS[name.upper()])
        except KeyError:
            raise ValueError(f"Unsupported OSBA audio format {name}")
        self.object_pos = []
        self.metadata_files = []
        self.ambi_order = int(np.sqrt(self.num_channels - self.num_ism_channels) - 1)

    @classmethod
    def _from_file(
        cls,
        name: str,
        filename: Union[str, Path],
        metadata_files: list[Union[str, Path]],
        fs: Optional[int] = None,
    ) -> "OSBAAudio":
        """Create an OSBA object from an audio file and its ISM metadata files

        Parameters
        ----------
        name: str
            OSBA format name
        filename: Union[str, Path]
            Audio file to load
        metadata_files: list[Union[str, Path]]
            ISM metadata files; if None, they are searched next to the audio
            file
        fs: Optional[int]
            Sampling rate forwarded to the parent loader

        Raises
        ------
        ValueError
            If an expected metadata file is missing or the number of metadata
            files does not match the format
        """
        obj = super()._from_file(name, filename, fs)
        if metadata_files is not None:
            obj.metadata_files = [Path(f) for f in metadata_files]
        else:
            # the signature allows plain strings, with_suffix() needs a Path
            filename = Path(filename)
            # search for metadata with naming scheme: name.(wav, pcm).(0-3).csv
            for obj_idx in range(obj.num_ism_channels):
                file_name_meta = filename.with_suffix(
                    f"{filename.suffix}.{obj_idx}.csv"
                )
                if file_name_meta.is_file():
                    obj.metadata_files.append(file_name_meta)
                else:
                    raise ValueError(f"Metadata file {file_name_meta} not found.")
            warn(
                f"No metadata files specified: The following files were found and used: \n {*obj.metadata_files,}"
            )

        obj.init_metadata()
        return obj

    @classmethod
    def _from_filelist(
        cls,
        name: str,
        filename: Path,
        metadata_files: list[Union[str, Path]],
        fs: Optional[int] = None,
    ) -> "OSBAAudio":
        """Create an OSBA object via the parent file-list loader and read its metadata"""
        obj = super()._from_filelist(name, filename, fs)
        obj.metadata_files = [Path(f) for f in metadata_files]
        obj.init_metadata()
        return obj

    def init_metadata(self):
        """Read all ISM metadata files into ``object_pos``

        Raises
        ------
        ValueError
            If the number of metadata files differs from ``num_ism_channels``
            or a metadata file has an invalid number of columns
        """
        # check if number of metadata files matches format
        if self.num_ism_channels != len(self.metadata_files):
            raise ValueError(
                f"Mismatch between number of ism channels [{self.num_ism_channels}], and metadata [{len(self.metadata_files)}]"
            )

        # number of frames covered by the audio signal
        num_frames = int(
            np.ceil(self.audio.shape[0] / (self.fs * IVAS_FRAME_LEN_MS / 1000))
        )

        self.object_pos = []
        for f in self.metadata_files:
            self.object_pos.append(self._read_ism_metadata(f, num_frames))

    @staticmethod
    def _read_ism_metadata(metadata_file, num_frames: int) -> np.ndarray:
        """Load one ISM metadata csv file and fit it to ``num_frames`` rows"""
        pos = np.genfromtxt(metadata_file, delimiter=",")

        # genfromtxt squeezes its result: a file with a single line yields a
        # 1-D array, which is restored to one row (one frame) here
        if pos.ndim < 2:
            pos = pos.reshape(1, -1)

        # check if metadata has right number of columns
        num_columns = pos.shape[1]
        if num_columns < 2:
            raise ValueError(
                "Metadata incomplete. Columns are missing. Azimuth and elevation are mandatory."
            )
        elif num_columns > NUMBER_COLUMNS_ISM_METADATA:
            raise ValueError("Too many columns in metadata")

        # pad metadata to max number of columns
        if num_columns < NUMBER_COLUMNS_ISM_METADATA:
            pos = np.hstack(
                [pos, np.array(pos.shape[0] * [DEFAULT_ISM_METADATA[num_columns:]])]
            )

        # check if metadata is longer than file -> cut off
        if num_frames < pos.shape[0]:
            pos = pos[:num_frames]
        # check if metadata is shorter than file -> loop
        elif num_frames > pos.shape[0]:
            pos_loop = np.zeros((num_frames, pos.shape[1]))
            pos_loop[: pos.shape[0]] = pos
            # NOTE(review): only the first two columns are repeated, the
            # remaining columns stay zero for the looped frames - confirm
            # that this is intended
            for idx in range(pos.shape[0], num_frames):
                pos_loop[idx, :2] = pos[idx % pos.shape[0], :2]
            pos = pos_loop

        # wrap metadata to target value range
        for j in range(num_frames):
            pos[j, 0], pos[j, 1] = wrap_angles(pos[j, 0], pos[j, 1], clip_ele=True)

        return pos


def _get_audio_class(fmt) -> Audio:
    """Return a child audio class corresponding to the specifed format"""
    if fmt in BINAURAL_AUDIO_FORMATS.keys():
@@ -387,9 +604,13 @@ def _get_audio_class(fmt) -> Audio:
    elif fmt in SCENE_BASED_AUDIO_FORMATS.keys():
        return SceneBasedAudio
    elif (
        fmt in CHANNEL_BASED_AUDIO_FORMATS.keys() or CHANNEL_BASED_AUDIO_ALTNAMES.keys()
        fmt in CHANNEL_BASED_AUDIO_FORMATS.keys() or fmt in CHANNEL_BASED_AUDIO_ALTNAMES.keys()
    ):
        return ChannelBasedAudio
    elif fmt in OSBA_AUDIO_FORMATS.keys():
        return OSBAAudio
    elif fmt in OMASA_AUDIO_FORMATS.keys():
        return OMASAAudio
    elif Path(fmt).suffix == ".txt":
        return ChannelBasedAudio
    else:
@@ -403,7 +624,7 @@ def fromtype(fmt: str) -> Audio:
def fromarray(fmt: str, x: np.ndarray, fs: int) -> Audio:
    """Wrap the given array into an audio format"""
    if x is None or not fs:
        return ValueError("Both array and sampling rate must be specified!")
        raise ValueError("Both array and sampling rate must be specified!")

    output = _get_audio_class(fmt)(fmt)

@@ -422,7 +643,7 @@ def fromfile(
    """Create an Audio object of the specified format from the given file"""
    filename = Path(filename)
    fmt_cls = _get_audio_class(fmt)
    if fmt_cls is ObjectBasedAudio or fmt_cls is MetadataAssistedSpatialAudio:
    if fmt_cls is ObjectBasedAudio or fmt_cls is MetadataAssistedSpatialAudio or fmt_cls is OMASAAudio or fmt_cls is OSBAAudio:
        return fmt_cls._from_file(fmt, filename, in_meta, fs)
    else:
        return fmt_cls._from_file(fmt, filename, fs)
+10 −2
Original line number Diff line number Diff line
@@ -298,6 +298,7 @@ def combine(
    out_file: str,
    in_fs: Optional[int] = 48000,
    is_planar: Optional[bool] = False,
    is_planar_offset: Optional[int] = 0,
) -> None:
    """
    Combines audio files into one multi-channel file
@@ -310,6 +311,10 @@ def combine(
        Output multi-channel audio file name (.pcm, .raw or .wav)
    in_fs: Optional[int]
        Input sampling rate, required for .pcm and .raw input file, default 48000 Hz
    is_planar: Optional[bool]
        If true vertical SBA channels are set to zero
    is_planar_offset: Optional[int]
        Offset of SBA due to OSBA (corresponds to num of ISM channels)

    Returns
    -------
@@ -338,7 +343,7 @@ def combine(

    # set vertical channels to zero
    if is_planar:
        y[:, VERT_HOA_CHANNELS_ACN[VERT_HOA_CHANNELS_ACN < len(in_filenames)]] = 0
        y[:, VERT_HOA_CHANNELS_ACN[VERT_HOA_CHANNELS_ACN < (len(in_filenames) - is_planar_offset)] + is_planar_offset] = 0

    write(out_file, y, fs=in_fs)

@@ -349,6 +354,7 @@ def split_channels(
    in_nchans: int,
    out_nchans: int,
    is_planar: Optional[bool] = False,
    is_planar_offset: Optional[int] = 0,
    in_fs: Optional[int] = 48000,
) -> None:
    """
@@ -366,6 +372,8 @@ def split_channels(
        Number of channels to be split
    is_planar: Optional[bool]
        If true vertical SBA channels are set to zero
    is_planar_offset: Optional[int]
        Offset of SBA due to OSBA (corresponds to num of ISM channels)
    in_fs: Optional[int] = 48000
        Input sampling rate, default 48000 Hz

@@ -386,7 +394,7 @@ def split_channels(
    x, in_fs = read(in_file, nchannels=in_nchans, fs=in_fs)

    if is_planar:
        x[:, VERT_HOA_CHANNELS_ACN[VERT_HOA_CHANNELS_ACN < in_nchans]] = 0
        x[:, VERT_HOA_CHANNELS_ACN[VERT_HOA_CHANNELS_ACN < (in_nchans - is_planar_offset)] + is_planar_offset] = 0

    # Write output files
    for idx, out_file in enumerate(out_filenames):
Loading