Commit 5f621f43 authored by Archit Tamarapu
Browse files

add measurement and normalization script, update scripts with bugfixes

parent 101cbf22
Loading
Loading
Loading
Loading
Loading
+77 −28
Original line number Diff line number Diff line
@@ -12,7 +12,14 @@ from pathlib import Path

import pandas as pd

sys.path.append("./ivas-processing-scripts")
PROCESSING_SCRIPTS_PATH = (
    Path(__file__).parent.parent.parent.joinpath("ivas-processing-scripts").absolute()
)
if os.environ.get("CI") or not sys.stdout.isatty():
    PROCESSING_SCRIPTS_PATH = Path("./ivas-processing-scripts")
    sys.stdout.reconfigure(line_buffering=True)

sys.path.append(str(PROCESSING_SCRIPTS_PATH))

from ivas_processing_scripts.audiotools.audio import fromfile
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness
@@ -64,17 +71,47 @@ CFG_TO_PROC_FMT = {
    "OSBA_ISM4_HOA3": "ISM4SBA3",
}

PATTERN_BITRATE = re.compile(r"b([\d_]*|all)(_dtx)?_(swb|wb|fb)")
PATTERN_BITRATE = r"b([\d_]*|all)(_dtx)?_(swb|wb|fb)"
RESULT_OUTPUT_FILE = Path(__file__).parent.parent.joinpath("loudness.csv")


def get_metadata_from_outfile_name(outfile, infile_stem):
def find_metadata_files(format, file):
    """Locate the metadata files that accompany an audio file.

    Two naming conventions are probed for every metadata slot, in this order:
    the audio suffix replaced (``FILE.met``, ``FILE.{0,1,2,3}.csv``) and the
    runIvasCodec convention with the metadata suffix appended to the ``.wav``
    name (``FILE.wav.met``, ``FILE.wav.{0,1,2,3}.csv``).

    At most one file is returned per slot: if both spellings exist on disk,
    the first one wins, so the result never contains two files describing
    the same object index or two MASA metadata files.

    Args:
        format: Audio format name; ISM metadata is searched when it contains
            "ISM", MASA metadata when it contains "MASA".
        file: Path of the audio file the metadata belongs to.

    Returns:
        List of existing metadata paths, ISM files (ascending index) first,
        followed by the MASA file. Empty if nothing was found or the format
        carries no metadata.
    """

    def first_existing(candidates):
        # the two spellings are alternatives for the same slot, not additional files
        for candidate in candidates:
            if candidate.exists():
                return candidate
        return None

    metadata = []

    # runIvasCodec metadata outputs are .wav.met and .wav.{0,1,2,3}.csv
    # check also for .met and .{0,1,2,3}.csv
    if "ISM" in format:
        for i in range(4):
            found = first_existing(
                [
                    file.with_suffix(f".{i}.csv"),
                    file.parent / f"{file.name}.{i}.csv",
                ]
            )
            if found is not None:
                metadata.append(found)

    if "MASA" in format:
        found = first_existing(
            [file.with_suffix(".met"), file.with_suffix(".wav.met")]
        )
        if found is not None:
            metadata.append(found)

    return metadata


def get_metadata_from_outfile_name(pattern, outfile, infile_stem):
    outfile_tail = outfile.name.replace(f"{infile_stem}_", "", 1)
    mode_string, dec, outformat, suffix = outfile_tail.split(".")
    assert dec == "dec"
    assert suffix == "wav"

    match = re.search(PATTERN_BITRATE, mode_string)
    match = re.search(pattern, mode_string)
    assert match is not None
    bitrate_str, dtx, bandwidth = match.groups()
    if bitrate_str == "all":
@@ -85,23 +122,27 @@ def get_metadata_from_outfile_name(outfile, infile_stem):
    return outformat, bitrate, dtx is not None, bandwidth


