Commit 5e451054 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat: implement HybridTDocCrawler with full functionality

parent 8dbcae47
Loading
Loading
Loading
Loading
+95 −106
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
import json
import logging
from collections.abc import Callable
from dataclasses import dataclass
@@ -10,7 +11,7 @@ from dataclasses import dataclass
from tdoc_crawler.crawlers.executor_adapter import Runner
from tdoc_crawler.crawlers.parallel import fetch_meeting_document_list_subinterpreter, fetch_meeting_tdocs
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.models import CrawlLimits, MeetingMetadata, TDocCrawlConfig, TDocMetadata, WorkingGroup
from tdoc_crawler.models import CrawlLimits, MeetingMetadata, MeetingQueryConfig, SortOrder, TDocCrawlConfig, TDocMetadata, WorkingGroup

logger = logging.getLogger(__name__)

@@ -31,9 +32,101 @@ class HybridCrawlResult:
class HybridTDocCrawler:
    """Hybrid crawler that supports both document list and parallel crawling methods."""

    def __init__(self, database: TDocDatabase) -> None:
        """Store the database handle used to read existing TDoc IDs and bulk-upsert results."""
        self.database = database

    def crawl(self, config: TDocCrawlConfig, progress_callback: Callable[[float, float], None] | None = None) -> HybridCrawlResult:
        """Execute a hybrid crawl using the provided configuration.

        Meetings are partitioned between the document-list method and the
        parallel (subinterpreter) method. When ``config.allow_parallel_fallback``
        is set, document-list meetings with a files URL are also queued on the
        parallel path.

        Args:
            config: Crawl configuration (working groups, limits, target IDs,
                incremental flag, fallback policy).
            progress_callback: Optional ``(done, total)`` callback forwarded to
                the database bulk upsert.

        Returns:
            HybridCrawlResult summarising processed/inserted/updated counts,
            per-method meeting counts, fallback count, and collected errors.
        """
        meetings = self._get_meetings_to_crawl(config)
        if not meetings:
            logger.warning("No meetings found matching criteria")
            return HybridCrawlResult(
                processed=0, inserted=0, updated=0, document_list_meetings=0, parallel_meetings=0, fallbacks=0, errors=["No meetings found"]
            )

        meetings = self._apply_meeting_limits(meetings, config.limits)
        if not meetings:
            logger.warning("Meetings filtered out by limits configuration")
            return HybridCrawlResult(
                processed=0,
                inserted=0,
                updated=0,
                document_list_meetings=0,
                parallel_meetings=0,
                fallbacks=0,
                errors=["No meetings found after applying limits"],
            )

        existing_ids: set[str] = set()
        if config.incremental:
            # Uppercase so membership tests match normalised TDoc identifiers.
            existing_ids = {tdoc_id.upper() for tdoc_id in self.database.get_existing_tdoc_ids(config.working_groups)}

        targets = set(config.target_ids) if config.target_ids else None

        collected: list[TDocMetadata] = []
        errors: list[str] = []

        # Phase 1: partition meetings by crawl method.
        document_list_meetings = []
        parallel_meetings = []
        for meeting in meetings:
            if self._should_use_document_list_method(config, meeting):
                document_list_meetings.append(meeting)
            else:
                parallel_meetings.append(meeting)

        doc_list_meetings_processed = 0
        if document_list_meetings:
            logger.info(f"Starting document list crawl for {len(document_list_meetings)} meetings")
            document_list_errors, doc_list_meetings_processed = asyncio.run(
                self._crawl_meetings_document_list(document_list_meetings, config, collected, existing_ids, targets)
            )
            errors.extend(document_list_errors)

        # Phase 2: parallel method for the remaining meetings, plus fallbacks.
        parallel_meetings_final = list(parallel_meetings)

        if config.allow_parallel_fallback:
            # Track scheduled meeting IDs in a set: the previous implementation
            # rebuilt a list of IDs on every membership test (O(n^2)).
            scheduled_ids = {m.meeting_id for m in parallel_meetings_final}
            for meeting in document_list_meetings:
                # NOTE(review): this re-queues every document-list meeting that
                # has a files URL, not only those that failed — per-meeting
                # failures are not tracked yet.
                if meeting.files_url and meeting.meeting_id not in scheduled_ids:
                    parallel_meetings_final.append(meeting)
                    scheduled_ids.add(meeting.meeting_id)

        parallel_meetings_processed = 0
        if parallel_meetings_final:
            logger.info(f"Starting parallel crawl for {len(parallel_meetings_final)} meetings")
            parallel_errors, parallel_meetings_processed = asyncio.run(
                self._crawl_meetings_parallel(parallel_meetings_final, config, collected, existing_ids, targets)
            )
            errors.extend(parallel_errors)

        # Persist everything collected in one bulk upsert.
        inserted = 0
        updated = 0
        if collected:
            inserted, updated = self.database.bulk_upsert_tdocs(collected, progress_callback=progress_callback)

        return HybridCrawlResult(
            processed=len(collected),
            inserted=inserted,
            updated=updated,
            document_list_meetings=doc_list_meetings_processed,
            parallel_meetings=parallel_meetings_processed,
            fallbacks=len(parallel_meetings_final) - len(parallel_meetings) if config.allow_parallel_fallback else 0,
            errors=errors,
        )

    def _apply_meeting_limits(self, meetings: list, limits: CrawlLimits) -> list:
        """Apply crawl limits to the meeting list."""

        limited = list(meetings)

        if limits.limit_meetings is not None and limits.limit_meetings != 0:
