Commit 6dca357c authored by Ripinder Singh's avatar Ripinder Singh
Browse files

Add pytest for PACK API



- Add python version of Unpack
- Check RTP Header for Timestamp, Sequenc Number, Header, SSRC
- Check Payload for Codec, Bitrate and AU bytes
- Check PI data for Type, Size and Pi data bytes

Signed-off-by: default avatarSingh, Ripinder <ripinder.singh@dolby.com>
parent a1458540
Loading
Loading
Loading
Loading
Loading

tests/rtp/ivasrtp.py

0 → 100644
+446 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3

__copyright__ = """
(C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
contributors to this repository. All Rights Reserved.

This software is protected by copyright law and by international treaties.
The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
contributors to this repository retain full ownership rights in their respective contributions in
the software. This notice grants no license of any kind, including but not limited to patent
license, nor is any license granted by implication, estoppel or otherwise.

Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
contributions.

This software is provided "AS IS", without any express or implied warranties. The software is in the
development stage. It is intended exclusively for experts who have experience with such software and
solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
and fitness for a particular purpose are hereby disclaimed and excluded.

Any dispute, controversy or claim arising under or in relation to providing this software shall be
submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
the United Nations Convention on Contracts on the International Sales of Goods.
"""

__doc__ = """
To configure test modules.
"""

import struct
from enum import Enum
from dataclasses import dataclass, field, asdict
from bitstring import ConstBitStream, ReadError
import json
import base64
import argparse

class CODECS(str, Enum):
    AMRWB = "amrwb_io"
    EVS = "evs"
    IVAS = "ivas"

class SRCODEC(str, Enum):
    LCLD = "lcld"
    LC3PLUS = "lc3+"
    NA = "NONE"

class BANDWIDTH(str, Enum):
    NB = "narrowband"
    WB = "wideband"
    SWB = "super wideband"
    FB = "fullband"
    NREQ = "NO_REQ"

class REQUESTS(str, Enum):
    CODEC = "codec",
    BR = "bitrate"
    BW = "bandwidth"
    CA = "ca-mode"
    FMT = "format"
    SUBFMT = "sub-format"
    SRCFG = "sr-config"

@dataclass
class RTPHDR:
    version: int = 2
    padding: bool = False
    extension: bool = False
    csrcCount: int = 0
    marker: bool = False
    payloadType: int = 0
    sequenceNum: int = 0
    timestamp: int = 0
    ssrc: int = 0
    extensionType: int = 0
    extensionLength: int = 0
    csrcList: list = field(default_factory=list)
    extensionBytes: list = field(default_factory=list)

    def __init__(self, bitstrm: ConstBitStream):
        self.version = bitstrm.read(2).uint
        self.padding = bitstrm.read(1).bool
        self.extension = bitstrm.read(1).bool
        self.csrcCount = bitstrm.read(4).int
        self.marker = bitstrm.read(1).bool
        self.payloadType = bitstrm.read(7).int
        self.sequenceNum = bitstrm.read(16).uintbe
        self.timestamp = bitstrm.read(32).uintbe
        self.ssrc = bitstrm.read(32).uintbe
        self.csrcList = [ bitstrm.read(32).uintbe for _ in range(self.csrcCount) ]
        if self.extension:
            self.extensionType = bitstrm.read(16).uintbe
            self.extensionLength = bitstrm.read(16).uintbe
            self.extensionBytes = [ bitstrm.read(32).uintbe for _ in range(self.extensionLength) ]
        else:
            self.extensionType = 0
            self.extensionLength = 0
            self.extensionBytes = list()    

@dataclass
class SRCONFIG:
    enabled: bool = False
    diegetic: bool = False
    yaw: bool = False
    pitch: bool = False
    roll: bool = False

@dataclass
class CMR:
    bandwidth: BANDWIDTH
    codec: CODECS = CODECS.IVAS
    startIdx: int = 0
    endIdx: int = 0
    bitrates: list = field(default_factory=list)

@dataclass
class SRINFO:
    bitrate: int = 0
    diegetic: bool = False
    transportCodec: SRCODEC = SRCODEC.NA

@dataclass
class FRAME:
    codec: CODECS = CODECS.IVAS
    frmSizeBits: int = 0
    bitrate: int = 0
    speechLost: bool = False
    srInfo: SRINFO = None
    timestamp: int = 0
    au: bytes = field(default_factory=bytes)

