Commit 4f19b797 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(download): implement download_to_path function for file retrieval

* Add download_to_path function to handle downloading files from URLs.
* Ensure proper error handling for unsupported URL schemes and download failures.
* Update relevant modules to utilize the new download function.
* Refactor existing code to improve clarity and maintainability.
parent 187f46cd
Loading
Loading
Loading
Loading
+63 −141
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ from __future__ import annotations
import logging
import posixpath
import shutil
import zipfile
from contextlib import suppress
from pathlib import Path
from urllib.parse import urlparse
@@ -17,68 +18,15 @@ from urllib.parse import urlparse
import requests

from tdoc_crawler.models import TDocMetadata
from tdoc_crawler.http_client import download_to_path

logger = logging.getLogger(__name__)


def _sanitize_path_component(component: str) -> str:
    """Sanitize a path component to be valid on all platforms.

    Removes or replaces characters that are invalid in file/directory names:
    - Windows reserved names (CON, PRN, AUX, NUL, COM1-9, LPT1-9)
    - Invalid characters: < > : " | ? *
    - Special sequences like "..." which can cause issues

    Args:
        component: Path component to sanitize

    Returns:
        Sanitized path component
    """
    if not component:
        return "_"

    # Replace problematic sequences
    sanitized = component.replace("...", "_")

    # Windows reserved names (case-insensitive)
    reserved = {
        "con",
        "prn",
        "aux",
        "nul",
        "com1",
        "com2",
        "com3",
        "com4",
        "com5",
        "com6",
        "com7",
        "com8",
        "com9",
        "lpt1",
        "lpt2",
        "lpt3",
        "lpt4",
        "lpt5",
        "lpt6",
        "lpt7",
        "lpt8",
        "lpt9",
    }
    if sanitized.lower() in reserved:
        sanitized = f"_{sanitized}"

    return sanitized


def get_checkout_path(metadata: TDocMetadata, checkout_dir: Path) -> Path:
    """Calculate the checkout path for a TDoc based on its URL.

    The checkout path mirrors the 3GPP server directory structure.
    For example:
    - URL: https://www.3gpp.org/ftp/tsg_sa/SA4/s4-251234.zip
    - Checkout: checkout_dir/tsg_sa/SA4/s4-251234/

    Args:
        metadata: TDoc metadata containing the URL
        checkout_dir: Base directory under which TDocs are checked out

    Returns:
        Path to the checkout directory for this TDoc

    Raises:
        ValueError: If the URL is invalid or contains placeholder patterns
    """
    # Validate URL before processing
    if not metadata.is_valid:
        raise ValueError(f"Invalid or corrupt URL for TDoc {metadata.tdoc_id}: {metadata.url}")

    # Normalize the path: remove leading slash and split into components.
    # Use str.split('/') to avoid Path treating it as absolute on Windows.
    url_path = urlparse(metadata.url).path.lstrip("/")
    path_parts = url_path.split("/")

    # Find the 'ftp' component and take everything after it
    try:
        ftp_index = path_parts.index("ftp")
        relative_parts = path_parts[ftp_index + 1 :]
    except ValueError:
        # If 'ftp' not found, use the full path
        relative_parts = path_parts

    # Remove the filename (last component) - we'll use tdoc_id as folder name
    if relative_parts:
        relative_parts = relative_parts[:-1]

    # Sanitize path components to avoid invalid directory names
    sanitized_parts = [_sanitize_path_component(part) for part in relative_parts if part]

    # Build the checkout path: checkout_dir / mirrored path / tdoc_id
    if sanitized_parts:
        return checkout_dir.joinpath(*sanitized_parts) / metadata.tdoc_id
    return checkout_dir / metadata.tdoc_id


