Loading scripts/ivas_conformance/runConformance.py 0 → 100644 +558 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 """ (C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository. All Rights Reserved. This software is protected by copyright law and by international treaties. The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository retain full ownership rights in their respective contributions in the software. This notice grants no license of any kind, including but not limited to patent license, nor is any license granted by implication, estoppel or otherwise. Contributors are required to enter into the IVAS codec Public Collaboration agreement before making contributions. This software is provided "AS IS", without any express or implied warranties. The software is in the development stage. It is intended exclusively for experts who have experience with such software and solely for the purpose of inspection. All implied warranties of non-infringement, merchantability and fitness for a particular purpose are hereby disclaimed and excluded. Any dispute, controversy or claim arising under or in relation to providing this software shall be submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and the United Nations Convention on Contracts on the International Sales of Goods. """ import argparse import os import platform import re import numpy as np import subprocess import tempfile import sys from typing import Optional from multiprocessing import Process, Value sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) from pyaudio3dtools.audiofile import readfile, writefile from pyaudio3dtools.audioarray import resample class MLDConformance: IVAS_Bins = { "ENC": "IVAS_cod", "DEC": "IVAS_dec", "REND": "IVAS_rend", "ISAR": "ISAR_post_rend", } def __init__(self, args) -> None: self.RefBins = dict() self.CutBins = dict() self.Commands = dict() self.multiprocessing = not args.no_multi_processing self.dryrun = args.dryrun self.verbose = args.verbose self.executedTests = Value("i", 0) self.failedTests = Value("i", 0) self.testvecDir = args.testvecDir self.ref_build_path = args.ref_build_path self.cut_build_path = args.cut_build_path self.filter = args.filter self.EncoderToDecoderCmdMap = dict() self.scriptsDir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) self.outputDir = os.path.join(self.scriptsDir, "CUT_OUTPUTS") self.toolsdir = os.path.join(self.scriptsDir, "tools") self.testvDir = os.path.join(self.testvecDir, "testv") self.refDir = self.testvDir self.mldbin = os.path.join(self.toolsdir, platform.system(), "mld") self.logFile = os.path.join(self.outputDir, "runlog.txt") self.failedCmdsFile = os.path.join(self.outputDir, "failedCmds.txt") open(self.logFile, "w").close() open(self.failedCmdsFile, "w").close() self.mldcsv = dict() self.sampleStats = dict() for tag in MLDConformance.IVAS_Bins.keys(): self.mldcsv[tag] = os.path.join(self.outputDir, f"mld_{tag}.csv") self.sampleStats[tag] = os.path.join(self.outputDir, f"sampleStats_{tag}.csv") self.setup() def createDirs(self): os.makedirs(self.outputDir, exist_ok=True) subdirs = ["enc", "dec", "renderer", "split_rendering"] for odir in subdirs: os.makedirs(os.path.join(self.outputDir, "ref", odir), exist_ok=True) os.makedirs(os.path.join(self.outputDir, "dut", odir), exist_ok=True) def setup(self): self.createDirs() for tag in MLDConformance.IVAS_Bins.keys(): self.RefBins[tag] = os.path.join( self.ref_build_path, MLDConformance.IVAS_Bins[tag] ) self.CutBins[tag] = os.path.join( self.cut_build_path, MLDConformance.IVAS_Bins[tag] ) self.Commands[tag] = list() def accumulateCommands(self): for root, _, files in os.walk(self.testvecDir): for file_name in files: basename, ext = os.path.splitext(file_name) if ( ("Readme_IVAS_" in basename) and ext == ".txt" and not ("ISAR" in basename) ): print(f"Accumulating commands from {file_name}") file = os.path.join(root, file_name) self.parseCommandsFile(file) self.mapEncoderToDecoderCommands() print("No of tests :") for key in self.Commands.keys(): print(f" {key} : {len(self.Commands[key])}") def parseCommandsFile(self, filePath): with open(filePath) as fp: for line in fp.readlines(): m = re.search(r"^\$(CUT_.+_BIN) ", line) if m: tag = m.group(1).split("_")[1] if tag in self.Commands.keys(): self.Commands[tag].append(line) def getPcmPytestTag(self, command: str) -> str: decInput = ( os.path.basename(command.split()[-2]) .replace(".fer", "") .replace("_cut", "") ) return decInput.split(".")[-2] def getEncPytestTag(self, command: str) -> str: return os.path.basename(command.split()[-1]).split(".")[-2] def mapEncoderToDecoderCommands(self): decoderPyTestTags = dict() encoderPyTestTags = dict() for idx, command in enumerate(self.Commands["DEC"]): decoderPyTestTags[self.getPcmPytestTag(command)] = idx for idx, command in enumerate(self.Commands["ENC"]): encoderPyTestTags[self.getEncPytestTag(command)] = idx for encTag in encoderPyTestTags.keys(): if encTag in decoderPyTestTags.keys(): self.EncoderToDecoderCmdMap[encoderPyTestTags[encTag]] = ( decoderPyTestTags[encTag] ) if self.verbose: print( f"{encTag} {encoderPyTestTags[encTag]} -> {decoderPyTestTags[encTag]}" ) print(f"{self.Commands['ENC'][encoderPyTestTags[encTag]]}") print(f"{self.Commands['DEC'][decoderPyTestTags[encTag]]}") else: print(f"{encTag} not fount in decoder") print( f"Mapped decoder tests for {len(self.EncoderToDecoderCmdMap)} encoder tests out of {len(self.Commands['ENC'])} tests" ) assert len(self.EncoderToDecoderCmdMap) == len( self.Commands["ENC"] ), "Failed to Map Encoder Commands to Decoder Commands" def runOneEncoderTest(self, command: str): encPytestTag = self.getEncPytestTag(command) refCommand = self.reformatCommand(command=command, ref=True) refEncOutput = self.getOutputFile(refCommand) refCommand = self.setCommandExec(tag="ENC", command=refCommand, ref=True) self.process(command=refCommand) # Run reference Encoder encCommandIdx = self.Commands["ENC"].index(command) command = self.reformatCommand(command=command, ref=False) command = self.setCommandExec(tag="ENC", command=command, ref=False) dutEncOutput = self.getOutputFile(command) self.process(command=command) assert ".192" in dutEncOutput, "Output file not identified" # Decode the encoded output with Reference decoder dutDecOutputFile = dutEncOutput.replace(".192", "_CUT_REFDECODED.wav") refDecOutputFile = dutEncOutput.replace(".192", "_REF_REFDECODED.wav") decCommandIdx = self.EncoderToDecoderCmdMap[encCommandIdx] command = self.reformatCommand( command=self.Commands["DEC"][decCommandIdx], ref=False ) command = command.replace("-VOIP", "") dutDecCmd = ( [self.RefBins["DEC"]] + command.split()[1:-2] + [dutEncOutput, dutDecOutputFile] ) refDecCmd = ( [self.RefBins["DEC"]] + command.split()[1:-2] + [refEncOutput, refDecOutputFile] ) self.process(command=" ".join(dutDecCmd)) self.process(command=" ".join(refDecCmd)) self.mld("ENC", encPytestTag, refFile=refDecOutputFile, dutFile=dutDecOutputFile) def runOneDecoderTest(self, tag: str, command: str): dutPytestTag = self.getPcmPytestTag(command) refInputFile = command.split()[-2].replace( "$REF_PATH/ref", f"{self.testvDir}/ref" ) refInputFile = refInputFile.replace("_cut.192.fer", ".192") # refInputFile = refInputFile.replace(".fer.192", ".192").replace(".192.fer", ".192").replace("_cut.192.fer", ".192").replace("_cut.192", ".192") refDecOutput = self.getOutputFile(command).replace( "$CUT_PATH/dut", f"{self.testvDir}/ref" ) command = self.reformatCommand(command=command, ref=False) # command = command.replace("-VOIP", "") dutDecOutputFile = self.getOutputFile(command) dutDecCmd = ( [self.CutBins["DEC"]] + command.split()[1:-2] + [refInputFile, dutDecOutputFile] ) self.process(command=" ".join(dutDecCmd)) self.mld("DEC", dutPytestTag, refFile=refDecOutput, dutFile=dutDecOutputFile) def getRendOutputFile(self, command: str): cmds = command.split() for idx, cmd in enumerate(cmds): if cmd == "-o" and (idx + 1) < len(cmds): return cmds[idx + 1] assert False, "Outputname not found" def runOneRendererTest(self, tag: str, command: str): refRendOutputFile = self.getRendOutputFile(command).replace( "$CUT_PATH/renderer", f"{self.testvDir}/renderer" ) rendPytestTag = os.path.basename(refRendOutputFile).split(".")[-2] command = self.reformatCommand(command=command, ref=False) dutRendCmd = " ".join([self.CutBins["REND"]] + command.split()[1:]) dutRendOutputFile = self.getRendOutputFile(dutRendCmd) self.process(command=dutRendCmd) self.mld("REND", rendPytestTag, refFile=refRendOutputFile, dutFile=dutRendOutputFile) def getOutputFile(self, command: str): return command.split()[-1] def setCommandExec(self, tag: str, command, ref: bool): exec = self.RefBins[tag] if ref else self.CutBins[tag] commands = command.split() return " ".join([exec, *commands[1:]]) def reformatCommand(self, command: str, ref: bool) -> str: command = command.replace("$TESTV_PATH", self.testvecDir) command = command.replace( "$REF_PATH/split_rendering", f"{self.testvecDir}/testv/split_rendering" ) ################ HACKS ######################### command = command.replace("_cut.192.fer", ".192") command = command.replace("_cut.192", ".192") command = command.replace(".fer.192", ".192") command = command.replace(".192.fer", ".192") ################################################## command = command.replace( "$REF_PATH/ref/param_file/enc/", f"{self.outputDir}/ref/enc/" ) command = command.replace( "$REF_PATH/ref/param_file/dec/", f"{self.outputDir}/ref/dec/" ) command = command.replace( "$REF_PATH/ref/sba_bs/pkt/", f"{self.outputDir}/ref/enc/" ) if ref: command = command.replace( "$CUT_PATH/dut/param_file/enc/", f"{self.outputDir}/ref/enc/" ) command = command.replace( "$CUT_PATH/dut/param_file/dec/", f"{self.outputDir}/ref/dec/" ) command = command.replace( "$CUT_PATH/renderer/cut/", f"{self.outputDir}/ref/renderer/" ) command = command.replace( "$CUT_PATH/split_rendering/cut/", f"{self.outputDir}/ref/split_rendering/", ) command = command.replace( "$CUT_PATH/dut/sba_bs/pkt/", f"{self.outputDir}/ref/enc/" ) command = command.replace( "$CUT_PATH/dut/sba_bs/raw/", f"{self.outputDir}/ref/dec/" ) else: command = command.replace( "$CUT_PATH/dut/param_file/enc/", f"{self.outputDir}/dut/enc/" ) command = command.replace( "$CUT_PATH/dut/param_file/dec/", f"{self.outputDir}/dut/dec/" ) command = command.replace( "$CUT_PATH/renderer/cut/", f"{self.outputDir}/dut/renderer/" ) command = command.replace( "$CUT_PATH/split_rendering/cut/", f"{self.outputDir}/dut/split_rendering/", ) command = command.replace( "$CUT_PATH/dut/sba_bs/pkt/", f"{self.outputDir}/dut/enc/" ) command = command.replace( "$CUT_PATH/dut/sba_bs/raw/", f"{self.outputDir}/dut/dec/" ) return command def runOneCommand(self, tag: str, command: str, ref: bool): if tag == "ENC": self.runOneEncoderTest(command) elif tag == "DEC": self.runOneDecoderTest(tag, command) elif tag == "REND": self.runOneRendererTest(tag, command) else: assert False, f"Un-implemented Tag {tag}" self.executedTests.value += 1 self.stats() def runTag(self, tag: str, ref: bool = False): self.executedTests.value = 0 self.failedTests.value = 0 # reset MLD, Sample Stats open(self.mldcsv[tag], "w").close() with open(self.sampleStats[tag], "w") as f: f.write(f"PYTESTTAG, MAXDIFF, RMSdB, BEFRAMES_PERCENT, MAX_MLD\n") processes = list() # Multiprocess list commands = list() if self.filter: for command in self.Commands[tag]: if self.filter in command: commands.append(command) else: commands = self.Commands[tag] self.totalTests = len(commands) print( f"Executing tests for {tag} {'Filter='+self.filter if self.filter else ''} ({self.totalTests} tests)" ) if self.multiprocessing: for command in commands: p = Process( target=self.runOneCommand, args=(tag, command, ref), ) processes.append(p) p.start() for p in processes: p.join() else: for command in commands: self.runOneCommand(tag, command, ref) def process(self, command) -> int: if self.verbose: print(command) with open(self.logFile, "a") as fd: fd.write(command + "\n") with open(self.logFile, "a") as fd: if not self.dryrun: c = subprocess.run( command, stdout=fd, stderr=subprocess.STDOUT, text=True, shell=True ) if c.returncode: with open(self.failedCmdsFile, "a") as f: f.write(command + "\n") self.failedTests.value += 1 # c.check_returncode() return 0 def stats(self): print( f"Executed: {self.executedTests.value} / {self.totalTests} Failed: {self.failedTests.value}", end="\r", ) def getSampleStats(self, refSamples: np.ndarray, dutSamples: np.ndarray): nSamples = min(refSamples.shape[0], dutSamples.shape[0]) diff = (refSamples[:nSamples] / 32768.0) - (dutSamples[:nSamples] / 32768.0) beSamplesPercent = ( 100.0 * np.sum(diff == 0) / refSamples.shape[0] / refSamples.shape[1] ) maxDiff = np.abs(diff).max() rmsdB = int(10.0 * np.log10(np.average(diff**2) + np.finfo(np.float64).eps) * 10) / 10.0 return (maxDiff, rmsdB, beSamplesPercent) def mld(self, tag, pytestTag, refFile, dutFile): mldThisFile = np.zeros(0) with tempfile.TemporaryDirectory() as tmpdir: refSamples, fsR = readfile(refFile, outdtype="float") dutSamples, fsD = readfile(dutFile, outdtype="float") assert ( refSamples.shape[1] == dutSamples.shape[1] ), "No of channels mismatch if ref vs cut" maxDiff, rmsdB, beSamplesPercent = self.getSampleStats(refSamples, dutSamples) nChans = refSamples.shape[1] if fsR != 48000: refSamples = np.clip(resample(refSamples, fsR, 48000), -32768, 32767) if fsD != 48000: dutSamples = np.clip(resample(dutSamples, fsD, 48000), -32768, 32767) for ch in range(nChans): mldFile = os.path.join( tmpdir, f"{tempfile.gettempprefix()}_ch{ch}_MLD.csv" ) refFileMono = os.path.join( tmpdir, os.path.basename(refFile).replace(".wav", f"_ch{ch}.wav") ) dutFileMono = os.path.join( tmpdir, os.path.basename(dutFile).replace(".wav", f"_ch{ch}.wav") ) writefile(refFileMono, refSamples[:, ch], 48000) writefile(dutFileMono, dutSamples[:, ch], 48000) command = [ self.mldbin, "-f", "20", "-o", mldFile, "-s", refFileMono, dutFileMono, ] self.process(" ".join(command)) mldThisChan = np.loadtxt(mldFile, delimiter=" ", dtype=float) if ch == 0: mldThisFile = mldThisChan else: mldThisFile = np.maximum(mldThisFile, mldThisChan) if mldThisFile.size > 0: with open(self.mldcsv[tag], "ab") as f: np.savetxt(f, mldThisFile, delimiter=",") with open(self.sampleStats[tag], "a") as f: f.write(f"{pytestTag}, {maxDiff}, {rmsdB}, {beSamplesPercent}, {mldThisFile.max()}\n") def doAnalysis(self, selectTag = "all"): keys = MLDConformance.IVAS_Bins.keys() if selectTag == "all" else [ selectTag ] for tag in keys: if os.path.exists(self.mldcsv[tag]): mdlValues = np.loadtxt(self.mldcsv[tag], delimiter=" ", dtype=float) N = mdlValues.shape[0] if N == 0: continue m0 = np.sum(mdlValues == 0) m1 = np.sum(mdlValues <= 1.0) m2 = np.sum(mdlValues <= 2.0) m5 = np.sum(mdlValues <= 5.0) PCNT = lambda num: int(1000 * num / N) / 10.0 print(f"\n##########################################################") print(f"<{tag}> Total Frames: {N}") print(f"<{tag}> MAX MLD across all frames : {mdlValues.max()}") print(f"<{tag}> Frames with MLD == 0 : {m0} frames ({PCNT(m0)}%)") print(f"<{tag}> Frames with MLD <= 1 : {m1} frames ({PCNT(m1)}%)") print(f"<{tag}> Frames with MLD <= 2 : {m2} frames ({PCNT(m2)}%)") print(f"<{tag}> Frames with MLD <= 5 : {m5} frames ({PCNT(m5)}%)") print("##########################################################\n") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Compare .wav files in two folders using mld per frame" ) parser.add_argument( "--testvecDir", type=str, required=True, help="Base folder containing a set of conformance scripts named Readme_IVAS_xxxx.txt", ) parser.add_argument( "--ref_build_path", type=str, required=True, help="Path to the reference build folder containing IVAS Encoder, Decoder, Renderer and Post Render binaries", ) parser.add_argument( "--cut_build_path", type=str, required=True, help="Path to the CUT build folder containing IVAS Encoder, Decoder, Renderer and Post Render binaries", ) parser.add_argument( "--verbose", default=False, action="store_true", help="Enable verbose printing", ) parser.add_argument( "--dryrun", default=False, action="store_true", help="Do not run any processing, just dump command lines to runlog", ) parser.add_argument( "--filter", type=str, default=None, help="Filter test based on text provided", ) parser.add_argument( "--test-mode", type=str, default="ALL", help='Choose tests to run ["ENC", "DEC", "REND", "ISAR", "ALL"]', ) parser.add_argument( "--no-multi-processing", default=False, action="store_true", help="Disable multi-processing for sequential test run (debugging)", ) parser.add_argument( "--analyse-only", default=False, action="store_true", help="Do not run DUT, use existing mld and bitdiff stats files to generate analysis only", ) args = parser.parse_args() conformance = MLDConformance(args) conformance.accumulateCommands() # import sys # sys.exit(0) testTags = MLDConformance.IVAS_Bins.keys() if args.test_mode == "ALL" else [args.test_mode] for tag in testTags: if tag == "ISAR": # Not implemented yet continue if not args.analyse_only: conformance.runTag(tag, ref=True) conformance.doAnalysis(selectTag=tag) Loading
scripts/ivas_conformance/runConformance.py 0 → 100644 +558 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 """ (C) 2022-2025 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository. All Rights Reserved. This software is protected by copyright law and by international treaties. The IVAS codec Public Collaboration consisting of Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, Panasonic Holdings Corporation, Qualcomm Technologies, Inc., VoiceAge Corporation, and other contributors to this repository retain full ownership rights in their respective contributions in the software. This notice grants no license of any kind, including but not limited to patent license, nor is any license granted by implication, estoppel or otherwise. Contributors are required to enter into the IVAS codec Public Collaboration agreement before making contributions. This software is provided "AS IS", without any express or implied warranties. The software is in the development stage. It is intended exclusively for experts who have experience with such software and solely for the purpose of inspection. All implied warranties of non-infringement, merchantability and fitness for a particular purpose are hereby disclaimed and excluded. Any dispute, controversy or claim arising under or in relation to providing this software shall be submitted to and settled by the final, binding jurisdiction of the courts of Munich, Germany in accordance with the laws of the Federal Republic of Germany excluding its conflict of law rules and the United Nations Convention on Contracts on the International Sales of Goods. """ import argparse import os import platform import re import numpy as np import subprocess import tempfile import sys from typing import Optional from multiprocessing import Process, Value sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) from pyaudio3dtools.audiofile import readfile, writefile from pyaudio3dtools.audioarray import resample class MLDConformance: IVAS_Bins = { "ENC": "IVAS_cod", "DEC": "IVAS_dec", "REND": "IVAS_rend", "ISAR": "ISAR_post_rend", } def __init__(self, args) -> None: self.RefBins = dict() self.CutBins = dict() self.Commands = dict() self.multiprocessing = not args.no_multi_processing self.dryrun = args.dryrun self.verbose = args.verbose self.executedTests = Value("i", 0) self.failedTests = Value("i", 0) self.testvecDir = args.testvecDir self.ref_build_path = args.ref_build_path self.cut_build_path = args.cut_build_path self.filter = args.filter self.EncoderToDecoderCmdMap = dict() self.scriptsDir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) self.outputDir = os.path.join(self.scriptsDir, "CUT_OUTPUTS") self.toolsdir = os.path.join(self.scriptsDir, "tools") self.testvDir = os.path.join(self.testvecDir, "testv") self.refDir = self.testvDir self.mldbin = os.path.join(self.toolsdir, platform.system(), "mld") self.logFile = os.path.join(self.outputDir, "runlog.txt") self.failedCmdsFile = os.path.join(self.outputDir, "failedCmds.txt") open(self.logFile, "w").close() open(self.failedCmdsFile, "w").close() self.mldcsv = dict() self.sampleStats = dict() for tag in MLDConformance.IVAS_Bins.keys(): self.mldcsv[tag] = os.path.join(self.outputDir, f"mld_{tag}.csv") self.sampleStats[tag] = os.path.join(self.outputDir, f"sampleStats_{tag}.csv") self.setup() def createDirs(self): os.makedirs(self.outputDir, exist_ok=True) subdirs = ["enc", "dec", "renderer", "split_rendering"] for odir in subdirs: os.makedirs(os.path.join(self.outputDir, "ref", odir), exist_ok=True) os.makedirs(os.path.join(self.outputDir, "dut", odir), exist_ok=True) def setup(self): self.createDirs() for tag in MLDConformance.IVAS_Bins.keys(): self.RefBins[tag] = os.path.join( self.ref_build_path, MLDConformance.IVAS_Bins[tag] ) self.CutBins[tag] = os.path.join( self.cut_build_path, MLDConformance.IVAS_Bins[tag] ) self.Commands[tag] = list() def accumulateCommands(self): for root, _, files in os.walk(self.testvecDir): for file_name in files: basename, ext = os.path.splitext(file_name) if ( ("Readme_IVAS_" in basename) and ext == ".txt" and not ("ISAR" in basename) ): print(f"Accumulating commands from {file_name}") file = os.path.join(root, file_name) self.parseCommandsFile(file) self.mapEncoderToDecoderCommands() print("No of tests :") for key in self.Commands.keys(): print(f" {key} : {len(self.Commands[key])}") def parseCommandsFile(self, filePath): with open(filePath) as fp: for line in fp.readlines(): m = re.search(r"^\$(CUT_.+_BIN) ", line) if m: tag = m.group(1).split("_")[1] if tag in self.Commands.keys(): self.Commands[tag].append(line) def getPcmPytestTag(self, command: str) -> str: decInput = ( os.path.basename(command.split()[-2]) .replace(".fer", "") .replace("_cut", "") ) return decInput.split(".")[-2] def getEncPytestTag(self, command: str) -> str: return os.path.basename(command.split()[-1]).split(".")[-2] def mapEncoderToDecoderCommands(self): decoderPyTestTags = dict() encoderPyTestTags = dict() for idx, command in enumerate(self.Commands["DEC"]): decoderPyTestTags[self.getPcmPytestTag(command)] = idx for idx, command in enumerate(self.Commands["ENC"]): encoderPyTestTags[self.getEncPytestTag(command)] = idx for encTag in encoderPyTestTags.keys(): if encTag in decoderPyTestTags.keys(): self.EncoderToDecoderCmdMap[encoderPyTestTags[encTag]] = ( decoderPyTestTags[encTag] ) if self.verbose: print( f"{encTag} {encoderPyTestTags[encTag]} -> {decoderPyTestTags[encTag]}" ) print(f"{self.Commands['ENC'][encoderPyTestTags[encTag]]}") print(f"{self.Commands['DEC'][decoderPyTestTags[encTag]]}") else: print(f"{encTag} not fount in decoder") print( f"Mapped decoder tests for {len(self.EncoderToDecoderCmdMap)} encoder tests out of {len(self.Commands['ENC'])} tests" ) assert len(self.EncoderToDecoderCmdMap) == len( self.Commands["ENC"] ), "Failed to Map Encoder Commands to Decoder Commands" def runOneEncoderTest(self, command: str): encPytestTag = self.getEncPytestTag(command) refCommand = self.reformatCommand(command=command, ref=True) refEncOutput = self.getOutputFile(refCommand) refCommand = self.setCommandExec(tag="ENC", command=refCommand, ref=True) self.process(command=refCommand) # Run reference Encoder encCommandIdx = self.Commands["ENC"].index(command) command = self.reformatCommand(command=command, ref=False) command = self.setCommandExec(tag="ENC", command=command, ref=False) dutEncOutput = self.getOutputFile(command) self.process(command=command) assert ".192" in dutEncOutput, "Output file not identified" # Decode the encoded output with Reference decoder dutDecOutputFile = dutEncOutput.replace(".192", "_CUT_REFDECODED.wav") refDecOutputFile = dutEncOutput.replace(".192", "_REF_REFDECODED.wav") decCommandIdx = self.EncoderToDecoderCmdMap[encCommandIdx] command = self.reformatCommand( command=self.Commands["DEC"][decCommandIdx], ref=False ) command = command.replace("-VOIP", "") dutDecCmd = ( [self.RefBins["DEC"]] + command.split()[1:-2] + [dutEncOutput, dutDecOutputFile] ) refDecCmd = ( [self.RefBins["DEC"]] + command.split()[1:-2] + [refEncOutput, refDecOutputFile] ) self.process(command=" ".join(dutDecCmd)) self.process(command=" ".join(refDecCmd)) self.mld("ENC", encPytestTag, refFile=refDecOutputFile, dutFile=dutDecOutputFile) def runOneDecoderTest(self, tag: str, command: str): dutPytestTag = self.getPcmPytestTag(command) refInputFile = command.split()[-2].replace( "$REF_PATH/ref", f"{self.testvDir}/ref" ) refInputFile = refInputFile.replace("_cut.192.fer", ".192") # refInputFile = refInputFile.replace(".fer.192", ".192").replace(".192.fer", ".192").replace("_cut.192.fer", ".192").replace("_cut.192", ".192") refDecOutput = self.getOutputFile(command).replace( "$CUT_PATH/dut", f"{self.testvDir}/ref" ) command = self.reformatCommand(command=command, ref=False) # command = command.replace("-VOIP", "") dutDecOutputFile = self.getOutputFile(command) dutDecCmd = ( [self.CutBins["DEC"]] + command.split()[1:-2] + [refInputFile, dutDecOutputFile] ) self.process(command=" ".join(dutDecCmd)) self.mld("DEC", dutPytestTag, refFile=refDecOutput, dutFile=dutDecOutputFile) def getRendOutputFile(self, command: str): cmds = command.split() for idx, cmd in enumerate(cmds): if cmd == "-o" and (idx + 1) < len(cmds): return cmds[idx + 1] assert False, "Outputname not found" def runOneRendererTest(self, tag: str, command: str): refRendOutputFile = self.getRendOutputFile(command).replace( "$CUT_PATH/renderer", f"{self.testvDir}/renderer" ) rendPytestTag = os.path.basename(refRendOutputFile).split(".")[-2] command = self.reformatCommand(command=command, ref=False) dutRendCmd = " ".join([self.CutBins["REND"]] + command.split()[1:]) dutRendOutputFile = self.getRendOutputFile(dutRendCmd) self.process(command=dutRendCmd) self.mld("REND", rendPytestTag, refFile=refRendOutputFile, dutFile=dutRendOutputFile) def getOutputFile(self, command: str): return command.split()[-1] def setCommandExec(self, tag: str, command, ref: bool): exec = self.RefBins[tag] if ref else self.CutBins[tag] commands = command.split() return " ".join([exec, *commands[1:]]) def reformatCommand(self, command: str, ref: bool) -> str: command = command.replace("$TESTV_PATH", self.testvecDir) command = command.replace( "$REF_PATH/split_rendering", f"{self.testvecDir}/testv/split_rendering" ) ################ HACKS ######################### command = command.replace("_cut.192.fer", ".192") command = command.replace("_cut.192", ".192") command = command.replace(".fer.192", ".192") command = command.replace(".192.fer", ".192") ################################################## command = command.replace( "$REF_PATH/ref/param_file/enc/", f"{self.outputDir}/ref/enc/" ) command = command.replace( "$REF_PATH/ref/param_file/dec/", f"{self.outputDir}/ref/dec/" ) command = command.replace( "$REF_PATH/ref/sba_bs/pkt/", f"{self.outputDir}/ref/enc/" ) if ref: command = command.replace( "$CUT_PATH/dut/param_file/enc/", f"{self.outputDir}/ref/enc/" ) command = command.replace( "$CUT_PATH/dut/param_file/dec/", f"{self.outputDir}/ref/dec/" ) command = command.replace( "$CUT_PATH/renderer/cut/", f"{self.outputDir}/ref/renderer/" ) command = command.replace( "$CUT_PATH/split_rendering/cut/", f"{self.outputDir}/ref/split_rendering/", ) command = command.replace( "$CUT_PATH/dut/sba_bs/pkt/", f"{self.outputDir}/ref/enc/" ) command = command.replace( "$CUT_PATH/dut/sba_bs/raw/", f"{self.outputDir}/ref/dec/" ) else: command = command.replace( "$CUT_PATH/dut/param_file/enc/", f"{self.outputDir}/dut/enc/" ) command = command.replace( "$CUT_PATH/dut/param_file/dec/", f"{self.outputDir}/dut/dec/" ) command = command.replace( "$CUT_PATH/renderer/cut/", f"{self.outputDir}/dut/renderer/" ) command = command.replace( "$CUT_PATH/split_rendering/cut/", f"{self.outputDir}/dut/split_rendering/", ) command = command.replace( "$CUT_PATH/dut/sba_bs/pkt/", f"{self.outputDir}/dut/enc/" ) command = command.replace( "$CUT_PATH/dut/sba_bs/raw/", f"{self.outputDir}/dut/dec/" ) return command def runOneCommand(self, tag: str, command: str, ref: bool): if tag == "ENC": self.runOneEncoderTest(command) elif tag == "DEC": self.runOneDecoderTest(tag, command) elif tag == "REND": self.runOneRendererTest(tag, command) else: assert False, f"Un-implemented Tag {tag}" self.executedTests.value += 1 self.stats() def runTag(self, tag: str, ref: bool = False): self.executedTests.value = 0 self.failedTests.value = 0 # reset MLD, Sample Stats open(self.mldcsv[tag], "w").close() with open(self.sampleStats[tag], "w") as f: f.write(f"PYTESTTAG, MAXDIFF, RMSdB, BEFRAMES_PERCENT, MAX_MLD\n") processes = list() # Multiprocess list commands = list() if self.filter: for command in self.Commands[tag]: if self.filter in command: commands.append(command) else: commands = self.Commands[tag] self.totalTests = len(commands) print( f"Executing tests for {tag} {'Filter='+self.filter if self.filter else ''} ({self.totalTests} tests)" ) if self.multiprocessing: for command in commands: p = Process( target=self.runOneCommand, args=(tag, command, ref), ) processes.append(p) p.start() for p in processes: p.join() else: for command in commands: self.runOneCommand(tag, command, ref) def process(self, command) -> int: if self.verbose: print(command) with open(self.logFile, "a") as fd: fd.write(command + "\n") with open(self.logFile, "a") as fd: if not self.dryrun: c = subprocess.run( command, stdout=fd, stderr=subprocess.STDOUT, text=True, shell=True ) if c.returncode: with open(self.failedCmdsFile, "a") as f: f.write(command + "\n") self.failedTests.value += 1 # c.check_returncode() return 0 def stats(self): print( f"Executed: {self.executedTests.value} / {self.totalTests} Failed: {self.failedTests.value}", end="\r", ) def getSampleStats(self, refSamples: np.ndarray, dutSamples: np.ndarray): nSamples = min(refSamples.shape[0], dutSamples.shape[0]) diff = (refSamples[:nSamples] / 32768.0) - (dutSamples[:nSamples] / 32768.0) beSamplesPercent = ( 100.0 * np.sum(diff == 0) / refSamples.shape[0] / refSamples.shape[1] ) maxDiff = np.abs(diff).max() rmsdB = int(10.0 * np.log10(np.average(diff**2) + np.finfo(np.float64).eps) * 10) / 10.0 return (maxDiff, rmsdB, beSamplesPercent) def mld(self, tag, pytestTag, refFile, dutFile): mldThisFile = np.zeros(0) with tempfile.TemporaryDirectory() as tmpdir: refSamples, fsR = readfile(refFile, outdtype="float") dutSamples, fsD = readfile(dutFile, outdtype="float") assert ( refSamples.shape[1] == dutSamples.shape[1] ), "No of channels mismatch if ref vs cut" maxDiff, rmsdB, beSamplesPercent = self.getSampleStats(refSamples, dutSamples) nChans = refSamples.shape[1] if fsR != 48000: refSamples = np.clip(resample(refSamples, fsR, 48000), -32768, 32767) if fsD != 48000: dutSamples = np.clip(resample(dutSamples, fsD, 48000), -32768, 32767) for ch in range(nChans): mldFile = os.path.join( tmpdir, f"{tempfile.gettempprefix()}_ch{ch}_MLD.csv" ) refFileMono = os.path.join( tmpdir, os.path.basename(refFile).replace(".wav", f"_ch{ch}.wav") ) dutFileMono = os.path.join( tmpdir, os.path.basename(dutFile).replace(".wav", f"_ch{ch}.wav") ) writefile(refFileMono, refSamples[:, ch], 48000) writefile(dutFileMono, dutSamples[:, ch], 48000) command = [ self.mldbin, "-f", "20", "-o", mldFile, "-s", refFileMono, dutFileMono, ] self.process(" ".join(command)) mldThisChan = np.loadtxt(mldFile, delimiter=" ", dtype=float) if ch == 0: mldThisFile = mldThisChan else: mldThisFile = np.maximum(mldThisFile, mldThisChan) if mldThisFile.size > 0: with open(self.mldcsv[tag], "ab") as f: np.savetxt(f, mldThisFile, delimiter=",") with open(self.sampleStats[tag], "a") as f: f.write(f"{pytestTag}, {maxDiff}, {rmsdB}, {beSamplesPercent}, {mldThisFile.max()}\n") def doAnalysis(self, selectTag = "all"): keys = MLDConformance.IVAS_Bins.keys() if selectTag == "all" else [ selectTag ] for tag in keys: if os.path.exists(self.mldcsv[tag]): mdlValues = np.loadtxt(self.mldcsv[tag], delimiter=" ", dtype=float) N = mdlValues.shape[0] if N == 0: continue m0 = np.sum(mdlValues == 0) m1 = np.sum(mdlValues <= 1.0) m2 = np.sum(mdlValues <= 2.0) m5 = np.sum(mdlValues <= 5.0) PCNT = lambda num: int(1000 * num / N) / 10.0 print(f"\n##########################################################") print(f"<{tag}> Total Frames: {N}") print(f"<{tag}> MAX MLD across all frames : {mdlValues.max()}") print(f"<{tag}> Frames with MLD == 0 : {m0} frames ({PCNT(m0)}%)") print(f"<{tag}> Frames with MLD <= 1 : {m1} frames ({PCNT(m1)}%)") print(f"<{tag}> Frames with MLD <= 2 : {m2} frames ({PCNT(m2)}%)") print(f"<{tag}> Frames with MLD <= 5 : {m5} frames ({PCNT(m5)}%)") print("##########################################################\n") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Compare .wav files in two folders using mld per frame" ) parser.add_argument( "--testvecDir", type=str, required=True, help="Base folder containing a set of conformance scripts named Readme_IVAS_xxxx.txt", ) parser.add_argument( "--ref_build_path", type=str, required=True, help="Path to the reference build folder containing IVAS Encoder, Decoder, Renderer and Post Render binaries", ) parser.add_argument( "--cut_build_path", type=str, required=True, help="Path to the CUT build folder containing IVAS Encoder, Decoder, Renderer and Post Render binaries", ) parser.add_argument( "--verbose", default=False, action="store_true", help="Enable verbose printing", ) parser.add_argument( "--dryrun", default=False, action="store_true", help="Do not run any processing, just dump command lines to runlog", ) parser.add_argument( "--filter", type=str, default=None, help="Filter test based on text provided", ) parser.add_argument( "--test-mode", type=str, default="ALL", help='Choose tests to run ["ENC", "DEC", "REND", "ISAR", "ALL"]', ) parser.add_argument( "--no-multi-processing", default=False, action="store_true", help="Disable multi-processing for sequential test run (debugging)", ) parser.add_argument( "--analyse-only", default=False, action="store_true", help="Do not run DUT, use existing mld and bitdiff stats files to generate analysis only", ) args = parser.parse_args() conformance = MLDConformance(args) conformance.accumulateCommands() # import sys # sys.exit(0) testTags = MLDConformance.IVAS_Bins.keys() if args.test_mode == "ALL" else [args.test_mode] for tag in testTags: if tag == "ISAR": # Not implemented yet continue if not args.analyse_only: conformance.runTag(tag, ref=True) conformance.doAnalysis(selectTag=tag)