Commit 894a6f0e authored by Jan Reimes's avatar Jan Reimes
Browse files

refactor(domain): redistribute crawler logic into tdocs and meetings packages

parent a968acdc
Loading
Loading
Loading
Loading
+0 −62
Original line number Diff line number Diff line
"""Crawlers for retrieving TDoc and meeting metadata from 3GPP resources."""

from __future__ import annotations

from tdoc_crawler.crawlers.constants import (
    EXCLUDED_DIRS,
    EXCLUDED_DIRS_NORMALIZED,
    MEETING_CODE_REGISTRY,
    TDOC_DOWNLOAD_URL,
    TDOC_PATTERN,
    TDOC_PATTERN_STR,
    TDOC_SUBDIRS,
    TDOC_SUBDIRS_NORMALIZED,
)
from tdoc_crawler.crawlers.hybrid import HybridCrawlResult, HybridTDocCrawler
from tdoc_crawler.crawlers.meeting_doclist import DocumentListError, convert_excel_row_to_tdoc_metadata, fetch_meeting_document_list, parse_excel_document_list
from tdoc_crawler.crawlers.meetings import MeetingCrawler, MeetingCrawlResult, normalize_subgroup_alias, normalize_working_group_alias
from tdoc_crawler.crawlers.parallel import fetch_meeting_tdocs
from tdoc_crawler.crawlers.portal import (
    PortalAuthenticationError,
    PortalClient,
    PortalParsingError,
    PortalSession,
    create_portal_client,
    extract_tdoc_url_from_portal,
    parse_tdoc_portal_page,
)
from tdoc_crawler.crawlers.tdocs import TDocCrawler, TDocCrawlResult
from tdoc_crawler.crawlers.whatthespec import WhatTheSpecResolutionError, resolve_via_whatthespec

# Public API of the crawlers package. Every name below is re-exported from
# one of the submodules imported above (constants, hybrid, meeting_doclist,
# meetings, parallel, portal, tdocs, whatthespec).
__all__ = [
    "EXCLUDED_DIRS",
    "EXCLUDED_DIRS_NORMALIZED",
    "MEETING_CODE_REGISTRY",
    "TDOC_DOWNLOAD_URL",
    "TDOC_PATTERN",
    "TDOC_PATTERN_STR",
    "TDOC_SUBDIRS",
    "TDOC_SUBDIRS_NORMALIZED",
    "DocumentListError",
    "HybridCrawlResult",
    "HybridTDocCrawler",
    "MeetingCrawlResult",
    "MeetingCrawler",
    "PortalAuthenticationError",
    "PortalClient",
    "PortalParsingError",
    "PortalSession",
    "TDocCrawlResult",
    "TDocCrawler",
    "WhatTheSpecResolutionError",
    "convert_excel_row_to_tdoc_metadata",
    "create_portal_client",
    "extract_tdoc_url_from_portal",
    "fetch_meeting_document_list",
    "fetch_meeting_tdocs",
    "normalize_subgroup_alias",
    "normalize_working_group_alias",
    "parse_excel_document_list",
    "parse_tdoc_portal_page",
    "resolve_via_whatthespec",
]
+0 −71
Original line number Diff line number Diff line
"""Shared constants and lookup tables for crawler modules."""

from __future__ import annotations

import re
from typing import Final

# Matches TDoc file names (e.g. "R1-2301234.zip"): group 1 captures the TDoc
# identifier (working-group prefix plus number), group 2 the file extension.
# The compiled pattern is case-insensitive.
TDOC_PATTERN_STR: Final[str] = r"([RSC][1-6P].{4,10})\.(zip|txt|pdf)"
TDOC_PATTERN: Final[re.Pattern[str]] = re.compile(TDOC_PATTERN_STR, re.IGNORECASE)

