Commit 855dee62 authored by Jan Reimes's avatar Jan Reimes
Browse files

feat(cli): add overall timeout option for crawl duration control

- Introduced `--overall-timeout` option to limit total crawl duration.
- Updated `TDocCrawlConfig` to include `overall_timeout` field.
- Adjusted tests to utilize serial execution and set timeout for reliability.
- Removed unused mock runner class from tests for cleaner code.
parent eccea8b9
Loading
Loading
Loading
Loading

.python-version

deleted 100644 → 0
+0 −1
Original line number Diff line number Diff line
3.14
 No newline at end of file
+1 −0
Original line number Diff line number Diff line
@@ -51,6 +51,7 @@ dev = [
    "mdformat>=1.0.0",
    "isort>=7.0.0",
    "undersort>=0.1.5",
    "beads-mcp>=0.44.0",
]

[build-system]
+36 −138
Original line number Diff line number Diff line
@@ -14,24 +14,19 @@ T = TypeVar("T")


class SerialFuture(Future[T]):
    """Future for serial execution that runs task on first result() call.

    This future does not execute the task immediately when created. Instead,
    the task is executed lazily when result() is first called. Subsequent
    calls return the cached result.

    Attributes:
        _fn: Callable to execute
        _args: Positional arguments for callable
        _kwargs: Keyword arguments for callable
        _result: Cached result after execution
        _exception: Stored exception if task failed
        _done: Flag indicating if task has been executed
        _callbacks: List of callbacks to invoke after execution
    """Future for serial execution that executes immediately.

    This future executes the task immediately upon creation to ensure
    compatibility with as_completed(). The result is cached and returned
    from result() calls.

    This differs from lazy execution but is necessary for as_completed()
    to work correctly, as it waits on internal completion events that are
    only triggered by set_result() or set_exception().
    """

    def __init__(self, fn: Callable[..., T], args: tuple[Any, ...], kwargs: dict[str, Any]) -> None:
        """Initialize serial future with callable and arguments.
        """Initialize and immediately execute serial future.

        Args:
            fn: Callable to execute
@@ -39,118 +34,23 @@ class SerialFuture(Future[T]):
            kwargs: Keyword arguments
        """
        super().__init__()
        self._fn = fn
        self._args = args
        self._kwargs = kwargs
        self._result: T | None = None
        self._exception: BaseException | None = None
        self._done = False
        self._callbacks: list[Callable[[Future[T]], object]] = []

    def result(self, timeout: float | None = None) -> T:
        """Execute task if not done, return result.

        The timeout parameter is ignored for serial execution as tasks
        run synchronously in the calling thread.

        Args:
            timeout: Ignored for serial execution

        Returns:
            Task result

        Raises:
            Exception: If task raised an exception
        """
        if not self._done:
        # Execute immediately and set result/exception
        # This triggers the internal _Waiter event that as_completed() depends on
        try:
                self._result = self._fn(*self._args, **self._kwargs)
            self.set_running_or_notify_cancel()
            result = fn(*args, **kwargs)
            self.set_result(result)
        except BaseException as e:
                self._exception = e
            finally:
                self._done = True
                # Invoke callbacks after task completion
                for callback in self._callbacks:
                    try:
                        callback(self)
                    except Exception:
                        logger.exception("Exception in callback")

        if self._exception:
            raise self._exception
        return self._result  # type: ignore[return-value]

    def exception(self, timeout: float | None = None) -> BaseException | None:
        """Return exception raised by task, or None.

        Args:
            timeout: Ignored for serial execution

        Returns:
            Exception if task failed, None otherwise
        """
        if not self._done:
            with contextlib.suppress(BaseException):
                self.result()
        return self._exception

    def done(self) -> bool:
        """Check if task has been executed.

        Returns:
            True if task has been executed, False otherwise
        """
        return self._done

    def cancel(self) -> bool:
        """Attempt to cancel task.

        Serial tasks cannot be cancelled as they execute synchronously.

        Returns:
            Always False
        """
        return False

    def cancelled(self) -> bool:
        """Check if task was cancelled.

        Returns:
            Always False (serial tasks cannot be cancelled)
        """
        return False

    def running(self) -> bool:
        """Check if task is currently running.

        Returns:
            False (serial tasks complete before returning)
        """
        return False

    def add_done_callback(self, fn: Callable[[Future[T]], object]) -> None:
        """Add callback to invoke after task completion.

        If task is already done, callback is invoked immediately.
        Otherwise, it's stored and invoked after task execution.

        Args:
            fn: Callback function taking this future as argument
        """
        if self._done:
            try:
                fn(self)
            except Exception:
                logger.exception("Exception in callback")
        else:
            self._callbacks.append(fn)
            self.set_exception(e)