@dataclass
class ORIENTATION:
    w: float = 0.0
    x: float = 0.0
    y: float = 0.0
    z: float = 0.0

@dataclass
class PIDATA:
    timestamp: int = 0
    type: str = "NO_PI_DATA"
    data: any = None

ivasBitrates = [13200, 16400, 24400, 32000, 48000, 64000, 80000, 96000, 128000, 160000, 192000, 256000, 384000, 512000, -1, 5200]
evsBitrates = [5900, 7200, 8000, 9600, 13200, 16400, 24400, 32000, 48000, 64000, 96000, 128000, 2400, -1, -1, -1]
amrwbBitrates = [6600, 8850, 12650, 14250, 15850, 18250, 19850, 23050, 23850, 1750, -1, -1, -1, -1, -1, -1]
cmrLookup = [
    CMR(bandwidth=BANDWIDTH.NB, codec=CODECS.EVS, startIdx=0, endIdx=7, bitrates = evsBitrates), #000 = NB-EVS
    CMR(bandwidth=BANDWIDTH.WB, codec=CODECS.AMRWB, startIdx=0, endIdx=9, bitrates = amrwbBitrates), #001 = AMRWB IO
    CMR(bandwidth=BANDWIDTH.WB, codec=CODECS.EVS, startIdx=0, endIdx=12, bitrates = evsBitrates), #010 = WB-EVS
    CMR(bandwidth=BANDWIDTH.SWB, codec=CODECS.EVS, startIdx=3, endIdx=12, bitrates = evsBitrates), #011 = SWB-EVS
    CMR(bandwidth=BANDWIDTH.FB, codec=CODECS.EVS, startIdx=5, endIdx=12, bitrates = evsBitrates), #100 = FB-EVS
    CMR(bandwidth=BANDWIDTH.WB, codec=CODECS.EVS, startIdx=0, endIdx=0, bitrates = []), #101 = WB-CA
    CMR(bandwidth=BANDWIDTH.SWB, codec=CODECS.EVS, startIdx=0, endIdx=0, bitrates = []), #110 = SWB-CA
    CMR(bandwidth=BANDWIDTH.NREQ, codec=CODECS.IVAS, startIdx=0, endIdx=14, bitrates = ivasBitrates), #111 = IVAS
]

codedFormats = ["Stereo", "SBA", "MASA", "ISM", "MC", "OMASA", "OSBA", "NO_REQ"]
codedSubFormats = [
    "FOA planar", "HOA2 planar", "HOA3 planar", "FOA", "HOA2", "HOA3", "MASA1", "MASA2", "ISM1",
    "ISM2", "ISM3", "ISM4", "ISM1 extended metadata", "ISM2 extended metadata", 
    "ISM3 extended metadata", "ISM4 extended metadata", "MC 5.1", "MC 7.1", "MC 5.1.2", "MC 5.1.4",
    "MC 7.1.4", "Reserved", "Reserved", "Reserved", "Reserved", "Reserved", "Reserved", "Reserved",
    "Reserved", "Reserved", "Reserved", "Reserved", "OMASA ISM1 1TC", "OMASA ISM2 1TC",
    "OMASA ISM3 1TC", "OMASA ISM4 1TC", "OMASA ISM1 2TC", "OMASA ISM2 2TC", "OMASA ISM3 2TC",
    "OMASA ISM4 2TC", "OSBA ISM1 FOA planar", "OSBA ISM2 FOA planar", "OSBA ISM3 FOA planar",
    "OSBA ISM4 FOA planar", "OSBA ISM1 FOA", "OSBA ISM2 FOA", "OSBA ISM3 FOA", "OSBA ISM4 FOA",
    "OSBA ISM1 HOA2 planar", "OSBA ISM2 HOA2 planar", "OSBA ISM3 HOA2 planar", "OSBA ISM4 HOA2 planar",
    "OSBA ISM1 HOA2", "OSBA ISM2 HOA2", "OSBA ISM3 HOA2", "OSBA ISM4 HOA2", "OSBA ISM1 HOA3 planar",
    "OSBA ISM2 HOA3 planar", "OSBA ISM3 HOA3 planar", "OSBA ISM4 HOA3 planar", "OSBA ISM1 HOA3",
    "OSBA ISM2 HOA3", "OSBA ISM3 HOA3", "OSBA ISM4 HOA3"]