# Subdirectory names that typically hold TDoc files inside a meeting folder.
# The *_NORMALIZED variants are upper-cased frozensets for case-insensitive
# membership tests.
TDOC_SUBDIRS: Final[tuple[str, ...]] = ("Docs", "Documents", "Tdocs", "TDocs", "DOCS")
TDOC_SUBDIRS_NORMALIZED: Final[frozenset[str]] = frozenset(entry.upper() for entry in TDOC_SUBDIRS)

# Directory names that must be skipped while crawling (drafts, agendas, ...).
EXCLUDED_DIRS: Final[tuple[str, ...]] = ("Inbox", "Draft", "Drafts", "Agenda", "Invitation", "Report")
EXCLUDED_DIRS_NORMALIZED: Final[frozenset[str]] = frozenset(entry.upper() for entry in EXCLUDED_DIRS)

# URL templates for 3GPP meeting reports and the 3GPP portal endpoints.
MEETINGS_BASE_URL: Final[str] = "https://www.3gpp.org/dynareport?code=Meetings-{code}.htm"
PORTAL_BASE_URL: Final[str] = "https://portal.3gpp.org"
TDOC_VIEW_URL: Final[str] = f"{PORTAL_BASE_URL}/ngppapp/CreateTdoc.Aspx"
TDOC_DOWNLOAD_URL: Final[str] = f"{PORTAL_BASE_URL}/ngppapp/DownloadTDoc.aspx"
LOGIN_URL: Final[str] = f"{PORTAL_BASE_URL}/login.aspx"

# Template for specification archive downloads on the 3GPP FTP server.
SPEC_URL_TEMPLATE: Final[str] = "https://www.3gpp.org/ftp/Specs/archive/{series}/{normalized}/{file_name}"

# Matches ISO-style dates; the character class also accepts the Unicode
# hyphen/dash variants (U+2010..U+2015) that appear in scraped pages.
DATE_PATTERN: Final[re.Pattern[str]] = re.compile(r"(\d{4}[\-\u2010-\u2015]\d{2}[\-\u2010-\u2015]\d{2})")

# Maps a TSG name to its working-group meeting codes as (code, alias) pairs.
# The value type allows a None alias, although all current entries define one.
MEETING_CODE_REGISTRY: Final[dict[str, list[tuple[str, str | None]]]] = {
    "RAN": [
        ("RP", "RP"),
        ("R1", "R1"),
        ("R2", "R2"),
        ("R3", "R3"),
        ("R4", "R4"),
        ("R5", "R5"),
        ("R6", "R6"),
    ],
    "SA": [
        ("SP", "SP"),
        ("S1", "S1"),
        ("S2", "S2"),
        ("S3", "S3"),
        ("S4", "S4"),
        ("S5", "S5"),
        ("S6", "S6"),
    ],
    "CT": [
        ("CP", "CP"),
        ("C1", "C1"),
        ("C2", "C2"),
        ("C3", "C3"),
        ("C4", "C4"),
        ("C5", "C5"),
        ("C6", "C6"),
    ],
}

# Public API of this module.
__all__ = [
    "DATE_PATTERN",
    "EXCLUDED_DIRS",
    "EXCLUDED_DIRS_NORMALIZED",
    "LOGIN_URL",
    "MEETINGS_BASE_URL",
    "MEETING_CODE_REGISTRY",
    "PORTAL_BASE_URL",
    "TDOC_DOWNLOAD_URL",
    "TDOC_PATTERN",
    "TDOC_PATTERN_STR",
    "TDOC_SUBDIRS",
    "TDOC_SUBDIRS_NORMALIZED",
    "TDOC_VIEW_URL",
]
+0 −265
Original line number Diff line number Diff line
"""Parallel crawling utilities for TDoc discovery and document list fetching using subinterpreters."""

from __future__ import annotations

import json
import logging
import re
from collections.abc import Iterable
from datetime import UTC, datetime
from pathlib import Path
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

