Commit 5d8d12e3 authored by sagnowski's avatar sagnowski
Browse files

Port SR + VoIP pytests

parent 34c87d3f
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+384 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3

"""
   (C) 2022-2024 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB,
   Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
   Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
   Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
   contributors to this repository. All Rights Reserved.

   This software is protected by copyright law and by international treaties.
   The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB,
   Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD.,
   Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange,
   Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other
   contributors to this repository retain full ownership rights in their respective contributions in
   the software. This notice grants no license of any kind, including but not limited to patent
   license, nor is any license granted by implication, estoppel or otherwise.

   Contributors are required to enter into the IVAS codec Public Collaboration agreement before making
   contributions.

   This software is provided "AS IS", without any express or implied warranties. The software is in the
   development stage. It is intended exclusively for experts who have experience with such software and
   solely for the purpose of inspection. All implied warranties of non-infringement, merchantability
   and fitness for a particular purpose are hereby disclaimed and excluded.

   Any dispute, controversy or claim arising under or in relation to providing this software shall be
   submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in
   accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and
   the United Nations Convention on Contracts on the International Sales of Goods.
"""

from __future__ import annotations
import argparse
import math
import sys
from pathlib import Path


class IsarBstoolError(Exception):
    pass


class IsarBitstream:
    def __init__(self, file_path: Path) -> None:
        self.file_path = file_path

        with open(file_path, "rb") as reader:
            self.header = IsarFileHeader(reader)
            self.frames = []

            while reader.peek(1):
                self.frames.append(IsarFileFrame(reader))

    @property
    def duration_seconds(self):
        return self.num_frames * self.isar_frame_size_ms / 1000

    @property
    def duration_samples(self):
        return int(self.duration_seconds * self.sample_rate)

    @property
    def num_frames(self):
        return len(self.frames)

    @property
    def num_empty_frames(self):
        return sum(frame.num_bits == 0 for frame in self.frames)

    @property
    def sample_rate(self):
        return self.header.sample_rate

    @property
    def delay_ns(self):
        return self.header.delay_ns

    @property
    def delay_samples(self):
        return round(self.header.delay_ns * self.sample_rate / 10**9)

    @property
    def isar_frame_size_ms(self):
        return self.header.isar_frame_size_ms

    @property
    def isar_frame_size_samples(self):
        return self.header.isar_frame_size_ms * self.sample_rate // 1000

    @property
    def pose_correction(self):
        return self.header.pose_correction

    @property
    def avg_bitrate_bps(self):
        return sum(frame.num_bits for frame in self.frames) / self.duration_seconds

    @property
    def avg_bitrate_non_empty_frames_bps(self):
        return sum(frame.num_bits for frame in self.frames) / (
            (self.num_frames - self.num_empty_frames) * self.isar_frame_size_ms / 1000
        )

    @property
    def codec(self):
        return self.header.codec

    @property
    def codec_frame_size_ms(self):
        return self.header.codec_frame_size_ms

    @property
    def codec_frame_size_samples(self):
        return self.header.codec_frame_size_ms * self.sample_rate // 1000

    @property
    def lc3plus_hires(self):
        return self.header.lc3plus_hires

    def info(self):
        return (
            "\n"
            f"File             : {self.file_path}\n"
            f"Duration         : {self.duration_seconds} s = {self.duration_samples} samples\n"
            f"Frames           : {self.num_frames} (incl. {self.num_empty_frames} empty)\n"
            f"Sample Rate      : {self.sample_rate} Hz\n"
            f"Delay            : {self.delay_ns} ns = {self.delay_samples} samples\n"
            f"ISAR Frame Size  : {self.isar_frame_size_ms} ms = {self.isar_frame_size_samples} samples\n"
            f"Pose Correction  : {self.pose_correction}\n"
            f"Bitrate          : {self.avg_bitrate_bps:.2f} bps (avg), {self.avg_bitrate_non_empty_frames_bps:.2f} bps (avg non-empty)\n"
            f"Codec            : {self.codec}\n"
            f"Codec Frame Size : {self.codec_frame_size_ms} ms = {self.codec_frame_size_samples} samples\n"
            f"LC3plus HIRES    : {'ON' if self.lc3plus_hires else 'OFF'}\n"
        )

    def write(self, file_path: Path):
        self.file_path = file_path

        with open(file_path, "wb") as writer:
            writer.write(self.header.as_bytes)

            for frame in self.frames:
                writer.write(frame.as_bytes)

    def trim(self, start_time_s: float, length_s: float | None = None) -> IsarBitstream:
        if length_s is None:
            length_s = self.duration_seconds

        start_time_ms = start_time_s * 1000
        length_ms = length_s * 1000

        # Check for unusable values
        if math.isinf(start_time_s) or math.isnan(start_time_s):
            raise IsarBstoolError(f"start_time ({start_time_s} s) has unusable value")
        if math.isinf(length_s) or math.isnan(length_s):
            raise IsarBstoolError(f"length ({length_s} s) has unusable value")

        # Ensure times are not negative
        if start_time_s < 0:
            raise IsarBstoolError(f"start_time ({start_time_s} s) can't be negative")
        if length_s < 0:
            raise IsarBstoolError(f"length ({length_s} s) can't be negative")

        # We can only remove entire frames
        if start_time_ms % self.isar_frame_size_ms != 0:
            raise IsarBstoolError(
                f"start_time ({start_time_s} s) must be an integer multiple of ISAR frame duration ({self.isar_frame_size_ms} ms)"
            )
        if length_ms % self.isar_frame_size_ms != 0:
            raise IsarBstoolError(
                f"length ({length_s} s) must be an integer multiple of ISAR frame duration ({self.isar_frame_size_ms} ms)"
            )

        start_idx = int(start_time_ms / self.isar_frame_size_ms)
        end_idx = start_idx + int(length_ms / self.isar_frame_size_ms)
        self.frames = self.frames[start_idx : min(end_idx, len(self.frames))]

        return self

    def is_same_as(self, other: IsarBitstream) -> bool:
        return self.header == other.header and self.frames == other.frames


