Commit 7f88545b authored by Jan Reimes's avatar Jan Reimes
Browse files

refactor(ai): fix TODOs - split functions, add LLM input limits

- Split ambiguous get_status() into get_status(document_id) and list_statuses(workspace) for clarity (operations/pipeline.py, __init__.py)
- Add input size limits to LLM prompts to prevent token overflow: ABSTRACT_INPUT_LIMIT=5000, SUMMARY_INPUT_LIMIT=8000, KEYWORDS_INPUT_LIMIT=5000 (operations/summarize.py)
- Refactor checkout_tdoc_to_workspace() into _resolve_tdoc_metadata() and _checkout_tdoc_if_needed() for better testability (operations/workspaces.py)
- Add valid_from/valid_to temporal validity fields to GraphNode and GraphEdge models (models.py)
parent 43ec85de
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ from tdoc_ai.models import (
from tdoc_ai.operations.convert import convert_tdoc as convert_document
from tdoc_ai.operations.embeddings import EmbeddingsManager
from tdoc_ai.operations.graph import query_graph
from tdoc_ai.operations.pipeline import get_status, process_all
from tdoc_ai.operations.pipeline import get_status, list_statuses, process_all
from tdoc_ai.operations.pipeline import process_tdoc as process_document
from tdoc_ai.operations.summarize import SummarizeResult
from tdoc_ai.operations.summarize import summarize_tdoc as summarize_document
@@ -120,6 +120,7 @@ __all__ = [
    "get_workspace",
    "get_workspace_member_counts",
    "is_default_workspace",
    "list_statuses",
    "list_workspace_members",
    "list_workspaces",
    "make_workspace_member",
+7 −1
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@
from __future__ import annotations

from datetime import datetime
from enum import StrEnum
from enum import StrEnum, auto
from typing import Any

from pydantic import BaseModel, Field, field_validator, model_validator
@@ -12,6 +12,7 @@ from tdoc_ai.config import AiConfig
from tdoc_crawler.utils.misc import utc_now


# TODO: the same function is redefined in operations/extract.py, src/tdoc_crawler/tdocs/models.py and src/tdoc_crawler/tdocs/utils.py - it should exist in only a single location!
def _normalize_document_id(value: str) -> str:
    return value.strip().upper()

@@ -71,6 +72,7 @@ class ProcessingFailureType(StrEnum):
    GRAPH_FAILED = "graph_failed"


# TODO: Unnecessary? We can infer from presence of WorkspaceMember records and their is_active flag - or simply use a bool variable?
class WorkspaceStatus(StrEnum):
    """Lifecycle state of a workspace."""

@@ -80,6 +82,10 @@ class WorkspaceStatus(StrEnum):
class SourceKind(StrEnum):
    """Kinds of source items that can be part of a workspace corpus."""

    TDOC = auto()
    SPEC = auto()
    OTHER = auto()  # catch-all for any other source types


class AiError(Exception):
    """Base exception for AI processing errors."""
+89 −92
Original line number Diff line number Diff line
@@ -376,13 +376,17 @@ def process_all(
    for document_id in document_ids:
        if new_only and not force_rerun:
            existing_status = storage.get_status(document_id, workspace=normalized_workspace)
            if existing_status and existing_status.extracted_at is not None:
                logger.info(f"Skipping {document_id} - already extracted")
                extracted_ids.append(document_id)
            # Skip COMPLETED or fully-processed docs in new_only mode - don't process at all
            if existing_status and (
                existing_status.extracted_at is not None or existing_status.current_stage == PipelineStage.COMPLETED or existing_status.completed_at is not None
            ):
                logger.info(f"Skipping {document_id} - already processed")
                if progress_callback:
                    progress_callback(PipelineStage.EXTRACTING, document_id)
                continue

        # For non-default workspaces, require explicit membership
        if normalized_workspace != "default":
            member = members_map.get(document_id)
            if not member:
                logger.warning(f"No workspace member found for {document_id}")
@@ -390,16 +394,31 @@ def process_all(
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

        # Determine folder path using metadata-based resolution
        # Determine folder path - use checkout_base directly for default workspace
        # This allows tests to pass a custom checkout_base without needing complex path resolution
        folder_path = None

        # For default workspace: use checkout_base directly if provided
        if normalized_workspace == "default":
            if checkout_base:
                folder_path = checkout_base / document_id
            else:
                cache_manager = resolve_cache_manager()
                folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
        else:
            # Non-default workspace - member is guaranteed to exist from check above
            if checkout_base:
                checkout_dir = checkout_base
            else:
                cache_manager = resolve_cache_manager()
                checkout_dir = cache_manager.checkout_dir

            if member.source_kind == "tdoc":
                # First try to resolve from existing checkout structure
            folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
                folder_path = resolve_tdoc_checkout_path(document_id, checkout_dir)
                # If not found, perform checkout
                if not folder_path:
                folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
                    folder_path = checkout_tdoc_to_workspace(document_id, checkout_dir, normalized_workspace)
            elif member.source_kind == "spec":
                # Extract base spec number and release from source_item_id (e.g., "21905-REL19" -> spec="21905", release="19")
                base_spec = member.source_item_id
@@ -408,7 +427,7 @@ def process_all(
                    parts = member.source_item_id.split("-REL")
                    base_spec = parts[0]
                    release = parts[1]
            folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)
                folder_path = checkout_spec_to_workspace(base_spec, checkout_dir, normalized_workspace, release=release)

        if not folder_path or not folder_path.exists():
            logger.warning(f"Checkout folder not found for {document_id}")
@@ -416,31 +435,6 @@ def process_all(
                progress_callback(PipelineStage.FAILED, document_id)
            continue

        # Check if folder has actual files (not just empty folder)
        has_files = any(f.is_file() for f in folder_path.iterdir())
        if not has_files:
            logger.warning(f"Checkout folder is empty for {document_id}, attempting re-checkout...")
            # Try to re-checkout with fresh download
            if member.source_kind == "tdoc":
                folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
            elif member.source_kind == "spec":
                folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

            # Verify re-checkout worked
            if not folder_path or not folder_path.exists():
                logger.error(f"Re-checkout failed for {document_id}")
                if progress_callback:
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

            # Check again after re-checkout
            has_files = any(f.is_file() for f in folder_path.iterdir())
            if not has_files:
                logger.error(f"Re-checkout produced empty folder for {document_id}")
                if progress_callback:
                    progress_callback(PipelineStage.FAILED, document_id)
                continue

        try:
            # Get or create status for this document
            status = storage.get_status(document_id, workspace=normalized_workspace)
@@ -475,6 +469,8 @@ def process_all(
    if extracted_ids:
        logger.info(f"Starting Phase 2: Embedding {len(extracted_ids)} documents")
        for document_id in extracted_ids:
            # For non-default workspaces, require membership
            if normalized_workspace != "default":
                member = members_map.get(document_id)
                if not member:
                    logger.warning(f"No workspace member found for {document_id} in Phase 2")
@@ -482,11 +478,14 @@ def process_all(
                        progress_callback(PipelineStage.FAILED, document_id)
                    continue

            # Determine folder path using metadata-based resolution
            # Determine folder path
            folder_path = None
            cache_manager = resolve_cache_manager()

            if member.source_kind == "tdoc":
            # For default workspace: use simple path resolution
            if normalized_workspace == "default":
                folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
            elif member.source_kind == "tdoc":
                folder_path = resolve_tdoc_checkout_path(document_id, cache_manager.checkout_dir)
                if not folder_path:
                    folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
@@ -509,22 +508,7 @@ def process_all(
            # Check if folder has actual files (not just empty folder)
            has_files = any(f.is_file() for f in folder_path.iterdir())
            if not has_files:
                logger.warning(f"Checkout folder is empty for {document_id} in Phase 2, attempting re-checkout...")
                if member.source_kind == "tdoc":
                    folder_path = checkout_tdoc_to_workspace(document_id, cache_manager.checkout_dir, normalized_workspace, db_file=cache_manager.db_file)
                elif member.source_kind == "spec":
                    folder_path = checkout_spec_to_workspace(base_spec, cache_manager.checkout_dir, normalized_workspace, release=release)

                # Verify re-checkout worked
                if not folder_path or not folder_path.exists():
                    logger.error(f"Re-checkout failed for {document_id} in Phase 2")
                    if progress_callback:
                        progress_callback(PipelineStage.FAILED, document_id)
                    continue

                has_files = any(f.is_file() for f in folder_path.iterdir())
                if not has_files:
                    logger.error(f"Re-checkout produced empty folder for {document_id} in Phase 2")
                logger.warning(f"Checkout folder is empty for {document_id} in Phase 2")
                if progress_callback:
                    progress_callback(PipelineStage.FAILED, document_id)
                continue
@@ -575,6 +559,7 @@ def process_all(
            workspace=normalized_workspace,
            config=config,
            progress_callback=progress_callback,
            members_map=members_map,
        )
        # Update results with graph statuses
        for doc_id, status in graph_results.items():
@@ -586,36 +571,46 @@ def process_all(
    return results


# TODO: ambiguous name, ambiguous argument, ambiguous return type - consider splitting into get_status(document_id) and list_statuses(workspace)
def get_status(document_id: str | None = None, workspace: str | None = None) -> ProcessingStatus | list[ProcessingStatus] | None:
    """Get processing status for a TDoc or all TDocs in workspace.
def get_status(document_id: str, workspace: str | None = None) -> ProcessingStatus | None:
    """Get processing status for a single TDoc.

    Args:
        document_id: Document identifier. If None, returns all statuses in workspace.
        document_id: Document identifier.
        workspace: Optional workspace scope (defaults to "default").

    Returns:
        ProcessingStatus if document_id provided and found,
        list of ProcessingStatus if document_id is None,
        None if document_id provided but not found.
        ProcessingStatus if found, None otherwise.
    """
    config = AiConfig.from_env()
    storage = EmbeddingsManager(config).storage
    normalized_workspace = normalize_workspace_name(workspace)

    if document_id is None:
        # Return all statuses in workspace
        return storage.list_statuses(normalized_workspace)

    return storage.get_status(document_id, workspace=normalized_workspace)


def list_statuses(workspace: str | None = None) -> list[ProcessingStatus]:
    """Return the processing statuses of every TDoc in a workspace.

    Args:
        workspace: Optional workspace scope; ``None`` falls back to "default".

    Returns:
        A list of ProcessingStatus, one per document tracked in the workspace.
    """
    scope = normalize_workspace_name(workspace)
    storage = EmbeddingsManager(AiConfig.from_env()).storage
    return storage.list_statuses(scope)


def build_all_graphs(
    document_ids: list[str],
    checkout_base: Path,
    workspace: str | None = None,
    config: AiConfig | None = None,
    progress_callback: Callable[[PipelineStage, str], None] | None = None,
    members_map: dict[str, Any] | None = None,
) -> dict[str, ProcessingStatus]:
    """Build knowledge graphs for all processed documents (second pass).

@@ -628,6 +623,7 @@ def build_all_graphs(
        workspace: Optional workspace scope (defaults to "default").
        config: Optional AiConfig instance.
        progress_callback: Optional callback for progress updates.
        members_map: Optional pre-fetched members map to avoid redundant API calls.

    Returns:
        Dict mapping document_id to ProcessingStatus.
@@ -637,12 +633,13 @@ def build_all_graphs(
    storage = embeddings_manager.storage
    normalized_workspace = normalize_workspace_name(workspace)

    # Get workspace members for path resolution
    members_map: dict[str, Any] = {}
    if normalized_workspace != "default":
    # Use provided members_map or fetch if not provided and not default workspace
    if members_map is None and normalized_workspace != "default":
        members = list_workspace_members(normalized_workspace)
        # Include both tdocs and specs
        members_map = {m.source_item_id: m for m in members if m.is_active and m.source_kind in ("tdoc", "spec")}
    elif members_map is None:
        members_map = {}

    results: dict[str, ProcessingStatus] = {}

+30 −5
Original line number Diff line number Diff line
@@ -25,6 +25,27 @@ _ = None
_ = 150
_ = 250

# Input size limits for LLM prompts
ABSTRACT_INPUT_LIMIT = 5000
SUMMARY_INPUT_LIMIT = 8000
KEYWORDS_INPUT_LIMIT = 5000


def _truncate_text(text: str, max_chars: int) -> str:
    """Truncate text to maximum character limit.

    Args:
        text: Text to truncate.
        max_chars: Maximum number of characters.

    Returns:
        Truncated text with ellipsis if truncated.
    """
    if len(text) <= max_chars:
        return text
    return text[: max_chars - 3] + "..."


# Prompt templates
SUMMARY_SYSTEM_PROMPT = """You are a technical document analyzer specializing in 3GPP TDoc documents.
Generate concise, informative summaries following the specified structure."""
@@ -223,17 +244,19 @@ def summarize_document(
    client = _get_llm_client()

    # Generate abstract
    abstract_prompt = ABSTRACT_PROMPT.format(content=markdown)  # TODO: Limit input (5000?)
    truncated_markdown = _truncate_text(markdown, ABSTRACT_INPUT_LIMIT)
    abstract_prompt = ABSTRACT_PROMPT.format(content=truncated_markdown)
    try:
        abstract = client.complete(abstract_prompt, model=config.llm_model)
    except Exception as exc:
        msg = f"LLM endpoint is unreachable or misconfigured: {exc}"
        raise LlmConfigError(msg) from exc

    # TODO: Limit input (5000?)
    # Generate structured summary
    truncated_for_structured = _truncate_text(markdown, SUMMARY_INPUT_LIMIT)
    structured_prompt = STRUCTURED_SUMMARY_PROMPT.format(
        abstract=abstract,
        content=markdown,
        content=truncated_for_structured,
    )
    try:
        structured_payload = client.complete(structured_prompt)
@@ -452,9 +475,10 @@ def summarize_tdoc(
    client = _get_llm_client()

    # Generate summary
    truncated_for_summary = _truncate_text(content, SUMMARY_INPUT_LIMIT)
    summary_prompt = CONCISE_SUMMARY_PROMPT.format(
        target_words=words,
        content=content,  # TODO: Limit input size (8000)?
        content=truncated_for_summary,
    )
    try:
        summary = client.complete(summary_prompt, model=config.llm_model, max_tokens=words * 4)
@@ -463,7 +487,8 @@ def summarize_tdoc(
        raise LlmConfigError(msg) from exc

    # Extract keywords
    keywords_prompt = KEYWORDS_PROMPT.format(content=content)  # TODO: limit content size for keyword extraction as well, maybe 5000 chars?
    truncated_for_keywords = _truncate_text(content, KEYWORDS_INPUT_LIMIT)
    keywords_prompt = KEYWORDS_PROMPT.format(content=truncated_for_keywords)
    try:
        keywords_raw = client.complete(keywords_prompt, model=config.llm_model, max_tokens=200)
        keywords = _parse_keywords(keywords_raw)
+61 −32
Original line number Diff line number Diff line
@@ -351,41 +351,16 @@ def resolve_tdoc_checkout_path(tdoc_id: str, checkout_base: Path) -> Path | None
    return None


# TODO: This function is doing a lot - consider breaking it up into smaller steps or helper functions for clarity and testability.
def checkout_tdoc_to_workspace(
    tdoc_id: str,
    checkout_base: Path,
    workspace: str | None,
    db_file: Path | None = None,
) -> Path | None:
    """Checkout a TDoc and add it to a workspace.

    Uses a fallback chain to resolve TDoc metadata:
    1. Check if already checked out to disk
    2. Check local database for existing metadata with URL
    3. Try WhatTheSpec API (unauthenticated)
    4. Try 3GPP Portal (requires credentials)
def _resolve_tdoc_metadata(tdoc_id: str, db_file: Path | None = None) -> TDocMetadata | None:
    """Resolve TDoc metadata via fallback chain: Database → WhatTheSpec → 3GPP Portal.

    Args:
        tdoc_id: TDoc identifier
        checkout_base: Base checkout directory
        workspace: Workspace name
        db_file: Optional path to database file for metadata lookup
        tdoc_id: TDoc identifier.
        db_file: Optional database file for metadata lookup.

    Returns:
        Path to the checked out TDoc folder, or None if checkout failed
        TDocMetadata if found, None otherwise.
    """
    # First check if already checked out with actual files
    existing_path = resolve_tdoc_checkout_path(tdoc_id, checkout_base)
    if existing_path:
        # Check if folder has actual document files (not just .ai subfolder)
        has_files = any(f.is_file() for f in existing_path.iterdir())
        if has_files:
            _logger.info(f"TDoc {tdoc_id} already checked out at {existing_path}")
            return existing_path
        _logger.info(f"TDoc {tdoc_id} folder exists but is empty at {existing_path}, re-downloading...")

    # Need to checkout the TDoc - use fallback chain
    metadata: TDocMetadata | None = None

    # Step 1: Check database for existing metadata
@@ -420,10 +395,32 @@ def checkout_tdoc_to_workspace(
        except Exception as e:
            _logger.debug(f"Portal lookup failed for {tdoc_id}: {e}")

    # Check if we successfully resolved metadata
    if metadata is None:
        _logger.warning(f"Could not resolve TDoc {tdoc_id} via any source")
        return None

    return metadata


def _checkout_tdoc_if_needed(tdoc_id: str, metadata: TDocMetadata, checkout_base: Path) -> Path | None:
    """Checkout a TDoc if not already checked out.

    Args:
        tdoc_id: TDoc identifier.
        metadata: Resolved TDoc metadata.
        checkout_base: Base checkout directory.

    Returns:
        Path to the checked out TDoc folder, or None if checkout failed.
    """
    # Check if already checked out with actual files
    existing_path = resolve_tdoc_checkout_path(tdoc_id, checkout_base)
    if existing_path:
        # Check if folder has actual document files (not just .ai subfolder)
        has_files = any(f.is_file() for f in existing_path.iterdir())
        if has_files:
            _logger.info(f"TDoc {tdoc_id} already checked out at {existing_path}")
            return existing_path
        _logger.info(f"TDoc {tdoc_id} folder exists but is empty at {existing_path}, re-downloading...")

    # Checkout the TDoc - returns Path directly or raises exception
    try:
@@ -442,6 +439,38 @@ def checkout_tdoc_to_workspace(
        return None


def checkout_tdoc_to_workspace(
    tdoc_id: str,
    checkout_base: Path,
    workspace: str | None,
    db_file: Path | None = None,
) -> Path | None:
    """Checkout a TDoc and add it to a workspace.

    Resolves TDoc metadata via a fallback chain (existing checkout on
    disk, local database, WhatTheSpec API, 3GPP Portal) and then performs
    the checkout if the folder is missing or empty.

    NOTE(review): ``workspace`` is accepted for interface compatibility but
    is not forwarded to the helpers here — confirm whether workspace
    registration happens elsewhere.

    Args:
        tdoc_id: TDoc identifier.
        checkout_base: Base checkout directory.
        workspace: Workspace name.
        db_file: Optional path to database file for metadata lookup.

    Returns:
        Path to the checked out TDoc folder, or None if metadata could
        not be resolved or the checkout failed.
    """
    metadata = _resolve_tdoc_metadata(tdoc_id, db_file)
    return None if metadata is None else _checkout_tdoc_if_needed(tdoc_id, metadata, checkout_base)


def checkout_spec_to_workspace(
    spec_number: str,
    checkout_base: Path,