@@ -72,9 +165,6 @@ class HybridTDocCrawler:

    def _get_meetings_to_crawl(self, config: TDocCrawlConfig) -> list:
        """Query meetings from the database based on configuration filters."""

        from tdoc_crawler.models import MeetingQueryConfig, SortOrder

        meeting_config = MeetingQueryConfig(
            cache_dir=config.cache_dir,
            working_groups=config.working_groups,
@@ -172,7 +262,6 @@ class HybridTDocCrawler:
        targets: set[str] | None,
    ) -> tuple[list[str], int]:
        """Launch subinterpreter workers to crawl meetings in parallel."""

        errors: list[str] = []
        seen_ids: set[str] = set()
        max_tdocs = None
@@ -248,7 +337,6 @@ class HybridTDocCrawler:
        targets: set[str] | None,
    ) -> tuple[list[str], int]:
        """Launch subinterpreter workers to crawl meetings in parallel using document list method."""

        errors: list[str] = []
        seen_ids: set[str] = set()
        max_tdocs = None
@@ -294,8 +382,6 @@ class HybridTDocCrawler:
                        continue

                    # Parse the JSON response (could be error or success)
                    import json

                    try:
                        response_data = json.loads(json_payload)

@@ -310,8 +396,6 @@ class HybridTDocCrawler:
                        # Process successful TDoc metadata
                        tdoc_metadata_list = []
                        for tdoc_json in response_data:
                            from tdoc_crawler.models.tdocs import TDocMetadata

                            tdoc_metadata = TDocMetadata.model_validate_json(tdoc_json)
                            tdoc_metadata_list.append(tdoc_metadata)

