Commit 2071de0f authored by Jan Reimes
Browse files

feat(extraction): add hybrid timeout, auto-restart on server errors, and diagnostics

- Add hybrid_timeout_ms (600s default) to OpendataloaderConfig to prevent
  infinite hangs during ADVANCED profile conversions
- Auto-restart hybrid server and retry once on 500/503 errors
- Fix to_convert_kwargs: page_range→pages, workers→threads
- Add diagnostic INFO logging for opendataloader output analysis
- Move Counter import to top-level (PLC0415)
parent 060493be
Loading
Loading
Loading
Loading
+104 −16
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ import asyncio
import json
import logging
import tempfile
from collections import Counter
from pathlib import Path
from typing import Any

@@ -23,7 +24,7 @@ from tdoc_crawler.database.specs import SpecDatabase
from tdoc_crawler.extraction.conversion import ensure_pdf
from tdoc_crawler.extraction.fetch_spec import fetch_spec_files
from tdoc_crawler.extraction.fetch_tdoc import _lookup_tdoc_in_db, fetch_tdoc_files
from tdoc_crawler.extraction.hybrid_server import ensure_hybrid_server
from tdoc_crawler.extraction.hybrid_server import HybridServerManager, ensure_hybrid_server, get_manager
from tdoc_crawler.extraction.metrics import MetricType, get_metrics_tracker, timed_operation
from tdoc_crawler.extraction.profiles import DEFAULT_EXTRACTION_PROFILE, ExtractionProfile
from tdoc_crawler.models.workspaces import SourceKind
@@ -61,11 +62,13 @@ class OpendataloaderConfig:
        hybrid_mode: str | None = None,
        page_range: str | None = None,
        workers: int | None = None,
        hybrid_timeout_ms: int | None = None,
    ) -> None:
        self.hybrid = hybrid
        self.hybrid_mode = hybrid_mode
        self.page_range = page_range
        self.workers = workers
        self.hybrid_timeout_ms = hybrid_timeout_ms

    def to_convert_kwargs(self) -> dict[str, object]:
        """Convert config to kwargs for opendataloader_pdf.convert()."""
@@ -75,11 +78,18 @@ class OpendataloaderConfig:
        if self.hybrid_mode is not None:
            kwargs["hybrid_mode"] = self.hybrid_mode
        if self.page_range is not None:
            kwargs["page_range"] = self.page_range
            kwargs["pages"] = self.page_range
        if self.workers is not None:
            kwargs["workers"] = self.workers
            kwargs["threads"] = self.workers
        if self.hybrid_timeout_ms is not None:
            kwargs["hybrid_timeout"] = self.hybrid_timeout_ms
        return kwargs

    @classmethod
    def default_hybrid_timeout_ms(cls) -> int:
        """Give the default hybrid timeout, converted from seconds to milliseconds."""
        ms_per_second = 1000
        return _DEFAULT_HYBRID_TIMEOUT_S * ms_per_second


