Commit 8421d18f authored by Jan Reimes's avatar Jan Reimes
Browse files

cleanup: remove dead context and reorder internals

parent fee3bbf2
Loading
Loading
Loading
Loading
+0 −69
Original line number Diff line number Diff line
"""Document processing context for the AI pipeline.

This module provides the DocumentContext dataclass that carries all information
needed to process a document through the pipeline stages.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path

from threegpp_ai.models import (
    DocumentClassification,
    ProcessingStatus,
    SourceKind,
    WorkspaceMember,
)
from threegpp_ai.operations.workspace_names import normalize_workspace_name

logger = logging.getLogger(__name__)


@dataclass
class DocumentContext:
    """Per-document state carried through the AI pipeline stages.

    Bundles the document's identity, source kind, on-disk locations, and
    any processing results accumulated so far.
    """

    document_id: str
    source_kind: SourceKind
    folder_path: Path
    workspace: str
    markdown_path: Path | None = None
    meeting_id: str | None = None
    member: WorkspaceMember | None = None
    status: ProcessingStatus | None = None
    classification: DocumentClassification | None = None

    @property
    def is_spec(self) -> bool:
        """Whether this document is a specification."""
        return self.source_kind == SourceKind.SPEC

    @property
    def is_tdoc(self) -> bool:
        """Whether this document is a TDoc."""
        return self.source_kind == SourceKind.TDOC

    @property
    def artifact_path(self) -> Path:
        """Location of the extracted markdown artifact.

        An explicitly set ``markdown_path`` wins; otherwise the path is
        derived from the folder and the normalized (stripped, uppercased)
        document id.
        """
        explicit = self.markdown_path
        if explicit is not None:
            return explicit
        stem = self.document_id.strip().upper()
        return self.folder_path / ".ai" / f"{stem}.md"

    def ensure_artifact_dir(self) -> Path:
        """Create the artifact's parent directory if needed; return its path."""
        target = self.artifact_path
        target.parent.mkdir(parents=True, exist_ok=True)
        return target

    def normalized_workspace(self) -> str:
        """Return the workspace name in normalized form."""
        return normalize_workspace_name(self.workspace)


__all__ = ["DocumentContext"]
+59 −59
Original line number Diff line number Diff line
@@ -73,31 +73,6 @@ class WorkspaceIndex:
        self._file_path = self.working_dir / self.workspace_name / "workspace_index.json"
        self._load()

    def _load(self) -> None:
        """Populate the in-memory index from its JSON file, if present.

        Best effort: a corrupt or unreadable file is logged and replaced
        with an empty index structure rather than raising.
        """
        if not self._file_path.exists():
            # Nothing persisted yet; keep whatever the constructor seeded.
            logger.debug("Created new workspace index for '%s'", self.workspace_name)
            return
        try:
            with open(self._file_path, encoding="utf-8") as fh:
                self._data = json.load(fh)
            # Log inside the try: a non-dict payload raises here and is
            # caught below, resetting the index (matches original behavior).
            logger.debug(
                "Loaded workspace index for '%s' (%d docs, %d chunks)",
                self.workspace_name,
                len(self._data.get("documents", {})),
                len(self._data.get("chunks", {})),
            )
        except Exception as exc:
            logger.warning("Failed to load workspace index: %s", exc)
            self._data = {"documents": {}, "chunks": {}, "metadata": {}}

    def _save(self) -> None:
        """Persist the in-memory index to disk as pretty-printed UTF-8 JSON."""
        target = self._file_path
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(self._data, fh, indent=2, ensure_ascii=False)
        logger.debug("Saved workspace index for '%s'", self.workspace_name)

    def add_document(self, doc_id: str, chunk_ids: list[str], metadata: dict[str, Any] | None = None) -> None:
        """Add a document mapping to the index.

@@ -205,6 +180,31 @@ class WorkspaceIndex:
        """
        return self._data.get("metadata", {}).get(chunk_id, {})

    def _load(self) -> None:
        """Populate the in-memory index from its JSON file, if present.

        Best effort: a corrupt or unreadable file is logged and replaced
        with an empty index structure rather than raising.
        """
        if not self._file_path.exists():
            # Nothing persisted yet; keep whatever the constructor seeded.
            logger.debug("Created new workspace index for '%s'", self.workspace_name)
            return
        try:
            with open(self._file_path, encoding="utf-8") as fh:
                self._data = json.load(fh)
            # Log inside the try: a non-dict payload raises here and is
            # caught below, resetting the index (matches original behavior).
            logger.debug(
                "Loaded workspace index for '%s' (%d docs, %d chunks)",
                self.workspace_name,
                len(self._data.get("documents", {})),
                len(self._data.get("chunks", {})),
            )
        except Exception as exc:
            logger.warning("Failed to load workspace index: %s", exc)
            self._data = {"documents": {}, "chunks": {}, "metadata": {}}

    def _save(self) -> None:
        """Persist the in-memory index to disk as pretty-printed UTF-8 JSON."""
        target = self._file_path
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(self._data, fh, indent=2, ensure_ascii=False)
        logger.debug("Saved workspace index for '%s'", self.workspace_name)


