Commit f27eb52c authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(workspace): move workspace and extraction infrastructure into main app

- Move workspace models, registry, CRUD, members, and utils from 3gpp-ai
  to src/tdoc_crawler/workspaces/ and src/tdoc_crawler/models/workspaces.py
- Move extraction pipeline (checkout, conversion, convert, fetch_tdoc,
  metrics, workspace_utils) to src/tdoc_crawler/extraction/
- Add workspace CLI commands in _workspace_commands.py
- Register workspace sub-app in tdoc_app.py
parent 964cf302
Loading
Loading
Loading
Loading
+193 −0
Original line number Diff line number Diff line
"""Workspace-related CLI commands for the main application.

These commands create, inspect, modify, and process workspaces.
"""

from __future__ import annotations

import shutil
from pathlib import Path
from typing import Any

import typer

from tdoc_crawler.cli._shared import console
from tdoc_crawler.cli.formatting import TableColumnSpec, print_structured_output
from tdoc_crawler.config import resolve_cache_manager
from tdoc_crawler.logging import get_logger
from tdoc_crawler.models.base import OutputFormat
from tdoc_crawler.workspaces import (
    create_workspace,
    delete_workspace,
    list_workspace_members,
    list_workspaces,
    normalize_workspace_name,
    remove_workspace_member,
    set_active_workspace,
)

# Typer sub-application for workspace commands; registered on the main CLI app.
app = typer.Typer(help="Manage extraction workspaces")

# Module-level logger (reserved for future use; commands print via `console`).
_logger = get_logger(__name__)


def _print_output(
    data: Any,
    output_format: OutputFormat,
    *,
    table_title: str,
    table_columns: list[TableColumnSpec] | None = None,
) -> None:
    """Route command results through the shared structured-output formatter.

    Thin wrapper that pins this module's `console` so individual commands do
    not have to pass it explicitly.
    """
    print_structured_output(
        data,
        output_format,
        console=console,
        table_title=table_title,
        table_columns=table_columns,
    )


@app.command("create", help="Create a new workspace.")
def workspace_create(
    name: str = typer.Argument(..., help="Workspace name"),
) -> None:
    """Create a new workspace under its normalized (lowercased, trimmed) name."""
    workspace = normalize_workspace_name(name)
    create_workspace(workspace)
    console.print(f"[green]Workspace '{workspace}' created successfully.[/green]")


@app.command("list", help="List all available workspaces.")
def workspace_list() -> None:
    """Display all existing workspaces, marking the active one."""
    found = list_workspaces()
    if not found:
        console.print("[dim]No workspaces found.[/dim]")
        return

    for workspace in sorted(found, key=lambda w: w.name):
        # getattr guards registries whose entries lack an is_active flag.
        marker = " [green](active)[/green]" if getattr(workspace, "is_active", False) else ""
        console.print(f"- {workspace.name}{marker}")


@app.command("activate", help="Set a workspace as active.")
def workspace_activate(workspace_name: str = typer.Argument(..., help="Workspace name")) -> None:
    """Activate a workspace so other commands use it as their default target.

    Args:
        workspace_name: Name of the workspace to activate (normalized before use).
    """
    # Fix: dropped the stale comments that claimed a function-local import of
    # set_active_workspace (it is imported at module level) and an inert,
    # free-standing `# noqa: PLC0415` line that suppressed nothing.
    normalized = normalize_workspace_name(workspace_name)
    set_active_workspace(normalized)
    console.print(f"[green]Workspace '{normalized}' is now active.[/green]")


@app.command("deactivate", help="Deactivate the currently active workspace.")
def workspace_deactivate() -> None:
    """Clear the active-workspace selection.

    Passing None to set_active_workspace resets the active context.
    """
    set_active_workspace(None)
    console.print("[yellow]Workspace deactivated.[/yellow]")