class _AsBytes:
    def __init__(self) -> None:
        self.as_bytes = bytearray()

    def _read(self, reader, num_bytes):
        bytes_ = reader.read(num_bytes)
        self.as_bytes.extend(bytes_)
        return bytes_

    def __eq__(self, value: object, /) -> bool:
        if not isinstance(value, _AsBytes):
            return False
        return self.as_bytes == value.as_bytes


class IsarFileHeader(_AsBytes):
    def __init__(self, reader) -> None:
        super().__init__()

        FILE_HEADER = b"MAIN_SPLITH"
        file_header_top = self._read(reader, len(FILE_HEADER))
        if file_header_top != FILE_HEADER:
            raise IsarBstoolError(f"Not a valid ISAR file: {reader.name}")

        self.delay_ns = _int_from_bytes(self._read(reader, 4))
        self.codec = _codec_from_bytes(self._read(reader, 4))
        self.pose_correction = _pose_corr_from_bytes(self._read(reader, 4))
        self.codec_frame_size_ms = _int_from_bytes(self._read(reader, 2))
        self.isar_frame_size_ms = _int_from_bytes(self._read(reader, 2))
        self.sample_rate = _int_from_bytes(self._read(reader, 4))
        self.lc3plus_hires = bool(_int_from_bytes(self._read(reader, 2)))


class IsarFileFrame(_AsBytes):
    def __init__(self, reader) -> None:
        super().__init__()

        FRAME_HEADER = b"SPLIT_FRAME"
        frame_header = self._read(reader, len(FRAME_HEADER))
        if frame_header != FRAME_HEADER:
            raise IsarBstoolError(f"Not a valid ISAR file: {reader.name}")

        version = _int_from_bytes(self._read(reader, 1))
        if version != 0:
            raise IsarBstoolError(
                f"Unupported version of ISAR file format: {reader.name}"
            )

        self.num_bits = _int_from_bytes(self._read(reader, 4))

        payload_size = math.ceil(self.num_bits / 8)
        self.payload = self._read(reader, payload_size)


######################################################################################
#                                    utilities
######################################################################################


def _int_from_bytes(bytes_):
    return int.from_bytes(bytes_, byteorder="little")


