Commit 76fba02a authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(ai): streamline AI CLI integration and enhance cache management

* Remove unused ai_query import from ai.py
* Update AI commands registration in tdoc_app.py to handle optional dependency
* Introduce DEFAULT_AI_CACHE_DIRNAME for AI-related files in config
* Refactor CacheManager to use new AI cache directory structure
parent da60dace
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
"""Compatibility shim for legacy AI CLI import path."""

from tdoc_crawler.cli.ai_app import ai_app, ai_query
from tdoc_crawler.cli.ai_app import ai_app

__all__ = ["ai_app", "ai_query"]
__all__ = ["ai_app"]
+185 −65
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ from rich.progress import (
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.table import Table

@@ -68,7 +67,6 @@ console = get_console()
_logger = logging.getLogger(__name__)
_ai_loaded: list[bool] = [False]
_ai_config: Any = None
_ai_embeddings: Any = None
checkout_spec_to_workspace: Any = None
checkout_tdoc_to_workspace: Any = None
convert_document: Any = None
@@ -101,9 +99,7 @@ if _rag_app is not None:


def _exit_ai_missing() -> NoReturn:
    """Report that the optional AI extras are not installed and abort the CLI.

    Raises:
        typer.Exit: always, with exit code 1, after printing install guidance.
    """
    console.print("[red]AI features require the 3gpp-ai package.[/red]")
    # Project uses uv for dependency management; point users at the extras group.
    console.print("[yellow]Install with: uv sync --extra ai[/yellow]")
    # Raise last so both messages are actually shown (the rendered diff had the
    # raise before the second print, making it unreachable).
    raise typer.Exit(code=1)


def _load_ai() -> None:
@@ -117,27 +113,24 @@ def _load_ai() -> None:
    tdoc_ai: Any | None = None
    config: Any | None = None
    models: Any | None = None
    embeddings: Any | None = None
    registry: Any | None = None
    workspaces: Any | None = None
    try:
        tdoc_ai = importlib.import_module("threegpp_ai")
        config = importlib.import_module("threegpp_ai.config")
        models = importlib.import_module("threegpp_ai.models")
        embeddings = importlib.import_module("threegpp_ai.operations.embeddings")
        registry = importlib.import_module("threegpp_ai.operations.workspace_registry")
        workspaces = importlib.import_module("threegpp_ai.operations.workspaces")
    except Exception:
        _exit_ai_missing()

    if tdoc_ai is None or config is None or models is None or embeddings is None or registry is None or workspaces is None:
    if tdoc_ai is None or config is None or models is None or registry is None or workspaces is None:
        _exit_ai_missing()

    # TODO: Replace this globals() indirection with direct top-level imports once
    # the optional-dependency loading strategy is restructured.
    globals().update(
        {
            "_ai_config": config,
            "_ai_embeddings": embeddings,
            "checkout_spec_to_workspace": tdoc_ai.checkout_spec_to_workspace,
            "checkout_tdoc_to_workspace": tdoc_ai.checkout_tdoc_to_workspace,
            "convert_document": tdoc_ai.convert_document,
@@ -419,19 +412,45 @@ def workspace_clear(
    workspace: WorkspaceNameOption = None,
    json_output: JsonOutputOption = False,
) -> None:
    """Clear all AI artifacts (embeddings, summaries, etc.) while preserving workspace members."""
    """Clear all LightRAG artifacts (vectors, graph, KV) while preserving workspace members.

    This removes the embedding model directory for the workspace, forcing
    complete reprocessing on next 'workspace process' run.
    """
    import shutil

    workspace = resolve_workspace(workspace)
    storage = _ai_embeddings.EmbeddingsManager(_ai_config.AiConfig.from_env()).storage
    manager = _get_cache_manager()

    # Build the workspace directory path (matches rag.py structure)
    # Structure: ~/.3gpp-crawler/lightrag/{embedding_model}/{workspace}/
    from threegpp_ai.lightrag.config import LightRAGConfig

    removed_count = storage.clear_workspace_artifacts(workspace)
    config = LightRAGConfig()
    embedding_model_safe = config.embedding.model.replace(":", "-").replace("/", "-")
    working_dir = manager.ai_embed_dir(embedding_model_safe) / workspace

    if not working_dir.exists():
        if json_output:
        typer.echo(json.dumps({"workspace": normalize_workspace_name(workspace), "cleared_artifacts": removed_count}))
    elif removed_count > 0:
        console.print(f"[green]Cleared {removed_count} artifact(s) from workspace '{normalize_workspace_name(workspace)}'[/green]")
        console.print("[yellow]Note: Workspace members are preserved. Run 'ai workspace process' to regenerate artifacts.[/yellow]")
            typer.echo(json.dumps({"workspace": normalize_workspace_name(workspace), "cleared": False, "message": "No artifacts found"}))
        else:
        console.print(f"[yellow]No artifacts found in workspace '{normalize_workspace_name(workspace)}'[/yellow]")
            console.print(f"[yellow]No LightRAG artifacts found for workspace '{normalize_workspace_name(workspace)}'[/yellow]")
        return

    # Remove the directory
    try:
        shutil.rmtree(working_dir)
        if json_output:
            typer.echo(json.dumps({"workspace": normalize_workspace_name(workspace), "cleared": True, "path": str(working_dir)}))
        else:
            console.print(f"[green]Cleared LightRAG artifacts for workspace '{normalize_workspace_name(workspace)}'[/green]")
            console.print(f"[dim]Removed: {working_dir}[/dim]")
    except Exception as e:
        if json_output:
            typer.echo(json.dumps({"error": f"Failed to clear artifacts: {e}"}))
        else:
            console.print(f"[red]Failed to clear artifacts: {e}[/red]")
        raise typer.Exit(1) from e


@_workspace_app.command("add-members")
@@ -662,86 +681,187 @@ def workspace_process(
    json_output: JsonOutputOption = False,
    limit: ProcessLimitOption = None,
) -> None:
    """Process all active document members in a workspace through the AI pipeline."""
    """Process all active document members in a workspace through LightRAG.

    Extracts text from documents using kreuzberg and builds the knowledge graph.
    """
    import asyncio

    workspace = resolve_workspace(workspace)
    manager = _get_cache_manager()

    # Get workspace members
    members = list_workspace_members(workspace, include_inactive=False)
    document_ids = [m.source_item_id for m in members if m.is_active]
    if not members:
        if json_output:
            typer.echo(json.dumps({"workspace": normalize_workspace_name(workspace), "processed": 0, "message": "No active members found"}))
        else:
            console.print(f"[yellow]No active members found in workspace '{normalize_workspace_name(workspace)}'[/yellow]")
        return

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]
        members = members[:limit]
        console.print(f"[cyan]Limited to {limit} documents for testing[/cyan]")

    if not document_ids:
        if json_output:
            typer.echo(json.dumps({"workspace": normalize_workspace_name(workspace), "processed": 0, "message": "No active members found"}))
    async def _process() -> list[dict]:
        """Process all members asynchronously."""
        from threegpp_ai.lightrag.config import LightRAGConfig
        from threegpp_ai.lightrag.processor import TDocProcessor

        config = LightRAGConfig()
        processor = TDocProcessor(config)
        results = []

        import asyncio

        from threegpp_ai.lightrag.config import LightRAGConfig
        from threegpp_ai.lightrag.metadata import RAGMetadata
        from threegpp_ai.lightrag.processor import TDocProcessor

        config = LightRAGConfig()
        processor = TDocProcessor(config)
        results = []

        # Start LightRAG for this workspace
        await processor.rag.start(workspace)
        try:
            for member in members:
                source_path = Path(member.source_path)

                # Find the actual document file (source_path might be a directory)
                if source_path.is_dir():
                    # Look for supported files in the directory
                    file_path = None
                    for ext in [".docx", ".doc", ".pdf", ".ppt", ".pptx"]:
                        matches = list(source_path.glob(f"*{ext}"))
                        if matches:
                            file_path = matches[0]
                            break
                    if file_path is None:
                        results.append(
                            {
                                "source_item_id": member.source_item_id,
                                "status": "skipped",
                                "reason": f"no supported files found in {source_path}",
                            }
                        )
                        continue
                elif source_path.is_file():
                    file_path = source_path
                else:
            console.print(f"[yellow]No active members found in workspace '{normalize_workspace_name(workspace)}'[/yellow]")
        return
                    results.append(
                        {
                            "source_item_id": member.source_item_id,
                            "status": "skipped",
                            "reason": "path not found",
                        }
                    )
                    continue

                if not file_path.exists():
                    results.append(
                        {
                            "source_item_id": member.source_item_id,
                            "status": "skipped",
                            "reason": "file not found",
                        }
                    )
                    continue

                # Check if already processed (skip if new_only and exists in graph)
                # TODO: Implement proper skip logic based on LightRAG status

                # Extract metadata for enrichment (for TDocs)
                metadata = None
                if member.source_item_id.startswith(("S", "R", "C", "T")):  # TDoc ID pattern
                    try:
                        from tdoc_crawler.database import TDocDatabase
                        from tdoc_crawler.config import resolve_cache_manager

                        manager = resolve_cache_manager()
                        with TDocDatabase(manager.db_file) as db:
                            tdoc_meta = db.get_tdoc(member.source_item_id)
                            if tdoc_meta:
                                metadata = RAGMetadata(
                                    tdoc_id=tdoc_meta.tdoc_id,
                                    title=tdoc_meta.title or "",
                                    meeting_id=tdoc_meta.meeting_id,
                                    source=tdoc_meta.source or "",
                                    tdoc_type=tdoc_meta.tdoc_type or "",
                                    for_purpose=tdoc_meta.for_purpose or "",
                                )
                    except Exception as e:
                        logger.debug(f"Could not load metadata for {member.source_item_id}: {e}")

                result = await processor.process_file(file_path, workspace, metadata=metadata)
                results.append(
                    {
                        "source_item_id": member.source_item_id,
                        "file": str(file_path),
                        "status": result.status.value,
                        "chars_extracted": result.chars_extracted,
                        "reason": result.reason,
                        "error": result.error,
                    }
                )
        finally:
            await processor.rag.stop()

    process_extract = _ai_embeddings.process_extract_phase
    process_embed = _ai_embeddings.process_embed_phase
    process_graph = _ai_embeddings.process_graph_phase
        return results

        return results

    # Run async processing
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console,
        refresh_per_second=10,
    ) as progress:
        extract_task = progress.add_task("[cyan]Phase 1: Classifying/Extracting", total=len(document_ids))
        extracted_ids, extract_results = process_extract(
            document_ids=document_ids,
            checkout_base=manager.root,
            new_only=new_only,
            force_rerun=force_rerun,
            workspace=workspace,
            progress_callback=lambda _: progress.advance(extract_task),
            limit=limit,
        )
        process_task = progress.add_task("[cyan]Processing documents", total=len(members))

        embed_task = progress.add_task("[cyan]Phase 2: Embedding", total=len(extracted_ids))
        embedded_ids, embed_results = process_embed(
            document_ids=extracted_ids,
            checkout_base=manager.root,
            force_rerun=force_rerun,
            workspace=workspace,
            progress_callback=lambda _: progress.advance(embed_task),
            limit=limit,
        )
        results = asyncio.run(_process())

        graph_task = progress.add_task("[cyan]Phase 3: Building Graph", total=len(embedded_ids))
        _, graph_results = process_graph(
            document_ids=embedded_ids,
            checkout_base=manager.root,
            workspace=workspace,
            progress_callback=lambda _: progress.advance(graph_task),
            limit=limit,
        )
        # Update progress
        for _ in results:
            progress.advance(process_task)

        results = {**extract_results, **embed_results, **graph_results}
    # Report results
    success_count = sum(1 for r in results if r["status"] == "success")
    skipped_count = sum(1 for r in results if r["status"] == "skipped")
    error_count = sum(1 for r in results if r["status"] == "error")

    console.print(f"[green]OK: Processed {len(results)} documents[/green]")
    if json_output:
        typer.echo(
            json.dumps(
                {
                    "workspace": normalize_workspace_name(workspace),
                    "processed": len(results),
                    "total_members": len(document_ids),
                    "document_ids": list(results.keys()),
                    "processed": success_count,
                    "skipped": skipped_count,
                    "errors": error_count,
                    "total": len(members),
                    "results": results,
                }
            )
        )
    else:
        console.print(f"[green]Processed {len(results)}/{len(document_ids)} document(s) in workspace '{normalize_workspace_name(workspace)}'[/green]")
        console.print(f"\n[green]Processed: {success_count}[/green]")
        if skipped_count > 0:
            console.print(f"[yellow]Skipped: {skipped_count}[/yellow]")
        if error_count > 0:
            console.print(f"[red]Errors: {error_count}[/red]")
        console.print(f"[cyan]Total members: {len(members)}[/cyan]")

        # Show errors if any
        errors = [r for r in results if r["status"] == "error"]
        if errors:
            console.print("\n[bold red]Errors:[/bold red]")
            for err in errors:
                console.print(f"  - {err['source_item_id']}: {err.get('error', 'unknown error')}")


@_workspace_app.command("delete")
+14 −0
Original line number Diff line number Diff line
@@ -35,6 +35,11 @@ from tdoc_crawler.tdocs.models import TDocQueryConfig
from tdoc_crawler.tdocs.operations.checkout import checkout_tdoc, prepare_tdoc_file
from tdoc_crawler.tdocs.operations.fetch import fetch_missing_tdocs

try:
    from tdoc_crawler.cli.ai import ai_app
except ImportError:  # pragma: no cover - optional dependency
    ai_app = None

load_dotenv()

app = typer.Typer(help="3GPP Crawler - TDocs and Meetings")
@@ -212,4 +217,13 @@ app.command("qt", rich_help_panel=HELP_PANEL_QUERY, hidden=True)(query_tdocs)
app.command("qm", rich_help_panel=HELP_PANEL_QUERY, hidden=True)(query_meetings)


def _register_optional_ai_commands() -> None:
    """Attach the AI sub-app to the CLI when the optional AI extras are installed."""
    if ai_app is not None:
        app.add_typer(ai_app, name="ai", help="AI document processing")


_register_optional_ai_commands()

__all__ = ["app"]
+6 −4
Original line number Diff line number Diff line
@@ -13,7 +13,9 @@ DEFAULT_HTTP_CACHE_FILENAME = "http-cache.sqlite3"
DEFAULT_CHECKOUT_DIRNAME = "checkout"
DEFAULT_MANAGER = "default"

REGISTRY_FILENAME = "workspaces.json"
DEFAULT_AI_CACHE_DIRNAME = "lightrag" # subdirectory under root cache dir for AI-related files

WORKSPACE_REGISTRY_FILENAME = "workspaces.json"

_cache_managers: dict[str, CacheManager] = {}

@@ -60,7 +62,7 @@ class CacheManager:
        Args:
            root_path: Explicit root path. If None, tries TDC_CACHE_DIR env var,
                       then falls back to DEFAULT_CACHE_DIR.
            ai_cache_dir: Explicit AI cache directory path. If None, defaults to root_path/.ai
            ai_cache_dir: Explicit AI cache directory path. If None, defaults to root_path/lightrag or TDC_AI_STORE_PATH env var if set.
            name: Optional name to register this manager under. If provided, the manager is registered upon initialization.
            ensure_paths: If True, will create the root directory if it doesn't exist.
        """
@@ -79,7 +81,7 @@ class CacheManager:
            if env_ai_cache_dir:
                self.ai_cache_dir = Path(env_ai_cache_dir).resolve()
            else:
                self.ai_cache_dir = (self.root / ".ai").resolve()
                self.ai_cache_dir = (self.root / DEFAULT_AI_CACHE_DIRNAME).resolve()

        if ensure_paths:
            self.ensure_paths()
@@ -116,7 +118,7 @@ class CacheManager:
    @property
    def ai_workspace_file(self) -> Path:
        """Path to the workspace registry file for a specific workspace."""
        return self.ai_cache_dir / REGISTRY_FILENAME
        return self.ai_cache_dir / WORKSPACE_REGISTRY_FILENAME

    def ensure_paths(self) -> None:
        """Ensure the root cache directory exists."""