# Aliases for string-keyed dicts with arbitrary values. The names suggest one
# record per extracted table / figure; the key schema is not defined here.
ExtractedTable = dict[str, object]
ExtractedFigure = dict[str, object]
@@ -408,6 +418,18 @@ def convert_tdoc_metadata(
    return convert_document_to_markdown(document_id, force=force, config=config)


_SERVER_ERROR_PATTERNS = ("status 500", "status_code=500", "server request failed", "503")
_MAX_CONVERSION_RETRIES = 1
_DEFAULT_HYBRID_TIMEOUT_S = 600
_MAX_HYBRID_TIMEOUT_S = 1800


def _is_server_error(exc: Exception) -> bool:
    """Check if an exception indicates a hybrid backend server failure."""
    msg = str(exc).lower()
    return any(p in msg for p in _SERVER_ERROR_PATTERNS)


def _ensure_hybrid_server_if_needed(config: OpendataloaderConfig) -> None:
    """Auto-start the hybrid server if the config requires hybrid mode.

@@ -424,6 +446,26 @@ def _ensure_hybrid_server_if_needed(config: OpendataloaderConfig) -> None:
        raise ConversionError(msg)


def _restart_hybrid_server() -> HybridServerManager:
    """Stop the hybrid server and bring it back up.

    The manager is started with ``wait=True``; the status it returns decides
    whether the restart counts as successful.

    Returns:
        The HybridServerManager whose server was restarted.

    Raises:
        ConversionError: If the status returned by start is not running.
    """
    server_manager = get_manager()
    logger.warning("Hybrid server returned server error — restarting...")

    server_manager.stop()
    restart_status = server_manager.start(wait=True)

    if restart_status.running:
        logger.info("Hybrid server restarted successfully (pid=%s)", restart_status.pid)
        return server_manager

    msg = f"Hybrid server failed to restart: {restart_status.error}"
    raise ConversionError(msg)


def _run_opendataloader(
    input_file: Path,
    output_dir: Path,
@@ -432,6 +474,9 @@ def _run_opendataloader(
) -> tuple[str, Path]:
    """Run OpenDataLoader conversion on a single file.

    If the hybrid backend returns a server error (500/503), the server is
    automatically restarted and the conversion is retried once.

    Args:
        input_file: Path to the PDF file.
        output_dir: Directory for output files.
@@ -441,21 +486,47 @@ def _run_opendataloader(
        Tuple of (markdown_content, json_file).

    Raises:
        ConversionError: If conversion fails.
        ConversionError: If conversion fails after retry.
    """
    config = config or OpendataloaderConfig()
    _ensure_hybrid_server_if_needed(config)

    if config.hybrid_timeout_ms is None and config.hybrid is not None:
        config = OpendataloaderConfig(
            hybrid=config.hybrid,
            hybrid_mode=config.hybrid_mode,
            page_range=config.page_range,
            workers=config.workers,
            hybrid_timeout_ms=OpendataloaderConfig.default_hybrid_timeout_ms(),
        )

    formats = "markdown,json,markdown-with-images"
    kwargs = config.to_convert_kwargs()
    timeout_s = (config.hybrid_timeout_ms or 0) / 1000
    logger.info(
        "opendataloader: input=%s output_dir=%s format=%s timeout=%.0fs kwargs=%s",
        input_file,
        output_dir,
        formats,
        timeout_s,
        kwargs,
    )

    for attempt in range(_MAX_CONVERSION_RETRIES + 1):
        try:
            opendataloader_pdf.convert(
                input_path=[str(input_file)],
                output_dir=str(output_dir),
                format=formats,
                quiet=True,
            **config.to_convert_kwargs(),
                **kwargs,
            )
            break
        except Exception as e:
            if _is_server_error(e) and attempt < _MAX_CONVERSION_RETRIES:
                logger.warning("Conversion failed with server error (attempt %d/%d): %s", attempt + 1, _MAX_CONVERSION_RETRIES + 1, e)
                _restart_hybrid_server()
                continue
            msg = f"OpenDataLoader conversion failed for {input_file.name}: {e}"
            logger.error(msg)
            raise ConversionError(msg) from e
@@ -464,6 +535,23 @@ def _run_opendataloader(
    md_file = output_dir / f"{stem}.md"
    json_file = output_dir / f"{stem}.json"

    logger.info("opendataloader output: md_exists=%s json_exists=%s", md_file.exists(), json_file.exists())
    if md_file.exists():
        logger.info("opendataloader md size: %d bytes", md_file.stat().st_size)
    if json_file.exists():
        json_text = json_file.read_text(encoding="utf-8")
        logger.info("opendataloader json size: %d bytes", len(json_text))
        try:
            json_data = json.loads(json_text)
            if isinstance(json_data, dict):
                kids = json_data.get("kids", [])
                logger.info("opendataloader json keys=%s kids_count=%d", list(json_data.keys()), len(kids))
                if kids:
                    kid_types = [k.get("type", "?") for k in kids if isinstance(k, dict)]
                    logger.info("opendataloader kid types: %s", dict(Counter(kid_types)))
        except json.JSONDecodeError:
            logger.warning("opendataloader json is not valid JSON")

    markdown_content = md_file.read_text(encoding="utf-8")

    return markdown_content, json_file