PiTypeNames = [
    "SCENE_ORIENTATION", "DEVICE_ORIENTATION_COMPENSATED", "DEVICE_ORIENTATION_UNCOMPENSATED", 
    "ACOUSTIC_ENVIRONMENT", "AUDIO_DESCRIPTION", "ISM_NUM", "ISM_ID", "ISM_GAIN", "ISM_ORIENTATION",
    "ISM_POSITION", "ISM_DISTANCE_ATTENUATION", "ISM_DIRECTIVITY", "DIEGETIC_TYPE", "RESERVED13", "RESERVED14",
    "RESERVED15","PLAYBACK_DEVICE_ORIENTATION", "HEAD_ORIENTATION", "LISTENER_POSITION", "DYNAMIC_AUDIO_SUPPRESSION",
    "AUDIO_FOCUS_DIRECTION", "PI_LATENCY", "R_ISM_ID", "R_ISM_GAIN", "R_ISM_ORIENTATION", "R_ISM_POSITION", "R_ISM_DIRECTION",
    "RESERVED27", "RESERVED28", "RESERVED29", "RESERVED30", "NO_PI_DATA"
]

def unpackUnsupported(bitstrm: ConstBitStream, piSize: int) -> any:
    #assert False, "Unsupported PI Data"
    return base64.b64encode(bitstrm.read(piSize * 8).tobytes()).decode('utf-8')

def unpackNoPiData(bitstrm: ConstBitStream, piSize: int) -> None:
    assert piSize == 0, "NO_PI_DATA should be 0 size"

def unpackOrientation(bitstrm: ConstBitStream, piSize: int) -> ORIENTATION:
    assert piSize == 8, "Incorrect PI Data Size for Orientation"
    w = bitstrm.read(16).int / 32768.0
    x = bitstrm.read(16).int / 32768.0
    y = bitstrm.read(16).int / 32768.0
    z = bitstrm.read(16).int / 32768.0
    return ORIENTATION(w, x, y, z)

PIDataUnpacker = [
    unpackOrientation, # SCENE_ORIENTATION,
    unpackOrientation, # DEVICE_ORIENTATION_COMPENSATED,
    unpackOrientation, # DEVICE_ORIENTATION_UNCOMPENSATED
    unpackUnsupported, # ACOUSTIC_ENVIRONMENT
    unpackUnsupported, # AUDIO_DESCRIPTION
    unpackUnsupported, # ISM_NUM
    unpackUnsupported, # ISM_ID
    unpackUnsupported, # ISM_GAIN
    unpackUnsupported, # ISM_ORIENTATION
    unpackUnsupported, # ISM_POSITION
    unpackUnsupported, # ISM_DISTANCE_ATTENUATION
    unpackUnsupported, # ISM_DIRECTIVITY
    unpackUnsupported, # DIEGETIC_TYPE
    unpackUnsupported, # RESERVED13
    unpackUnsupported, # RESERVED14
    unpackUnsupported, # RESERVED15
    unpackUnsupported, # PLAYBACK_DEVICE_ORIENTATION
    unpackUnsupported, # HEAD_ORIENTATION
    unpackUnsupported, # LISTENER_POSITION
    unpackUnsupported, # DYNAMIC_AUDIO_SUPPRESSION
    unpackUnsupported, # AUDIO_FOCUS_DIRECTION
    unpackUnsupported, # PI_LATENCY
    unpackUnsupported, # R_ISM_ID
    unpackUnsupported, # R_ISM_GAIN
    unpackUnsupported, # R_ISM_ORIENTATION
    unpackUnsupported, # R_ISM_POSITION
    unpackUnsupported, # R_ISM_DIRECTION
    unpackUnsupported, # RESERVED27
    unpackUnsupported, # RESERVED28
    unpackUnsupported, # RESERVED29
    unpackUnsupported, # RESERVED30
    unpackNoPiData     # NO_DATA
]

