Commit 2a70b2b8 authored by Jan Reimes
Browse files

perf(db): add migrations, tbid column, bulk upsert, and query pushdown

- Add incremental schema migration framework with first migration:
  ALTER TABLE tdocs ADD COLUMN tbid INTEGER
- Add tbid field to TDocMetadata Oxyde model
- Resolve tbid from meeting record when not set on TDoc insert
- Replace one-by-one upserts with bulk_create/bulk_update in
  upsert_meetings_batch() and upsert_tdocs_batch()
- Push spec_number, title, working_group, and status filters to
  Oxyde ORM WHERE clauses instead of filtering in Python
parent 2df79799
Loading
Loading
Loading
Loading
+27 −0
Original line number Diff line number Diff line
@@ -214,6 +214,33 @@ class DocDatabase:
            raw_ir = {"proto": 1, "op": "raw", "table": "", "sql": create_sql}
            await self._database.execute(raw_ir)

        await self._run_migrations()

    async def _run_migrations(self) -> None:
        """Apply incremental schema migrations for existing databases.

        Each migration entry is ``(table, column, ALTER statement)``; the
        column is added only when a ``PRAGMA table_info`` probe shows it is
        missing. Any failure is logged at debug level and skipped so that
        startup remains best-effort on databases we cannot introspect.
        """
        if self._database is None:
            return

        pending = (
            (
                "tdocs",
                "tbid",
                "ALTER TABLE tdocs ADD COLUMN tbid INTEGER DEFAULT NULL",
            ),
        )

        for table, column, statement in pending:
            try:
                probe = {"proto": 1, "op": "raw", "table": "", "sql": f"PRAGMA table_info({table})"}
                rows = await self._database.execute(probe)
                # PRAGMA table_info rows carry the column name at index 1.
                present = [row[1] for row in rows] if rows else []
                if column not in present:
                    _logger.info("Migrating %s: adding column %s", table, column)
                    await self._database.execute({"proto": 1, "op": "raw", "table": "", "sql": statement})
            except Exception as exc:
                # Best-effort: a failed probe/ALTER must not block startup.
                _logger.debug("Migration check for %s.%s skipped: %s", table, column, exc)

    async def _ensure_reference_data(self) -> None:
        """Populate reference tables for working and subworking groups."""
        for record in WORKING_GROUP_RECORDS:
+104 −35
Original line number Diff line number Diff line
@@ -3,7 +3,8 @@
import asyncio
from collections import defaultdict
from collections.abc import Callable, Iterable
from datetime import datetime

from oxyde import Q

from tdoc_crawler.config.settings import PathConfig
from tdoc_crawler.database.base import DocDatabase
@@ -86,70 +87,138 @@ class MeetingDatabase(DocDatabase):
    ) -> tuple[int, int]:
        """Bulk upsert meetings with optional progress callback.

        Splits records into new vs existing, then uses ``bulk_create`` for
        inserts and ``bulk_update`` for updates instead of one-by-one queries.

        Args:
            meetings: Iterable of meeting metadata to upsert
            progress_callback: Optional callback to call after each meeting is processed.
            progress_callback: Optional callback to call after batch processing.
                Takes (completed, total) as float parameters.

        Returns:
            Tuple of (inserted_count, updated_count)
        """
        meetings_list = list(meetings)
        if not meetings_list:
            return 0, 0

        total = float(len(meetings_list))
        now = utc_now()

        to_create: list[MeetingMetadata] = []
        to_update: list[MeetingMetadata] = []
        for meeting in meetings_list:
            record = self._prepare_meeting(meeting)
            existing = await self._get_meeting(record.meeting_id)
            if existing is None:
                to_create.append(
                    MeetingMetadata(
                        meeting_id=record.meeting_id,
                        tbid=record.tbid,
                        subtb=record.subtb,
                        short_name=record.short_name,
                        title=record.title,
                        start_date=record.start_date,
                        end_date=record.end_date,
                        location=record.location,
                        files_url=record.files_url,
                        portal_url=record.portal_url,
                        tdoc_count=record.tdoc_count,
                        last_synced=record.last_synced or now,
                        created_at=now,
                        updated_at=now,
                    )
                )
            else:
                to_update.append(
                    MeetingMetadata(
                        meeting_id=record.meeting_id,
                        tbid=record.tbid,
                        subtb=record.subtb,
                        short_name=record.short_name,
                        title=record.title,
                        start_date=record.start_date,
                        end_date=record.end_date,
                        location=record.location,
                        files_url=record.files_url,
                        portal_url=record.portal_url,
                        tdoc_count=record.tdoc_count,
                        last_synced=record.last_synced or existing.last_synced,
                        created_at=record.created_at or existing.created_at,
                        updated_at=now,
                    )
                )

        if to_create:
            await MeetingMetadata.objects.bulk_create(to_create)
        if to_update:
            await MeetingMetadata.objects.bulk_update(
                to_update,
                fields=[
                    "tbid",
                    "subtb",
                    "short_name",
                    "title",
                    "start_date",
                    "end_date",
                    "location",
                    "files_url",
                    "portal_url",
                    "tdoc_count",
                    "last_synced",
                    "created_at",
                    "updated_at",
                ],
            )

        inserted = 0
        updated = 0
        for index, meeting in enumerate(meetings_list, start=1):
            created, changed = await self.upsert_meeting(meeting)
            if created:
                inserted += 1
            elif changed:
                updated += 1
        if progress_callback:
                progress_callback(float(index), total)
        return inserted, updated
            progress_callback(total, total)

        return len(to_create), len(to_update)

    async def query_meetings(self, config: MeetingQueryConfig) -> list[MeetingMetadata]:
        """Query meetings with filtering and sorting.

        Pushes filters to SQL WHERE clauses via Oxyde ORM instead of loading
        all rows and filtering in Python.

        Args:
            config: Query configuration with filters and options

        Returns:
            List of matching meeting metadata
        """
        qs = MeetingMetadata.objects

        if config.working_groups:
            allowed_tbids = [wg.tbid for wg in config.working_groups]
            qs = qs.filter(tbid__in=allowed_tbids)

        if config.subgroups:
            allowed_subgroups = {value.strip().upper() for value in config.subgroups}
            # Translate requested subgroup codes into subtb identifiers; if
            # none of the codes are known, no meeting can possibly match.
            allowed_subtbs = [subtb for subtb, rec in SUBTB_INDEX.items() if rec.code.upper() in allowed_subgroups]
            if allowed_subtbs:
                qs = qs.filter(subtb__in=allowed_subtbs)
            else:
                return []

        if not config.include_without_files:
            # Exclude meetings with a NULL or empty files URL.
            qs = qs.exclude(Q(files_url__isnull=True) | Q(files_url=""))

        # Date range filters
        if config.start_date is not None:
            qs = qs.filter(start_date__gte=config.start_date)

        if config.end_date is not None:
            qs = qs.filter(end_date__lte=config.end_date)

        descending = config.order.value.lower() == "desc"
        order_prefix = "-" if descending else ""
        qs = qs.order_by(f"{order_prefix}start_date", f"{order_prefix}meeting_id")

        if config.limit is not None:
            qs = qs.limit(config.limit)

        return await qs.all()

    async def get_existing_meeting_ids(self, working_groups: Iterable[WorkingGroup] | None = None) -> set[int]:
        """Get set of existing meeting IDs, optionally filtered by working group.

        Args:
            working_groups: Optional iterable of working groups to filter by

        Returns:
            Set of meeting IDs
        """
        if not working_groups:
            meetings = await self._table_rows(MeetingMetadata)
            return {meeting.meeting_id for meeting in meetings}

        # Push the working-group filter to SQL instead of filtering in Python.
        allowed_tbids = [wg.tbid for wg in working_groups]
        meetings = await MeetingMetadata.objects.filter(tbid__in=allowed_tbids).all()
        return {meeting.meeting_id for meeting in meetings}

    async def resolve_meeting_id(self, meeting_name: str) -> int | None:
        """Resolve meeting name to meeting_id from database.
@@ -270,8 +340,7 @@ class MeetingDatabase(DocDatabase):

    async def _meeting_map(self) -> dict[int, MeetingMetadata]:
        """Get mapping of meeting ID to meeting metadata."""
        meetings = await self._table_rows(MeetingMetadata)
        return {meeting.meeting_id: meeting for meeting in meetings}

    async def _get_meeting(self, meeting_id: int) -> MeetingMetadata | None:
        """Get a meeting by ID."""
+1 −0
Original line number Diff line number Diff line
@@ -95,6 +95,7 @@ class TDocMetadata(Model):

    tdoc_id: str = Field(db_pk=True)
    meeting_id: int
    tbid: int | None = None
    title: str
    url: str | None = None
    source: str
+21 −10
Original line number Diff line number Diff line
@@ -196,28 +196,39 @@ class SpecDatabase(DocDatabase):
            raise DatabaseError("spec-versions-read-failed", detail=str(exc)) from exc

    async def query_specs(self, filters: SpecQueryFilters) -> list[SpecQueryResult]:
        """Query stored spec metadata."""
        specs = await self._spec_table_rows()
        """Query stored spec metadata.

        Pushes spec_number, title, status, and working_group filters to SQL
        WHERE clauses via Oxyde ORM instead of loading all rows and filtering
        in Python.

        Args:
            filters: Query filters for spec metadata

        Returns:
            List of spec query results with source differences
        """
        source_records = await self._table_rows(SpecificationSourceRecord)
        records_by_spec: dict[str, list[SpecificationSourceRecord]] = defaultdict(list)
        for record in source_records:
            records_by_spec[record.spec_number].append(record)

        qs = Specification.objects

        if filters.spec_numbers:
            allowed = {normalize_spec_number(value) for value in filters.spec_numbers}
            specs = [spec for spec in specs if spec.spec_number in allowed]
            allowed = [normalize_spec_number(value) for value in filters.spec_numbers]
            qs = qs.filter(spec_number__in=allowed)

        if filters.title:
            needle = filters.title.strip().lower()
            specs = [spec for spec in specs if needle in (spec.title or "").lower()]
            qs = qs.filter(title__icontains=filters.title.strip())

        if filters.working_group:
            needle = filters.working_group.strip().lower()
            specs = [spec for spec in specs if (spec.working_group or "").lower() == needle]
            qs = qs.filter(working_group__iexact=filters.working_group.strip())

        if filters.status:
            needle = filters.status.strip().lower()
            specs = [spec for spec in specs if (spec.status or "").lower() == needle]
            qs = qs.filter(status__iexact=filters.status.strip())

        specs = await qs.all()

        def build_source_differences(records: list[SpecificationSourceRecord]) -> dict[str, dict[str, str | None]]:
            if len(records) < 2:
+105 −93
Original line number Diff line number Diff line
@@ -9,7 +9,6 @@ from tdoc_crawler.database.meetings import MeetingDatabase
from tdoc_crawler.database.oxyde_models import MeetingMetadata, TDocMetadata
from tdoc_crawler.logging import get_logger
from tdoc_crawler.models import WorkingGroup
from tdoc_crawler.models.subworking_groups import SUBTB_INDEX
from tdoc_crawler.tdocs.models import TDocQueryConfig
from tdoc_crawler.utils.misc import utc_now
from tdoc_crawler.utils.normalization import normalize_tdoc_id
@@ -31,6 +30,9 @@ class TDocDatabase(MeetingDatabase):
    async def upsert_tdoc(self, metadata: TDocMetadata) -> tuple[bool, bool]:
        """Insert or update a TDoc record.

        If ``tbid`` is not set on the metadata, resolves it from the
        associated meeting record.

        Args:
            metadata: TDoc metadata to insert or update

