Commit 596bf3e7 authored by Jan Reimes
Browse files

refactor(tdocs): split crawl/database filters and add lxml

parent 1988d6c6
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ dependencies = [
    "typer>=0.19.2",
    "xlsxwriter>=3.2.9",
    "zipinspect>=0.1.2",
    "lxml>=6.0.2",
]

[project.optional-dependencies]
+130 −57
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ import sqlite3
from collections.abc import Callable, Iterable
from datetime import UTC, datetime
from decimal import Decimal
from typing import Any

from tdoc_crawler.database.meetings import MeetingDatabase
from tdoc_crawler.logging import get_logger
@@ -106,53 +107,16 @@ class TDocDatabase(MeetingDatabase):
        meeting_map = meeting_db._meeting_map()

        if config.tdoc_ids:
            lookup = {value.strip().upper() for value in config.tdoc_ids}
            records = [record for record in records if record.tdoc_id in lookup]
            records = self._filter_by_tdoc_ids(records, config.tdoc_ids)

        if config.working_groups:
            allowed_tbids = {wg.tbid for wg in config.working_groups}
            filtered: list[TDocMetadata] = []
            for record in records:
                meeting = meeting_map.get(record.meeting_id or -1)
                if meeting is None or meeting.tbid not in allowed_tbids:
                    continue
                filtered.append(record)
            records = filtered
            records = self._filter_by_working_groups(records, meeting_map, config.working_groups)

        if config.start_date is not None:
            records = [record for record in records if record.date_retrieved and record.date_retrieved >= config.start_date]
        if config.end_date is not None:
            records = [record for record in records if record.date_retrieved and record.date_retrieved <= config.end_date]
        records = self._filter_by_retrieved_dates(records, config.start_date, config.end_date)

        # Meeting date filters
        if config.meeting_start_date is not None:
            records = [
                record for record in records if (m := meeting_map.get(record.meeting_id or -1)) and m.start_date and m.start_date >= config.meeting_start_date
            ]
        if config.meeting_end_date is not None:
            records = [record for record in records if (m := meeting_map.get(record.meeting_id or -1)) and m.end_date and m.end_date <= config.meeting_end_date]

        # Glob pattern filters (case-insensitive, OR logic within same field)
        if config.source_pattern:
            patterns = [p.lower() for p in config.source_pattern]
            records = [record for record in records if record.source and any(fnmatch.fnmatch(record.source.lower(), p) for p in patterns)]
        if config.source_pattern_exclude:
            patterns = [p.lower() for p in config.source_pattern_exclude]
            records = [record for record in records if not record.source or not any(fnmatch.fnmatch(record.source.lower(), p) for p in patterns)]
        if config.title_pattern:
            patterns = [p.lower() for p in config.title_pattern]
            records = [record for record in records if record.title and any(fnmatch.fnmatch(record.title.lower(), p) for p in patterns)]
        if config.title_pattern_exclude:
            patterns = [p.lower() for p in config.title_pattern_exclude]
            records = [record for record in records if not record.title or not any(fnmatch.fnmatch(record.title.lower(), p) for p in patterns)]
        if config.agenda_pattern:
            patterns = [p.lower() for p in config.agenda_pattern]
            records = [record for record in records if record.agenda_item_text and any(fnmatch.fnmatch(record.agenda_item_text.lower(), p) for p in patterns)]
        if config.agenda_pattern_exclude:
            patterns = [p.lower() for p in config.agenda_pattern_exclude]
            records = [
                record for record in records if not record.agenda_item_text or not any(fnmatch.fnmatch(record.agenda_item_text.lower(), p) for p in patterns)
            ]
        records = self._filter_by_meeting_dates(records, meeting_map, config.meeting_start_date, config.meeting_end_date)

        records = self._apply_pattern_filters(records, config)

        descending = config.order.value.lower() == "desc"
        records.sort(
@@ -221,22 +185,15 @@ class TDocDatabase(MeetingDatabase):

        processed: set[int] = set()
        for record in records:
            if record.meeting_id is None:
            meeting_id = record.meeting_id
            if meeting_id is None:
                continue
            meeting = meeting_map.get(record.meeting_id)
            meeting = meeting_map.get(meeting_id)
            if meeting is None:
                continue
            if allowed_tbids and meeting.tbid not in allowed_tbids:
                continue
            if allowed_subgroups:
                subgroup_code = None
                if meeting.subtb is not None:
                    subgroup_record = SUBTB_INDEX.get(meeting.subtb)
                    if subgroup_record:
                        subgroup_code = subgroup_record.code
                if subgroup_code is None or subgroup_code.upper() not in allowed_subgroups:
            if not self._meeting_matches_filters(meeting, allowed_tbids, allowed_subgroups):
                continue
            processed.add(record.meeting_id)
            processed.add(meeting_id)
        return processed

    def cache_invalid_tdoc(
@@ -286,6 +243,17 @@ class TDocDatabase(MeetingDatabase):
        """
        return {record.tdoc_id for record in self._table_rows("tdocs") if record.validation_failed}

    def _apply_pattern_filters(self, records: list[TDocMetadata], config: TDocQueryConfig) -> list[TDocMetadata]:
        """Apply all supported include/exclude glob filters."""
        filtered = records
        filtered = self._filter_by_pattern(filtered, config.source_pattern, lambda record: record.source, exclude=False)
        filtered = self._filter_by_pattern(filtered, config.source_pattern_exclude, lambda record: record.source, exclude=True)
        filtered = self._filter_by_pattern(filtered, config.title_pattern, lambda record: record.title, exclude=False)
        filtered = self._filter_by_pattern(filtered, config.title_pattern_exclude, lambda record: record.title, exclude=True)
        filtered = self._filter_by_pattern(filtered, config.agenda_pattern, lambda record: record.agenda_item_text, exclude=False)
        filtered = self._filter_by_pattern(filtered, config.agenda_pattern_exclude, lambda record: record.agenda_item_text, exclude=True)
        return filtered

    def _get_tdoc(self, tdoc_id: str) -> TDocMetadata | None:
        """Get a TDoc by ID."""
        try:
@@ -309,6 +277,111 @@ class TDocDatabase(MeetingDatabase):
        except sqlite3.Error:
            return None

    @classmethod
    def _meeting_matches_filters(
        cls,
        meeting: Any,
        allowed_tbids: set[int] | None,
        allowed_subgroups: set[str] | None,
    ) -> bool:
        """Check whether meeting satisfies working-group and subgroup constraints."""
        if allowed_tbids and meeting.tbid not in allowed_tbids:
            return False
        if not allowed_subgroups:
            return True
        subgroup_code = cls._subgroup_code_from_subtb(meeting.subtb)
        return subgroup_code is not None and subgroup_code.upper() in allowed_subgroups

    @staticmethod
    def _filter_by_tdoc_ids(records: list[TDocMetadata], tdoc_ids: list[str]) -> list[TDocMetadata]:
        """Filter records by case-insensitive TDoc ID list."""
        lookup = {value.strip().upper() for value in tdoc_ids}
        return [record for record in records if record.tdoc_id in lookup]

    @staticmethod
    def _filter_by_working_groups(
        records: list[TDocMetadata],
        meeting_map: dict[int, Any],
        working_groups: list[WorkingGroup],
    ) -> list[TDocMetadata]:
        """Filter records to meetings that belong to selected working groups."""
        allowed_tbids = {wg.tbid for wg in working_groups}
        filtered: list[TDocMetadata] = []
        for record in records:
            meeting = meeting_map.get(record.meeting_id or -1)
            if meeting is None or meeting.tbid not in allowed_tbids:
                continue
            filtered.append(record)
        return filtered

    @staticmethod
    def _filter_by_retrieved_dates(
        records: list[TDocMetadata],
        start_date: datetime | None,
        end_date: datetime | None,
    ) -> list[TDocMetadata]:
        """Filter records by retrieval datetime bounds."""
        filtered = records
        if start_date is not None:
            filtered = [record for record in filtered if record.date_retrieved and record.date_retrieved >= start_date]
        if end_date is not None:
            filtered = [record for record in filtered if record.date_retrieved and record.date_retrieved <= end_date]
        return filtered

    @staticmethod
    def _filter_by_meeting_dates(
        records: list[TDocMetadata],
        meeting_map: dict[int, Any],
        meeting_start_date: datetime | None,
        meeting_end_date: datetime | None,
    ) -> list[TDocMetadata]:
        """Filter records by start/end date of the related meeting."""
        filtered = records
        if meeting_start_date is not None:
            filtered = [
                record
                for record in filtered
                if (meeting := meeting_map.get(record.meeting_id or -1)) and meeting.start_date and meeting.start_date >= meeting_start_date
            ]
        if meeting_end_date is not None:
            filtered = [
                record
                for record in filtered
                if (meeting := meeting_map.get(record.meeting_id or -1)) and meeting.end_date and meeting.end_date <= meeting_end_date
            ]
        return filtered

    @staticmethod
    def _filter_by_pattern(
        records: list[TDocMetadata],
        values: list[str] | None,
        value_getter: Callable[[TDocMetadata], str | None],
        exclude: bool,
    ) -> list[TDocMetadata]:
        """Apply case-insensitive glob filter to a selected text field."""
        if not values:
            return records
        patterns = [value.lower() for value in values]
        if exclude:
            return [
                record
                for record in records
                if not value_getter(record) or not any(fnmatch.fnmatch(str(value_getter(record)).lower(), pattern) for pattern in patterns)
            ]
        return [
            record for record in records if value_getter(record) and any(fnmatch.fnmatch(str(value_getter(record)).lower(), pattern) for pattern in patterns)
        ]

    @staticmethod
    def _subgroup_code_from_subtb(subtb: int | None) -> str | None:
        """Resolve subgroup code from subtb identifier."""
        if subtb is None:
            return None
        subgroup_record = SUBTB_INDEX.get(subtb)
        if subgroup_record is None:
            return None
        return subgroup_record.code

    # ------------------------------------------------------------------
    # Normalisation helpers
    # ------------------------------------------------------------------
+228 −115

File changed.

Preview size limit exceeded, changes collapsed.