Commit a961853e authored by Luke Mewburn's avatar Luke Mewburn
Browse files

asn_backwards_compat: rewrite to use pycrate

Refactor to use pycrate_asn1c instead of asn1tools.
Use a Context dataclass to maintain state between the functions.

WIP: needs more checks and improvements to type name handling.
parent 97fa3195
Loading
Loading
Loading
Loading
Loading
+222 −81
Original line number Diff line number Diff line
@@ -4,37 +4,206 @@
Module to check backwards compatibility between two ASN.1 files.
"""

import copy
import dataclasses
import logging
import os
import pathlib
import sys

import asn1tools
import pycrate_asn1c.asnobj as pa_asnobj
import pycrate_asn1c.asnproc


# TODO: require python >= 3.7


# pylint: disable=logging-fstring-interpolation


def process_error(errors, message):
@dataclasses.dataclass
class Context:
    fa_path: str
    fb_path: str
    errors: list[str] = dataclasses.field(default_factory=list)
    fa_mod: pa_asnobj.ASN1Dict | None = None
    fb_mod: pa_asnobj.ASN1Dict | None = None
    seen: set[str] = dataclasses.field(default_factory=set)


def process_error(ctx: Context, message: str):
    """Log `message` and append `message` to `errors`."""
    logging.info(f"Test Failure: {message}")
    errors.append(message)
    ctx.errors.append(message)


def dict_swap(dictionary: dict) -> dict:
    """Swap keys and values in `dictionary`.

    Note: Doesn't attempt to handle duplicates."""
    return {value: key for key, value in dictionary.items()}


def parse_asn1(asntext: str) -> pa_asnobj.ASN1Dict:
    """Parse `asntext` as ASN.1 using pycrate, returning the module object."""
    pycrate_asn1c.asnproc.GLOBAL.clear()
    pycrate_asn1c.asnproc.compile_text(asntext)
    res = copy.deepcopy(pycrate_asn1c.asnproc.GLOBAL.MOD)
    return res


def repr_name(asnobj: pa_asnobj.ASN1Obj) -> str:
    """Return name of `asnobj` as MODULE.PARENT.NAME[TAG].

def members_as_tag_dict(atype):
    """Convert atype['members'] into a dict indexed by members.tag.number."""
    "MODULE.", "PARENT.", and "[TAG]" are optional."""

    result = f"{asnobj._name}"
    if (asnparent := getattr(asnobj, "_parent", None)) is not None:
        result = f"{repr_name(asnparent)}.{result}"
    if (asnmod := getattr(asnobj, "_mod", None)) is not None:
        result = f"{asnmod}.{result}"
    if asnobj._tag is not None:
        result = f"{result}[{asnobj._tag[0]}]"
    return result


def repr_type(asnobj: pa_asnobj.ASN1Obj) -> str:
    """Return name of type of `asnobj`, as MODULE.TYPE.

    TODO: fix for anonymous types such as L2AccessPDU.L2CC.l2CCContents CHOICE.
    "MODULE." is optional."""

    if (tr := asnobj.get_typeref()) is not None:
        return repr_name(tr)
    else:
        # TODO: this isn't quite right
        return repr_name(asnobj)  # TODO asnobj._type ?


def container_to_tag_dict(ctx: Context, container: pa_asnobj.ASN1Dict) -> dict:
    """Convert `container` to dict indexed on tag, value is type."""
    result = {}
    for member in atype.get("members", {}):
        if not member:
            continue
        if not "tag" in member:
    for name, asntype in container.items():
        tagtuple = asntype._tag
        if tagtuple is None:
            process_error(
                ctx, f"{repr_name(container)} field {name} doesn't have a tag"
            )
            continue
        result[member["tag"]["number"]] = member
        result[tagtuple[0]] = asntype
    return result


