Commit 0fd889f6 authored by Jan Reimes

refactor(cli): remove rag subcommand, consolidate under workspace

- Remove rag_app and add_typer from main CLI
- Add workspace query (was rag query)
- Add workspace status (was rag status)
- Remove migrate command and migration.py module
- Add help strings to all workspace commands
- Mark lightrag/cli.py as deprecated
parent e1f50943
+1 −0
@@ -4,6 +4,7 @@ This document provides a chronological log of all significant changes and improv

## Recent Changes

- **2026-03-27**: [CLI: Remove rag subcommand and consolidate under workspace](history/2026-03-27_SUMMARY_cli_rag_subcommand_removal.md)
- **2026-03-25**: [3gpp-ai list-providers CLI command](history/2026-03-25_SUMMARY_list_providers_cli_command.md)
- **2026-03-25**: [Enhanced RAG pipeline with tables, figures, and equations](history/2026-03-25_SUMMARY_enhanced_rag_pipeline_tables_figures_equations.md)
- **2026-03-24**: [Convert and summarize commands implementation](history/2026-03-24_SUMMARY_convert_summarize_commands_implementation.md)
+59 −0
# CLI: Remove rag subcommand and consolidate under workspace

**Date:** 2026-03-27

**Goal:** Simplify CLI by removing redundant `rag` subcommand and consolidating RAG functionality under `workspace`.

## Changes

### Removed Commands
- `3gpp-ai rag migrate` - Removed migration functionality entirely
- `3gpp-ai rag query` - Moved to `workspace query`
- `3gpp-ai rag status` - Moved to `workspace status`

### New Commands (under workspace)
- `3gpp-ai workspace query` - Query the LightRAG knowledge graph
- `3gpp-ai workspace status` - Show LightRAG configuration and status
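
The new `workspace query` command wraps the async RAG lifecycle (start, query, stop in a `try/finally`). A minimal stdlib-only sketch of that pattern, with a hypothetical `FakeRAG` standing in for the real `TDocRAG`:

```python
import asyncio


class FakeRAG:
    """Stand-in for TDocRAG: tracks lifecycle state so the pattern is visible."""

    def __init__(self) -> None:
        self.started = False
        self.workspace = ""

    async def start(self, workspace: str) -> None:
        self.started = True
        self.workspace = workspace

    async def query(self, text: str, mode: str = "hybrid") -> str:
        assert self.started, "start() must be called before query()"
        return f"[{self.workspace}/{mode}] answer to: {text}"

    async def stop(self) -> None:
        self.started = False


async def run_query(workspace: str, text: str) -> str:
    rag = FakeRAG()
    await rag.start(workspace)
    try:
        # stop() runs even if the query raises, mirroring the CLI's try/finally
        return await rag.query(text)
    finally:
        await rag.stop()


result = asyncio.run(run_query("default", "What is NR?"))
print(result)  # [default/hybrid] answer to: What is NR?
```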

### Files Modified
- `packages/3gpp-ai/threegpp_ai/cli.py`
  - Removed `rag_app` import and `app.add_typer(rag_app, name="rag")`
  - Added `workspace query` command
  - Added `workspace status` command
  - Added help strings to all workspace commands
  - Added imports: `Annotated`, `QueryMode`, `StorageBackend`, `TDocRAG`

- `packages/3gpp-ai/threegpp_ai/lightrag/cli.py`
  - Removed migrate command
  - Removed migration imports
  - Marked module as deprecated

- `packages/3gpp-ai/threegpp_ai/lightrag/migration.py` - Deleted

### New CLI Structure
```
3gpp-ai --help
├── summarize   (existing)
├── convert     (existing)
├── workspace   (enhanced)
│   ├── create
│   ├── list
│   ├── query   (NEW - was rag query)
│   ├── status  (NEW - was rag status)
│   ├── info
│   ├── activate
│   ├── deactivate
│   ├── clear-invalid
│   ├── clear
│   ├── add-members
│   ├── list-members
│   ├── process
│   └── delete
└── providers   (existing)
```

## Rationale

The `rag` subcommand was redundant since RAG functionality is workspace-centric. Moving `query` and `status` under `workspace` provides a cleaner command hierarchy and aligns with the workspace-based architecture.

The migration functionality was removed because it is unnecessary for the current workflow: shared storage is now the default behavior for new workspaces.
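
The deduplication idea behind shared storage (identical chunk text is hashed once, stored once, and referenced per workspace) can be sketched with stdlib hashing. This is a simplified stand-in for what the deleted migration module did with LightRAG's `compute_mdhash_id`, not the project's actual storage format:

```python
import hashlib


def chunk_id(text: str) -> str:
    """Content hash used as the shared-storage key (md5 of normalized text)."""
    return "chunk-" + hashlib.md5(text.strip().encode("utf-8")).hexdigest()


def migrate(workspaces: dict[str, list[str]]) -> dict:
    """Move per-workspace chunks into one shared store, deduplicating by hash."""
    shared: dict[str, str] = {}       # hash -> chunk text (stored once)
    index: dict[str, list[str]] = {}  # workspace -> hashes it references
    stats = {"migrated": 0, "deduplicated": 0}

    for ws, chunks in workspaces.items():
        index[ws] = []
        for text in chunks:
            h = chunk_id(text)
            if h in shared:
                stats["deduplicated"] += 1  # already stored, just reference it
            else:
                shared[h] = text
                stats["migrated"] += 1
            index[ws].append(h)
    return stats


stats = migrate({
    "ws-a": ["clause 5.1 text", "clause 6.2 text"],
    "ws-b": ["clause 5.1 text"],  # duplicate of ws-a's first chunk
})
print(stats)  # {'migrated': 2, 'deduplicated': 1}
```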
+108 −16
@@ -12,7 +12,7 @@ import shutil
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal
from typing import Annotated, Any, Literal

import typer
import yaml
@@ -78,11 +78,10 @@ from threegpp_ai.args import (
    WorkspaceProcessForceOption,
    WorkspaceReleaseOption,
)
from threegpp_ai.lightrag.cli import app as rag_app
from threegpp_ai.lightrag.config import LightRAGConfig
from threegpp_ai.lightrag.config import LightRAGConfig, QueryMode, StorageBackend
from threegpp_ai.lightrag.metadata import RAGMetadata
from threegpp_ai.lightrag.processor import DocumentProcessor
from threegpp_ai.lightrag.rag import PROVIDER_ALIASES, PROVIDERS
from threegpp_ai.lightrag.rag import PROVIDER_ALIASES, PROVIDERS, TDocRAG
from threegpp_ai.models import WorkspaceNotFoundError
from threegpp_ai.operations.classify import pick_main_document
from threegpp_ai.operations.conversion import OFFICE_FORMATS, convert_to_pdf
@@ -98,7 +97,6 @@ workspace_app = typer.Typer(help="Manage GraphRAG workspaces")
providers_app = typer.Typer(help="List and manage AI providers")
app.add_typer(workspace_app, name="workspace")
app.add_typer(providers_app, name="providers")
app.add_typer(rag_app, name="rag")

console = get_console()
_logger = get_logger(__name__)
@@ -562,7 +560,7 @@ def ai_convert(
    typer.echo(markdown_or_path)


@workspace_app.command("create")
@workspace_app.command("create", help="Create a new workspace")
def workspace_create(
    name: WorkspaceNameArgument,
    auto_build: WorkspaceAutoBuildOption = False,
@@ -583,7 +581,7 @@ def workspace_create(
        console.print("[cyan]Activated as current workspace[/cyan]")


@workspace_app.command("list")
@workspace_app.command("list", help="List all workspaces")
def workspace_list(
    json_output: JsonOutputOption = False,
) -> None:
@@ -619,7 +617,101 @@ def workspace_list(
    console.print(table)


@workspace_app.command("info")
@workspace_app.command("query")
def workspace_query(
    query: str = typer.Argument(..., help="Query string"),
    mode: Annotated[
        QueryMode,
        typer.Option(
            "--mode",
            "-m",
            case_sensitive=False,
            help="Query mode (local, global, hybrid, naive)",
        ),
    ] = QueryMode.HYBRID,
    workspace: WorkspaceNameOption = None,
    json_output: JsonOutputOption = False,
) -> None:
    """Query the LightRAG knowledge graph."""
    workspace_name = _resolve_workspace_name(workspace)

    if not query:
        console.print("[red]Error: query is required[/red]")
        raise typer.Exit(1)

    async def _run() -> str | None:
        config = LightRAGConfig.from_env()
        rag = TDocRAG(config)
        await rag.start(workspace_name)
        try:
            result = await rag.query(query, mode=mode)
            return result
        finally:
            await rag.stop()

    result = asyncio.run(_run())

    if json_output:
        typer.echo(json.dumps({"query": query, "mode": mode.value, "result": result}))
    else:
        console.print(f"\n[bold]Query:[/bold] {query}")
        console.print(f"[bold]Mode:[/bold] {mode.value}\n")
        if result:
            console.print(result)
        else:
            console.print("[yellow]No result returned[/yellow]")


@workspace_app.command("status")
def workspace_status(
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Show full configuration"),
) -> None:
    """Show LightRAG configuration and status."""
    config = LightRAGConfig.from_env()

    # Header
    console.print("\n[bold cyan]LightRAG Configuration[/bold cyan]")

    # Database backend
    backend = config.database.backend
    backend_icon = "🗄️" if backend == StorageBackend.PG0 else "📁"
    console.print(f"\n{backend_icon} [cyan]Storage backend:[/cyan] {backend.value}")

    if backend == StorageBackend.PG0:
        console.print(f"   Instance: [cyan]{config.database.pg0_instance_name}[/cyan]")
        console.print(f"   Port: [cyan]{config.database.pg0_port}[/cyan]")
        console.print(f"   Database: [cyan]{config.database.pg0_database}[/cyan]")
    else:
        console.print(f"   Working dir: [cyan]{config.working_dir}[/cyan]")

    # LLM
    console.print("\n🤖 [cyan]LLM:[/cyan]")
    console.print(f"   Model: [cyan]{config.llm.model}[/cyan]")
    console.print(f"   API Base: [cyan]{config.llm.api_base}[/cyan]")

    # Embedding
    console.print("\n🔢 [cyan]Embedding:[/cyan]")
    console.print(f"   Model: [cyan]{config.embedding.model}[/cyan]")
    console.print(f"   API Base: [cyan]{config.embedding.api_base}[/cyan]")

    # Query defaults
    console.print("\n🔍 [cyan]Query defaults:[/cyan]")
    console.print(f"   Mode: [cyan]{config.default_query_mode.value}[/cyan]")
    console.print(f"   Workspace: [cyan]{config.workspace}[/cyan]")

    # Shared storage status
    console.print(f"   Shared storage: [cyan]{'enabled' if config.shared_storage else 'disabled'}[/cyan]")

    if verbose:
        console.print("\n[bold]Full configuration:[/bold]")
        console.print(f"   working_dir: {config.working_dir}")
        console.print(f"   workspace: {config.workspace}")
        console.print(f"   env_prefix: {config.model_config.get('env_prefix', 'N/A')}")

    console.print()


@workspace_app.command("info", help="Show detailed information about a workspace")
def workspace_info(
    name: WorkspaceNameArgument,
    json_output: JsonOutputOption = False,
@@ -645,19 +737,19 @@ def workspace_info(
    console.print(table)


@workspace_app.command("activate")
@workspace_app.command("activate", help="Activate a workspace (set as current)")
def workspace_activate(name: WorkspaceNameArgument) -> None:
    set_active_workspace(name)
    console.print(f"[green]Activated workspace: {normalize_workspace_name(name)}[/green]")


@workspace_app.command("deactivate")
@workspace_app.command("deactivate", help="Deactivate current workspace (reset to default)")
def workspace_deactivate() -> None:
    set_active_workspace("default")
    console.print("[green]Reset active workspace to default[/green]")


@workspace_app.command("clear-invalid")
@workspace_app.command("clear-invalid", help="Remove invalid members from workspace")
def workspace_clear_invalid(
    workspace: WorkspaceNameOption = None,
) -> None:
@@ -669,7 +761,7 @@ def workspace_clear_invalid(
        console.print(f"[yellow]No invalid members found in '{workspace_name}'[/yellow]")


@workspace_app.command("clear")
@workspace_app.command("clear", help="Clear LightRAG artifacts for a workspace")
def workspace_clear(
    workspace: WorkspaceNameOption = None,
) -> None:
@@ -688,7 +780,7 @@ def workspace_clear(
    console.print(f"[green]Cleared LightRAG artifacts for '{workspace_name}'[/green]")


@workspace_app.command("add-members")
@workspace_app.command("add-members", help="Add members (TDocs/specs) to a workspace")
def workspace_add_members(
    workspace: WorkspaceNameOption = None,
    items: WorkspaceItemsArgument = None,
@@ -796,7 +888,7 @@ def workspace_add_members(
    console.print(f"[green]Added {added} member(s) to '{workspace_name}'[/green]")


@workspace_app.command("list-members")
@workspace_app.command("list-members", help="List members in a workspace")
def workspace_list_members(
    workspace: WorkspaceNameOption = None,
    include_inactive: WorkspaceIncludeInactiveOption = False,
@@ -837,7 +929,7 @@ def workspace_list_members(
    console.print(table)


@workspace_app.command("process")
@workspace_app.command("process", help="Process workspace members (checkout, convert, embed)")
def workspace_process(
    workspace: WorkspaceNameOption = None,
    force: WorkspaceProcessForceOption = False,
@@ -904,7 +996,7 @@ def workspace_process(
        console.print(f"[red]Errors: {error_count}[/red]")


@workspace_app.command("delete")
@workspace_app.command("delete", help="Delete a workspace")
def workspace_delete(
    name: WorkspaceNameArgument,
    preserve_artifacts: WorkspacePreserveArtifactsOption = True,
+3 −81
"""CLI commands for LightRAG integration.

This module provides Typer commands for:
- query: Query the knowledge graph
- status: Show LightRAG configuration and status
This module is deprecated; use ``workspace query`` and ``workspace status`` instead.

Note: These commands are designed to be integrated into the main CLI
via app.add_typer(rag_app). Direct import may cause circular dependencies.
Note: These commands are now integrated into the main CLI under the ``workspace`` subcommand.
"""

from __future__ import annotations
@@ -16,13 +13,11 @@ from typing import Annotated

import typer
from rich.console import Console
from tdoc_crawler.config import CacheManager

from threegpp_ai.lightrag.config import LightRAGConfig, QueryMode, StorageBackend
from threegpp_ai.lightrag.migration import consolidate_workspaces, migrate_to_shared_storage
from threegpp_ai.lightrag.rag import TDocRAG

app = typer.Typer(name="rag", help="LightRAG knowledge graph commands")
app = typer.Typer(name="rag", help="LightRAG knowledge graph commands (deprecated)")
console = Console()


@@ -130,78 +125,5 @@ def show_status(
    console.print()


@app.command("migrate")
def migrate_storage(
    workspace: Annotated[
        str | None,
        typer.Option("--workspace", "-w", help="Specific workspace to migrate (default: all)"),
    ] = None,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run", "-n", help="Analyze without making changes"),
    ] = True,
    consolidate: Annotated[
        bool,
        typer.Option("--consolidate", "-c", help="Run full consolidation (removes old files)"),
    ] = False,
) -> None:
    """Migrate existing embeddings to shared storage.

    This command consolidates duplicate embeddings across workspaces into a shared layer,
    reducing storage usage and improving processing efficiency.

    Examples:
        # Analyze current state (dry run)
        tdoc-crawler ai rag migrate --dry-run

        # Migrate all workspaces
        tdoc-crawler ai rag migrate

        # Migrate specific workspace
        tdoc-crawler ai rag migrate -w atias

        # Full consolidation (removes old files)
        tdoc-crawler ai rag migrate --consolidate
    """
    config = LightRAGConfig.from_env()
    manager = CacheManager().register()
    embedding_model_safe = config.embedding.model.replace(":", "-").replace("/", "-")
    working_dir = manager.ai_embed_dir(embedding_model_safe)

    if not working_dir.exists():
        console.print(f"[yellow]Warning: Working directory does not exist yet: {working_dir}[/yellow]")
        console.print("[yellow]Run 'tdoc-crawler ai workspace process' first to create embeddings[/yellow]")
        raise typer.Exit(0)

    console.print(f"\n[bold]Working directory:[/bold] {working_dir}")
    console.print(f"[bold]Embedding model:[/bold] {config.embedding.model}")
    console.print(f"[bold]Workspace:[/bold] {workspace or 'all'}")
    console.print(f"[bold]Mode:[/bold] {'consolidate' if consolidate else 'migrate'}")
    console.print(f"[bold]Dry run:[/bold] {dry_run}\n")

    async def _migrate() -> None:
        if consolidate:
            await consolidate_workspaces(working_dir)
        else:
            stats = await migrate_to_shared_storage(
                working_dir,
                workspace=workspace,
                dry_run=dry_run,
            )

            console.print("\n[bold cyan]Migration Results[/bold cyan]")
            console.print(f"  Workspaces processed: [cyan]{stats['workspaces_processed']}[/cyan]")
            console.print(f"  Chunks migrated: [green]{stats['chunks_migrated']}[/green]")
            console.print(f"  Chunks deduplicated: [yellow]{stats['chunks_deduplicated']}[/yellow]")
            console.print(f"  Storage saved: [green]{stats['storage_saved_bytes'] / (1024 * 1024):.2f} MB[/green]")

            if stats["errors"]:
                console.print(f"\n[red]Errors:[/red] {len(stats['errors'])}")
                for error in stats["errors"]:
                    console.print(f"  - {error}")

    asyncio.run(_migrate())


if __name__ == "__main__":
    app()
+0 −188
"""Migration utilities for shared embedding storage.

This module provides tools to migrate existing workspace-specific embeddings
to the shared storage format, eliminating duplicates.

Usage:
    >>> from threegpp_ai.lightrag.migration import migrate_to_shared_storage
    >>> await migrate_to_shared_storage(working_dir, embedding_model)
"""

# This module provides essential functionality for migrating existing workspace
# embeddings to shared storage format, eliminating duplicates and improving efficiency.
# Used by the `tdoc-crawler ai rag migrate` command.

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from lightrag.utils import compute_mdhash_id
from tdoc_crawler.logging import get_logger

from .shared_storage import SharedNanoVectorDBStorage, WorkspaceIndex

logger = get_logger(__name__)


async def migrate_to_shared_storage(
    working_dir: Path,
    workspace: str | None = None,
    dry_run: bool = True,
) -> dict[str, Any]:
    """Migrate workspace-specific embeddings to shared storage.

    This function:
    1. Loads existing workspace embeddings
    2. Moves them to shared storage with deduplication
    3. Creates workspace index for tracking
    4. Optionally removes old workspace-specific files

    Args:
        working_dir: LightRAG working directory (already at embedding model level)
        workspace: Specific workspace to migrate (None = all workspaces)
        dry_run: If True, only analyze without making changes

    Returns:
        Migration statistics dict
    """
    stats = {
        "workspaces_processed": 0,
        "chunks_migrated": 0,
        "chunks_deduplicated": 0,
        "storage_saved_bytes": 0,
        "errors": [],
    }

    # working_dir is already at the embedding model level
    embedding_dir = working_dir
    if not embedding_dir.exists():
        logger.warning("Embedding directory does not exist: %s", embedding_dir)
        return stats

    # Find all workspace directories (exclude _shared)
    workspaces = []
    for item in embedding_dir.iterdir():
        if item.is_dir() and not item.name.startswith("_") and (workspace is None or item.name == workspace):
            workspaces.append(item)

    logger.info("Found %d workspace(s) to migrate", len(workspaces))

    # Initialize shared storage
    shared_storage = SharedNanoVectorDBStorage(
        global_config={"working_dir": str(embedding_dir)},
        namespace="text_chunks",
    )

    for ws_dir in workspaces:
        logger.info("Processing workspace: %s", ws_dir.name)
        stats["workspaces_processed"] += 1

        # Load workspace-specific vdb file
        vdb_file = ws_dir / "vdb_text_chunks.json"
        if not vdb_file.exists():
            logger.warning("No vdb file found in %s", ws_dir)
            continue

        try:
            with open(vdb_file, encoding="utf-8") as f:
                ws_data = json.load(f)
        except Exception as e:
            logger.error("Failed to load %s: %s", vdb_file, e)
            stats["errors"].append(str(ws_dir))
            continue

        # Create workspace index
        ws_index = WorkspaceIndex(
            workspace_name=ws_dir.name,
            working_dir=embedding_dir,
        )

        # Process chunks
        chunks_data = ws_data.get("data", {})
        for chunk_id, chunk_data in chunks_data.items():
            text = chunk_data.get("text", "")
            embedding = chunk_data.get("embedding")
            metadata = chunk_data.get("metadata", {})

            if not text or embedding is None:
                continue

            # Compute hash for deduplication check
            text_hash = compute_mdhash_id(text.strip(), prefix="chunk-")

            # Check if already in shared storage
            if text_hash in shared_storage._data["index"]:
                # Duplicate - just track in workspace index
                existing_chunk_id = shared_storage._data["index"][text_hash]
                ws_index.add_document(chunk_id, [existing_chunk_id], {existing_chunk_id: metadata})
                stats["chunks_deduplicated"] += 1
                logger.debug("Deduplicated chunk %s → %s", chunk_id, existing_chunk_id)
            else:
                # New unique chunk - add to shared storage
                if not dry_run:
                    await shared_storage.upsert({chunk_id: chunk_data})
                ws_index.add_document(chunk_id, [chunk_id], {chunk_id: metadata})
                stats["chunks_migrated"] += 1
                logger.debug("Migrated unique chunk %s", chunk_id)

        # Estimate storage savings
        if vdb_file.exists():
            stats["storage_saved_bytes"] += vdb_file.stat().st_size

        logger.info(
            "Workspace %s: %d chunks processed",
            ws_dir.name,
            len(chunks_data),
        )

    if not dry_run:
        logger.info("Migration completed successfully")
    else:
        logger.info("Dry run completed - no changes made")

    return stats


async def consolidate_workspaces(
    working_dir: Path,
) -> None:
    """Consolidate all workspace embeddings into shared storage.

    This is a more aggressive migration that:
    1. Merges all workspace embeddings into shared storage
    2. Removes duplicate workspace-specific files
    3. Creates workspace indices for all workspaces

    WARNING: This modifies the workspace structure. Backup first!

    Args:
        working_dir: LightRAG working directory (already at embedding model level)
    """
    logger.info("Starting workspace consolidation...")

    # Run migration in dry-run mode first
    stats = await migrate_to_shared_storage(
        working_dir,
        dry_run=True,
    )

    logger.info(
        "Dry run results: %d chunks would be migrated, %d deduplicated",
        stats["chunks_migrated"],
        stats["chunks_deduplicated"],
    )

    # Run actual migration
    stats = await migrate_to_shared_storage(
        working_dir,
        dry_run=False,
    )

    logger.info(
        "Consolidation complete: %d chunks migrated, %d deduplicated, %.2f MB saved",
        stats["chunks_migrated"],
        stats["chunks_deduplicated"],
        stats["storage_saved_bytes"] / (1024 * 1024),
    )