from tdoc_crawler.crawlers.constants import EXCLUDED_DIRS, EXCLUDED_DIRS_NORMALIZED, TDOC_PATTERN_STR, TDOC_SUBDIRS, TDOC_SUBDIRS_NORMALIZED
from tdoc_crawler.crawlers.meeting_doclist import fetch_meeting_document_list
from tdoc_crawler.http_client import create_cached_session

logger = logging.getLogger(__name__)


def _parse_file_size(text_content: str) -> int | None:
    """Extract file size in bytes from text fragments."""
    size_match = re.search(r"(\d+)\s*([KMG]?)B", text_content, re.IGNORECASE)
    if not size_match:
        return None

    size_str = size_match.group(1)
    unit = size_match.group(2).upper()

    try:
        size = int(size_str)
    except ValueError:
        return None

    if unit == "K":
        size *= 1024
    elif unit == "M":
        size *= 1024 * 1024
    elif unit == "G":
        size *= 1024 * 1024 * 1024
    return size


def fetch_meeting_tdocs(
    meeting_id: int,
    files_url: str,
    meeting_short_name: str,
    timeout: int,
    max_retries: int,
    target_ids: Iterable[str] | None,
    cache_dir_str: str,
    cache_ttl: int,
    cache_refresh_on_access: bool,
) -> tuple[str, ...]:
    """Crawl one meeting's file area and collect its TDocs.

    Runs inside a subinterpreter worker, so the result is a tuple of
    JSON-serialized :class:`TDocMetadata` records — plain strings are the
    only values that can safely cross the subinterpreter boundary.
    """
    session = create_cached_session(
        cache_dir=Path(cache_dir_str),
        ttl=cache_ttl,
        refresh_ttl_on_access=cache_refresh_on_access,
        max_retries=max_retries,
    )
    root_url = files_url if files_url.endswith("/") else f"{files_url}/"
    wanted = {entry.upper() for entry in target_ids} if target_ids else None
    tdoc_regex = re.compile(TDOC_PATTERN_STR, re.IGNORECASE)
    already_seen: set[str] = set()
    results: list[str] = []

    try:
        root_html = _fetch_directory(session, root_url, timeout)
        if root_html is None:
            return ()

        # Prefer dedicated TDoc subfolders; fall back to the root listing.
        candidate_dirs = _extract_subdirectories(root_url, root_html) or [root_url]

        for candidate in candidate_dirs:
            listing_html = _fetch_directory(session, candidate, timeout)
            if listing_html is None:
                continue

            records = _collect_tdocs_from_html(
                listing_html,
                candidate,
                meeting_id,
                meeting_short_name,
                tdoc_regex,
                wanted,
                already_seen,
            )
            results.extend(records)
    finally:
        session.close()

    return tuple(results)


def _fetch_directory(session: requests.Session, url: str, timeout: int) -> str | None:
    """Fetch a directory listing and return its HTML content."""
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
    except Exception as exc:  # pragma: no cover - network failures
        logger.debug("Failed to fetch %s: %s", url, exc)
        return None
    return response.text


def _extract_subdirectories(base_url: str, html: str) -> list[str]:
    """Return absolute URLs of recognised TDoc subfolders (Docs/, Tdocs/, ...)."""
    anchors = BeautifulSoup(html, "html.parser").find_all("a")
    found: list[str] = []
    for anchor in anchors:
        target = anchor.get("href")
        if not target or not isinstance(target, str):
            continue
        # Ignore query links and parent-directory references.
        if target.startswith(("?", "..")):
            continue

        leaf = target.rstrip("/").split("/")[-1]
        if leaf.upper() not in TDOC_SUBDIRS_NORMALIZED:
            continue

        absolute = target if target.startswith("http") else urljoin(base_url, target)
        if not absolute.endswith("/"):
            absolute += "/"
        found.append(absolute)
    return found


