diff --git a/testing/asn_backwards_compat.py b/testing/asn_backwards_compat.py new file mode 100644 index 0000000000000000000000000000000000000000..a015c073caa23c8e5c27bcaebb1cb7559debb2e8 --- /dev/null +++ b/testing/asn_backwards_compat.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 + +""" +Module to check backwards compatibility between two ASN.1 files. +""" + +import logging +import os +import sys + +import asn1tools + + +# pylint: disable=logging-fstring-interpolation + + +def process_error(errors, message): + """Log `message` and append `message` to `errors`.""" + logging.info(f"Test Failure: {message}") + errors.append(message) + + +def members_as_tag_dict(atype): + """Convert atype['members'] into a dict indexed by members.tag.number.""" + result = {} + for member in atype.get("members", {}): + if not member: + continue + if not "tag" in member: + continue + result[member["tag"]["number"]] = member + return result + + +def compare_files(file1, file2): + """Compare ASN.1 `file1` and `file2` for backwards compatibility + issues introduced in `file2`: + - Module is the same + - Each type in the module: + - Exists + - Has the same member types + - Each complex member type has: + - Same inner type at the tag + - Warns if the type renamed (not fatal) + + Returns a list of error messages (if any). + """ + # pylint: disable=too-many-locals + + errors = [] + + f1_asn = asn1tools.parse_files(file1) + f2_asn = asn1tools.parse_files(file2) + + logging.info(f"Comparing file {file1} with {file2}") + for module_name, f1_module in f1_asn.items(): + logging.info(f"Checking module {module_name}") + f2_module = f2_asn.get(module_name) + if not f2_module: + process_error(errors, f"Module {module_name} not present") + continue + f2_types = f2_module["types"] + for type_name, f1_type_def in f1_module["types"].items(): + logging.info(f"Checking type {type_name}") + f2_type_def = f2_types.get(type_name) + if not f2_type_def: + process_error(errors, f"Type {type_name} not present") + continue + if f1_type_def["type"] != f2_type_def["type"]: + process_error( + errors, + f"Type {type_name} type '{f1_type_def['type']}'" + f" mismatch with '{f2_type_def['type']}'", + ) + continue + f1_tags = members_as_tag_dict(f1_type_def) + f2_tags = members_as_tag_dict(f2_type_def) + if not f1_tags: + if f2_tags: + process_error( + errors, f"Type {type_name} contains unexpected members" + ) + continue + for tag_number, f1_tag in f1_tags.items(): + logging.info( + f"Checking {f1_tag['name']} [{tag_number}] {f1_tag['type']}" + ) + f2_tag = f2_tags.get(tag_number, {}) + if not f2_tag: + process_error( + errors, + f"Type {type_name} tag {tag_number} field '{f1_tag['name']}'" + f" missing", + ) + continue + if f1_tag["type"] != f2_tag["type"]: + process_error( + errors, + f"Type {type_name} tag {tag_number} field '{f1_tag['name']}'" + f" type '{f1_tag['type']}' mismatch with '{f2_tag['type']}'", + ) + continue + if f1_tag["name"] != f2_tag["name"]: + process_error( + errors, + f"Type {type_name} tag {tag_number} field '{f1_tag['name']}'" + f" renamed to '{f2_tag['name']}'", + ) + # this is not fatal - continue + # no other checks for now + + return errors + + +def main(): + """standalone main.""" + loglevel = os.environ.get("LOGLEVEL", "WARNING").upper() + logging.basicConfig(level=loglevel) + + if len(sys.argv) != 3: + raise RuntimeError(f"Usage: {sys.argv[0]} file1.asn file2.asn") + file1 = sys.argv[1] + file2 = sys.argv[2] + results = compare_files(file1, file2) + if results: + print("-----------------------------") + print(f"File 1: {file1}") + print(f"File 2: {file2}") + print("Errors:") + for error in results: + print(f" {error}") + print("-----------------------------") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/testing/asn_lint_exceptions.json b/testing/asn_lint_exceptions.json index 38681a069327fd156244e85ba65a4fd25899d3b2..b2d9c288db24918a9e51bce095a7c2a649729308 100644 --- a/testing/asn_lint_exceptions.json +++ b/testing/asn_lint_exceptions.json @@ -1,30 +1,68 @@ { "33128/r15/TS33128Payloads.asn" : [ + "Tag 16 IRIEvent field 'mDFCellSiteReport' is not present in XIRIEvent", + "Tag 5 XIRIEvent field 'unsuccessfulAMProcedure' differs from IRIEvent field 'unsuccessfulRegistrationProcedure'", + "Tag 10 XIRIEvent field 'unsuccessfulSMProcedure' differs from IRIEvent field 'unsuccessfulSessionProcedure'", "Enumerations for UDMServingSystemMethod start at 0, not 1", "Field 'aNNodeID' in GlobalRANNodeID is an anonymous CHOICE" ], "33128/r16/TS33128Payloads.asn" : [ + "Tag 16 IRIEvent field 'mDFCellSiteReport' is not present in XIRIEvent", + "Tag 5 XIRIEvent field 'unsuccessfulAMProcedure' differs from IRIEvent field 'unsuccessfulRegistrationProcedure'", + "Tag 10 XIRIEvent field 'unsuccessfulSMProcedure' differs from IRIEvent field 'unsuccessfulSessionProcedure'", + "Tag 16 missing in XIRIEvent", "Enumerations for EstablishmentStatus start at 0, not 1", "Enumerations for RequestIndication start at 0, not 1", "Enumerations for UDMServingSystemMethod start at 0, not 1", "Enumerations for MMSDirection start at 0, not 1", "Enumerations for MMSReplyCharging start at 0, not 1", - "Enumerations for MMStatusExtension start at 0, not 1" + "Enumerations for MMStatusExtension start at 0, not 1", + "Tag 2 missing in LALSReport", + "Tag 6 missing in LALSReport" ], "33128/r17/TS33128Payloads.asn" : [ + "Tag 100 XIRIEvent field 'n9HRPDUSessionInfo' is not present in IRIEvent", + "Tag 101 XIRIEvent field 's8HRBearerInfo' is not present in IRIEvent", + "Tag 16 IRIEvent field 'mDFCellSiteReport' is not present in XIRIEvent", + "Tag 5 XIRIEvent field 'unsuccessfulAMProcedure' differs from IRIEvent field 'unsuccessfulRegistrationProcedure'", + "Tag 10 XIRIEvent field 'unsuccessfulSMProcedure' differs from IRIEvent field 'unsuccessfulSessionProcedure'", + "Tag 108 XIRIEvent field 'uDMLocationInformationResult' differs from IRIEvent field 'uDMLocationInformationResultRecord'", + "Tag 16 missing in XIRIEvent", + "Tags 113-131 missing in XIRIEvent", + "Tags 100-101 missing in IRIEvent", + "Tags 113-131 missing in IRIEvent", + "Tag 12 missing in SCEFCommunicationPatternUpdate", "Enumerations for EstablishmentStatus start at 0, not 1", "Enumerations for RequestIndication start at 0, not 1", "Enumerations for UDMServingSystemMethod start at 0, not 1", "Enumerations for MMSDirection start at 0, not 1", "Enumerations for MMSReplyCharging start at 0, not 1", - "Enumerations for MMStatusExtension start at 0, not 1" + "Enumerations for MMStatusExtension start at 0, not 1", + "Tags 4-5 missing in IMSMessage", + "Tag 6 missing in StartOfInterceptionForActiveIMSSession", + "Tag 2 missing in LALSReport", + "Tag 6 missing in LALSReport", + "Tag 8 missing in MMEStartOfInterceptionWithEPSAttachedUE", + "Tag 11 missing in MMEStartOfInterceptionWithEPSAttachedUE" ], "33128/r18/TS33128Payloads.asn" : [ + "Tag 100 XIRIEvent field 'n9HRPDUSessionInfo' is not present in IRIEvent", + "Tag 101 XIRIEvent field 's8HRBearerInfo' is not present in IRIEvent", + "Tag 16 IRIEvent field 'mDFCellSiteReport' is not present in XIRIEvent", + "Tag 16 missing in XIRIEvent", + "Tags 100-101 missing in IRIEvent", + "Tag 12 missing in SCEFCommunicationPatternUpdate", "Enumerations for EstablishmentStatus start at 0, not 1", "Enumerations for RequestIndication start at 0, not 1", "Enumerations for UDMServingSystemMethod start at 0, not 1", "Enumerations for MMSDirection start at 0, not 1", "Enumerations for MMSReplyCharging start at 0, not 1", - "Enumerations for MMStatusExtension start at 0, not 1" + "Enumerations for MMStatusExtension start at 0, not 1", + "Tags 4-5 missing in IMSMessage", + "Tag 6 missing in StartOfInterceptionForActiveIMSSession", + "Tag 2 missing in LALSReport", + "Tag 6 missing in LALSReport", + "Tag 8 missing in MMEStartOfInterceptionWithEPSAttachedUE", + "Tag 11 missing in MMEStartOfInterceptionWithEPSAttachedUE" ] -} \ No newline at end of file +} diff --git a/testing/asn_process.py b/testing/asn_process.py old mode 100644 new mode 100755 index 49deb16100ca83550c74d973fdd8c1b92de58c06..08079144f7c1dac4cb0d19cb946f46a2470528f1 --- a/testing/asn_process.py +++ b/testing/asn_process.py @@ -1,4 +1,7 @@ +#!/usr/bin/env python3 + import logging +import os import json from pathlib import Path from subprocess import run @@ -8,7 +11,7 @@ from pycrate_asn1c.asnproc import * import lint_asn1 -def syntaxCheckASN (fileList): +def syntaxCheckASN(fileList): """ Performs ASN syntax checking on a list of filenames (or pathlib Paths) @@ -21,28 +24,21 @@ def syntaxCheckASN (fileList): results = {} for file in fileList: try: - p = run(['asn1c', '-E', str(file)], capture_output=True) - if (p.returncode != 0): + p = run(["asn1c", "-E", str(file)], capture_output=True) + if p.returncode != 0: results[str(file)] = { - 'ok' : False, - 'code' : p.returncode, - 'message' : p.stderr.decode().splitlines()[0] + "ok": False, + "code": p.returncode, + "message": p.stderr.decode().splitlines()[0], } else: - results[str(file)] = { - 'ok' : True - } + results[str(file)] = {"ok": True} except Exception as ex: - results[str(file)] = { - 'ok' : False, - 'code' : -1, - 'message' : f"{ex!r}" - } + results[str(file)] = {"ok": False, "code": -1, "message": f"{ex.__class__.__qualname__}: {ex}"} return results - -def compileAllTargets (compileTargets): +def compileAllTargets(compileTargets): """ Attempts to compile a set of compile targets using the pycrate ASN1 tools @@ -53,10 +49,10 @@ def compileAllTargets (compileTargets): to be the "primary" file. This doesn't have any relavance to the compilation, but will be used as the identifier when reporting any compile errors. The compilation is performed by the pycrate ASN compile functions; errors - are caught as exceptions and rendered into a list. - + are caught as exceptions and rendered into a list. + Unfortunately, the pycrate compiler doesn't report line numbers. - The asn1c compiler does, but doesn't properly handle identifiers with the + The asn1c compiler does, but doesn't properly handle identifiers with the same name in different modules; as this occurs multiple times in TS 33.108, we can't use it. """ @@ -72,112 +68,118 @@ def compileAllTargets (compileTargets): with open(filename) as f: fileTexts.append(f.read()) fileNames.append(str(filename)) - logging.debug (f" Loading {filename}") - compile_text(fileTexts, filenames = fileNames) + logging.debug(f" Loading {filename}") + compile_text(fileTexts, filenames=fileNames) results[str(firstTarget)] = { - 'ok' : True, + "ok": True, } except Exception as ex: results[str(firstTarget)] = { - 'ok' : False, - 'code' : -1, - 'message' : f"{ex!r}" + "ok": False, + "code": -1, + "message": f"{ex!r}", } continue return results - -def processResults (results, stageName): +def processResults(results, stageName): """ Counts the number of errors and writes out the output per filename :param results: List of filenames (str or Pathlib Path) :param stageName: Name to decorate the output with :returns: The number of files which had errors - """ + """ print("") - errorCount = sum([1 for r in results.values() if not r['ok']]) + errorCount = sum([1 for r in results.values() if not r["ok"]]) logging.info(f"{errorCount} {stageName} errors encountered") - + print(f"{'-':-<60}") print(f"{stageName} results:") print(f"{'-':-<60}") for filename, result in results.items(): print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}") - if not result['ok']: - if isinstance(result['message'], list): - for thing in result['message']: + if not result["ok"]: + if isinstance(result["message"], list): + for thing in result["message"]: print(f" {thing['message']}") else: print(f" {result['message']}") - + print(f"{'-':-<60}") print(f"{stageName} errors: {errorCount}") print(f"{'-':-<60}") - + return errorCount -if __name__ == '__main__': - logging.info('Searching for ASN.1 files') +def main(): + loglevel = os.environ.get("LOGLEVEL", "WARNING").upper() + logging.basicConfig(level=loglevel) + + logging.info("Searching for ASN.1 files") fileList = list(Path(".").rglob("*.asn1")) + list(Path(".").rglob("*.asn")) - logging.info(f'{len(fileList)} ASN.1 files found') + logging.info(f"{len(fileList)} ASN.1 files found") for file in fileList: - logging.debug(f' {file}') - - ignoreList = Path('testing/asn_ignore.txt').read_text().splitlines() + logging.debug(f" {file}") + + ignoreList = Path("testing/asn_ignore.txt").read_text().splitlines() ignoredFiles = [] for ignore in ignoreList: - logging.debug(f'Ignoring pattern {ignore}') + logging.debug(f"Ignoring pattern {ignore}") for file in fileList: if ignore in str(file): ignoredFiles.append(file) logging.debug(f" Ignoring {str(file)} as contains {ignore}") ignoredFiles = list(set(ignoredFiles)) - logging.info(f'{len(ignoredFiles)} files ignored') + logging.info(f"{len(ignoredFiles)} files ignored") for file in ignoredFiles: - logging.debug(f' {file}') - + logging.debug(f" {file}") + fileList = [file for file in fileList if file not in ignoredFiles] - logging.info(f'{len(fileList)} files to process') + logging.info(f"{len(fileList)} files to process") for file in fileList: - logging.debug(f' {file}') + logging.debug(f" {file}") if len(fileList) == 0: - logging.warning ("No files specified") + logging.warning("No files specified") exit(0) - + logging.info("Parsing ASN1 files") parseResults = syntaxCheckASN(fileList) if processResults(parseResults, "Parsing") > 0: - exit(-1) + exit(1) - logging.info ("Getting compile targets") - compileTargets = json.loads(Path('testing/asn_compile_targets.json').read_text()) - logging.info (f"{len(compileTargets)} compile targets found") + logging.info("Getting compile targets") + compileTargets = json.loads(Path("testing/asn_compile_targets.json").read_text()) + logging.info(f"{len(compileTargets)} compile targets found") compileResults = compileAllTargets(compileTargets) if processResults(compileResults, "Compiling") > 0: - exit(-1) + exit(1) - logging.info ("Linting files") - ignoreLintingList = Path('testing/asn_ignore_lint.txt').read_text().splitlines() + logging.info("Linting files") + ignoreLintingList = Path("testing/asn_ignore_lint.txt").read_text().splitlines() ignoredFiles = [] for ignore in ignoreLintingList: - logging.debug(f'Ignoring pattern {ignore} for linting') + logging.debug(f"Ignoring pattern {ignore} for linting") for file in fileList: if ignore in str(file): ignoredFiles.append(file) logging.debug(f" Ignoring {str(file)} for linting as contains {ignore}") ignoredFiles = list(set(ignoredFiles)) - logging.info(f'{len(ignoredFiles)} files ignored for linting') + logging.info(f"{len(ignoredFiles)} files ignored for linting") for file in ignoredFiles: - logging.debug(f' {file}') + logging.debug(f" {file}") fileList = [file for file in fileList if file not in ignoredFiles] - lintExceptions = json.loads(Path('testing/asn_lint_exceptions.json').read_text()) + lintExceptions = json.loads(Path("testing/asn_lint_exceptions.json").read_text()) lintResults = lint_asn1.lintASN1Files(fileList, lintExceptions) if processResults(lintResults, "Linting") > 0: - exit(-1) - + exit(1) + exit(0) + + +if __name__ == "__main__": + main() diff --git a/testing/check_asn_backwards_compat.py b/testing/check_asn_backwards_compat.py new file mode 100644 index 0000000000000000000000000000000000000000..76168f292b35e47817a8a82c12d0eaf833ddef7e --- /dev/null +++ b/testing/check_asn_backwards_compat.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +import os +import sys + +import asn_backwards_compat + + +def compare_releases(files): + total_errors = 0 + for idx in range(len(files) - 1): + file1 = files[idx] + file2 = files[idx + 1] + errors = asn_backwards_compat.compare_files(file1, file2) + if errors: + print("-----------------------------") + print(f"File 1: {file1}") + print(f"File 2: {file2}") + print("Errors:") + for error in errors: + print(f" {error}") + print("-----------------------------") + total_errors += len(errors) + return total_errors + + +def main(): + loglevel = os.environ.get("LOGLEVEL", "WARNING").upper() + logging.basicConfig(level=loglevel) + + error_count = 0 + + error_count += compare_releases( + [ + "33128/r15/TS33128Payloads.asn", + "33128/r16/TS33128Payloads.asn", + "33128/r17/TS33128Payloads.asn", + "33128/r18/TS33128Payloads.asn", + ] + ) + + error_count += compare_releases( + [ + "33128/r16/TS33128IdentityAssociation.asn", + "33128/r17/TS33128IdentityAssociation.asn", + "33128/r18/TS33128IdentityAssociation.asn", + ] + ) + + if error_count: + print(f"Total errors: {error_count}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/testing/lint_asn1.py b/testing/lint_asn1.py index 822c3e4bcdc3ec338143c02fee713f12a2b0429b..e0fab9acc481348df5abb37f810787bd9ca20a57 100644 --- a/testing/lint_asn1.py +++ b/testing/lint_asn1.py @@ -1,4 +1,7 @@ +#!/usr/bin/env python3 + import logging +import os from asn1tools import parse_files, compile_dict, ParseError, CompileError from glob import glob @@ -14,26 +17,27 @@ typeLevelTests = [] fileLevelTests = [] -def lintingTest (testName, testKind, testDescription): - def decorate (func): +def lintingTest(testName, testKind, testDescription): + def decorate(func): @functools.wraps(func) def wrapper(*args, **kwargs): - logging.debug (f" Running test {testName}") + logging.debug(f" Running test {testName}") errors = func(*args, **kwargs) for error in errors: - error['testName'] = testName - error['testKind'] = testKind - error['testDescription'] = testDescription + error["testName"] = testName + error["testKind"] = testKind + error["testDescription"] = testDescription return errors - if (testKind == "type"): + + if testKind == "type": typeLevelTests.append(wrapper) - if (testKind == "module"): + if testKind == "module": moduleLevelTests.append(wrapper) - if (testKind == "file"): + if testKind == "file": fileLevelTests.append(wrapper) return wrapper - return decorate + return decorate def formatFailure(f): @@ -42,208 +46,400 @@ def formatFailure(f): def appendFailure(failures, context, newFailure): combinedFailure = {**context, **newFailure} - logging.info (f"Test Failure: {combinedFailure}") + logging.info(f"Test Failure: {combinedFailure}") failures.append(combinedFailure) -#-------------------------------------------------------------------- +def membersAsTagDict(atype): + """Convert atype['members'] into a dict indexed by members.tag.number.""" + result = {} + for member in atype.get("members", {}): + if not member: + continue + if not "tag" in member: + continue + result[member["tag"]["number"]] = member + return result + + +# -------------------------------------------------------------------- # File level tests -#-------------------------------------------------------------------- +# -------------------------------------------------------------------- -@lintingTest(testName = "D.4.9", - testKind = "file", - testDescription = "Fields, tags, types and flags are space aligned") -def D41 (fileLines, context): + +@lintingTest( + testName="D.4.9", + testKind="file", + testDescription="Fields, tags, types and flags are space aligned", +) +def D41(fileLines, context): errors = [] for lineNumber, line in enumerate(fileLines): - if '\t' in line: - appendFailure(errors, context, { "line" : lineNumber, - "message" : f"Line {lineNumber} contains tab characters"}) + if "\t" in line: + appendFailure( + errors, + context, + { + "line": lineNumber, + "message": f"Line {lineNumber} contains tab characters", + }, + ) return errors -@lintingTest(testName = "D.4.11", - testKind = "file", - testDescription = "Braces are given their own line") -def D41 (fileLines, context): +@lintingTest( + testName="D.4.11", + testKind="file", + testDescription="Braces are given their own line", +) +def D41(fileLines, context): errors = [] for lineNumber, line in enumerate(fileLines): - if ('{' in line and line.strip().replace(",","") != '{') or ('}' in line and line.strip().replace(",","") != '}'): - if "itu-t(0)" in line: continue - if "OBJECT IDENTIFIER" in line: continue - if "RELATIVE-OID" in line: continue - appendFailure(errors, context, { "line" : lineNumber + 1, - "message" : f"Line {lineNumber + 1} contains a brace but also other characters ('{line}')"}) + if ("{" in line and line.strip().replace(",", "") != "{") or ( + "}" in line and line.strip().replace(",", "") != "}" + ): + if "itu-t(0)" in line: + continue + if "OBJECT IDENTIFIER" in line: + continue + if "RELATIVE-OID" in line: + continue + appendFailure( + errors, + context, + { + "line": lineNumber + 1, + "message": f"Line {lineNumber + 1} contains a brace but also other characters ('{line}')", + }, + ) return errors -#-------------------------------------------------------------------- +# -------------------------------------------------------------------- # Module level tests -#-------------------------------------------------------------------- +# -------------------------------------------------------------------- + -@lintingTest(testName = "D.4.1", - testKind = "module", - testDescription = "EXTENSIBILITY IMPLIED directive set") -def D41 (module, context): +@lintingTest( + testName="D.4.1", + testKind="module", + testDescription="EXTENSIBILITY IMPLIED directive set", +) +def D41(module, context): errors = [] - if (not ('extensibility-implied' in module.keys()) or (module['extensibility-implied'] == False)): - appendFailure(errors, context, {"message" : "EXTENSIBILITY IMPLIED directive not set"}) + if not ("extensibility-implied" in module) or ( + module["extensibility-implied"] == False + ): + appendFailure( + errors, + context, + {"message": "EXTENSIBILITY IMPLIED directive not set"}, + ) return errors -@lintingTest(testName = "D.4.2", - testKind = "module", - testDescription = "AUTOMATIC TAGS not used") +@lintingTest( + testName="D.4.2", + testKind="module", + testDescription="AUTOMATIC TAGS not used", +) def D42(module, context): errors = [] - if ('tags' in module) and (module['tags'] == 'AUTOMATIC'): - appendFailure(errors, context, {"message" : "AUTOMATIC TAGS directive used"}) + if ("tags" in module) and (module["tags"] == "AUTOMATIC"): + appendFailure(errors, context, {"message": "AUTOMATIC TAGS directive used"}) return errors -#-------------------------------------------------------------------- +@lintingTest( + testName="XIriVsIri", + testKind="module", + testDescription="XIRIEvent and IRIEvent use same tags", +) +def XIriVsIri(module, context): + errors = [] + xirievent = module["types"].get("XIRIEvent") + irievent = module["types"].get("IRIEvent") + if xirievent is None or irievent is None: + logging.debug( + f" XVI ignoring {context['module']} missing both XIRIEvent and IRIEvent" + ) + return [] + + xiri_by_tag = membersAsTagDict(xirievent) + xiri_tag_set = set(xiri_by_tag) + iri_by_tag = membersAsTagDict(irievent) + iri_tag_set = set(iri_by_tag) + xiri_only_tags = xiri_tag_set - iri_tag_set + iri_only_tags = iri_tag_set - xiri_tag_set + for tag in xiri_only_tags: + appendFailure( + errors, + context, + { + "message": f"Tag {tag} XIRIEvent field '{xiri_by_tag[tag]['name']}' is not present in IRIEvent" + }, + ) + for tag in iri_only_tags: + appendFailure( + errors, + context, + { + "message": f"Tag {tag} IRIEvent field '{iri_by_tag[tag]['name']}' is not present in XIRIEvent" + }, + ) + for tag in xiri_by_tag: + if tag in xiri_only_tags: + continue + xiri = xiri_by_tag[tag] + iri = iri_by_tag[tag] + if xiri["name"] != iri["name"]: + appendFailure( + errors, + context, + { + "message": f"Tag {tag} XIRIEvent field '{xiri_by_tag[tag]['name']}' differs from IRIEvent field '{iri_by_tag[tag]['name']}'" + }, + ) + if xiri["type"] != iri["type"]: + appendFailure( + errors, + context, + { + "message": f"Tag {tag} XIRIEvent type {xiri_by_tag[tag]['type']} differs from IRIEvent type '{iri_by_tag[tag]['type']}'" + }, + ) + + return errors + + +# -------------------------------------------------------------------- # Type level tests -#-------------------------------------------------------------------- +# -------------------------------------------------------------------- + -@lintingTest(testName = "D.3.4", - testKind = "type", - testDescription = "Field names only contain characters A-Z, a-z, 0-9") +@lintingTest( + testName="D.3.4", + testKind="type", + testDescription="Field names only contain characters A-Z, a-z, 0-9", +) def D34(t, context): - if not 'members' in t.keys(): - logging.debug (f" D34 ignoring {context['module']} '{context['type']}' as it has no members") + if not "members" in t: + logging.debug( + f" D34 ignoring {context['module']} '{context['type']}' as it has no members" + ) return [] errors = [] - for m in t['members']: - logging.debug (f" D34 checking member {m}") + for m in t["members"]: + logging.debug(f" D34 checking member {m}") if not m: - logging.debug (" (appears to be None, ignoring)") + logging.debug(" (appears to be None, ignoring)") continue - badLetters = list(set([letter for letter in m['name'] if not ((letter in string.ascii_letters) or (letter in string.digits)) ])) + badLetters = list( + set( + [ + letter + for letter in m["name"] + if not ( + (letter in string.ascii_letters) or (letter in string.digits) + ) + ] + ) + ) if len(badLetters) > 0: - appendFailure (errors, context, { "field" : m['name'], - "message" : f"Field '{m['name']}' contains disallowed characters {badLetters!r}"}) + appendFailure( + errors, + context, + { + "field": m["name"], + "message": f"Field '{m['name']}' contains disallowed characters {badLetters!r}", + }, + ) return errors -@lintingTest(testName = "D.4.3", - testKind = "type", - testDescription = "Tag numbers start at zero") -def D43 (t, context): +@lintingTest( + testName="D.4.3", + testKind="type", + testDescription="Tag numbers start at one", +) +def D43(t, context): errors = [] - if (t['type'] == 'SEQUENCE') or (t['type'] == 'CHOICE'): - if not 'tag' in t['members'][0]: + if (t["type"] == "SEQUENCE") or (t["type"] == "CHOICE"): + if not "tag" in t["members"][0]: return errors - if t['members'][0]['tag']['number'] != 1: - appendFailure (errors, context, {"message" : f"Tag numbers for {context['type']} start at {t['members'][0]['tag']['number']}, not 1"}) + if t["members"][0]["tag"]["number"] != 1: + appendFailure( + errors, + context, + { + "message": f"Tag numbers for {context['type']} start at {t['members'][0]['tag']['number']}, not 1" + }, + ) return errors -@lintingTest(testName = "D.4.4", - testKind = "type", - testDescription = "Enumerations start at zero") -def D44 (t, context): +@lintingTest( + testName="D.4.4", + testKind="type", + testDescription="Enumerations start at one", +) +def D44(t, context): errors = [] - if t['type'] == 'ENUMERATED': - if t['values'][0][1] != 1: - appendFailure(errors, context, { "message" : f"Enumerations for {context['type']} start at {t['values'][0][1]}, not 1"}) + if t["type"] == "ENUMERATED": + if t["values"][0][1] != 1: + appendFailure( + errors, + context, + { + "message": f"Enumerations for {context['type']} start at {t['values'][0][1]}, not 1" + }, + ) return errors -@lintingTest(testName = "D.4.5", - testKind = "type", - testDescription = "No anonymous types") -def checkD45 (t, context): - if not 'members' in t: - logging.debug (f" D45: No members in type {context['type']}, ignoring") +@lintingTest(testName="D.4.5", testKind="type", testDescription="No anonymous types") +def checkD45(t, context): + if not "members" in t: + logging.debug(f" D45: No members in type {context['type']}, ignoring") return [] errors = [] - for m in t['members']: - if not m: continue - if m['type'] in ['ENUMERATED','SEQUENCE','CHOICE', 'SET']: - appendFailure(errors, context, { "field" : m['name'], - "message" : f"Field '{m['name']}' in {context['type']} is an anonymous {m['type']}"}) + for m in t["members"]: + if not m: + continue + if m["type"] in ["ENUMERATED", "SEQUENCE", "CHOICE", "SET"]: + appendFailure( + errors, + context, + { + "field": m["name"], + "message": f"Field '{m['name']}' in {context['type']} is an anonymous {m['type']}", + }, + ) + return errors + + +@lintingTest(testName="D.4.holes", testKind="type", testDescription="No tag holes") +def checkD4holes(t, context): + if not "members" in t: + logging.debug(f" D4holes: No members in type {context['type']}, ignoring") + return [] + errors = [] + wantTag = None + for m in t["members"]: + if not m: + continue + thisTag = m["tag"]["number"] + if wantTag: + if wantTag < thisTag - 1: + appendFailure( + errors, + context, + { + "field": m["name"], + "message": f"Tags {wantTag}-{thisTag - 1} missing in {context['type']}", + }, + ) + elif wantTag == thisTag - 1: + appendFailure( + errors, + context, + { + "field": m["name"], + "message": f"Tag {wantTag} missing in {context['type']}", + }, + ) + wantTag = thisTag + 1 return errors -def lintASN1File (asnFile, exceptions): +def lintASN1File(asnFile, exceptions): errors = [] suppressed = [] - context = {'file' : asnFile} + context = {"file": asnFile} try: - logging.info ("Checking file {0}...".format(asnFile)) + logging.info("Checking file {0}...".format(asnFile)) with open(asnFile) as f: s = f.read().splitlines() for test in fileLevelTests: errors += test(s, context) d = parse_files(asnFile) for moduleName, module in d.items(): - logging.info (" Checking module {0}".format(moduleName)) + logging.info(" Checking module {0}".format(moduleName)) for test in moduleLevelTests: - context['module'] = moduleName + context["module"] = moduleName errors += test(module, context) - for typeName, typeDef in module['types'].items(): - context['type'] = typeName - context['module'] = moduleName + for typeName, typeDef in module["types"].items(): + context["type"] = typeName + context["module"] = moduleName for test in typeLevelTests: errors += test(typeDef, context) except ParseError as ex: - appendFailure(errors, context, { "message" : "ParseError: {0}".format(ex)}) + appendFailure(errors, context, {"message": "ParseError: {0}".format(ex)}) logging.error("ParseError: {0}".format(ex)) if len(exceptions) > 0: - suppressed = [error for error in errors if error['message'] in exceptions] - errors = [error for error in errors if error['message'] not in exceptions] - return { - 'ok' : len(errors) == 0, - 'message' : errors, - 'suppressed' : suppressed - } + suppressed = [error for error in errors if error["message"] in exceptions] + errors = [error for error in errors if error["message"] not in exceptions] + return {"ok": len(errors) == 0, "message": errors, "suppressed": suppressed} -def lintASN1Files (fileList, exceptions): +def lintASN1Files(fileList, exceptions): if len(fileList) == 0: - logging.warning ("No files specified") + logging.warning("No files specified") return {} errorMap = {} logging.info("Checking files...") for f in fileList: - unixf = str(f).replace('\\', '/') - errorMap[str(f)] = lintASN1File(str(f), exceptions[unixf] if unixf in exceptions else []) + unixf = str(f).replace("\\", "/") + errorMap[str(f)] = lintASN1File( + str(f), exceptions[unixf] if unixf in exceptions else [] + ) return errorMap -ignoreReleases = {'33108' : [f'r{i}' for i in range(5, 17)], - '33128' : [] } +ignoreReleases = {"33108": [f"r{i}" for i in range(5, 17)], "33128": []} -def lintAllASN1FilesInPath (path): - fileList = list(Path(path).rglob("*.asn1")) + list(Path(path).rglob("*.asn")) - ignoredFiles = [file for file in fileList if file.parts[1] in ignoreReleases[file.parts[0]]] +def lintAllASN1FilesInPath(path): + fileList = list(Path(path).rglob("*.asn1")) + list(Path(path).rglob("*.asn")) + ignoredFiles = [ + file + for file in fileList + if len(file.parts) > 1 + and file.parts[1] in ignoreReleases.get(file.parts[0], []) + ] logging.info(f"Ignoring {len(ignoredFiles)} files") logging.debug(ignoredFiles) - + fileList = [file for file in fileList if file not in ignoredFiles] - return lintASN1Files(fileList) - -if __name__ == '__main__': - result = lintAllASN1FilesInPath("./") - totalErrors = 0 - totalSuppressed = 0 - print ("Drafting rule checks:") - print ("-----------------------------") - for filename, results in result.items(): - errors = [r for r in results if not (formatFailure(r) in lintingexceptions.exceptedStrings)] - suppressedErrors = [r for r in results if formatFailure(r) in lintingexceptions.exceptedStrings] - print (f"{filename}: {'OK' if len(errors) == 0 else f'{len(errors)} errors detected'}") - for error in errors: - print(" " + formatFailure(error)) - for error in suppressedErrors: - print(" (" + formatFailure(error) + " - suppressed)") - totalErrors += len(errors) - totalSuppressed += len(suppressedErrors) - - print ("-----------------------------") - print (f"{totalErrors} non-compliances detected, {totalSuppressed} errors suppressed") - exit(totalErrors) + return lintASN1Files(fileList, {}) + + +def main(): + loglevel = os.environ.get("LOGLEVEL", "WARNING").upper() + logging.basicConfig(level=loglevel) + + results = lintAllASN1FilesInPath("./") + errorCount = sum([1 for r in results.values() if not r["ok"]]) + print("Drafting rule checks:") + print("-----------------------------") + for filename, result in results.items(): + print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}") + if not result["ok"]: + if isinstance(result["message"], list): + for thing in result["message"]: + print(f" {thing['message']}") + else: + print(f" {result['message']}") + + print("-----------------------------") + print(f"errors: {errorCount}") + if errorCount: + exit(1) + exit(0) + + +if __name__ == "__main__": + main() diff --git a/testing/merge_test.py b/testing/merge_test.py index b7a82b39c4958ea30d4ac9a96e100e3041fe73c0..382453d372ec5517565afed411771ff6e180ef00 100644 --- a/testing/merge_test.py +++ b/testing/merge_test.py @@ -8,61 +8,66 @@ crCommitBranch = os.environ.get("CI_COMMIT_REF_NAME", "NOTFOUND") apiUrl = os.environ.get("CI_API_V4_URL", "https://forge.3gpp.org/rep/api/v4") projectId = os.environ.get("CI_PROJECT_ID", "13") -def gapi (query): + +def gapi(query): url = f"{apiUrl}/projects/{projectId}/{query}" r = requests.get(url) return json.loads(r.text) -def do (commandline): - #print (" Attempting: " + commandline) + +def do(commandline): + # print (" Attempting: " + commandline) completedProc = subprocess.run(commandline, capture_output=True, shell=True) - #print (" STDOUT > " + ("empty" if completedProc.stdout is None else completedProc.stdout.decode('utf-8'))) - #print (" STDERR > " + ("empty" if completedProc.stderr is None else completedProc.stderr.decode('utf-8'))) - #print (f" Completed with code {completedProc.returncode}") - return (completedProc.returncode == 0, completedProc.stdout.decode('utf-8')) + # print (" STDOUT > " + ("empty" if completedProc.stdout is None else completedProc.stdout.decode('utf-8'))) + # print (" STDERR > " + ("empty" if completedProc.stderr is None else completedProc.stderr.decode('utf-8'))) + # print (f" Completed with code {completedProc.returncode}") + return (completedProc.returncode == 0, completedProc.stdout.decode("utf-8")) + -print ("Searching for corresponding MR...") +print("Searching for corresponding MR...") mrs = gapi(f"merge_requests?source_branch={crCommitBranch}&state=opened") if len(mrs) == 0: - print ("No MR found... aborting") - exit() + print("No MR found... aborting") + exit(1) if len(mrs) > 1: - print (f"{len(mrs)} MRs found, 1 expected - aborting") + print(f"{len(mrs)} MRs found, 1 expected - aborting") for m in mrs: pprint.pprint(m) - exit(-1) + exit(1) mr = mrs[0] -print (f"Found MR {mr['reference']} ({mr['title']})") -print (f"Target branch is {mr['target_branch']}") -print ("Searching for open MRs targeting same branch...") +print(f"Found MR {mr['reference']} ({mr['title']})") +print(f"Target branch is {mr['target_branch']}") +print("Searching for open MRs targeting same branch...") mrs = gapi(f"merge_requests?target_branch={mr['target_branch']}&state=opened") -mrs = [m for m in mrs if m['reference'] != mr['reference']] -print (f"{len(mrs)} MRs found") +mrs = [m for m in mrs if m["reference"] != mr["reference"]] +print(f"{len(mrs)} MRs found") mergeConflicts = {} for mr in mrs: - source_branch = mr['source_branch'] - print (source_branch) + source_branch = mr["source_branch"] + print(source_branch) try: do(f"git fetch origin {source_branch}:{source_branch}") success, errStr = do(f"git merge --no-commit {source_branch}") if not success: - print ("Merge NOT OK") + print("Merge NOT OK") mergeConflicts[source_branch] = errStr else: - print ("Merge OK") + print("Merge OK") except Exception as ex: mergeConflicts[source_branch] = str(ex) raise finally: do("git merge --abort") -print (f"Merge conflicts with following branches: {mergeConflicts}") -exit(len(mergeConflicts.keys())) \ No newline at end of file +print(f"Merge conflicts with following branches: {mergeConflicts}") +if mergeConflicts: + exit(1) +exit(0) diff --git a/testing/xsd_process.py b/testing/xsd_process.py old mode 100644 new mode 100755 index 4340df3fd064abe851db434769b970d203b8fff0..2d644f5549d5bf323b5fb647925ddd73231a6f0f --- a/testing/xsd_process.py +++ b/testing/xsd_process.py @@ -1,16 +1,21 @@ +#!/usr/bin/env python3 + import logging +import os from pathlib import Path -from xmlschema.etree import etree_tostring +from xmlschema import etree_tostring from xmlschema import XMLSchema, XMLSchemaParseError + def getError(e): try: return f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" except Exception as ex: return repr(e) -def BuildSchemaDictonary (fileList): + +def BuildSchemaDictonary(fileList): if len(fileList) == 0: logging.info("No schema files provided") return [] @@ -19,15 +24,17 @@ def BuildSchemaDictonary (fileList): schemaLocations = [] for schemaFile in fileList: try: - xs = XMLSchema(schemaFile, validation='skip') - schemaLocations.append((xs.default_namespace, str(Path(schemaFile).resolve()))) + xs = XMLSchema(schemaFile, validation="skip") + schemaLocations.append( + (xs.default_namespace, str(Path(schemaFile).resolve())) + ) logging.info(" [ {0} -> {1} ]".format(xs.default_namespace, schemaFile)) except XMLSchemaParseError as ex: - logging.warning (" [ {0} failed to parse: {1} ]".format(schemaFile, ex)) + logging.warning(" [ {0} failed to parse: {1} ]".format(schemaFile, ex)) return schemaLocations -def BuildSchema (coreFile, fileList = None): +def BuildSchema(coreFile, fileList=None): schemaLocations = [] if fileList and len(fileList) > 0: schemaLocations = BuildSchemaDictonary(fileList) @@ -36,38 +43,40 @@ def BuildSchema (coreFile, fileList = None): return coreSchema -def ValidateXSDFiles (fileList): +def ValidateXSDFiles(fileList): if len(fileList) == 0: logging.info("No schema files provided") return {} - + schemaLocations = BuildSchemaDictonary(fileList) errors = {} logging.info("Schema validation:") for schemaFile in fileList: try: - schema = XMLSchema(schemaFile, locations = schemaLocations, validation="lax") + schema = XMLSchema(schemaFile, locations=schemaLocations, validation="lax") logging.info(schemaFile + ": OK") errors[schemaFile] = [getError(e) for e in schema.all_errors] except XMLSchemaParseError as ex: logging.warning(schemaFile + ": Failed validation ({0})".format(ex.message)) if (ex.schema_url) and (ex.schema_url != ex.origin_url): - logging.warning(" Error comes from {0}, suppressing".format(ex.schema_url)) - errors[schemaFile] = [] + logging.warning( + " Error comes from {0}, suppressing".format(ex.schema_url) + ) + errors[schemaFile] = [] else: errors[schemaFile] = [ex] return errors -def ValidateAllXSDFilesInPath (path): +def ValidateAllXSDFilesInPath(path): schemaGlob = [str(f) for f in Path(path).rglob("*.xsd")] return ValidateXSDFiles(schemaGlob) -def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs): +def ValidateInstanceDocuments(coreFile, supportingSchemas, instanceDocs): if (instanceDocs is None) or len(instanceDocs) == 0: - logging.warning ("No instance documents provided") + logging.warning("No instance documents provided") return [] schema = BuildSchema(coreFile, supportingSchemas) @@ -75,32 +84,39 @@ def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs): for instanceDoc in instanceDocs: try: schema.validate(instanceDoc) - logging.info ("{0} passed validation".format(instanceDoc)) + logging.info("{0} passed validation".format(instanceDoc)) except Exception as ex: - logging.error ("{0} failed validation: {1}".format(instanceDoc, ex)) + logging.error("{0} failed validation: {1}".format(instanceDoc, ex)) return errors - -if __name__ == '__main__': +def main(): + loglevel = os.environ.get("LOGLEVEL", "WARNING").upper() + logging.basicConfig(level=loglevel) results = ValidateAllXSDFilesInPath("./") - print ("XSD validation checks:") - print ("-----------------------------") + print("XSD validation checks:") + print("-----------------------------") errorCount = 0 for fileName, errors in results.items(): if len(errors) > 0: errorCount += len(errors) - print (f" {fileName}: {len(errors)} errors") + print(f" {fileName}: {len(errors)} errors") for error in errors: if isinstance(error, XMLSchemaParseError): - print (error.msg) + print(error.msg) else: - print (f" {str(error)}") + print(f" {str(error)}") else: - print (f" {fileName}: OK") + print(f" {fileName}: OK") + + print("-----------------------------") + print(f"{errorCount} errors detected") + if errorCount: + exit(1) + exit(0) + - print ("-----------------------------") - print (f"{errorCount} errors detected") - exit(errorCount) \ No newline at end of file +if __name__ == "__main__": + main()