@@ -38,6 +40,8 @@ class TDocDatabase(MeetingDatabase):
            Tuple of (created, changed) booleans
        """
        record = self._prepare_tdoc(metadata)
        if getattr(record, "tbid", None) is None:
            record = await self._resolve_tbid(record)
        existing = await self._get_tdoc(record.tdoc_id)
        now = utc_now()

@@ -67,54 +71,114 @@ class TDocDatabase(MeetingDatabase):
    ) -> tuple[int, int]:
        """Bulk upsert TDocs with optional progress callback.

        Splits records into new vs existing, then uses ``bulk_create`` for
        inserts and ``bulk_update`` for updates instead of one-by-one queries.

        Args:
            tdocs: Iterable of TDoc metadata to upsert
            progress_callback: Optional callback to call after each TDoc is processed.
            progress_callback: Optional callback to call after each batch.
                Takes (completed, total) as float parameters.

        Returns:
            Tuple of (inserted_count, updated_count)
        """
        tdocs_list = list(tdocs)
        if not tdocs_list:
            return 0, 0

        total = float(len(tdocs_list))

        inserted = 0
        updated = 0
        for index, metadata in enumerate(tdocs_list, start=1):
            created, changed = await self.upsert_tdoc(metadata)
            if created:
                inserted += 1
            elif changed:
                updated += 1
        prepared: list[TDocMetadata] = []
        for metadata in tdocs_list:
            record = self._prepare_tdoc(metadata)
            if getattr(record, "tbid", None) is None:
                record = await self._resolve_tbid(record)
            prepared.append(record)

        existing_ids = set()
        for record in prepared:
            existing = await self._get_tdoc(record.tdoc_id)
            if existing is not None:
                existing_ids.add(record.tdoc_id)

        now = utc_now()
        to_create: list[TDocMetadata] = []
        to_update: list[TDocMetadata] = []
        for record in prepared:
            if record.tdoc_id not in existing_ids:
                to_create.append(self._clone_tdoc(record, {"date_updated": now, "date_created": record.date_created or record.date_retrieved}))
            else:
                to_update.append(self._clone_tdoc(record, {"date_updated": now}))

        if to_create:
            await TDocMetadata.objects.bulk_create(to_create)
        if to_update:
            await TDocMetadata.objects.bulk_update(
                to_update,
                fields=[
                    "meeting_id",
                    "tbid",
                    "title",
                    "url",
                    "source",
                    "contact",
                    "tdoc_type",
                    "for_purpose",
                    "agenda_item_nbr",
                    "agenda_item_text",
                    "status",
                    "is_revision_of",
                    "file_size",
                    "date_created",
                    "date_retrieved",
                    "date_updated",
                    "validated",
                    "validation_failed",
                ],
            )

        if progress_callback:
                progress_callback(float(index), total)
        return inserted, updated
            progress_callback(total, total)

        return len(to_create), len(to_update)

    async def query_tdocs(self, config: TDocQueryConfig) -> list[TDocMetadata]:
        """Query TDocs with filtering and sorting.

        Pushes tdoc_id and working_group (via denormalized tbid) filters to
        SQL WHERE via Oxyde ORM. Meeting date filters and glob pattern filters
        still require in-memory processing because they span multiple tables
        or use fnmatch.

        Args:
            config: Query configuration with filters and options

        Returns:
            List of matching TDoc metadata
        """
        records = await self._table_rows(TDocMetadata)
        if not records:
            return []

        meeting_map = {meeting.meeting_id: meeting for meeting in await self._table_rows(MeetingMetadata)}
        qs = TDocMetadata.objects

        if config.tdoc_ids:
            records = self._filter_by_tdoc_ids(records, config.tdoc_ids)
            lookup = [normalize_tdoc_id(value) for value in config.tdoc_ids]
            qs = qs.filter(tdoc_id__in=lookup)

        if config.working_groups:
            records = self._filter_by_working_groups(records, meeting_map, config.working_groups)
            allowed_tbids = [wg.tbid for wg in config.working_groups]
            qs = qs.filter(tbid__in=allowed_tbids)

        records = self._filter_by_retrieved_dates(records, config.start_date, config.end_date)
        records = await qs.all()

        if not records:
            return []

        need_meeting_map = config.meeting_start_date is not None or config.meeting_end_date is not None

        if need_meeting_map:
            meeting_map = {meeting.meeting_id: meeting for meeting in await self._table_rows(MeetingMetadata)}
            records = self._filter_by_meeting_dates(records, meeting_map, config.meeting_start_date, config.meeting_end_date)

        records = self._filter_by_retrieved_dates(records, config.start_date, config.end_date)

        records = self._apply_pattern_filters(records, config)

        descending = config.order.value.lower() == "desc"
@@ -134,28 +198,22 @@ class TDocDatabase(MeetingDatabase):
    async def get_existing_tdoc_ids(self, working_groups: Iterable[WorkingGroup] | None = None) -> set[str]:
        """Get set of existing TDoc IDs, optionally filtered by working group.

        Uses the denormalized ``tbid`` field on TDocMetadata to filter
        without joining the meetings table.

        Args:
            working_groups: Optional list of working groups to filter by

        Returns:
            Set of TDoc IDs
        """
        if not working_groups:
            records = await self._table_rows(TDocMetadata)
            return {record.tdoc_id for record in records}

        # Filter on the denormalized tbid column directly in SQL.
        allowed_tbids = [wg.tbid for wg in working_groups]
        records = await TDocMetadata.objects.filter(tbid__in=allowed_tbids).all()
        return {record.tdoc_id for record in records}

    def _apply_pattern_filters(self, records: list[TDocMetadata], config: TDocQueryConfig) -> list[TDocMetadata]:
        """Apply all supported include/exclude glob filters."""
@@ -172,42 +230,14 @@ class TDocDatabase(MeetingDatabase):
        """Get a TDoc by ID."""
        return await TDocMetadata.objects.filter(tdoc_id=tdoc_id.upper()).first()

    @classmethod
    def _meeting_matches_filters(
        cls,
        meeting: Any,
        allowed_tbids: set[int] | None,
        allowed_subgroups: set[str] | None,
    ) -> bool:
        """Check whether meeting satisfies working-group and subgroup constraints."""
        # Working-group constraint first: a non-empty tbid set must contain it.
        if allowed_tbids and meeting.tbid not in allowed_tbids:
            return False
        # No subgroup constraint means any meeting from an allowed group passes.
        if not allowed_subgroups:
            return True
        code = cls._subgroup_code_from_subtb(meeting.subtb)
        if code is None:
            return False
        return code.upper() in allowed_subgroups

    @staticmethod
    def _filter_by_tdoc_ids(records: list[TDocMetadata], tdoc_ids: list[str]) -> list[TDocMetadata]:
        """Filter records by case-insensitive TDoc ID list."""
        # Normalize once into a set for O(1) membership tests per record.
        wanted = {normalize_tdoc_id(value) for value in tdoc_ids}
        return [rec for rec in records if rec.tdoc_id in wanted]

    @staticmethod
    def _filter_by_working_groups(
        records: list[TDocMetadata],
        meeting_map: dict[int, Any],
        working_groups: list[WorkingGroup],
    ) -> list[TDocMetadata]:
        """Filter records to meetings that belong to selected working groups."""
        allowed_tbids = {wg.tbid for wg in working_groups}
        # A record whose meeting is unknown (or whose meeting_id is None,
        # mapped to the impossible key -1) is dropped.
        return [
            rec
            for rec in records
            if (meeting := meeting_map.get(rec.meeting_id or -1)) is not None and meeting.tbid in allowed_tbids
        ]
    async def _resolve_tbid(self, record: TDocMetadata) -> TDocMetadata:
        """Resolve tbid from the associated meeting and return updated record."""
        # Without a meeting reference there is nothing to resolve from.
        if record.meeting_id is None:
            return record
        meeting = await MeetingMetadata.objects.filter(meeting_id=record.meeting_id).first()
        # Return the record unchanged when the meeting is missing or untagged.
        if meeting is None or meeting.tbid is None:
            return record
        return self._clone_tdoc(record, {"tbid": meeting.tbid})

    @staticmethod
    def _filter_by_retrieved_dates(
@@ -226,17 +256,9 @@ class TDocDatabase(MeetingDatabase):

        filtered = records
        if start_date is not None:
            filtered = [
                record
                for record in filtered
                if record.date_retrieved and _normalized_bound(record.date_retrieved, start_date)[0] >= _normalized_bound(record.date_retrieved, start_date)[1]
            ]
            filtered = [record for record in filtered if record.date_retrieved and (nb := _normalized_bound(record.date_retrieved, start_date))[0] >= nb[1]]
        if end_date is not None:
            filtered = [
                record
                for record in filtered
                if record.date_retrieved and _normalized_bound(record.date_retrieved, end_date)[0] <= _normalized_bound(record.date_retrieved, end_date)[1]
            ]
            filtered = [record for record in filtered if record.date_retrieved and (nb := _normalized_bound(record.date_retrieved, end_date))[0] <= nb[1]]
        return filtered

    @staticmethod
@@ -283,16 +305,6 @@ class TDocDatabase(MeetingDatabase):
            record for record in records if value_getter(record) and any(fnmatch.fnmatch(str(value_getter(record)).lower(), pattern) for pattern in patterns)
        ]

    @staticmethod
    def _subgroup_code_from_subtb(subtb: int | None) -> str | None:
        """Resolve subgroup code from subtb identifier."""
        if subtb is None:
            return None
        rec = SUBTB_INDEX.get(subtb)
        # Unknown subtb identifiers resolve to None rather than raising.
        return rec.code if rec is not None else None

    # ------------------------------------------------------------------
    # Normalisation helpers
    # ------------------------------------------------------------------
@@ -312,14 +324,14 @@ class TDocDatabase(MeetingDatabase):
        for field in TDocMetadata.__annotations__:
            if field == "date_updated":
                continue
            if getattr(current, field) != getattr(candidate, field):
            if getattr(current, field, None) != getattr(candidate, field, None):
                return True
        return False

    @staticmethod
    def _clone_tdoc(metadata: TDocMetadata, updates: dict[str, object]) -> TDocMetadata:
        """Create a new TDocMetadata instance with updates applied.

        Args:
            metadata: Source record to copy field values from
            updates: Field name to value overrides applied on top of the copy

        Returns:
            New TDocMetadata with the merged field values
        """
        # Default missing attributes to None so records created before a
        # schema migration (e.g. without tbid) can still be cloned.
        data = {field: getattr(metadata, field, None) for field in TDocMetadata.__annotations__}
        data.update(updates)
        return TDocMetadata(**data)