@dataclass
class IvasPayload:
    lastCodec: CODECS = CODECS.IVAS # Track last frame's codec
    frameList: list[FRAME] = field(default_factory=list)
    piDataList: list[PIDATA] = field(default_factory=list)
    requests: dict [str, any] = field(default_factory=dict)

    def __init__(self, bitstrm: ConstBitStream, rtpTimestamp, lastCodec: CODECS):
        self.lastCodec = lastCodec
        self.frameList = list[FRAME]()
        self.piDataList = list[PIDATA]()
        self.requests = dict()

        try:
            piIndicated = False
            if bitstrm.read(1).bool:
                T = bitstrm.read(3).uint
                BR = bitstrm.read(4).uint

                if T in [5, 6]:#CA MODES
                    if BR < 8:
                        self.requests[REQUESTS.CODEC] = cmrLookup[T].codec
                        self.requests[REQUESTS.BR] = 13200
                        self.requests[REQUESTS.CA] = BR
                        self.requests[REQUESTS.BW] = cmrLookup[T].bandwidth
                    else:
                        raise Exception("Unsupported BR bits in CA Mode")
                elif T == 7: #IVAS
                    if BR < 14 :
                        self.requests[REQUESTS.CODEC] = cmrLookup[T].codec
                        self.requests[REQUESTS.BR] = cmrLookup[T].bitrates[BR]
                        self.requests[REQUESTS.CA] = -1
                    elif BR == 14 :
                        raise Exception("Reserved BR idx in IVAS EByte")
                else:
                    if BR >= cmrLookup[T].startIdx and BR < cmrLookup[T].endIdx:
                        self.requests[REQUESTS.CODEC] = cmrLookup[T].codec
                        self.requests[REQUESTS.BR] = cmrLookup[T].bitrates[BR]
                        self.requests[REQUESTS.CA] = -1
                        self.requests[REQUESTS.BW] = cmrLookup[T].bandwidth
                    else:
                        raise Exception("Reserved BR idx in {} EByte".format(cmrLookup[T].codec))

                #Try to get all subsequent E-bytes
                while bitstrm.read(1).bool:
                    ET = bitstrm.read(3).uint
                    if ET == 0 :
                        supportedBW = [BANDWIDTH.WB, BANDWIDTH.SWB, BANDWIDTH.FB, BANDWIDTH.NREQ]
                        reserved = bitstrm.read(2)
                        BW = bitstrm.read(2).uint
                        self.requests[REQUESTS.BW] = supportedBW[BW]
                    elif ET == 1 :
                        S = bitstrm.read(1).bool
                        FMT = bitstrm.read(3).uint
                        if not S:
                            self.requests[REQUESTS.FMT] = codedFormats[FMT]
                            self.requests[REQUESTS.SUBFMT] = "NO_REQ"
                        else:
                            reserved = bitstrm.read(2)
                            subFMT = bitstrm.read(6).uint
                            self.requests[REQUESTS.FMT] = "NO_REQ"
                            self.requests[REQUESTS.SUBFMT] = codedSubFormats[subFMT]
                    elif ET == 2:
                        reserved = bitstrm.read(4)
                        piIndicated = True
                    elif ET == 3:
                        D =  bitstrm.read(1).bool
                        Y =  bitstrm.read(1).bool
                        P =  bitstrm.read(1).bool
                        R =  bitstrm.read(1).bool
                        self.requests[REQUESTS.SRCFG] = SRCONFIG(enabled=True, diegetic=D, yaw=Y, pitch=P, roll=R)
                    else:
                        reserved = bitstrm.read(4)
                        raise Exception("Unsupported subsequent EByte with ET={}".format(ET))

            # ToC Byte parsing starts with 'F' bit as H bit is already read above
            frameTimeStamps = rtpTimestamp
            F = True
            frmList = list()
            while F:
                F = bitstrm.read(1).bool
                FT = bitstrm.read(2).uint
                BR = bitstrm.read(4).uint

                frm = FRAME(timestamp=frameTimeStamps)
                if FT == 1 :
                    frm.codec = CODECS.IVAS
                    if BR == 14 :
                        supportedBitrates = [-1, 256000, 384000, 512000]
                        reserved = bitstrm.read(1)
                        D = bitstrm.read(1).bool
                        C = bitstrm.read(1).bool
                        SR_BR = bitstrm.read(2).uint
                        reserved = bitstrm.read(3)
                        if SR_BR == 0:
                            raise Exception("Reserved bitrate in SR Config ToC Indicated")
                        frm.srInfo = SRINFO(bitrate=supportedBitrates[SR_BR], diegetic=D,
                                            transportCodec= SRCODEC.LC3PLUS if C else SRCODEC.LCLD)
                    else:
                        frm.bitrate = ivasBitrates[BR]
                elif FT == 0:
                    # Codec switch only if not NO_DATA_FRAME, as IVAS/EVS signal using this
                    frm.codec = CODECS.EVS if BR != 15 else self.lastCodec
                    if BR == 13:
                        raise Exception("Reserved bitrate in EVS ToC Indicated")
                    frm.speechLost = (BR == 14)
                    frm.bitrate = evsBitrates[BR] if BR < 13 else 0
                else:
                    frm.codec = CODECS.AMRWB
                    if BR >= 10 and BR <= 13:
                        raise Exception("Reserved bitrate in AMRWB-IO ToC Indicated")
                    frm.speechLost = (BR == 14) or (FT == 2)
                    frm.bitrate = amrwbBitrates[BR] if BR < 10 else 0
                frm.frmSizeBits = frm.bitrate // 50
                frameTimeStamps += 320
                frmList.append(frm)

                if F:
                    #skip all frame specific E-bytes before next header
                    while (bitstrm.read(1).bool):
                        print("Skipping unsupported frame specific subsequent Ebytes")
                        reserved = bitstrm.read(7)

            #Frame AUs here
            for (idx, frm) in enumerate(frmList):
                auSize = (frm.frmSizeBits + 7) // 8 # Zero padded bytes in amrwb_io mode
                frm.au = bitstrm.read(auSize * 8).tobytes()
                self.frameList.append(frm)

            # PI Data if Indicated
            piTimeStamps = rtpTimestamp
            PF = piIndicated
            while PF :
                PF = bitstrm.read(1).bool
                PM = bitstrm.read(2).uint
                PiType = bitstrm.read(5).uint
                PiSize = 0
                byte = 255
                while byte == 255:
                    byte = bitstrm.read(8).uint
                    PiSize += byte
                
                PiFrameData = PIDataUnpacker[PiType](bitstrm, PiSize)
                
                self.piDataList.append(
                    PIDATA(timestamp=piTimeStamps if PM != 3 else rtpTimestamp, # Generic Pi has base timestamp
                           type=PiTypeNames[PiType],
                           data=PiFrameData))
                piTimeStamps += 320 if PM == 2 else 0
        except ReadError as error:
            print ("Underflow before completion of unpacking, error = {}".format(error))

