Loading packages/3gpp-ai/threegpp_ai/lightrag/processor.py +15 −6 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ from enum import StrEnum from pathlib import Path from typing import Any from threegpp_ai.models import ConversionError, ExtractionError from threegpp_ai.operations.conversion import OFFICE_FORMATS from threegpp_ai.operations.extraction import extract_document_structured Loading Loading @@ -118,11 +119,19 @@ class DocumentProcessor: ) # Extract using the unified pipeline try: extraction = extract_document_structured( file_path, metadata=None, # Metadata enrichment happens at this level, not in extraction extract_types=extract_types, ) except (ExtractionError, ConversionError) as e: logger.error("Extraction failed for %s: %s", file_path, e) return ProcessingResult( file_path=str(file_path), status=ProcessingResultStatus.ERROR, error=str(e), ) text = extraction.content Loading Loading @@ -206,7 +215,7 @@ class DocumentProcessor: figure_count=extraction.figure_count, equation_count=extraction.equation_count, ) except Exception as e: except (ConnectionError, TimeoutError, OSError, RuntimeError, ValueError) as e: logger.error("Failed to insert %s into LightRAG: %s", file_path, e) return ProcessingResult( file_path=str(file_path), Loading packages/3gpp-ai/threegpp_ai/operations/summarize.py +6 −5 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ import json import logging import re from litellm import exceptions as litellm_exceptions from tdoc_crawler.utils.misc import utc_now from threegpp_ai.config import AiConfig Loading Loading @@ -143,7 +144,7 @@ def summarize_document( abstract_prompt = ABSTRACT_PROMPT.format(content=truncated_markdown) try: abstract = client.complete(abstract_prompt, model=config.llm_model) except Exception as exc: except (litellm_exceptions.OpenAIError, ValueError) as exc: msg = f"LLM endpoint is unreachable or misconfigured: {exc}" raise LlmConfigError(msg) from exc Loading @@ -155,7 
"""Tests for AI pipeline error recovery and workspace state consistency.

Verifies that document processing failures are properly reported via
ProcessingResult and do not corrupt workspace state or prevent other
documents from being processed.
"""

from __future__ import annotations

from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from threegpp_ai.lightrag.processor import (
    DocumentProcessor,
    ProcessingResultStatus,
)
from threegpp_ai.models import ConversionError, ExtractionError


@pytest.fixture
def mock_rag() -> MagicMock:
    """Create a mock TDocRAG instance."""
    rag = MagicMock()
    # insert/start/stop are awaited by the processor, so they must be AsyncMock.
    rag.insert = AsyncMock()
    rag.start = AsyncMock()
    rag.stop = AsyncMock()
    rag.workspace_index = None
    return rag


@pytest.fixture
def processor(mock_rag: MagicMock) -> DocumentProcessor:
    """Create a DocumentProcessor with mocked RAG."""
    # __new__ bypasses __init__ so no real RAG/workspace wiring happens.
    proc = DocumentProcessor.__new__(DocumentProcessor)
    proc.config = MagicMock()
    proc.rag = mock_rag
    return proc


class TestExtractionFailure:
    """Test extraction failures produce correct ProcessingResult."""

    @pytest.mark.asyncio
    async def test_extraction_error_returns_error_status(self, processor: DocumentProcessor, tmp_path: Path) -> None:
        """ExtractionError from extract_document_structured yields ERROR result."""
        docx_file = tmp_path / "broken.docx"
        docx_file.write_text("not a real docx")

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            side_effect=ExtractionError("Corrupt DOCX file"),
        ):
            result = await processor.process_file(docx_file)

        assert result.status == ProcessingResultStatus.ERROR
        assert "Corrupt DOCX file" in (result.error or "")
        assert result.file_path == str(docx_file)

    @pytest.mark.asyncio
    async def test_conversion_error_returns_error_status(self, processor: DocumentProcessor, tmp_path: Path) -> None:
        """ConversionError from Office-to-PDF conversion yields ERROR result."""
        docx_file = tmp_path / "needs_conversion.docx"
        docx_file.write_bytes(b"\xd0\xcf\x11\xe0")  # OLE2 header

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            side_effect=ConversionError("LibreOffice conversion failed"),
        ):
            result = await processor.process_file(docx_file)

        assert result.status == ProcessingResultStatus.ERROR
        assert "LibreOffice conversion failed" in (result.error or "")

    @pytest.mark.asyncio
    async def test_missing_file_extraction_error(self, processor: DocumentProcessor, tmp_path: Path) -> None:
        """Missing file triggers ExtractionError → ERROR status."""
        missing_file = tmp_path / "nonexistent.pdf"

        # extract_document_structured raises ExtractionError for missing files
        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            side_effect=ExtractionError(f"Source file does not exist: {missing_file}"),
        ):
            result = await processor.process_file(missing_file)

        assert result.status == ProcessingResultStatus.ERROR


