Commit 189a6593 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(context): add DocumentContext for managing document processing state

- Introduced DocumentContext dataclass to encapsulate document metadata.
- Updated pipeline operations to utilize DocumentContext for folder resolution.
- Refactored workspace member handling to improve clarity and maintainability.
- Enhanced error handling and logging for document processing phases.
parent 55257e32
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ from __future__ import annotations
import litellm

from tdoc_ai.config import AiConfig
from tdoc_ai.context import DocumentContext
from tdoc_ai.models import (
    DocumentChunk,
    DocumentClassification,
@@ -13,6 +14,7 @@ from tdoc_ai.models import (
    GraphNode,
    PipelineStage,
    ProcessingStatus,
    SourceKind,
)
from tdoc_ai.operations.convert import convert_tdoc as convert_document
from tdoc_ai.operations.embeddings import EmbeddingsManager
@@ -96,11 +98,13 @@ __all__ = [
    "CacheManager",
    "DocumentChunk",
    "DocumentClassification",
    "DocumentContext",
    "DocumentSummary",
    "GraphEdge",
    "GraphNode",
    "PipelineStage",
    "ProcessingStatus",
    "SourceKind",
    "SummarizeResult",
    "WorkspaceDisplayInfo",
    "WorkspaceRegistry",
+69 −0
Original line number Diff line number Diff line
"""Document processing context for the AI pipeline.

This module provides the DocumentContext dataclass that carries all information
needed to process a document through the pipeline stages.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path

from tdoc_ai.models import (
    DocumentClassification,
    ProcessingStatus,
    SourceKind,
    WorkspaceMember,
)
from tdoc_ai.operations.workspace_names import normalize_workspace_name

logger = logging.getLogger(__name__)


@dataclass
class DocumentContext:
    """Carries the state needed to move one document through the AI pipeline.

    Bundles the document's identity, its checkout folder, workspace, and any
    metadata already known (member record, processing status, classification),
    plus convenience accessors derived from :class:`SourceKind`.
    """

    document_id: str
    source_kind: SourceKind
    folder_path: Path
    workspace: str
    markdown_path: Path | None = None
    meeting_id: str | None = None
    member: WorkspaceMember | None = None
    status: ProcessingStatus | None = None
    classification: DocumentClassification | None = None

    @property
    def is_spec(self) -> bool:
        """Whether this document is a specification."""
        return self.source_kind == SourceKind.SPEC

    @property
    def is_tdoc(self) -> bool:
        """Whether this document is a TDoc."""
        return self.source_kind == SourceKind.TDOC

    @property
    def artifact_path(self) -> Path:
        """Location of the extracted markdown artifact.

        An explicitly supplied ``markdown_path`` wins; otherwise the path is
        derived as ``<folder>/.ai/<DOCUMENT_ID>.md`` (id trimmed, upper-cased).
        """
        default = self.folder_path / ".ai" / f"{self.document_id.strip().upper()}.md"
        return self.markdown_path or default

    def ensure_artifact_dir(self) -> Path:
        """Create the artifact's parent ``.ai`` directory and return the path."""
        target = self.artifact_path
        target.parent.mkdir(parents=True, exist_ok=True)
        return target

    def normalized_workspace(self) -> str:
        """Workspace name in canonical (normalized) form."""
        return normalize_workspace_name(self.workspace)


# Public API of this module: only the context dataclass is exported.
__all__ = ["DocumentContext"]
+1 −0
Original line number Diff line number Diff line
"""Pydantic models for the AI document processing pipeline."""
# TODO: Consider splitting into multiple files if this grows too large, e.g. models.py for core data models, exceptions.py for error classes, enums.py for enums, etc.

from __future__ import annotations

+70 −42
Original line number Diff line number Diff line
@@ -8,7 +8,14 @@ from pathlib import Path
from typing import Any

from tdoc_ai.config import AiConfig
from tdoc_ai.models import DocumentClassification, PipelineStage, ProcessingFailureType, ProcessingStatus
from tdoc_ai.context import DocumentContext
from tdoc_ai.models import (
    DocumentClassification,
    PipelineStage,
    ProcessingFailureType,
    ProcessingStatus,
    SourceKind,
)
from tdoc_ai.operations.classify import classify_document_files
from tdoc_ai.operations.embeddings import EmbeddingsManager
from tdoc_ai.operations.extract import extract_from_folder
@@ -335,7 +342,6 @@ def process_all(
        workspace=normalized_workspace,
        config=config,
        embeddings_manager=embeddings_manager,
        limit=limit,
    )
    results.update(extract_results)

@@ -347,7 +353,6 @@ def process_all(
        workspace=normalized_workspace,
        config=config,
        embeddings_manager=embeddings_manager,
        limit=limit,
    )
    results.update(embed_results)

@@ -358,7 +363,6 @@ def process_all(
        workspace=normalized_workspace,
        config=config,
        embeddings_manager=embeddings_manager,
        limit=limit,
    )
    results.update(graph_results)
    return results
@@ -411,14 +415,13 @@ def process_extract_phase(
    """Run the classify and extract phase for all documents.

    This phase identifies the primary document in each folder, then converts it
    to Markdown and records status updates in storage. Documents with completed
    extraction are skipped unless force_rerun is set. Failures are logged and do
    to Markdown and records status updates in storage. Failures are logged and do
    not interrupt processing of other documents.

    Args:
        document_ids: Document identifiers to process.
        checkout_base: Base checkout directory for document folders.
        new_only: Whether to return already-extracted documents in results.
        new_only: Whether to process only documents without existing extraction.
        force_rerun: Whether to re-run classification/extraction regardless of status.
        progress_callback: Optional callback invoked after each document.
        workspace: Optional workspace name (defaults to "default").
@@ -436,45 +439,46 @@ def process_extract_phase(
    results: dict[str, ProcessingStatus] = {}
    extracted_ids: list[str] = []

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]

    for document_id in document_ids:
        if new_only and not force_rerun:
            existing_status = storage.get_status(document_id, workspace=normalized_workspace)
        if not force_rerun and existing_status and existing_status.extracted_at is not None:
            logger.debug(f"Skipping {document_id} - already extracted")
            if not new_only:
                results[document_id] = existing_status
                extracted_ids.append(document_id)
            if existing_status and (
                existing_status.extracted_at is not None or existing_status.current_stage == PipelineStage.COMPLETED or existing_status.completed_at is not None
            ):
                logger.info(f"Skipping {document_id} - already processed")
                if progress_callback:
                    progress_callback(document_id)
                continue

        folder_path = _resolve_document_folder(
        ctx = _resolve_document_folder(
            document_id=document_id,
            checkout_base=checkout_base,
            workspace=normalized_workspace,
            members_map=members_map,
        )
        if not folder_path or not folder_path.exists():
        if ctx is None or not ctx.folder_path.exists():
            logger.warning(f"Checkout folder not found for {document_id}")
            if progress_callback:
                progress_callback(document_id)
            continue

        try:
            status = existing_status or ProcessingStatus(document_id=document_id)
            status = storage.get_status(document_id, workspace=normalized_workspace)
            if status is None:
                status = ProcessingStatus(document_id=document_id)

            if not force_rerun and status.classified_at:
                logger.info(f"Skipping classification for {document_id} - already classified")
            else:
                _run_classify_stage(document_id, folder_path, storage, status, workspace=normalized_workspace)
                _run_classify_stage(document_id, ctx.folder_path, storage, status, workspace=normalized_workspace)

            if not force_rerun and status.extracted_at:
                logger.info(f"Skipping extraction for {document_id} - already extracted")
            else:
                _run_extract_stage(document_id, folder_path, storage, status, workspace=normalized_workspace)
                _run_extract_stage(document_id, ctx.folder_path, storage, status, workspace=normalized_workspace)

            results[document_id] = status
            if status.extracted_at is not None:
@@ -523,24 +527,23 @@ def process_embed_phase(
    results: dict[str, ProcessingStatus] = {}
    embedded_ids: list[str] = []

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]

    for document_id in document_ids:
        folder_path = _resolve_document_folder(
        ctx = _resolve_document_folder(
            document_id=document_id,
            checkout_base=checkout_base,
            workspace=normalized_workspace,
            members_map=members_map,
        )
        if not folder_path or not folder_path.exists():
        if ctx is None or not ctx.folder_path.exists():
            logger.warning(f"Checkout folder not found for {document_id} in Phase 2")
            if progress_callback:
                progress_callback(document_id)
            continue

        if not any(f.is_file() for f in folder_path.iterdir()):
        if not any(f.is_file() for f in ctx.folder_path.iterdir()):
            logger.warning(f"Checkout folder is empty for {document_id} in Phase 2")
            if progress_callback:
                progress_callback(document_id)
@@ -561,7 +564,7 @@ def process_embed_phase(

            _run_embedding_stage(
                document_id,
                folder_path,
                ctx.folder_path,
                storage,
                status,
                workspace=normalized_workspace,
@@ -613,7 +616,6 @@ def process_graph_phase(
    results: dict[str, ProcessingStatus] = {}
    graphed_ids: list[str] = []

    # Apply limit for testing
    if limit is not None and limit > 0:
        document_ids = document_ids[:limit]

@@ -632,20 +634,20 @@ def process_graph_phase(
                progress_callback(document_id)
            continue

        folder_path = _resolve_document_folder(
        ctx = _resolve_document_folder(
            document_id=document_id,
            checkout_base=checkout_base,
            workspace=normalized_workspace,
            members_map=members_map,
        )
        if not folder_path or not folder_path.exists():
        if ctx is None or not ctx.folder_path.exists():
            logger.warning(f"Checkout folder not found for {document_id} in graph phase")
            if progress_callback:
                progress_callback(document_id)
            continue

        try:
            _build_graph_for_document(document_id, folder_path, storage, status, normalized_workspace)
            _build_graph_for_document(document_id, ctx.folder_path, storage, status, normalized_workspace)
            results[document_id] = status
            graphed_ids.append(document_id)
        except Exception as exc:
@@ -681,7 +683,7 @@ def _resolve_workspace_document_ids(
        resolved_ids = document_ids
    else:
        members = list_workspace_members(workspace)
        member_ids = {m.source_item_id for m in members if m.is_active and m.source_kind in ("tdoc", "spec")}
        member_ids = {m.source_item_id for m in members if m.is_active and m.source_kind in (SourceKind.TDOC, SourceKind.SPEC)}
        resolved_ids = list(member_ids) if not document_ids else [tid for tid in document_ids if tid in member_ids]

    if limit is not None and limit > 0:
@@ -703,7 +705,7 @@ def _resolve_members_map(workspace: str) -> dict[str, Any]:
    if workspace == "default":
        return {}
    members = list_workspace_members(workspace)
    return {m.source_item_id: m for m in members if m.is_active and m.source_kind in ("tdoc", "spec")}
    return {m.source_item_id: m for m in members if m.is_active and m.source_kind in (SourceKind.TDOC, SourceKind.SPEC)}


def _resolve_storage(config: AiConfig | None, embeddings_manager: EmbeddingsManager | None = None) -> AiStorage:
@@ -727,8 +729,8 @@ def _resolve_document_folder(
    checkout_base: Path,
    workspace: str,
    members_map: dict[str, Any],
) -> Path | None:
    """Resolve a document folder path for the given workspace.
) -> DocumentContext | None:
    """Resolve a document context for the given workspace.

    Handles default workspace lookups and workspace members for TDocs and
    specifications. Performs checkouts when necessary.
@@ -740,24 +742,50 @@ def _resolve_document_folder(
        members_map: Mapping of workspace members keyed by source_item_id.

    Returns:
        Document folder path if found, otherwise None.
        DocumentContext with folder path and metadata if found, otherwise None.
    """
    cache_manager = resolve_cache_manager()
    if workspace == "default":
        return _resolve_default_document_folder(document_id, checkout_base, cache_manager.checkout_dir)
        folder_path = _resolve_default_document_folder(document_id, checkout_base, cache_manager.checkout_dir)
        if folder_path is None:
            return None
        meeting_id = _extract_meeting_id(folder_path)
        return DocumentContext(
            document_id=document_id,
            source_kind=SourceKind.TDOC,
            folder_path=folder_path,
            workspace=workspace,
            meeting_id=meeting_id,
        )

    member = members_map.get(document_id)
    if not member:
        return None

    if member.source_kind == "tdoc":
    folder_path: Path | None = None
    source_kind = member.source_kind

    if source_kind == SourceKind.TDOC:
        folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
        if not folder_path:
            folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, workspace, db_file=cache_manager.db_file)
        return folder_path

    elif source_kind == SourceKind.SPEC:
        base_spec, release = _parse_spec_release(member.source_item_id)
    return checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, workspace, release=release)
        folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, workspace, release=release)

    if folder_path is None:
        return None

    meeting_id = _extract_meeting_id(folder_path) if source_kind == SourceKind.TDOC else None

    return DocumentContext(
        document_id=document_id,
        source_kind=source_kind,
        folder_path=folder_path,
        workspace=workspace,
        meeting_id=meeting_id,
        member=member,
    )