def _collect_tdocs_from_html(
    html: str,
    directory_url: str,
    meeting_id: int,
    meeting_short_name: str,
    pattern: re.Pattern[str],
    target_ids: set[str] | None,
    seen_ids: set[str],
) -> list[str]:
    """Scan one directory listing and serialize every new TDoc link found."""
    records: list[str] = []

    for anchor in BeautifulSoup(html, "html.parser").find_all("a"):
        raw_href = anchor.get("href")
        if not raw_href or not isinstance(raw_href, str):
            continue

        href = raw_href.strip()
        # Skip navigation entries and directories excluded from crawling.
        if href in ("../", "./", "..", "."):
            continue
        if href.rstrip("/").split("/")[-1].upper() in EXCLUDED_DIRS_NORMALIZED:
            continue

        match = pattern.search(href)
        if match is None:
            continue

        tdoc_id = match.group(1).upper()
        if tdoc_id in seen_ids:
            continue
        if target_ids is not None and tdoc_id not in target_ids:
            continue

        if href.startswith(("http://", "https://")):
            file_url = href
        else:
            file_url = urljoin(directory_url, href)

        # Best-effort file size recovered from the listing text around the link.
        file_size = None
        container = anchor.parent
        if container:
            file_size = _parse_file_size(container.get_text(" "))

        timestamp = datetime.now(UTC).isoformat().replace("+00:00", "Z")
        payload = {
            "tdoc_id": tdoc_id,
            "meeting_id": meeting_id,
            "title": "Pending validation",
            "url": file_url,
            "source": "Unknown",
            "contact": "Unknown",
            "tdoc_type": "unknown",
            "for_purpose": "unknown",
            "agenda_item_nbr": "0.0",
            "agenda_item_text": "Unknown",
            "status": None,
            "meeting_name": meeting_short_name or None,
            "is_revision_of": None,
            "file_size": file_size,
            "date_created": None,
            "date_retrieved": timestamp,
            "date_updated": timestamp,
            "validated": False,
            "validation_failed": False,
        }
        seen_ids.add(tdoc_id)
        records.append(json.dumps(payload))

    return records


def fetch_meeting_document_list_subinterpreter(
    meeting_id: int,
    cache_dir: str,
    cache_ttl: int,
    cache_refresh_on_access: bool,
    timeout: int,
) -> str:
    """Fetch meeting document list in subinterpreter context.

    This function is designed to be called from a subinterpreter worker, so
    both its arguments and its return value are restricted to simple
    shareable types (int, str, bool).

    Args:
        meeting_id: Meeting identifier
        cache_dir: Cache directory path as string
        cache_ttl: HTTP cache TTL in seconds
        cache_refresh_on_access: Whether to refresh cache TTL on access
        timeout: Request timeout in seconds

    Returns:
        A JSON-encoded list of serialized TDoc metadata records on success,
        or a JSON object of the form ``{"_error": {...}}`` describing the
        failure (the original docstring incorrectly advertised a tuple).
    """
    try:
        # Fetch document list for the meeting.
        tdoc_metadata_list = fetch_meeting_document_list(
            meeting_id=meeting_id,
            cache_path=Path(cache_dir),
            cache_ttl=cache_ttl,
            cache_refresh_on_access=cache_refresh_on_access,
            timeout=timeout,
        )

        # Serialize each record to JSON so the payload can cross the
        # subinterpreter boundary as a plain string.
        serialized = [metadata.model_dump_json() for metadata in tdoc_metadata_list]
        return json.dumps(serialized)

    except Exception as exc:
        # Deliberately broad: any failure must be reported back to the
        # parent interpreter as data rather than a raised exception.
        error_data = {
            "error": str(exc),
            "meeting_id": meeting_id,
            "cache_dir": cache_dir,
        }
        return json.dumps({"_error": error_data})


# Public API of this module; the constants are re-exports from
# tdoc_crawler.crawlers.constants.
__all__ = [
    "EXCLUDED_DIRS",
    "TDOC_PATTERN_STR",
    "TDOC_SUBDIRS",
    "fetch_meeting_document_list_subinterpreter",
    "fetch_meeting_tdocs",
]
+0 −720

