Commit fd6ca52a authored by Jan Reimes
Browse files

refactor(parse): simplify parsing functions and remove fallbacks



* Refactored parse_agenda_item_nbr and parse_agenda_item_version to remove fallback logic.
* Updated parse_spec_version to streamline version parsing without fallbacks.
* Cleaned up infer_working_groups_from_subgroups for better readability.
* Removed unnecessary logging and exception handling in collect_spec_numbers.

Co-authored-by: Copilot <copilot@github.com>
parent 62b8a6d9
Loading
Loading
Loading
Loading
+34 −72
Original line number Diff line number Diff line
@@ -3,61 +3,42 @@ from __future__ import annotations
import sys
from collections.abc import Iterable
from pathlib import Path
from typing import Any

import click
import typer
from packaging.version import InvalidVersion, Version
from packaging.version import Version

from tdoc_crawler.logging import get_logger
from tdoc_crawler.meetings.utils import normalize_subgroup_alias, normalize_working_group_alias
from tdoc_crawler.models.working_groups import WorkingGroup
from tdoc_crawler.utils.normalization import expand_spec_ranges_batch

_logger = get_logger(__name__)


type AgendaItemNumber = str
type SpecificationVersionNumber = str


def parse_agenda_item_nbr(value: Any) -> AgendaItemNumber:
    """Parse agenda item number as canonical string with fallback to zero."""
def parse_agenda_item_nbr(value: str | int | None) -> AgendaItemNumber:
    """Parse agenda item number as canonical string."""
    return str(parse_agenda_item_version(value))


def parse_agenda_item_version(value: Any) -> Version:
    """Parse agenda item number as Version with fallback to zero."""
def parse_agenda_item_version(value: str | int | None) -> Version:
    """Parse agenda item number as Version."""
    if value is None:
        return Version("0")
    try:
    return Version(str(value).strip())
    except (InvalidVersion, ValueError) as exc:
        _logger.warning(f"Invalid agenda item number '{value}': {exc}")
        return Version("0")


def parse_spec_version_nbr(value: Any) -> SpecificationVersionNumber:
    """Parse specification version as canonical three-part string with fallback."""
def parse_spec_version_nbr(value: str | int | None) -> SpecificationVersionNumber:
    """Parse specification version as canonical three-part string."""
    return str(parse_spec_version(value))


def parse_spec_version(value: Any) -> Version:
def parse_spec_version(value: str | int | None) -> Version:
    """Parse specification version as Version, normalized to major.minor.patch."""
    if value is None:
        return Version("0.0.0")

    raw = str(value).strip()
    if not raw:
        return Version("0.0.0")

    try:
        parsed = Version(raw)
    except (InvalidVersion, ValueError) as exc:
        _logger.warning(f"Invalid specification version '{value}': {exc}")
        return Version("0.0.0")

    parsed = Version(str(value).strip())
    release_parts = list(parsed.release)

    if len(release_parts) >= 3:
        return Version(".".join(str(part) for part in release_parts[:3]))

@@ -76,8 +57,8 @@ def infer_working_groups_from_subgroups(subgroups: list[str]) -> list[WorkingGro
    """
    working_groups: list[WorkingGroup] = []
    for subgroup in subgroups:
        # Extract first character to determine working group
        if subgroup and len(subgroup) >= 1:
        if not subgroup:
            continue
        first_char = subgroup[0].upper()
        if first_char == "R":
            wg = WorkingGroup.RAN
@@ -91,7 +72,7 @@ def infer_working_groups_from_subgroups(subgroups: list[str]) -> list[WorkingGro
        if wg not in working_groups:
            working_groups.append(wg)

    return working_groups if working_groups else [WorkingGroup.RAN, WorkingGroup.SA, WorkingGroup.CT]
    return working_groups


def parse_working_groups(values: list[str] | None, subgroups: list[str] | None = None) -> list[WorkingGroup]:
@@ -105,19 +86,15 @@ def parse_working_groups(values: list[str] | None, subgroups: list[str] | None =
        List of working groups to crawl
    """
    if not values:
        # If subgroups are specified but no explicit working groups, infer from subgroups
        if subgroups:
            return infer_working_groups_from_subgroups(subgroups)
        # Otherwise default to all working groups
        return [WorkingGroup.RAN, WorkingGroup.SA, WorkingGroup.CT]

    resolved: list[WorkingGroup] = []
    for item in values:
        # Try alias normalization first (RP->RAN, SP->SA, CP->CT)
        normalized = normalize_working_group_alias(item)
        resolved.append(normalized)
    if not resolved:
        _logger.warning("No valid working groups specified")
        raise typer.Exit(code=2)

    return resolved


@@ -128,13 +105,7 @@ def parse_subgroups(values: list[str] | None) -> list[str] | None:

    resolved: list[str] = []
    for item in values:
        # Convert SubWorkingGroup enums to their names (e.g., S4, R1, CP)
        normalized = normalize_subgroup_alias(item)
        if not normalized:
            _logger.warning(f"Unknown subgroup: {item}")
            raise typer.Exit(code=2)

        # Use .name to get the short code (e.g., "S4" not "SA4")
        resolved.append(normalized.name)

    return resolved
@@ -147,7 +118,6 @@ def collect_spec_numbers(specs: list[str] | None, spec_file: Path | None) -> lis
    if specs:
        for spec in specs:
            if spec == "-":
                # Read from stdin
                for line in sys.stdin:
                    line_stripped = line.strip()
                    if line_stripped:
@@ -156,24 +126,16 @@ def collect_spec_numbers(specs: list[str] | None, spec_file: Path | None) -> lis
                collected.append(spec.strip())

    if spec_file and spec_file.exists():
        try:
        with spec_file.open("r", encoding="utf-8") as f:
            for line in f:
                line_stripped = line.strip()
                if line_stripped:
                    collected.append(line_stripped)
        except OSError as exc:
            raise click.FileError(str(spec_file), hint=f"Cannot read spec file: {exc}")

    if not collected:
        return []

    try:
        expanded = expand_spec_ranges_batch(collected)
    except ValueError as e:
        raise click.UsageError(str(e))

    return expanded
    return expand_spec_ranges_batch(collected)


def infer_working_groups_from_ids(ids: Iterable[str]) -> list[WorkingGroup]:
@@ -191,4 +153,4 @@ def infer_working_groups_from_ids(ids: Iterable[str]) -> list[WorkingGroup]:
        group = mapping.get(value[0].upper())
        if group and group not in resolved:
            resolved.append(group)
    return resolved or [WorkingGroup.RAN, WorkingGroup.SA, WorkingGroup.CT]
    return resolved