def _resolve_default_document_folder(document_id: str, checkout_base: Path, checkout_dir: Path) -> Path | None:
+18 −6
Original line number Diff line number Diff line
@@ -111,13 +111,23 @@ def _get_cache_manager() -> CacheManager:


def resolve_workspace(workspace: str | None) -> str:
    """Resolve workspace name from option or active workspace.

    Returns the normalized workspace name, falling back to the active
    workspace when none is given. Exits with an error if neither is
    available.
    """
    # An explicitly supplied workspace wins; normalize it before use.
    if workspace:
        return normalize_workspace_name(workspace)

    # Otherwise defer to the currently active workspace, if one is set.
    if active := get_active_workspace():
        return active

    # Neither source yielded a workspace: report the problem and abort.
    console.print("[red]No workspace specified and no active workspace set.[/red]")
    console.print("[red]Use -w <name> or 'ai workspace activate <name>'[/red]")
    raise typer.Exit(1)


@@ -180,16 +190,18 @@ def ai_query(
        console.print("[red]Error: query is required (positional or --query).[/red]")
        raise typer.Exit(1)

    resolved_workspace = resolve_workspace(workspace)
    # Resolve workspace (from option, active workspace, or error)
    normalized_workspace = resolve_workspace(workspace)

    _get_cache_manager()  # Ensure cache manager is registered before config loads
    embeddings_manager = EmbeddingsManager(AiConfig.from_env())

    embedding_results = embeddings_manager.query_embeddings(query_text, resolved_workspace, top_k)
    embedding_results = embeddings_manager.query_embeddings(query_text, normalized_workspace, top_k)

    # Pass embedding chunks to query_graph for LLM synthesis
    graph_result = query_graph(
        query_text,
        workspace=resolved_workspace,
        workspace=normalized_workspace,
        embedding_chunks=embedding_results,
        max_words=max_words,
        query_level="advanced",  # Always use LLM synthesis
@@ -205,7 +217,7 @@ def ai_query(
        typer.echo(json.dumps(payload))
    else:
        if embedding_results:
            table = Table(title=f"Embedding results for '{query_text}' (workspace: {resolved_workspace})")
            table = Table(title=f"Embedding results for '{query_text}' (workspace: {normalized_workspace})")
            table.add_column("TDoc", style="cyan")
            table.add_column("Section", style="green")
            table.add_column("Score", style="magenta")