def process_output_file(outfile, infile, format, input_loudness, input_loudness_format):
def process_output_file(
    outfile, infile, format, input_loudness, input_loudness_format, pattern
):
    """Process a single output file"""
    try:
        outformat, bitrate, dtx, bandwidth = get_metadata_from_outfile_name(
            outfile, infile.stem
            pattern,
            outfile,
            infile.stem,
        )

        if outformat == "EXT":
            outformat = format
        # skip planar SBA
        if "Planar" in outfile.name:
            raise NotImplementedError("PlanarSBA measurements ignored")

        metadata = []
        elif outformat == "EXT":
            if format in ["5_1", "7_1", "5_1_2", "5_1_4", "7_1_4"]:
                raise ValueError(f"MC EXT ignored; identical to {format} output")
            outformat = format

        # runIvasCodec metadata outputs are .wav.met and .wav.0.csv
        if "MASA" in outformat:
            metadata.append(outfile.with_suffix(".wav.met"))
        if "ISM" in outformat:
            metadata.append(list(outfile.parent.glob(f"{outfile.name}*.csv")))
        metadata = find_metadata_files(outformat, outfile)

        output_audio = fromfile(outformat.upper(), outfile, in_meta=metadata)
        output_loudness, scale_factor, output_loudness_format = get_loudness(
@@ -143,7 +184,7 @@ def process_output_file(outfile, infile, format, input_loudness, input_loudness_
            "scale_factor": scale_factor,
            "scale_cmd": scale_cmd,
        }
    except (AssertionError, ValueError) as e:
    except (AssertionError, NotImplementedError, ValueError) as e:
        print(f"\n⚠️  Skipping {outfile.stem}: {e}", file=sys.stderr)
        return None

@@ -156,6 +197,7 @@ def main(args):
    FORMAT_2_FILE = {
        CFG_TO_PROC_FMT[k]: Path(v) for k, v in config.items() if k != "SBA"
    }
    cfg_formats = args.formats.copy()

    # mono and stereo are only accepted as lowercase for runIvasCodec.py modes
    # multichannel modes are prefixed with "MC_" for filtering, but without the prefix elsewhere
@@ -174,6 +216,7 @@ def main(args):
    }
    args.formats = [replacements.get(f, f).upper() for f in args.formats]

    # validate supplied formats
    valid_formats = [f for f in args.formats if f in CFG_TO_PROC_FMT]
    invalid_formats = [f for f in args.formats if f not in CFG_TO_PROC_FMT]
    if invalid_formats:
