Commit 29c3e396 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(tests): enhance testing coverage for Adobe and CloudConvert providers

* Add MockAsset initialization for better test asset handling in test_adobe.py
* Refactor DummyJobResult to DummyCreatePDFResult for clarity in test_adobe.py
* Introduce DummyPDFServicesResponse to encapsulate Adobe SDK results in test_adobe.py
* Create integration tests for live API calls to Adobe, CloudConvert, and Zamzar providers in test_provider_integration.py
parent 51859736
Loading
Loading
Loading
Loading
+26 −6
Original line number Diff line number Diff line
@@ -18,6 +18,9 @@ from pdf_remote_converter.providers.adobe import AdobeProvider
class MockAsset:
    """Mock Asset class for testing."""

    def __init__(self) -> None:
        self.asset_id = "mock-asset-id"


class DummyResultResource:
    """Minimal resource wrapper for Adobe SDK result."""
@@ -32,16 +35,32 @@ class DummyStreamAsset:
    def __init__(self, data: bytes) -> None:
        self._data = data

    def read(self) -> bytes:
    def get_input_stream(self) -> bytes:
        """Return stream data."""
        return self._data


class DummyJobResult:
    """Minimal job result wrapper for Adobe SDK result."""
class DummyCreatePDFResult:
    """Minimal CreatePDFResult wrapper for Adobe SDK."""

    def __init__(self, data: bytes) -> None:
        self._asset = MockAsset()
        self._data = data

    def get_asset(self) -> MockAsset:
        """Return the asset."""
        return self._asset


class DummyPDFServicesResponse:
    """Minimal PDFServicesResponse wrapper for Adobe SDK."""

    def __init__(self, data: bytes) -> None:
        self.resource = DummyResultResource(data)
        self._result = DummyCreatePDFResult(data)

    def get_result(self) -> DummyCreatePDFResult:
        """Return the job result."""
        return self._result


def _build_provider() -> AdobeProvider:
@@ -65,7 +84,7 @@ def test_adobe_convert_success(tmp_path: Path) -> None:
    input_path.write_text("hello", encoding="utf-8")
    output_path = tmp_path / "output.pdf"

    dummy_result = DummyJobResult(b"%PDF")
    dummy_response = DummyPDFServicesResponse(b"%PDF")

    with (
        mock.patch("pdf_remote_converter.providers.adobe.PDFServices") as pdf_services,
@@ -76,7 +95,8 @@ def test_adobe_convert_success(tmp_path: Path) -> None:
        pdf_instance = pdf_services.return_value
        pdf_instance.upload.return_value = mock.MagicMock()
        pdf_instance.submit.return_value = "polling-url"
        pdf_instance.get_job_result.return_value = dummy_result
        pdf_instance.get_job_result.return_value = dummy_response
        pdf_instance.get_content.return_value = DummyStreamAsset(b"%PDF")

        provider = _build_provider()
        result = provider.convert(input_path, output_path)
+7 −1
Original line number Diff line number Diff line
@@ -27,6 +27,12 @@ class DummyClient:
            raise AssertionError(msg)
        return self._responses.pop(0)

    def get(self, url: str, **kwargs: object) -> httpx.Response:
        if not self._responses:
            msg = f"Unexpected GET request: {url}"
            raise AssertionError(msg)
        return self._responses.pop(0)


def test_cloudconvert_provider_initialization() -> None:
    """Ensure CloudConvert provider initializes with defaults."""
@@ -80,7 +86,7 @@ def test_cloudconvert_convert_success(tmp_path: Path) -> None:
            }
        },
    )
    download_response = httpx.Response(200, content=b"%PDF")
    download_response = httpx.Response(200, content=b"%PDF", request=httpx.Request("GET", "https://download"))

    client = DummyClient([create_response, upload_response, poll_response, download_response])
    provider = CloudConvertProvider(api_key="token", http_client=client)
