Commit 75e93a5e authored by Jan Reimes
Browse files

feat(ai): implement structured extraction artifact storage with folder-based organization

parent 123ca91c
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -90,11 +90,11 @@ def test_convert_tdoc_to_markdown_writes_table_sidecar(monkeypatch: pytest.Monke

    _ = convert_ops.convert_tdoc_to_markdown("S4-260001", force=True)

    sidecar = tmp_path / ".ai" / "S4-260001_tables.json"
    sidecar = tmp_path / ".ai" / "tables" / "S4-260001_table_1_1.json"
    assert sidecar.exists()
    payload = json.loads(sidecar.read_text(encoding="utf-8"))
    assert isinstance(payload, list)
    assert payload[0]["element_id"] == "table_1"
    assert isinstance(payload, dict)
    assert payload["element_id"] == "table_1"


def test_summarize_tdoc_records_summarization_metric(monkeypatch: pytest.MonkeyPatch) -> None:
+57 −9
Original line number Diff line number Diff line
@@ -17,8 +17,14 @@ from kreuzberg import extract_file, extract_file_sync

from threegpp_ai.operations.extraction_result import (
    StructuredExtractionResult,
    build_structured_extraction_result,
    from_kreuzberg_result,
    has_cached_artifacts,
    persist_equations_from_extraction,
    persist_figures_from_extraction,
    persist_figures_from_kreuzberg_result,
    persist_tables_from_extraction,
    read_cached_artifacts,
)
from threegpp_ai.operations.figure_descriptor import describe_figures

@@ -95,7 +101,7 @@ class TDocProcessor:
        ...     await processor.rag.stop()
    """

    def __init__(self, config: LightRAGConfig | None = None):
    def __init__(self, config: LightRAGConfig | None = None) -> None:
        self.config = config or LightRAGConfig.from_env()
        self.rag = TDocRAG(self.config)

@@ -251,7 +257,11 @@ class TDocProcessor:
        return file_path.suffix.lower() in SUPPORTED_FORMATS

    @staticmethod
    async def extract_text(file_path: Path) -> StructuredExtractionResult | None:
    async def extract_text(
        file_path: Path,
        force: bool = False,
        extract_types: set[str] | None = None,
    ) -> StructuredExtractionResult | None:
        """Extract structured content from a file using kreuzberg.

        For office documents, first converts to PDF using convert-lo.
@@ -259,10 +269,16 @@ class TDocProcessor:

        Args:
            file_path: Path to the document file.
            force: If True, re-extract even if artifacts exist.
            extract_types: Optional set of artifact types to extract/persist.
                If None, extracts all types. Supported types: "tables", "figures", "equations".

        Returns:
            Structured extraction result, or None if extraction failed.
        """
        if extract_types is None:
            extract_types = {"tables", "figures", "equations"}

        pdf_path: Path | None = None

        try:
@@ -281,14 +297,38 @@ class TDocProcessor:

            ai_dir = file_path.parent / ".ai"
            ai_dir.mkdir(parents=True, exist_ok=True)
            figure_paths = persist_figures_from_kreuzberg_result(result, ai_dir / "figures")
            figure_descriptions = describe_figures(
                figure_paths,
                enabled=True,
            doc_stem = file_path.stem

            # Check for cached artifacts to skip re-extraction (only for types in extract_types)
            if not force and has_cached_artifacts(ai_dir, doc_stem, extract_types):
                md_path = ai_dir / f"{doc_stem}.md"
                if md_path.exists():
                    cached_content = md_path.read_text(encoding="utf-8")
                    cached = read_cached_artifacts(ai_dir, doc_stem)
                    if cached is not None:
                        # Filter based on extract_types
                        tables = cached.tables if "tables" in extract_types else []
                        figures = cached.figures if "figures" in extract_types else []
                        equations = cached.equations if "equations" in extract_types else []
                        return build_structured_extraction_result(
                            cached_content,
                            tables=tables,
                            figures=figures,
                            equations=equations,
                        )

            figures_dir = ai_dir / "figures"
            figure_paths: dict[str, Path] = {}
            if "figures" in extract_types:
                figure_paths = persist_figures_from_kreuzberg_result(
                    result,
                    figures_dir,
                    doc_stem=doc_stem,
                )
            figure_descriptions = describe_figures(figure_paths, enabled=True) if figure_paths else None
            extraction = from_kreuzberg_result(
                result,
                figure_paths={key: str(path) for key, path in figure_paths.items()},
                figure_paths={key: str(path) for key, path in figure_paths.items()} if figure_paths else None,
                figure_descriptions=figure_descriptions,
            )
            if extraction.content:
@@ -297,8 +337,16 @@ class TDocProcessor:
                logger.warning("No content extracted from %s", file_path)
                return None

            # Persist artifacts based on extract_types
            if extraction.table_count > 0 and "tables" in extract_types:
                persist_tables_from_extraction(extraction.tables, ai_dir, doc_stem)
            if extraction.figure_count > 0 and "figures" in extract_types:
                persist_figures_from_extraction(extraction.figures, figures_dir, doc_stem)
            if extraction.equation_count > 0 and "equations" in extract_types:
                persist_equations_from_extraction(extraction.equations, ai_dir, doc_stem)

            # Save extracted content as markdown in .ai subfolder
            md_path = ai_dir / f"{file_path.stem}.md"
            md_path = ai_dir / f"{doc_stem}.md"
            md_path.write_text(extracted_content, encoding="utf-8")
            logger.info("Saved markdown: %s", md_path)

+128 −35
Original line number Diff line number Diff line
@@ -30,7 +30,11 @@ from threegpp_ai.operations.extraction_result import (
    StructuredExtractionResult,
    build_structured_extraction_result,
    from_kreuzberg_result,
    persist_equations_from_extraction,
    persist_figures_from_extraction,
    persist_figures_from_kreuzberg_result,
    persist_tables_from_extraction,
    read_cached_artifacts,
)
from threegpp_ai.operations.fetch_tdoc import fetch_tdoc_files
from threegpp_ai.operations.figure_descriptor import describe_figures
@@ -301,6 +305,7 @@ def extract_tdoc_structured(
    document_id: str,
    force: bool = False,
    converter_config: ConverterConfig | None = None,
    extract_types: set[str] | None = None,
) -> StructuredExtractionResult:
    """Extract a TDoc into the canonical structured payload.

@@ -308,6 +313,8 @@ def extract_tdoc_structured(
        document_id: TDoc identifier (e.g., "S4-260001").
        force: Force reconversion even if cached markdown exists.
        converter_config: Optional converter backend configuration.
        extract_types: Optional set of artifact types to extract/persist.
            If None, extracts all types. Supported types: "tables", "figures", "equations".

    Returns:
        Structured extraction result.
@@ -318,6 +325,9 @@ def extract_tdoc_structured(
    """
    normalized_id = document_id.strip().upper()
    config = converter_config or ConverterConfig.from_env()
    # Default to all types if not specified
    if extract_types is None:
        extract_types = {"tables", "figures", "equations"}

    with timed_operation(get_metrics_tracker(), normalized_id, MetricType.CONVERSION):
        tdoc_files = fetch_tdoc_files(normalized_id, force_download=force)
@@ -329,22 +339,58 @@ def extract_tdoc_structured(
        ai_dir = tdoc_files.checkout_dir / ".ai"
        md_path = ai_dir / f"{normalized_id}.md"

        # Check for cached extraction: markdown + folder-based artifacts
        if md_path.exists() and not force:
            cached = md_path.read_text(encoding="utf-8")
            if len(cached) > 100:
                return _read_cached_structured(ai_dir, normalized_id, cached)
            cached_content = md_path.read_text(encoding="utf-8")
            if len(cached_content) > 100:
                # Try new folder-based artifacts first
                cached = read_cached_artifacts(ai_dir, normalized_id)
                if cached is not None:
                    # Filter artifacts based on extract_types
                    tables = cached.tables if "tables" in extract_types else []
                    figures = cached.figures if "figures" in extract_types else []
                    equations = cached.equations if "equations" in extract_types else []
                    return build_structured_extraction_result(
                        cached_content,
                        tables=tables,
                        figures=figures,
                        equations=equations,
                    )
                # Fallback: try old flat sidecar format
                if _has_flat_sidecars(ai_dir, normalized_id):
                    cached = _read_cached_structured(ai_dir, normalized_id, cached_content)
                    tables = cached.tables if "tables" in extract_types else []
                    figures = cached.figures if "figures" in extract_types else []
                    equations = cached.equations if "equations" in extract_types else []
                    return build_structured_extraction_result(
                        cached_content,
                        tables=tables,
                        figures=figures,
                        equations=equations,
                    )

        if primary.suffix.lower() == ".pdf":
            result = extract_file_sync(str(primary))
            extraction = _build_structured_from_result(result, ai_dir=ai_dir)
            extraction = _build_structured_from_result(
                result,
                ai_dir=ai_dir,
                doc_stem=normalized_id,
                extract_types=extract_types,
            )
        else:
            extraction = _convert_document_to_structured(primary, config, ai_dir=ai_dir)
            extraction = _convert_document_to_structured(
                primary,
                config,
                ai_dir=ai_dir,
                doc_stem=normalized_id,
                extract_types=extract_types,
            )

        markdown = extraction.content

        ai_dir.mkdir(parents=True, exist_ok=True)
        md_path.write_text(markdown, encoding="utf-8")
        _write_structured_sidecars(ai_dir, normalized_id, extraction)
        _persist_extraction_artifacts(ai_dir, normalized_id, extraction, extract_types=extract_types)

        return extraction

@@ -354,12 +400,17 @@ def _convert_document_to_structured(
    config: ConverterConfig,
    *,
    ai_dir: Path | None = None,
    doc_stem: str | None = None,
    extract_types: set[str] | None = None,
) -> StructuredExtractionResult:
    """Convert non-PDF document to structured extraction via configured backend.

    Args:
        primary: Path to source document (DOCX, DOC, etc.)
        config: Converter configuration
        ai_dir: Optional .ai directory for artifact storage.
        doc_stem: Document stem for naming artifacts.
        extract_types: Optional set of artifact types to extract/persist.

    Returns:
        Structured extraction payload
@@ -367,34 +418,46 @@ def _convert_document_to_structured(
    Raises:
        ConversionError: If all conversion attempts fail
    """
    if extract_types is None:
        extract_types = {"tables", "figures", "equations"}

    if config.backend == ConverterBackend.REMOTE:
        # Remote only - no fallback
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_path = _convert_via_remote(primary, Path(tmpdir), config)
            result = extract_file_sync(str(pdf_path))
            return _build_structured_from_result(result, ai_dir=ai_dir)
            return _build_structured_from_result(result, ai_dir=ai_dir, doc_stem=doc_stem, extract_types=extract_types)

    if config.backend == ConverterBackend.LIBREOFFICE:
        # LibreOffice only - no fallback
        return _convert_via_libreoffice(primary, ai_dir=ai_dir)
        return _convert_via_libreoffice(primary, ai_dir=ai_dir, doc_stem=doc_stem, extract_types=extract_types)

    # AUTO: Try LibreOffice first, fallback to remote on failure
    try:
        return _convert_via_libreoffice(primary)
        return _convert_via_libreoffice(primary, ai_dir=ai_dir, doc_stem=doc_stem, extract_types=extract_types)
    except Exception as e:
        logger.warning(f"LibreOffice conversion failed for {primary.name}: {e}")
        logger.info(f"Falling back to remote converter for {primary.name}")
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_path = _convert_via_remote(primary, Path(tmpdir), config)
            result = extract_file_sync(str(pdf_path))
            return _build_structured_from_result(result, ai_dir=ai_dir)
            return _build_structured_from_result(result, ai_dir=ai_dir, doc_stem=doc_stem, extract_types=extract_types)


def _convert_via_libreoffice(primary: Path, *, ai_dir: Path | None = None) -> StructuredExtractionResult:
def _convert_via_libreoffice(
    primary: Path,
    *,
    ai_dir: Path | None = None,
    doc_stem: str | None = None,
    extract_types: set[str] | None = None,
) -> StructuredExtractionResult:
    """Convert document to structured extraction via LibreOffice.

    Args:
        primary: Path to source document (DOCX, DOC, etc.)
        ai_dir: Optional .ai directory for artifact storage.
        doc_stem: Document stem for naming artifacts.
        extract_types: Optional set of artifact types to extract/persist.

    Returns:
        Structured extraction payload
@@ -407,41 +470,53 @@ def _convert_via_libreoffice(primary: Path, *, ai_dir: Path | None = None) -> St
        with tempfile.TemporaryDirectory() as tmpdir:
            conversion_result = converter.convert(primary, LibreOfficeFormat.PDF, Path(tmpdir))
            result = extract_file_sync(str(conversion_result.output_path))
            return _build_structured_from_result(result, ai_dir=ai_dir)
            return _build_structured_from_result(result, ai_dir=ai_dir, doc_stem=doc_stem, extract_types=extract_types)
    except Exception as e:
        msg = f"LibreOffice conversion failed for {primary.name}: {e}"
        logger.error(msg)
        raise ConversionError(msg) from e


def _write_structured_sidecars(ai_dir: Path, doc_stem: str, extraction: StructuredExtractionResult) -> None:
    """Write optional structured sidecars for extracted rich elements.
def _persist_extraction_artifacts(
    ai_dir: Path,
    doc_stem: str,
    extraction: StructuredExtractionResult,
    *,
    extract_types: set[str] | None = None,
) -> None:
    """Persist extracted artifacts to folder-based storage.

    Args:
        ai_dir: Destination `.ai` directory.
        doc_stem: Base filename stem (typically normalized document ID).
        extraction: Structured extraction payload.
        extract_types: Optional set of artifact types to persist.
    """
    if extraction.table_count > 0:
        tables_path = ai_dir / f"{doc_stem}_tables.json"
        tables_path.write_text(
            json.dumps([table.model_dump(mode="json") for table in extraction.tables], indent=2),
            encoding="utf-8",
        )
    if extract_types is None:
        extract_types = {"tables", "figures", "equations"}

    if extraction.figure_count > 0:
        figures_path = ai_dir / f"{doc_stem}_figures.json"
        figures_path.write_text(
            json.dumps([figure.model_dump(mode="json") for figure in extraction.figures], indent=2),
            encoding="utf-8",
        )
    if extraction.table_count > 0 and "tables" in extract_types:
        persist_tables_from_extraction(extraction.tables, ai_dir, doc_stem)

    if extraction.equation_count > 0:
        equations_path = ai_dir / f"{doc_stem}_equations.json"
        equations_path.write_text(
            json.dumps([equation.model_dump(mode="json") for equation in extraction.equations], indent=2),
            encoding="utf-8",
        )
    if extraction.figure_count > 0 and "figures" in extract_types:
        figures_dir = ai_dir / "figures"
        persist_figures_from_extraction(extraction.figures, figures_dir, doc_stem)

    if extraction.equation_count > 0 and "equations" in extract_types:
        persist_equations_from_extraction(extraction.equations, ai_dir, doc_stem)


def _has_flat_sidecars(ai_dir: Path, doc_stem: str) -> bool:
    """Check if old flat sidecar format exists for backward compatibility.

    The flat layout stored one JSON file per artifact type directly inside the
    `.ai` directory, named `<doc_stem>_tables.json`, `<doc_stem>_figures.json`
    and `<doc_stem>_equations.json`.

    Args:
        ai_dir: The `.ai` directory.
        doc_stem: Base filename stem.

    Returns:
        True if any flat sidecar file exists.
    """
    # The artifact kinds carry no leading underscore: the single "_" separator
    # is supplied by the f-string, so the result is "<stem>_tables.json" and
    # not "<stem>__tables.json".
    return any((ai_dir / f"{doc_stem}_{kind}.json").exists() for kind in ("tables", "figures", "equations"))


def _read_cached_structured(ai_dir: Path, doc_stem: str, cached_content: str) -> StructuredExtractionResult:
@@ -472,11 +547,29 @@ def _read_cached_structured(ai_dir: Path, doc_stem: str, cached_content: str) ->
    )


def _build_structured_from_result(result: object, *, ai_dir: Path | None = None) -> StructuredExtractionResult:
def _build_structured_from_result(
    result: object,
    *,
    ai_dir: Path | None = None,
    doc_stem: str | None = None,
    extract_types: set[str] | None = None,
) -> StructuredExtractionResult:
    """Build structured extraction from kreuzberg result.

    Args:
        result: kreuzberg extraction result.
        ai_dir: Optional .ai directory for artifact storage.
        doc_stem: Document stem for naming artifacts.
        extract_types: Optional set of artifact types to extract/persist.

    Returns:
        Structured extraction payload.
    """
    figure_paths: dict[str, str] | None = None
    figure_descriptions: dict[str, str] | None = None
    if ai_dir is not None:
        persisted = persist_figures_from_kreuzberg_result(result, ai_dir / "figures")
    if ai_dir is not None and extract_types is not None and "figures" in extract_types:
        figures_dir = ai_dir / "figures"
        persisted = persist_figures_from_kreuzberg_result(result, figures_dir, doc_stem=doc_stem)
        figure_paths = {key: str(path) for key, path in persisted.items()}
        figure_descriptions = describe_figures(persisted, enabled=True)

+282 −1

File changed.

Preview size limit exceeded, changes collapsed.