Commit da28314d authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(3gpp-ai): add --device, --threads, --batch-size, --vlm, --embed CLI options for workspace commands

feat(3gpp-ai): add --device, --threads, --batch-size, --vlm, --embed CLI options for workspace commands

Add accelerator, VLM, and embed CLI options to workspace add-members and workspace process.
--device: compute device for Docling extraction (auto, cpu, cuda, mps, xpu)
--threads: number of threads for CPU-bound operations
--batch-size: batch size for OCR, layout, and table structure
--vlm: enable VLM picture description and formula enrichment
--embed: insert into LightRAG knowledge graph (implies --convert-md)
Also refactor workspace_add_members to extract _checkout_and_convert_items helper
parent 5e1ae06f
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ cls
call .venv\scripts\activate.bat

:: SET TDC_AI_CONVERT_MD=1
SET TDC_AI_VLM=1

tdoc-crawler crawl-meetings -s S4
tdoc-crawler crawl-tdocs --start-date 2016
+209 −54
Original line number Diff line number Diff line
@@ -46,6 +46,9 @@ from threegpp_ai import (
    summarize_document,
)
from threegpp_ai.args import (
    AcceleratorBatchSizeOption,
    AcceleratorDeviceOption,
    AcceleratorThreadsOption,
    AgendaPatternExcludeOption,
    AgendaPatternOption,
    CacheDirOption,
@@ -69,6 +72,7 @@ from threegpp_ai.args import (
    WorkspaceActivateOption,
    WorkspaceAutoBuildOption,
    WorkspaceCheckoutOption,
    WorkspaceEmbedOption,
    WorkspaceIncludeInactiveOption,
    WorkspaceItemsArgument,
    WorkspaceKindOption,
@@ -88,7 +92,7 @@ from threegpp_ai.models import WorkspaceNotFoundError
from threegpp_ai.operations.classify import pick_main_document
from threegpp_ai.operations.conversion import OFFICE_FORMATS, convert_to_pdf
from threegpp_ai.operations.convert import convert_document_to_markdown
from threegpp_ai.operations.extraction import VlmOptions, extract_document_structured
from threegpp_ai.operations.extraction import AcceleratorConfig, VlmOptions, extract_document_structured
from threegpp_ai.operations.workspace_registry import WorkspaceRegistry, normalize_spec_member_id
from threegpp_ai.operations.workspace_utils import check_pdf_status

@@ -278,6 +282,8 @@ async def _process_single_item(
    convert_pdf: bool,
    convert_md: bool = False,
    manager: CacheManager,
    vlm_options: VlmOptions | None = None,
    accelerator_config: AcceleratorConfig | None = None,
) -> tuple[Any | None, str | None, bool, bool]:
    """Process a single workspace item (checkout + optional PDF conversion + optional markdown extraction).

@@ -290,6 +296,8 @@ async def _process_single_item(
        convert_pdf: Whether to convert to PDF
        convert_md: Whether to extract markdown (implies convert_pdf)
        manager: CacheManager for paths
        vlm_options: Optional VLM features for extraction.
        accelerator_config: Optional accelerator settings for GPU/CPU and threading.

    Returns:
        Tuple of (member, skip_reason, was_converted, was_md_extracted)
@@ -337,12 +345,24 @@ async def _process_single_item(
        try:
            if source_kind == SourceKind.TDOC:
                # TDoc extraction - uses TDoc ID to fetch files
                convert_document_to_markdown(document_id=item, output_path=None, force=False)
                convert_document_to_markdown(
                    document_id=item,
                    output_path=None,
                    force=False,
                    vlm_options=vlm_options,
                    accelerator_config=accelerator_config,
                )
            else:
                # Generic extraction (specs, other) - uses file path directly
                doc_file = _resolve_process_file(Path(source_path))
                if doc_file:
                    extract_document_structured(doc_file, metadata=None, force=False)
                    extract_document_structured(
                        doc_file,
                        metadata=None,
                        force=False,
                        vlm_options=vlm_options,
                        accelerator_config=accelerator_config,
                    )
            was_md_extracted = True
        except Exception as e:
            _logger.debug("Failed to extract markdown for %s: %s", item, e)
@@ -456,6 +476,7 @@ async def _process_workspace_members(
    checkout: bool = True,
    convert_md: bool = False,
    vlm_options: VlmOptions | None = None,
    accelerator_config: AcceleratorConfig | None = None,
) -> list[dict[str, Any]]:
    """Process workspace members with optional progress callback.

@@ -466,6 +487,7 @@ async def _process_workspace_members(
        checkout: Whether to checkout documents if not available
        convert_md: Whether to extract markdown (implies PDF conversion)
        vlm_options: Optional VLM features for extraction.
        accelerator_config: Optional accelerator settings for GPU/CPU and threading.

    Returns:
        List of processing results
@@ -534,7 +556,9 @@ async def _process_workspace_members(
                        continue

            metadata = await _try_build_tdoc_metadata(member.source_item_id)
            process_result = await processor.process_file(file_path, workspace, metadata=metadata, vlm_options=vlm_options)
            process_result = await processor.process_file(
                file_path, workspace, metadata=metadata, vlm_options=vlm_options, accelerator_config=accelerator_config
            )
            results.append(
                {
                    "source_item_id": member.source_item_id,
@@ -816,6 +840,75 @@ def workspace_clear(
    console.print(f"[green]Cleared LightRAG artifacts for '{workspace_name}'[/green]")


def _checkout_and_convert_items(
    resolved_items: list[str],
    *,
    workspace_name: str,
    source_kind: SourceKind,
    checkout: bool,
    release: str | None,
    convert_pdf: bool,
    convert_md: bool,
    vlm_options: VlmOptions | None,
    accelerator_config: AcceleratorConfig,
) -> tuple[list[Any], list[tuple[str, str]], int, int]:
    """Checkout each item, optionally convert to PDF, and optionally extract markdown.

    Drives `_process_single_item` for every entry in `resolved_items` under a
    single Rich progress bar, advancing one tick per item and tagging the
    description with the per-item outcome.

    Returns:
        Tuple of (members, skipped_items, converted_count, md_extracted_count)
    """
    manager = resolve_cache_manager()
    members: list[Any] = []
    skipped: list[tuple[str, str]] = []
    converted_count = 0
    md_extracted_count = 0

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task(
            f"[cyan]Processing {len(resolved_items)} item(s)...",
            total=len(resolved_items),
        )

        async def _run_all() -> None:
            nonlocal converted_count, md_extracted_count
            for item in resolved_items:
                member, skip_reason, was_converted, was_md_extracted = await _process_single_item(
                    item=item,
                    workspace=workspace_name,
                    source_kind=source_kind,
                    checkout=checkout,
                    release=release,
                    convert_pdf=convert_pdf,
                    convert_md=convert_md,
                    manager=manager,
                    vlm_options=vlm_options,
                    accelerator_config=accelerator_config,
                )
                # Classify the outcome once, then do a single progress update.
                if skip_reason:
                    skipped.append((item, skip_reason))
                    suffix = " (skipped)"
                else:
                    members.append(member)
                    if was_md_extracted:
                        md_extracted_count += 1
                        suffix = " (markdown extracted)"
                    elif was_converted:
                        converted_count += 1
                        suffix = " (converted)"
                    else:
                        suffix = ""
                progress.update(task, advance=1, description=f"[cyan]{item}{suffix}")

        asyncio.run(_run_all())

    return members, skipped, converted_count, md_extracted_count


@workspace_app.command("add-members", help="Add members (TDocs/specs) to a workspace")
def workspace_add_members(
    workspace: WorkspaceNameOption = None,
@@ -834,6 +927,11 @@ def workspace_add_members(
    agenda: AgendaPatternOption = None,
    agenda_ex: AgendaPatternExcludeOption = None,
    limit: WorkspaceLimitOption = None,
    vlm: WorkspaceProcessVlmOption = False,
    embed: WorkspaceEmbedOption = False,
    device: AcceleratorDeviceOption = "auto",
    threads: AcceleratorThreadsOption = 4,
    batch_size: AcceleratorBatchSizeOption = 4,
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose output (INFO level logging)"),
) -> None:
    # Set log level based on verbosity
@@ -845,6 +943,16 @@ def workspace_add_members(
    kind_normalized = kind.lower().rstrip("s")
    source_kind = SourceKind(kind_normalized) if kind_normalized in {entry.value for entry in SourceKind} else SourceKind.OTHER

    # --embed implies --convert-md
    if embed:
        convert_md = True

    # Build VLM and accelerator options for extraction
    vlm_options: VlmOptions | None = None
    if vlm:
        vlm_options = VlmOptions(enable_picture_description=True, enable_formula_enrichment=True)
    accelerator_config = AcceleratorConfig(device=device, num_threads=threads, batch_size=batch_size)

    # Phase 1: Resolve items - either directly provided or via database query
    if items is not None:
        resolved_items = items
@@ -855,7 +963,7 @@ def workspace_add_members(
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task = progress.add_task("[cyan]Querying database...", total=None)
            progress.add_task("[cyan]Querying database...", total=None)
            resolved_items = _resolve_workspace_items(
                items=None,
                source_kind=source_kind,
@@ -873,53 +981,18 @@ def workspace_add_members(
            console.print("[yellow]No items match the provided filters[/yellow]")
            return

    # Phase 2: Checkout + optional PDF conversion - single progress bar, one tick per document
    manager = resolve_cache_manager()
    members: list[Any] = []
    skipped: list[tuple[str, str]] = []
    converted_count = 0
    md_extracted_count = 0

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task(
            f"[cyan]Processing {len(resolved_items)} item(s)...",
            total=len(resolved_items),
        )

        async def _process_items() -> None:
            nonlocal converted_count, md_extracted_count
            for item in resolved_items:
                member, skip_reason, was_converted, was_md_extracted = await _process_single_item(
                    item=item,
                    workspace=workspace_name,
    # Phase 2: Checkout + optional PDF conversion + optional markdown extraction
    members, skipped, converted_count, md_extracted_count = _checkout_and_convert_items(
        resolved_items,
        workspace_name=workspace_name,
        source_kind=source_kind,
        checkout=checkout,
        release=release,
        convert_pdf=convert_pdf,
        convert_md=convert_md,
                    manager=manager,
        vlm_options=vlm_options,
        accelerator_config=accelerator_config,
    )
                if skip_reason:
                    skipped.append((item, skip_reason))
                    progress.update(task, advance=1, description=f"[cyan]{item} (skipped)")
                else:
                    members.append(member)
                    if was_md_extracted:
                        md_extracted_count += 1
                        progress.update(task, advance=1, description=f"[cyan]{item} (markdown extracted)")
                    elif was_converted:
                        converted_count += 1
                        progress.update(task, advance=1, description=f"[cyan]{item} (converted)")
                    else:
                        progress.update(task, advance=1, description=f"[cyan]{item}")

        asyncio.run(_process_items())

    if skipped:
        console.print("\n[yellow]Skipped invalid items:[/yellow]")
@@ -934,6 +1007,74 @@ def workspace_add_members(
    added = add_workspace_members(workspace_name, members)
    console.print(f"[green]Added {added} member(s) to '{workspace_name}'[/green]")

    # Phase 3: Optional LightRAG embedding and graph insertion
    if embed and members:
        embedded_count = _embed_members(
            workspace_name,
            members,
            vlm_options=vlm_options,
            accelerator_config=accelerator_config,
        )
        console.print(f"[green]Embedded {embedded_count} document(s) into LightRAG[/green]")


def _embed_members(
    workspace_name: str,
    members: list[Any],
    *,
    vlm_options: VlmOptions | None = None,
    accelerator_config: AcceleratorConfig | None = None,
) -> int:
    """Insert workspace members into LightRAG knowledge graph.

    Uses the cached extraction results from .ai/ folders, so extraction
    is a fast cache hit. The main work is chunking, embedding, and graph insertion.

    Returns:
        Number of successfully embedded documents.
    """
    processor = DocumentProcessor(LightRAGConfig.from_env())
    # NOTE(review): return value is discarded — presumably called for a cache
    # setup side effect; confirm it is still needed.
    resolve_cache_manager()
    embedded = 0

    async def _run() -> None:
        nonlocal embedded
        # rag.start/stop bracket the whole batch so the graph backend is
        # opened and closed exactly once.
        await processor.rag.start(workspace_name)
        try:
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                MofNCompleteColumn(),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task = progress.add_task("[cyan]Embedding...", total=len(members))

                for member in members:
                    doc_path = _resolve_process_file(Path(member.source_path))
                    if doc_path is None or not doc_path.exists():
                        progress.update(task, advance=1, description=f"[cyan]{member.source_item_id} (no file)")
                        continue

                    metadata = await _try_build_tdoc_metadata(member.source_item_id)
                    result = await processor.process_file(
                        doc_path,
                        workspace_name,
                        metadata=metadata,
                        vlm_options=vlm_options,
                        accelerator_config=accelerator_config,
                    )
                    # Evaluate success once; reuse it for both the counter
                    # and the progress label.
                    succeeded = result.status.value == "success"
                    if succeeded:
                        embedded += 1
                    label = "ok" if succeeded else result.status.value
                    progress.update(task, advance=1, description=f"[cyan]{member.source_item_id} ({label})")

        finally:
            await processor.rag.stop()

    asyncio.run(_run())
    return embedded


@workspace_app.command("list-members", help="List members in a workspace")
def workspace_list_members(
@@ -1004,7 +1145,10 @@ def workspace_process(
    limit: WorkspaceLimitOption = None,
    output_format: OutputFormatOption = OutputFormat.TABLE.value,
    checkout: WorkspaceCheckoutOption = True,
    vlm: WorkspaceProcessVlmOption = None,
    vlm: WorkspaceProcessVlmOption = False,
    device: AcceleratorDeviceOption = "auto",
    threads: AcceleratorThreadsOption = 4,
    batch_size: AcceleratorBatchSizeOption = 4,
) -> None:
    workspace_name = _resolve_workspace_name(workspace)

@@ -1029,6 +1173,9 @@ def workspace_process(
    if vlm:
        vlm_options = VlmOptions(enable_picture_description=True, enable_formula_enrichment=True)

    # Build accelerator config from CLI options
    accelerator_config = AcceleratorConfig(device=device, num_threads=threads, batch_size=batch_size)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
@@ -1045,7 +1192,15 @@ def workspace_process(
            progress.update(task, completed=completed, description=f"[cyan]{source_item_id}")

        results = asyncio.run(
            _process_workspace_members(workspace_name, members, on_progress=on_progress, checkout=checkout, convert_md=True, vlm_options=vlm_options)
            _process_workspace_members(
                workspace_name,
                members,
                on_progress=on_progress,
                checkout=checkout,
                convert_md=True,
                vlm_options=vlm_options,
                accelerator_config=accelerator_config,
            )
        )
        progress.update(task, completed=len(results), description="[cyan]Processing complete")