Commit f30aff5f authored by Jan Reimes

feat(crawlers): enhance HybridTDocCrawler with async crawl support

- Implement internal asynchronous crawl method for better performance.
- Update existing crawl methods to utilize async/await for document list and parallel crawling.
- Improve error handling and logging during the crawling process.
parent 6cabbea1
+8 −4
@@ -41,6 +41,10 @@ class HybridTDocCrawler:
         This method supports both document list and parallel crawling methods
         with automatic fallbacks based on configuration.
         """
+        return asyncio.run(self._crawl(config, progress_callback))
+
+    async def _crawl(self, config: TDocCrawlConfig, progress_callback: Callable[[float, float], None] | None = None) -> HybridCrawlResult:
+        """Internal asynchronous crawl implementation."""
         meetings = self._get_meetings_to_crawl(config)
         if not meetings:
             logger.warning("No meetings found matching criteria")
@@ -83,8 +87,8 @@ class HybridTDocCrawler:
         document_list_errors, doc_list_meetings_processed = [], 0
         if document_list_meetings:
             logger.info(f"Starting document list crawl for {len(document_list_meetings)} meetings")
-            document_list_errors, doc_list_meetings_processed = asyncio.run(
-                self._crawl_meetings_document_list(document_list_meetings, config, collected, existing_ids, targets)
+            document_list_errors, doc_list_meetings_processed = await self._crawl_meetings_document_list(
+                document_list_meetings, config, collected, existing_ids, targets
             )
             errors.extend(document_list_errors)

@@ -104,8 +108,8 @@ class HybridTDocCrawler:
         parallel_errors, parallel_meetings_processed = [], 0
         if parallel_meetings_final:
             logger.info(f"Starting parallel crawl for {len(parallel_meetings_final)} meetings")
-            parallel_errors, parallel_meetings_processed = asyncio.run(
-                self._crawl_meetings_parallel(parallel_meetings_final, config, collected, existing_ids, targets)
+            parallel_errors, parallel_meetings_processed = await self._crawl_meetings_parallel(
+                parallel_meetings_final, config, collected, existing_ids, targets
             )
             errors.extend(parallel_errors)

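Both hunks replace nested `asyncio.run(...)` calls with a plain `await`. This is more than style: once `_crawl()` itself runs inside an event loop, `asyncio.run()` raises `RuntimeError` ("asyncio.run() cannot be called from a running event loop"). A short sketch of the failure mode and the fix:

```python
import asyncio


async def sub_crawl() -> list[str]:
    await asyncio.sleep(0)  # placeholder for real async work
    return []


async def crawl_broken() -> None:
    # Inside a running loop this raises:
    # RuntimeError: asyncio.run() cannot be called from a running event loop
    asyncio.run(sub_crawl())


async def crawl_fixed() -> None:
    errors = await sub_crawl()  # reuses the already-running loop
    print("errors:", errors)


asyncio.run(crawl_fixed())      # prints: errors: []
# asyncio.run(crawl_broken())   # uncomment to observe the RuntimeError
```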
+5 −5
@@ -2,7 +2,6 @@

 from __future__ import annotations

-import asyncio
 import logging
 import time
 from collections.abc import Callable
@@ -52,7 +51,7 @@ class TDocCrawler:
         targets = set(config.target_ids) if config.target_ids else None

         collected: list[TDocMetadata] = []
-        errors = asyncio.run(self._crawl_meetings_parallel(meetings, config, collected, existing_ids, targets))
+        errors = self._crawl_meetings_parallel(meetings, config, collected, existing_ids, targets)

         inserted = 0
         updated = 0
@@ -190,9 +189,10 @@ class TDocCrawler:
             executor = create_executor("serial")
             executor_type = "serial"
         else:
-            # Auto-selects InterpreterPoolExecutor (Python 3.14+) or ProcessPoolExecutor (fallback)
-            executor = create_executor("subinterpreter", max_workers=config.workers)
-            executor_type = "subinterpreter/process"
+            # Note: Using multiprocessing by default because numpy/pandas (used in document list parsing)
+            # are currently incompatible with subinterpreters in Python 3.14.
+            executor = create_executor("multiprocessing", max_workers=config.workers)
+            executor_type = "multiprocessing"

         logger.debug("Using %s executor with %d worker(s)", executor_type, config.workers)