Commit a266f1fb authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(threegpp): enhance metadata fetching and parsing logic

* Remove unused cache manager parameter from spec resolution functions.
* Update fetch_threegpp_metadata to parse HTML directly from the portal.
* Add utility functions for decoding version from filenames.
* Improve test coverage for metadata fetching and version decoding.
parent 9492dba2
Loading
Loading
Loading
Loading
+6 −14
Original line number Diff line number Diff line
@@ -24,7 +24,6 @@ async def resolve_spec_release(
    release: str,
    *,
    auto_crawl: bool = True,
    cache_manager_name: str | None = None,
) -> str:
    """Resolve a release selector to a concrete version string for a spec.

@@ -38,7 +37,6 @@ async def resolve_spec_release(
        spec_number: Spec number in any format (e.g., ``"26.260"``, ``"26260"``).
        release: Release selector (e.g., ``"latest"``, ``"17"``, ``"17.1.0"``).
        auto_crawl: Crawl spec metadata when not in database.
        cache_manager_name: Optional cache manager name for HTTP caching.

    Returns:
        Full 3-part version string (e.g., ``"19.0.0"``).
@@ -57,7 +55,7 @@ async def resolve_spec_release(
    versions = await _get_versions()

    if not versions and auto_crawl:
        sources = build_default_spec_sources(cache_manager_name=cache_manager_name)
        sources = build_default_spec_sources()
        async with SpecDatabase(db_file) as db:
            results = await db.crawl_specs([normalized], release, sources)
        for result in results:
@@ -87,18 +85,15 @@ def clear_checkout_specs(checkout_dir: Path) -> int:
    return 1


def build_default_spec_sources(cache_manager_name: str | None = None) -> list[SpecSource]:
def build_default_spec_sources() -> list[SpecSource]:
    """Build the default list of spec sources.

    Args:
        cache_manager_name: Optional cache manager name for HTTP caching

    Returns:
        List of SpecSource instances for fetching spec metadata
    """
    return [
        cast("SpecSource", FunctionSpecSource("3gpp", fetch_threegpp_metadata, fetcher_kwargs={"cache_manager_name": cache_manager_name})),
        cast("SpecSource", FunctionSpecSource("whatthespec", fetch_whatthespec_metadata, fetcher_kwargs={"cache_manager_name": cache_manager_name})),
        cast("SpecSource", FunctionSpecSource("3gpp", fetch_threegpp_metadata)),
        cast("SpecSource", FunctionSpecSource("whatthespec", fetch_whatthespec_metadata)),
    ]


@@ -108,7 +103,6 @@ def checkout_specs(
    database: SpecDatabase,
    release: str = "latest",
    doc_only: bool = False,
    cache_manager_name: str | None = None,
) -> list[Path]:
    """Checkout spec documents to the checkout directory.