File deleted.

Preview size limit exceeded, changes collapsed.

+0 −301
Original line number Diff line number Diff line
"""TDoc crawler orchestrating parallel workers."""

from __future__ import annotations

import logging
import time
from collections.abc import Callable
from concurrent.futures import Future, as_completed
from dataclasses import dataclass

from pool_executors.pool_executors import create_executor
from tdoc_crawler.crawlers.parallel import fetch_meeting_tdocs
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.models import CrawlLimits, MeetingMetadata, MeetingQueryConfig, SortOrder, TDocCrawlConfig, TDocMetadata, WorkingGroup

logger = logging.getLogger(__name__)


@dataclass(slots=True, frozen=True)
class TDocCrawlResult:
    """Summary of a TDoc crawl run."""

    processed: int  # number of TDoc records collected from the workers
    inserted: int  # rows newly inserted by the database bulk upsert
    updated: int  # rows updated by the database bulk upsert
    errors: list[str]  # human-readable errors accumulated during the crawl


class TDocCrawler:
    """Configuration-driven crawler orchestrating subinterpreter workers."""

    def __init__(self, database: TDocDatabase) -> None:
        # The database is used both to look up meetings to crawl and to
        # persist the TDoc metadata collected by the workers.
        self.database = database

    def crawl(self, config: TDocCrawlConfig, progress_callback: Callable[[float, float], None] | None = None) -> TDocCrawlResult:
        """Execute a crawl using the provided configuration.

        Args:
            config: Crawl parameters (filters, limits, worker count, caching).
            progress_callback: Optional callback forwarded to the bulk upsert.

        Returns:
            A TDocCrawlResult with processed/inserted/updated counts and errors.
        """
        meetings = self._get_meetings_to_crawl(config)
        if not meetings:
            logger.warning("No meetings found matching criteria")
            return TDocCrawlResult(processed=0, inserted=0, updated=0, errors=["No meetings found"])

        meetings = self._apply_meeting_limits(meetings, config.limits)
        if not meetings:
            logger.warning("Meetings filtered out by limits configuration")
            return TDocCrawlResult(processed=0, inserted=0, updated=0, errors=["No meetings found after applying limits"])

        # Incremental mode: skip TDoc IDs already stored for these working groups.
        existing_ids: set[str] = set()
        if config.incremental:
            existing_ids = {tdoc_id.upper() for tdoc_id in self.database.get_existing_tdoc_ids(config.working_groups)}

        # When target IDs are given, the crawl stops as soon as all are found.
        targets = set(config.target_ids) if config.target_ids else None

        collected: list[TDocMetadata] = []
        errors = self._crawl_meetings_parallel(meetings, config, collected, existing_ids, targets)

        inserted = 0
        updated = 0
        if collected:
            inserted, updated = self.database.bulk_upsert_tdocs(collected, progress_callback=progress_callback)

        return TDocCrawlResult(
            processed=len(collected),
            inserted=inserted,
            updated=updated,
            errors=errors,
        )

    def _apply_meeting_limits(self, meetings: list[MeetingMetadata], limits: CrawlLimits) -> list[MeetingMetadata]:
        """Apply crawl limits to the meeting list.

        A limit value of None or 0 means "no limit" for that dimension.
        """
        limited = list(meetings)

        # limit_meetings: positive takes the first N entries, negative the last N.
        if limits.limit_meetings is not None and limits.limit_meetings != 0:
            limit = limits.limit_meetings
            limited = limited[:limit] if limit > 0 else limited[limit:]

        # limit_meetings_per_wg: cap meetings per working group (sign ignored).
        if limits.limit_meetings_per_wg is not None and limits.limit_meetings_per_wg != 0:
            per_wg_limit = abs(limits.limit_meetings_per_wg)
            per_wg_counts: dict[WorkingGroup, int] = {}
            filtered: list[MeetingMetadata] = []
            for meeting in limited:
                working_group = meeting.working_group
                count = per_wg_counts.get(working_group, 0)
                if count >= per_wg_limit:
                    continue
                per_wg_counts[working_group] = count + 1
                filtered.append(meeting)
            limited = filtered

        # limit_wgs: keep only the first N distinct working groups encountered
        # (sign ignored); later meetings of already-kept groups still pass.
        if limits.limit_wgs is not None and limits.limit_wgs != 0:
            max_groups = abs(limits.limit_wgs)
            seen_groups: set[WorkingGroup] = set()
            filtered: list[MeetingMetadata] = []
            for meeting in limited:
                working_group = meeting.working_group
                if working_group in seen_groups:
                    filtered.append(meeting)
                    continue
                if len(seen_groups) >= max_groups:
                    continue
                seen_groups.add(working_group)
                filtered.append(meeting)
            limited = filtered

        return limited

    def _get_meetings_to_crawl(self, config: TDocCrawlConfig) -> list[MeetingMetadata]:
        """Query meetings from the database based on configuration filters."""
        meeting_config = MeetingQueryConfig(
            cache_dir=config.cache_dir,
            working_groups=config.working_groups,
            subgroups=config.subgroups,
            limit=config.limits.limit_meetings,
            order=SortOrder.DESC,
            include_without_files=False,
        )

        meetings = self.database.query_meetings(meeting_config)

        # Narrow to explicitly requested meeting IDs when provided.
        if config.meeting_ids:
            meeting_id_set = set(config.meeting_ids)
            meetings = [meeting for meeting in meetings if meeting.meeting_id in meeting_id_set]

        # Date-range filters drop meetings that have no known start date.
        if config.start_date:
            meetings = [meeting for meeting in meetings if meeting.start_date and meeting.start_date >= config.start_date]
        if config.end_date:
            meetings = [meeting for meeting in meetings if meeting.start_date and meeting.start_date <= config.end_date]

        return meetings

    def _filter_meetings_for_parallel_crawl(
        self,
        meetings: list[MeetingMetadata],
        max_tdocs: int | None,
    ) -> list[MeetingMetadata]:
        """Filter meetings to respect max_tdocs limit via tdoc_count optimization.

        This method pre-filters meetings to skip those that would cause
        cumulative TDoc count to exceed max_tdocs. This prevents unnecessary
        subinterpreter task submission when limits will not be met.

        Args:
            meetings: List of meetings to filter
            max_tdocs: Maximum TDocs to crawl (None = no limit)

        Returns:
            Filtered list of meetings respecting tdoc_count optimization
        """
        if max_tdocs is None:
            # Only meetings exposing a file area URL can be crawled at all.
            return [m for m in meetings if m.files_url]

        filtered: list[MeetingMetadata] = []
        cumulative_tdocs = 0

        for meeting in meetings:
            if not meeting.files_url:
                continue

            meeting_tdocs = meeting.tdoc_count or 0
            # Skip (rather than stop at) a meeting that would push the running
            # total past the limit: a later, smaller meeting may still fit.
            if cumulative_tdocs + meeting_tdocs >= max_tdocs:
                continue

            filtered.append(meeting)
            cumulative_tdocs += meeting_tdocs

        return filtered

    def _crawl_meetings_parallel(
        self,
        meetings: list[MeetingMetadata],
        config: TDocCrawlConfig,
        collected: list[TDocMetadata],
        existing_ids: set[str],
        targets: set[str] | None,
    ) -> list[str]:
        """Launch subinterpreter workers to crawl meetings in parallel.

        Results are appended to ``collected`` in completion order; the
        returned list contains human-readable error messages.
        """
        errors: list[str] = []
        seen_ids: set[str] = set()
        # limit_tdocs of None or 0 means "no limit"; sign is ignored otherwise.
        max_tdocs = None
        if config.limits.limit_tdocs is not None and config.limits.limit_tdocs != 0:
            max_tdocs = abs(config.limits.limit_tdocs)

        # Pre-filter meetings to optimize task submission
        meetings = self._filter_meetings_for_parallel_crawl(meetings, max_tdocs)

        target_tuple = tuple(targets) if targets else None

        # Select executor based on worker count
        if config.workers == 1:
            executor = create_executor("serial")
            executor_type = "serial"
        else:
            # Note: Using multiprocessing by default because numpy/pandas (used in document list parsing)
            # are currently incompatible with subinterpreters in Python 3.14.
            executor = create_executor("multiprocessing", max_workers=config.workers)
            executor_type = "multiprocessing"

        logger.debug("Using %s executor with %d worker(s)", executor_type, config.workers)

        # Track start time for overall timeout enforcement
        start_time = time.monotonic()

        with executor:
            futures: dict[Future[tuple[str, ...]], MeetingMetadata] = {}

            # Submit one crawl task per meeting; workers receive only simple
            # shareable argument types (ints and strings).
            for meeting in meetings:
                base_url = meeting.files_url.rstrip("/") + "/"
                future = executor.submit(
                    fetch_meeting_tdocs,
                    meeting.meeting_id,
                    base_url,
                    meeting.short_name or "",
                    config.timeout,
                    config.max_retries,
                    target_tuple,
                    str(config.cache_dir),
                    config.http_cache.ttl,
                    config.http_cache.refresh_ttl_on_access,
                )
                futures[future] = meeting

            # Process results as they complete
            for future in as_completed(futures):
                meeting = futures[future]
                try:
                    payloads = future.result()
                except Exception as exc:  # pragma: no cover - defensive, depends on network
                    short_name = meeting.short_name or str(meeting.meeting_id)
                    message = f"Meeting {short_name}: {exc}"
                    logger.warning(message)
                    errors.append(message)
                    continue

                limit_reached = self._process_payloads(
                    payloads,
                    collected,
                    seen_ids,
                    existing_ids,
                    config.incremental,
                    targets,
                    max_tdocs,
                )

                # Check if overall timeout exceeded
                elapsed = time.monotonic() - start_time
                timeout_exceeded = config.overall_timeout is not None and elapsed >= config.overall_timeout

                # Stop early when a limit is hit, all target IDs were found,
                # or the overall wall-clock budget is spent.
                if limit_reached or (targets is not None and not targets) or timeout_exceeded:
                    # Cancel remaining futures (already-running ones finish anyway).
                    for pending_future in futures:
                        if not pending_future.done():
                            pending_future.cancel()
                    if timeout_exceeded:
                        message = f"Overall timeout of {config.overall_timeout}s exceeded after {elapsed:.1f}s"
                        logger.warning(message)
                        errors.append(message)
                    break

        return errors

    def _process_payloads(
        self,
        payloads: tuple[str, ...],
        collected: list[TDocMetadata],
        seen_ids: set[str],
        existing_ids: set[str],
        incremental: bool,
        targets: set[str] | None,
        max_tdocs: int | None,
    ) -> bool:
        """Convert worker payloads into TDocMetadata and apply filters.

        Returns True when a crawl limit (target IDs or limit_tdocs) has been
        reached and processing should stop.
        """
        for json_payload in payloads:
            metadata = TDocMetadata.model_validate_json(json_payload)
            tdoc_id = metadata.tdoc_id

            # De-duplicate across meetings and honour incremental mode.
            if tdoc_id in seen_ids:
                continue
            if incremental and tdoc_id in existing_ids:
                continue

            collected.append(metadata)
            seen_ids.add(tdoc_id)

            # Targets are consumed in place; an empty set signals completion.
            if targets is not None:
                targets.discard(tdoc_id)
                if not targets:
                    return True

            if max_tdocs is not None and len(collected) >= max_tdocs:
                return True

        return False


# Public API of this module.
__all__ = [
    "TDocCrawlResult",
    "TDocCrawler",
]
Loading