@dataclass
class IvasPacket:
    hdr: RTPHDR = field(default_factory=RTPHDR)
    payload: IvasPayload = field(default_factory=IvasPayload)

class IvasRtp:
    def __init__(self):
        self.packets = list[IvasPacket]()
        self.lastCodec: CODECS = CODECS.IVAS # Track last frame's codec

    def dumpToJSON(self, jsonFileName):
        with open(jsonFileName, "w") as fd:
            packets = list()
            for packet in self.packets:
                packets.append(asdict(packet))
            json_output = json.dumps(packets, indent=4)
            fd.write(json_output)

    def getPackets(self):
        return self.packets

    def unpackFile(self, rtpDumpFile):
        with open (rtpDumpFile, mode="rb") as fd:
            while True:
                size = fd.read(4)
                if not size:
                    break
                size = struct.unpack( 'i', size)[0]
                packet = fd.read(size)
                if not packet:
                    break
                self.packets.append(self.unpackPacket(packet))

    def unpackPacket(self, packet) -> IvasPacket :
        bitStrm = ConstBitStream(packet)
        hdr = RTPHDR(bitStrm)
        payload = IvasPayload(bitStrm, rtpTimestamp=hdr.timestamp, lastCodec=self.lastCodec)
        self.lastCodec = payload.frameList[-1].codec #Last Frame's codec for next frame for NO_DATA case
        return IvasPacket(hdr=hdr, payload=payload)

class ArgsParser:
    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-r", "--rtpdump", type=str, required=True, help="RTP Dump to unpack")
        self.parser.add_argument("-j", "--json", type=str, default="unpack.json", help="Output unpacked RTP frames to JSON file")

    def parse(self):
        args = self.parser.parse_args()
        return args

if __name__ == "__main__":
    args = ArgsParser().parse()
    rtp = IvasRtp()
    if args.rtpdump:
        rtp.unpackFile(args.rtpdump)
        rtp.dumpToJSON(args.json)

tests/rtp/test_rtp.py

0 → 100644
+355 −0

File added.

Preview size limit exceeded, changes collapsed.