@@ -118,12 +112,11 @@ def checkout_specs(
        database: SpecDatabase instance for metadata lookup
        release: Release version to checkout
        doc_only: If True, download only document files instead of full zip
        cache_manager_name: Optional cache manager name for HTTP caching

    Returns:
        List of paths to checked out specs
    """
    sources = build_default_spec_sources(cache_manager_name=cache_manager_name)
    sources = build_default_spec_sources()
    downloader = SpecDownloads(database)
    return downloader.checkout_specs(spec_numbers, doc_only, checkout_dir, release, sources=sources)

@@ -134,10 +127,9 @@ async def checkout_specs_async(
    database: SpecDatabase,
    release: str = "latest",
    doc_only: bool = False,
    cache_manager_name: str | None = None,
) -> list[Path]:
    """Async variant of checkout_specs for async call paths."""
    sources = build_default_spec_sources(cache_manager_name=cache_manager_name)
    sources = build_default_spec_sources()
    downloader = SpecDownloads(database)
    return await downloader.checkout_specs_async(spec_numbers, doc_only, checkout_dir, release, sources=sources)

+249 −31
Original line number Diff line number Diff line
"""3GPP portal metadata fetcher for specs."""
"""3GPP portal metadata fetcher for specs.

Fetches spec metadata and version listings from the 3GPP portal
via the dynareport redirect. This is the primary and authoritative
source for spec metadata and version information.

Flow:
  1. Fetch ``https://www.3gpp.org/dynareport/{compact}.htm``
  2. Follow redirect to the portal specification details page
  3. Parse the portal HTML for metadata (General, Responsibility tabs)
     and version listings (Versions tab)
"""

from __future__ import annotations

import re
from pathlib import Path
from urllib.parse import parse_qs, urlparse

from packaging.version import Version

from tdoc_crawler.config.settings import HttpConfig
from tdoc_crawler.http_client import create_cached_session
@@ -10,59 +25,262 @@ from tdoc_crawler.utils.normalization import normalize_spec_number

_logger = get_logger(__name__)

# Release letter → major version mapping, covering Rel-10 ("a") through Rel-20 ("k").
# Consulted by _decode_version_from_filename for letter-coded version codes;
# a letter missing from this table makes that decoder return None.
_RELEASE_LETTER_MAP: dict[str, int] = {
    "a": 10,
    "b": 11,
    "c": 12,
    "d": 13,
    "e": 14,
    "f": 15,
    "g": 16,
    "h": 17,
    "i": 18,
    "j": 19,
    "k": 20,
}

_DYNAREPORT_URL = "https://www.3gpp.org/dynareport/{compact}.htm"

# ---------------------------------------------------------------------------
# Regex patterns for parsing the portal spec details page
# ---------------------------------------------------------------------------

# General tab — stable element IDs
_RE_TITLE = re.compile(r'<span id="titleVal">([^<]+)</span>')
_RE_STATUS = re.compile(r'<span id="statusVal">([^<]+)</span>')
_RE_TYPE = re.compile(r'<span id="typeVal">([^<]+)</span>')

# Responsibility tab — primary working group
_RE_PRIMARY_GROUP = re.compile(
    r'PrimaryResponsibleGroupLbl.*?<span>\s*([^<]+?)\s*</span>',
    re.DOTALL,
)

# Versions tab — download links with version text
# <a id="..._lnkFtpDownload" ... href=".../26260-j10.zip">19.1.0</a>
_RE_VERSION_LINK = re.compile(
    r'<a\s[^>]*id="[^"]*_lnkFtpDownload"[^>]*href="([^"]+)"[^>]*>'
    r'\s*(\d+\.\d+\.\d+)\s*</a>',
    re.IGNORECASE,
)


class SpecNotFoundError(Exception):
    """Raised when a specification is not found on 3GPP portal."""
    """Raised when a specification is not found on the 3GPP portal."""

    pass


def fetch_threegpp_metadata(spec_number: str, http_config: HttpConfig | None = None, http_cache_file: Path | None = None) -> dict[str, object]:
    """Fetch spec metadata via 3GPP.org redirect flow.
# ---------------------------------------------------------------------------
# Utility: filename-based version decoding (for FTP archives)
# ---------------------------------------------------------------------------

# Regex for 3GPP spec zip filenames from FTP archive:
#   {compact}-{version_code}.zip
_SPEC_FILE_RE = re.compile(
    r"^(?P<compact>\d{5})-(?P<vercode>[a-z]\d{2}|\d{3})\.zip$",
    re.IGNORECASE,
)


def _decode_version_from_filename(compact: str, filename: str) -> str | None:
    """Decode a 3GPP FTP archive filename to a version string.

    Filename format: ``{compact}-{vercode}.zip``

    Version code encoding:
      - Rel >= 10: one letter (release) + two digits (minor, editorial)
        e.g., ``j10`` → release 19, minor 1, editorial 0 → ``19.1.0``
      - Rel < 10: three digits (major, minor, editorial)
        e.g., ``100`` → ``1.0.0``

    Args:
        compact: Five-digit compact spec number (e.g., "26260").
        filename: Filename from the FTP directory (e.g., "26260-j10.zip").

    Returns:
        Version string (e.g., "19.1.0") or None if the filename cannot be decoded.
    """
    match = _SPEC_FILE_RE.match(filename)
    if not match or match.group("compact") != compact:
        return None

    vercode = match.group("vercode").lower()

    # Rel >= 10: letter + two digits
    if vercode[0].isalpha():
        release = _RELEASE_LETTER_MAP.get(vercode[0])
        if release is None:
            return None
        minor = int(vercode[1])
        editorial = int(vercode[2])
        return f"{release}.{minor}.{editorial}"

    # Rel < 10: three digits
    return f"{int(vercode[0])}.{int(vercode[1])}.{int(vercode[2])}"


# ---------------------------------------------------------------------------
# Portal HTML parsing
# ---------------------------------------------------------------------------


def _parse_spec_type(raw_type: str) -> str:
    """Normalize spec type from portal text to short form.

    Examples:
        "Technical specification (TS)""TS"
        "Technical Report (TR)""TR"
    """
    raw = raw_type.strip().upper()
    if "REPORT" in raw:
        return "TR"
    return "TS"


def _extract_filename_from_url(url: str) -> str:
    """Extract the zip filename from a full FTP URL."""
    return url.rsplit("/", maxsplit=1)[-1] if "/" in url else url


def _parse_portal_html(html: str, normalized: str) -> dict[str, object]:
    """Extract spec metadata and version listings from portal page HTML.

    Title, status and type are read with the General-tab patterns, the
    working group with the Responsibility-tab pattern, and the versions
    with the download-link pattern.

    Args:
        html: HTML text of the spec details page.
        normalized: Spec number; the part before the first "." names the series.

    Returns:
        Dict with ``has_versions``, ``title``, ``spec_type``, ``status``,
        ``working_group`` and ``series``. When at least one version link is
        found it also carries ``latest_version``, ``specfile`` and
        ``versions``, the latter two ordered from newest to oldest version.
    """

    def _captured(pattern: re.Pattern[str], fallback: str) -> str:
        # First capture group of the first match, stripped; fallback if no match.
        found = pattern.search(html)
        return found.group(1).strip() if found else fallback

    # Versions tab: (href, version text) for every download link.
    links = _RE_VERSION_LINK.findall(html)

    result: dict[str, object] = {
        "has_versions": bool(links),
        "title": _captured(_RE_TITLE, "Unknown"),
        "spec_type": _parse_spec_type(_captured(_RE_TYPE, "Technical specification (TS)")),
        "status": _captured(_RE_STATUS, "unknown"),
        # Drop inner whitespace so that "SA 4" becomes "SA4".
        "working_group": re.sub(r"\s+", "", _captured(_RE_PRIMARY_GROUP, "unknown")),
        "series": f"{normalized.split('.', maxsplit=1)[0]}_series",
    }
    if not links:
        return result

    # Pair each version with its filename and rank them newest first.
    ranked = sorted(
        ((version_text, _extract_filename_from_url(href)) for href, version_text in links),
        key=lambda pair: Version(pair[0]),
        reverse=True,
    )
    ordered_versions = [version for version, _ in ranked]

    result["latest_version"] = ordered_versions[0]
    result["specfile"] = [name for _, name in ranked]
    result["versions"] = ordered_versions
    return result


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def fetch_threegpp_metadata(
    spec_number: str,
    http_config: HttpConfig | None = None,
    http_cache_file: Path | None = None,
) -> dict[str, object]:
    """Fetch spec metadata from the 3GPP portal via dynareport redirect.

    1. Fetches ``https://www.3gpp.org/dynareport/{compact}.htm``
       which redirects to the portal specification details page.
    2. Parses the portal page HTML for:
       - General tab: title, status, type
       - Responsibility tab: primary working group
       - Versions tab: all version numbers with download filenames

    Args:
        spec_number: Spec number to fetch (e.g., "26.131").
        http_config: Optional HTTP configuration from ThreeGPPConfig
        spec_number: Spec number to fetch (e.g., "26.260").
        http_config: Optional HTTP configuration.
        http_cache_file: Optional explicit path to the HTTP cache database.

    Returns:
        Dictionary containing spec metadata from 3GPP portal.
        Dictionary with keys:
          - spec_number: Normalized spec number
          - source_name: "3gpp"
          - source_identifier: Portal specification URL
          - metadata_payload: Dict with title, spec_type, status, working_group,
            series, latest_version, and specfile list
          - versions: List of version strings (e.g., ["19.1.0", "19.0.0", ...])

    Raises:
        SpecNotFoundError: If the spec does not exist (redirects to unknown spec page).
        requests.HTTPError: If the HTTP request fails.
        SpecNotFoundError: If the spec is not found or has no versions.
    """
    normalized = normalize_spec_number(spec_number)
    compact = normalized.replace(".", "")
    url = f"https://www.3gpp.org/dynareport/{compact}.htm"
    dynareport_url = _DYNAREPORT_URL.format(compact=compact)

    session = create_cached_session(http_config=http_config, http_cache_file=http_cache_file)
    response = session.get(url, timeout=30, allow_redirects=True)
    response = session.get(dynareport_url, timeout=30)
    response.raise_for_status()

    parsed = urlparse(response.request.url)
    query = parse_qs(parsed.query)
    spec_id = query.get("specificationId", [None])[0]

    # Verify the spec exists by checking for specificationId in the redirect URL
    if spec_id is None:
        _logger.warning("Spec %s not found (no specificationId in redirect URL: %s)", normalized, response.url)
        raise SpecNotFoundError(f"Spec {normalized} not found on 3GPP portal")
    html_body = response.text or ""
    parsed = _parse_portal_html(html_body, normalized)

    payload: dict[str, object]
    try:
        payload = response.json()
    except ValueError:
        payload = {}
    if not parsed.get("has_versions"):
        _logger.warning(
            "Spec %s not found or no versions on portal (dynareport: %s)",
            normalized,
            dynareport_url,
        )
        raise SpecNotFoundError(
            f"Spec {normalized} not found on 3GPP portal or has no versions"
        )

    versions = payload.get("versions")
    if not isinstance(versions, list):
        versions = []
    # Use the redirected URL as source identifier
    portal_url = str(response.url) if hasattr(response, "url") else dynareport_url

    return {
        "spec_number": normalized,
        "source_name": "3gpp",
        "source_identifier": int(spec_id),
        "metadata_payload": payload,
        "versions": versions,
        "source_identifier": portal_url,
        "metadata_payload": {
            "title": parsed["title"],
            "spec_type": parsed["spec_type"],
            "status": parsed["status"],
            "working_group": parsed["working_group"],
            "series": parsed["series"],
            "latest_version": parsed.get("latest_version"),
            "specfile": parsed.get("specfile", []),
        },
        "versions": parsed.get("versions", []),
    }
+144 −25
Original line number Diff line number Diff line
@@ -5,7 +5,11 @@ from unittest.mock import MagicMock

import pytest

from tdoc_crawler.specs.sources.threegpp import SpecNotFoundError, fetch_threegpp_metadata
from tdoc_crawler.specs.sources.threegpp import (
    SpecNotFoundError,
    _decode_version_from_filename,
    fetch_threegpp_metadata,
)
from tdoc_crawler.specs.sources.whatthespec import fetch_whatthespec_metadata


@@ -25,12 +29,103 @@ class _FakeResponse:
        return self._payload


def test_fetch_threegpp_metadata_uses_redirect(monkeypatch: object) -> None:
    payload = {"title": "Spec title", "versions": ["19.0.0"]}
    response = _FakeResponse("https://portal.3gpp.org/desktopmodules/Specifications/SpecificationDetails.aspx?specificationId=12345", payload)
class _FakePortalResponse:
    """Fake HTTP response with HTML text from the portal spec details page."""

    _PORTAL_URL = (
        "https://portal.3gpp.org/desktopmodules/Specifications"
        "/SpecificationDetails.aspx?specificationId=3314"
    )

    def __init__(self, html_body: str, url: str = _PORTAL_URL) -> None:
        self.text = html_body
        self.url = url
        self.status_code = 200

    def raise_for_status(self) -> None:
        return None


# ---------------------------------------------------------------------------
# 3GPP filename-to-version decoder tests (utility)
# ---------------------------------------------------------------------------


def test_decode_version_rel10_plus() -> None:
    """Letter-coded version codes decode to the release mapped to that letter."""
    expected_by_filename = {
        "26260-j10.zip": "19.1.0",
        "26260-j00.zip": "19.0.0",
        "26260-i10.zip": "18.1.0",
        "26260-a00.zip": "10.0.0",
    }
    for filename, expected in expected_by_filename.items():
        assert _decode_version_from_filename("26260", filename) == expected


def test_decode_version_pre_rel10() -> None:
    """All-digit version codes decode digit by digit."""
    for filename, expected in (("26260-100.zip", "1.0.0"), ("26260-001.zip", "0.0.1")):
        assert _decode_version_from_filename("26260", filename) == expected


def test_decode_version_wrong_compact_returns_none() -> None:
    """A filename belonging to a different spec number is not decoded."""
    decoded = _decode_version_from_filename("99999", "26260-j10.zip")
    assert decoded is None


def test_decode_version_invalid_vercode_returns_none() -> None:
    """A release letter without a known mapping ("z") is not decoded."""
    decoded = _decode_version_from_filename("26260", "26260-z00.zip")
    assert decoded is None


# ---------------------------------------------------------------------------
# Portal-based 3GPP source tests
# ---------------------------------------------------------------------------

_PORTAL_HTML_26260 = """\
<html><body>
<div id="RadPageGeneral">
    <span id="referenceVal">26.260</span>
    <span id="titleVal">Objective test methodologies for the evaluation of immersive audio systems</span>
    <span id="statusVal">Under change control</span>
    <span id="typeVal">Technical specification (TS)</span>
</div>
<div id="RadPageResponsibility">
    <span id="PrimaryResponsibleGroupLbl">Primary responsible group:</span>
    <span>SA 4</span>
</div>
<div id="RadPageReleases">
    <table>
        <tr><td>
            <a id="SpecificationReleaseControl1_rpbReleases_i0_ctl00_specificationsVersionGrid_ctl00_ctl04_lnkFtpDownload"
               title="Click to download this version"
               href="https://www.3gpp.org/ftp/Specs/archive/26_series/26.260/26260-j10.zip">19.1.0</a>
        </td></tr>
        <tr><td>
            <a id="SpecificationReleaseControl1_rpbReleases_i0_ctl00_specificationsVersionGrid_ctl00_ctl06_lnkFtpDownload"
               title="Click to download this version"
               href="https://www.3gpp.org/ftp/Specs/archive/26_series/26.260/26260-j00.zip">19.0.0</a>
        </td></tr>
        <tr><td>
            <a id="SpecificationReleaseControl1_rpbReleases_i1_ctl00_specificationsVersionGrid_ctl00_ctl04_lnkFtpDownload"
               title="Click to download this version"
               href="https://www.3gpp.org/ftp/Specs/archive/26_series/26.260/26260-i10.zip">18.1.0</a>
        </td></tr>
        <tr><td>
            <a id="SpecificationReleaseControl1_rpbReleases_i1_ctl00_specificationsVersionGrid_ctl00_ctl06_lnkFtpDownload"
               title="Click to download this version"
               href="https://www.3gpp.org/ftp/Specs/archive/26_series/26.260/26260-i00.zip">18.0.0</a>
        </td></tr>
        <tr><td>
            <a id="SpecificationReleaseControl1_rpbReleases_i2_ctl00_specificationsVersionGrid_ctl00_ctl04_lnkFtpDownload"
               title="Click to download this version"
               href="https://www.3gpp.org/ftp/Specs/archive/26_series/26.260/26260-h00.zip">17.0.0</a>
        </td></tr>
    </table>
</div>
</body></html>
"""


def test_fetch_threegpp_metadata_parses_portal_page(monkeypatch: object) -> None:
    """Test that fetch_threegpp_metadata parses portal HTML correctly."""
    response = _FakePortalResponse(_PORTAL_HTML_26260)

    class _FakeSession:
        def get(self, *_args: object, **_kwargs: object) -> _FakeResponse:
        def get(self, *_args: object, **_kwargs: object) -> _FakePortalResponse:
            return response

    def _fake_create_cached_session(*_args: object, **_kwargs: object) -> _FakeSession:
@@ -38,20 +133,39 @@ def test_fetch_threegpp_metadata_uses_redirect(monkeypatch: object) -> None:

    monkeypatch.setattr("tdoc_crawler.specs.sources.threegpp.create_cached_session", _fake_create_cached_session)

    result = fetch_threegpp_metadata("26.132")
    assert result["spec_number"] == "26.132"
    result = fetch_threegpp_metadata("26.260")
    assert result["spec_number"] == "26.260"
    assert result["source_name"] == "3gpp"
    assert result["source_identifier"] == 12345


def test_fetch_threegpp_metadata_raises_when_spec_not_found(monkeypatch: object) -> None:
    """Test that SpecNotFoundError is raised when spec doesn't exist (no specificationId in redirect)."""
    payload = {"title": "Unknown Specification"}
    # Simulate redirect to unknown spec page without specificationId parameter
    response = _FakeResponse("https://portal.3gpp.org/desktopmodules/Specifications/SpecificationDetails.aspx", payload)
    assert "19.1.0" in result["versions"]
    assert "19.0.0" in result["versions"]
    assert "18.1.0" in result["versions"]
    assert "18.0.0" in result["versions"]
    assert "17.0.0" in result["versions"]
    # Sorted descending: latest first
    assert result["versions"][0] == "19.1.0"
    # Metadata extracted from portal
    payload = result["metadata_payload"]
    assert payload["title"] == "Objective test methodologies for the evaluation of immersive audio systems"
    assert payload["spec_type"] == "TS"
    assert payload["status"] == "Under change control"
    assert payload["working_group"] == "SA4"
    assert payload["series"] == "26_series"
    assert payload["latest_version"] == "19.1.0"
    # Filenames matched to versions
    assert isinstance(payload["specfile"], list)
    assert len(payload["specfile"]) == len(result["versions"])
    assert payload["specfile"][0] == "26260-j10.zip"


def test_fetch_threegpp_metadata_parses_tr_type(monkeypatch: object) -> None:
    """Test that Technical Report type is correctly parsed to TR."""
    html = _PORTAL_HTML_26260.replace(
        "Technical specification (TS)", "Technical Report (TR)"
    )
    response = _FakePortalResponse(html)

    class _FakeSession:
        def get(self, *_args: object, **_kwargs: object) -> _FakeResponse:
        def get(self, *_args: object, **_kwargs: object) -> _FakePortalResponse:
            return response

    def _fake_create_cached_session(*_args: object, **_kwargs: object) -> _FakeSession:
@@ -59,18 +173,23 @@ def test_fetch_threegpp_metadata_raises_when_spec_not_found(monkeypatch: object)

    monkeypatch.setattr("tdoc_crawler.specs.sources.threegpp.create_cached_session", _fake_create_cached_session)

    with pytest.raises(SpecNotFoundError, match="not found on 3GPP portal"):
        fetch_threegpp_metadata("99.999")
    result = fetch_threegpp_metadata("26.260")
    assert result["metadata_payload"]["spec_type"] == "TR"


def test_fetch_threegpp_metadata_raises_when_redirect_to_unknown_page(monkeypatch: object) -> None:
    """Test that SpecNotFoundError is raised when redirected to an unknown/invalid page."""
    payload = {}
    # Simulate redirect to a generic page without specificationId
    response = _FakeResponse("https://www.3gpp.org/specifications/unknown", payload)
def test_fetch_threegpp_metadata_raises_when_no_versions(monkeypatch: object) -> None:
    """Test that SpecNotFoundError is raised when portal page has no version links."""
    html = """<html><body>
    <div id="RadPageGeneral">
        <span id="titleVal">Some spec</span>
        <span id="statusVal">Under change control</span>
        <span id="typeVal">Technical specification (TS)</span>
    </div>
    </body></html>"""
    response = _FakePortalResponse(html)

    class _FakeSession:
        def get(self, *_args: object, **_kwargs: object) -> _FakeResponse:
        def get(self, *_args: object, **_kwargs: object) -> _FakePortalResponse:
            return response

    def _fake_create_cached_session(*_args: object, **_kwargs: object) -> _FakeSession:
@@ -79,7 +198,7 @@ def test_fetch_threegpp_metadata_raises_when_redirect_to_unknown_page(monkeypatc
    monkeypatch.setattr("tdoc_crawler.specs.sources.threegpp.create_cached_session", _fake_create_cached_session)

    with pytest.raises(SpecNotFoundError, match="not found on 3GPP portal"):
        fetch_threegpp_metadata("26.999")
        fetch_threegpp_metadata("99.999")


def test_fetch_whatthespec_metadata_parses_json(monkeypatch: object) -> None: