Commit 78d34a64 authored by Jan Reimes
Browse files

refactor(specs): normalize version handling across models and downloads

* Introduce parse_spec_version and parse_spec_version_nbr for version parsing.
* Update Specification model to use SpecificationVersionNumber for latest_version.
* Refactor SpecDownloads to utilize new version parsing functions.
* Normalize version handling in SpecDatabase and SpecificationSourceRecord.
parent cfcef8aa
Loading
Loading
Loading
Loading
+7 −6
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ from tdoc_crawler.specs.models import (
)
from tdoc_crawler.specs.sources.base import SpecSource
from tdoc_crawler.utils.normalization import normalize_release, normalize_spec_number
from tdoc_crawler.utils.parse import parse_spec_version, parse_spec_version_nbr

_logger = get_logger(__name__)

@@ -38,7 +39,7 @@ def _build_spec_candidate(
    series = str(metadata_payload.get("series", f"{normalized.split('.', maxsplit=1)[0]}_series"))
    latest_version = metadata_payload.get("latest_version")
    if latest_version is None and versions:
        latest_version = versions[0]
        latest_version = max(versions, key=parse_spec_version)

    candidate = Specification(
        spec_number=normalized,
@@ -81,7 +82,7 @@ def _build_spec_versions(
        spec_versions.append(
            SpecificationVersion(
                spec_number=normalized,
                version=str(version),
                version=parse_spec_version_nbr(version),
                file_name=file_name,
                source_name=source_name,
            )
@@ -92,12 +93,12 @@ def _build_spec_versions(
def _version_matches_release(version: str, release_type: str, release_value: str, specificity: int) -> bool:
    """Check if a version string matches the release selector."""
    try:
        v_parts = [int(p) for p in version.split(".")]
    except ValueError:
        v_parts = list(parse_spec_version(version).release)
    except Exception:
        return False

    if release_type == "exact":
        return version == release_value
        return parse_spec_version_nbr(version) == parse_spec_version_nbr(release_value)

    # prefix match: check major (specificity=1) or major.minor (specificity=2)
    if len(v_parts) >= specificity:
@@ -334,7 +335,7 @@ class SpecDatabase(DocDatabase):
            versions = payload.get("versions")
            if not isinstance(versions, list):
                versions = []
            normalized_versions = [str(item) for item in versions]
            normalized_versions = [parse_spec_version_nbr(item) for item in versions]

            outcomes.append(
                SpecCrawlSourceOutcome(
+9 −16
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ from tdoc_crawler.http_client import download_to_file
from tdoc_crawler.logging import get_logger
from tdoc_crawler.specs.sources.base import SpecSource
from tdoc_crawler.utils.normalization import normalize_release, normalize_spec_number
from tdoc_crawler.utils.parse import parse_spec_version, parse_spec_version_nbr

_logger = get_logger(__name__)

@@ -148,7 +149,7 @@ class SpecDownloads:
        if not versions:
            raise ValueError(f"No versions found for spec {normalized}")

        versions.sort(key=lambda version: self._parse_version(version.version), reverse=True)
        versions.sort(key=lambda version: parse_spec_version(version.version), reverse=True)

        # If specific release requested, use normalize_release to handle various formats
        release_type, release_value, specificity = normalize_release(release)
@@ -167,8 +168,12 @@ class SpecDownloads:
        url = SPEC_URL_TEMPLATE.format(series=series, normalized=normalized, file_name=target.file_name)
        return url, target.file_name

    def _download_full_zip(self, url: str, target_path: Path) -> None:
        """Fetch the complete zip archive from *url* into *target_path*.

        The instance's current session is passed to ``download_to_file`` with
        ``close_session=False``; the session returned by that helper is stored
        back on ``self.session``.
        """
        active_session = self.session
        cache_name = self._cache_manager.name
        self.session = download_to_file(
            url,
            target_path,
            session=active_session,
            close_session=False,
            cache_manager_name=cache_name,
        )

    @staticmethod
    def _filter_versions_by_release(
        self,
        versions: list[Any],
        release_type: str,
        release_value: str,
@@ -184,9 +189,9 @@ class SpecDownloads:

        filtered_versions: list[Any] = []
        for version_entry in versions:
            version_parts = self._parse_version(version_entry.version)
            version_parts = parse_spec_version(version_entry.version).release
            if release_type == "exact":
                if version_entry.version == release_value:
                if parse_spec_version_nbr(version_entry.version) == parse_spec_version_nbr(release_value):
                    filtered_versions.append(version_entry)
                continue

@@ -202,10 +207,6 @@ class SpecDownloads:
        msg = f"No versions found for spec {normalized} with release {release}"
        raise ValueError(msg)

    def _download_full_zip(self, url: str, target_path: Path) -> None:
        """Fetch the complete zip archive from *url* into *target_path*.

        The instance's current session is passed to ``download_to_file`` with
        ``close_session=False``; the session returned by that helper is stored
        back on ``self.session``.
        """
        active_session = self.session
        cache_name = self._cache_manager.name
        self.session = download_to_file(
            url,
            target_path,
            session=active_session,
            close_session=False,
            cache_manager_name=cache_name,
        )

    @staticmethod
    def _build_target_dir(checkout_dir: Path, normalized: str) -> Path:
        """Create and return target checkout directory for one normalized spec."""
@@ -214,14 +215,6 @@ class SpecDownloads:
        target_dir.mkdir(parents=True, exist_ok=True)
        return target_dir

    @staticmethod
    def _parse_version(version: str) -> tuple[int, ...]:
        """Parse dotted version string into comparable integer tuple."""
        try:
            return tuple(map(int, version.split(".")))
        except ValueError:
            return (0,)

    @staticmethod
    async def _attempt_doc_only_async(url: str, normalized: str, target_dir: Path) -> bool:
        """Attempt to download only the document file from remote zip."""
+48 −5
Original line number Diff line number Diff line
@@ -10,10 +10,13 @@ from datetime import datetime
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field
from packaging.version import Version
from pydantic import BaseModel, Field, field_validator
from rich.console import Console, ConsoleOptions, RenderResult
from rich.text import Text

from tdoc_crawler.utils.parse import SpecificationVersionNumber, parse_spec_version, parse_spec_version_nbr


class Specification(BaseModel):
    """Canonical spec identity and metadata."""
@@ -25,12 +28,27 @@ class Specification(BaseModel):
    status: str
    working_group: str
    series: str
    latest_version: str | None = None
    latest_version: SpecificationVersionNumber | None = None

    @property
    def latest_version_number(self) -> Version | None:
        """Expose ``latest_version`` as a ``Version`` for comparisons.

        Yields ``None`` when no latest version is set; otherwise the stored
        value is run through ``parse_spec_version``.
        """
        stored = self.latest_version
        return None if stored is None else parse_spec_version(stored)

    def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
        """Render the specification as one ``"<spec_number> - <title>"`` text line."""
        # Both arguments are part of the rich protocol signature but are not used here.
        _ = (console, options)
        headline = f"{self.spec_number} - {self.title}"
        yield Text(headline)

    @field_validator("latest_version", mode="before")
    @classmethod
    def _normalize_latest_version(cls, value: Version | SpecificationVersionNumber | None) -> SpecificationVersionNumber | None:
        """Pass ``None`` through; otherwise normalize via ``parse_spec_version_nbr``."""
        return None if value is None else parse_spec_version_nbr(value)


class SpecificationSourceRecord(BaseModel):
    """Source-specific metadata snapshot."""
@@ -40,26 +58,45 @@ class SpecificationSourceRecord(BaseModel):
    source_name: str
    source_identifier: str | None = None
    metadata_payload: dict[str, Any] = Field(default_factory=dict)
    versions: list[str] = Field(default_factory=list)
    versions: list[SpecificationVersionNumber] = Field(default_factory=list)
    fetched_at: datetime | None = None

    @field_validator("versions", mode="before")
    @classmethod
    def _normalize_versions(cls, value: list[Version | SpecificationVersionNumber] | None) -> list[SpecificationVersionNumber]:
        """Normalize every incoming version with ``parse_spec_version_nbr``.

        A ``None`` input is treated as "no versions" and becomes an empty list.
        """
        if value is None:
            return []
        return list(map(parse_spec_version_nbr, value))


class SpecificationVersion(BaseModel):
    """Spec version details.

    Describes one version of a specification: the spec it belongs to, the
    version string, the file name and the source it came from.
    """

    record_id: str | None = None
    spec_number: str
    version: str
    version: SpecificationVersionNumber
    file_name: str
    source_name: str

    @property
    def version_number(self) -> Version:
        """Return version as comparable Version object.

        The stored ``version`` string is parsed with ``parse_spec_version``.
        """
        return parse_spec_version(self.version)

    # Registered with mode="before", i.e. applied to the raw input value.
    @field_validator("version", mode="before")
    @classmethod
    def _normalize_version(cls, value: Version | SpecificationVersionNumber) -> SpecificationVersionNumber:
        """Normalize version to canonical three-part string.

        Delegates to ``parse_spec_version_nbr``.
        """
        return parse_spec_version_nbr(value)


class SpecificationDownload(BaseModel):
    """Download and extraction outcome for a spec version."""

    record_id: str | None = None
    spec_number: str
    version: str
    version: SpecificationVersionNumber
    download_url: str
    checkout_path: Path
    document_path: Path
@@ -69,6 +106,12 @@ class SpecificationDownload(BaseModel):
    outcome_message: str | None = None
    extracted_at: datetime | None = None

    @field_validator("version", mode="before")
    @classmethod
    def _normalize_version(cls, value: Version | SpecificationVersionNumber) -> SpecificationVersionNumber:
        """Convert the incoming download version via ``parse_spec_version_nbr``."""
        canonical = parse_spec_version_nbr(value)
        return canonical


@dataclass
class SpecQueryFilters:
+29 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ _logger = get_logger(__name__)


type AgendaItemNumber = str
type SpecificationVersionNumber = str


def parse_agenda_item_nbr(value: Any) -> AgendaItemNumber:
@@ -36,6 +37,34 @@ def parse_agenda_item_version(value: Any) -> Version:
        return Version("0")


def parse_spec_version_nbr(value: Any) -> SpecificationVersionNumber:
    """Return the string form of *value* after specification-version normalization.

    Delegates parsing (including its fallback handling) to
    ``parse_spec_version`` and stringifies the resulting ``Version``.
    """
    normalized_version = parse_spec_version(value)
    return str(normalized_version)


def parse_spec_version(value: Any) -> Version:
    """Coerce *value* into a ``Version`` with exactly three release components.

    ``None``, blank input and unparsable text all yield ``Version("0.0.0")``;
    the unparsable case additionally logs a warning. Otherwise only the release
    segment of the parsed version is kept: it is cut to its first three
    components, or right-padded with zeros up to three.
    """
    zero = "0.0.0"
    text = "" if value is None else str(value).strip()
    if not text:
        return Version(zero)

    try:
        release = Version(text).release
    except (InvalidVersion, ValueError) as exc:
        _logger.warning(f"Invalid specification version '{value}': {exc}")
        return Version(zero)

    # Append zeros first, then keep three parts: handles short and long releases alike.
    major, minor, patch = (*release, 0, 0, 0)[:3]
    return Version(f"{major}.{minor}.{patch}")


def infer_working_groups_from_subgroups(subgroups: list[str]) -> list[WorkingGroup]:
    """Infer working groups from subgroup codes.