@@ -140,92 +70,83 @@ def checkout_tdoc(

    Raises:
        FileNotFoundError: If download fails or zip is empty
        ValueError: If URL scheme is not supported
        zipfile.BadZipFile: If the downloaded file is not a valid zip
    """
    checkout_path = get_checkout_path(metadata, checkout_dir)

    # Check if already checked out
    if checkout_path.exists() and not force:
        logger.debug(f"TDoc {metadata.tdoc_id} already checked out at {checkout_path}")
        return checkout_path

    # Create checkout directory
    checkout_path.mkdir(parents=True, exist_ok=True)
    temp_zip_path = checkout_path / f"{metadata.tdoc_id}.zip"

    cache_dir = checkout_dir.parent if checkout_dir.name == "checkout" else checkout_dir

    if force:
        downloads_dir = cache_dir / "checkout"
        extract_dir = downloads_dir / metadata.tdoc_id
        if extract_dir.exists():
            shutil.rmtree(extract_dir)
        zip_path = downloads_dir / f"{metadata.tdoc_id}.zip"
        with suppress(FileNotFoundError):
            zip_path.unlink()
        filename = posixpath.basename(urlparse(metadata.url).path)
        if filename:
            with suppress(FileNotFoundError):
                (downloads_dir / filename).unlink()

    import importlib  # noqa: PLC0415

    from tdoc_crawler.cli.helpers import prepare_tdoc_file  # noqa: PLC0415
    if metadata.url is None:
        raise ValueError(f"TDoc {metadata.tdoc_id} has no URL")

    cli_helpers = importlib.import_module("tdoc_crawler.cli.helpers")
    original_download = cli_helpers.download_to_path
    try:
        cli_helpers.download_to_path = _download_file
        prepared_path = prepare_tdoc_file(metadata, cache_dir, return_dir=True)
    finally:
        cli_helpers.download_to_path = original_download
    if prepared_path.is_dir():
        if prepared_path != checkout_path:
            shutil.copytree(prepared_path, checkout_path, dirs_exist_ok=True)
            shutil.rmtree(prepared_path)
    else:
        target_path = checkout_path / prepared_path.name
        shutil.copy2(prepared_path, target_path)
        if prepared_path != target_path:
            with suppress(FileNotFoundError):
                prepared_path.unlink()

        download_to_path(metadata.url, temp_zip_path)
        with zipfile.ZipFile(temp_zip_path) as archive:
            archive.extractall(checkout_path)
        logger.info(f"Checked out {metadata.tdoc_id} to {checkout_path}")
    finally:
        if temp_zip_path.exists():
            temp_zip_path.unlink()

    return checkout_path


def prepare_tdoc_file(metadata: TDocMetadata, cache_dir: Path, return_dir: bool = False) -> Path:
    """Prepare TDoc file for opening (download and extract if needed).

    Args:
        metadata: TDoc metadata with download URL.
        cache_dir: Cache directory for downloads and extracted files.
        return_dir: When True and the TDoc is a zip, return the extract directory.

    Returns:
        Path to the downloaded file, or the extract directory when return_dir is True.

    Raises:
        ValueError: If the TDoc has no URL.
        FileNotFoundError: If the download fails or the archive contains no files.
    """
    # Handle the case where metadata.url is None
    if metadata.url is None:
        raise ValueError(f"Cannot prepare TDoc file for {metadata.tdoc_id}: URL is None")

    downloads_dir = cache_dir / "checkout"
    downloads_dir.mkdir(parents=True, exist_ok=True)
    path = urlparse(metadata.url).path
    filename = str(posixpath.basename(path))
    suffix = Path(filename).suffix.lower()

    if suffix == ".zip":
        extract_dir = downloads_dir / metadata.tdoc_id
        # Reuse a previous extraction if it still contains files; an empty
        # leftover directory is removed so the archive is fetched again.
        if extract_dir.exists():
            files = sorted(p for p in extract_dir.rglob("*") if p.is_file())
            if files:
                return extract_dir if return_dir else files[0]
            shutil.rmtree(extract_dir)
        zip_path = downloads_dir / f"{metadata.tdoc_id}.zip"
        download_to_path(metadata.url, zip_path)
        try:
            with zipfile.ZipFile(zip_path) as archive:
                archive.extractall(extract_dir)
        finally:
            # The zip is only an intermediate artifact; keep just the extraction.
            with suppress(FileNotFoundError):
                zip_path.unlink()
        files = sorted(p for p in extract_dir.rglob("*") if p.is_file())
        if not files:
            raise FileNotFoundError("no-files-in-archive")
        return extract_dir if return_dir else files[0]

    # For non-zip files, download directly into the cache. The filename is
    # empty only when the URL path has no basename; fall back to a generic name.
    target_name = filename if filename else f"{metadata.tdoc_id}.bin"
    target_path = downloads_dir / target_name
    if not target_path.exists():
        try:
            download_to_path(metadata.url, target_path)
        except requests.exceptions.HTTPError as exc:
            status_code = exc.response.status_code if exc.response is not None else "unknown"
            raise FileNotFoundError(f"failed-to-download ({status_code}): {metadata.url}") from exc
    return target_path


def get_checked_out_tdocs(checkout_dir: Path) -> list[str]:
@@ -256,4 +177,5 @@ __all__ = [
    "checkout_tdoc",
    "get_checked_out_tdocs",
    "get_checkout_path",
    "prepare_tdoc_file",
]
+3 −1
Original line number Diff line number Diff line
@@ -66,7 +66,7 @@ from .args import (
)
from .console import get_console
from .fetching import maybe_fetch_missing_tdocs
from .helpers import build_limits, collect_spec_numbers, database_path, launch_file, parse_subgroups, parse_working_groups, prepare_tdoc_file
from .helpers import build_limits, collect_spec_numbers, launch_file, parse_subgroups, parse_working_groups
from .printing import (
    meeting_to_dict,
    print_checkout_results,
@@ -78,6 +78,8 @@ from .printing import (
    spec_query_to_dict,
    tdoc_to_dict,
)
from tdoc_crawler.checkout import prepare_tdoc_file
from tdoc_crawler.database import database_path

load_dotenv()

+1 −66
Original line number Diff line number Diff line
@@ -9,7 +9,7 @@ from pydantic import ValidationError

from tdoc_crawler.cli.console import get_console
from tdoc_crawler.cli.helpers import resolve_meeting_id
from tdoc_crawler.crawlers import TDocCrawlResult, WhatTheSpecResolutionError, extract_tdoc_url_from_portal, fetch_tdoc_metadata, resolve_via_whatthespec
from tdoc_crawler.crawlers import TDocCrawlResult, WhatTheSpecResolutionError, fetch_tdoc_metadata, resolve_via_whatthespec
from tdoc_crawler.credentials import resolve_credentials
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.models import HttpCacheConfig, PortalCredentials, QueryConfig, TDocMetadata
@@ -18,71 +18,6 @@ console = get_console()
_logger = logging.getLogger(__name__)


def fetch_tdoc(
    tdoc_id: str,
    cache_dir: Path,
    http_cache: HttpCacheConfig,
    full_metadata: bool = False,
    use_whatthespec: bool = False,
    credentials: PortalCredentials | None = None,
    timeout: int = 30,
) -> TDocMetadata:
    """Fetch a TDoc, selecting the crawl method from the given flags.

    Method selection order:
    1. ``use_whatthespec`` forces the WhatTheSpec API.
    2. Otherwise ``full_metadata`` uses the authenticated portal (needs credentials).
    3. Otherwise the unauthenticated portal is used and only the URL is resolved.

    Args:
        tdoc_id: TDoc identifier (e.g., "S4-260001").
        cache_dir: Directory for HTTP cache storage.
        http_cache: HTTP cache configuration.
        full_metadata: If True, fetch full metadata (requires credentials for portal method).
        use_whatthespec: If True, always use WhatTheSpec method regardless of full_metadata.
        credentials: Portal credentials (required for authenticated portal method).
        timeout: Request timeout in seconds.

    Returns:
        TDocMetadata with available information.

    Raises:
        ValueError: If full metadata is requested without credentials.
        Exception: If fetching fails for any reason.
    """
    # Method 3: WhatTheSpec API wins whenever it is explicitly requested.
    if use_whatthespec:
        _logger.debug(f"Fetching {tdoc_id} via WhatTheSpec API")
        return resolve_via_whatthespec(tdoc_id, cache_dir, http_cache, timeout)

    # Method 2: authenticated portal for full metadata.
    if full_metadata:
        if credentials is None:
            raise ValueError("Portal credentials required for full metadata fetching")
        _logger.debug(f"Fetching {tdoc_id} via authenticated 3GPP portal")
        return fetch_tdoc_metadata(
            tdoc_id,
            credentials,
            cache_dir,
            http_cache.ttl,
            http_cache.refresh_ttl_on_access,
            timeout,
        )

    # Method 1: unauthenticated portal — resolve the URL only and wrap it in
    # a minimal TDocMetadata with placeholder values for the other fields.
    _logger.debug(f"Fetching {tdoc_id} via unauthenticated 3GPP portal")
    tdoc_url = extract_tdoc_url_from_portal(tdoc_id, timeout=min(timeout, 15))
    return TDocMetadata(
        tdoc_id=tdoc_id,
        url=tdoc_url,
        title="",
        meeting_id=0,
        source="",
        contact="",
        agenda_item_nbr=0,
        date=None,
        revision_of="",
        technical_committee="",
        working_group="",
        type="",
        status="",
        referenced_documents=[],
        filename="",
        size=0,
        validated=False,
        validation_failed=False,
    )


def fetch_missing_tdocs(
    database: TDocDatabase,
    cache_dir: Path,
+1 −93
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import typer

from tdoc_crawler.crawlers import normalize_subgroup_alias, normalize_working_group_alias
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.http_client import download_to_path
from tdoc_crawler.models import CrawlLimits, HttpCacheConfig, MeetingQueryConfig, SortOrder, TDocMetadata, WorkingGroup

from .console import get_console
@@ -162,12 +163,6 @@ def build_limits(
    )


def database_path(cache_dir: Path) -> Path:
    """Return the database file path inside *cache_dir*, creating the directory if needed."""
    # Ensure the cache directory exists so callers can open the database directly.
    if not cache_dir.is_dir():
        cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir / DEFAULT_DATABASE_FILENAME


def infer_working_groups_from_ids(ids: Iterable[str]) -> list[WorkingGroup]:
    """Infer working groups from TDoc IDs based on first character."""
    mapping = {
@@ -186,38 +181,6 @@ def infer_working_groups_from_ids(ids: Iterable[str]) -> list[WorkingGroup]:
    return resolved or [WorkingGroup.RAN, WorkingGroup.SA, WorkingGroup.CT]


def normalize_portal_meeting_name(portal_meeting: str) -> str:
    """Normalize portal meeting name to database format.

    The portal uses format like "SA4#133-e" while the database uses "S4-133-e".
    This function converts portal format to database format.

    Args:
        portal_meeting: Meeting name from portal (e.g., "SA4#133-e")

    Returns:
        Normalized meeting name (e.g., "S4-133-e")
    """
    # Replace "SA4#" with "SA4-", "RAN1#" with "RAN1-", etc.
    normalized = portal_meeting.replace("#", "-")

    # Handle full working group names (SA, RAN, CT): collapse the group name
    # to its single-letter prefix, e.g. "SA4-..." -> "S4-...".
    for full_name, short_prefix in (("SA", "S"), ("RAN", "R"), ("CT", "C")):
        if not normalized.startswith(full_name):
            continue
        tail = normalized[len(full_name) :]
        # Collect the subgroup number directly after the group name. Unlike the
        # previous implementation, this also works when the digits run to the
        # end of the string (e.g. "SA4" -> "S4").
        digits = ""
        for char in tail:
            if not char.isdigit():
                break
            digits += char
        if digits:
            normalized = f"{short_prefix}{digits}{tail[len(digits):]}"
        break

    return normalized


def resolve_meeting_id(database: TDocDatabase, meeting_name: str) -> int | None:
    """Resolve meeting name to meeting_id from database.

@@ -275,10 +238,6 @@ def resolve_meeting_id(database: TDocDatabase, meeting_name: str) -> int | None:

    return None


def download_to_path(url: str, destination: Path) -> None:
    """Download a file from URL to destination path."""
    destination.parent.mkdir(parents=True, exist_ok=True)
    lowered = url.lower()
    if not lowered.startswith(ALLOWED_DOWNLOAD_SCHEMES):
        raise ValueError("unsupported-url-scheme")
@@ -303,57 +262,6 @@ def download_to_path(url: str, destination: Path) -> None:
        target.write(response.content)


def prepare_tdoc_file(metadata: TDocMetadata, cache_dir: Path, return_dir: bool = False) -> Path:
    """Prepare TDoc file for opening (download and extract if needed).

    Zip TDocs are downloaded into ``cache_dir/checkout``, extracted into a
    directory named after the TDoc id, and the zip itself is deleted.
    Non-zip TDocs are downloaded as-is and cached by filename.

    Args:
        metadata: TDoc metadata with download URL.
        cache_dir: Cache directory for downloads and extracted files.
        return_dir: When True and the TDoc is a zip, return the extract directory.

    Returns:
        Path to the downloaded file, or the extract directory when return_dir is True.

    Raises:
        ValueError: If ``metadata.url`` is ``None``.
        FileNotFoundError: If an extracted archive contains no files.
    """
    # Handle the case where metadata.url is None
    if metadata.url is None:
        raise ValueError(f"Cannot prepare TDoc file for {metadata.tdoc_id}: URL is None")

    downloads_dir = cache_dir / "checkout"
    downloads_dir.mkdir(parents=True, exist_ok=True)
    # Derive the remote filename from the URL path; posixpath is used because
    # URL paths are always '/'-separated regardless of the local platform.
    path = urlparse(metadata.url).path
    filename = str(posixpath.basename(path))

    suffix = Path(filename).suffix.lower()

    if suffix == ".zip":
        extract_dir = downloads_dir / metadata.tdoc_id
        if extract_dir.exists():
            # Cache hit: reuse a previous extraction if it still holds files.
            files = sorted(p for p in extract_dir.rglob("*") if p.is_file())
            if files:
                return extract_dir if return_dir else files[0]
            # Empty leftover directory — remove it and re-download below.
            shutil.rmtree(extract_dir)
        zip_path = downloads_dir / f"{metadata.tdoc_id}.zip"
        download_to_path(metadata.url, zip_path)
        try:
            with zipfile.ZipFile(zip_path) as archive:
                archive.extractall(extract_dir)
        finally:
            # The zip is only an intermediate artifact; always clean it up.
            with suppress(FileNotFoundError):
                zip_path.unlink()
        files = sorted(p for p in extract_dir.rglob("*") if p.is_file())
        if not files:
            raise FileNotFoundError("no-files-in-archive")
        return extract_dir if return_dir else files[0]

    # For non-zip files, download directly
    target_suffix = suffix or ""
    # filename is empty only when the URL path has no basename; then fall back
    # to a generic name (target_suffix is also empty in that case, so '.bin').
    target_name = filename if filename else f"{metadata.tdoc_id}{target_suffix or '.bin'}"
    target_path = downloads_dir / target_name
    if not target_path.exists():
        download_to_path(metadata.url, target_path)
    return target_path


def launch_file(path: Path) -> None:
    """Launch file in system's default application."""
    if not path.exists():
+3 −3
Original line number Diff line number Diff line
@@ -3,13 +3,13 @@
from __future__ import annotations

import logging
from decimal import Decimal, InvalidOperation
from pathlib import Path

from decimal import Decimal, InvalidOperation

import requests

from tdoc_crawler.cli.helpers import database_path, resolve_meeting_id
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.database import resolve_meeting_id, TDocDatabase
from tdoc_crawler.http_client import create_cached_session
from tdoc_crawler.models.base import HttpCacheConfig
from tdoc_crawler.models.tdocs import TDocMetadata
Loading