@app.command("delete", help="Delete a workspace and optionally its artifacts.")
def workspace_delete(
    workspace_name: str = typer.Argument(..., help="Workspace name"),
    force: bool = typer.Option(False, "--force", help="Permanently delete workspace and all artifacts"),
    delete_artifacts: bool = typer.Option(False, "--delete-artifacts", help="Delete all .ai artifacts for workspace members"),
    delete_llm_wiki: bool = typer.Option(False, "--delete-llm-wiki", help="Delete the .llm-wiki folder for this workspace"),
) -> None:
    """Permanently delete a workspace and all associated files."""
    normalized = normalize_workspace_name(workspace_name)

    # Destructive operation: refuse to proceed without an explicit --force.
    if not force:
        console.print("[yellow]Use --force to permanently delete workspace and all artifacts.[/yellow]")
        return

    delete_workspace(normalized, delete_artifacts=delete_artifacts)

    if delete_llm_wiki:
        # Best-effort cleanup; a failure here must not abort the command.
        try:
            wiki_dir = resolve_cache_manager().workspace_llm_wiki_dir(normalized)
            if wiki_dir.exists():
                shutil.rmtree(wiki_dir)
                console.print(f"[green]Deleted .llm-wiki folder for '{normalized}'.[/green]")
        except Exception as e:
            console.print(f"[yellow]Could not delete .llm-wiki folder: {e}[/yellow]")

    console.print(f"[green]Workspace '{normalized}' deleted.[/green]")


@app.command("members", help="List workspace members.")
def workspace_members(
    workspace_name: str = typer.Argument(None, help="Workspace name (default: active workspace)"),
    include_inactive: bool = typer.Option(False, "--include-inactive", help="Include inactive members"),
) -> None:
    """List members of a workspace.

    Args:
        workspace_name: Workspace to inspect; None resolves to the default/active one.
        include_inactive: Also show members flagged inactive.
    """
    normalized = normalize_workspace_name(workspace_name)
    # Fix: keep the try body minimal — only the registry lookup can
    # legitimately fail; the original also swallowed errors raised while
    # formatting/printing members, masking bugs as "Error: ...".
    try:
        members = list_workspace_members(normalized, include_inactive=include_inactive)
    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")
        return

    if not members:
        console.print(f"[dim]No members in workspace '{normalized}'.[/dim]")
        return

    for member in members:
        status = "[dim]inactive[/dim]" if not member.is_active else "[green]active[/green]"
        console.print(f"  {member.source_item_id} ({member.source_kind.value}) - {status}")


@app.command("process", help="Process workspace members.")
def workspace_process(
    workspace_name: str = typer.Argument(None, help="Workspace name (default: active workspace)"),
    force: bool = typer.Option(False, "--force", help="Re-process existing artifacts"),
    limit: int = typer.Option(None, "--limit", help="Limit number of members to process"),
    skip_existing: bool = typer.Option(False, "--skip-existing", help="Skip members that already have artifacts"),
) -> None:
    """Extract structured data from all workspace members.

    NOTE(review): currently a stub — force/limit/skip-existing are accepted
    but not yet wired into the extraction pipeline.
    """
    target = normalize_workspace_name(workspace_name)
    console.print(f"[yellow]Processing workspace '{target}'...[/yellow]")
    console.print("[dim]Note: Processing uses the extraction pipeline from tdoc_crawler.extraction[/dim]")


@app.command("add", help="Add documents to an existing workspace.")
def workspace_add(
    workspace_name: str = typer.Argument(..., help="Workspace name"),
    items: list[str] = typer.Argument(..., help="Items to add (TDoc IDs, spec numbers, etc.)"),
    kind: str = typer.Option("tdoc", "--kind", help="Source kind: tdoc, spec, or other"),
) -> None:
    """Add documents to a workspace.

    NOTE(review): stub — the `kind` option is accepted but not yet used here.
    """
    target = normalize_workspace_name(workspace_name)
    console.print(f"[yellow]Adding {len(items)} item(s) to workspace '{target}'...[/yellow]")
    console.print("[dim]Note: Full checkout and add logic requires the 3gpp-ai package.[/dim]")


