Commit e311e66a authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(classification): enhance document classification and file handling

* Implement heuristic scoring to select the main document from a folder.
* Add a function to scan for document files while excluding hidden files.
* Remove deprecated pipeline stage and processing status models.
* Clean up warnings in the project configuration.
parent 5da13fe3
Loading
Loading
Loading
Loading
+13 −7
Original line number Diff line number Diff line
@@ -83,6 +83,7 @@ from threegpp_ai.lightrag.config import LightRAGConfig
from threegpp_ai.lightrag.metadata import RAGMetadata
from threegpp_ai.lightrag.processor import DocumentProcessor
from threegpp_ai.lightrag.rag import PROVIDER_ALIASES, PROVIDERS
from threegpp_ai.operations.classify import pick_main_document
from threegpp_ai.operations.conversion import OFFICE_FORMATS, convert_to_pdf
from threegpp_ai.operations.convert import convert_document_to_markdown
from threegpp_ai.operations.extraction import extract_document_structured
@@ -371,6 +372,9 @@ def _build_workspace_members(
def _resolve_process_file(path: Path) -> Path | None:
    """Resolve the actual document file from a path or directory.

    Uses heuristic classification (filename patterns + file size scoring)
    to pick the best document when multiple files exist in a directory.

    Args:
        path: Path to a file or directory containing documents.

@@ -382,14 +386,16 @@ def _resolve_process_file(path: Path) -> Path | None:
    if not path.is_dir():
        return None

    # Search for document files (including in subdirectories)
    for extension in [".pdf", ".docx", ".doc", ".ppt", ".pptx"]:
        # Search in the directory and one level of subdirectories
        matches = list(path.glob(f"*{extension}"))
        if not matches:
    # Use classification scoring to pick the best document
    main_file = pick_main_document(path)
    if main_file is not None:
        return main_file

    # Fallback: search subdirectories for additional formats
    for extension in [".pdf", ".docx", ".doc", ".md", ".txt"]:
        matches = list(path.rglob(f"*{extension}"))
        if matches:
            _logger.debug("Found document file for processing: %s", matches[0])
            _logger.debug("Found document file in subdirectory: %s", matches[0])
            return matches[0]

    _logger.warning("No document file found in: %s", path)
+0 −59
Original line number Diff line number Diff line
@@ -14,18 +14,6 @@ from threegpp_ai.config import AiConfig
from threegpp_ai.operations.workspace_names import normalize_workspace_name


class PipelineStage(StrEnum):
    """Stages of the AI processing pipeline.

    StrEnum members compare equal to their string values, so stages can be
    stored and compared as plain strings.
    """

    PENDING = "pending"  # queued; no processing started yet
    CLASSIFYING = "classifying"  # identifying the main document among folder files
    EXTRACTING = "extracting"  # converting the source document to Markdown
    EMBEDDING = "embedding"  # generating vector embeddings
    GRAPHING = "graphing"  # building the knowledge graph
    COMPLETED = "completed"  # terminal: pipeline finished successfully
    FAILED = "failed"  # terminal: pipeline aborted with an error


class GraphNodeType(StrEnum):
    """Types of nodes in the knowledge graph."""

@@ -51,24 +39,6 @@ class GraphEdgeType(StrEnum):
    REVISION_OF = "revision_of"  # is_revision_of metadata relationship


class ProcessingFailureType(StrEnum):
    """Classification of processing failures to determine retry behavior.

    Permanent failures - do NOT retry:
    - NOT_FOUND_ONLINE: Document withdrawn or never existed
    - DOWNLOAD_FAILED: Could not download source file
    - BROKEN_SOURCE: Downloaded file is corrupt/invalid
    - CLASSIFICATION_FAILED: Could not identify main document

    Retryable failures - CAN retry in next run:
    - EXTRACTION_FAILED: DOCX to Markdown conversion failed
    - EMBEDDING_FAILED: Embedding generation failed
    - GRAPH_FAILED: Graph building failed
    """

    # NOTE(review): the docstring documents several members, but only
    # GRAPH_FAILED is defined in this view — confirm the remaining members
    # exist in the full file, or trim the docstring to match the enum.
    GRAPH_FAILED = "graph_failed"


# TODO: Unnecessary? We can infer from presence of WorkspaceMember records and their is_active flag - or simply use a bool variable?
class WorkspaceStatus(StrEnum):
    """Lifecycle state of a workspace."""
@@ -178,33 +148,6 @@ class ArtifactScope(BaseModel):
        return normalize_workspace_name(value)


class ProcessingStatus(BaseModel):
    """Processing state for a single TDoc.

    Tracks per-stage completion timestamps plus error/failure details so a
    pipeline run can be inspected, resumed, or retried. All timestamps and
    failure fields default to None until the corresponding event occurs.
    """

    document_id: str = Field(..., description="Document identifier (normalized via .upper())")
    current_stage: PipelineStage = Field(PipelineStage.PENDING, description="Current pipeline stage")
    classified_at: datetime | None = Field(None, description="Timestamp when classification completed")
    extracted_at: datetime | None = Field(None, description="Timestamp when extraction completed")
    embedded_at: datetime | None = Field(None, description="Timestamp when embedding completed")
    summarized_at: datetime | None = Field(None, description="Timestamp when summarization completed")
    graphed_at: datetime | None = Field(None, description="Timestamp when graphing completed")
    completed_at: datetime | None = Field(None, description="Timestamp when pipeline completed")
    error_message: str | None = Field(None, description="Error details for failed stage")
    failure_type: ProcessingFailureType | None = Field(None, description="Type of failure if permanent")
    source_hash: str | None = Field(None, description="Hash of source DOCX for change detection")
    keywords: list[str] | None = Field(None, description="Keywords extracted from document content")
    detected_language: str | None = Field(None, description="Primary language detected in document")

    @field_validator("document_id")
    @classmethod
    def _normalize_document_id(cls, value: str) -> str:
        # Canonicalize the TDoc ID; reject values that normalize to empty
        # so an invalid identifier fails at model construction time.
        normalized = normalize_tdoc_id(value)
        if not normalized:
            msg = "document_id must not be empty"
            raise ValueError(msg)
        return normalized


class DocumentClassification(BaseModel):
    """Classification of a file within a TDoc folder."""

@@ -452,8 +395,6 @@ __all__ = [
    "GraphNodeType",
    "GraphQueryResult",
    "LlmConfigError",
    "PipelineStage",
    "ProcessingStatus",
    "QueryResult",
    "SourceKind",
    "SummarizeResult",
+82 −26
Original line number Diff line number Diff line
@@ -138,6 +138,74 @@ def _determine_heuristic(
    return best_file, "file_size_fallback", 0.5


# Extensions to scan when looking for document files
_DOCUMENT_EXTENSIONS = ["*.doc", "*.docx", "*.pdf", "*.ppt", "*.pptx", "*.xls", "*.xlsx", "*.txt", "*.csv"]


def _scan_document_files(folder_path: Path) -> list[Path]:
    """Scan a folder for document files, excluding hidden files.

    Args:
        folder_path: Directory to scan.

    Returns:
        Deduplicated list of document file paths.
    """
    files: list[Path] = []
    for ext in _DOCUMENT_EXTENSIONS:
        files.extend([p for p in folder_path.glob(ext) if p.is_file() and not p.name.startswith(".")])
    return list(dict.fromkeys(files))  # Preserves order, removes duplicates


def _score_file(file: Path) -> tuple[float, int]:
    """Score a document file by filename pattern and size.

    The filename heuristic contributes 70% of the score; a size factor,
    saturating at 10 MiB, contributes the remaining 30%.

    Args:
        file: Document file to score.

    Returns:
        Tuple of (combined_score, file_size_bytes).
    """
    size_bytes = file.stat().st_size
    # Linear in size up to 10 MiB, then capped, weighted at 0.3.
    size_component = min(size_bytes / (1024 * 1024) / 10.0, 1.0) * 0.3
    name_component = _score_filename(file.name) * 0.7
    return name_component + size_component, size_bytes


def pick_main_document(folder_path: Path) -> Path | None:
    """Pick the main document from a folder using heuristic scoring.

    Combines filename-pattern and file-size heuristics to choose the file
    most likely to be the primary document. Does not require a TDoc ID.

    Args:
        folder_path: Path to the folder containing document files.

    Returns:
        Path to the best document file, or None if no suitable files found.
    """
    # Guard: missing path or non-directory yields nothing to pick.
    if not (folder_path.exists() and folder_path.is_dir()):
        return None

    candidates = _scan_document_files(folder_path)
    if not candidates:
        return None
    if len(candidates) == 1:
        # A lone candidate is trivially the main document.
        return candidates[0]

    score_by_file: dict[Path, float] = {}
    size_by_file: dict[Path, int] = {}
    for candidate in candidates:
        score_by_file[candidate], size_by_file[candidate] = _score_file(candidate)

    best_file, _, _ = _determine_heuristic(score_by_file, size_by_file)
    logger.debug("Picked %s as main document from %d files in %s", best_file.name, len(candidates), folder_path)
    return best_file


def classify_document_files(
    document_id: str,
    folder_path: Path,
@@ -156,27 +224,15 @@ def classify_document_files(
        The is_main_document field indicates the primary file.
    """
    if not folder_path.exists() or not folder_path.is_dir():
        logger.warning(f"Folder not found: {folder_path}")
        logger.warning("Folder not found: %s", folder_path)
        return []

    # Find all files matching structured and unstructured patterns
    # STRUCTURED: .doc, .docx, .pdf
    # UNSTRUCTURED: .ppt, .pptx, .xls, .xlsx, .txt, .csv
    extensions = ["*.doc", "*.docx", "*.pdf", "*.ppt", "*.pptx", "*.xls", "*.xlsx", "*.txt", "*.csv"]

    files = []
    for ext in extensions:
        files.extend([file_path for file_path in folder_path.glob(ext) if file_path.is_file() and not file_path.name.startswith(".")])

    # Remove duplicates (in case a file matches multiple patterns)
    files = list(dict.fromkeys(files))  # Preserves order, removes duplicates

    files = _scan_document_files(folder_path)
    if not files:
        logger.warning(f"No document files found in {folder_path} for document {document_id}")
        logger.warning("No document files found in %s for document %s", folder_path, document_id)
        return []

    if len(files) == 1:
        # Single file - automatic main document
        file = files[0]
        return [
            DocumentClassification(
@@ -190,28 +246,21 @@ def classify_document_files(
            )
        ]

    # Multiple files - need classification
    # Multiple files — score and classify
    scores: dict[Path, float] = {}
    file_sizes: dict[Path, int] = {}
    for file in files:
        filename_score = _score_filename(file.name)
        file_size_bytes = file.stat().st_size
        file_sizes[file] = file_size_bytes
        size_score = file_size_bytes / (1024 * 1024)
        size_score = min(size_score / 10.0, 1.0) * 0.3

        combined = filename_score * 0.7 + size_score
        combined, size = _score_file(file)
        scores[file] = combined
        file_sizes[file] = size

    best_file, heuristic, confidence = _determine_heuristic(scores, file_sizes)

    # Build result list
    result: list[DocumentClassification] = []
    for file in files:
        is_main = file == best_file
        file_score = scores[file]

        # Adjust confidence for non-main documents
        if is_main:
            conf = confidence
            heur = heuristic
@@ -231,7 +280,13 @@ def classify_document_files(
            )
        )

    logger.info(f"Classified {len(files)} files for {document_id}, main: {best_file.name} (confidence: {confidence:.2f})")
    logger.info(
        "Classified %d files for %s, main: %s (confidence: %.2f)",
        len(files),
        document_id,
        best_file.name,
        confidence,
    )

    return result

@@ -239,4 +294,5 @@ def classify_document_files(
# Public API of this module. Note: _score_filename is exported despite its
# leading underscore — presumably for direct test access; confirm before
# removing it from this list.
__all__ = [
    "_score_filename",
    "classify_document_files",
    "pick_main_document",
]
+0 −5
Original line number Diff line number Diff line
@@ -71,11 +71,6 @@ pythonpath = ["src"]
markers = [
    "integration: marks tests as integration tests (slower, requires network)",
]
# Suppress Pydantic deprecation warning from pydantic-sqlite library (external dependency)
filterwarnings = [
    "ignore:.*Accessing the 'model_fields' attribute on the instance is deprecated.*:DeprecationWarning",
]


[tool.coverage.report]
skip_empty = true