@dataclass
class SharedNanoVectorDBStorage(BaseVectorStorage):
@@ -262,40 +262,6 @@ class SharedNanoVectorDBStorage(BaseVectorStorage):
        # Load existing data
        self._load()

    def _load(self) -> None:
        """Read the shared store from disk, tolerating a corrupt file.

        A load failure is logged and the store falls back to an empty
        structure instead of raising.
        """
        if not self._db_file.exists():
            logger.info("Created new shared %s storage", self.namespace)
            return
        try:
            with open(self._db_file, encoding="utf-8") as fh:
                self._data = json.load(fh)
            # Log inside the try so a non-dict payload is caught below and
            # resets the store (matches original behavior).
            logger.info(
                "Loaded shared %s storage (%d chunks)",
                self.namespace,
                len(self._data.get("data", {})),
            )
        except Exception as exc:
            logger.warning("Failed to load shared storage: %s", exc)
            self._data = {"data": {}, "index": {}}

    def _save(self) -> None:
        """Write the shared store to disk as pretty-printed UTF-8 JSON."""
        target = self._db_file
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(self._data, fh, indent=2, ensure_ascii=False)

    def _compute_text_hash(self, text: str) -> str:
        """Return the deduplication hash id for *text*.

        Whitespace is stripped first so equal content hashes equally.

        Args:
            text: Text content to hash

        Returns:
            MD5 hash string
        """
        normalized = text.strip()
        return compute_mdhash_id(normalized, prefix="chunk-")

    async def upsert(self, data: dict[str, dict[str, Any]], **kwargs: Any) -> None:
        """Insert or update embeddings with deduplication.

@@ -496,6 +462,40 @@ class SharedNanoVectorDBStorage(BaseVectorStorage):
        """Get total number of unique chunks in shared storage."""
        return len(self._data.get("data", {}))

    def _load(self) -> None:
        """Read the shared store from disk, tolerating a corrupt file.

        A load failure is logged and the store falls back to an empty
        structure instead of raising.
        """
        if not self._db_file.exists():
            logger.info("Created new shared %s storage", self.namespace)
            return
        try:
            with open(self._db_file, encoding="utf-8") as fh:
                self._data = json.load(fh)
            # Log inside the try so a non-dict payload is caught below and
            # resets the store (matches original behavior).
            logger.info(
                "Loaded shared %s storage (%d chunks)",
                self.namespace,
                len(self._data.get("data", {})),
            )
        except Exception as exc:
            logger.warning("Failed to load shared storage: %s", exc)
            self._data = {"data": {}, "index": {}}

    def _save(self) -> None:
        """Write the shared store to disk as pretty-printed UTF-8 JSON."""
        target = self._db_file
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(self._data, fh, indent=2, ensure_ascii=False)

    def _compute_text_hash(self, text: str) -> str:
        """Return the deduplication hash id for *text*.

        Whitespace is stripped first so equal content hashes equally.

        Args:
            text: Text content to hash

        Returns:
            MD5 hash string
        """
        normalized = text.strip()
        return compute_mdhash_id(normalized, prefix="chunk-")


async def initialize_shared_storage(working_dir: Path, embedding_model: str) -> None:
    """Initialize shared storage directory structure.
+22 −22
Original line number Diff line number Diff line
@@ -110,6 +110,28 @@ class Converter:
            output_format=output_format,
        )

    def convert_batch(
        self,
        input_files: Iterable[Path],
        output_format: LibreOfficeFormat,
        output_dir: Path,
    ) -> list[ConversionResult]:
        """Convert multiple documents sequentially, one `convert` call each.

        Args:
            input_files: Iterable of input file paths.
            output_format: Desired output format.
            output_dir: Directory for converted files.

        Returns:
            List of ConversionResult items, in input order.
        """
        return [
            self.convert(input_file, output_format, output_dir)
            for input_file in input_files
        ]

    def _run_conversion(self, input_file: Path, output_format: str, output_dir: Path) -> None:
        """Execute the LibreOffice conversion command.

@@ -164,25 +186,3 @@ class Converter:

        if result.returncode != 0:
            raise subprocess.CalledProcessError(result.returncode, cmd, result.stdout, result.stderr)

    def convert_batch(
        self,
        input_files: Iterable[Path],
        output_format: LibreOfficeFormat,
        output_dir: Path,
    ) -> list[ConversionResult]:
        """Convert multiple documents sequentially, one `convert` call each.

        Args:
            input_files: Iterable of input file paths.
            output_format: Desired output format.
            output_dir: Directory for converted files.

        Returns:
            List of ConversionResult items, in input order.
        """
        return [
            self.convert(input_file, output_format, output_dir)
            for input_file in input_files
        ]