class SerialPoolExecutor(Executor):
    """Executor that runs tasks sequentially in the main thread.
    """Executor that runs tasks immediately and sequentially in the main thread.

    Tasks are collected via submit() but not executed until shutdown()
    is called with wait=True (or when used as context manager).
    Tasks are executed synchronously when submit() is called. This ensures
    compatibility with as_completed() and other concurrent.futures utilities
    that depend on internal completion events.

    This is useful for:
    - Debugging parallel code by forcing serial execution
@@ -160,7 +60,7 @@ class SerialPoolExecutor(Executor):
    Example:
        >>> with SerialPoolExecutor() as executor:
        ...     futures = [executor.submit(func, i) for i in range(10)]
        ...     results = [f.result() for f in futures]
        ...     results = [f.result() for f in futures]  # Already executed
    """

    def __init__(self, max_workers: int | None = None, **kwargs: Any) -> None:
@@ -176,11 +76,11 @@ class SerialPoolExecutor(Executor):
        self._shutdown = False

    def submit(self, fn: Callable[..., T], /, *args: Any, **kwargs: Any) -> Future[T]:
        """Submit task for serial execution.
        """Submit task for immediate serial execution.

        Task is not executed immediately. It will be executed when
        result() is called on the returned future, or when shutdown(wait=True)
        is called.
        Task is executed synchronously in the calling thread before this
        method returns. The returned future already contains the result
        or exception.

        Args:
            fn: Callable to execute
@@ -188,7 +88,7 @@ class SerialPoolExecutor(Executor):
            **kwargs: Keyword arguments

        Returns:
            SerialFuture that will execute task lazily
            SerialFuture with task already executed

        Raises:
            RuntimeError: If executor is shutdown
@@ -202,20 +102,16 @@ class SerialPoolExecutor(Executor):
        return future

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        """Shutdown executor and optionally execute pending tasks.
        """Shutdown executor.

        All tasks are already executed immediately upon submit(), so this
        method only sets the shutdown flag.

        Args:
            wait: If True, execute all pending tasks. If False, leave tasks unexecuted
            wait: Ignored (tasks already executed)
            cancel_futures: Ignored (serial tasks cannot be cancelled)
        """
        self._shutdown = True
        if wait:
            # Execute all pending tasks
            for future in self._pending:
                if not future.done():
                    with contextlib.suppress(Exception):
                        # Exceptions are stored in future, will be raised when result() called
                        future.result()

    def __enter__(self) -> SerialPoolExecutor:
        """Enter context manager.
@@ -226,7 +122,9 @@ class SerialPoolExecutor(Executor):
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager, executing all pending tasks.
        """Exit context manager.

        All tasks are already executed, so this just calls shutdown().

        Args:
            exc_type: Exception type if exception occurred
