Commit e9781599 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(pipeline): add limit parameter to processing phases

- Introduced a limit parameter to control the maximum number of documents processed in each phase (extract, embed, graph).
- Updated function signatures and logic to accommodate the new limit functionality.
parent 9f358c6e
Loading
Loading
Loading
Loading
+34 −14
Original line number | Diff line number | Diff line
@@ -335,6 +335,7 @@ def process_all(
        workspace=normalized_workspace,
        config=config,
        embeddings_manager=embeddings_manager,
        limit=limit,
    )
    results.update(extract_results)

@@ -346,6 +347,7 @@ def process_all(
        workspace=normalized_workspace,
        config=config,
        embeddings_manager=embeddings_manager,
        limit=limit,
    )
    results.update(embed_results)

@@ -356,6 +358,7 @@ def process_all(
        workspace=normalized_workspace,
        config=config,
        embeddings_manager=embeddings_manager,
        limit=limit,
    )
    results.update(graph_results)
    return results
@@ -403,22 +406,25 @@ def process_extract_phase(
    workspace: str | None = None,
    config: AiConfig | None = None,
    embeddings_manager: EmbeddingsManager | None = None,
    limit: int | None = None,
) -> tuple[list[str], dict[str, ProcessingStatus]]:
    """Run the classify and extract phase for all documents.

    This phase identifies the primary document in each folder, then converts it
    to Markdown and records status updates in storage. Failures are logged and do
    to Markdown and records status updates in storage. Documents with completed
    extraction are skipped unless force_rerun is set. Failures are logged and do
    not interrupt processing of other documents.

    Args:
        document_ids: Document identifiers to process.
        checkout_base: Base checkout directory for document folders.
        new_only: Whether to process only documents without existing extraction.
        new_only: Whether to return already-extracted documents in results.
        force_rerun: Whether to re-run classification/extraction regardless of status.
        progress_callback: Optional callback invoked after each document.
        workspace: Optional workspace name (defaults to "default").
        config: Optional AI configuration override.
        embeddings_manager: Optional embeddings manager to reuse.
        limit: Optional maximum number of documents to process.

    Returns:
        Tuple of (extracted_ids, results) where extracted_ids contains document IDs
@@ -430,13 +436,17 @@ def process_extract_phase(
    results: dict[str, ProcessingStatus] = {}
    extracted_ids: list[str] = []

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]

    for document_id in document_ids:
        if new_only and not force_rerun:
        existing_status = storage.get_status(document_id, workspace=normalized_workspace)
            if existing_status and (
                existing_status.extracted_at is not None or existing_status.current_stage == PipelineStage.COMPLETED or existing_status.completed_at is not None
            ):
                logger.info(f"Skipping {document_id} - already processed")
        if not force_rerun and existing_status and existing_status.extracted_at is not None:
            logger.debug(f"Skipping {document_id} - already extracted")
            if not new_only:
                results[document_id] = existing_status
                extracted_ids.append(document_id)
            if progress_callback:
                progress_callback(document_id)
            continue
@@ -454,9 +464,7 @@ def process_extract_phase(
            continue

        try:
            status = storage.get_status(document_id, workspace=normalized_workspace)
            if status is None:
                status = ProcessingStatus(document_id=document_id)
            status = existing_status or ProcessingStatus(document_id=document_id)

            if not force_rerun and status.classified_at:
                logger.info(f"Skipping classification for {document_id} - already classified")
@@ -488,6 +496,7 @@ def process_embed_phase(
    workspace: str | None = None,
    config: AiConfig | None = None,
    embeddings_manager: EmbeddingsManager | None = None,
    limit: int | None = None,
) -> tuple[list[str], dict[str, ProcessingStatus]]:
    """Run the embedding phase for extracted documents.

@@ -502,6 +511,7 @@ def process_embed_phase(
        workspace: Optional workspace name (defaults to "default").
        config: Optional AI configuration override.
        embeddings_manager: Optional embeddings manager to reuse.
        limit: Optional maximum number of documents to process.

    Returns:
        Tuple of (embedded_ids, results) where embedded_ids contains document IDs
@@ -513,6 +523,10 @@ def process_embed_phase(
    results: dict[str, ProcessingStatus] = {}
    embedded_ids: list[str] = []

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]

    for document_id in document_ids:
        folder_path = _resolve_document_folder(
            document_id=document_id,
@@ -572,6 +586,7 @@ def process_graph_phase(
    workspace: str | None = None,
    config: AiConfig | None = None,
    embeddings_manager: EmbeddingsManager | None = None,
    limit: int | None = None,
) -> tuple[list[str], dict[str, ProcessingStatus]]:
    """Run the graph phase for embedded documents.

@@ -586,6 +601,7 @@ def process_graph_phase(
        workspace: Optional workspace name (defaults to "default").
        config: Optional AI configuration override.
        embeddings_manager: Optional embeddings manager to reuse.
        limit: Optional maximum number of documents to process.

    Returns:
        Tuple of (graphed_ids, results) where graphed_ids contains document IDs
@@ -597,6 +613,10 @@ def process_graph_phase(
    results: dict[str, ProcessingStatus] = {}
    graphed_ids: list[str] = []

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]

    for document_id in document_ids:
        status = _resolve_graph_status(storage, document_id, normalized_workspace)
        if status is None:
+7 −4
Original line number | Diff line number | Diff line
@@ -15,8 +15,8 @@ from rich.progress import (
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
    TransferSpeedColumn,
)
from rich.table import Table
from tdoc_ai import (
@@ -88,7 +88,7 @@ from tdoc_crawler.cli.args import (
    WorkspaceReleaseOption,
)
from tdoc_crawler.config import CacheManager, resolve_cache_manager
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.database import SpecDatabase, TDocDatabase
from tdoc_crawler.logging import get_console
from tdoc_crawler.tdocs.models import TDocQueryConfig
from tdoc_crawler.utils.date_parser import parse_partial_date
@@ -602,7 +602,7 @@ def workspace_add_members(
        if source_kind == SourceKind.SPEC and release:
            try:
                # Get available versions for this spec from database
                with TDocDatabase(manager.db_file) as db:
                with SpecDatabase(manager.db_file) as db:
                    available_versions = db.get_spec_versions(item)
                    if available_versions:
                        version_list = [v.version for v in available_versions]
@@ -716,7 +716,7 @@ def workspace_process(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TransferSpeedColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console,
        refresh_per_second=10,
@@ -729,6 +729,7 @@ def workspace_process(
            force_rerun=force_rerun,
            workspace=workspace,
            progress_callback=lambda _: progress.advance(extract_task),
            limit=limit,
        )

        embed_task = progress.add_task("[cyan]Phase 2: Embedding", total=len(extracted_ids))
@@ -738,6 +739,7 @@ def workspace_process(
            force_rerun=force_rerun,
            workspace=workspace,
            progress_callback=lambda _: progress.advance(embed_task),
            limit=limit,
        )

        graph_task = progress.add_task("[cyan]Phase 3: Building Graph", total=len(embedded_ids))
@@ -746,6 +748,7 @@ def workspace_process(
            checkout_base=manager.root,
            workspace=workspace,
            progress_callback=lambda _: progress.advance(graph_task),
            limit=limit,
        )

        results = {**extract_results, **embed_results, **graph_results}