def _codec_from_bytes(bytes_):
    # Refer to ISAR_SPLIT_REND_CODEC enum in C code
    CODECS = ["LCLD", "LC3PLUS", "DEFAULT", "NONE"]
    x = _int_from_bytes(bytes_)

    if x < len(CODECS):
        return CODECS[x]

    return "UNKNOWN"


def _pose_corr_from_bytes(bytes_):
    # Refer to ISAR_SPLIT_REND_POSE_CORRECTION_MODE enum in C code
    POSE_CORR_MODES = ["NONE", "CLDFB"]
    x = _int_from_bytes(bytes_)

    if x < len(POSE_CORR_MODES):
        return POSE_CORR_MODES[x]

    return "UNKNOWN"


######################################################################################
#                             subcommand functions
######################################################################################


def _subcmd_info(args):
    bs = IsarBitstream(args.file_in)

    match args.only:
        case "duration_seconds":
            print(bs.duration_seconds)
        case "duration_samples":
            print(bs.duration_samples)
        case "num_frames":
            print(bs.num_frames)
        case "num_empty_frames":
            print(bs.num_empty_frames)
        case "sample_rate":
            print(bs.sample_rate)
        case "delay_ns":
            print(bs.delay_ns)
        case "delay_samples":
            print(bs.delay_samples)
        case "isar_frame_size_ms":
            print(bs.isar_frame_size_ms)
        case "isar_frame_size_samples":
            print(bs.isar_frame_size_samples)
        case "pose_correction":
            print(bs.pose_correction)
        case "avg_bitrate":
            print(bs.avg_bitrate_bps)
        case "avg_bitrate_non_empty_frames":
            print(bs.avg_bitrate_non_empty_frames_bps)
        case "codec":
            print(bs.codec)
        case "codec_frame_size_ms":
            print(bs.codec_frame_size_ms)
        case "codec_frame_size_samples":
            print(bs.codec_frame_size_samples)
        case "lc3plus_hires":
            print("ON" if bs.lc3plus_hires else "OFF")
        case None:
            print(bs.info())
        case _:
            raise IsarBstoolError(f"Not a valid parameter value: '{args.only}'")


def _subcmd_trim(args):
    bs = IsarBitstream(args.file_in)
    bs.trim(float(args.start_time), float(args.length) if args.length else None)
    bs.write(args.file_out)


######################################################################################
#                                     main
######################################################################################

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="isar_bstool",
        description="Utility for inspecting and modifying ISAR bitstreams",
    )
    parser.set_defaults(func=lambda _: parser.print_help())
    subparsers = parser.add_subparsers(title="Commands")

    info = subparsers.add_parser("info", help="Print information about a bitstream")
    info.add_argument("file_in", help="Path to input file")
    info.add_argument(
        "--only",
        help="Print only a specific parameter",
        default=None,
        choices=[
            "duration_seconds",
            "duration_samples",
            "num_frames",
            "num_empty_frames",
            "sample_rate",
            "delay_ns",
            "delay_samples",
            "isar_frame_size_ms",
            "isar_frame_size_samples",
            "pose_correction",
            "avg_bitrate",
            "avg_bitrate_non_empty_frames",
            "codec",
            "codec_frame_size_ms",
            "codec_frame_size_samples",
            "lc3plus_hires",
        ],
    )
    info.set_defaults(func=_subcmd_info)

    trim = subparsers.add_parser(
        "trim", help="Remove initial frames from a bitstream file"
    )
    trim.add_argument("file_in", help="Path to input file")
    trim.add_argument("file_out", help="Path to output file")
    trim.add_argument(
        "start_time",
        help="Start point (in s) from which content should be copied to the output.",
    )
    trim.add_argument(
        "--length",
        help="Amount of time (in s) to copy to the output. If not given, content is copied until the end of the input is reached.",
        default=None,
    )
    trim.set_defaults(func=_subcmd_trim)

    args = parser.parse_args()

    try:
        args.func(args)
    except (FileNotFoundError, PermissionError, IsarBstoolError) as e:
        print(e, file=sys.stderr)
        sys.exit(1)
+6000 −0

File added.

Preview size limit exceeded, changes collapsed.

+6020 −0

File added.

Preview size limit exceeded, changes collapsed.

+6032 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading