Commit 0e39e5a9 authored by Jan Reimes

refactor(3gpp-ai): remove dead code

- Delete unused chunking.py module (no imports, superseded by extraction)
- Remove graph models (GraphNodeType, GraphEdgeType, GraphNode, GraphEdge)
- Remove unused exceptions (AiConfigError, EmbeddingDimensionError)
- Remove dead DocumentSummary model and summarize_document function
- Remove dead persist_figures_from_docling_result function
- Delete orphaned test_chunking.py
parent 2908a858
+9 −105
@@ -7,7 +7,7 @@ from datetime import datetime
from enum import StrEnum, auto
from typing import Any

from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import BaseModel, Field, field_validator
from tdoc_crawler.utils.misc import utc_now
from tdoc_crawler.utils.normalization import normalize_tdoc_id

@@ -15,31 +15,6 @@ from threegpp_ai.config import AiConfig
from threegpp_ai.operations.workspace_names import normalize_workspace_name


class GraphNodeType(StrEnum):
    """Types of nodes in the knowledge graph."""

    DOCUMENT = auto()
    TDOC = auto()
    MEETING = auto()
    SPEC = auto()
    WORK_ITEM = "work_item"
    CHANGE_REQUEST = "cr"
    COMPANY = auto()


# Use LLM to synthesize answer from graph + embeddings (GraphRAG)


class GraphEdgeType(StrEnum):
    """Types of edges in the knowledge graph."""

    DISCUSSES = auto()
    REFERENCES = auto()
    AUTHORED_BY = auto()
    PRESENTED_AT = auto()
    REVISION_OF = auto()  # is_revision_of metadata relationship


class ExtractionQualityStatus(StrEnum):
    """Deterministic extraction quality status."""

@@ -85,14 +60,6 @@ class LlmConfigError(AiError):
    """LLM endpoint not configured or unreachable."""


class AiConfigError(AiError):
    """Invalid or missing AI configuration."""


class EmbeddingDimensionError(AiError):
    """Embedding model dimension mismatch with stored vectors."""


class WorkspaceNotFoundError(AiError):
    """Workspace does not exist in registry."""

@@ -117,7 +84,11 @@ class Workspace:

@dataclass
class WorkspaceMember:
    """Source item assigned to one workspace corpus."""
    """Source item assigned to one workspace corpus.

    NOTE: This class is being merged into workspace_registry.WorkspaceMember.
    Prefer importing from workspace_registry for new code.
    """

    workspace_name: str = field(metadata={"description": "Workspace identifier"})
    source_item_id: str = field(metadata={"description": "Stable source item identifier"})
@@ -140,13 +111,10 @@ class WorkspaceMember:
            raise ValueError(msg)

        self.source_item_id = normalized
        self.source_kind = SourceKind(self.source_kind)  # Ensure source_kind is a SourceKind enum
        self.source_kind = SourceKind(self.source_kind)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to dict compatible with WorkspaceMetadata storage.

        Returns added_at as ISO string for JSON serialization.
        """
        """Serialize to dict compatible with WorkspaceMetadata storage."""
        result = asdict(self)
        result["source_kind"] = self.source_kind.value
        result["added_by"] = self.added_by or ""
@@ -311,64 +279,6 @@ class StructuredExtractionResult(BaseModel):
        return len(self.equations)


class DocumentSummary(BaseModel):
    """AI-generated summary for a TDoc."""

    document_id: str = Field(..., description="Document identifier (normalized via .upper())")
    abstract: str = Field(..., description="150-250 word abstract")
    key_points: list[str] = Field(default_factory=list, description="Key findings")
    action_items: list[str] = Field(default_factory=list, description="Action items")
    decisions: list[str] = Field(default_factory=list, description="Decisions recorded")
    affected_specs: list[str] = Field(default_factory=list, description="Affected specification IDs")
    llm_model: str = Field(
        default_factory=lambda: AiConfig().llm_model,
        description="Model used for generation",
    )
    prompt_version: str = Field("v1", description="Prompt template version")
    generated_at: datetime = Field(default_factory=utc_now, description="Generation timestamp")

    @field_validator("document_id")
    @classmethod
    def _normalize_document_id(cls, value: str) -> str:
        normalized = normalize_tdoc_id(value)
        if not normalized:
            msg = "document_id must not be empty"
            raise ValueError(msg)
        return normalized


class GraphNode(BaseModel):
    """A node in the temporal knowledge graph."""

    node_id: str = Field(..., description="Unique node identifier")
    node_type: GraphNodeType = Field(..., description="Node type")
    label: str = Field(..., description="Human-readable label")
    valid_from: datetime | None = Field(None, description="Temporal validity start")
    valid_to: datetime | None = Field(None, description="Temporal validity end")
    properties: dict[str, Any] = Field(default_factory=dict, description="Type-specific properties")
    created_at: datetime = Field(default_factory=utc_now, description="Node creation timestamp")


class GraphEdge(BaseModel):
    """An edge in the temporal knowledge graph."""

    edge_id: str = Field(..., description="Edge identifier '{source}->{edge_type}->{target}'")
    source_id: str = Field(..., description="Source node id")
    target_id: str = Field(..., description="Target node id")
    edge_type: GraphEdgeType = Field(..., description="Edge type")
    weight: float = Field(1.0, description="Relationship strength")
    temporal_context: str | None = Field(None, description="Meeting or date context")
    provenance: str = Field(..., description="How this edge was derived")
    created_at: datetime = Field(default_factory=utc_now, description="Edge creation timestamp")

    @model_validator(mode="after")
    def _validate_weight(self) -> GraphEdge:
        if self.weight <= 0:
            msg = "weight must be positive"
            raise ValueError(msg)
        return self
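

def _demo_graph_edge() -> GraphEdge:
    """Hypothetical usage sketch (not part of the original module)."""
    # Shows the "{source}->{edge_type}->{target}" edge_id convention from the
    # field description; both the document and spec ids are made-up examples.
    return GraphEdge(
        edge_id="S4-250638->discusses->TS 26.114",
        source_id="S4-250638",
        target_id="TS 26.114",
        edge_type=GraphEdgeType.DISCUSSES,
        provenance="metadata",
    )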


class SummarizeResult(BaseModel):
    """Result of TDoc summarization operation."""

@@ -390,13 +300,10 @@ class SummarizeResult(BaseModel):


__all__ = [
    "AiConfigError",
    "AiError",
    "ConversionError",
    "DocumentClassification",
    "DocumentMetadataContract",
    "DocumentSummary",
    "EmbeddingDimensionError",
    "ExtractedEquationElement",
    "ExtractedFigureElement",
    "ExtractedTableElement",
@@ -404,10 +311,6 @@ __all__ = [
    "ExtractionQualityReasonCode",
    "ExtractionQualityReport",
    "ExtractionQualityStatus",
    "GraphEdge",
    "GraphEdgeType",
    "GraphNode",
    "GraphNodeType",
    "LlmConfigError",
    "PageMetadataContract",
    "QualityGateCheckResult",
@@ -417,4 +320,5 @@ __all__ = [
    "TDocNotFoundError",
    "Workspace",
    "WorkspaceMember",
    "WorkspaceNotFoundError",
]
+0 −394
"""Document chunking strategies for large documents.

Provides semantic-aware chunking to split large documents into manageable
pieces while preserving context and section boundaries.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

import tiktoken
from tdoc_crawler.logging import get_logger

logger = get_logger(__name__)

# Default encoding for token counting (cl100k_base is used by GPT-4, GPT-3.5-turbo)
_DEFAULT_ENCODING = "cl100k_base"


class ChunkingStrategy(Enum):
    """Document chunking strategies."""

    TRUNCATE = "truncate"  # Simple truncation at max_tokens
    SEMANTIC = "semantic"  # Split on section boundaries (markdown headers)
    OVERLAP = "overlap"  # Overlapping chunks with context window
    STRUCTURAL = "structural"  # Preserve protected structural blocks


@dataclass
class ChunkingConfig:
    """Configuration for document chunking.

    Attributes:
        strategy: Chunking strategy to use.
        max_tokens: Maximum tokens per chunk.
        overlap_tokens: Number of overlapping tokens between chunks (for overlap strategy).
        respect_sections: Whether to avoid splitting within sections (for semantic strategy).
        encoding: Tiktoken encoding name for token counting.
    """

    strategy: ChunkingStrategy = ChunkingStrategy.TRUNCATE
    max_tokens: int = 100_000
    overlap_tokens: int = 500
    respect_sections: bool = True
    encoding: str = field(default=_DEFAULT_ENCODING, repr=False)

    _encoder: tiktoken.Encoding | None = field(default=None, repr=False, compare=False)

    def get_encoder(self) -> tiktoken.Encoding:
        """Get or create the tiktoken encoder."""
        if self._encoder is None:
            self._encoder = tiktoken.get_encoding(self.encoding)
        return self._encoder

    def count_tokens(self, text: str) -> int:
        """Count tokens in text using the configured encoding."""
        return len(self.get_encoder().encode(text))
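

def _demo_token_budget() -> int:
    """Hypothetical usage sketch (not in the original module)."""
    # Assumes only what ChunkingConfig already provides: cl100k_base token
    # counting via tiktoken. The sample text and budget are made up.
    config = ChunkingConfig(strategy=ChunkingStrategy.SEMANTIC, max_tokens=8_000)
    return config.count_tokens("## Scope\n\nThis clause lists the requirements.")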


def _find_section_boundaries(content: str) -> list[int]:
    """Find positions of markdown section boundaries.

    Identifies positions after:
    - Markdown headers (# , ##, ###, etc.)
    - Horizontal rules (---, ***)
    - Double newlines (paragraph breaks)

    Args:
        content: Markdown content to analyze.

    Returns:
        List of character positions suitable for splitting.
    """
    boundaries: list[int] = []

    # Pattern for markdown headers (## Header, ### Header, etc.)
    header_pattern = re.compile(r"^(#{1,6})\s+.+$", re.MULTILINE)

    # Pattern for horizontal rules
    hr_pattern = re.compile(r"^(?:---|\*\*\*|___)\s*$", re.MULTILINE)

    # Find all header positions (split BEFORE headers)
    for match in header_pattern.finditer(content):
        boundaries.append(match.start())

    # Find horizontal rule positions
    for match in hr_pattern.finditer(content):
        boundaries.append(match.end())

    # Find double newline positions (paragraph breaks)
    # Only add if not too close to existing boundaries
    para_pattern = re.compile(r"\n\n+")
    for match in para_pattern.finditer(content):
        pos = match.start()
        # Only add if not within 100 chars of existing boundary
        if not any(abs(pos - b) < 100 for b in boundaries):
            boundaries.append(pos)

    # Sort and deduplicate
    boundaries = sorted(set(boundaries))
    return boundaries
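

def _demo_section_boundaries() -> list[int]:
    """Hypothetical usage sketch (not in the original module)."""
    # The sample document is made up; headers yield split positions, while
    # paragraph breaks near them are filtered by the 100-char proximity rule.
    sample = "# Title\n\nIntro paragraph.\n\n## Details\n\nBody text.\n"
    return _find_section_boundaries(sample)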


def chunk_by_tokens(content: str, max_tokens: int, config: ChunkingConfig | None = None) -> list[str]:
    """Split content by token count.

    Simple token-based splitting without semantic awareness.
    Splits at the last space before max_tokens to avoid mid-word splits.

    Args:
        content: Text content to chunk.
        max_tokens: Maximum tokens per chunk.
        config: Optional chunking config (uses defaults if not provided).

    Returns:
        List of text chunks, each under max_tokens.
    """
    if config is None:
        config = ChunkingConfig()

    encoder = config.get_encoder()
    tokens = encoder.encode(content)

    if len(tokens) <= max_tokens:
        return [content]

    chunks: list[str] = []
    start = 0

    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = encoder.decode(chunk_tokens)

        # Try to find a good break point (last space/newline)
        if end < len(tokens):
            # Look for last space or newline in decoded text
            last_space = max(chunk_text.rfind(" "), chunk_text.rfind("\n"))
            if last_space > len(chunk_text) // 2:
                # Re-encode up to the break point
                chunk_text = chunk_text[: last_space + 1]
                # Adjust token count for next iteration
                actual_tokens = encoder.encode(chunk_text)
                end = start + len(actual_tokens)

        chunks.append(chunk_text.strip())
        start = end

    return chunks
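

def _demo_truncate_split() -> list[str]:
    """Hypothetical usage sketch (not in the original module)."""
    # A deliberately tiny max_tokens makes the backtracking to the last
    # space/newline visible; the input text is made up.
    return chunk_by_tokens("alpha beta gamma delta epsilon zeta eta theta", max_tokens=4)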


def chunk_semantic(content: str, config: ChunkingConfig) -> list[str]:
    """Split content on semantic boundaries (section headers, etc.).

    Respects markdown structure to keep related content together.
    Falls back to token-based splitting for sections exceeding max_tokens.

    Args:
        content: Markdown content to chunk.
        config: Chunking configuration.

    Returns:
        List of semantically coherent chunks.
    """
    encoder = config.get_encoder()
    tokens = encoder.encode(content)

    if len(tokens) <= config.max_tokens:
        return [content]

    # Find section boundaries
    boundaries = _find_section_boundaries(content)
    boundaries = [0] + boundaries + [len(content)]

    chunks: list[str] = []
    current_chunk_start = 0
    current_chunk_end = 0

    for i in range(len(boundaries) - 1):
        section_start = boundaries[i]
        section_end = boundaries[i + 1]
        section_text = content[section_start:section_end]
        section_tokens = len(encoder.encode(section_text))

        # Check if adding this section would exceed max_tokens
        potential_chunk = content[current_chunk_start:section_end]
        potential_tokens = len(encoder.encode(potential_chunk))

        if potential_tokens <= config.max_tokens:
            # Include this section in current chunk
            current_chunk_end = section_end
        else:
            # Current chunk is full, save it
            if current_chunk_start < current_chunk_end:
                chunk_text = content[current_chunk_start:current_chunk_end].strip()
                if chunk_text:
                    chunks.append(chunk_text)

            # Start new chunk with this section
            current_chunk_start = section_start
            current_chunk_end = section_end

            # If single section exceeds max_tokens, use token-based splitting
            if section_tokens > config.max_tokens and not config.respect_sections:
                sub_chunks = chunk_by_tokens(section_text, config.max_tokens, config)
                chunks.extend(sub_chunks)
                # This section is fully consumed by token-based sub-chunks.
                # Continue chunking from the next section boundary.
                current_chunk_start = section_end
                current_chunk_end = section_end

    # Add final chunk
    if current_chunk_start < len(content):
        final_chunk = content[current_chunk_start:].strip()
        if final_chunk:
            chunks.append(final_chunk)

    return chunks if chunks else [content]
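

def _demo_semantic_split() -> list[str]:
    """Hypothetical usage sketch (not in the original module)."""
    # A small budget forces a split at the second header instead of
    # mid-paragraph; the document and numbers are made up.
    config = ChunkingConfig(strategy=ChunkingStrategy.SEMANTIC, max_tokens=12)
    return chunk_semantic("# A\n\nFirst section body.\n\n# B\n\nSecond section body.\n", config)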


def chunk_with_overlap(content: str, config: ChunkingConfig) -> list[str]:
    """Split content with overlapping windows for context preservation.

    Each chunk overlaps with the previous one by overlap_tokens to maintain
    context across chunk boundaries.

    Args:
        content: Text content to chunk.
        config: Chunking configuration (uses overlap_tokens for overlap size).

    Returns:
        List of overlapping text chunks.
    """
    encoder = config.get_encoder()
    tokens = encoder.encode(content)

    if len(tokens) <= config.max_tokens:
        return [content]

    chunks: list[str] = []
    start = 0
    step = config.max_tokens - config.overlap_tokens

    # Ensure step is positive
    if step <= 0:
        step = config.max_tokens // 2

    while start < len(tokens):
        end = min(start + config.max_tokens, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = encoder.decode(chunk_tokens)

        if chunk_text.strip():
            chunks.append(chunk_text)

        if end >= len(tokens):
            break

        start += step

    return chunks
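

def _demo_overlap_split() -> list[str]:
    """Hypothetical usage sketch (not in the original module)."""
    # The stride is max_tokens - overlap_tokens, so consecutive chunks share
    # two tokens of context here; the input and sizes are made up.
    config = ChunkingConfig(strategy=ChunkingStrategy.OVERLAP, max_tokens=8, overlap_tokens=2)
    return chunk_with_overlap("one two three four five six seven eight nine ten", config)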


def chunk_structural(content: str, config: ChunkingConfig) -> list[str]:
    """Split content while preserving protected structural blocks.

    Protected blocks include equation notations and structural comment markers
    for tables/figures/equations generated by extraction.
    """
    protected_pattern = re.compile(
        r"(\$\$.*?\$\$|\\\[.*?\\\]|\\begin\{equation\}.*?\\end\{equation\}|<!--\s*(?:table|figure|equation):.*?-->)",
        re.DOTALL,
    )

    parts = protected_pattern.split(content)
    if len(parts) == 1:
        return chunk_semantic(content, config)

    chunks: list[str] = []
    current = ""

    def _flush_current() -> None:
        nonlocal current
        current = current.strip()
        if current:
            chunks.extend(chunk_semantic(current, config))
        current = ""

    for part in parts:
        if not part or not part.strip():
            continue

        is_protected = protected_pattern.fullmatch(part.strip()) is not None
        if is_protected:
            _flush_current()
            chunks.append(part.strip())
            continue

        candidate = f"{current}\n{part}" if current else part
        if config.count_tokens(candidate) > config.max_tokens and current.strip():
            _flush_current()
            current = part
        else:
            current = candidate

    _flush_current()
    return chunks
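

def _demo_structural_split() -> list[str]:
    """Hypothetical usage sketch (not in the original module)."""
    # The $$...$$ block is flushed as its own chunk rather than split
    # mid-equation; the surrounding prose is made up.
    config = ChunkingConfig(strategy=ChunkingStrategy.STRUCTURAL, max_tokens=16)
    return chunk_structural("Context before.\n\n$$E = mc^2$$\n\nContext after.", config)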


def chunk_document(content: str, config: ChunkingConfig | None = None) -> list[str]:
    """Chunk a document using the configured strategy.

    Main entry point for document chunking. Selects the appropriate
    chunking algorithm based on the strategy in config.

    Args:
        content: Document text content (typically markdown).
        config: Chunking configuration. Uses defaults if not provided.

    Returns:
        List of document chunks.

    Raises:
        ValueError: If an unknown chunking strategy is specified.
    """
    if config is None:
        config = ChunkingConfig()

    if not content or not content.strip():
        return []

    strategy = config.strategy

    if strategy == ChunkingStrategy.TRUNCATE:
        result = chunk_by_tokens(content, config.max_tokens, config)
    elif strategy == ChunkingStrategy.SEMANTIC:
        result = chunk_semantic(content, config)
    elif strategy == ChunkingStrategy.OVERLAP:
        result = chunk_with_overlap(content, config)
    elif strategy == ChunkingStrategy.STRUCTURAL:
        result = chunk_structural(content, config)
    else:
        msg = f"Unknown chunking strategy: {strategy}"
        raise ValueError(msg)

    logger.debug(f"Chunked document using {strategy.value} strategy: {len(result)} chunks, avg {sum(len(c) for c in result) // max(len(result), 1)} chars")

    return result
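

def _demo_chunk_document() -> list[str]:
    """Hypothetical usage sketch (not in the original module)."""
    # The entry point dispatches on config.strategy; all values shown here
    # are made-up examples, not project defaults.
    config = ChunkingConfig(strategy=ChunkingStrategy.OVERLAP, max_tokens=8_000, overlap_tokens=256)
    return chunk_document("## Report\n\nBody text goes here.", config)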


def chunk_hybrid(dl_doc: Any, *, max_tokens: int = 100_000) -> list[str]:
    """Chunk a docling document using HybridChunker for semantic boundaries.

    HybridChunker preserves document structure (headings, paragraphs, tables, figures)
    and creates chunks with proper semantic boundaries. This replaces the older
    regex-based chunking strategies.

    Args:
        dl_doc: A docling DoclingDocument (result.document from ConvertResult).
        max_tokens: Maximum tokens per chunk.

    Returns:
        List of text chunks.
    """
    from docling.chunking import HybridChunker

    chunker = HybridChunker(max_tokens=max_tokens)
    chunks: list[str] = []
    for chunk in chunker.chunk(dl_doc):
        text = chunk.text if hasattr(chunk, "text") else str(chunk)
        if text:
            chunks.append(text)
    return chunks
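

def _demo_chunk_hybrid(path: str) -> list[str]:
    """Hypothetical usage sketch (not in the original module)."""
    # Converts a source file with docling first, then chunks the resulting
    # DoclingDocument; the path argument is a placeholder.
    from docling.document_converter import DocumentConverter

    result = DocumentConverter().convert(path)
    return chunk_hybrid(result.document, max_tokens=8_000)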


__all__ = [
    "ChunkingConfig",
    "ChunkingStrategy",
    "chunk_by_tokens",
    "chunk_document",
    "chunk_hybrid",
    "chunk_semantic",
    "chunk_structural",
    "chunk_with_overlap",
]
+0 −55
@@ -30,9 +30,6 @@ from threegpp_ai.models import (
    StructuredExtractionResult,
)

if TYPE_CHECKING:
    from docling.document_converter import ConversionResult

logger = get_logger(__name__)

_EQUATION_PATTERNS = [
@@ -271,57 +268,6 @@ def persist_figures_from_extraction(
    return path_map


def persist_figures_from_docling_result(
    result: ConversionResult,
    figures_dir: Path,
    doc_stem: str | None = None,
) -> dict[str, Path]:
    """Persist extracted figure bytes to disk and return resolved paths.

    Args:
        result: Object returned by docling DocumentConverter.convert().
        figures_dir: Target directory for figure artifacts.
        doc_stem: Document stem for naming (e.g., "S4-250638"). If None, uses legacy naming.

    Returns:
        Mapping from generated figure id (figure_N) to file path.
    """
    # Docling stores pictures in result.document.pictures
    doc = getattr(result, "document", None)
    if doc is None:
        return {}

    image_items: Sequence[Any] = getattr(doc, "pictures", []) or []
    if not image_items:
        return {}

    figures_dir.mkdir(parents=True, exist_ok=True)
    path_map: dict[str, Path] = {}

    for index, image in enumerate(image_items, start=1):
        # Docling PictureItem.get_image(doc) returns image bytes
        try:
            image_bytes = image.get_image(doc)
        except Exception:
            image_bytes = None

        image_format = str(getattr(image, "format", "png") or "png").lower()
        page = getattr(image, "page_number", None)

        if not isinstance(image_bytes, bytes) or not image_bytes:
            continue

        extension = "jpg" if image_format == "jpeg" else image_format

        filename = (
            f"{doc_stem}_figure_{page or 'unknown'}_{index}.{extension}"
            if doc_stem
            else f"figure_{index}.{extension}"
        )

        image_path = figures_dir / filename
        image_path.write_bytes(image_bytes)
        path_map[f"figure_{index}"] = image_path

    return path_map
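

def _demo_persist_figures(result: ConversionResult) -> dict[str, Path]:
    """Hypothetical usage sketch (not part of the original module)."""
    # Persists figures from an already-converted TDoc; the target directory
    # is a made-up example, the doc_stem follows the docstring's example.
    return persist_figures_from_docling_result(result, Path("artifacts/figures"), doc_stem="S4-250638")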


def build_structured_extraction_result(
    content: str,
    *,
@@ -1056,7 +1002,6 @@ __all__ = [
    "has_cached_artifacts",
    "persist_canonical_output",
    "persist_equations_from_extraction",
    "persist_figures_from_docling_result",
    "persist_figures_from_extraction",
    "persist_output_contracts",
    "persist_output_manifest",
+1 −69
@@ -11,7 +11,7 @@ from tdoc_crawler.utils.misc import utc_now
from tdoc_crawler.utils.normalization import normalize_tdoc_id

from threegpp_ai.config import AiConfig, QualityPolicyMode
from threegpp_ai.models import DocumentSummary, LlmConfigError, SummarizeResult
from threegpp_ai.models import LlmConfigError, SummarizeResult

from .convert import extract_tdoc_structured
from .llm_client import LiteLLMClient
@@ -256,74 +256,6 @@ Source document:
"""


def summarize_document(
    document_id: str,
    markdown: str,
) -> DocumentSummary:
    """Generate summary for a document."""
    config = AiConfig()

    client = _get_llm_client()

    truncated_markdown = _truncate_text(markdown, ABSTRACT_INPUT_LIMIT)
    abstract_prompt = ABSTRACT_PROMPT.format(content=truncated_markdown)
    try:
        abstract = client.complete(abstract_prompt, model=config.llm_model)
    except (litellm_exceptions.OpenAIError, ValueError) as exc:
        msg = f"LLM endpoint is unreachable or misconfigured: {exc}"
        raise LlmConfigError(msg) from exc

    truncated_for_structured = _truncate_text(markdown, SUMMARY_INPUT_LIMIT)
    structured_prompt = STRUCTURED_SUMMARY_PROMPT.format(
        abstract=abstract,
        content=truncated_for_structured,
    )
    try:
        structured_payload = client.complete(structured_prompt)
    except (litellm_exceptions.OpenAIError, ValueError) as exc:
        msg = f"LLM endpoint is unreachable or misconfigured: {exc}"
        raise LlmConfigError(msg) from exc

    structured = _parse_structured_summary(structured_payload)

    original_word_count = _count_words(markdown)
    word_count = _count_words(abstract)

    if word_count < config.abstract_min_words:
        if original_word_count >= config.abstract_min_words:
            logger.warning(
                f"Abstract for {document_id} shorter than minimum: {word_count} words "
                f"(minimum: {config.abstract_min_words}, original document: {original_word_count} words)"
            )
        else:
            logger.debug(
                f"Abstract for {document_id} has {word_count} words, "
                f"but original document is short ({original_word_count} words, minimum: {config.abstract_min_words})"
            )
    elif word_count > config.abstract_max_words:
        logger.warning(f"Abstract for {document_id} exceeds maximum: {word_count} words (maximum: {config.abstract_max_words})")

    key_points = structured["key_points"] or _extract_key_points(abstract)
    action_items = structured["action_items"] or _extract_action_items(markdown)
    decisions = structured["decisions"] or _extract_decisions(markdown)
    affected_specs = structured["affected_specs"] or _extract_spec_references(markdown)

    summary = DocumentSummary(
        document_id=document_id,
        abstract=abstract,
        key_points=key_points,
        action_items=action_items,
        decisions=decisions,
        affected_specs=affected_specs,
        llm_model=config.llm_model,
        prompt_version="v2",
        generated_at=utc_now(),
    )

    logger.info(f"Generated summary for {document_id}")
    return summary
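

def _demo_summarize() -> DocumentSummary:
    """Hypothetical usage sketch (not part of the original module)."""
    # Requires a reachable LLM endpoint (AiConfig().llm_model); otherwise
    # summarize_document raises LlmConfigError. Inputs are made up.
    return summarize_document("S4-250638", "# Proposal\n\nShort markdown body.")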


def _extract_key_points(abstract: str) -> list[str]:
    """Extract key points from text."""
    sentences = abstract.split(". ")

tests/ai/test_chunking.py

deleted 100644 → 0
+0 −104

File deleted.