+128 −0
Original line number Diff line number Diff line
"""Integration tests for actual API calls to providers.

These tests are skipped if .env file is not present or required API keys are missing.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from pdf_remote_converter.providers.adobe import AdobeProvider
from pdf_remote_converter.providers.cloudconvert import CloudConvertProvider
from pdf_remote_converter.providers.zamzar import ZamzarProvider
from tests.conftest import EnvSettings


def test_cloudconvert_live_conversion(
    require_env_settings: EnvSettings,
    example_docs: list[Path],
    converted_dir: Path,
) -> None:
    """Test actual CloudConvert API conversion."""
    if not require_env_settings.cloudconvert_api_key:
        pytest.skip("CLOUDCONVERT_API_KEY not set")

    if not example_docs:
        pytest.skip("No example documents available")

    provider_dir = converted_dir / "cloudconvert"
    provider_dir.mkdir(parents=True, exist_ok=True)

    # Check if all outputs already exist
    all_exist = all((provider_dir / f"{doc.stem}.pdf").exists() for doc in example_docs)
    if all_exist:
        pytest.skip("All CloudConvert outputs already exist, skipping API calls")

    provider = CloudConvertProvider(api_key=require_env_settings.cloudconvert_api_key)

    for input_path in example_docs:
        output_path = provider_dir / f"{input_path.stem}.pdf"
        if output_path.exists():
            continue

        result = provider.convert(input_path, output_path)

        assert output_path.exists()
        assert output_path.stat().st_size > 0
        assert result.provider == "cloudconvert"
        assert result.credits_used >= 1


def test_adobe_live_conversion(
    require_env_settings: EnvSettings,
    example_docs: list[Path],
    converted_dir: Path,
) -> None:
    """Test actual Adobe PDF Services API conversion."""
    if not require_env_settings.adobe_client_id or not require_env_settings.adobe_client_secret:
        pytest.skip("Adobe credentials not set (ADOBE_CLIENT_ID, ADOBE_CLIENT_SECRET)")

    if not example_docs:
        pytest.skip("No example documents available")

    provider_dir = converted_dir / "adobe"
    provider_dir.mkdir(parents=True, exist_ok=True)

    # Check if all outputs already exist
    all_exist = all((provider_dir / f"{doc.stem}.pdf").exists() for doc in example_docs)
    if all_exist:
        pytest.skip("All Adobe outputs already exist, skipping API calls")

    provider = AdobeProvider(
        client_id=require_env_settings.adobe_client_id,
        client_secret=require_env_settings.adobe_client_secret,
    )

    for input_path in example_docs:
        output_path = provider_dir / f"{input_path.stem}.pdf"
        if output_path.exists():
            continue

        result = provider.convert(input_path, output_path)

        assert output_path.exists()
        assert output_path.stat().st_size > 0
        assert result.provider == "adobe"
        assert result.credits_used >= 1


def test_zamzar_live_conversion(
    require_env_settings: EnvSettings,
    example_docs: list[Path],
    converted_dir: Path,
) -> None:
    """Test actual Zamzar API conversion."""
    if not require_env_settings.zamzar_api_key:
        pytest.skip("ZAMZAR_API_KEY not set")

    if not example_docs:
        pytest.skip("No example documents available")

    provider_dir = converted_dir / "zamzar"
    provider_dir.mkdir(parents=True, exist_ok=True)

    # Filter out files larger than Zamzar's 1MB limit
    small_docs = [doc for doc in example_docs if doc.stat().st_size <= 1024 * 1024]
    if not small_docs:
        pytest.skip("All example documents exceed Zamzar's 1MB file size limit")

    # Check if all outputs already exist
    all_exist = all((provider_dir / f"{doc.stem}.pdf").exists() for doc in small_docs)
    if all_exist:
        pytest.skip("All Zamzar outputs already exist, skipping API calls")

    provider = ZamzarProvider(api_key=require_env_settings.zamzar_api_key)

    for input_path in small_docs:
        output_path = provider_dir / f"{input_path.stem}.pdf"
        if output_path.exists():
            continue

        result = provider.convert(input_path, output_path)

        assert output_path.exists()
        assert output_path.stat().st_size > 0
        assert result.provider == "zamzar"
        assert result.credits_used >= 1