class TestRAGInsertionFailure:
    """Test RAG insertion failures are properly caught and reported."""

    @pytest.mark.asyncio
    async def test_rag_insert_connection_error(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None:
        """Connection error during rag.insert() yields ERROR result."""
        mock_rag.insert.side_effect = ConnectionError("Connection refused")
        fake_extraction = MagicMock()
        fake_extraction.content = "A" * 100  # Above 50-char minimum

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            return_value=fake_extraction,
        ):
            result = await processor.process_file(tmp_path / "test.pdf")

        assert result.status == ProcessingResultStatus.ERROR
        assert "Connection refused" in (result.error or "")
        assert result.chars_extracted == 100

    @pytest.mark.asyncio
    async def test_rag_insert_timeout(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None:
        """Timeout during rag.insert() yields ERROR result."""
        mock_rag.insert.side_effect = TimeoutError("LLM request timed out")
        fake_extraction = MagicMock()
        fake_extraction.content = "B" * 100
        fake_extraction.table_count = 0
        fake_extraction.figure_count = 0
        fake_extraction.equation_count = 0

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            return_value=fake_extraction,
        ):
            result = await processor.process_file(tmp_path / "test.pdf")

        assert result.status == ProcessingResultStatus.ERROR
        assert "timed out" in (result.error or "").lower()

    @pytest.mark.asyncio
    async def test_rag_insert_runtime_error(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None:
        """RuntimeError during rag.insert() yields ERROR result."""
        mock_rag.insert.side_effect = RuntimeError("Embedding dimension mismatch")
        fake_extraction = MagicMock()
        fake_extraction.content = "C" * 100
        fake_extraction.table_count = 2
        fake_extraction.figure_count = 1
        fake_extraction.equation_count = 0

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            return_value=fake_extraction,
        ):
            result = await processor.process_file(tmp_path / "test.pdf")

        assert result.status == ProcessingResultStatus.ERROR
        assert result.chars_extracted == 100


class TestUnsupportedFormat:
    """Test unsupported file formats are skipped."""

    @pytest.mark.asyncio
    async def test_unsupported_format_skipped(self, processor: DocumentProcessor, tmp_path: Path) -> None:
        """Unsupported format (.xyz) yields SKIPPED status."""
        unsupported = tmp_path / "file.xyz"
        unsupported.write_text("content")

        result = await processor.process_file(unsupported)

        assert result.status == ProcessingResultStatus.SKIPPED
        assert "unsupported format" in (result.reason or "").lower()
        # rag.insert should NOT be called for skipped files
        processor.rag.insert.assert_not_called()


class TestInsufficientContent:
    """Test documents with too little content are skipped."""

    @pytest.mark.asyncio
    async def test_short_content_skipped(self, processor: DocumentProcessor, tmp_path: Path) -> None:
        """Document with < 50 chars yields SKIPPED status."""
        fake_extraction = MagicMock()
        fake_extraction.content = "short"
        fake_extraction.table_count = 0
        fake_extraction.figure_count = 0
        fake_extraction.equation_count = 0

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            return_value=fake_extraction,
        ):
            result = await processor.process_file(tmp_path / "short.pdf")

        assert result.status == ProcessingResultStatus.SKIPPED
        assert "insufficient content" in (result.reason or "").lower()
        assert result.chars_extracted == 5
        # rag.insert should NOT be called for skipped files
        processor.rag.insert.assert_not_called()


class TestSuccessfulProcessing:
    """Test successful processing produces correct result."""

    @pytest.mark.asyncio
    async def test_success_with_extraction_metadata(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None:
        """Successful extraction and insertion yields SUCCESS with counts."""
        fake_extraction = MagicMock()
        fake_extraction.content = "D" * 200
        fake_extraction.table_count = 3
        fake_extraction.figure_count = 2
        fake_extraction.equation_count = 1
        fake_extraction.tables = [MagicMock(element_id="t1"), MagicMock(element_id="t2"), MagicMock(element_id="t3")]
        fake_extraction.figures = [MagicMock(element_id="f1"), MagicMock(element_id="f2")]
        fake_extraction.equations = [MagicMock(element_id="e1")]

        with patch(
            "threegpp_ai.lightrag.processor.extract_document_structured",
            return_value=fake_extraction,
        ):
            result = await processor.process_file(tmp_path / "good.pdf")

        assert result.status == ProcessingResultStatus.SUCCESS
        assert result.chars_extracted == 200
        assert result.table_count == 3
        assert result.figure_count == 2
        assert result.equation_count == 1
        assert result.error is None
        processor.rag.insert.assert_called_once()


class TestWorkspaceStateConsistency:
    """Test that processing failures do not affect subsequent documents."""

    @pytest.mark.asyncio
    async def test_partial_failure_does_not_stop_processing(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None:
        """When one document fails, subsequent documents still process."""
        files = [
            tmp_path / "doc1.pdf",
            tmp_path / "doc2.pdf",
            tmp_path / "doc3.pdf",
        ]
        for f in files:
            f.write_text("content")

        call_count = 0

        def fake_extract(file_path: Path, **kwargs: object) -> MagicMock:
            # Stateful stub: fails only on the second invocation.
            nonlocal call_count
            call_count += 1
            fake = MagicMock()
            if call_count == 2:  # Second document fails extraction
                raise ExtractionError("corrupt file")
            fake.content = f"Document {call_count} " * 10  # > 50 chars
            fake.table_count = 0
            fake.figure_count = 0
            fake.equation_count = 0
            return fake

        results = []
        for f in files:
            with patch(
                "threegpp_ai.lightrag.processor.extract_document_structured",
                side_effect=fake_extract,
            ):
                result = await processor.process_file(f)
                results.append(result)

        # First: success
        assert results[0].status == ProcessingResultStatus.SUCCESS
        # Second: extraction failure
        assert results[1].status == ProcessingResultStatus.ERROR
        assert "corrupt file" in (results[1].error or "")
        # Third: still succeeds despite second failure
        assert results[2].status == ProcessingResultStatus.SUCCESS
packages/3gpp-ai/threegpp_ai/lightrag/processor.py +15 −6 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ from enum import StrEnum from pathlib import Path from typing import Any from threegpp_ai.models import ConversionError, ExtractionError from threegpp_ai.operations.conversion import OFFICE_FORMATS from threegpp_ai.operations.extraction import extract_document_structured Loading Loading @@ -118,11 +119,19 @@ class DocumentProcessor: ) # Extract using the unified pipeline try: extraction = extract_document_structured( file_path, metadata=None, # Metadata enrichment happens at this level, not in extraction extract_types=extract_types, ) except (ExtractionError, ConversionError) as e: logger.error("Extraction failed for %s: %s", file_path, e) return ProcessingResult( file_path=str(file_path), status=ProcessingResultStatus.ERROR, error=str(e), ) text = extraction.content Loading Loading @@ -206,7 +215,7 @@ class DocumentProcessor: figure_count=extraction.figure_count, equation_count=extraction.equation_count, ) except Exception as e: except (ConnectionError, TimeoutError, OSError, RuntimeError, ValueError) as e: logger.error("Failed to insert %s into LightRAG: %s", file_path, e) return ProcessingResult( file_path=str(file_path), Loading
packages/3gpp-ai/threegpp_ai/operations/summarize.py +6 −5 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ import json import logging import re from litellm import exceptions as litellm_exceptions from tdoc_crawler.utils.misc import utc_now from threegpp_ai.config import AiConfig Loading Loading @@ -143,7 +144,7 @@ def summarize_document( abstract_prompt = ABSTRACT_PROMPT.format(content=truncated_markdown) try: abstract = client.complete(abstract_prompt, model=config.llm_model) except Exception as exc: except (litellm_exceptions.OpenAIError, ValueError) as exc: msg = f"LLM endpoint is unreachable or misconfigured: {exc}" raise LlmConfigError(msg) from exc Loading @@ -155,7 +156,7 @@ def summarize_document( ) try: structured_payload = client.complete(structured_prompt) except Exception as exc: except (litellm_exceptions.OpenAIError, ValueError) as exc: msg = f"LLM endpoint is unreachable or misconfigured: {exc}" raise LlmConfigError(msg) from exc Loading Loading @@ -333,7 +334,7 @@ def summarize_tdoc( ) try: summary = client.complete(summary_prompt, model=config.llm_model, max_tokens=max_words * 4) except Exception as exc: except (litellm_exceptions.OpenAIError, ValueError) as exc: msg = f"LLM endpoint is unreachable or misconfigured: {exc}" raise LlmConfigError(msg) from exc Loading @@ -343,8 +344,8 @@ def summarize_tdoc( try: keywords_raw = client.complete(keywords_prompt, model=config.llm_model, max_tokens=200) keywords = _parse_keywords(keywords_raw) except Exception as e: logger.warning(f"Keyword extraction failed: {e}") except (litellm_exceptions.OpenAIError, ValueError) as exc: logger.warning(f"Keyword extraction failed: {exc}") keywords = [] # Count actual words Loading
tests/ai/test_pipeline_errors.py 0 → 100644 +270 −0 Original line number Diff line number Diff line """Tests for AI pipeline error recovery and workspace state consistency. Verifies that document processing failures are properly reported via ProcessingResult and do not corrupt workspace state or prevent other documents from being processed. """ from __future__ import annotations from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import pytest from threegpp_ai.lightrag.processor import ( DocumentProcessor, ProcessingResultStatus, ) from threegpp_ai.models import ConversionError, ExtractionError @pytest.fixture def mock_rag() -> MagicMock: """Create a mock TDocRAG instance.""" rag = MagicMock() rag.insert = AsyncMock() rag.start = AsyncMock() rag.stop = AsyncMock() rag.workspace_index = None return rag @pytest.fixture def processor(mock_rag: MagicMock) -> DocumentProcessor: """Create a DocumentProcessor with mocked RAG.""" proc = DocumentProcessor.__new__(DocumentProcessor) proc.config = MagicMock() proc.rag = mock_rag return proc class TestExtractionFailure: """Test extraction failures produce correct ProcessingResult.""" @pytest.mark.asyncio async def test_extraction_error_returns_error_status(self, processor: DocumentProcessor, tmp_path: Path) -> None: """ExtractionError from extract_document_structured yields ERROR result.""" docx_file = tmp_path / "broken.docx" docx_file.write_text("not a real docx") with patch( "threegpp_ai.lightrag.processor.extract_document_structured", side_effect=ExtractionError("Corrupt DOCX file"), ): result = await processor.process_file(docx_file) assert result.status == ProcessingResultStatus.ERROR assert "Corrupt DOCX file" in (result.error or "") assert result.file_path == str(docx_file) @pytest.mark.asyncio async def test_conversion_error_returns_error_status(self, processor: DocumentProcessor, tmp_path: Path) -> None: """ConversionError from Office-to-PDF conversion yields ERROR result.""" docx_file = 
tmp_path / "needs_conversion.docx" docx_file.write_bytes(b"\xd0\xcf\x11\xe0") # OLE2 header with patch( "threegpp_ai.lightrag.processor.extract_document_structured", side_effect=ConversionError("LibreOffice conversion failed"), ): result = await processor.process_file(docx_file) assert result.status == ProcessingResultStatus.ERROR assert "LibreOffice conversion failed" in (result.error or "") @pytest.mark.asyncio async def test_missing_file_extraction_error(self, processor: DocumentProcessor, tmp_path: Path) -> None: """Missing file triggers ExtractionError → ERROR status.""" missing_file = tmp_path / "nonexistent.pdf" # extract_document_structured raises ExtractionError for missing files with patch( "threegpp_ai.lightrag.processor.extract_document_structured", side_effect=ExtractionError(f"Source file does not exist: {missing_file}"), ): result = await processor.process_file(missing_file) assert result.status == ProcessingResultStatus.ERROR class TestRAGInsertionFailure: """Test RAG insertion failures are properly caught and reported.""" @pytest.mark.asyncio async def test_rag_insert_connection_error(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None: """Connection error during rag.insert() yields ERROR result.""" mock_rag.insert.side_effect = ConnectionError("Connection refused") fake_extraction = MagicMock() fake_extraction.content = "A" * 100 # Above 50-char minimum with patch( "threegpp_ai.lightrag.processor.extract_document_structured", return_value=fake_extraction, ): result = await processor.process_file(tmp_path / "test.pdf") assert result.status == ProcessingResultStatus.ERROR assert "Connection refused" in (result.error or "") assert result.chars_extracted == 100 @pytest.mark.asyncio async def test_rag_insert_timeout(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None: """Timeout during rag.insert() yields ERROR result.""" mock_rag.insert.side_effect = TimeoutError("LLM request timed out") 
fake_extraction = MagicMock() fake_extraction.content = "B" * 100 fake_extraction.table_count = 0 fake_extraction.figure_count = 0 fake_extraction.equation_count = 0 with patch( "threegpp_ai.lightrag.processor.extract_document_structured", return_value=fake_extraction, ): result = await processor.process_file(tmp_path / "test.pdf") assert result.status == ProcessingResultStatus.ERROR assert "timed out" in (result.error or "").lower() @pytest.mark.asyncio async def test_rag_insert_runtime_error(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None: """RuntimeError during rag.insert() yields ERROR result.""" mock_rag.insert.side_effect = RuntimeError("Embedding dimension mismatch") fake_extraction = MagicMock() fake_extraction.content = "C" * 100 fake_extraction.table_count = 2 fake_extraction.figure_count = 1 fake_extraction.equation_count = 0 with patch( "threegpp_ai.lightrag.processor.extract_document_structured", return_value=fake_extraction, ): result = await processor.process_file(tmp_path / "test.pdf") assert result.status == ProcessingResultStatus.ERROR assert result.chars_extracted == 100 class TestUnsupportedFormat: """Test unsupported file formats are skipped.""" @pytest.mark.asyncio async def test_unsupported_format_skipped(self, processor: DocumentProcessor, tmp_path: Path) -> None: """Unsupported format (.xyz) yields SKIPPED status.""" unsupported = tmp_path / "file.xyz" unsupported.write_text("content") result = await processor.process_file(unsupported) assert result.status == ProcessingResultStatus.SKIPPED assert "unsupported format" in (result.reason or "").lower() # rag.insert should NOT be called for skipped files processor.rag.insert.assert_not_called() class TestInsufficientContent: """Test documents with too little content are skipped.""" @pytest.mark.asyncio async def test_short_content_skipped(self, processor: DocumentProcessor, tmp_path: Path) -> None: """Document with < 50 chars yields SKIPPED status.""" 
fake_extraction = MagicMock() fake_extraction.content = "short" fake_extraction.table_count = 0 fake_extraction.figure_count = 0 fake_extraction.equation_count = 0 with patch( "threegpp_ai.lightrag.processor.extract_document_structured", return_value=fake_extraction, ): result = await processor.process_file(tmp_path / "short.pdf") assert result.status == ProcessingResultStatus.SKIPPED assert "insufficient content" in (result.reason or "").lower() assert result.chars_extracted == 5 # rag.insert should NOT be called for skipped files processor.rag.insert.assert_not_called() class TestSuccessfulProcessing: """Test successful processing produces correct result.""" @pytest.mark.asyncio async def test_success_with_extraction_metadata(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> None: """Successful extraction and insertion yields SUCCESS with counts.""" fake_extraction = MagicMock() fake_extraction.content = "D" * 200 fake_extraction.table_count = 3 fake_extraction.figure_count = 2 fake_extraction.equation_count = 1 fake_extraction.tables = [MagicMock(element_id="t1"), MagicMock(element_id="t2"), MagicMock(element_id="t3")] fake_extraction.figures = [MagicMock(element_id="f1"), MagicMock(element_id="f2")] fake_extraction.equations = [MagicMock(element_id="e1")] with patch( "threegpp_ai.lightrag.processor.extract_document_structured", return_value=fake_extraction, ): result = await processor.process_file(tmp_path / "good.pdf") assert result.status == ProcessingResultStatus.SUCCESS assert result.chars_extracted == 200 assert result.table_count == 3 assert result.figure_count == 2 assert result.equation_count == 1 assert result.error is None processor.rag.insert.assert_called_once() class TestWorkspaceStateConsistency: """Test that processing failures do not affect subsequent documents.""" @pytest.mark.asyncio async def test_partial_failure_does_not_stop_processing(self, processor: DocumentProcessor, mock_rag: MagicMock, tmp_path: Path) -> 
None: """When one document fails, subsequent documents still process.""" files = [ tmp_path / "doc1.pdf", tmp_path / "doc2.pdf", tmp_path / "doc3.pdf", ] for f in files: f.write_text("content") call_count = 0 def fake_extract(file_path: Path, **kwargs: object) -> MagicMock: nonlocal call_count call_count += 1 fake = MagicMock() if call_count == 2: # Second document fails extraction raise ExtractionError("corrupt file") fake.content = f"Document {call_count} " * 10 # > 50 chars fake.table_count = 0 fake.figure_count = 0 fake.equation_count = 0 return fake results = [] for f in files: with patch( "threegpp_ai.lightrag.processor.extract_document_structured", side_effect=fake_extract, ): result = await processor.process_file(f) results.append(result) # First: success assert results[0].status == ProcessingResultStatus.SUCCESS # Second: extraction failure assert results[1].status == ProcessingResultStatus.ERROR assert "corrupt file" in (results[1].error or "") # Third: still succeeds despite second failure assert results[2].status == ProcessingResultStatus.SUCCESS