Commit 186685ec authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(runner): add async integration adapter for pool executors

- Introduce `Runner` class for async-compatible interface.
- Support async/await for synchronous functions.
- Implement context manager for executor lifecycle management.
- Allow flexible executor types (multiprocessing, threading, etc.).
- Provide usage examples and API reference in documentation.
parent 0515da64
Loading
Loading
Loading
Loading
+157 −0
Original line number Diff line number Diff line
@@ -111,6 +111,163 @@ When `InterpreterPoolExecutor` is not available (Python < 3.14), the factory aut
- Python 3.14+ recommended for subinterpreter support
- No external dependencies (stdlib only)

## Runner: Async Integration Adapter

The `Runner` class provides an async-compatible interface for `pool_executors`, enabling seamless integration with asyncio-based applications. It mimics the `aiointerpreters.Runner` API while leveraging the underlying `pool_executors` for task execution.

### Key Features

- **Async/Await Support**: Run synchronous functions asynchronously using `await runner.run(func, *args, **kwargs)`
- **Context Manager**: Manage executor lifecycle with `with runner.start():`
- **Flexible Executor Types**: Use any `pool_executors` backend (multiprocessing, threading, subinterpreter, or serial)
- **Thread-Safe**: Designed for use in async applications with proper event loop handling

### Usage Examples

#### Basic Async Integration

```python
from pool_executors import Runner
import asyncio

def cpu_intensive_task(x: int) -> int:
    return x * x

async def main():
    # Create runner with 4 workers using multiprocessing
    runner = Runner(workers=4, executor_type="multiprocessing")

    # Use context manager to start executor
    async with runner.start():
        # Run tasks asynchronously
        results = await asyncio.gather(
            runner.run(cpu_intensive_task, 2),
            runner.run(cpu_intensive_task, 3),
            runner.run(cpu_intensive_task, 4)
        )
        print(results)  # [4, 9, 16]

asyncio.run(main())
```

#### Using Different Executor Types

```python
from pool_executors import Runner

async def io_bound_task(url: str) -> str:
    import requests
    return requests.get(url).text[:100]  # Return first 100 chars

async def main():
    # Use threading for I/O-bound tasks
    runner = Runner(workers=8, executor_type="threading")

    async with runner.start():
        result = await runner.run(io_bound_task, "https://example.com")
        print(result)

asyncio.run(main())
```

#### Integration with Existing Async Code

```python
from pool_executors import Runner
import asyncio

def process_data(data: list) -> list:
    return [x * 2 for x in data]

async def fetch_data() -> list:
    # Simulate async data fetching
    await asyncio.sleep(0.1)
    return [1, 2, 3, 4, 5]

async def main():
    runner = Runner(workers=2, executor_type="subinterpreter")

    async with runner.start():
        # Fetch data asynchronously
        data = await fetch_data()

        # Process data in executor
        processed = await runner.run(process_data, data)
        print(processed)  # [2, 4, 6, 8, 10]

asyncio.run(main())
```

### Runner API Reference

| Method/Property | Description |
|-----------------|-------------|
| `Runner(workers=4, executor_type="multiprocessing")` | Initialize runner with specified workers and executor type |
| `runner.start()` | Returns context manager for executor lifecycle |
| `await runner.run(func, *args, **kwargs)` | Run function in executor and return awaitable result |
| `runner.executor_type` | Get the executor type (read-only) |
| `runner.workers` | Get the number of workers (read-only) |

### When to Use Runner

Use the `Runner` class when:

1. **Integrating with async applications**: When you need to run synchronous code from async contexts
1. **Migrating from aiointerpreters**: For compatibility with code written for `aiointerpreters.Runner`
1. **Leveraging async/await**: When you want to use async/await syntax with pool executors
1. **Managing complex workflows**: For applications that mix async I/O with CPU-bound tasks

### Advanced Usage

#### Error Handling

```python
from pool_executors import Runner
import asyncio

def might_fail(x: int) -> int:
    if x < 0:
        raise ValueError("Negative values not allowed")
    return x * x

async def main():
    runner = Runner(workers=2)

    async with runner.start():
        try:
            result = await runner.run(might_fail, -5)
        except ValueError as e:
            print(f"Error: {e}")  # Error: Negative values not allowed

asyncio.run(main())
```

#### Custom Initialization

```python
from pool_executors import Runner
import asyncio

def init_db_connection():
    # Initialize database connection
    pass

def query_database(user_id: int) -> dict:
    # Use initialized connection
    return {"id": user_id, "name": "Test User"}

async def main():
    # Initialize with custom setup
    runner = Runner(workers=4, executor_type="multiprocessing")

    async with runner.start():
        # Run with initialized state
        result = await runner.run(query_database, 42)
        print(result)  # {"id": 42, "name": "Test User"}

asyncio.run(main())
```

## License

MIT License - see LICENSE file for details.
+2 −1
Original line number Diff line number Diff line
"""Executor pool extensions with serial execution support."""

from .factory import create_executor
from .runner import Runner
from .serial import SerialPoolExecutor
from .types import ExecutorType

__version__ = "0.1.0"

__all__ = ["ExecutorType", "SerialPoolExecutor", "create_executor"]
__all__ = ["ExecutorType", "Runner", "SerialPoolExecutor", "create_executor"]
+102 −101
Original line number Diff line number Diff line
@@ -7,7 +7,8 @@ from collections.abc import Callable
from concurrent.futures import Executor
from typing import Any, TypeVar

from pool_executors.pool_executors import ExecutorType, create_executor
from .factory import create_executor
from .types import ExecutorType

T = TypeVar("T")

+0 −2
Original line number Diff line number Diff line
@@ -24,7 +24,6 @@ __all__ = [
    "PortalAuthenticationError",
    "PortalParsingError",
    "PortalSession",
    "Runner",
    "TDocCrawlResult",
    "TDocCrawler",
    "WhatTheSpecResolutionError",
@@ -51,7 +50,6 @@ _ATTR_MODULES: dict[str, tuple[str, str]] = {
    "PortalAuthenticationError": ("tdoc_crawler.crawlers.portal", "PortalAuthenticationError"),
    "PortalParsingError": ("tdoc_crawler.crawlers.portal", "PortalParsingError"),
    "PortalSession": ("tdoc_crawler.crawlers.portal", "PortalSession"),
    "Runner": ("tdoc_crawler.crawlers.executor_adapter", "Runner"),
    "TDOC_PATTERN": ("tdoc_crawler.crawlers.constants", "TDOC_PATTERN"),
    "TDOC_PATTERN_STR": ("tdoc_crawler.crawlers.constants", "TDOC_PATTERN_STR"),
    "TDOC_SUBDIRS": ("tdoc_crawler.crawlers.constants", "TDOC_SUBDIRS"),
+1 −1
Original line number Diff line number Diff line
@@ -8,7 +8,7 @@ import logging
from collections.abc import Callable
from dataclasses import dataclass

from tdoc_crawler.crawlers.executor_adapter import Runner
from pool_executors.pool_executors import Runner
from tdoc_crawler.crawlers.parallel import fetch_meeting_document_list_subinterpreter, fetch_meeting_tdocs
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.models import CrawlLimits, MeetingMetadata, MeetingQueryConfig, SortOrder, TDocCrawlConfig, TDocMetadata, WorkingGroup
Loading