Commit 484a7090 authored by Jan Reimes

refactor(extraction): add spec support, retries, security validation, remove TYPE_CHECKING

parent e5157cbd
+0 −2
@@ -28,7 +28,6 @@ from tdoc_crawler.extraction.metrics import (
    timed_operation,
)
from tdoc_crawler.extraction.profiles import DEFAULT_EXTRACTION_PROFILE, ExtractionProfile
from tdoc_crawler.extraction.workspace_utils import check_pdf_status

__all__ = [
    "DEFAULT_EXTRACTION_PROFILE",
@@ -42,7 +41,6 @@ __all__ = [
    "OpendataloaderConfig",
    "TDocFiles",
    "TimedOperationResult",
    "check_pdf_status",
    "convert_document_to_markdown",
    "convert_tdoc_metadata",
    "convert_tdoc_to_markdown",
+6 −9
"""Workspace item checkout and registration logic.

Handles the checkout phase of workspace item management:
downloading/fetching documents, setting up .ai folders, and creating
workspace member records.
downloading/fetching documents and creating workspace member records.

This module is intentionally free of document processing logic
(PDF conversion, markdown extraction, VLM). That responsibility
@@ -39,8 +38,8 @@ async def checkout_single_item(
) -> tuple[WorkspaceMember | None, str | None]:
    """Checkout a single workspace item and create a member record.

    Downloads the document if checkout is enabled, sets up the .ai subfolder,
    and returns a WorkspaceMember ready for registration.
    Downloads the document if checkout is enabled and returns a
    WorkspaceMember ready for registration.

    Args:
        item: Item ID to checkout (TDoc ID, spec number, or path).
@@ -64,7 +63,6 @@ async def checkout_single_item(
            checkout_path = await checkout_tdoc_to_workspace(
                item,
                path_config.checkout_dir,
                workspace,
                db_file=path_config.db_file,
            )
            if checkout_path is None:
@@ -74,7 +72,6 @@ async def checkout_single_item(
            checkout_path = await checkout_spec_to_workspace(
                item,
                path_config.checkout_dir,
                workspace,
                release or "latest",
                db_file=path_config.db_file,
            )
@@ -84,10 +81,10 @@ async def checkout_single_item(
        if checkout_path is not None:
            source_path = str(checkout_path)

    # Resolve release for spec member IDs
    # Resolve release for spec member IDs (always resolve for specs)
    resolved_release: str | None = None
    if source_kind == SourceKind.SPEC and release:
        resolved_release, _ = await resolve_spec_release_from_db(item, release)
    if source_kind == SourceKind.SPEC:
        resolved_release, _ = await resolve_spec_release_from_db(item, release or "latest")

    source_item_id = f"{item}-REL{normalize_release_version(resolved_release)}" if resolved_release else item
    member = make_workspace_member(source_item_id, source_path, source_kind)
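
For reference, the member-ID shape this produces for a spec item; values are illustrative, and normalize_release_version is assumed to pass an already-dotted version like "19.0.0" through unchanged:

```python
# Illustrative values only; assumes normalize_release_version("19.0.0") == "19.0.0".
item, resolved_release = "21905", "19.0.0"
source_item_id = f"{item}-REL{resolved_release}" if resolved_release else item
assert source_item_id == "21905-REL19.0.0"  # matches the -REL parsing in convert_for_wiki
```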
+15 −32
@@ -7,6 +7,7 @@ supporting both local LibreOffice conversion and remote API fallback.
from __future__ import annotations

import os
import shutil
import tempfile
from dataclasses import dataclass
from enum import Enum
@@ -18,6 +19,7 @@ from convert_lo.converter import Converter

from tdoc_crawler.logging import get_logger
from tdoc_crawler.models.workspaces import ConversionError
from tdoc_crawler.utils.security import validate_api_url

logger = get_logger(__name__)

@@ -67,25 +69,9 @@ def is_office_format(source_file: Path) -> bool:
    return source_file.suffix.lower() in OFFICE_FORMATS


def get_cached_pdf_path(source_file: Path) -> Path | None:
    """Get the path to a cached PDF conversion if it exists.

    The cached PDF is stored in a `.ai` subdirectory next to the original file.

    Args:
        source_file: Path to the original Office document.

    Returns:
        Path to cached PDF if it exists, None otherwise.
    """
    ai_dir = source_file.parent / ".ai"
    cached_pdf = ai_dir / f"{source_file.stem}.pdf"
    return cached_pdf if cached_pdf.exists() else None


def convert_to_pdf(
    source_file: Path,
    output_dir: Path | None = None,
    output_dir: Path,
    *,
    force: bool = False,
    config: ConverterConfig | None = None,
@@ -93,14 +79,12 @@ def convert_to_pdf(
    """Convert an Office document to PDF.

    This is the main entry point for PDF conversion. It handles:
    - Checking for cached conversions
    - Selecting the appropriate backend (LibreOffice or remote)
    - Fallback from local to remote on failure (when using AUTO backend)

    Args:
        source_file: Path to the Office document (DOCX, DOC, PPT, etc.)
        output_dir: Optional output directory for the PDF. If None, uses
            the `.ai` subdirectory next to the source file.
        output_dir: Output directory for the PDF.
        force: If True, re-convert even if a cached PDF exists.
        config: Optional converter configuration. If None, uses defaults
            from environment variables.
@@ -113,13 +97,14 @@ def convert_to_pdf(
        FileNotFoundError: If the input file does not exist.
    """
    if not source_file.exists():
        raise FileNotFoundError(f"Input file not found: {source_file}")
        msg = f"Input file not found: {source_file}"
        raise FileNotFoundError(msg)

    if not is_office_format(source_file):
        raise ConversionError(f"Unsupported file format: {source_file.suffix}")
        msg = f"Unsupported file format: {source_file.suffix}"
        raise ConversionError(msg)

    config = config or ConverterConfig.from_env()
    output_dir = output_dir or source_file.parent / ".ai"
    output_file = output_dir / f"{source_file.stem}.pdf"

    # Check for cached conversion
@@ -140,11 +125,12 @@ def convert_to_pdf(
    # AUTO: Try LibreOffice first, fallback to remote
    try:
        return convert_via_libreoffice(source_file, output_dir)
    except Exception as e:
    except ConversionError as e:
        logger.warning("LibreOffice conversion failed for %s: %s", source_file.name, e)
        logger.info("Falling back to remote converter for %s", source_file.name)
        return convert_via_remote(source_file, output_dir, config)
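
With output_dir now a required parameter, callers choose the destination explicitly; a minimal usage sketch (paths illustrative):

```python
from pathlib import Path

from tdoc_crawler.extraction.conversion import convert_to_pdf

# Paths illustrative. convert_to_pdf raises ConversionError when the format
# is unsupported or when both the LibreOffice and remote backends fail.
source = Path("checkout/S4-260001/S4-260001.docx")
pdf_path = convert_to_pdf(source, Path("wiki/sources/S4-260001"))
print(pdf_path)  # wiki/sources/S4-260001/S4-260001.pdf
```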


def ensure_pdf(
    source_file: Path,
    output_dir: Path,
@@ -166,8 +152,6 @@ def ensure_pdf(
    Returns:
        Path to the PDF file.
    """
    import shutil

    pdf_path = output_dir / f"{source_file.stem}.pdf"

    if pdf_path.exists() and not force:
@@ -182,8 +166,6 @@ def ensure_pdf(
    return pdf_path




def convert_via_libreoffice(
    source_file: Path,
    output_dir: Path,
@@ -211,7 +193,8 @@ def convert_via_libreoffice(
            result = converter.convert(source_file, LibreOfficeFormat.PDF, Path(tmpdir))

            if result is None or result.output_file is None:
                raise ConversionError(f"LibreOffice returned empty result for {source_file.name}")
                msg = f"LibreOffice returned empty result for {source_file.name}"
                raise ConversionError(msg)

            # Copy the converted PDF to the output directory
            output_file.write_bytes(result.output_file.read_bytes())
@@ -223,7 +206,7 @@ def convert_via_libreoffice(
        raise
    except Exception as e:
        msg = f"LibreOffice conversion failed for {source_file.name}: {e}"
        logger.error(msg)
        logger.exception(msg)
        raise ConversionError(msg) from e


@@ -238,6 +221,7 @@ def convert_via_remote(
    when local LibreOffice is not available or fails.
    """
    config = config or ConverterConfig.from_env()
    validate_api_url(config.api_base)
    output_file = output_dir / f"{source_file.stem}.pdf"

    try:
@@ -262,7 +246,7 @@ def convert_via_remote(

    except requests.RequestException as e:
        msg = f"Remote conversion failed for {source_file.name}: {e}"
        logger.error(msg)
        logger.exception(msg)
        raise ConversionError(msg) from e
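
The implementation of validate_api_url is not shown in this diff; a rough sketch of the kind of guard such a helper might apply before calling out to a remote converter (hypothetical, not the actual tdoc_crawler.utils.security code):

```python
from urllib.parse import urlparse

def validate_api_url_sketch(url: str) -> None:
    """Hypothetical guard: reject non-HTTPS schemes and loopback hosts."""
    parsed = urlparse(url)
    if parsed.scheme != "https":
        raise ValueError(f"Disallowed scheme: {parsed.scheme!r}")
    if parsed.hostname in (None, "localhost", "127.0.0.1", "::1"):
        raise ValueError(f"Disallowed host: {parsed.hostname!r}")
```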


@@ -273,6 +257,5 @@ __all__ = [
    "convert_to_pdf",
    "convert_via_libreoffice",
    "convert_via_remote",
    "get_cached_pdf_path",
    "is_office_format",
]
+162 −56
@@ -8,22 +8,27 @@ from __future__ import annotations

import json
import logging
import re
import time
from pathlib import Path
from typing import Any

import opendataloader_pdf
from rich.console import Console

from tdoc_crawler.config import resolve_cache_manager
from tdoc_crawler.config.settings import PathConfig
from tdoc_crawler.extraction.conversion import ensure_pdf
from tdoc_crawler.extraction.fetch_tdoc import fetch_tdoc_files
from tdoc_crawler.extraction.fetch_tdoc import fetch_spec_files, fetch_tdoc_files
from tdoc_crawler.extraction.metrics import MetricType, get_metrics_tracker, timed_operation
from tdoc_crawler.extraction.profiles import DEFAULT_EXTRACTION_PROFILE, ExtractionProfile
from tdoc_crawler.models.workspaces import SourceKind
from tdoc_crawler.tdocs.sources.whatthespec import resolve_via_whatthespec
from tdoc_crawler.utils.async_helpers import run_async
from tdoc_crawler.utils.normalization import normalize_tdoc_id
from tdoc_crawler.workspaces import (
    checkout_spec_to_workspace,
)

logger = logging.getLogger(__name__)
console = Console()

__all__ = [
    "ExtractedEquation",
@@ -40,7 +45,6 @@ __all__ = [

class ConversionError(Exception):
    """Raised when document conversion fails."""
    pass


class OpendataloaderConfig:
@@ -94,10 +98,8 @@ def _extract_tables_from_opendataloader_json(json_path: Path) -> list[ExtractedT
    try:
        data = json.loads(json_path.read_text(encoding="utf-8"))
        items = data if isinstance(data, list) else data.get("results", [])
        for item in items:
            if isinstance(item, dict) and item.get("type") == "table":
                tables.append(item)
    except Exception as e:
        tables.extend(item for item in items if isinstance(item, dict) and item.get("type") == "table")
    except (OSError, json.JSONDecodeError) as e:
        logger.warning("Failed to extract tables from JSON: %s", e)
    return tables

@@ -108,10 +110,8 @@ def _extract_figures_from_opendataloader_json(json_path: Path) -> list[Extracted
    try:
        data = json.loads(json_path.read_text(encoding="utf-8"))
        items = data if isinstance(data, list) else data.get("results", [])
        for item in items:
            if isinstance(item, dict) and item.get("type") in ("image", "figure"):
                figures.append(item)
    except Exception as e:
        figures.extend(item for item in items if isinstance(item, dict) and item.get("type") in ("image", "figure"))
    except (OSError, json.JSONDecodeError) as e:
        logger.warning("Failed to extract figures from JSON: %s", e)
    return figures

@@ -122,57 +122,65 @@ def _extract_formulas_from_opendataloader_json(json_path: Path) -> list[Extracte
    try:
        data = json.loads(json_path.read_text(encoding="utf-8"))
        items = data if isinstance(data, list) else data.get("results", [])
        for item in items:
            if isinstance(item, dict) and item.get("type") == "formula":
                equations.append(item)
    except Exception as e:
        equations.extend(item for item in items if isinstance(item, dict) and item.get("type") == "formula")
    except (OSError, json.JSONDecodeError) as e:
        logger.warning("Failed to extract formulas from JSON: %s", e)
    return equations
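
For reference, the JSON shape these three helpers expect, inferred from the parsing code; the key names inside items ("rows", "caption", "latex") are illustrative assumptions, not a documented schema:

```python
# Either a bare list of items, or a dict with a "results" list;
# each item is a dict whose "type" key selects the bucket.
data = {
    "results": [
        {"type": "table", "rows": [["Param", "Value"]]},
        {"type": "figure", "caption": "Architecture overview"},
        {"type": "formula", "latex": "E = mc^2"},
    ]
}
items = data if isinstance(data, list) else data.get("results", [])
tables = [i for i in items if isinstance(i, dict) and i.get("type") == "table"]
```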


def _ensure_converted(
    document_id: str,
    output_dir: Path,
    *,
    primary_path: Path | None = None,
    force: bool = False,
    config: OpendataloaderConfig | None = None,
    source_pdf: Path | None = None,
) -> tuple[str, Path, str]:
    """Fetch TDoc and ensure markdown conversion exists.
    """Ensure markdown conversion exists for a document.

    Handles file fetching, cache checking, and OpenDataLoader conversion.
    Handles cache checking and OpenDataLoader conversion.
    Writes only raw content to cache (no metadata) for consistency across callers.

    Args:
        document_id: TDoc identifier (e.g., "S4-260001")
        document_id: Document identifier (e.g., "S4-260001")
        output_dir: Directory for output files (.md, .json).
        primary_path: Optional explicit path to the source document.
            If not provided, it will be fetched as a TDoc.
        force: Force reconversion even if cached
        config: Optional OpenDataLoader configuration
        source_pdf: If provided, opendataloader processes this PDF instead of the
            original DOCX. This ensures all profiles use the same PDF that was
            original document. This ensures all profiles use the same PDF that was
            generated for the wiki dir.

    Returns:
        Tuple of (raw_markdown, json_path, normalized_id).
        json_path may not exist on disk ÔÇö caller must check.
        json_path may not exist on disk - caller must check.

    Raises:
        ConversionError: If no document files found or conversion fails.
    """
    normalized_id = normalize_tdoc_id(document_id)
    normalized_id = normalize_tdoc_id(document_id) or document_id

    if primary_path is None:
        tdoc_files = fetch_tdoc_files(normalized_id, force_download=force)
        primary = tdoc_files.primary_path
    else:
        primary = primary_path

    if primary is None:
        raise ConversionError(f"No document files found for {normalized_id}")
        msg = f"No document files found for {document_id}"
        raise ConversionError(msg)

    ai_dir = _get_ai_directory(primary)
    md_file = ai_dir / f"{primary.stem}.md"
    json_file = ai_dir / f"{primary.stem}.json"
    md_file = output_dir / f"{primary.stem}.md"
    json_file = output_dir / f"{primary.stem}.json"

    if md_file.exists() and not force:
        markdown_content = md_file.read_text(encoding="utf-8")
    else:
        # Use the provided PDF (from wiki dir) or fall back to original source
        input_file = source_pdf if source_pdf is not None and source_pdf.exists() else primary
        markdown_content, _ = _run_opendataloader(input_file, ai_dir, config=config)
        markdown_content, _ = _run_opendataloader(input_file, output_dir, config=config)
        md_file.write_text(markdown_content, encoding="utf-8")

    return markdown_content, json_file, normalized_id
@@ -182,6 +190,8 @@ def convert_for_wiki(
    document_id: str,
    wiki_source_dir: Path,
    *,
    source_kind: SourceKind = SourceKind.TDOC,
    source_path: str | Path | None = None,
    profile: ExtractionProfile | None = None,
    force: bool = False,
) -> Path | None:
@@ -192,8 +202,10 @@ def convert_for_wiki(
    consistency: the source_pdf in JSON points to the actual PDF that was analyzed.

    Args:
        document_id: Document identifier (TDoc ID).
        document_id: Document identifier (TDoc ID or spec number).
        wiki_source_dir: Target directory under wiki/<workspace>/sources/<doc-id>/.
        source_kind: Kind of source (TDOC, SPEC, etc.).
        source_path: Optional explicit path to the source document/directory.
        profile: Extraction profile to use. Defaults to DEFAULT_EXTRACTION_PROFILE.
        force: Force reconversion.

@@ -206,16 +218,46 @@ def convert_for_wiki(

    wiki_source_dir.mkdir(parents=True, exist_ok=True)

    # TDoc pipeline
    # Resolve files based on kind
    if source_kind == SourceKind.SPEC:
        # For specs, source_path should be the checkout directory.
        # If the path doesn't exist, auto-checkout the spec first.
        # The document_id may contain a -REL suffix (e.g., "21905-REL19.0.0").
        spec_rel_match = re.match(r"^(\d+(?:\.\d+)?)(?:-REL(.+))?$", document_id)
        spec_number = spec_rel_match.group(1) if spec_rel_match else document_id
        spec_release = spec_rel_match.group(2) if spec_rel_match and spec_rel_match.group(2) else "latest"

        actual_path: Path | None = None
        if source_path and Path(str(source_path)).exists():
            actual_path = Path(str(source_path))
        else:
            # Spec not checked out yet — check it out now
            checkout_base = PathConfig().checkout_dir
            actual_path = run_async(
                checkout_spec_to_workspace(
                    spec_number,
                    checkout_base,
                    release=spec_release,
                ),
            )

        if actual_path is None:
            msg = f"Spec {document_id} could not be checked out or found on disk"
            raise ConversionError(msg)

        doc_files = fetch_spec_files(actual_path)
    else:
        # TDoc pipeline (default)
        normalized_id = normalize_tdoc_id(document_id)
    tdoc_files = fetch_tdoc_files(normalized_id, force_download=force)
    primary = tdoc_files.primary_path
        doc_files = fetch_tdoc_files(normalized_id, force_download=force)

    primary = doc_files.primary_path
    if primary is None:
        raise ConversionError(f"No document files found for {normalized_id}")
        msg = f"No document files found for {document_id}"
        raise ConversionError(msg)

    if profile == ExtractionProfile.PDF_ONLY:
        pdf_path = ensure_pdf(primary, wiki_source_dir, force=force)
        return pdf_path
        return ensure_pdf(primary, wiki_source_dir, force=force)

    # default or advanced: generate wiki PDF first, then opendataloader processes it
    pdf_path = ensure_pdf(primary, wiki_source_dir, force=force)
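
The -REL suffix parsing in action; behaviour follows directly from the regex above (IDs illustrative):

```python
import re

_SPEC_REL = re.compile(r"^(\d+(?:\.\d+)?)(?:-REL(.+))?$")

for doc_id in ("21905-REL19.0.0", "21905", "38.101"):
    m = _SPEC_REL.match(doc_id)
    number = m.group(1) if m else doc_id
    release = m.group(2) if m and m.group(2) else "latest"
    print(doc_id, "->", number, release)
# 21905-REL19.0.0 -> 21905 19.0.0
# 21905 -> 21905 latest
# 38.101 -> 38.101 latest
```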
@@ -225,7 +267,12 @@ def convert_for_wiki(
        hybrid_mode="full" if profile == ExtractionProfile.ADVANCED else None,
    )
    markdown_content, json_path, _ = _ensure_converted(
        document_id, force=force, config=config, source_pdf=pdf_path
        document_id,
        wiki_source_dir,
        primary_path=primary,
        force=force,
        config=config,
        source_pdf=pdf_path,
    )

    # Write markdown to wiki source dir
@@ -248,7 +295,7 @@ def _add_source_pdf_to_json(json_path: Path, pdf_path: Path) -> None:
        elif isinstance(data, list) and len(data) > 0:
            data[0]["source_pdf"] = str(pdf_path)
        json_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
    except Exception as e:
    except (OSError, json.JSONDecodeError, KeyError, IndexError) as e:
        logger.warning("Failed to add source_pdf to JSON: %s", e)


@@ -279,7 +326,23 @@ def convert_document_to_markdown(
        ExtractionResult dict with 'content' (markdown) and optional 'tables', 'figures', 'equations'
    """
    with timed_operation(get_metrics_tracker(), normalize_tdoc_id(document_id), MetricType.CONVERSION):
        markdown_content, json_file, normalized_id = _ensure_converted(document_id, force, config=config)
        # For standalone conversion, use the default .ai directory next to
        # the source file; resolve the primary path first.
        normalized_id = normalize_tdoc_id(document_id)
        tdoc_files = fetch_tdoc_files(normalized_id, force_download=force)
        primary = tdoc_files.primary_path
        if primary is None:
            msg = f"No document files found for {normalized_id}"
            raise ConversionError(msg)

        ai_dir = _get_ai_directory(primary)
        markdown_content, json_file, normalized_id = _ensure_converted(
            document_id,
            ai_dir,
            primary_path=primary,
            force=force,
            config=config,
        )

        tables: list[dict[str, Any]] = []
        figures: list[dict[str, Any]] = []
@@ -312,6 +375,18 @@ def convert_tdoc_metadata(
    return convert_document_to_markdown(document_id, force=force, config=config)


def _is_valid_pdf(file_path: Path) -> bool:
    """Check if a file is a valid PDF by reading its magic bytes."""
    if not file_path.exists() or file_path.stat().st_size == 0:
        return False
    try:
        with file_path.open("rb") as f:
            header = f.read(8)
        return header.startswith(b"%PDF-")
    except OSError:
        return False
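
PDF files begin with the ASCII bytes %PDF- followed by a version, so the check only needs the first few bytes. A quick demonstration (file contents illustrative):

```python
from pathlib import Path

p = Path("probe.pdf")
p.write_bytes(b"%PDF-1.7\n...")  # minimal PDF-like header
assert _is_valid_pdf(p)
p.write_bytes(b"PK\x03\x04")     # ZIP header (what a DOCX actually starts with)
assert not _is_valid_pdf(p)
```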


def _get_ai_directory(source_file: Path) -> Path:
    """Get the .ai directory adjacent to the source file."""
    return source_file.parent / ".ai"
@@ -334,11 +409,27 @@ def _run_opendataloader(
        Tuple of (markdown_content, json_file).

    Raises:
        ConversionError: If conversion fails.
        ConversionError: If conversion fails or input is not a valid PDF.
    """
    # Validate input is a proper PDF before attempting conversion
    if not _is_valid_pdf(input_file):
        msg = (
            f"Input file is not a valid PDF: {input_file.name} "
            f"(size={input_file.stat().st_size if input_file.exists() else 'N/A'}, "
            f"exists={input_file.exists()})"
        )
        raise ConversionError(msg)

    config = config or OpendataloaderConfig()
    formats = "markdown,json,markdown-with-images"

    max_retries = 3
    retry_delays = [1, 3]
    last_exc: Exception | None = None

    for attempt in range(max_retries):
        try:
            opendataloader_pdf.convert(
                input_path=[str(input_file)],
@@ -347,10 +438,28 @@ def _run_opendataloader(
                quiet=True,
                **config.to_convert_kwargs(),
            )
            break
        except (OSError, ConnectionError) as e:
            last_exc = e
            if attempt < max_retries - 1:
                delay = retry_delays[attempt]
                logger.warning(
                    "OpenDataLoader attempt %d/%d failed for %s, retrying in %ds: %s",
                    attempt + 1,
                    max_retries,
                    input_file.name,
                    delay,
                    e,
                )
                time.sleep(delay)
        except Exception as e:
            msg = f"OpenDataLoader conversion failed for {input_file.name}: {e}"
        logger.error(msg)
            logger.exception(msg)
            raise ConversionError(msg) from e
    else:
        msg = f"OpenDataLoader conversion failed after {max_retries} attempts for {input_file.name}: {last_exc}"
        logger.error(msg)
        raise ConversionError(msg) from last_exc
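
The retry loop above relies on Python's for/else: the else branch runs only when the loop finishes without a break, i.e. when every attempt failed. The same pattern in miniature (do_work is a hypothetical stand-in for opendataloader_pdf.convert):

```python
import time

def do_work() -> None:  # hypothetical stand-in for the real conversion call
    raise OSError("transient failure")

attempts, delays = 3, [1, 3]
for attempt in range(attempts):
    try:
        do_work()
        break  # success skips the else branch below
    except OSError as e:
        last_exc = e
        if attempt < attempts - 1:
            time.sleep(delays[attempt])
else:
    # Runs only when the loop exhausted without break: every attempt failed.
    raise RuntimeError(f"failed after {attempts} attempts") from last_exc
```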

    stem = input_file.stem
    md_file = output_dir / f"{stem}.md"
@@ -382,6 +491,3 @@ def extract_document_structured_from_tdoc(

# Public alias for convert operation.
convert_tdoc_to_markdown = convert_document_to_markdown


+62 −7
@@ -5,11 +5,16 @@ from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path

from tdoc_crawler.config import resolve_cache_manager
from tdoc_crawler.config.settings import PathConfig
from tdoc_crawler.database.oxyde_models import TDocMetadata as TDocRecord
from tdoc_crawler.database.tdocs import TDocDatabase
from tdoc_crawler.http_client import create_cached_session
from tdoc_crawler.models.workspaces import TDocNotFoundError
from tdoc_crawler.tdocs.operations.checkout import checkout_tdoc, get_checkout_path
from tdoc_crawler.tdocs.sources.whatthespec import resolve_via_whatthespec
from tdoc_crawler.utils.async_helpers import run_async
from tdoc_crawler.utils.normalization import normalize_tdoc_id


@dataclass
@@ -31,10 +36,11 @@ def fetch_tdoc_files(document_id: str, force_download: bool = False) -> TDocFile
    """Fetch TDoc files from checkout or download from 3GPP FTP.

    Pipeline:
    1. Resolve TDoc ID to metadata via WhatTheSpec
    2. Calculate checkout path
    3. If not in checkout, download via checkout_tdoc
    4. Find available file types in checkout directory
    1. Check local database for metadata
    2. If not found, resolve TDoc ID to metadata via WhatTheSpec and save to DB
    3. Calculate checkout path
    4. If not in checkout, download via checkout_tdoc
    5. Find available file types in checkout directory

    Args:
        document_id: TDoc identifier (e.g., "S4-260001")
@@ -46,9 +52,40 @@ def fetch_tdoc_files(document_id: str, force_download: bool = False) -> TDocFile
    Raises:
        TDocNotFoundError: If TDoc cannot be found or downloaded
    """
    metadata = resolve_via_whatthespec(document_id)
    normalized_id = normalize_tdoc_id(document_id)

    async def _resolve_metadata() -> TDocRecord | None:
        manager = resolve_cache_manager()
        async with TDocDatabase(manager.db_file) as db:
            # 1. Check database
            record = await db._get_tdoc(normalized_id)
            if record:
                return record

            # 2. Resolve via WhatTheSpec
            metadata = resolve_via_whatthespec(normalized_id)
            if metadata:
                # Convert Pydantic to Oxyde record
                new_record = TDocRecord(
                    tdoc_id=metadata.tdoc_id,
                    meeting_id=metadata.meeting_id,
                    title=metadata.title,
                    url=metadata.url,
                    source=metadata.source,
                    agenda_item_nbr=metadata.agenda_item_nbr,
                    agenda_item_text=metadata.agenda_item_text,
                    status=metadata.status,
                    is_withdrawn=metadata.is_withdrawn,
                )
                # 3. Put into database
                await db.upsert_tdoc(new_record)
                return new_record
        return None

    metadata = run_async(_resolve_metadata())
    if metadata is None:
        raise TDocNotFoundError(f"TDoc {document_id} not found via WhatTheSpec")
        msg = f"TDoc {document_id} not found"
        raise TDocNotFoundError(msg)

    checkout_dir = PathConfig().checkout_dir
    checkout_path = get_checkout_path(metadata, checkout_dir)
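
The caller-facing surface is unchanged: fetch_tdoc_files stays synchronous and the DB round-trip is internal. A usage sketch (ID illustrative; only attributes visible in this diff are used):

```python
from tdoc_crawler.extraction.fetch_tdoc import fetch_tdoc_files

# Resolves metadata (DB first, WhatTheSpec as fallback), checks out the
# TDoc if needed, then reports the files found in the checkout directory.
files = fetch_tdoc_files("S4-260001")
print(files.checkout_dir)
if files.primary_path is not None:
    print("primary document:", files.primary_path)
```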
@@ -57,9 +94,27 @@ def fetch_tdoc_files(document_id: str, force_download: bool = False) -> TDocFile
        with create_cached_session() as session:
            checkout_tdoc(metadata, checkout_dir, force=force_download, session=session)

    return _find_files_in_checkout(checkout_path)


def fetch_spec_files(checkout_path: Path) -> TDocFiles:
    """Find available spec files in a checkout directory.

    Args:
        checkout_path: Path to the checked out spec directory.

    Returns:
        TDocFiles with paths to available documents.
    """
    return _find_files_in_checkout(checkout_path)


def _find_files_in_checkout(checkout_path: Path) -> TDocFiles:
    """Find available file types in checkout directory."""
    files = TDocFiles(checkout_dir=checkout_path)

    if checkout_path.is_dir():
        # Prefer PDF > DOCX > DOC
        for file_path in sorted(checkout_path.rglob("*")):
            if file_path.is_file() and not file_path.name.startswith("."):
                suffix = file_path.suffix.lower()
@@ -73,4 +128,4 @@ def fetch_tdoc_files(document_id: str, force_download: bool = False) -> TDocFile
    return files


__all__ = ["TDocFiles", "fetch_tdoc_files"]
__all__ = ["TDocFiles", "fetch_spec_files", "fetch_tdoc_files"]