+2 −0
Original line number Diff line number Diff line
@@ -53,6 +53,7 @@ def crawl_tdocs(
    limit_meetings_per_wg: int | None = typer.Option(None, "--limit-meetings-per-wg", help="Limit meetings per working group"),
    limit_wgs: int | None = typer.Option(None, "--limit-wgs", help="Limit number of working groups"),
    workers: int = typer.Option(4, "--workers", help="Number of parallel subinterpreter workers"),
    overall_timeout: int | None = typer.Option(None, "--overall-timeout", help="Maximum total crawl duration in seconds (None = unlimited)"),
    max_retries: int = typer.Option(3, "--max-retries", help="HTTP connection retry attempts"),
    timeout: int = typer.Option(30, "--timeout", help="HTTP request timeout seconds"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging"),
@@ -71,6 +72,7 @@ def crawl_tdocs(
        incremental=incremental,
        force_revalidate=False,
        workers=workers,
        overall_timeout=overall_timeout,
        max_retries=max_retries,
        timeout=timeout,
        verbose=verbose,
+68 −51
Original line number Diff line number Diff line
"""TDoc crawler orchestrating asynchronous subinterpreter workers."""
"""TDoc crawler orchestrating parallel workers."""

from __future__ import annotations

import asyncio
import logging
import time
from collections.abc import Callable
from concurrent.futures import Future, as_completed
from dataclasses import dataclass

from tdoc_crawler.crawlers.executor_adapter import Runner
from tdoc_crawler.crawlers.parallel import fetch_meeting_tdocs
from tdoc_crawler.database import TDocDatabase
from tdoc_crawler.models import (CrawlLimits, MeetingMetadata,
                                 MeetingQueryConfig, SortOrder,
                                 TDocCrawlConfig, TDocMetadata, WorkingGroup)
from tdoc_crawler.models import CrawlLimits, MeetingMetadata, MeetingQueryConfig, SortOrder, TDocCrawlConfig, TDocMetadata, WorkingGroup

logger = logging.getLogger(__name__)

@@ -165,7 +165,7 @@ class TDocCrawler:

        return filtered

    async def _crawl_meetings_parallel(
    def _crawl_meetings_parallel(
        self,
        meetings: list,
        config: TDocCrawlConfig,
@@ -185,14 +185,26 @@ class TDocCrawler:

        target_tuple = tuple(targets) if targets else None

        runner = Runner(workers=config.workers)
        with runner.start():
            tasks: dict[asyncio.Task[tuple[str, ...]], MeetingMetadata] = {}
        # Select executor based on worker count
        if config.workers == 1:
            executor = create_executor("serial")
            executor_type = "serial"
        else:
            # Auto-selects InterpreterPoolExecutor (Python 3.14+) or ProcessPoolExecutor (fallback)
            executor = create_executor("subinterpreter", max_workers=config.workers)
            executor_type = "subinterpreter/process"

        logger.debug("Using %s executor with %d worker(s)", executor_type, config.workers)

        # Track start time for overall timeout enforcement
        start_time = time.monotonic()

        with executor:
            futures: dict[Future[tuple[str, ...]], MeetingMetadata] = {}

            for meeting in meetings:
                base_url = meeting.files_url.rstrip("/") + "/"
                task = asyncio.create_task(
                    runner.run(
                future = executor.submit(
                    fetch_meeting_tdocs,
                    meeting.meeting_id,
                    base_url,
@@ -204,16 +216,13 @@ class TDocCrawler:
                    config.http_cache.ttl,
                    config.http_cache.refresh_ttl_on_access,
                )
                )
                tasks[task] = meeting
                futures[future] = meeting

            pending = set(tasks.keys())
            while pending:
                done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    meeting = tasks[task]
            # Process results as they complete
            for future in as_completed(futures):
                meeting = futures[future]
                try:
                        payloads = task.result()
                    payloads = future.result()
                except Exception as exc:  # pragma: no cover - defensive, depends on network
                    short_name = meeting.short_name or str(meeting.meeting_id)
                    message = f"Meeting {short_name}: {exc}"
@@ -231,12 +240,20 @@ class TDocCrawler:
                    max_tdocs,
                )

                    if limit_reached or (targets is not None and not targets):
                        for pending_task in pending:
                            pending_task.cancel()
                        if pending:
                            await asyncio.gather(*pending, return_exceptions=True)
                        return errors
                # Check if overall timeout exceeded
                elapsed = time.monotonic() - start_time
                timeout_exceeded = config.overall_timeout is not None and elapsed >= config.overall_timeout

                if limit_reached or (targets is not None and not targets) or timeout_exceeded:
                    # Cancel remaining futures
                    for pending_future in futures:
                        if not pending_future.done():
                            pending_future.cancel()
                    if timeout_exceeded:
                        message = f"Overall timeout of {config.overall_timeout}s exceeded after {elapsed:.1f}s"
                        logger.warning(message)
                        errors.append(message)
                    break

        return errors

Loading