def compare_files(file1, file2):
    """Compare ASN.1 `file1` and `file2` for backwards compatibility
    issues introduced in `file2`:
def compare_enum(ctx: Context, fa_enum: pa_asnobj.ENUM, fb_enum: pa_asnobj.ENUM):
    """Compare ASN.1 ENUMERATED `fa_enum` and `fb_enum`."""
    assert fa_enum.TYPE == pa_asnobj.TYPE_ENUM
    assert fb_enum.TYPE == pa_asnobj.TYPE_ENUM

    fa_tags = dict_swap(fa_enum.get_cont())
    fb_tags = dict_swap(fb_enum.get_cont())

    for tag, ca_name in fa_tags.items():
        cb_name = fb_tags.get(tag, None)
        if cb_name is None:
            process_error(
                ctx,
                f"{repr_name(fa_enum)}.{ca_name}({tag}) missing",
            )
            continue  # tag not in fb_enum, stop checking

        if ca_name != cb_name:
            process_error(
                ctx,
                f"{repr_name(fa_enum)}.{ca_name}({tag}) renamed to '{cb_name}({tag})'",
            )


def compare_container(
    ctx: Context, fa_type: pa_asnobj.ASN1Obj, fb_type: pa_asnobj.ASN1Obj
):
    """Compare ASN.1 containers `fa_type` and `fb_type`."""
    assert fa_type.TYPE in pa_asnobj.TYPE_CONSTRUCT
    assert fb_type.TYPE in pa_asnobj.TYPE_CONSTRUCT

    seenkey = f"{repr_type(fa_type)} and {repr_type(fb_type)}"
    logging.info(f"Comparing container {seenkey}")
    if seenkey in ctx.seen:
        logging.info(f"Skipping, already processed {seenkey}")
    ctx.seen.add(seenkey)

    fa_tags = container_to_tag_dict(ctx=ctx, container=fa_type.get_cont())
    fb_tags = container_to_tag_dict(ctx=ctx, container=fb_type.get_cont())

    for tag, ca_child in fa_tags.items():
        ca_name = ca_child._name
        ca_typerepr = repr_type(ca_child)

        cb_child = fb_tags.get(tag, None)
        if cb_child is None:
            process_error(
                ctx,
                f"{repr_name(fa_type)}.{ca_name}[{tag}] missing",
            )
            continue  # tag not in fb_type, stop checking

        cb_name = cb_child._name
        cb_typerepr = repr_type(cb_child)

        if ca_name != cb_name:
            process_error(
                ctx,
                f"{repr_name(fa_type)}.{ca_name}[{tag}] renamed to '{cb_name}[{tag}]'",
            )

        if ca_typerepr != cb_typerepr:
            process_error(
                ctx,
                f"{repr_name(fa_type)}.{ca_name}[{tag}] type '{ca_typerepr}' changed to type '{cb_typerepr}'",
            )

        compare_asntype(ctx=ctx, fa_type=ca_child, fb_type=cb_child)


def compare_asntype(
    ctx: Context, fa_type: pa_asnobj.ASN1Obj, fb_type: pa_asnobj.ASN1Obj
):
    """Compare ASN.1 types `fa_type` and `fb_type`."""
    fa_desc = f"{repr_name(fa_type)} type {repr_type(fa_type)}"
    fb_desc = f"{repr_name(fb_type)} type {repr_type(fb_type)}"
    logging.info(f"Comparing {fa_desc} with {fb_desc}")

    if fa_type.TYPE != fb_type.TYPE:
        process_error(
            ctx,
            f"{fa_desc} type '{fa_type.TYPE}' mismatch with '{fb_type.TYPE}'",
        )
        # No further processing of this type can be performed
        return

    if fa_type.TYPE in pa_asnobj.TYPE_CONST_SIZE:
        # Check constraint text as is.
        # TODO: enhanced to decode each constraint
        fa_const = [c["text"] for c in fa_type.get_const()]
        fb_const = [c["text"] for c in fb_type.get_const()]

        if fa_const != fb_const:
            process_error(
                ctx,
                f"{fa_desc} constraint '{','.join(fa_const)}' changed to {fb_desc} constraint '{','.join(fb_const)}'",
            )

    if fa_type.TYPE is pa_asnobj.TYPE_ENUM:
        compare_enum(ctx=ctx, fa_enum=fa_type, fb_enum=fb_type)
    elif fa_type.TYPE in pa_asnobj.TYPE_CONSTRUCT:
        compare_container(ctx=ctx, fa_type=fa_type, fb_type=fb_type)
    else:
        # TODO: process_error(ctx, f"Unimplemented check of type '{fa_type.TYPE}'")
        pass


def compare_asntext(ctx: Context, fa_asntext: str, fb_asntext: str):
    """Compare ASN.1 text `fa_asntext` and `fb_asntext` for backwards compatibility
    issues introduced in `fb_asntext`:
    - Module is the same
      - Each type in the module:
         - Exists
