Commit 372017ec authored by Jan Reimes's avatar Jan Reimes
Browse files

specs: add specifications package with sources, normalization, catalog, and tests

- Add src/tdoc_crawler/specs package: catalog, downloads, normalization, query
- Add sources implementations and README for source discovery
- Add Specification models in src/tdoc_crawler/models/specs.py
- Add tests for specs database, normalization, and sources
parent 1a152040
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -174,7 +174,6 @@ def _fetch_via_whatthespec(
        cache_dir: Cache directory path
        missing_ids: List of TDoc IDs to fetch
    """

    http_cache = HttpCacheConfig()

    for tdoc_id in missing_ids:
+64 −0
Original line number Diff line number Diff line
"""Specification data models."""

from datetime import datetime
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field
from rich.console import Console, ConsoleOptions, RenderResult
from rich.text import Text


class Specification(BaseModel):
    """Canonical spec identity and metadata.

    Aggregated from one or more source snapshots; see SpecCatalog.crawl_specs.
    """

    # Dotted spec number, e.g. "26.114".
    spec_number: str
    # spec_number with the dots removed (e.g. "26114").
    spec_number_compact: str
    # Document type; "TS" is the fallback used by the catalog.
    spec_type: str
    # Human-readable spec title ("Unknown" when the source omits it).
    title: str
    # Lifecycle status as reported by the source.
    status: str
    # Owning working group name.
    working_group: str
    # Series identifier, e.g. "26_series" (derived from the number when absent).
    series: str
    # Newest known version string, if any source reported one.
    latest_version: str | None = None

    def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
        """Render as a single "<number> - <title>" line for Rich consoles."""
        # Both parameters are required by the Rich render protocol but unused.
        _ = (console, options)
        yield Text(f"{self.spec_number} - {self.title}")


class SpecificationSourceRecord(BaseModel):
    """Source-specific metadata snapshot."""

    # Storage identifier; new records are created with None (presumably
    # assigned by the database on upsert — confirm against TDocDatabase).
    record_id: str | None = None
    # Dotted spec number this snapshot belongs to.
    spec_number: str
    # Name of the source that produced this snapshot.
    source_name: str
    # Source-local identifier for the spec, when the source provides one.
    source_identifier: str | None = None
    # Raw metadata dictionary as returned by the source.
    metadata_payload: dict[str, Any] = Field(default_factory=dict)
    # Version strings reported by the source.
    versions: list[str] = Field(default_factory=list)
    # When the snapshot was fetched; None if not recorded.
    fetched_at: datetime | None = None


class SpecificationVersion(BaseModel):
    """Spec version details."""

    # Storage identifier; None before the record is persisted.
    record_id: str | None = None
    # Dotted spec number this version belongs to.
    spec_number: str
    # Version string, e.g. as reported by a source.
    version: str
    # Archive file name for this version (catalog falls back to
    # "<compact>-unknown.zip" when the source omits it).
    file_name: str
    # Source that reported this version.
    source_name: str


class SpecificationDownload(BaseModel):
    """Download and extraction outcome for a spec version."""

    # Storage identifier; None before the record is persisted.
    record_id: str | None = None
    # Dotted spec number that was downloaded.
    spec_number: str
    # Version string that was downloaded.
    version: str
    # URL the archive was fetched from.
    download_url: str
    # Directory the spec was checked out into.
    checkout_path: Path
    # Path to the extracted main document.
    document_path: Path
    # Paths of any additional extracted attachments.
    attachment_paths: list[Path] = Field(default_factory=list)
    # True when only the main document was extracted instead of the full zip.
    doc_only_used: bool = False
    # Outcome status string for the download/extraction.
    outcome_status: str
    # Optional human-readable detail accompanying outcome_status.
    outcome_message: str | None = None
    # When extraction completed; None if not recorded.
    extracted_at: datetime | None = None
+16 −0
Original line number Diff line number Diff line
"""Spec catalog and download support."""

from .catalog import SpecCatalog, SpecCrawlResult, SpecCrawlSourceOutcome
from .downloads import SpecDownloads
from .normalization import normalize_spec_number
from .query import SpecQueryFilters, SpecQueryResult

# Explicit public API of the tdoc_crawler.specs package.
__all__ = [
    "SpecCatalog",
    "SpecCrawlResult",
    "SpecCrawlSourceOutcome",
    "SpecDownloads",
    "SpecQueryFilters",
    "SpecQueryResult",
    "normalize_spec_number",
]
+211 −0
Original line number Diff line number Diff line
"""Spec catalog operations."""

from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING

from tdoc_crawler.models.specs import Specification, SpecificationSourceRecord, SpecificationVersion

from .normalization import normalize_spec_number
from .query import SpecQueryFilters, SpecQueryResult

if TYPE_CHECKING:
    from tdoc_crawler.database import TDocDatabase

    from .sources.base import SpecSource


# Module-level logger, one per module per stdlib convention.
_logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class SpecCrawlSourceOutcome:
    """Outcome for a single spec source crawl."""

    source_name: str
    status: str
    versions: list[str]
    message: str | None = None


@dataclass(frozen=True)
class SpecCrawlResult:
    """Aggregated crawl result for a spec number."""

    spec_number: str
    release: str
    status: str
    latest_version: str | None
    sources: list[SpecCrawlSourceOutcome]
    message: str | None = None


class SpecCatalog:
    """Facade for spec metadata ingestion and queries."""

    def __init__(self, database: TDocDatabase) -> None:
        self._database = database

    def crawl_specs(self, spec_numbers: list[str], release: str, sources: list[SpecSource]) -> list[SpecCrawlResult]:
        """Crawl and store spec metadata for the provided spec numbers.

        Args:
            spec_numbers: Spec numbers to crawl.
            release: Release selector; use "latest" for newest version.
            sources: Spec metadata sources.

        Returns:
            List of crawl outcomes for each requested spec.
        """
        return [self._crawl_one(raw_spec, release, sources) for raw_spec in spec_numbers]

    def _crawl_one(self, raw_spec: str, release: str, sources: list[SpecSource]) -> SpecCrawlResult:
        """Crawl a single spec across all sources and persist the merged result."""
        normalized = normalize_spec_number(raw_spec)
        compact = normalized.replace(".", "")
        outcomes: list[SpecCrawlSourceOutcome] = []
        source_records: list[SpecificationSourceRecord] = []
        spec_versions: list[SpecificationVersion] = []
        aggregated: Specification | None = None

        for source in sources:
            try:
                payload = source.fetch(normalized)
            except Exception as exc:  # noqa: BLE001
                # Sources are independent; one failing source must not abort the crawl.
                _logger.warning("Spec crawl failed for %s (%s)", normalized, source.name, exc_info=exc)
                outcomes.append(
                    SpecCrawlSourceOutcome(
                        source_name=source.name,
                        status="error",
                        versions=[],
                        message=str(exc),
                    )
                )
                continue

            outcome, record, candidate, versions = self._parse_payload(source, payload, normalized, compact)
            outcomes.append(outcome)
            source_records.append(record)
            spec_versions.extend(versions)

            # First source with metadata wins; later sources may only fill in
            # a missing latest_version.
            if aggregated is None:
                aggregated = candidate
            elif aggregated.latest_version is None and candidate.latest_version is not None:
                aggregated = aggregated.model_copy(update={"latest_version": candidate.latest_version})

        if not outcomes:
            return SpecCrawlResult(
                spec_number=normalized,
                release=release,
                status="error",
                latest_version=None,
                sources=[],
                message="no-sources",
            )

        # "latest" always matches; otherwise at least one successful source
        # must report the requested release among its versions.
        release_matches = release == "latest" or any(
            release in outcome.versions for outcome in outcomes if outcome.status == "ok"
        )
        if not release_matches:
            return SpecCrawlResult(
                spec_number=normalized,
                release=release,
                status="skipped",
                latest_version=aggregated.latest_version if aggregated else None,
                sources=outcomes,
                message="release-not-found",
            )

        if aggregated is None:
            return SpecCrawlResult(
                spec_number=normalized,
                release=release,
                status="error",
                latest_version=None,
                sources=outcomes,
                message="no-metadata",
            )

        self._persist(aggregated, source_records, spec_versions)
        return SpecCrawlResult(
            spec_number=normalized,
            release=release,
            status="stored",
            latest_version=aggregated.latest_version,
            sources=outcomes,
        )

    def _parse_payload(
        self,
        source: SpecSource,
        payload: dict[str, object],
        normalized: str,
        compact: str,
    ) -> tuple[SpecCrawlSourceOutcome, SpecificationSourceRecord, Specification, list[SpecificationVersion]]:
        """Convert one source payload into (outcome, source record, candidate spec, versions)."""
        source_name = str(payload.get("source_name", source.name))
        source_identifier = payload.get("source_identifier")
        metadata_payload = payload.get("metadata_payload")
        if not isinstance(metadata_payload, dict):
            # Malformed payloads degrade to empty metadata instead of failing.
            metadata_payload = {}

        raw_versions = payload.get("versions")
        if not isinstance(raw_versions, list):
            raw_versions = []
        normalized_versions = [str(item) for item in raw_versions]

        outcome = SpecCrawlSourceOutcome(
            source_name=source_name,
            status="ok",
            versions=normalized_versions,
        )
        record = SpecificationSourceRecord(
            spec_number=normalized,
            source_name=source_name,
            source_identifier=source_identifier if isinstance(source_identifier, str) else None,
            metadata_payload=metadata_payload,
            versions=normalized_versions,
        )
        candidate = self._build_candidate(metadata_payload, normalized, compact, normalized_versions)

        # file_name does not depend on the version, so compute it once rather
        # than re-reading the payload for every version.
        file_name = str(metadata_payload.get("file_name", f"{compact}-unknown.zip"))
        versions = [
            SpecificationVersion(
                spec_number=normalized,
                version=version,
                file_name=file_name,
                source_name=source_name,
            )
            for version in normalized_versions
        ]
        return outcome, record, candidate, versions

    @staticmethod
    def _build_candidate(
        metadata_payload: dict[str, object],
        normalized: str,
        compact: str,
        normalized_versions: list[str],
    ) -> Specification:
        """Build a Specification from one source's metadata, applying defaults."""
        latest_version = metadata_payload.get("latest_version")
        if latest_version is None and normalized_versions:
            # NOTE(review): assumes the source lists the newest version first — confirm.
            latest_version = normalized_versions[0]
        return Specification(
            spec_number=normalized,
            spec_number_compact=compact,
            spec_type=str(metadata_payload.get("spec_type", "TS")),
            title=str(metadata_payload.get("title", "Unknown")),
            status=str(metadata_payload.get("status", "unknown")),
            working_group=str(metadata_payload.get("working_group", "unknown")),
            series=str(metadata_payload.get("series", f"{normalized.split('.')[0]}_series")),
            latest_version=str(latest_version) if latest_version is not None else None,
        )

    def _persist(
        self,
        aggregated: Specification,
        source_records: list[SpecificationSourceRecord],
        spec_versions: list[SpecificationVersion],
    ) -> None:
        """Upsert everything in order: source records, then the spec, then versions."""
        for record in source_records:
            self._database.upsert_spec_source_record(record)
        self._database.upsert_specification(aggregated)
        for version in spec_versions:
            self._database.upsert_spec_version(version)

    def query_specs(self, filters: SpecQueryFilters, release: str) -> list[SpecQueryResult]:
        """Query stored spec metadata using the provided filters.

        The release argument is accepted for interface symmetry but is
        currently ignored here.
        """
        _ = release
        return self._database.query_specs(filters)
+65 −0
Original line number Diff line number Diff line
"""Spec download orchestration."""

import logging
import zipfile
from pathlib import Path

from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.specs.normalization import normalize_spec_number

# Module-level logger, one per module per stdlib convention.
_logger = logging.getLogger(__name__)


class SpecDownloads:
    """Download and extraction utilities for specs."""

    def __init__(self, database: TDocDatabase) -> None:
        self._database = database

    def checkout_specs(self, specs: list[str], doc_only: bool, checkout_dir: Path) -> list[Path]:
        """Create the checkout directory tree for the requested specs.

        Args:
            specs: Spec numbers to check out.
            doc_only: When True, attempt doc-only selection for each spec.
            checkout_dir: Root directory for the checkout tree.

        Returns:
            One "Specs/archive/<series>/<spec>" target directory per spec.
        """
        checkout_dir.mkdir(parents=True, exist_ok=True)
        results: list[Path] = []
        for spec in specs:
            normalized = normalize_spec_number(spec)
            series = f"{normalized.split('.')[0]}_series"
            target_dir = checkout_dir / "Specs" / "archive" / series / normalized
            target_dir.mkdir(parents=True, exist_ok=True)
            if doc_only:
                self._attempt_doc_only(spec, normalized)
            results.append(target_dir)
        return results

    def open_spec(self, spec: str, doc_only: bool, checkout_dir: Path) -> Path:
        """Check out a single spec and return its target directory.

        NOTE(review): despite the name, this does not launch any application;
        callers must open the returned path themselves.
        """
        return self.checkout_specs([spec], doc_only, checkout_dir)[0]

    def _attempt_doc_only(self, spec: str, normalized: str) -> None:
        """Attempt doc-only selection and log fallback when unavailable.

        NOTE(review): `spec` is interpreted as a zip file path here, while
        checkout_specs passes it through normalize_spec_number as a spec
        number — confirm which input is intended.
        """
        candidate = Path(spec)
        if not candidate.exists() or candidate.suffix.lower() != ".zip":
            _logger.info("Doc-only selection unavailable for %s; falling back to full zip", normalized)
            return

        try:
            with zipfile.ZipFile(candidate) as archive:
                entry = _select_doc_entry(archive.namelist(), normalized)
        # FileNotFoundError is an OSError subclass, so OSError alone covers it.
        except (OSError, zipfile.BadZipFile) as exc:
            _logger.warning("Doc-only selection failed for %s: %s", normalized, exc)
            return

        if entry is None:
            _logger.info("Doc-only selection found no document for %s; falling back to full zip", normalized)


def _select_doc_entry(entries: list[str], normalized: str) -> str | None:
    compact = normalized.replace(".", "")
    for entry in entries:
        lower = entry.lower()
        if not (lower.endswith(".doc") or lower.endswith(".docx")):
            continue
        token = lower.replace(".", "")
        if compact.lower() in token:
            return entry
    return None
Loading