Commit ac46d10c authored by Jan Reimes's avatar Jan Reimes
Browse files

♻️ refactor(database): improve bulk upsert with dynamic batch sizing and field filtering

parent ddd7f68c
Loading
Loading
Loading
Loading
+23 −26
Original line number Diff line number Diff line
@@ -15,6 +15,14 @@ from tdoc_crawler.utils.normalization import normalize_tdoc_id

_logger = get_logger(__name__)

SQLITE_MAX_VARS = 999

DEFAULT_BULK_BATCH_SIZE = 40


def _get_updatable_fields(model_type: type) -> list[str]:
    return [name for name, f in model_type.model_fields.items() if not getattr(f, "db_pk", False)]


class TDocDatabase(MeetingDatabase):
    """Unified database operations for TDocs and Meetings.
@@ -68,6 +76,7 @@ class TDocDatabase(MeetingDatabase):
        self,
        tdocs: Iterable[TDocMetadata],
        progress_callback: Callable[[float, float], None] | None = None,
        batch_size: int = DEFAULT_BULK_BATCH_SIZE,
    ) -> tuple[int, int]:
        """Bulk upsert TDocs with optional progress callback.

@@ -78,6 +87,8 @@ class TDocDatabase(MeetingDatabase):
            tdocs: Iterable of TDoc metadata to upsert
            progress_callback: Optional callback to call after each batch.
                Takes (completed, total) as float parameters.
            batch_size: Number of records per batch. Automatically reduced
                if it would exceed SQLite's variable limit.

        Returns:
            Tuple of (inserted_count, updated_count)
@@ -110,32 +121,18 @@ class TDocDatabase(MeetingDatabase):
            else:
                to_update.append(self._clone_tdoc(record, {"date_updated": now}))

        if to_create:
            await TDocMetadata.objects.bulk_create(to_create)
        if to_update:
            await TDocMetadata.objects.bulk_update(
                to_update,
                fields=[
                    "meeting_id",
                    "tbid",
                    "title",
                    "url",
                    "source",
                    "contact",
                    "tdoc_type",
                    "for_purpose",
                    "agenda_item_nbr",
                    "agenda_item_text",
                    "status",
                    "is_revision_of",
                    "file_size",
                    "date_created",
                    "date_retrieved",
                    "date_updated",
                    "validated",
                    "validation_failed",
                ],
            )
        update_fields = _get_updatable_fields(TDocMetadata)
        n_cols = len(TDocMetadata.model_fields)
        max_batch = SQLITE_MAX_VARS // n_cols
        if batch_size > max_batch:
            _logger.warning("batch_size reduced from %d to %d (SQLITE_MAX_VARS=%d, columns=%d)", batch_size, max_batch, SQLITE_MAX_VARS, n_cols)
            batch_size = max_batch

        for i in range(0, len(to_create), batch_size):
            await TDocMetadata.objects.bulk_create(to_create[i : i + batch_size])

        for i in range(0, len(to_update), batch_size):
            await TDocMetadata.objects.bulk_update(to_update[i : i + batch_size], fields=update_fields)

        if progress_callback:
            progress_callback(total, total)