From 5cd304bcdc7d8f3597e7c9e794597a015d6c4e54 Mon Sep 17 00:00:00 2001 From: Lauros Pajunen Date: Thu, 9 Oct 2025 14:22:36 +0300 Subject: [PATCH 1/7] Add rtpdump python tests and requirement --- tests/requirements.txt | 1 + tests/rtp/ivasrtp.py | 1654 ++++++++++++++++++++++++++++++++++++++++ tests/rtp/test_rtp.py | 665 ++++++++++++++++ 3 files changed, 2320 insertions(+) create mode 100644 tests/rtp/ivasrtp.py create mode 100644 tests/rtp/test_rtp.py diff --git a/tests/requirements.txt b/tests/requirements.txt index 2eb090f4fb..00100fae88 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -2,3 +2,4 @@ pytest>=5.3.5 pytest-xdist>=1.31.0 scipy>=1.5.2 numpy>=1.19.2 +bitstring>=4.3.1 diff --git a/tests/rtp/ivasrtp.py b/tests/rtp/ivasrtp.py new file mode 100644 index 0000000000..bd13acaf56 --- /dev/null +++ b/tests/rtp/ivasrtp.py @@ -0,0 +1,1654 @@ +#!/usr/bin/env python3 + +__copyright__ = """ +(C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, +Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., +Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, +Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other +contributors to this repository. All Rights Reserved. + +This software is protected by copyright law and by international treaties. +The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, +Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., +Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, +Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other +contributors to this repository retain full ownership rights in their respective contributions in +the software. This notice grants no license of any kind, including but not limited to patent +license, nor is any license granted by implication, estoppel or otherwise. + +Contributors are required to enter into the IVAS codec Public Collaboration agreement before making +contributions. + +This software is provided "AS IS", without any express or implied warranties. The software is in the +development stage. It is intended exclusively for experts who have experience with such software and +solely for the purpose of inspection. All implied warranties of non-infringement, merchantability +and fitness for a particular purpose are hereby disclaimed and excluded. + +Any dispute, controversy or claim arising under or in relation to providing this software shall be +submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in +accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and +the United Nations Convention on Contracts on the International Sales of Goods. +""" + +__doc__ = """ +To configure test modules. +""" + +import struct +from enum import Enum +from dataclasses import dataclass, field, asdict +from bitstring import ConstBitStream, BitStream, ReadError +import json +import base64 +import argparse +from pathlib import Path +from typing import cast, Optional + +NO_REQ = "NO_REQ" + + +class CAMODE(str, Enum): + CA_LO_02 = "CA-LO-O2" + CA_LO_03 = "CA-LO-O3" + CA_LO_05 = "CA-LO-O5" + CA_LO_07 = "CA-LO-O7" + CA_HI_02 = "CA-HI-O2" + CA_HI_03 = "CA-HI-O3" + CA_HI_05 = "CA-HI-O5" + CA_HI_07 = "CA-HI-O7" + + +class CODECS(str, Enum): + AMRWB = "amrwb_io" + EVS = "evs" + IVAS = "ivas" + + +class SRCODEC(str, Enum): + LCLD = "lcld" + LC3PLUS = "lc3+" + + +class BANDWIDTH(str, Enum): + NB = "narrowband" + WB = "wideband" + SWB = "super wideband" + FB = "fullband" + + +class REQUESTS(str, Enum): + CODEC = ("codec",) + BR = "bitrate" + BW = "bandwidth" + CA = "ca-mode" + FMT = "format" + SUBFMT = "sub-format" + SRCFG = "sr-config" + + +class FORMATS(str, Enum): + STEREO = "Stereo" + SBA = "SBA" + MASA = "MASA" + ISM = "ISM" + MC = "MC" + OMASA = "OMASA" + OSBA = "OSBA" + + +class SUBFORMATS(str, Enum): + FOA_PLANAR = "FOA planar" + HOA2_PLANAR = "HOA2 planar" + HOA3_PLANAR = "HOA3 planar" + FOA = "FOA" + HOA2 = "HOA2" + HOA3 = "HOA3" + MASA1 = "MASA1" + MASA2 = "MASA2" + ISM1 = "ISM1" + ISM2 = "ISM2" + ISM3 = "ISM3" + ISM4 = "ISM4" + ISM1_EXTENDED_METADATA = "ISM1 extended metadata" + ISM2_EXTENDED_METADATA = "ISM2 extended metadata" + ISM3_EXTENDED_METADATA = "ISM3 extended metadata" + ISM4_EXTENDED_METADATA = "ISM4 extended metadata" + MC_5_1 = "MC 5.1" + MC_7_1 = "MC 7.1" + MC_5_1_2 = "MC 5.1.2" + MC_5_1_4 = "MC 5.1.4" + MC_7_1_4 = "MC 7.1.4" + Reserved22 = "Reserved22" + Reserved23 = "Reserved23" + Reserved24 = "Reserved24" + Reserved25 = "Reserved25" + Reserved26 = "Reserved26" + Reserved27 = "Reserved27" + Reserved28 = "Reserved28" + Reserved29 = "Reserved29" + Reserved30 = "Reserved30" + Reserved31 = "Reserved31" + Reserved32 = "Reserved32" + OMASA_ISM1_1TC = "OMASA ISM1 1TC" + OMASA_ISM2_1TC = "OMASA ISM2 1TC" + OMASA_ISM3_1TC = "OMASA ISM3 1TC" + OMASA_ISM4_1TC = "OMASA ISM4 1TC" + OMASA_ISM1_2TC = "OMASA ISM1 2TC" + OMASA_ISM2_2TC = "OMASA ISM2 2TC" + OMASA_ISM3_2TC = "OMASA ISM3 2TC" + OMASA_ISM4_2TC = "OMASA ISM4 2TC" + OSBA_ISM1_FOA_PLANAR = "OSBA ISM1 FOA planar" + OSBA_ISM2_FOA_PLANAR = "OSBA ISM2 FOA planar" + OSBA_ISM3_FOA_PLANAR = "OSBA ISM3 FOA planar" + OSBA_ISM4_FOA_PLANAR = "OSBA ISM4 FOA planar" + OSBA_ISM1_FOA = "OSBA ISM1 FOA" + OSBA_ISM2_FOA = "OSBA ISM2 FOA" + OSBA_ISM3_FOA = "OSBA ISM3 FOA" + OSBA_ISM4_FOA = "OSBA ISM4 FOA" + OSBA_ISM1_HOA2_PLANAR = "OSBA ISM1 HOA2 planar" + OSBA_ISM2_HOA2_PLANAR = "OSBA ISM2 HOA2 planar" + OSBA_ISM3_HOA2_PLANAR = "OSBA ISM3 HOA2 planar" + OSBA_ISM4_HOA2_PLANAR = "OSBA ISM4 HOA2 planar" + OSBA_ISM1_HOA2 = "OSBA ISM1 HOA2" + OSBA_ISM2_HOA2 = "OSBA ISM2 HOA2" + OSBA_ISM3_HOA2 = "OSBA ISM3 HOA2" + OSBA_ISM4_HOA2 = "OSBA ISM4 HOA2" + OSBA_ISM1_HOA3_PLANAR = "OSBA ISM1 HOA3 planar" + OSBA_ISM2_HOA3_PLANAR = "OSBA ISM2 HOA3 planar" + OSBA_ISM3_HOA3_PLANAR = "OSBA ISM3 HOA3 planar" + OSBA_ISM4_HOA3_PLANAR = "OSBA ISM4 HOA3 planar" + OSBA_ISM1_HOA3 = "OSBA ISM1 HOA3" + OSBA_ISM2_HOA3 = "OSBA ISM2 HOA3" + OSBA_ISM3_HOA3 = "OSBA ISM3 HOA3" + OSBA_ISM4_HOA3 = "OSBA ISM4 HOA3" + + +class PIDATAS(str, Enum): + SCENE_ORIENTATION = "SCENE_ORIENTATION" + DEVICE_ORIENTATION_COMPENSATED = "DEVICE_ORIENTATION_COMPENSATED" + DEVICE_ORIENTATION_UNCOMPENSATED = "DEVICE_ORIENTATION_UNCOMPENSATED" + ACOUSTIC_ENVIRONMENT = "ACOUSTIC_ENVIRONMENT" + AUDIO_DESCRIPTION = "AUDIO_DESCRIPTION" + ISM_NUM = "ISM_NUM" + ISM_ID = "ISM_ID" + ISM_GAIN = "ISM_GAIN" + ISM_ORIENTATION = "ISM_ORIENTATION" + ISM_POSITION = "ISM_POSITION" + ISM_DISTANCE_ATTENUATION = "ISM_DISTANCE_ATTENUATION" + ISM_DIRECTIVITY = "ISM_DIRECTIVITY" + DIEGETIC_TYPE = "DIEGETIC_TYPE" + RESERVED13 = "RESERVED13" + AUDIO_FOCUS_INDICATION = "AUDIO_FOCUS_INDICATION" + RESERVED15 = "RESERVED15" + PLAYBACK_DEVICE_ORIENTATION = "PLAYBACK_DEVICE_ORIENTATION" + HEAD_ORIENTATION = "HEAD_ORIENTATION" + LISTENER_POSITION = "LISTENER_POSITION" + DYNAMIC_AUDIO_SUPPRESSION = "DYNAMIC_AUDIO_SUPPRESSION" + AUDIO_FOCUS_REQUEST = "AUDIO_FOCUS_REQUEST" + PI_LATENCY = "PI_LATENCY" + R_ISM_ID = "R_ISM_ID" + R_ISM_GAIN = "R_ISM_GAIN" + R_ISM_ORIENTATION = "R_ISM_ORIENTATION" + R_ISM_POSITION = "R_ISM_POSITION" + R_ISM_DIRECTION = "R_ISM_DIRECTION" + RESERVED27 = "RESERVED27" + RESERVED28 = "RESERVED28" + RESERVED29 = "RESERVED29" + RESERVED30 = "RESERVED30" + NO_PI_DATA = "NO_PI_DATA" + + +class SUPPRESSION_LEVEL(int, Enum): + SUPPRESSION_LEVEL_NONE = 0 + SUPPRESSION_LEVEL_1 = 1 + SUPPRESSION_LEVEL_2 = 2 + SUPPRESSION_LEVEL_3 = 3 + SUPPRESSION_LEVEL_4 = 4 + SUPPRESSION_LEVEL_5 = 5 + SUPPRESSION_LEVEL_6 = 6 + SUPPRESSION_LEVEL_7 = 7 + SUPPRESSION_LEVEL_8 = 8 + SUPPRESSION_LEVEL_9 = 9 + SUPPRESSION_LEVEL_10 = 10 + SUPPRESSION_LEVEL_11 = 11 + SUPPRESSION_LEVEL_12 = 12 + SUPPRESSION_LEVEL_13 = 13 + SUPPRESSION_LEVEL_14 = 14 + SUPPRESSION_LEVEL_MAX = 15 + + +class AUDIO_FOCUS_LEVEL(int, Enum): + AUDIO_FOCUS_LEVEL_NONE = 0 + AUDIO_FOCUS_LEVEL_1 = 1 + AUDIO_FOCUS_LEVEL_2 = 2 + AUDIO_FOCUS_LEVEL_3 = 3 + AUDIO_FOCUS_LEVEL_4 = 4 + AUDIO_FOCUS_LEVEL_5 = 5 + AUDIO_FOCUS_LEVEL_6 = 6 + AUDIO_FOCUS_LEVEL_7 = 7 + AUDIO_FOCUS_LEVEL_8 = 8 + AUDIO_FOCUS_LEVEL_9 = 9 + AUDIO_FOCUS_LEVEL_10 = 10 + AUDIO_FOCUS_LEVEL_11 = 11 + AUDIO_FOCUS_LEVEL_12 = 12 + AUDIO_FOCUS_LEVEL_13 = 13 + AUDIO_FOCUS_LEVEL_DEFAULT = 14 + AUDIO_FOCUS_LEVEL_NO_PREFERENCE = 15 + + +@dataclass +class RTPHDR: + version: int = 2 + padding: bool = False + extension: bool = False + csrcCount: int = 0 + marker: bool = False + payloadType: int = 0 + sequenceNum: int = 0 + timestamp: int = 0 + ssrc: int = 0 + extensionType: int = 0 + extensionLength: int = 0 + csrcList: list = field(default_factory=list) + extensionWords: list = field(default_factory=list) + + def updateHeader(self, numFrames: int): + self.sequenceNum = (self.sequenceNum + 1) % 65536 + self.timestamp += 320 * numFrames + + def pack(self, bitstrm: BitStream): + bitstrm.append(f"uint:2={self.version}") + bitstrm.append(f"bool={self.padding}") + bitstrm.append(f"bool={self.extension}") + bitstrm.append(f"uint:4={self.csrcCount}") + bitstrm.append(f"bool={self.marker}") + bitstrm.append(f"uint:7={self.payloadType}") + bitstrm.append(f"uintbe:16={self.sequenceNum}") + bitstrm.append(f"uintbe:32={self.timestamp}") + bitstrm.append(f"uintbe:32={self.ssrc}") + assert ( + len(self.csrcList) == self.csrcCount + ), "csrcList must be of length csrcCount" + for csrc in self.csrcList: + bitstrm.append(f"uintbe:32={csrc}") + if self.extension: + bitstrm.append(f"uintbe:16={self.extensionType}") + bitstrm.append(f"uintbe:16={self.extensionLength}") + assert ( + len(self.extensionWords) == self.extensionLength + ), "extensionWords must be of extensionLength csrcCount" + for ext in self.extensionWords: + bitstrm.append(f"uintbe:32={ext}") + + @classmethod + def unpack(cls, bitstrm: ConstBitStream): + hdr = cls() + hdr.version = bitstrm.read(2).uint + hdr.padding = bitstrm.read(1).bool + hdr.extension = bitstrm.read(1).bool + hdr.csrcCount = bitstrm.read(4).int + hdr.marker = bitstrm.read(1).bool + hdr.payloadType = bitstrm.read(7).int + hdr.sequenceNum = bitstrm.read(16).uintbe + hdr.timestamp = bitstrm.read(32).uintbe + hdr.ssrc = bitstrm.read(32).uintbe + if hdr.csrcCount: + hdr.csrcList = [bitstrm.read(32).uintbe for _ in range(hdr.csrcCount)] + if hdr.extension: + hdr.extensionType = bitstrm.read(16).uintbe + hdr.extensionLength = bitstrm.read(16).uintbe + hdr.extensionWords = [ + bitstrm.read(32).uintbe for _ in range(hdr.extensionLength) + ] + return hdr + + +@dataclass +class SRCONFIG: + diegetic: bool = False + yaw: bool = False + pitch: bool = False + roll: bool = False + + +@dataclass +class CMR: + bandwidth: BANDWIDTH + codec: CODECS = CODECS.IVAS + startIdx: int = 0 + endIdx: int = 0 + bitrates: list = field(default_factory=list) + + +@dataclass +class SRINFO: + bitrate: int = 0 + diegetic: bool = False + transportCodec: SRCODEC = SRCODEC.LCLD + + +@dataclass +class FRAME: + codec: CODECS = CODECS.IVAS + frmSizeBits: int = 0 + bitrate: int = 0 + speechLost: bool = False + srInfo: SRINFO = None + timestamp: int = 0 + au: bytes = field(default_factory=bytes) + + +# PI DATA STRUCTURES +@dataclass +class ORIENTATION: + w: float = 0.0 + x: float = 0.0 + y: float = 0.0 + z: float = 0.0 + + +@dataclass +class POSITION: + x: float = 0.0 + y: float = 0.0 + z: float = 0.0 + + +class ISM_POSITIONS: + positions: list[POSITION] + + +@dataclass +class AUDIO_DESCRIPTION: + isSpeech: bool = False + isMusic: bool = False + isAmbiance: bool = False + isEditable: bool = False + isBinaural: bool = False + + +@dataclass +class DYNAMIC_AUDIO_SUPPRESSION: + preferSpeech: bool = False + preferMusic: bool = False + preferAmbiance: bool = False + level: SUPPRESSION_LEVEL = SUPPRESSION_LEVEL.SUPPRESSION_LEVEL_MAX + + +@dataclass +class DIEGETIC_TYPE: + isDigetic: list[bool] + + +@dataclass +class ACOUSTIC_ENVIRONMENT: + aeid: int = 0 + rt60: tuple[float, float, float] = () + dsr: tuple[float, float, float] = () + dim: tuple[float, float, float] = () + abscoeff: tuple[float, float, float, float, float, float] = () + + +@dataclass +class AUDIO_FOCUS: + direction: Optional[ORIENTATION] = None + level: Optional[AUDIO_FOCUS_LEVEL] = None + + +@dataclass +class PIDATA: + timestamp: int = 0 + type: str = "NO_PI_DATA" + data: any = None + + +MAX_PACKED_PI_SIZE = 32 +ivasBitrates = [ + 13200, + 16400, + 24400, + 32000, + 48000, + 64000, + 80000, + 96000, + 128000, + 160000, + 192000, + 256000, + 384000, + 512000, + -1, + 5200, +] +evsBitrates = [ + 5900, + 7200, + 8000, + 9600, + 13200, + 16400, + 24400, + 32000, + 48000, + 64000, + 96000, + 128000, + 2400, + -1, + -1, + -1, +] +amrwbBitrates = [ + 6600, + 8850, + 12650, + 14250, + 15850, + 18250, + 19850, + 23050, + 23850, + 1750, + -1, + -1, + -1, + -1, + -1, + -1, +] +requestBitratesForCodec = { + CODECS.AMRWB: amrwbBitrates[0:9], + CODECS.EVS: evsBitrates[0:12], + CODECS.IVAS: ivasBitrates[0:14], +} +rt60Value = [ + 0.01, + 0.0126, + 0.0159, + 0.02, + 0.0252, + 0.0317, + 0.04, + 0.0504, + 0.0635, + 0.08, + 0.1008, + 0.1270, + 0.16, + 0.2016, + 0.2540, + 0.32, + 0.4032, + 0.5080, + 0.64, + 0.8063, + 1.0159, + 1.28, + 1.6127, + 2.0319, + 2.56, + 3.2254, + 4.0637, + 5.12, + 6.4508, + 8.1275, + 10.24, + 12.9016, +] +dsrValue = [ + -20.0, + -21.0, + -22.0, + -23.0, + -24.0, + -25.0, + -26.0, + -27.0, + -28.0, + -29.0, + -30.0, + -31.0, + -32.0, + -33.0, + -34.0, + -35.0, + -36.0, + -37.0, + -38.0, + -39.0, + -40.0, + -41.0, + -42.0, + -43.0, + -44.0, + -45.0, + -46.0, + -47.0, + -48.0, + -49.0, + -50.0, + -51.0, + -52.0, + -53.0, + -54.0, + -55.0, + -56.0, + -57.0, + -58.0, + -59.0, + -60.0, + -61.0, + -62.0, + -63.0, + -64.0, + -65.0, + -66.0, + -67.0, + -68.0, + -69.0, + -70.0, + -71.0, + -72.0, + -73.0, + -74.0, + -75.0, + -76.0, + -77.0, + -78.0, + -79.0, + -80.0, + -81.0, + -82.0, + -83.0, +] +roomDimensionValue = [ + 0.5, + 0.707, + 1.0, + 1.4141, + 2.0, + 2.8282, + 4.0, + 5.6568, + 8.0, + 11.314, + 16.0, + 22.627, + 32.0, + 45.255, + 64.0, + 90.51, +] +absorptionCoeffValues = [0.0800, 0.1656, 0.3430, 0.7101] +codedFormats = list(FORMATS) +codedSubFormats = list(SUBFORMATS) +PiTypeNames = list(PIDATAS) + + +def mapNearestIndex(table: list, val: float) -> int: + for idx, entry in enumerate(table): + if abs(entry) >= abs(val): + return idx + return len(table) - 1 + + +getListIndex = lambda mylist, val: mylist.index(val) if val in mylist else -1 + +cmrLookup = [ + CMR( + bandwidth=BANDWIDTH.NB, + codec=CODECS.EVS, + startIdx=0, + endIdx=7, + bitrates=evsBitrates, + ), # 000 = NB-EVS + CMR( + bandwidth=BANDWIDTH.WB, + codec=CODECS.AMRWB, + startIdx=0, + endIdx=9, + bitrates=amrwbBitrates, + ), # 001 = AMRWB IO + CMR( + bandwidth=BANDWIDTH.WB, + codec=CODECS.EVS, + startIdx=0, + endIdx=12, + bitrates=evsBitrates, + ), # 010 = WB-EVS + CMR( + bandwidth=BANDWIDTH.SWB, + codec=CODECS.EVS, + startIdx=3, + endIdx=12, + bitrates=evsBitrates, + ), # 011 = SWB-EVS + CMR( + bandwidth=BANDWIDTH.FB, + codec=CODECS.EVS, + startIdx=5, + endIdx=12, + bitrates=evsBitrates, + ), # 100 = FB-EVS + CMR( + bandwidth=BANDWIDTH.WB, codec=CODECS.EVS, startIdx=0, endIdx=0, bitrates=[] + ), # 101 = WB-CA + CMR( + bandwidth=BANDWIDTH.SWB, codec=CODECS.EVS, startIdx=0, endIdx=0, bitrates=[] + ), # 110 = SWB-CA + CMR( + bandwidth=NO_REQ, + codec=CODECS.IVAS, + startIdx=0, + endIdx=14, + bitrates=ivasBitrates, + ), # 111 = IVAS +] + +q15 = lambda x: int(min(32767.0, max(-32768.0, x * 32768.0))) + + +def unpackUnsupported(bitstrm: ConstBitStream, piSize: int) -> any: + # assert False, "Unsupported PI Data" + return base64.b64encode(bitstrm.read(piSize * 8).tobytes()).decode("utf-8") + + +def packUnsupported(bitstrm: ConstBitStream, data: any) -> any: + assert False, f"unsupported PI Data of type : {type(data)}" + + +def unpackNoPiData(bitstrm: ConstBitStream, piSize: int) -> None: + assert piSize == 0, "NO_PI_DATA should be 0 size" + + +def packNoPiData(bitstrm: BitStream, data: any = None): + pass + + +def unpackOrientations(bitstrm: ConstBitStream, piSize: int) -> list[ORIENTATION]: + assert ( + piSize % 8 + ) == 0 and piSize <= 32, "Incorrect PI Data Size for list[ORIENTATION]" + orientations = list() + while piSize > 0: + w = bitstrm.read(16).int / 32768.0 + x = bitstrm.read(16).int / 32768.0 + y = bitstrm.read(16).int / 32768.0 + z = bitstrm.read(16).int / 32768.0 + orientations.append(ORIENTATION(w, x, y, z)) + piSize -= 8 + return orientations + + +def packOrientations(bitstrm: BitStream, data: any): + assert type(data) == list, "Orientation PI Data expects a data of type list" + for orientation in cast(list, data): + assert ( + type(orientation) == ORIENTATION + ), "Orientation PI Data expects a data of type list[ORIENTATION]" + bitstrm.append(f"intbe:16={q15(orientation.w)}") + bitstrm.append(f"intbe:16={q15(orientation.x)}") + bitstrm.append(f"intbe:16={q15(orientation.y)}") + bitstrm.append(f"intbe:16={q15(orientation.z)}") + + +def unpackPositions(bitstrm: ConstBitStream, piSize: int) -> list[POSITION]: + assert piSize <= 24 and (piSize % 6) == 0, "Incorrect PI Data Size for Positions" + positions = list() + while piSize > 0: + x = bitstrm.read(16).int / 100.0 + y = bitstrm.read(16).int / 100.0 + z = bitstrm.read(16).int / 100.0 + positions.append(POSITION(x, y, z)) + piSize -= 6 + return positions + + +def packPositions(bitstrm: BitStream, data: any): + assert type(data) == list, "Position PI Data expects a data of type list" + positions = cast(list, data) + assert len(positions) <= 4, "Max one position per ISM object" + for position in positions: + assert ( + type(position) == POSITION + ), "Position PI Data expects a data of type list[POSITIONS]" + bitstrm.append(f"intbe:16={q15(position.x / 327.68)}") + bitstrm.append(f"intbe:16={q15(position.y / 327.68)}") + bitstrm.append(f"intbe:16={q15(position.z / 327.68)}") + + +def unpackOrientation(bitstrm: ConstBitStream, piSize: int) -> ORIENTATION: + assert piSize == 8, "Incorrect PI Data Size for ORIENTATION" + orientations = unpackOrientations(bitstrm, piSize) + assert len(orientations) == 1 + return orientations[0] + + +def packOrientation(bitstrm: BitStream, data: any): + assert ( + type(data) == ORIENTATION + ), "Orientation PI Data expects a data of type ORIENTATION" + orientation = cast(ORIENTATION, data) + packOrientations(bitstrm, [orientation]) + + +def unpackPosition(bitstrm: ConstBitStream, piSize: int) -> POSITION: + assert piSize == 6, "Incorrect PI Data Size for POSITION" + positions = unpackPositions(bitstrm, piSize) + assert len(positions) == 1 + return positions[0] + + +def packPosition(bitstrm: BitStream, data: any): + assert type(data) == POSITION, "Position PI Data expects a data of type POSITION" + position = cast(POSITION, data) + packPositions(bitstrm, [position]) + + +def unpackAudioDescription( + bitstrm: ConstBitStream, piSize: int +) -> list[AUDIO_DESCRIPTION]: + assert piSize <= 5, "Incorrect PI Data Size for AUDIO_DESCRIPTION" + ad = list() + for byte in range(piSize): + V = bitstrm.read(1).bool + M = bitstrm.read(1).bool + A = bitstrm.read(1).bool + E = bitstrm.read(1).bool + B = bitstrm.read(1).bool + _ = bitstrm.read(3) + ad.append( + AUDIO_DESCRIPTION( + isSpeech=V, isMusic=M, isAmbiance=A, isEditable=E, isBinaural=B + ) + ) + return ad + + +def packAudioDescription(bitstrm: BitStream, data: any): + assert ( + type(data) == list + ), "Audio Description PI Data expects a data of type list[AUDIO_DESCRIPTION]" + for desc in cast(list, data): + assert ( + type(desc) == AUDIO_DESCRIPTION + ), "Audio Description PI Data expects a data of type list[AUDIO_DESCRIPTION]" + ad = cast(AUDIO_DESCRIPTION, desc) + bitstrm.append(f"bool={ad.isSpeech}") + bitstrm.append(f"bool={ad.isMusic}") + bitstrm.append(f"bool={ad.isAmbiance}") + bitstrm.append(f"bool={ad.isEditable}") + bitstrm.append(f"bool={ad.isBinaural}") + bitstrm.append(f"uint:3=0") + + +def unpackDAS(bitstrm: ConstBitStream, piSize: int) -> list[AUDIO_DESCRIPTION]: + assert piSize == 2, "Incorrect PI Data Size for DYNAMIC_AUDIO_SUPPRESSION" + V = bitstrm.read(1).bool + M = bitstrm.read(1).bool + A = bitstrm.read(1).bool + _ = bitstrm.read(5) + SLI = bitstrm.read(4).uint + _ = bitstrm.read(4) + return DYNAMIC_AUDIO_SUPPRESSION( + preferSpeech=V, preferMusic=M, preferAmbiance=A, level=SLI + ) + + +def packDAS(bitstrm: BitStream, data: any): + assert ( + type(data) == DYNAMIC_AUDIO_SUPPRESSION + ), "Dynamic Audio Suppression PI Data expects a data of type DYNAMIC_AUDIO_SUPPRESSION" + das = cast(DYNAMIC_AUDIO_SUPPRESSION, data) + bitstrm.append(f"bool={das.preferSpeech}") + bitstrm.append(f"bool={das.preferMusic}") + bitstrm.append(f"bool={das.preferAmbiance}") + bitstrm.append(f"uint:5=0") + bitstrm.append(f"uint:4={das.level}") + bitstrm.append(f"uint:4=0") + + +def unpackDiegetic(bitstrm: ConstBitStream, piSize: int) -> DIEGETIC_TYPE: + assert piSize == 1, "Incorrect PI Data Size for DIEGETIC_TYPE" + digType = list() + for _ in range(5): # no way to know how many bits are valid bits, so all 5 read + digType.append(bitstrm.read(1).bool) + bitstrm.bytealign() + return DIEGETIC_TYPE(isDigetic=digType) + + +def packDiegetic(bitstrm: BitStream, data: any): + assert ( + type(data) == DIEGETIC_TYPE + ), "Diegetic type PI Data expects a data of type DIEGETIC_TYPE" + diegetic = cast(DIEGETIC_TYPE, data) + assert ( + len(diegetic.isDigetic) <= 5 + ), "Maximum 1 bit per object + 1 bit for SBA/MASA is required (max 5)" + for isDigetic in diegetic.isDigetic: + bitstrm.append(f"bool={isDigetic}") + nPad = 8 - (bitstrm.pos % 8) + if nPad > 0: + bitstrm.append(f"uint:{nPad}=0") + + +def unpackAcousticEnv(bitstrm: ConstBitStream, piSize: int) -> DIEGETIC_TYPE: + assert ( + piSize == 1 or piSize == 5 or piSize == 8 + ), "Incorrect PI Data Size for ACOUSTIC_ENVIRONMENT" + rt60 = list() + dsr = list() + dim = list() + absCoeff = list() + + if piSize == 1: + bitstrm.read(1) + + aeid = bitstrm.read(7).uint + + if piSize >= 5: + for _ in range(3): + rt60.append(rt60Value[bitstrm.read(5).uint]) + dsr.append(dsrValue[bitstrm.read(6).uint]) + if piSize == 8: + for _ in range(3): + dim.append(roomDimensionValue[bitstrm.read(4).uint]) + for _ in range(6): + absCoeff.append(absorptionCoeffValues[bitstrm.read(2).uint]) + + return ACOUSTIC_ENVIRONMENT( + aeid=aeid, + rt60=tuple(rt60), + dsr=tuple(dsr), + dim=tuple(dim), + abscoeff=tuple(absCoeff), + ) + + +def packAcousticEnv(bitstrm: BitStream, data: any): + assert ( + type(data) == ACOUSTIC_ENVIRONMENT + ), "Diegetic type PI Data expects a data of type ACOUSTIC_ENVIRONMENT" + aenv = cast(ACOUSTIC_ENVIRONMENT, data) + if not aenv.rt60 and not aenv.dsr: + bitstrm.append(f"uint:8={aenv.aeid % 128}") + else: + assert ( + len(aenv.rt60) == 3 and len(aenv.dsr) == 3 + ), "Lo, Mi, Hi only required for RT60 and DSR values" + bitstrm.append(f"uint:7={aenv.aeid % 128}") + for n in range(3): + rt60 = mapNearestIndex(rt60Value, aenv.rt60[n]) + dsr = mapNearestIndex(dsrValue, aenv.dsr[n]) + bitstrm.append(f"uint:5={rt60}") + bitstrm.append(f"uint:6={dsr}") + if aenv.abscoeff and aenv.dim: + assert len(aenv.abscoeff) == 6 and len(aenv.dim) == 3 + for n in range(3): + dim = mapNearestIndex(roomDimensionValue, aenv.dim[n]) + bitstrm.append(f"uint:4={dim}") + for n in range(6): + absCoeff = mapNearestIndex(absorptionCoeffValues, aenv.abscoeff[n]) + bitstrm.append(f"uint:2={absCoeff}") + + +def unpackAudioFocus(bitstrm: ConstBitStream, piSize: int) -> AUDIO_FOCUS: + assert ( + piSize == 1 or piSize == 8 or piSize == 9 + ), "Incorrect PI Data Size for AUDIO_FOCUS" + direction = None + level = None + if piSize == 1: + level = bitstrm.read(4).uint + _ = bitstrm.read(4) + else: + direction = unpackOrientation(bitstrm, 8) + if piSize == 9: + level = bitstrm.read(4).uint + _ = bitstrm.read(4) + + return AUDIO_FOCUS(direction=direction, level=level) + + +def packAudioFocus(bitstrm: BitStream, data: any): + assert ( + type(data) == AUDIO_FOCUS + ), "Audio focus PI Data expects a data of type AUDIO_FOCUS" + auFocus = cast(AUDIO_FOCUS, data) + if auFocus.direction is not None: + packOrientations(bitstrm, [auFocus.direction]) + if auFocus.level is not None: + level = int(auFocus.level) + bitstrm.append(f"uint:4={level}") + bitstrm.append(f"uint:4=0") + + +PIDataUnpacker = [ + unpackOrientation, # SCENE_ORIENTATION, + unpackOrientation, # DEVICE_ORIENTATION_COMPENSATED, + unpackOrientation, # DEVICE_ORIENTATION_UNCOMPENSATED + unpackAcousticEnv, # ACOUSTIC_ENVIRONMENT + unpackAudioDescription, # AUDIO_DESCRIPTION + unpackUnsupported, # ISM_NUM + unpackUnsupported, # ISM_ID + unpackUnsupported, # ISM_GAIN + unpackOrientations, # ISM_ORIENTATION + unpackPositions, # ISM_POSITION + unpackUnsupported, # ISM_DISTANCE_ATTENUATION + unpackUnsupported, # ISM_DIRECTIVITY + unpackDiegetic, # DIEGETIC_TYPE + unpackUnsupported, # RESERVED13 + unpackAudioFocus, # AUDIO_FOCUS_INDICATION + unpackUnsupported, # RESERVED15 + unpackOrientation, # PLAYBACK_DEVICE_ORIENTATION + unpackOrientation, # HEAD_ORIENTATION + unpackPosition, # LISTENER_POSITION + unpackDAS, # DYNAMIC_AUDIO_SUPPRESSION + unpackAudioFocus, # AUDIO_FOCUS_REQUEST + unpackUnsupported, # PI_LATENCY + unpackUnsupported, # R_ISM_ID + unpackUnsupported, # R_ISM_GAIN + unpackOrientation, # R_ISM_ORIENTATION + unpackPosition, # R_ISM_POSITION + unpackUnsupported, # R_ISM_DIRECTION + unpackUnsupported, # RESERVED27 + unpackUnsupported, # RESERVED28 + unpackUnsupported, # RESERVED29 + unpackUnsupported, # RESERVED30 + unpackNoPiData, # NO_DATA +] + +PIDataPacker = [ + packOrientation, # SCENE_ORIENTATION, + packOrientation, # DEVICE_ORIENTATION_COMPENSATED, + packOrientation, # DEVICE_ORIENTATION_UNCOMPENSATED + packAcousticEnv, # ACOUSTIC_ENVIRONMENT + packAudioDescription, # AUDIO_DESCRIPTION + packUnsupported, # ISM_NUM + packUnsupported, # ISM_ID + packUnsupported, # ISM_GAIN + packOrientations, # ISM_ORIENTATION + packPositions, # ISM_POSITION + packUnsupported, # ISM_DISTANCE_ATTENUATION + packUnsupported, # ISM_DIRECTIVITY + packDiegetic, # DIEGETIC_TYPE + packUnsupported, # RESERVED13 + packAudioFocus, # AUDIO_FOCUS_INDICATION + packUnsupported, # RESERVED15 + packOrientation, # PLAYBACK_DEVICE_ORIENTATION + packOrientation, # HEAD_ORIENTATION + packPosition, # LISTENER_POSITION + packDAS, # DYNAMIC_AUDIO_SUPPRESSION + packAudioFocus, # AUDIO_FOCUS_DIRECTION + packUnsupported, # PI_LATENCY + packUnsupported, # R_ISM_ID + packUnsupported, # R_ISM_GAIN + packOrientation, # R_ISM_ORIENTATION + packPosition, # R_ISM_POSITION + packUnsupported, # R_ISM_DIRECTION + packUnsupported, # RESERVED27 + packUnsupported, # RESERVED28 + packUnsupported, # RESERVED29 + packUnsupported, # RESERVED30 + packNoPiData, # NO_DATA +] + + +def ReadG192Bitstream(g192File: Path) -> list[bytes]: + refPackets = list[bytes]() + with open(g192File, "rb") as fd: + refBitStrm = ConstBitStream(fd.read()) + while refBitStrm.pos < refBitStrm.len: + sync = hex(refBitStrm.read(16).intle) + nBits = refBitStrm.read(16).intle + assert sync == "0x6b21", "G192 syncword not found at start of packet" + writer = BitStream() + for _ in range(nBits): + bit = "0b1" if refBitStrm.read(16).uintle == 129 else "0b0" + writer.append(bit) + refPackets.append(writer.tobytes()) + return refPackets + + +def unpackEBytes(bitstrm: ConstBitStream) -> tuple[bool, dict]: + piIndicated = False + requests = dict() + try: + if bitstrm.read(1).bool: + T = bitstrm.read(3).uint + BR = bitstrm.read(4).uint + if T in [5, 6]: # CA MODES + if BR < 8: + requests[REQUESTS.CODEC] = cmrLookup[T].codec + requests[REQUESTS.BR] = 13200 + requests[REQUESTS.CA] = BR + requests[REQUESTS.BW] = cmrLookup[T].bandwidth + else: + raise Exception("Unsupported BR bits in CA Mode") + elif T == 7: # IVAS + if BR < 14: + requests[REQUESTS.CODEC] = cmrLookup[T].codec + requests[REQUESTS.BR] = cmrLookup[T].bitrates[BR] + requests[REQUESTS.CA] = -1 + elif BR == 14: + raise Exception("Reserved BR idx in IVAS EByte") + else: + if BR >= cmrLookup[T].startIdx and BR < cmrLookup[T].endIdx: + requests[REQUESTS.CODEC] = cmrLookup[T].codec + requests[REQUESTS.BR] = cmrLookup[T].bitrates[BR] + requests[REQUESTS.CA] = -1 + requests[REQUESTS.BW] = cmrLookup[T].bandwidth + else: + raise Exception( + "Reserved BR idx in {} EByte".format(cmrLookup[T].codec) + ) + # Try to get all subsequent E-bytes + while bitstrm.read(1).bool: + ET = bitstrm.read(3).uint + if ET == 0: + supportedBW = [ + BANDWIDTH.WB, + BANDWIDTH.SWB, + BANDWIDTH.FB, + BANDWIDTH.NREQ, + ] + reserved = bitstrm.read(2) + BW = bitstrm.read(2).uint + requests[REQUESTS.BW] = supportedBW[BW] + elif ET == 1: + S = bitstrm.read(1).bool + FMT = bitstrm.read(3).uint + if not S: + requests[REQUESTS.FMT] = codedFormats[FMT] + requests[REQUESTS.SUBFMT] = NO_REQ + else: + reserved = bitstrm.read(2) + subFMT = bitstrm.read(6).uint + requests[REQUESTS.FMT] = NO_REQ + requests[REQUESTS.SUBFMT] = codedSubFormats[subFMT] + elif ET == 2: + reserved = bitstrm.read(4) + piIndicated = True + elif ET == 3: + D = bitstrm.read(1).bool + Y = bitstrm.read(1).bool + P = bitstrm.read(1).bool + R = bitstrm.read(1).bool + requests[REQUESTS.SRCFG] = SRCONFIG( + diegetic=D, yaw=Y, pitch=P, roll=R + ) + else: + reserved = bitstrm.read(4) + raise Exception( + "Unsupported subsequent EByte with ET={}".format(ET) + ) + except ReadError as error: + print("Underflow in E-Bytes parsing during unpacking, error = {}".format(error)) + return piIndicated, requests + + +def packEBytes(bitstrm: BitStream, requests: dict[str:any], piIndication: bool = False): + codec = requests[REQUESTS.CODEC] if REQUESTS.CODEC in requests.keys() else NO_REQ + bitrate = requests[REQUESTS.BR] if REQUESTS.BR in requests.keys() else 0 + bandwidth = requests[REQUESTS.BW] if REQUESTS.BW in requests.keys() else NO_REQ + camode = requests[REQUESTS.CA] if REQUESTS.CA in requests.keys() else NO_REQ + format = requests[REQUESTS.FMT] if REQUESTS.FMT in requests.keys() else NO_REQ + subFormat = ( + requests[REQUESTS.SUBFMT] if REQUESTS.SUBFMT in requests.keys() else NO_REQ + ) + srcfg = ( + SRCONFIG(requests[REQUESTS.SRCFG]) + if REQUESTS.SRCFG in requests.keys() + else None + ) + + # Check if any request needs to be sent + isInitialEByteNeeded = ( + piIndication + or bitrate != 0 + or bandwidth != NO_REQ + or camode != NO_REQ + or format != NO_REQ + or subFormat != NO_REQ + or srcfg != None + ) + + if not isInitialEByteNeeded: + return + + if camode != NO_REQ: + T = 6 if bandwidth == BANDWIDTH.SWB else 5 + BR = getListIndex(list(CAMODE), camode) + assert BR > 0, "Channel Aware Mode not supported" + elif bitrate == 0: + T = 7 + BR = 15 + else: + mapBandwidthToTBit = { + BANDWIDTH.NB: 0, + BANDWIDTH.WB: 2, + BANDWIDTH.SWB: 3, + BANDWIDTH.FB: 4, + NO_REQ: 2, + }[bandwidth] + T = {CODECS.AMRWB: 1, CODECS.IVAS: 7, CODECS.EVS: mapBandwidthToTBit}[codec] + BR = getListIndex(requestBitratesForCodec[codec], bitrate) + assert ( + BR >= cmrLookup[T].startIdx and BR < cmrLookup[T].endIdx + ), "EVS Bitrate Index and Bandwidth Combination cannot be requested" + + # Write the Initial E-Byte + bitstrm.append(f"bool={True}") # E-Byte H=1 + bitstrm.append(f"uint:3={T}") + bitstrm.append(f"uint:4={BR}") + + # Subsequent E-bytes follow + if piIndication: + bitstrm.append("hex:8=A0") + + if codec != CODECS.IVAS: + return + + # Bandwidth E-Byte + if bandwidth != NO_REQ: + bw = {BANDWIDTH.WB: 0, BANDWIDTH.SWB: 1, BANDWIDTH.FB: 2}[bandwidth] + bitstrm.append("hex:4=8") + bitstrm.append(f"uint:4={bw}") + + # Coded Format/SubFormat Request E-Byte + if subFormat in SUBFORMATS: + subFmt = getListIndex(codedSubFormats, subFormat) + bitstrm.append("hex:8=9F") # S=0, FMT=111 + bitstrm.append(f"uint:8={subFmt}") + elif format in FORMATS: + fmt = getListIndex(codedFormats, format) + bitstrm.append("hex:4=9") + bitstrm.append(f"bool={False}") # S=0 + bitstrm.append(f"uint:3={fmt}") + + # SR Config E-Byte + with srcfg: + bitstrm.append("hex:4=B") + bitstrm.append(f"bool={srcfg.diegetic}") + bitstrm.append(f"bool={srcfg.yaw}") + bitstrm.append(f"bool={srcfg.pitch}") + bitstrm.append(f"bool={srcfg.roll}") + + +def unpackAUFrames(bitstrm: ConstBitStream, frameList: list[FRAME]): + try: + # Unpack Frame AUs here + for idx, frm in enumerate(frameList): + auSize = (frm.frmSizeBits + 7) // 8 # Zero padded bytes in amrwb_io mode + frm.au = bitstrm.read(auSize * 8).tobytes() + except ReadError as error: + print( + "Underflow in AU Frames parsing during unpacking, error = {}".format(error) + ) + + +def packAUFrames(bitstrm: BitStream, frameList: list[FRAME]): + for frm in frameList: + bitstrm.append(frm.au) + bitstrm.bytealign() + + +def unpackToCBytes( + bitstrm: ConstBitStream, rtpTimestamp: int, Codec: CODECS +) -> list[FRAME]: + F = True + frmList = list() + + try: + while F: + F = bitstrm.read(1).bool + FT = bitstrm.read(2).uint + BR = bitstrm.read(4).uint + frm = FRAME(timestamp=rtpTimestamp) + if FT == 1: + frm.codec = CODECS.IVAS + if BR == 14: + supportedBitrates = [-1, 256000, 384000, 512000] + reserved = bitstrm.read(1) + D = bitstrm.read(1).bool + C = bitstrm.read(1).bool + SR_BR = bitstrm.read(2).uint + reserved = bitstrm.read(3) + if SR_BR == 0: + raise Exception("Reserved bitrate in SR Config ToC Indicated") + frm.srInfo = SRINFO( + bitrate=supportedBitrates[SR_BR], + diegetic=D, + transportCodec=SRCODEC.LC3PLUS if C else SRCODEC.LCLD, + ) + else: + frm.bitrate = ivasBitrates[BR] + elif FT == 0: + # Codec switch only if not NO_DATA_FRAME, as IVAS/EVS signal using this + frm.codec = CODECS.EVS if BR != 15 else Codec + if BR == 13: + raise Exception("Reserved bitrate in EVS ToC Indicated") + frm.speechLost = BR == 14 + frm.bitrate = evsBitrates[BR] if BR < 13 else 0 + else: + frm.codec = CODECS.AMRWB + if BR >= 10 and BR <= 13: + raise Exception("Reserved bitrate in AMRWB-IO ToC Indicated") + frm.speechLost = (BR == 14) or (FT == 2) + frm.bitrate = amrwbBitrates[BR] if BR < 10 else 0 + frm.frmSizeBits = frm.bitrate // 50 + rtpTimestamp += 320 + frmList.append(frm) + if F: + # skip all frame specific E-bytes before next header + while bitstrm.read(1).bool: + print("Skipping unsupported frame specific subsequent Ebytes") + reserved = bitstrm.read(7) + except ReadError as error: + print("Underflow in ToC parsing during unpacking, error = {}".format(error)) + + return frmList + + +def packToCBytes(bitstrm: BitStream, frameList: list[FRAME]): + numFrames = len(frameList) + + for idx, frame in enumerate(frameList): + F = idx != (numFrames - 1) + FT = 0 + BR = 0 + if frame.frmSizeBits == 0: + FT = ( + 3 if frame.codec == CODECS.AMRWB else 0 + ) # Only AMRWB or EVS support 0 frame case + BR = 14 if frame.speechLost else 15 # SPEECH_LOST or NO_DATA + elif frame.codec == CODECS.AMRWB: + FT = 3 + BR = getListIndex(amrwbBitrates[0:10], frame.bitrate) + elif frame.codec == CODECS.IVAS: + FT = 1 + BR = 14 if frame.srInfo else getListIndex(ivasBitrates, frame.bitrate) + else: + FT = 0 + BR = getListIndex(evsBitrates[0:13], frame.bitrate) + + assert BR >= 0, "Index for table not found" + + bitstrm.append(f"bool={False}") # ToC 0 bit + bitstrm.append(f"bool={F}") # Frame follows bit + bitstrm.append(f"uint:2={FT}") # Frame Type + bitstrm.append(f"uint:4={BR}") # Frame Type + if frame.srInfo and frame.codec == CODECS.IVAS: + SRBR = (frame.srInfo.bitrate // 128000) - 1 + bitstrm.append(f"bool={False}") # ToC 0 bit + bitstrm.append(f"bool={frame.srInfo.diegetic}") + bitstrm.append(f"bool={frame.srInfo.transportCodec == SRCODEC.LC3PLUS}") + bitstrm.append(f"uint:2={SRBR}") + bitstrm.append("uint:3=0") + + +def unpackPiData(bitstrm: ConstBitStream, rtpTimestamp: int) -> list[PIDATA]: + piDataList = list[PIDATA]() + try: + # PI Data if Indicated + PF = True + piTimeStamps = rtpTimestamp + while PF: + PF = bitstrm.read(1).bool + PM = bitstrm.read(2).uint + PiType = bitstrm.read(5).uint + PiSize = 0 + byte = 255 + while byte == 255: + byte = bitstrm.read(8).uint + PiSize += byte + + PiFrameData = PIDataUnpacker[PiType](bitstrm, PiSize) + + if PiTypeNames[PiType] != PIDATAS.NO_PI_DATA: + piDataList.append( + PIDATA( + timestamp=( + piTimeStamps if PM != 3 else rtpTimestamp + ), # Generic Pi has base timestamp + type=PiTypeNames[PiType], + data=PiFrameData, + ) + ) + piTimeStamps += 320 if PM == 2 else 0 + except ReadError as error: + print("Underflow before completion of unpacking, error = {}".format(error)) + return piDataList + + +def packPiData( + bitstrm: BitStream, rtpTimestampBounds: tuple[int, int], piDataList: list[PIDATA] +): + # sort the piDataList by timestamp and eliminate data where timestamp is OOB for this packet + piDataList = [ + data + for data in piDataList + if ( + data.timestamp >= rtpTimestampBounds[0] + and data.timestamp < rtpTimestampBounds[1] + ) + ] + sorted(piDataList, key=lambda data: data.timestamp) + numPiData = len(piDataList) + + # Group PI data by timestamps + piDataDict = dict[int, list[PIDATA]]() + for data in piDataList: + ts = (data.timestamp // 320) * 320 + if ts not in piDataDict.keys(): + piDataDict[ts] = list() + piDataDict[ts].append(data) + + curTimestamp = rtpTimestampBounds[0] + for ts, dataList in piDataDict.items(): + while curTimestamp < ts: + # Inset NO_PI_DATA till current Timestamp is reached + bitstrm.append( + "hex:16=DF00" + ) # PI Frame follows, Last PI header for this frame, NO_PI_DATA, size=0 + curTimestamp += 320 + + for idx, data in enumerate(dataList): + pack = BitStream() + PM = 2 if idx == len(dataList) - 1 else 1 + PF = 0 if numPiData == 1 else 1 + TYPE = PiTypeNames.index(data.type) + PIDataPacker[TYPE](pack, data.data) + assert (pack.pos % 8) == 0, "PI data must be byte aligned" + SIZE = pack.pos // 8 + assert ( + SIZE < MAX_PACKED_PI_SIZE + ), f"Packed PI Size should be less than MAX_PACKED_PI_SIZE ({MAX_PACKED_PI_SIZE})" + + bitstrm.append(f"uint:1={PF}") + bitstrm.append(f"uint:2={PM}") + bitstrm.append(f"uint:5={TYPE}") + bitstrm.append(f"uint:8={SIZE}") + bitstrm.append(pack) + numPiData -= 1 + curTimestamp += 320 + assert numPiData == 0, "Not all PI data was packed due to internal error" + + +@dataclass +class IvasPayload: + frameList: list[FRAME] = field(default_factory=list) + piDataList: list[PIDATA] = field(default_factory=list) + requests: dict[str, any] = field(default_factory=dict) + + def pack(self, bitstrm: BitStream) -> int: + piIndication = len(self.piDataList) > 0 + numFrames = len(self.frameList) + packEBytes(bitstrm, self.requests, piIndication) + packToCBytes(bitstrm, self.frameList) + packAUFrames(bitstrm, self.frameList) + if piIndication: + packPiData( + bitstrm, + (self.frameList[0].timestamp, self.frameList[-1].timestamp + 320), + self.piDataList, + ) + return numFrames + + @classmethod + def unpack(cls, bitstrm: ConstBitStream, rtpTimestamp: int, Codec: CODECS): + # Unpack the E-bytes + piIndicated, requests = unpackEBytes(bitstrm) + + # ToC Byte parsing starts with 'F' bit as H bit is already read for E-byte parsing + frameList = unpackToCBytes(bitstrm, rtpTimestamp, Codec) + + # Extract packed AU + unpackAUFrames(bitstrm, frameList) + + if piIndicated: + piDataList = unpackPiData(bitstrm, rtpTimestamp) + else: + piDataList = list() + + return cls(frameList=frameList, piDataList=piDataList, requests=requests) + + +@dataclass +class IvasPacket: + hdr: RTPHDR = field(default_factory=RTPHDR) + payload: IvasPayload = field(default_factory=IvasPayload) + + def pack(self, bitstrm: BitStream): + self.hdr.pack(bitstrm) + numFrames = self.payload.pack(bitstrm) + self.hdr.updateHeader(numFrames) + + +class IvasRtp: + def __init__(self, numFramesPerPacket=4, codec: CODECS = CODECS.IVAS): + self.numFramesPerPacket = numFramesPerPacket + self.packets = list[IvasPacket]() + self.Codec: CODECS = codec # Track last frame's codec + self.requests = dict() + self.piData = dict() + + def dumpToJSON(self, jsonFileName): + with open(jsonFileName, "w") as fd: + packets = list() + for packet in self.packets: + packetDict = asdict(packet) + for frame in packetDict["payload"]["frameList"]: + frame["au"] = base64.b64encode(frame["au"]).decode("utf-8") + packets.append(packetDict) + json_output = json.dumps(packets, indent=4) + fd.write(json_output) + + def requestReader(self, timestamp: int) -> dict[str, any]: + tsList = sorted(self.requests.keys()) + if len(tsList) > 0: + lastTs = int(tsList[0]) + for ts in tsList: + if timestamp >= lastTs and timestamp < int(ts): + return self.requests[str(lastTs)] + lastTs = int(ts) + return dict() + + def piDataReader(self, startTimestamp: int, endTimestamp: int) -> list[PIDATA]: + + piDataList = list() + while startTimestamp < endTimestamp: + ts = str(startTimestamp) + if ts in self.piData.keys(): + for piTypes in self.piData[ts].keys(): + dataDict = self.piData[ts][piTypes] + if type(dataDict) != dict: + data = dataDict + elif piTypes == PIDATAS.ISM_ORIENTATION: + data = list() + for orientation in dataDict: + data.append(ORIENTATION(**orientation)) + elif ( + "ORIENTATION" in piTypes + or piTypes == PIDATAS.AUDIO_FOCUS_DIRECTION + ): + data = ORIENTATION(**dataDict) + elif piTypes == PIDATAS.ACOUSTIC_ENVIRONMENT: + data = ACOUSTIC_ENVIRONMENT(**dataDict) + elif piTypes == PIDATAS.AUDIO_DESCRIPTION: + data = list() + for desc in dataDict: + data.append(AUDIO_DESCRIPTION(**desc)) + elif piTypes == PIDATAS.DIEGETIC_TYPE: + data = DIEGETIC_TYPE(**dataDict) + elif ( + piTypes == PIDATAS.LISTENER_POSITION + or piTypes == PIDATAS.R_ISM_POSITION + ): + data = POSITION(**dataDict) + elif piTypes == PIDATAS.DYNAMIC_AUDIO_SUPPRESSION: + data = DYNAMIC_AUDIO_SUPPRESSION(**dataDict) + else: + assert False, "Unhandled PI Data" + piDataList.append( + PIDATA(timestamp=startTimestamp, type=piTypes, data=data) + ) + startTimestamp += 320 + return piDataList + + def packG192File( + self, + g192File: Path, + rtpDumpOut: Path, + piData: dict = None, + requestsData: dict = None, + ): + packet = IvasPacket() + packet.hdr.sequenceNum = int("0xFFFF", 16) + packet.hdr.timestamp = 0 + packet.hdr.ssrc = int("0xDEADBEEF", 16) + + self.piData = piData + self.requests = requestsData + + timestamp = packet.hdr.timestamp + piTimestamps = 0 + with open(g192File, "rb") as fin: + with open(rtpDumpOut, "wb") as fout: + refBitStrm = ConstBitStream(fin.read()) + frames = list[FRAME]() + while refBitStrm.pos < refBitStrm.len: + sync = hex(refBitStrm.read(16).intle) + nBits = refBitStrm.read(16).intle + assert ( + sync == "0x6b21" + ), "G192 syncword not found at start of packet" + writer = BitStream() + for _ in range(nBits): + bit = "0b1" if refBitStrm.read(16).uintle == 129 else "0b0" + writer.append(bit) + frames.append( + FRAME( + codec=self.Codec, + frmSizeBits=nBits, + bitrate=nBits * 50, + speechLost=False, + timestamp=timestamp, + au=writer.tobytes(), + ) + ) + if (len(frames) == self.numFramesPerPacket) or ( + refBitStrm.pos == refBitStrm.len + ): + rtpBitstrm = BitStream() + numFrames = len(frames) + packet.payload = IvasPayload( + frameList=frames, + piDataList=self.piDataReader( + piTimestamps, piTimestamps + (numFrames * 320) + ), + requests=self.requestReader(piTimestamps), + ) + packet.pack(bitstrm=rtpBitstrm) + fout.write(struct.pack("i", rtpBitstrm.bytepos)) + fout.write(rtpBitstrm.tobytes()) + frames = list() + piTimestamps += numFrames * 320 + timestamp += 320 + + def getPackets(self): + return self.packets + + def unpackFile(self, rtpDumpFile): + with open(rtpDumpFile, mode="rb") as fd: + while True: + size = fd.read(4) + if not size: + break + size = struct.unpack("i", size)[0] + packet = fd.read(size) + if not packet: + break + self.packets.append(self.unpackPacket(packet)) + + def unpackPacket(self, packet) -> IvasPacket: + bitStrm = ConstBitStream(packet) + hdr = RTPHDR.unpack(bitStrm) + payload = IvasPayload.unpack( + bitStrm, rtpTimestamp=hdr.timestamp, Codec=self.Codec + ) + self.Codec = payload.frameList[ + -1 + ].codec # Last Frame's codec for next frame for NO_DATA case + return IvasPacket(hdr=hdr, payload=payload) + + +class ArgsParser: + def __init__(self): + self.parser = argparse.ArgumentParser() + self.parser.add_argument( + "-r", "--rtpdump", type=str, default=None, help="RTP Dump to unpack" + ) + self.parser.add_argument( + "-j", + "--json", + type=str, + default="unpack.json", + help="Output unpacked RTP frames to JSON file", + ) + self.parser.add_argument( + "-g", + "--g192", + type=str, + default=None, + help="G192 bitstream input for RTP Packing", + ) + self.parser.add_argument( + "-f", + "--framesPerPacket", + type=int, + default=1, + help="Number of IVAS frames per RTP Packet", + ) + self.parser.add_argument( + "-o", + "--outrtpdump", + type=str, + default="output.rtpdump", + help="Output RTP Dump file", + ) + self.parser.add_argument( + "-p", "--piDataJson", type=str, default=None, help="piData to be packed" + ) + self.parser.add_argument( + "-x", "--requestsJson", type=str, default=None, help="Requests to be packed" + ) + + def parse(self): + args = self.parser.parse_args() + return args + + +if __name__ == "__main__": + args = ArgsParser().parse() + rtp = IvasRtp(numFramesPerPacket=args.framesPerPacket) + + if args.rtpdump: + rtp.unpackFile(args.rtpdump) + rtp.dumpToJSON(args.json) + elif args.g192: + piData = dict() + requestsData = dict() + if args.piDataJson: + with open(args.piDataJson) as f: + piData = json.load(f) + if args.requestsJson: + with open(args.requestsJson) as f: + requestsData = json.load(f) + rtp.packG192File( + g192File=args.g192, + rtpDumpOut=args.outrtpdump, + piData=piData, + requestsData=requestsData, + ) diff --git a/tests/rtp/test_rtp.py b/tests/rtp/test_rtp.py new file mode 100644 index 0000000000..69b85dc9af --- /dev/null +++ b/tests/rtp/test_rtp.py @@ -0,0 +1,665 @@ +#!/usr/bin/env python3 + +__copyright__ = """ +(C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, +Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., +Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, +Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other +contributors to this repository. All Rights Reserved. + +This software is protected by copyright law and by international treaties. +The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, +Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., +Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, +Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other +contributors to this repository retain full ownership rights in their respective contributions in +the software. This notice grants no license of any kind, including but not limited to patent +license, nor is any license granted by implication, estoppel or otherwise. + +Contributors are required to enter into the IVAS codec Public Collaboration agreement before making +contributions. + +This software is provided "AS IS", without any express or implied warranties. The software is in the +development stage. It is intended exclusively for experts who have experience with such software and +solely for the purpose of inspection. All implied warranties of non-infringement, merchantability +and fitness for a particular purpose are hereby disclaimed and excluded. + +Any dispute, controversy or claim arising under or in relation to providing this software shall be +submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in +accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and +the United Nations Convention on Contracts on the International Sales of Goods. +""" + +__doc__ = """ +This test does the following procedure:- + +DECODER TEST +============ +* Encode a input for given format, bitrate and dtx info using IVAS Encoder to g192 +* Generate random PI data to be used for RTP packaging +* Use g192 and random PI data to pack a stream into RTPDump stream using the reference rtp packer implementation in python +* Provide the rtpdump stream to IVAS decoder to decode the stream and generate a PI data dump in a JSON file +* Decode G192 stream using IVAS decoder and compare against output of RTPdump for bit-exactness +* Validate dumped JSON agains original random PI data to check for similar data after deocde + +ENCODER TEST +============ +* Encode the input for given format, bitrate and dtx info using IVAS Encoder to rtpdump directly +* Validate the generated rtp dump using reference python implementation of RTP depacker for following:- + * RTP Header consistency (Timestamp, seq number, etc) + * IVAS Payload (no of frames in packet, bitrate indicated, bitexactness of frames in packet) + * PI Data verification (Timestamp, Pidata Type, similarity of PI data) +""" + +import pytest +import csv +import os +import sys +import random + +from tempfile import TemporaryDirectory +from pathlib import Path +from ivasrtp import * +import numpy as np +from pyaudio3dtools.audiofile import readfile + +ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) +sys.path.append(ROOT_DIR) + +from tests.conftest import EncoderFrontend, DecoderFrontend + + +@pytest.mark.parametrize("dtx", [False, True]) +@pytest.mark.parametrize("bitrate", [6600, 12650, 23850]) +@pytest.mark.parametrize("framesPerPacket", [1, 3, 8]) +def test_rtp_bitstream_amrwb( + test_info, + bitrate: int, + dtx: bool, + framesPerPacket: int, + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, +): + run_rtp_bitstream_tests( + CODECS.AMRWB, + bitrate, + "WB", + "OFF", + "MONO", + dtx, + framesPerPacket, + dut_encoder_frontend, + dut_decoder_frontend, + ) + + +@pytest.mark.parametrize("dtx", [False, True]) +@pytest.mark.parametrize("bitrate", [9600, 24400, 128000]) +@pytest.mark.parametrize("bandwidth", ["NB", "WB", "SWB", "FB"]) +@pytest.mark.parametrize("caMode", ["OFF", "LO", "HI"]) +@pytest.mark.parametrize("framesPerPacket", [1, 3, 8]) +def test_rtp_bitstream_evs( + test_info, + bitrate: int, + bandwidth: str, + caMode: str, + dtx: bool, + framesPerPacket: int, + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, +): + run_rtp_bitstream_tests( + CODECS.EVS, + bitrate, + bandwidth, + caMode, + "MONO", + dtx, + framesPerPacket, + dut_encoder_frontend, + dut_decoder_frontend, + ) + + +@pytest.mark.parametrize("bitrate", [24400, 80000, 512000]) +@pytest.mark.parametrize("bandwidth", ["WB", "SWB", "FB"]) +@pytest.mark.parametrize("format", ["STEREO", "SBA", "MC", "MASA"]) +@pytest.mark.parametrize("framesPerPacket", [8]) +def test_rtp_bitstream_ivas_nodtx( + test_info, + bitrate: int, + bandwidth: str, + format: str, + framesPerPacket: int, + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, +): + run_rtp_bitstream_tests( + CODECS.IVAS, + bitrate, + bandwidth, + "OFF", + format, + False, + framesPerPacket, + dut_encoder_frontend, + dut_decoder_frontend, + ) + + +@pytest.mark.parametrize("bitrate", [13200, 24400, 80000]) +@pytest.mark.parametrize("bandwidth", ["WB", "SWB", "FB"]) +@pytest.mark.parametrize("format", ["STEREO", "SBA"]) +@pytest.mark.parametrize("framesPerPacket", [1, 3]) +def test_rtp_bitstream_ivas_dtx( + test_info, + bitrate: int, + bandwidth: str, + format: str, + framesPerPacket: int, + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, +): + run_rtp_bitstream_tests( + CODECS.IVAS, + bitrate, + bandwidth, + "OFF", + format, + True, + framesPerPacket, + dut_encoder_frontend, + dut_decoder_frontend, + ) + + +def generateRequests(startTs: int, endTs: int) -> dict: + requests = dict() + return requests + + +def generatePiData(startTs: int, endTs: int) -> dict: + data = dict() + + someOrientation = lambda: ORIENTATION( + w=2 * random.random() - 1.0, + x=2 * random.random() - 1.0, + y=2 * random.random() - 1.0, + z=2 * random.random() - 1.0, + ) + somePosition = lambda: POSITION( + x=random.randint(-32788, 32767) / 100.0, + y=random.randint(-32788, 32767) / 100.0, + z=random.randint(-32788, 32767) / 100.0, + ) + someDesc = lambda: AUDIO_DESCRIPTION( + isSpeech=bool(random.getrandbits(1)), + isMusic=bool(random.getrandbits(1)), + isAmbiance=bool(random.getrandbits(1)), + isEditable=bool(random.getrandbits(1)), + isBinaural=bool(random.getrandbits(1)), + ) + someDAS = lambda: DYNAMIC_AUDIO_SUPPRESSION( + preferSpeech=bool(random.getrandbits(1)), + preferMusic=bool(random.getrandbits(1)), + preferAmbiance=bool(random.getrandbits(1)), + level=random.randint(0, 15), + ) + someDIG = lambda: DIEGETIC_TYPE( + isDigetic=[bool(random.getrandbits(1)) for _ in range(random.randint(1, 5))] + ) + someAuFocusDirLvl = lambda: AUDIO_FOCUS( + ORIENTATION( + w=2 * random.random() - 1.0, + x=2 * random.random() - 1.0, + y=2 * random.random() - 1.0, + z=2 * random.random() - 1.0, + ), + level=AUDIO_FOCUS_LEVEL(random.randint(0, 15)), + ) + someAuFocusDir = lambda: AUDIO_FOCUS( + ORIENTATION( + w=2 * random.random() - 1.0, + x=2 * random.random() - 1.0, + y=2 * random.random() - 1.0, + z=2 * random.random() - 1.0, + ) + ) + someAuFocusLvl = lambda: AUDIO_FOCUS(level=AUDIO_FOCUS_LEVEL(random.randint(0, 15))) + someAuFocusList = [someAuFocusDirLvl, someAuFocusDir, someAuFocusLvl] + + for ts in range(startTs, endTs, 320): + pidata = dict() + pidata["SCENE_ORIENTATION"] = someOrientation() + pidata["DEVICE_ORIENTATION_COMPENSATED"] = someOrientation() + pidata["DEVICE_ORIENTATION_UNCOMPENSATED"] = someOrientation() + pidata["PLAYBACK_DEVICE_ORIENTATION"] = someOrientation() + pidata["HEAD_ORIENTATION"] = someOrientation() + pidata["AUDIO_FOCUS_REQUEST"] = random.choice(someAuFocusList)() + pidata["LISTENER_POSITION"] = somePosition() + pidata["DYNAMIC_AUDIO_SUPPRESSION"] = someDAS() + pidata["AUDIO_DESCRIPTION"] = [someDesc() for n in range(random.randint(1, 5))] + pidata["DIEGETIC_TYPE"] = someDIG() + pidata["ACOUSTIC_ENVIRONMENT"] = ACOUSTIC_ENVIRONMENT( + aeid=random.randint(0, 127) + ) + data[str(ts)] = pidata + return data + + +def isEqualFrame(refFrame: bytes, dutFrame: bytes): + assert len(refFrame) == len(dutFrame), "Encoded frame size is different" + for refByte, dutByte in zip(refFrame, dutFrame): + assert ( + refByte == dutByte + ), "Encoded frames should be bitexact between ref and rtpdump" + + +def isEqualOrientation(ref: ORIENTATION, dut: ORIENTATION): + assert abs(ref.w - dut.w) < 0.0001, "Scene Orientation PI Data mismatch in w" + assert abs(ref.x - dut.x) < 0.0001, "Scene Orientation PI Data mismatch in x" + assert abs(ref.y - dut.y) < 0.0001, "Scene Orientation PI Data mismatch in y" + assert abs(ref.z - dut.z) < 0.0001, "Scene Orientation PI Data mismatch in z" + + +def isEqualPosition(ref: POSITION, dut: POSITION): + assert abs(ref.x - dut.x) < 0.3, "Position PI Data mismatch in x" + assert abs(ref.y - dut.y) < 0.3, "Position PI Data mismatch in y" + assert abs(ref.z - dut.z) < 0.3, "Position PI Data mismatch in z" + + +def isEqualAD(ref: AUDIO_DESCRIPTION, dut: AUDIO_DESCRIPTION): + assert ( + ref.isSpeech == dut.isSpeech + ), "Audio Description PI Data mismatch in isSpeech" + assert ref.isMusic == dut.isMusic, "Audio Description PI Data mismatch in isMusic" + assert ( + ref.isAmbiance == dut.isAmbiance + ), "Audio Description PI Data mismatch in isAmbiance" + assert ( + ref.isEditable == dut.isEditable + ), "Audio Description PI Data mismatch in isEditable" + assert ( + ref.isBinaural == dut.isBinaural + ), "Audio Description PI Data mismatch in isBinaural" + + +def isEqualDAS(ref: DYNAMIC_AUDIO_SUPPRESSION, dut: DYNAMIC_AUDIO_SUPPRESSION): + assert ( + ref.preferSpeech == dut.preferSpeech + ), "Dynamic Audio Suppression PI Data mismatch in preferSpeech" + assert ( + ref.preferMusic == dut.preferMusic + ), "Dynamic Audio Suppression PI Data mismatch in preferMusic" + assert ( + ref.preferAmbiance == dut.preferAmbiance + ), "Dynamic Audio Suppression PI Data mismatch in preferAmbiance" + assert ref.level == dut.level, "Dynamic Audio Suppression PI Data mismatch in level" + + +def isEqualDiegetic(ref: DIEGETIC_TYPE, dut: DIEGETIC_TYPE): + for r, d in zip(ref.isDigetic, dut.isDigetic): + assert r == d, f"Diegetic PI Data mismatch {r} != {d}" + + +def isEqualAcousticEnv(ref: ACOUSTIC_ENVIRONMENT, dut: ACOUSTIC_ENVIRONMENT): + assert ref.aeid == dut.aeid, "Acoustic Env PI Data mismatch in Acoustic Identifier" + assert len(ref.rt60) == len(dut.rt60), "Acoustic Env PI Data mismatch in len(rt60)" + assert len(ref.dsr) == len(dut.dsr), "Acoustic Env PI Data mismatch in len(dsr)" + assert len(ref.dim) == len(dut.dim), "Acoustic Env PI Data mismatch in len(dim)" + assert len(ref.abscoeff) == len( + dut.abscoeff + ), "Acoustic Env PI Data mismatch in len(abscoeff)" + for r, d in zip(ref.rt60, dut.rt60): + assert r == d, f"Acoustic Env PI Data mismatch in rt60 {r} != {d}" + + +def isEqualAudioFocus(ref: AUDIO_FOCUS, dut: AUDIO_FOCUS): + if ref.direction is not None or dut.direction is not None: + assert ref.direction is not None, "Audio Focus PI Data missing direction" + assert dut.direction is not None, "Audio Focus PI Data missing direction" + if ref.direction is not None and dut.direction is not None: + assert ( + abs(ref.direction["w"] - dut.direction.w) < 0.0001 + ), "Audio Focus PI Data mismatch in direction w" + assert ( + abs(ref.direction["x"] - dut.direction.x) < 0.0001 + ), "Audio Focus PI Data mismatch in direction x" + assert ( + abs(ref.direction["y"] - dut.direction.y) < 0.0001 + ), "Audio Focus PI Data mismatch in direction y" + assert ( + abs(ref.direction["z"] - dut.direction.z) < 0.0001 + ), "Audio Focus PI Data mismatch in direction z" + assert ref.level == dut.level, "Audio Focus PI Data mismatch in level" + + +class CSVREADER: + def __init__(self, csvFile: Path): + self.rIdx = 0 + self.rows = [] + with open(csvFile, "r") as fd: + self.rows = [row for row in csv.reader(fd)] + self.count = len(self.rows) + + def next(self) -> list[float]: + row = self.rows[self.rIdx] + self.rIdx += 1 + if self.rIdx == self.count: + self.rIdx = 0 + return [float(x) for x in row] + + +class RTPVALIDATE: + + DTX_BITRATES = {CODECS.IVAS: 5200, CODECS.EVS: 2400, CODECS.AMRWB: 1750} + + def __init__(self, codec=CODECS.IVAS, bitrate=24400, framesPerPacket=1, dtx=False): + self.framesPerPacket = framesPerPacket + self.dtx = dtx + self.codec = codec + self.bitrate = bitrate + self.timestamp = 0 + self.seqnum = -1 + self.ssrc = -1 + self.numFrames = 0 + self.validatePiData = False + self.g192File = None + self.frameIdx = 0 + # PI DATA + self.readers: dict[str:CSVREADER] = dict() + + def setPiDataFiles(self, piFiles: tuple[Path]): + self.validatePiData = True + self.readers[PiTypeNames[0]] = CSVREADER(piFiles[0]) + self.readers[PiTypeNames[1]] = CSVREADER(piFiles[1]) + + def setRefG192Bitstream(self, g192File: Path): + self.refPackets = ReadG192Bitstream(g192File) + + def packet(self, packet: IvasPacket): + self.header(packet.hdr) + self.payload(packet.payload) + self.seqnum = (self.seqnum + 1) % 65536 + self.timestamp += 320 * self.numFrames + + def header(self, hdr: RTPHDR): + if self.timestamp == 0: + self.seqnum = hdr.sequenceNum + self.ssrc = hdr.ssrc + assert hdr.version == 2, "RTP Header Version must be 2" + assert self.ssrc == hdr.ssrc, "SSRC changed mid-stream in RTP Header" + assert self.timestamp == hdr.timestamp, "Timestamp mismatch in RTP Header" + assert self.seqnum == hdr.sequenceNum, "Sequence number mismatch in RTP Header" + + def payload(self, payload: IvasPayload): + self.numFrames = len(payload.frameList) + assert ( + self.numFrames >= 1 and self.numFrames <= self.framesPerPacket + ), f"Packet must have atleast 1 frame and atmost {self.framesPerPacket} frames" + + for frame in payload.frameList: + assert self.codec == frame.codec, "Codec mismatch in RTP Payload" + if self.dtx: + assert frame.bitrate in ( + self.bitrate, + RTPVALIDATE.DTX_BITRATES[self.codec], + 0, + ), "Bitrate mismatch in RTP Payload in DTX mode" + else: + assert frame.bitrate == self.bitrate, "Bitrate mismatch in RTP Payload" + + assert self.frameIdx < len(self.refPackets), "No. of frames mismatch" + isEqualFrame(frame.au, self.refPackets[self.frameIdx]) + self.frameIdx += 1 + + # Vallidate the PI Data + if self.validatePiData: + self.piData(payload.piDataList) + + def piData(self, piDataList: list[PIDATA]): + for piData in piDataList: + assert ( + piData.timestamp >= self.timestamp + and piData.timestamp < self.timestamp + (self.numFrames * 320) + ), "PI Data Time stamp is OOB" + assert ( + piData.type == PiTypeNames[0] or piData.type == PiTypeNames[1] + ), "PI Data is neither Scene nor Device Orientation" + assert type(piData.data) == ORIENTATION, "Orientation type data expected" + # validate the PI Data provided is the PI data in the packet + refData = self.readers[piData.type].next() + isEqualOrientation( + ORIENTATION(w=refData[0], x=refData[1], y=refData[2], z=refData[3]), + piData.data, + ) + + +@dataclass +class TVARGS: + TVROOT = Path(ROOT_DIR).joinpath("scripts/testv") + + def __init__(self): + self.tvDict = dict() + self.sceneFile = ( + Path(ROOT_DIR) + .joinpath("scripts/trajectories/azi_plus_2-ele_plus_2-every-25-rows.csv") + .absolute() + ) + self.deviceFile = ( + Path(ROOT_DIR).joinpath("scripts/trajectories/headrot-1.5s.csv").absolute() + ) + + def add(self, fmt: str, inputFile: str, args: list[str] = []): + inputFile = str(TVARGS.TVROOT.joinpath(inputFile).absolute()) + if fmt == "MASA": + args[2] = str(TVARGS.TVROOT.joinpath(args[2]).absolute()) + self.tvDict[fmt] = (inputFile, args) + + def input(self, fmt): + return self.tvDict[fmt][0] + + def args(self, fmt, addPI=False) -> list[str]: + args = [x for x in self.tvDict[fmt][1]] + if addPI and fmt != "MONO": + args += [ + "-scene_orientation", + str(self.sceneFile), + "-device_orientation", + str(self.deviceFile), + ] + return args + + def piFiles(self) -> tuple[Path]: + return (self.sceneFile, self.deviceFile) + + +def run_rtp_bitstream_tests( + codec: CODECS, + bitrate: int, + bandwidth: str, + caMode: str, + format: str, + dtx: bool, + framesPerPacket: int, + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, +): + tvArgs = TVARGS() + tvArgs.add("MONO", "stv48n.wav") + + if dtx: # use bigger file for dtx stereo + tvArgs.add("STEREO", "stvST48n.wav", ["-stereo"]) + else: + tvArgs.add("STEREO", "stv2MASA2TC48c.wav", ["-stereo"]) + + tvArgs.add("MC", "stv51MC48c.wav", ["-mc", "5_1"]) + tvArgs.add("MASA", "stv2MASA2TC48c.wav", ["-masa", "2", "stv2MASA2TC48c.met"]) + tvArgs.add("SBA", "stvFOA48c.wav", ["-sba", "+1"]) + + if (bitrate > 24400 and bandwidth == "NB") or ( + format == "STEREO" and bitrate > 256000 + ): + pytest.skip() + + print( + "Test: dut_encoder_frontend={}, dtx={}, codec:={}, bitrate={}, bandwidth={}, caMode={}, format={},".format( + dut_encoder_frontend._path, dtx, codec, bitrate, bandwidth, caMode, format + ) + ) + + validate = RTPVALIDATE( + codec=codec, bitrate=bitrate, framesPerPacket=framesPerPacket, dtx=dtx + ) + + with TemporaryDirectory() as tmp_dir: + g192Out = ( + Path(tmp_dir) + .joinpath(f"output-{codec}-{bitrate}-{caMode}-{format}-{dtx}.g192") + .absolute() + ) + rtpdumpOut = ( + Path(tmp_dir) + .joinpath(f"output-{codec}-{bitrate}-{caMode}-{format}-{dtx}.rtpdump") + .absolute() + ) + rtpdumpIn = ( + Path(tmp_dir) + .joinpath(f"input-{codec}-{bitrate}-{caMode}-{format}-{dtx}.rtpdump") + .absolute() + ) + pcmOut = ( + Path(tmp_dir) + .joinpath(f"output-{codec}-{bitrate}-{caMode}-{format}-{dtx}.wav") + .absolute() + ) + pcmOutG192 = ( + Path(tmp_dir) + .joinpath(f"output_g192-{codec}-{bitrate}-{caMode}-{format}-{dtx}.wav") + .absolute() + ) + piDataOutJson = ( + Path(tmp_dir) + .joinpath(f"piData-{codec}-{bitrate}-{caMode}-{format}-{dtx}.json") + .absolute() + ) + + # Run WITHOUT rtpdump first to generate reference bitstream + dut_encoder_frontend.run( + bitrate=bitrate, + input_sampling_rate=48, + input_path=tvArgs.input(format), + output_bitstream_path=g192Out, + sba_order=None, + dtx_mode=dtx, + max_band=bandwidth, + add_option_list=tvArgs.args(format), + ) + validate.setRefG192Bitstream(g192File=g192Out) + + packer = IvasRtp(numFramesPerPacket=framesPerPacket, codec=codec) + + if codec == CODECS.IVAS: + outMode = "STEREO" if format == "STEREO" else "BINAURAL" + generatedPIData = generatePiData(0, 16000) + else: + outMode = "" + generatedPIData = dict() + + packer.packG192File( + g192File=g192Out, + rtpDumpOut=rtpdumpIn, + piData=generatedPIData, + requestsData=generateRequests(0, 1600), + ) + + dut_decoder_frontend.run( + output_config=outMode, + output_sampling_rate=48, + input_bitstream_path=g192Out, + output_path=pcmOutG192, + add_option_list=[], + ) + + dut_decoder_frontend.run( + output_config=outMode, + output_sampling_rate=48, + input_bitstream_path=rtpdumpIn, + output_path=pcmOut, + add_option_list=["-VOIP_HF_ONLY=1", "-PiDataFile", str(piDataOutJson)], + ) + + decAudio, fs = readfile(pcmOut) + g192Audio, Fs = readfile(pcmOutG192) + decAudio /= 32768.0 # readfile reuturns 16 bit int + g192Audio /= 32768.0 # readfile reuturns 16 bit int + decAudio = decAudio[4 * 960 :] + assert abs(decAudio.shape[0] - g192Audio.shape[0]) <= ( + 4 * 960 + ), "Decoded PCM Audio is not same length as input" + minSamples = min(decAudio.shape[0], g192Audio.shape[0]) + rmsdB = 10.0 * np.log10( + np.finfo(float).eps + + np.sum(np.abs(g192Audio[:minSamples] - decAudio[:minSamples]) ** 2) + / minSamples + ) + + if dtx: + assert ( + rmsdB < -60.0 + ), "Bitdiff in the RTP unpacked and G192 streams for DTX stream" + else: + assert rmsdB < -96.0, "Bitdiff in the RTP unpacked and G192 streams" + + with open(piDataOutJson, "r") as fd: + decodedPiData = json.load(fd) + assert ( + decodedPiData.keys() == generatedPIData.keys() + ), f"Timestamp of PI data {generatedPIData.keys()} not found in Decoded PI Data {decodedPiData.keys()}" + for ts in generatedPIData.keys(): + for pitype in generatedPIData[ts]: + data = generatedPIData[ts][pitype] + decoded = decodedPiData[ts][pitype] + if type(generatedPIData[ts][pitype]) == ORIENTATION: + isEqualOrientation(ORIENTATION(**decoded), data) + elif type(generatedPIData[ts][pitype]) == POSITION: + isEqualPosition(POSITION(**decoded), data) + elif type(generatedPIData[ts][pitype]) == DYNAMIC_AUDIO_SUPPRESSION: + isEqualDAS(DYNAMIC_AUDIO_SUPPRESSION(**decoded), data) + elif type(generatedPIData[ts][pitype]) == DIEGETIC_TYPE: + isEqualDiegetic(DIEGETIC_TYPE(**decoded), data) + elif type(generatedPIData[ts][pitype]) == ACOUSTIC_ENVIRONMENT: + isEqualAcousticEnv(ACOUSTIC_ENVIRONMENT(**decoded), data) + elif type(generatedPIData[ts][pitype]) == AUDIO_FOCUS: + isEqualAudioFocus(AUDIO_FOCUS(**decoded), data) + elif type(generatedPIData[ts][pitype]) == list: + for r, d in zip( + generatedPIData[ts][pitype], decodedPiData[ts][pitype] + ): + isEqualAD(AUDIO_DESCRIPTION(**d), r) + else: + assert False, "Unsupported PI data found" + + # Generate RTPDUMP + addPI = False if format == "MONO" else True + if addPI: + # Add PI Data to Pack in RTP + validate.setPiDataFiles(tvArgs.piFiles()) + + extra_args = tvArgs.args(format, addPI) + extra_args += ["-rtpdump", str(framesPerPacket)] + dut_encoder_frontend.run( + bitrate=bitrate, + input_sampling_rate=48, + input_path=tvArgs.input(format), + output_bitstream_path=rtpdumpOut, + sba_order=None, + dtx_mode=dtx, + max_band=bandwidth, + add_option_list=extra_args, + ) + + unpacker = IvasRtp() + unpacker.unpackFile(rtpdumpOut) + for packet in unpacker.getPackets(): + validate.packet(packet) -- GitLab From 356e06023a5b662eb4cb6604632cde250b588185 Mon Sep 17 00:00:00 2001 From: Lauros Pajunen Date: Thu, 9 Oct 2025 14:23:31 +0300 Subject: [PATCH 2/7] Add rtpdump test to pipeline --- .gitlab-ci.yml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0b735a6df4..ef260a2b99 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -707,6 +707,28 @@ split-rendering-voip-be-to-binaural: junit: - report-junit.xml +# test rtpdump +rtpdump-test: + extends: + - .test-job-linux + - .rules-merge-request-to-main + needs: ["build-codec-linux-make"] + stage: test + script: + - make -j + - testcase_timeout=30 + - python3 -m pytest -q -n auto -rA --junit-xml=report-junit.xml tests/rtp/test_rtp.py --testcase_timeout=$testcase_timeout + artifacts: + name: "mr-$CI_MERGE_REQUEST_IID--sha-$CI_COMMIT_SHORT_SHA--job-$CI_JOB_NAME--results" + expire_in: 1 week + when: always + paths: + - report-junit.xml + expose_as: "rtpdump pytest results" + reports: + junit: + - report-junit.xml + lc3-wrapper-unit-test: extends: - .test-job-linux -- GitLab From f3902b293200b76842bea1216e46c07e02b3fbd3 Mon Sep 17 00:00:00 2001 From: Lauros Pajunen Date: Mon, 13 Oct 2025 17:12:54 +0300 Subject: [PATCH 3/7] Add html report, remove xfail for rtpdump tests --- .gitlab-ci.yml | 3 ++- tests/codec_be_on_mr_nonselection/test_param_file.py | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ef260a2b99..4890fefa32 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -717,13 +717,14 @@ rtpdump-test: script: - make -j - testcase_timeout=30 - - python3 -m pytest -q -n auto -rA --junit-xml=report-junit.xml tests/rtp/test_rtp.py --testcase_timeout=$testcase_timeout + - python3 -m pytest -q -n auto -rA --html=report.html --self-contained-html --junit-xml=report-junit.xml tests/rtp/test_rtp.py --testcase_timeout=$testcase_timeout artifacts: name: "mr-$CI_MERGE_REQUEST_IID--sha-$CI_COMMIT_SHORT_SHA--job-$CI_JOB_NAME--results" expire_in: 1 week when: always paths: - report-junit.xml + - report.html expose_as: "rtpdump pytest results" reports: junit: diff --git a/tests/codec_be_on_mr_nonselection/test_param_file.py b/tests/codec_be_on_mr_nonselection/test_param_file.py index 0e0e082c63..d2acacd39e 100644 --- a/tests/codec_be_on_mr_nonselection/test_param_file.py +++ b/tests/codec_be_on_mr_nonselection/test_param_file.py @@ -228,9 +228,6 @@ def test_param_file_tests( ): enc_opts, dec_opts, sim_opts, eid_opts = param_file_test_dict[test_tag] - if "rtpdump" in test_tag: - pytest.xfail("Skip RTP tests for now") - run_test( test_info, props_to_record, -- GitLab From e6681da504e878f4bdcf7f834210b4cd74f1a5cb Mon Sep 17 00:00:00 2001 From: Lauros Pajunen Date: Tue, 14 Oct 2025 16:04:17 +0300 Subject: [PATCH 4/7] Add tb=no, reduce rtpdump test combinations --- .gitlab-ci.yml | 2 +- tests/rtp/test_rtp.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4890fefa32..986ec444e1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -717,7 +717,7 @@ rtpdump-test: script: - make -j - testcase_timeout=30 - - python3 -m pytest -q -n auto -rA --html=report.html --self-contained-html --junit-xml=report-junit.xml tests/rtp/test_rtp.py --testcase_timeout=$testcase_timeout + - python3 -m pytest -q --tb=no -n auto -rA --html=report.html --self-contained-html --junit-xml=report-junit.xml tests/rtp/test_rtp.py --testcase_timeout=$testcase_timeout artifacts: name: "mr-$CI_MERGE_REQUEST_IID--sha-$CI_COMMIT_SHORT_SHA--job-$CI_JOB_NAME--results" expire_in: 1 week diff --git a/tests/rtp/test_rtp.py b/tests/rtp/test_rtp.py index 69b85dc9af..fd177fc84e 100644 --- a/tests/rtp/test_rtp.py +++ b/tests/rtp/test_rtp.py @@ -94,10 +94,10 @@ def test_rtp_bitstream_amrwb( @pytest.mark.parametrize("dtx", [False, True]) -@pytest.mark.parametrize("bitrate", [9600, 24400, 128000]) -@pytest.mark.parametrize("bandwidth", ["NB", "WB", "SWB", "FB"]) +@pytest.mark.parametrize("bitrate", [24400, 48000]) +@pytest.mark.parametrize("bandwidth", ["WB", "SWB", "FB"]) @pytest.mark.parametrize("caMode", ["OFF", "LO", "HI"]) -@pytest.mark.parametrize("framesPerPacket", [1, 3, 8]) +@pytest.mark.parametrize("framesPerPacket", [3]) def test_rtp_bitstream_evs( test_info, bitrate: int, @@ -121,7 +121,7 @@ def test_rtp_bitstream_evs( ) -@pytest.mark.parametrize("bitrate", [24400, 80000, 512000]) +@pytest.mark.parametrize("bitrate", [16400, 32000, 48000]) @pytest.mark.parametrize("bandwidth", ["WB", "SWB", "FB"]) @pytest.mark.parametrize("format", ["STEREO", "SBA", "MC", "MASA"]) @pytest.mark.parametrize("framesPerPacket", [8]) @@ -147,7 +147,7 @@ def test_rtp_bitstream_ivas_nodtx( ) -@pytest.mark.parametrize("bitrate", [13200, 24400, 80000]) +@pytest.mark.parametrize("bitrate", [13200, 24400, 64000]) @pytest.mark.parametrize("bandwidth", ["WB", "SWB", "FB"]) @pytest.mark.parametrize("format", ["STEREO", "SBA"]) @pytest.mark.parametrize("framesPerPacket", [1, 3]) -- GitLab From 3927ad65aff53a9727eb57bc56e619ab0afc6aac Mon Sep 17 00:00:00 2001 From: Lauros Pajunen Date: Wed, 15 Oct 2025 14:38:07 +0300 Subject: [PATCH 5/7] Reduce rtpdump test timeout to 15min --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d0789767ef..f532bfb1d8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -716,7 +716,7 @@ rtpdump-test: stage: test script: - make -j - - testcase_timeout=30 + - testcase_timeout=15 - python3 -m pytest -q --tb=no -n auto -rA --html=report.html --self-contained-html --junit-xml=report-junit.xml tests/rtp/test_rtp.py --testcase_timeout=$testcase_timeout artifacts: name: "mr-$CI_MERGE_REQUEST_IID--sha-$CI_COMMIT_SHORT_SHA--job-$CI_JOB_NAME--results" -- GitLab From a41939636af3b033e56869442f804e2f6f878249 Mon Sep 17 00:00:00 2001 From: Lauros Pajunen Date: Tue, 21 Oct 2025 14:52:13 +0300 Subject: [PATCH 6/7] Correct reader handling for audio focus pi --- tests/rtp/ivasrtp.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/rtp/ivasrtp.py b/tests/rtp/ivasrtp.py index bd13acaf56..94584e901b 100644 --- a/tests/rtp/ivasrtp.py +++ b/tests/rtp/ivasrtp.py @@ -1469,10 +1469,7 @@ class IvasRtp: data = list() for orientation in dataDict: data.append(ORIENTATION(**orientation)) - elif ( - "ORIENTATION" in piTypes - or piTypes == PIDATAS.AUDIO_FOCUS_DIRECTION - ): + elif "ORIENTATION" in piTypes: data = ORIENTATION(**dataDict) elif piTypes == PIDATAS.ACOUSTIC_ENVIRONMENT: data = ACOUSTIC_ENVIRONMENT(**dataDict) @@ -1489,6 +1486,8 @@ class IvasRtp: data = POSITION(**dataDict) elif piTypes == PIDATAS.DYNAMIC_AUDIO_SUPPRESSION: data = DYNAMIC_AUDIO_SUPPRESSION(**dataDict) + elif "AUDIO_FOCUS" in piTypes: + data = AUDIO_FOCUS(**dataDict) else: assert False, "Unhandled PI Data" piDataList.append( -- GitLab From 5d3fac18d3d28e02e111d53b375dbbefc6aab889 Mon Sep 17 00:00:00 2001 From: Ripinder Singh Date: Thu, 23 Oct 2025 19:46:11 +1100 Subject: [PATCH 7/7] Update legal third party notice for bitstring library * Add licenses for bitstring which is dependent on bitarray * bitstring licence is MIT License * bitarray license is PSF-2.0 Signed-off-by: Ripinder Singh --- .../thirdPartyLegalNotices.txt | 81 ++++++++++++++++++- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/tests/thirdPartyLegalNotices/thirdPartyLegalNotices.txt b/tests/thirdPartyLegalNotices/thirdPartyLegalNotices.txt index abff6f2cde..14815692f4 100755 --- a/tests/thirdPartyLegalNotices/thirdPartyLegalNotices.txt +++ b/tests/thirdPartyLegalNotices/thirdPartyLegalNotices.txt @@ -101,7 +101,7 @@ exceptiongroup MIT License The MIT License (MIT) -Copyright (c) 2022 Alex Grönholm +Copyright (c) 2022 Alex Grönholm Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -2244,4 +2244,81 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file +SOFTWARE. + + +bitstring +4.3.1 +MIT License +The MIT License + +Copyright (c) 2006 Scott Griffiths (dr.scottgriffiths@gmail.com) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + + +bitarray +3.7.2 +PSF-2.0 +PYTHON SOFTWARE FOUNDATION LICENSE +---------------------------------- + +1. This LICENSE AGREEMENT is between Ilan Schnell, and the Individual or +Organization ("Licensee") accessing and otherwise using this software +("bitarray") in source or binary form and its associated documentation. + +2. Subject to the terms and conditions of this License Agreement, Ilan Schnell +hereby grants Licensee a nonexclusive, royalty-free, world-wide +license to reproduce, analyze, test, perform and/or display publicly, +prepare derivative works, distribute, and otherwise use bitarray +alone or in any derivative version, provided, however, that Ilan Schnell's +License Agreement and Ilan Schnell's notice of copyright, i.e., "Copyright (c) +2008 - 2025 Ilan Schnell; All Rights Reserved" are retained in bitarray +alone or in any derivative version prepared by Licensee. + +3. In the event Licensee prepares a derivative work that is based on +or incorporates bitarray or any part thereof, and wants to make +the derivative work available to others as provided herein, then +Licensee hereby agrees to include in any such work a brief summary of +the changes made to bitarray. + +4. Ilan Schnell is making bitarray available to Licensee on an "AS IS" +basis. ILAN SCHNELL MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR +IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, ILAN SCHNELL MAKES NO AND +DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS +FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF BITARRAY WILL NOT +INFRINGE ANY THIRD PARTY RIGHTS. + +5. ILAN SCHNELL SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF BITARRAY +FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS +A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING BITARRAY, +OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. + +6. This License Agreement will automatically terminate upon a material +breach of its terms and conditions. + +7. Nothing in this License Agreement shall be deemed to create any +relationship of agency, partnership, or joint venture between Ilan Schnell +and Licensee. This License Agreement does not grant permission to use Ilan +Schnell trademarks or trade name in a trademark sense to endorse or promote +products or services of Licensee, or any third party. + +8. By copying, installing or otherwise using bitarray, Licensee +agrees to be bound by the terms and conditions of this License +Agreement. -- GitLab