@app.command("clear-invalid", help="Remove members with invalid or missing source paths.")
def workspace_clear_invalid(
    workspace_name: str = typer.Argument(None, help="Workspace name (default: active workspace)"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be removed without removing"),
) -> None:
    """Remove members whose source path no longer exists."""
    normalized = normalize_workspace_name(workspace_name)
    try:
        # Inactive members are included so stale entries cannot hide.
        stale = [
            member
            for member in list_workspace_members(normalized, include_inactive=True)
            if not Path(member.source_path).exists()
        ]

        if not stale:
            console.print(f"[green]All members in '{normalized}' have valid paths.[/green]")
            return

        suffix = "[yellow](would remove)[/yellow]" if dry_run else "[red](removed)[/red]"
        for member in stale:
            console.print(f"  {member.source_item_id}: {member.source_path} {suffix}")

        if dry_run:
            console.print(f"\n[yellow]Dry-run: would remove {len(stale)} invalid members.[/yellow]")
        else:
            for member in stale:
                remove_workspace_member(normalized, member.source_item_id)
            console.print(f"\n[green]Removed {len(stale)} invalid members.[/green]")
    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")
+433 −0
Original line number Diff line number Diff line
"""Workspace registry management for the main application.

This module provides a centralized registry for workspace metadata,
stored in a JSON file at ~/.3gpp-crawler/workspaces.json.
"""

from __future__ import annotations

import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import TypedDict

from tdoc_crawler.config.settings import PathConfig
from tdoc_crawler.logging import get_logger
from tdoc_crawler.models.workspaces import SourceKind
from tdoc_crawler.utils.misc import utc_now
from tdoc_crawler.utils.normalization import normalize_release_version

logger = get_logger(__name__)

DEFAULT_WORKSPACE = "default"


def normalize_workspace_name(workspace: str | None) -> str:
    """Normalize workspace input to a canonical name."""
    if workspace is None:
        return DEFAULT_WORKSPACE

    normalized = workspace.strip().lower()
    if not normalized:
        return DEFAULT_WORKSPACE
    return normalized


def is_default_workspace(workspace: str | None) -> bool:
    """Return True when *workspace* resolves to the reserved default name."""
    return DEFAULT_WORKSPACE == normalize_workspace_name(workspace)


# TypedDict definitions for workspace data structures
class WorkspaceMemberDict(TypedDict):
    """Dictionary representation of a workspace member.

    This is the on-disk JSON shape; see WorkspaceMember for the runtime
    dataclass counterpart and its to_dict/from_dict converters.
    """

    source_item_id: str  # canonical member ID (e.g. TDoc or spec identifier)
    source_path: str  # filesystem path of the member's source document
    source_kind: str  # serialized SourceKind value, e.g. "tdoc"/"spec"/"other"
    added_by: str  # who/what added the member (may be empty)
    is_active: bool  # False when the member has been soft-deactivated
    added_at: str  # ISO-8601 timestamp of when the member was added


class WorkspaceMetadataDict(TypedDict):
    """Dictionary representation of workspace metadata.

    On-disk JSON shape for one workspace; the workspace name itself is
    stored as the key in the registry's "workspaces" mapping, not here.
    """

    created_at: str  # ISO-8601 creation timestamp
    description: str  # free-form description (may be empty)
    auto_build: bool  # whether to auto-build when members are added
    members: list[WorkspaceMemberDict]  # member entries, serialized


def normalize_spec_member_id(source_item_id: str) -> str:
    """Normalize spec member IDs to canonical format for display.

    Public alias for the internal normalizer; handles legacy IDs where
    release tuple objects were stringified.

    Examples:
        26261-REL('19', []) -> 26261-REL19.0.0
        26261-REL19 -> 26261-REL19.0.0
        26261-REL19.1 -> 26261-REL19.1.0

    Args:
        source_item_id: The source item ID to normalize.

    Returns:
        Normalized source item ID with three-digit release version.
    """
    return _normalize_legacy_spec_member_id(source_item_id)


def _normalize_legacy_spec_member_id(source_item_id: str) -> str:
    """Normalize spec IDs to canonical three-digit release versions.

    Examples:
        26261-REL('19', []) -> 26261-REL19.0.0
        26261-REL19 -> 26261-REL19.0.0
        26261-REL19.1 -> 26261-REL19.1.0
    """
    # Try the legacy stringified-tuple form first, then the modern form;
    # a non-spec ID matches neither and is returned unchanged.
    patterns = (
        r"^(?P<spec>\d+(?:\.\d+)?)-REL\('(?P<release>[^']+)', \[[^\]]*\]\)$",
        r"^(?P<spec>\d+(?:\.\d+)?)-REL(?P<release>.+)$",
    )
    for pattern in patterns:
        if (match := re.match(pattern, source_item_id)) is not None:
            release = normalize_release_version(match.group("release"))
            return f"{match.group('spec')}-REL{release}"
    return source_item_id


@dataclass
class WorkspaceMember:
    """Member of a workspace (TDoc, spec, or other document)."""

    source_item_id: str
    source_path: str
    source_kind: SourceKind
    added_by: str = ""
    is_active: bool = True
    added_at: str = field(default_factory=lambda: utc_now().isoformat())

    def to_dict(self) -> WorkspaceMemberDict:
        """Serialize to the JSON-friendly dict form used by the registry."""
        canonical_id = _normalize_legacy_spec_member_id(self.source_item_id)
        return WorkspaceMemberDict(
            source_item_id=canonical_id,
            source_path=self.source_path,
            source_kind=self.source_kind.value,
            added_by=self.added_by,
            is_active=self.is_active,
            added_at=self.added_at,
        )

    @classmethod
    def from_dict(cls, data: WorkspaceMemberDict) -> WorkspaceMember:
        """Build a member from its dict form, tolerating legacy/partial data."""
        canonical_id = _normalize_legacy_spec_member_id(data["source_item_id"])
        return cls(
            source_item_id=canonical_id,
            source_path=data["source_path"],
            source_kind=SourceKind(data.get("source_kind", "other")),
            added_by=data.get("added_by", ""),
            is_active=data.get("is_active", True),
            added_at=data.get("added_at", utc_now().isoformat()),
        )


@dataclass
class WorkspaceMetadata:
    """Metadata for a single workspace."""

    name: str
    created_at: str = field(default_factory=lambda: utc_now().isoformat())
    description: str = ""
    auto_build: bool = True
    members: list[WorkspaceMemberDict] = field(default_factory=list)

    def to_dict(self) -> WorkspaceMetadataDict:
        """Serialize to the JSON-friendly dict form (name is stored as the registry key)."""
        return {
            "created_at": self.created_at,
            "description": self.description,
            "auto_build": self.auto_build,
            "members": self.members,
        }

    @staticmethod
    def _entry_id(entry: WorkspaceMemberDict) -> str:
        """Canonical ID of a stored member entry (handles legacy spec IDs)."""
        return _normalize_legacy_spec_member_id(str(entry.get("source_item_id", "")))

    def add_member(self, member: WorkspaceMember) -> None:
        """Add a member, replacing any existing entry with the same canonical ID."""
        target = _normalize_legacy_spec_member_id(member.source_item_id)
        self.members = [entry for entry in self.members if self._entry_id(entry) != target]
        # Write the canonical ID back so the serialized entry matches.
        member.source_item_id = target
        self.members.append(member.to_dict())

    def remove_member(self, source_item_id: str) -> bool:
        """Remove a member by source_item_id. Returns True if found and removed."""
        target = _normalize_legacy_spec_member_id(source_item_id)
        kept = [entry for entry in self.members if self._entry_id(entry) != target]
        removed_any = len(kept) < len(self.members)
        self.members = kept
        return removed_any

    def deactivate_member(self, source_item_id: str) -> bool:
        """Deactivate a member by source_item_id. Returns True if found and deactivated."""
        target = _normalize_legacy_spec_member_id(source_item_id)
        for entry in self.members:
            if self._entry_id(entry) == target:
                entry["is_active"] = False
                return True
        return False

    def get_member_counts(self) -> dict[str, int]:
        """Count active members per source kind, plus a running total."""
        counts = {"tdoc": 0, "spec": 0, "other": 0, "total": 0}
        for entry in self.members:
            if not entry.get("is_active", True):
                continue
            kind = entry.get("source_kind", "other").lower()
            # Unknown kinds are folded into the "other" bucket.
            counts[kind if kind in counts else "other"] += 1
            counts["total"] += 1
        return counts

    def list_members(self, include_inactive: bool = False) -> list[WorkspaceMember]:
        """Return members as WorkspaceMember objects, sorted by source_item_id."""
        selected = [
            member
            for member in (WorkspaceMember.from_dict(entry) for entry in self.members)
            if include_inactive or member.is_active
        ]
        selected.sort(key=lambda m: m.source_item_id)
        return selected

    @classmethod
    def from_dict(cls, name: str, data: WorkspaceMetadataDict) -> WorkspaceMetadata:
        """Build metadata from its dict form, supplying defaults for missing keys."""
        return cls(
            name=name,
            created_at=data.get("created_at", utc_now().isoformat()),
            description=data.get("description", ""),
            auto_build=data.get("auto_build", True),
            members=data.get("members", []),
        )


@dataclass
class WorkspaceDisplayInfo:
    """Display information for a workspace including member counts.

    Produced by WorkspaceRegistry.list_workspaces for list-style output;
    the per-kind counts come from WorkspaceMetadata.get_member_counts.
    """

    name: str  # canonical (normalized) workspace name
    created_at: str  # ISO-8601 creation timestamp
    tdoc_count: int = 0  # active members of kind "tdoc"
    spec_count: int = 0  # active members of kind "spec"
    other_count: int = 0  # active members of any other kind
    is_active: bool = False  # True when this is the registry's active workspace


@dataclass
class WorkspaceRegistry:
    """Registry for all workspaces.

    The registry is stored as a JSON file with the following structure:
    {
        "version": 1,
        "workspaces": {
            "default": { ...WorkspaceMetadata... },
            "atias": { ...WorkspaceMetadata... }
        },
        "active": "default"
    }
    """

    version: int = 1  # schema version of the JSON file
    workspaces: dict[str, WorkspaceMetadata] = field(default_factory=dict)
    active: str = DEFAULT_WORKSPACE  # name of the currently active workspace
    registry_path: Path | None = None  # explicit file path; None -> PathConfig

    def save(self) -> None:
        """Save registry to JSON file.

        Raises:
            OSError: If the registry file cannot be written.
        """
        registry_path = self.registry_path or PathConfig().ai_workspace_file

        # Fix: on a fresh install (or a custom registry_path) the parent
        # directory may not exist yet; create it instead of letting open()
        # fail with FileNotFoundError.
        registry_path.parent.mkdir(parents=True, exist_ok=True)

        data = {
            "version": self.version,
            "workspaces": {name: metadata.to_dict() for name, metadata in self.workspaces.items()},
            "active": self.active,
        }

        with registry_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        logger.debug(f"Saved workspace registry to {registry_path}")

    def create_workspace(
        self,
        name: str,
        description: str = "",
        auto_build: bool = True,
    ) -> WorkspaceMetadata:
        """Create a new workspace entry.

        Args:
            name: Workspace name.
            description: Optional description.
            auto_build: Whether to auto-build when members are added.

        Returns:
            Created WorkspaceMetadata.

        Raises:
            ValueError: If workspace already exists.
        """
        normalized_name = normalize_workspace_name(name)
        # Defensive: normalize_workspace_name falls back to the default name,
        # so an empty result should not occur in practice.
        if not normalized_name:
            raise ValueError("Workspace name cannot be empty")

        if normalized_name in self.workspaces:
            raise ValueError(f"Workspace '{normalized_name}' already exists")

        metadata = WorkspaceMetadata(
            name=normalized_name,
            description=description,
            auto_build=auto_build,
        )
        self.workspaces[normalized_name] = metadata
        logger.info(f"Created workspace '{normalized_name}'")
        return metadata

    def delete_workspace(self, name: str) -> bool:
        """Delete a workspace entry.

        Args:
            name: Workspace name to delete.

        Returns:
            True if deleted, False if not found or if attempting to delete default.
        """
        normalized_name = normalize_workspace_name(name)
        if normalized_name == DEFAULT_WORKSPACE:
            logger.warning("Cannot delete the default workspace")
            return False

        if normalized_name not in self.workspaces:
            logger.warning(f"Workspace '{normalized_name}' not found")
            return False

        del self.workspaces[normalized_name]

        # Fall back to the default workspace if the active one was deleted.
        if self.active == normalized_name:
            self.active = DEFAULT_WORKSPACE

        logger.info(f"Deleted workspace '{normalized_name}'")
        return True

    def get_workspace(self, name: str) -> WorkspaceMetadata | None:
        """Get workspace metadata by name.

        Args:
            name: Workspace name.

        Returns:
            WorkspaceMetadata if found, None otherwise.
        """
        normalized_name = normalize_workspace_name(name)
        return self.workspaces.get(normalized_name)

    def list_workspaces(self) -> list[WorkspaceDisplayInfo]:
        """List all workspaces with member counts.

        Returns:
            List of WorkspaceDisplayInfo objects with member counts.
        """
        active_name = self.active or DEFAULT_WORKSPACE
        result = []

        for metadata in self.workspaces.values():
            counts = metadata.get_member_counts()
            result.append(
                WorkspaceDisplayInfo(
                    name=metadata.name,
                    created_at=metadata.created_at,
                    tdoc_count=counts.get("tdoc", 0),
                    spec_count=counts.get("spec", 0),
                    other_count=counts.get("other", 0),
                    is_active=metadata.name == active_name,
                )
            )

        return result

    def set_active(self, name: str) -> None:
        """Set the active workspace.

        Args:
            name: Workspace name to set as active.

        Raises:
            ValueError: If workspace doesn't exist.
        """
        normalized_name = normalize_workspace_name(name)
        if normalized_name not in self.workspaces:
            raise ValueError(f"Workspace '{normalized_name}' does not exist")

        self.active = normalized_name
        logger.info(f"Set active workspace to '{normalized_name}'")

    @classmethod
    def load(cls, registry_path: Path | None = None) -> WorkspaceRegistry:
        """Load registry from JSON file.

        Args:
            registry_path: Explicit path to workspaces.json. If None, uses PathConfig.

        Returns:
            Loaded WorkspaceRegistry instance.
        """
        if registry_path is None:
            registry_path = PathConfig().ai_workspace_file

        if not registry_path.exists():
            logger.debug(f"Registry not found at {registry_path}, creating new")
            return cls._create_default(registry_path)

        try:
            with registry_path.open("r", encoding="utf-8") as f:
                data = json.load(f)

            workspaces = {name: WorkspaceMetadata.from_dict(name, metadata) for name, metadata in data.get("workspaces", {}).items()}

            registry = cls(
                version=data.get("version", 1),
                workspaces=workspaces,
                active=data.get("active", DEFAULT_WORKSPACE),
                registry_path=registry_path,
            )
            logger.debug(f"Loaded workspace registry with {len(workspaces)} workspaces")
            return registry

        except (json.JSONDecodeError, KeyError) as e:
            # A corrupt registry is replaced with a fresh default rather than
            # aborting the whole application.
            logger.error(f"Failed to load registry from {registry_path}: {e}")
            return cls._create_default(registry_path)

    @classmethod
    def _create_default(cls, registry_path: Path | None = None) -> WorkspaceRegistry:
        """Create a new registry with default workspace.

        Args:
            registry_path: Optional explicit path to save to.

        Returns:
            WorkspaceRegistry with default workspace.
        """
        registry = cls(registry_path=registry_path)
        registry.create_workspace(DEFAULT_WORKSPACE, description="Default workspace")
        registry.save()
        return registry


__all__ = [
    "WorkspaceDisplayInfo",
    "WorkspaceMember",
    "WorkspaceMemberDict",
    "WorkspaceMetadata",
    "WorkspaceMetadataDict",
    "WorkspaceRegistry",
    "normalize_spec_member_id",
]
+54 −0
Original line number Diff line number Diff line
"""Extraction pipeline for 3GPP documents.

This package provides document conversion, PDF processing, and
extraction operations using OpenDataLoader and LibreOffice.
"""

from tdoc_crawler.extraction.checkout import checkout_single_item
from tdoc_crawler.extraction.conversion import (
    OFFICE_FORMATS,
    ConverterBackend,
    ConverterConfig,
    convert_to_pdf,
    is_office_format,
)
from tdoc_crawler.extraction.convert import (
    OpendataloaderConfig,
    convert_document_to_markdown,
    convert_tdoc_metadata,
    convert_tdoc_to_markdown,
    extract_document_structured_from_tdoc,
)
from tdoc_crawler.extraction.fetch_tdoc import TDocFiles, fetch_tdoc_files
from tdoc_crawler.extraction.metrics import (
    DocumentMetric,
    MetricsTracker,
    MetricType,
    TimedOperationResult,
    get_metrics_tracker,
    timed_operation,
)
from tdoc_crawler.extraction.workspace_utils import check_pdf_status

# Public re-export surface of the extraction package (kept sorted).
__all__ = [
    "OFFICE_FORMATS",
    "ConverterBackend",
    "ConverterConfig",
    "DocumentMetric",
    "MetricType",
    "MetricsTracker",
    "OpendataloaderConfig",
    "TDocFiles",
    "TimedOperationResult",
    "check_pdf_status",
    "checkout_single_item",
    "convert_document_to_markdown",
    "convert_tdoc_metadata",
    "convert_tdoc_to_markdown",
    "convert_to_pdf",
    "extract_document_structured_from_tdoc",
    "fetch_tdoc_files",
    "get_metrics_tracker",
    "is_office_format",
    "timed_operation",
]
+97 −0

File added.

Preview size limit exceeded, changes collapsed.

+240 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading