Commit 9d8d53e0 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(pipeline): enhance document processing with improved folder resolution

- Implement metadata-based resolution for folder paths in the process_all function.
- Add checks to ensure folders contain actual document files before processing.
- Introduce re-checkout logic for empty folders to improve reliability.
- Update ai_query to pass embedding chunks for LLM synthesis in graph queries.
parent 5da98c4f
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -19,11 +19,11 @@ dependencies = [
    "kreuzberg[all]>=4.0.0",
    "lancedb>=0.29.2",
    "litellm>=1.81.15",
    "sentence-transformers[openvino] @ git+https://github.com/huggingface/sentence-transformers.git",
    "sentence-transformers[onnx-gpu] @ git+https://github.com/huggingface/sentence-transformers.git",
    "tokenizers>=0.22.2",
    "optimum[openvino]",
    "hf_xet"
    #"nvidia-cudnn-cu12", "nvidia-cuda-runtime-cu12", "nvidia-cufft-cu12"
    "optimum-onnx[onnxruntime-gpu]",
    "hf_xet",
    "nvidia-cudnn-cu12", "nvidia-cuda-runtime-cu12", "nvidia-cufft-cu12"
]

[project.urls]
+30 −6
Original line number Diff line number Diff line
@@ -75,12 +75,27 @@ class EmbeddingsManager:
    def model(self) -> SentenceTransformer:
        """Lazily load and return the sentence-transformers model (cached after first access)."""
        if self._model is not None:
            return self._model

        try:
            self._model = SentenceTransformer(
                self._config.embedding_model,
                trust_remote_code=True,
                # None lets sentence-transformers pick GPU when available, else CPU
                # (note: the OpenVINO backend requires device="cpu")
                device=None,
            )
            logger.info(f"Loaded embedding model: {self._config.embedding_model}")
        except ImportError as e:
            msg = str(e)
            # Heuristic: these substrings indicate a sentence-transformers 2.x-only model
            looks_like_version_mismatch = "Module" in msg or "sentence_transformers.models" in msg
            if looks_like_version_mismatch:
                raise RuntimeError(
                    f"Model '{self._config.embedding_model}' requires an older version of sentence-transformers.\n"
                    f"Current version: 3.x (installed: {self._get_sentence_transformers_version()})\n"
                    f"This model requires sentence-transformers 2.x.\n"
                    f"Options:\n"
                    f"  1. Use a different embedding model compatible with sentence-transformers 3.x\n"
                    f"  2. Use the default model: 'sentence-transformers/all-MiniLM-L6-v2'\n"
                    f"  3. Use Perplexity API directly instead of local model"
                ) from e
            raise RuntimeError(f"Failed to load embedding model '{self._config.embedding_model}': {e}") from e

        return self._model

@@ -186,6 +201,15 @@ class EmbeddingsManager:
        # Search in storage
        return storage.search_chunks(query_vector, top_k, workspace=normalized_workspace)

    def _get_sentence_transformers_version(self) -> str:
        """Get the installed sentence-transformers version."""
        try:
            import sentence_transformers

            return sentence_transformers.__version__
        except ImportError:
            return "unknown"

    @classmethod
    def _chunk_by_paragraphs(cls, text: str, max_chars: int = DEFAULT_MAX_CHARS) -> list[str]:
        """Split text into chunks by paragraphs.
+96 −2
Original line number Diff line number Diff line
@@ -606,6 +606,8 @@ def query_graph(
    storage: AiStorage | None = None,
    workspace: str | None = None,
    query_level: GraphQueryLevel | str | None = None,
    embedding_chunks: list[tuple[Any, float]] | None = None,
    max_words: int = 300,
) -> dict:
    """Query knowledge graph for relevant nodes and edges.

@@ -619,6 +621,8 @@ def query_graph(
        workspace: Workspace name for filtering.
        query_level: Query sophistication level (simple|medium|advanced).
            If None, uses config default.
        embedding_chunks: Optional retrieved document chunks for LLM synthesis.
        max_words: Target word count for LLM-generated answer.

    Returns:
        Dict with 'answer', 'nodes', and 'edges' keys for CLI compatibility.
@@ -662,7 +666,12 @@ def query_graph(
        node_edges = [e for e in edges if node.node_id in (e.source_id, e.target_id)]
        results.append(GraphQueryResult(node=node, connected_edges=node_edges))

    # Generate answer based on query level
    # Generate answer based on query level and context availability
    if embedding_chunks and query_level in ("advanced", "medium"):
        # Use LLM synthesis with RAG + GraphRAG context
        answer = _synthesize_with_llm(query, embedding_chunks, matching_nodes, edges, max_words=300)
    else:
        # Use simple/medium keyword-based answer
        answer = _generate_answer(query, matching_nodes, edges, query_level)

    # Log level - handle both string and Literal types
@@ -677,6 +686,91 @@ def query_graph(
    }


# Prompt template for combined RAG + GraphRAG answer synthesis.
# Filled via str.format with: query, embedding_context, graph_context, max_words
# (see _synthesize_with_llm). The {max_words} placeholder appears twice by design.
RAG_SYNTHESIS_PROMPT = """You are a technical analyst specializing in 3GPP TDoc documents.
Answer the user's question based ONLY on the provided context from embeddings (RAG) and knowledge graph.

Context:
- EMBEDDINGS (RAG): Relevant document chunks retrieved by semantic similarity
- GRAPH: Related entities and relationships from the knowledge graph

Instructions:
1. Answer the question based ONLY on the provided context
2. If the context doesn't contain enough information, say so
3. Cite specific document IDs and sections when possible
4. Be concise but comprehensive (target: ~{max_words} words)

Question: {query}

Embedding Context:
{embedding_context}

Graph Context:
{graph_context}

Answer (aim for ~{max_words} words):"""


def _synthesize_with_llm(
    query: str,
    embedding_chunks: list[tuple[Any, float]],
    nodes: list[GraphNode],
    edges: list[GraphEdge],
    max_words: int = 300,
) -> str:
    """Synthesize an answer using the LLM with combined RAG + GraphRAG context.

    Builds a prompt from the top retrieved chunks plus the most relevant graph
    nodes/edges, then asks the LLM for a grounded answer.

    Args:
        query: User's question.
        embedding_chunks: Retrieved document chunks paired with similarity scores.
        nodes: Relevant graph nodes.
        edges: Related graph edges.
        max_words: Target word count for the answer (also sizes the token budget).

    Returns:
        LLM-generated answer, or an error message string if the LLM call fails
        (this function never raises on LLM errors).
    """
    from tdoc_ai.operations.summarize import _get_llm_client

    # Embedding (RAG) context: top 10 chunks, each truncated to 500 chars.
    embedding_context_parts = [
        f"- {chunk.document_id} (section: {chunk.section_heading}, score: {score:.3f}):\n  {chunk.content[:500]}"
        for chunk, score in embedding_chunks[:10]
    ]
    embedding_context = "\n".join(embedding_context_parts) if embedding_context_parts else "No embedding context available."

    # Graph context: entities first, then relationships (top 20 of each).
    graph_context_parts = []
    if nodes:
        graph_context_parts.append(f"Relevant entities ({len(nodes)} nodes):")
        for node in nodes[:20]:
            graph_context_parts.append(f"- {node.node_type.value}: {node.node_id} - {node.label}")
    if edges:
        graph_context_parts.append(f"\nRelationships ({len(edges)} edges):")
        for edge in edges[:20]:
            graph_context_parts.append(f"- {edge.source_id} --[{edge.edge_type.value}]--> {edge.target_id}")
    graph_context = "\n".join(graph_context_parts) if graph_context_parts else "No graph context available."

    prompt = RAG_SYNTHESIS_PROMPT.format(
        query=query,
        embedding_context=embedding_context,
        graph_context=graph_context,
        max_words=max_words,
    )

    # Token budget: roughly 1.3 tokens per word. Previously this estimate was
    # computed but discarded and 4096 was always passed; keep 4096 as a floor so
    # the default max_words=300 behaves exactly as before, while larger targets
    # now get a proportionally larger budget.
    max_tokens = max(int(max_words * 1.3), 4096)
    try:
        llm_client = _get_llm_client()
        return llm_client.complete(prompt, max_tokens=max_tokens)
    except Exception as e:
        logger.error(f"LLM synthesis failed: {e}")
        return f"Error generating answer: {e}"


def _generate_answer(
    query: str,
    nodes: list[GraphNode],
+108 −48
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ from tdoc_ai.operations.workspaces import (
    checkout_spec_to_workspace,
    checkout_tdoc_to_workspace,
    list_workspace_members,
    resolve_tdoc_checkout_path,
)
from tdoc_ai.storage import AiStorage
from tdoc_crawler.config import resolve_cache_manager
@@ -389,22 +390,25 @@ def process_all(
                progress_callback(PipelineStage.FAILED, document_id)
            continue

        folder_path = Path(member.source_path) if member.source_path and Path(member.source_path).exists() else checkout_base / document_id

        # Check if folder has actual document files (not just .ai subfolder)
        has_files = folder_path.exists() and any(f.is_file() for f in folder_path.iterdir())

        # Try to re-checkout if folder doesn't exist or has no files
        if not has_files:
            logger.warning(f"Folder empty for {document_id} at {folder_path}, re-checkouting...")
        # Determine folder path using metadata-based resolution
        folder_path = None
        cache_manager = resolve_cache_manager()

        if member.source_kind == "tdoc":
            # First try to resolve from existing checkout structure
            folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
            # If not found, perform checkout
            if not folder_path:
                folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
        elif member.source_kind == "spec":
            # Extract base spec number and release from source_item_id (e.g., "21905-REL19" -> spec="21905", release="19")
            base_spec = member.source_item_id
            release = "latest"
            if member.source_item_id and "-REL" in member.source_item_id:
                    release = member.source_item_id.split("-REL")[-1]
                folder_path = checkout_spec_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, release=release)
                parts = member.source_item_id.split("-REL")
                base_spec = parts[0]
                release = parts[1]
            folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

        if not folder_path or not folder_path.exists():
            logger.warning(f"Checkout folder not found for {document_id}")
@@ -412,6 +416,31 @@ def process_all(
                progress_callback(PipelineStage.FAILED, document_id)
            continue

        # Check if folder has actual files (not just empty folder)
        has_files = any(f.is_file() for f in folder_path.iterdir())
        if not has_files:
            logger.warning(f"Checkout folder is empty for {document_id}, attempting re-checkout...")
            # Try to re-checkout with fresh download
            if member.source_kind == "tdoc":
                folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
            elif member.source_kind == "spec":
                folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

            # Verify re-checkout worked
            if not folder_path or not folder_path.exists():
                logger.error(f"Re-checkout failed for {document_id}")
                if progress_callback:
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

            # Check again after re-checkout
            has_files = any(f.is_file() for f in folder_path.iterdir())
            if not has_files:
                logger.error(f"Re-checkout produced empty folder for {document_id}")
                if progress_callback:
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

        try:
            # Get or create status for this document
            status = storage.get_status(document_id, workspace=normalized_workspace)
@@ -453,22 +482,23 @@ def process_all(
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

            folder_path = Path(member.source_path) if member.source_path and Path(member.source_path).exists() else checkout_base / document_id

            # Check if folder has actual document files (not just .ai subfolder)
            has_files = folder_path.exists() and any(f.is_file() for f in folder_path.iterdir())

            # Try to re-checkout if folder doesn't exist or has no files
            if not has_files:
                logger.info(f"Folder empty for {document_id} in Phase 2, re-checkouting...")
            # Determine folder path using metadata-based resolution
            folder_path = None
            cache_manager = resolve_cache_manager()

            if member.source_kind == "tdoc":
                folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
                if not folder_path:
                    folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
            elif member.source_kind == "spec":
                # Extract base spec number and release from source_item_id (e.g., "21905-REL19" -> spec="21905", release="19")
                base_spec = member.source_item_id
                release = "latest"
                if member.source_item_id and "-REL" in member.source_item_id:
                        release = member.source_item_id.split("-REL")[-1]
                    folder_path = checkout_spec_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, release=release)
                    parts = member.source_item_id.split("-REL")
                    base_spec = parts[0]
                    release = parts[1]
                folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

            if not folder_path or not folder_path.exists():
                logger.warning(f"Checkout folder not found for {document_id} in Phase 2")
@@ -476,6 +506,29 @@ def process_all(
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

            # Check if folder has actual files (not just empty folder)
            has_files = any(f.is_file() for f in folder_path.iterdir())
            if not has_files:
                logger.warning(f"Checkout folder is empty for {document_id} in Phase 2, attempting re-checkout...")
                if member.source_kind == "tdoc":
                    folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
                elif member.source_kind == "spec":
                    folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

                # Verify re-checkout worked
                if not folder_path or not folder_path.exists():
                    logger.error(f"Re-checkout failed for {document_id} in Phase 2")
                    if progress_callback:
                        progress_callback(PipelineStage.FAILED, document_id)
                    continue

                has_files = any(f.is_file() for f in folder_path.iterdir())
                if not has_files:
                    logger.error(f"Re-checkout produced empty folder for {document_id} in Phase 2")
                    if progress_callback:
                        progress_callback(PipelineStage.FAILED, document_id)
                    continue

            # Check if already embedded (skip if already done)
            # Note: If embeddings were manually deleted, use --force to reprocess
            status = storage.get_status(document_id, workspace=normalized_workspace)
@@ -533,6 +586,7 @@ def process_all(
    return results


# TODO: ambiguous name, ambiguous argument, ambiguous return type - consider splitting into get_status(document_id) and list_statuses(workspace)
def get_status(document_id: str | None = None, workspace: str | None = None) -> ProcessingStatus | list[ProcessingStatus] | None:
    """Get processing status for a TDoc or all TDocs in workspace.

@@ -587,7 +641,8 @@ def build_all_graphs(
    members_map: dict[str, Any] = {}
    if normalized_workspace != "default":
        members = list_workspace_members(normalized_workspace)
        members_map = {m.source_item_id: m for m in members if m.is_active and m.source_kind == "tdoc"}
        # Include both tdocs and specs
        members_map = {m.source_item_id: m for m in members if m.is_active and m.source_kind in ("tdoc", "spec")}

    results: dict[str, ProcessingStatus] = {}

@@ -606,28 +661,33 @@ def build_all_graphs(
                progress_callback(PipelineStage.GRAPHING, document_id)
            continue

        # Determine folder path - use workspace member path if available
        # Determine folder path using metadata-based resolution
        member = members_map.get(document_id)
        cache_manager = resolve_cache_manager()

        if not member:
            # For specs, try to construct path from document_id
            if "-REL" in document_id:
                # Spec format: 26260-REL19
                spec_number = document_id.split("-REL")[0]
                release = document_id.split("-REL")[1]
                folder_path = checkout_base / "specs" / f"{spec_number}-REL{release}" / spec_number
            else:
                logger.warning(f"No workspace member found for {document_id} in graph phase")
                continue

        folder_path = Path(member.source_path) if member.source_path and Path(member.source_path).exists() else checkout_base / document_id

        # Check if folder has actual document files (not just .ai subfolder)
        has_files = folder_path.exists() and any(f.is_file() for f in folder_path.iterdir())

        # Try to re-checkout if folder doesn't exist or has no files
        if not has_files:
            logger.info(f"Folder empty for {document_id} in graph phase, re-checkouting...")
            cache_manager = resolve_cache_manager()
            if member.source_kind == "tdoc":
        elif member.source_kind == "tdoc":
            folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
            if not folder_path:
                folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
            elif member.source_kind == "spec":
        else:  # member.source_kind == "spec"
            # Extract base spec number and release from source_item_id (e.g., "21905-REL19" -> spec="21905", release="19")
            base_spec = member.source_item_id
            release = "latest"
            if member.source_item_id and "-REL" in member.source_item_id:
                    release = member.source_item_id.split("-REL")[-1]
                folder_path = checkout_spec_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, release=release)
                parts = member.source_item_id.split("-REL")
                base_spec = parts[0]
                release = parts[1]
            folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

        if not folder_path or not folder_path.exists():
            logger.warning(f"Checkout folder not found for {document_id} in graph phase")
+55 −89

File changed.

Preview size limit exceeded, changes collapsed.