@@ -185,29 +228,28 @@ def main(args):
        print("No valid formats to process, exiting...")
        exit(-1)
    # map to proc scripts format names internally
    valid_formats = [CFG_TO_PROC_FMT[f] for f in valid_formats]
    proc_formats = [CFG_TO_PROC_FMT[f] for f in valid_formats]

    results = []

    input_audio_cache = {}
    tasks = []

    for format in valid_formats:
        infile = FORMAT_2_FILE[format]
    # we need to loop over both format names so we can get a proper regex
    for cfg_fmt, proc_fmt in zip(cfg_formats, proc_formats):
        infile = FORMAT_2_FILE[proc_fmt]
        output_folder = OUTPUT_FOLDER
        pattern = re.compile(rf"{re.escape(cfg_fmt)}_{PATTERN_BITRATE}")

        # Load and cache input audio once per format
        if format not in input_audio_cache:
            if format.startswith("MASA"):
                input_audio = fromfile(
                    format, infile, in_meta=infile.with_suffix(".met")
                )
            else:
                input_audio = fromfile(format, infile)
        if proc_fmt not in input_audio_cache:
            metadata = find_metadata_files(proc_fmt, infile)

            input_audio = fromfile(proc_fmt, infile, in_meta=metadata)
            input_loudness, _, input_loudness_format = get_loudness(input_audio)
            input_audio_cache[format] = (input_loudness, input_loudness_format)
            input_audio_cache[proc_fmt] = (input_loudness, input_loudness_format)

        input_loudness, input_loudness_format = input_audio_cache[format]
        input_loudness, input_loudness_format = input_audio_cache[proc_fmt]

        # Find all output files for this format
        output_files = [
@@ -218,7 +260,14 @@ def main(args):

        for outfile in output_files:
            tasks.append(
                (outfile, infile, format, input_loudness, input_loudness_format)
                (
                    outfile,
                    infile,
                    proc_fmt,
                    input_loudness,
                    input_loudness_format,
                    pattern,
                )
            )

    print(
+208 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
import argparse
import sys
from pathlib import Path

PROCESSING_SCRIPTS_PATH = (
    Path(__file__).parent.parent.parent.joinpath("ivas-processing-scripts").absolute()
)
sys.path.append(str(PROCESSING_SCRIPTS_PATH))

from ivas_processing_scripts.audiotools.audio import fromfile
from ivas_processing_scripts.audiotools.audiofile import write
from ivas_processing_scripts.audiotools.wrappers.bs1770 import get_loudness

# Formats accepted for the positional "format" argument. The list order is
# the order argparse shows in its usage/help text, so the generated groups
# below reproduce the hand-written order exactly.
SUPPORTED_FORMATS = [
    # binaural, mono/stereo and multichannel layouts
    "BINAURAL",
    "MONO",
    "STEREO",
    "5_1",
    "5_1_2",
    "5_1_4",
    "7_1",
    "7_1_4",
    # MASA{1,2}DIR{1,2}
    *(f"MASA{tc}DIR{ndirs}" for tc in (1, 2) for ndirs in (1, 2)),
    # ISM1 .. ISM4
    *(f"ISM{nobj}" for nobj in range(1, 5)),
    # scene-based audio, under both naming schemes
    "FOA",
    "HOA2",
    "HOA3",
    *(f"SBA{order}" for order in range(1, 4)),
    # combined ISM + MASA: all DIR1 variants first, then all DIR2 variants
    *(
        f"ISM{nobj}MASA{tc}DIR{ndirs}"
        for ndirs in (1, 2)
        for nobj in range(1, 5)
        for tc in (1, 2)
    ),
    # combined ISM + SBA
    *(f"ISM{nobj}SBA{order}" for nobj in range(1, 5) for order in range(1, 4)),
]


def find_metadata_files(format: str, file: Path):
    """Locate the metadata files that accompany an audio file.

    Two naming conventions are probed for every metadata slot, in this order:
    the audio suffix replaced (``FILE.met``, ``FILE.{0,1,2,3}.csv``) and the
    runIvasCodec convention with the metadata suffix appended to the ``.wav``
    name (``FILE.wav.met``, ``FILE.wav.{0,1,2,3}.csv``).

    At most one file is returned per slot: if both spellings exist on disk,
    the first one wins, so the result never contains two files describing
    the same object index or two MASA metadata files.

    Args:
        format: Audio format name; ISM metadata is searched when it contains
            "ISM", MASA metadata when it contains "MASA".
        file: Path of the audio file the metadata belongs to.

    Returns:
        List of existing metadata paths, ISM files (ascending index) first,
        followed by the MASA file. Empty if nothing was found or the format
        carries no metadata.
    """

    def first_existing(candidates):
        # the two spellings are alternatives for the same slot, not additional files
        for candidate in candidates:
            if candidate.exists():
                return candidate
        return None

    metadata = []

    # runIvasCodec metadata outputs are .wav.met and .wav.{0,1,2,3}.csv
    # check also for .met and .{0,1,2,3}.csv
    if "ISM" in format:
        for i in range(4):
            found = first_existing(
                [
                    file.with_suffix(f".{i}.csv"),
                    file.parent / f"{file.name}.{i}.csv",
                ]
            )
            if found is not None:
                metadata.append(found)

    if "MASA" in format:
        found = first_existing(
            [file.with_suffix(".met"), file.with_suffix(".wav.met")]
        )
        if found is not None:
            metadata.append(found)

    return metadata


def user_confirmation(message: str) -> bool:
    """Ask a yes/no question on the console and report whether it was accepted.

    The prompt is ``message`` followed by `` (y/N): ``. Only an answer that
    equals "y" after stripping whitespace and lower-casing counts as consent.

    Args:
        message: Question shown to the user.

    Returns:
        True if the user answered "y"; False for any other answer, or when
        the prompt is aborted by Ctrl-C or end-of-input.
    """
    prompt = f"{message} (y/N): "
    try:
        answer = input(prompt)
    except (KeyboardInterrupt, EOFError):
        # finish the interrupted prompt line before returning
        print()
        return False

    return answer.strip().lower() == "y"


def main(args: argparse.Namespace) -> int:
    """Measure the loudness of one audio file and optionally write a scaled copy.

    Steps, in order:
      1. Use the metadata files given on the command line, or search for
         them with ``find_metadata_files`` when none were given and the
         format name contains "MASA" or "ISM".
      2. Load the file with ``fromfile`` and pass it to ``get_loudness``
         together with the target loudness.
      3. Print the measured loudness and the scale factor.
      4. If an output path was given, or in-place replacement was requested
         and confirmed by the user, write the audio multiplied by the scale
         factor.

    Args:
        args: Parsed command line options (file, format, sample_rate,
            metadata, target_loudness, outfile, replace).

    Returns:
        Always 0, including when the user declines the in-place replacement.
    """
    banner = "==="
    print(banner)

    # resolve metadata: explicit files win, otherwise look next to the input
    wants_metadata = "MASA" in args.format or "ISM" in args.format
    if args.metadata:
        listed = [str(f) for f in args.metadata]
        print(f"Using the following metadata files: {listed}")
        print("---")
    elif wants_metadata:
        print("No metadata files specified! Searching for existing files...")
        args.metadata = find_metadata_files(args.format, args.file)

        if args.metadata:
            listed = [str(f) for f in args.metadata]
            print(f"Detected the following metadata files: {listed}")
        print("---")

    # initialise input audio
    in_audio = fromfile(
        args.format,
        args.file,
        fs=args.sample_rate,
        in_meta=args.metadata,
    )

    # measure loudness
    print(f"Measuring loudness of input file {args.file} with bs1770demo")
    measurement = get_loudness(in_audio, args.target_loudness)
    loudness, scale_factor, loudness_fmt = measurement
    print(
        f"Input format {args.format}, rendered to {loudness_fmt} and resampled to 48 kHz"
    )

    # print results
    print(f"Loudness: {loudness:.2f} LKFS")
    print(f"Scale factor to achieve {args.target_loudness} LKFS: {scale_factor:.4f}")

    # decide where (if anywhere) the scaled audio goes
    if args.replace:
        if args.outfile:
            # an explicit output path takes precedence over in-place replacement
            print(
                "\nWARNING: Both inplace modification and output file specified, only writing to output!\n"
            )
        elif user_confirmation(f"\nWARNING! Replace input file {args.file}?"):
            args.outfile = args.file
        else:
            print("Operation cancelled.")
            print(banner)
            return 0

    if args.outfile:
        print(f"Writing scaled output file {args.outfile}")
        scaled_audio = in_audio.audio * scale_factor
        write(args.outfile, scaled_audio, in_audio.fs)

    print(banner)
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Measure and normalize audio loudness using bs1770demo"
    )

    # (names, keyword options) for every command line argument; registered in
    # this order, which is also the order shown by --help
    cli_arguments = [
        (("file",), {"type": Path, "help": "Input file path"}),
        (
            ("format",),
            {
                "type": str,
                "choices": SUPPORTED_FORMATS,
                "help": "Input audio format",
            },
        ),
        (
            ("-fs", "--sample_rate"),
            {
                "type": int,
                "choices": [8000, 16000, 32000, 48000],
                "default": None,
                "help": "Input audio sampling rate (required only for PCM inputs)",
            },
        ),
        (
            ("-m", "--metadata"),
            {
                "type": Path,
                "default": [],
                "nargs": "+",
                "help": "Optional input metadata (FILE.met and FILE.{0,1,2,3}.csv will be automatically detected)",
            },
        ),
        (
            ("-t", "--target_loudness"),
            {
                "type": float,
                "default": -26.0,
                "help": "Target level in LKFS (default = %(default)s)",
            },
        ),
        (
            ("-o", "--outfile"),
            {
                "type": Path,
                "default": None,
                "help": "Optional path to scaled output file",
            },
        ),
        (
            ("-r", "--replace"),
            {
                "help": "Flag to scale file inplace, will OVERWRITE input file!",
                "action": "store_true",
            },
        ),
    ]
    for names, options in cli_arguments:
        parser.add_argument(*names, **options)

    args = parser.parse_args()

    # main() returns the process exit status
    sys.exit(main(args))
+7 −3
Original line number Diff line number Diff line
@@ -7,12 +7,16 @@ import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

sys.path.append("./ivas-processing-scripts")
from ivas_processing_scripts.utils import progressbar_update

PROCESSING_SCRIPTS_PATH = (
    Path(__file__).parent.parent.parent.joinpath("ivas-processing-scripts").absolute()
)
if os.environ.get("CI") or not sys.stdout.isatty():
    PROCESSING_SCRIPTS_PATH = Path("./ivas-processing-scripts")
    sys.stdout.reconfigure(line_buffering=True)

sys.path.append(str(PROCESSING_SCRIPTS_PATH))
from ivas_processing_scripts.utils import progressbar_update

VALID_BITRATES_IVAS = [
    13.2,
    16.4,