@@ -369,7 +453,6 @@ class HybridTDocCrawler:
        Returns True when a crawl limit (target IDs or limit_tdocs) has been
        reached and processing should stop.
        """

        for json_payload in payloads:
            metadata = TDocMetadata.model_validate_json(json_payload)
            tdoc_id = metadata.tdoc_id
@@ -392,100 +475,6 @@ class HybridTDocCrawler:

        return False

    def __init__(self, database: TDocDatabase) -> None:
        """Store the database handle used to read existing TDoc IDs and bulk-upsert results."""
        self.database = database

    def crawl(self, config: TDocCrawlConfig, progress_callback: Callable[[float, float], None] | None = None) -> HybridCrawlResult:
        """Execute a hybrid crawl using the provided configuration.

        Meetings are partitioned between the document-list method and the
        parallel (subinterpreter) method. When ``config.allow_parallel_fallback``
        is set, document-list meetings with a files URL are also queued on the
        parallel path.

        Args:
            config: Crawl configuration (working groups, limits, target IDs,
                incremental flag, fallback policy).
            progress_callback: Optional ``(done, total)`` callback forwarded to
                the database bulk upsert.

        Returns:
            HybridCrawlResult summarising processed/inserted/updated counts,
            per-method meeting counts, fallback count, and collected errors.
        """
        meetings = self._get_meetings_to_crawl(config)
        if not meetings:
            logger.warning("No meetings found matching criteria")
            return HybridCrawlResult(
                processed=0, inserted=0, updated=0, document_list_meetings=0, parallel_meetings=0, fallbacks=0, errors=["No meetings found"]
            )

        meetings = self._apply_meeting_limits(meetings, config.limits)
        if not meetings:
            logger.warning("Meetings filtered out by limits configuration")
            return HybridCrawlResult(
                processed=0,
                inserted=0,
                updated=0,
                document_list_meetings=0,
                parallel_meetings=0,
                fallbacks=0,
                errors=["No meetings found after applying limits"],
            )

        existing_ids: set[str] = set()
        if config.incremental:
            # Uppercase so membership tests match normalised TDoc identifiers.
            existing_ids = {tdoc_id.upper() for tdoc_id in self.database.get_existing_tdoc_ids(config.working_groups)}

        targets = set(config.target_ids) if config.target_ids else None

        collected: list[TDocMetadata] = []
        errors: list[str] = []

        # Phase 1: partition meetings by crawl method.
        document_list_meetings = []
        parallel_meetings = []
        for meeting in meetings:
            if self._should_use_document_list_method(config, meeting):
                document_list_meetings.append(meeting)
            else:
                parallel_meetings.append(meeting)

        doc_list_meetings_processed = 0
        if document_list_meetings:
            logger.info(f"Starting document list crawl for {len(document_list_meetings)} meetings")
            document_list_errors, doc_list_meetings_processed = asyncio.run(
                self._crawl_meetings_document_list(document_list_meetings, config, collected, existing_ids, targets)
            )
            errors.extend(document_list_errors)

        # Phase 2: parallel method for the remaining meetings, plus fallbacks.
        parallel_meetings_final = list(parallel_meetings)

        if config.allow_parallel_fallback:
            # Track scheduled meeting IDs in a set: the previous implementation
            # rebuilt a list of IDs on every membership test (O(n^2)).
            scheduled_ids = {m.meeting_id for m in parallel_meetings_final}
            for meeting in document_list_meetings:
                # NOTE(review): this re-queues every document-list meeting that
                # has a files URL, not only those that failed — per-meeting
                # failures are not tracked yet.
                if meeting.files_url and meeting.meeting_id not in scheduled_ids:
                    parallel_meetings_final.append(meeting)
                    scheduled_ids.add(meeting.meeting_id)

        parallel_meetings_processed = 0
        if parallel_meetings_final:
            logger.info(f"Starting parallel crawl for {len(parallel_meetings_final)} meetings")
            parallel_errors, parallel_meetings_processed = asyncio.run(
                self._crawl_meetings_parallel(parallel_meetings_final, config, collected, existing_ids, targets)
            )
            errors.extend(parallel_errors)

        # Persist everything collected in one bulk upsert.
        inserted = 0
        updated = 0
        if collected:
            inserted, updated = self.database.bulk_upsert_tdocs(collected, progress_callback=progress_callback)

        return HybridCrawlResult(
            processed=len(collected),
            inserted=inserted,
            updated=updated,
            document_list_meetings=doc_list_meetings_processed,
            parallel_meetings=parallel_meetings_processed,
            fallbacks=len(parallel_meetings_final) - len(parallel_meetings) if config.allow_parallel_fallback else 0,
            errors=errors,
        )


__all__ = [
    "HybridCrawlResult",
+2 −7
Original line number Diff line number Diff line
@@ -8,8 +8,7 @@ from pathlib import Path

from pydantic import BaseModel, Field, field_validator, model_validator

from .base import (DEFAULT_CACHE_DIR, BaseConfigModel, PortalCredentials,
                   SortOrder, utc_now)
from .base import DEFAULT_CACHE_DIR, BaseConfigModel, PortalCredentials, SortOrder, utc_now
from .crawl_limits import CrawlLimits, _new_crawl_limits
from .subworking_groups import CODE_INDEX, SUBTB_INDEX
from .working_groups import WorkingGroup
@@ -72,7 +71,7 @@ class MeetingMetadata(BaseModel):

    @field_validator("working_group", mode="before")
    @classmethod
    def _validate_working_group(cls, value):
    def _validate_working_group(cls, value: WorkingGroup | str | None) -> str | None:
        if value is None:
            return None
        # Accept enum or str, store as str
@@ -111,7 +110,6 @@ class MeetingCrawlConfig(BaseConfigModel):
    @classmethod
    def _normalize_working_groups(cls, value: Iterable[str | WorkingGroup]) -> list[WorkingGroup]:
        """Ensure the working groups list only contains valid enum members."""

        normalized: list[WorkingGroup] = []
        for item in value:
            normalized.append(WorkingGroup(item) if not isinstance(item, WorkingGroup) else item)
@@ -121,7 +119,6 @@ class MeetingCrawlConfig(BaseConfigModel):
    @classmethod
    def _normalize_subgroups(cls, value: Iterable[str] | None) -> list[str] | None:
        """Normalize subgroup names (uppercase and strip whitespace)."""

        if value is None:
            return None
        return [str(item).strip().upper() for item in value]
@@ -141,7 +138,6 @@ class MeetingQueryConfig(BaseConfigModel):
    @classmethod
    def _normalize_working_groups(cls, value: Iterable[str | WorkingGroup] | None) -> list[WorkingGroup] | None:
        """Ensure the working group list is comprised of enum members."""

        if value is None:
            return None
        normalized: list[WorkingGroup] = []
@@ -153,7 +149,6 @@ class MeetingQueryConfig(BaseConfigModel):
    @classmethod
    def _normalize_subgroups(cls, value: Iterable[str] | None) -> list[str] | None:
        """Normalize subgroup names (uppercase and strip whitespace)."""

        if value is None:
            return None
        return [str(item).strip().upper() for item in value]
+12 −20
Original line number Diff line number Diff line
@@ -9,9 +9,7 @@ from pathlib import Path

from pydantic import BaseModel, Field, field_validator

from .base import (DEFAULT_CACHE_DIR, BaseConfigModel, OutputFormat,
                   PortalCredentials, SortOrder, _normalize_tdoc_ids, utc_now)
    HttpCacheConfig,
from .base import DEFAULT_CACHE_DIR, BaseConfigModel, HttpCacheConfig, OutputFormat, PortalCredentials, SortOrder, _normalize_tdoc_ids, utc_now
from .crawl_limits import CrawlLimits, _new_crawl_limits
from .working_groups import WorkingGroup

@@ -49,7 +47,6 @@ class TDocMetadata(BaseModel):
    @classmethod
    def _normalize_tdoc_id(cls, value: str) -> str:
        """Ensure identifiers are uppercase and trimmed."""

        return value.strip().upper()


@@ -86,7 +83,6 @@ class TDocCrawlConfig(BaseConfigModel):
    @classmethod
    def _normalize_working_groups(cls, value: Iterable[str | WorkingGroup]) -> list[WorkingGroup]:
        """Ensure the working groups list only contains valid enum members."""

        normalized: list[WorkingGroup] = []
        for item in value:
            normalized.append(WorkingGroup(item) if not isinstance(item, WorkingGroup) else item)
@@ -96,7 +92,6 @@ class TDocCrawlConfig(BaseConfigModel):
    @classmethod
    def _normalize_subgroups(cls, value: Iterable[str] | None) -> list[str] | None:
        """Normalize subgroup names to uppercase."""

        if value is None:
            return None
        return [str(item).upper().strip() for item in value]
@@ -105,7 +100,6 @@ class TDocCrawlConfig(BaseConfigModel):
    @classmethod
    def _normalize_target_ids(cls, value: Iterable[str] | None) -> list[str] | None:
        """Ensure target identifiers are normalized."""

        if value is None:
            return None
        return _normalize_tdoc_ids(value)
@@ -123,21 +117,8 @@ class QueryConfig(BaseConfigModel):
    limit: int | None = Field(None, ge=1, description="Maximum results")
    order: SortOrder = Field(SortOrder.DESC, description="Sort order applied to date_retrieved")

    @field_validator("working_groups", mode="before")
    @classmethod
    def _normalize_working_groups(cls, value: Iterable[str | WorkingGroup] | None) -> list[WorkingGroup] | None:
        """Coerce every entry to a WorkingGroup enum member; pass None through unchanged."""
        if value is None:
            return None
        return [entry if isinstance(entry, WorkingGroup) else WorkingGroup(entry) for entry in value]

    def __init__(self, **data: object) -> None:
        """Normalize identifiers and accept enum values from strings."""

        tdoc_ids = data.get("tdoc_ids")
        if tdoc_ids:
            if isinstance(tdoc_ids, str):
@@ -150,6 +131,17 @@ class QueryConfig(BaseConfigModel):
            data["output_format"] = OutputFormat(output_format.lower())
        super().__init__(**data)

    @field_validator("working_groups", mode="before")
    @classmethod
    def _normalize_working_groups(cls, value: Iterable[str | WorkingGroup] | None) -> list[WorkingGroup] | None:
        """Coerce every entry to a WorkingGroup enum member; pass None through unchanged."""
        if value is None:
            return None
        return [entry if isinstance(entry, WorkingGroup) else WorkingGroup(entry) for entry in value]


# Legacy alias
CrawlConfig = TDocCrawlConfig