@@ -43,73 +212,46 @@ def compare_files(file1, file2):
           - Same inner type at the tag
           - Warns if the type renamed (not fatal)

    Returns a list of error messages (if any).
    Updates ctx.errors with a list of error messages (if any).
    """
    # pylint: disable=too-many-locals

    errors = []
    logging.info(f"Comparing file {ctx.fa_path} with {ctx.fb_path}")

    f1_asn = asn1tools.parse_files(file1)
    f2_asn = asn1tools.parse_files(file2)
    logging.info(f"Parsing {ctx.fa_path}")
    ctx.fa_mod = parse_asn1(fa_asntext)

    logging.info(f"Comparing file {file1} with {file2}")
    for module_name, f1_module in f1_asn.items():
        logging.info(f"Checking module {module_name}")
        f2_module = f2_asn.get(module_name)
        if not f2_module:
            process_error(errors, f"Module {module_name} not present")
            continue
        f2_types = f2_module["types"]
        for type_name, f1_type_def in f1_module["types"].items():
            logging.info(f"Checking type {type_name}")
            f2_type_def = f2_types.get(type_name)
            if not f2_type_def:
                process_error(errors, f"Type {type_name} not present")
                continue
            if f1_type_def["type"] != f2_type_def["type"]:
                process_error(
                    errors,
                    f"Type {type_name} type '{f1_type_def['type']}'"
                    f" mismatch with '{f2_type_def['type']}'",
                )
                continue
            f1_tags = members_as_tag_dict(f1_type_def)
            f2_tags = members_as_tag_dict(f2_type_def)
            if not f1_tags:
                if f2_tags:
                    process_error(
                        errors, f"Type {type_name} contains unexpected members"
                    )
    logging.info(f"Parsing {ctx.fb_path}")
    ctx.fb_mod = parse_asn1(fb_asntext)

    for module_name, fa_module in ctx.fa_mod.items():
        if module_name.startswith("_"):
            # Skip pycrate internal keys
            continue
            for tag_number, f1_tag in f1_tags.items():
                logging.info(
                    f"Checking {f1_tag['name']} [{tag_number}] {f1_tag['type']}"
                )
                f2_tag = f2_tags.get(tag_number, {})
                if not f2_tag:
                    process_error(
                        errors,
                        f"Type {type_name} tag {tag_number} field '{f1_tag['name']}'"
                        f" missing",
                    )

        logging.info(f"Checking module {module_name}")
        if module_name not in ctx.fb_mod:
            process_error(ctx, f"Module {module_name} not present in {ctx.fb_path}")
            continue
                if f1_tag["type"] != f2_tag["type"]:
                    process_error(
                        errors,
                        f"Type {type_name} tag {tag_number} field '{f1_tag['name']}'"
                        f" type '{f1_tag['type']}' mismatch with '{f2_tag['type']}'",
                    )
        fb_module = ctx.fb_mod[module_name]

        for type_name in fa_module["_type_"]:
            fa_type = fa_module[type_name]
            logging.info(f"Checking type {repr_name(fa_type)}")

            if type_name not in fb_module:
                process_error(ctx, f"Type {type_name} not present in {ctx.fb_path}")
                continue
                if f1_tag["name"] != f2_tag["name"]:
                    process_error(
                        errors,
                        f"Type {type_name} tag {tag_number} field '{f1_tag['name']}'"
                        f" renamed to '{f2_tag['name']}'",
                    )
                    # this is not fatal - continue
                # no other checks for now
            fb_type = fb_module[type_name]

            compare_asntype(ctx=ctx, fa_type=fa_type, fb_type=fb_type)


def compare_files(ctx: Context):
    """Read ASN.1 files `ctx.fa_path` and `ctx.fb_path` and compare with compare_asntext."""

    return errors
    fa_text = pathlib.Path(ctx.fa_path).read_text()
    fb_text = pathlib.Path(ctx.fb_path).read_text()
    compare_asntext(ctx=ctx, fa_asntext=fa_text, fb_asntext=fb_text)


def main():
@@ -118,16 +260,15 @@ def main():
    logging.basicConfig(level=loglevel)

    if len(sys.argv) != 3:
        raise RuntimeError(f"Usage: {sys.argv[0]} file1.asn file2.asn")
    file1 = sys.argv[1]
    file2 = sys.argv[2]
    results = compare_files(file1, file2)
    if results:
        raise RuntimeError(f"Usage: {sys.argv[0]} FILE1.ASN FILE2.ASN")
    ctx = Context(fa_path=sys.argv[1], fb_path=sys.argv[2])
    compare_files(ctx=ctx)
    if ctx.errors:
        print("-----------------------------")
        print(f"File 1: {file1}")
        print(f"File 2: {file2}")
        print(f"File 1: {ctx.fa_path}")
        print(f"File 2: {ctx.fb_path}")
        print("Errors:")
        for error in results:
        for error in ctx.errors:
            print(f"  {error}")
        print("-----------------------------")
        sys.exit(1)