From 045b6cc08c4a963bd4ad7996bf69068881241121 Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 16:33:13 +0100 Subject: [PATCH 1/7] Add task tracking infrastructure for individual download cancellation Wraps each download in its own asyncio.Task and tracks download_id to Task mapping in _active_download_tasks. This enables cancelling specific downloads without stopping the entire worker. Shutdown now cancels active downloads when wait_for_current=False. --- src/rheo/downloads/worker_pool/pool.py | 41 ++++++++++++--- tests/downloads/worker_pool/test_cancel.py | 58 ++++++++++++++++++++++ 2 files changed, 92 insertions(+), 7 deletions(-) create mode 100644 tests/downloads/worker_pool/test_cancel.py diff --git a/src/rheo/downloads/worker_pool/pool.py b/src/rheo/downloads/worker_pool/pool.py index e4ce2d7..0171114 100644 --- a/src/rheo/downloads/worker_pool/pool.py +++ b/src/rheo/downloads/worker_pool/pool.py @@ -113,6 +113,8 @@ def __init__( } self._file_exists_strategy = file_exists_strategy self._emitter = emitter or EventEmitter(self._logger) + # Track active download tasks by download_id, used for cancellation. 
+ self._active_download_tasks: dict[str, asyncio.Task[None]] = {} @property def active_tasks(self) -> tuple[asyncio.Task[None], ...]: """ """ return tuple(self._worker_tasks) + @property + def active_download_tasks(self) -> dict[str, asyncio.Task[None]]: + """Snapshot of currently active download tasks by download_id.""" + return self._active_download_tasks.copy() + @property def is_running(self) -> bool: """True if pool has been started and not yet stopped.""" @@ -165,7 +172,9 @@ async def shutdown(self, wait_for_current: bool = True) -> None: self._request_shutdown() if not wait_for_current: - # Cancel all worker tasks for immediate shutdown + # Cancel all tasks (downloads and workers) for immediate shutdown + for task in self._active_download_tasks.values(): + task.cancel() for task in self._worker_tasks: task.cancel() @@ -246,13 +255,31 @@ async def _process_queue(self, worker: BaseWorker) -> None: self._logger.debug( f"Downloading {file_config.url} to {destination_path}" ) - await worker.download( - str(file_config.url), - destination_path, - download_id=file_config.id, - hash_config=file_config.hash_config, - file_exists_strategy=file_config.file_exists_strategy, + + # Wrap download in a task to track it and allow cancellation. + download_task = asyncio.create_task( + worker.download( + str(file_config.url), + destination_path, + download_id=file_config.id, + hash_config=file_config.hash_config, + file_exists_strategy=file_config.file_exists_strategy, + ) ) + self._active_download_tasks[file_config.id] = download_task + + # Consider moving task creation, execution and cancellation + # handling into its own helper method. 
+ try: + await download_task + except asyncio.CancelledError: + if download_task.cancelled(): + self._logger.debug(f"Download cancelled: {file_config.url}") + continue + raise + finally: + self._active_download_tasks.pop(file_config.id, None) + self._logger.debug( f"Downloaded {file_config.url} to {destination_path}" ) diff --git a/tests/downloads/worker_pool/test_cancel.py b/tests/downloads/worker_pool/test_cancel.py new file mode 100644 index 0000000..7fe79b4 --- /dev/null +++ b/tests/downloads/worker_pool/test_cancel.py @@ -0,0 +1,58 @@ +"""Tests for selective download cancellation.""" + +import asyncio +import typing as t + +import pytest + +from rheo.domain.file_config import FileConfig +from rheo.downloads.worker_pool.pool import WorkerPool + +if t.TYPE_CHECKING: + from tests.downloads.conftest import WorkerFactoryMaker + + +class TestTaskTracking: + """Test download task tracking in pool.""" + + @pytest.mark.asyncio + async def test_active_download_tasks_empty_initially( + self, + make_worker_pool: t.Callable[..., WorkerPool], + ) -> None: + """Pool should have no active downloads before start.""" + pool = make_worker_pool() + assert pool.active_download_tasks == {} + + @pytest.mark.asyncio + async def test_active_download_task_lifecycle( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + slow_download_mock, + make_mock_worker_factory: "WorkerFactoryMaker", + ) -> None: + """Task should be tracked during download and cleared after completion.""" + download = slow_download_mock(download_time=0.2) + worker_factory = make_mock_worker_factory(download_side_effect=download) + pool = make_worker_pool( + worker_factory=worker_factory, + queue=real_priority_queue, + ) + file_config = FileConfig(url="http://example.com/file.txt") + await real_priority_queue.add([file_config]) + + await pool.start(mock_aio_client) + await asyncio.sleep(0.05) # Let worker pick up item + + # Task should be tracked while in progress + 
assert file_config.id in pool.active_download_tasks + + # Wait for completion + await real_priority_queue.join() + + # Task should be cleared after completion + assert file_config.id not in pool.active_download_tasks + + await pool.shutdown(wait_for_current=False) From b19a7413455c583b819f4dccd2b741b2d5318606 Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 16:47:33 +0100 Subject: [PATCH 2/7] Add cooperative cancellation for queued downloads Adds _cancelled_ids set to pool. When worker dequeues an item, checks if the download_id is in the cancelled set. If so, emits download.cancelled event with cancelled_from=QUEUED and skips processing. This enables cancelling downloads that haven't started yet. --- src/rheo/downloads/worker_pool/pool.py | 39 ++++++++++- tests/downloads/worker_pool/test_cancel.py | 79 ++++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/src/rheo/downloads/worker_pool/pool.py b/src/rheo/downloads/worker_pool/pool.py index 0171114..deafd90 100644 --- a/src/rheo/downloads/worker_pool/pool.py +++ b/src/rheo/downloads/worker_pool/pool.py @@ -5,9 +5,10 @@ from enum import StrEnum from pathlib import Path +from ...domain.cancellation import CancelledFrom from ...domain.exceptions import WorkerPoolAlreadyStartedError from ...domain.file_config import FileConfig, FileExistsStrategy -from ...events import EventEmitter +from ...events import DownloadCancelledEvent, EventEmitter from ...events.base import BaseEmitter from ...infrastructure.http import BaseHttpClient from ..queue import PriorityDownloadQueue @@ -115,6 +116,8 @@ def __init__( self._emitter = emitter or EventEmitter(self._logger) # Track active download tasks by download_id, used for cancellation. self._active_download_tasks: dict[str, asyncio.Task[None]] = {} + # IDs of downloads cancelled while queued, checked before starting download. 
+ self._cancelled_ids: set[str] = set() @property def active_tasks(self) -> tuple[asyncio.Task[None], ...]: @@ -252,6 +255,10 @@ async def _process_queue(self, worker: BaseWorker) -> None: if await self._handle_shutdown_and_requeue(file_config): break + # Check if this download was cancelled while queued + if await self._handle_cancelled_queued(file_config): + continue + self._logger.debug( f"Downloading {file_config.url} to {destination_path}" ) @@ -334,6 +341,36 @@ async def _handle_shutdown_and_requeue(self, file_config: FileConfig) -> bool: self.queue.task_done(file_config.id) return shutdown_is_set + async def _handle_cancelled_queued(self, file_config: FileConfig) -> bool: + """Check if download was cancelled while queued and handle if so. + + If the download ID is in _cancelled_ids, emits a cancelled event, + and signals caller to skip this download. + + Args: + file_config: The file configuration to check + + Returns: + True if download was cancelled (caller should continue), False otherwise + """ + if file_config.id not in self._cancelled_ids: + return False + + self._cancelled_ids.discard(file_config.id) + + self._logger.debug(f"Skipping cancelled download: {file_config.url}") + + await self._emitter.emit( + "download.cancelled", + DownloadCancelledEvent( + download_id=file_config.id, + url=str(file_config.url), + cancelled_from=CancelledFrom.QUEUED, + ), + ) + + return True + async def _wait_for_workers_and_clear(self) -> None: """Wait for all worker tasks to complete and clear the task list. 
diff --git a/tests/downloads/worker_pool/test_cancel.py b/tests/downloads/worker_pool/test_cancel.py index 7fe79b4..48206c6 100644 --- a/tests/downloads/worker_pool/test_cancel.py +++ b/tests/downloads/worker_pool/test_cancel.py @@ -5,8 +5,11 @@ import pytest +from rheo.domain.cancellation import CancelledFrom +from rheo.domain.downloads import DownloadStatus from rheo.domain.file_config import FileConfig from rheo.downloads.worker_pool.pool import WorkerPool +from rheo.tracking import DownloadTracker if t.TYPE_CHECKING: from tests.downloads.conftest import WorkerFactoryMaker @@ -56,3 +59,79 @@ async def test_active_download_task_lifecycle( assert file_config.id not in pool.active_download_tasks await pool.shutdown(wait_for_current=False) + + +class TestCooperativeCancellation: + """Test cooperative cancellation for queued downloads.""" + + @pytest.mark.asyncio + async def test_cancelled_queued_download_skipped( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + tracker: DownloadTracker, + ) -> None: + """Download in _cancelled_ids should be skipped when dequeued.""" + pool = make_worker_pool(queue=real_priority_queue) + file_config = FileConfig(url="http://example.com/file.txt") + await real_priority_queue.add([file_config]) + + # Mark as cancelled before worker picks it up + pool._cancelled_ids.add(file_config.id) + + await pool.start(mock_aio_client) + await real_priority_queue.join() + + # Should be marked cancelled in tracker + info = tracker.get_download_info(file_config.id) + assert info is not None + assert info.status == DownloadStatus.CANCELLED + + await pool.shutdown(wait_for_current=False) + + @pytest.mark.asyncio + async def test_cancelled_queued_emits_event_with_queued_state( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + ) -> None: + """Cancelled queued download should emit event with cancelled_from=QUEUED.""" + pool = 
make_worker_pool(queue=real_priority_queue) + file_config = FileConfig(url="http://example.com/file.txt") + await real_priority_queue.add([file_config]) + + events: list = [] + pool._emitter.on("download.cancelled", lambda e: events.append(e)) + + pool._cancelled_ids.add(file_config.id) + + await pool.start(mock_aio_client) + await real_priority_queue.join() + + assert len(events) == 1 + assert events[0].cancelled_from == CancelledFrom.QUEUED + + await pool.shutdown(wait_for_current=False) + + @pytest.mark.asyncio + async def test_cancelled_id_removed_after_processing( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + ) -> None: + """Cancelled ID should be removed from set after being processed.""" + pool = make_worker_pool(queue=real_priority_queue) + file_config = FileConfig(url="http://example.com/file.txt") + await real_priority_queue.add([file_config]) + + pool._cancelled_ids.add(file_config.id) + + await pool.start(mock_aio_client) + await real_priority_queue.join() + + assert file_config.id not in pool._cancelled_ids + + await pool.shutdown(wait_for_current=False) From 314bd516ae374a0cea31cb2a18757505690ad4f0 Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 17:01:21 +0100 Subject: [PATCH 3/7] Add pool.cancel() method for selective download cancellation Implements cancel(download_id) -> bool on WorkerPool: - For in-progress downloads: cancels the task directly - For queued downloads: adds to _cancelled_ids for cooperative cancellation - Returns False for unknown IDs (manager will check tracker for terminal state) --- src/rheo/downloads/worker_pool/base.py | 8 ++ src/rheo/downloads/worker_pool/pool.py | 28 +++++ tests/downloads/worker_pool/test_cancel.py | 137 +++++++++++++++++++++ 3 files changed, 173 insertions(+) diff --git a/src/rheo/downloads/worker_pool/base.py b/src/rheo/downloads/worker_pool/base.py index 139fbec..31e6367 100644 --- a/src/rheo/downloads/worker_pool/base.py +++ 
b/src/rheo/downloads/worker_pool/base.py @@ -41,3 +41,11 @@ def is_running(self) -> bool: @abstractmethod def create_worker(self, client: BaseHttpClient) -> BaseWorker: """Create a fresh worker with isolated dependencies for each task.""" + + @abstractmethod + async def cancel(self, download_id: str) -> bool: + """Cancel a specific download if it is active or queued. + + Returns True if found and cancelled, False if the download is not + in the pool's scope (caller should check tracker). + """ diff --git a/src/rheo/downloads/worker_pool/pool.py b/src/rheo/downloads/worker_pool/pool.py index deafd90..5373569 100644 --- a/src/rheo/downloads/worker_pool/pool.py +++ b/src/rheo/downloads/worker_pool/pool.py @@ -185,6 +185,34 @@ async def shutdown(self, wait_for_current: bool = True) -> None: # and clean up task references await self._wait_for_workers_and_clear() + async def cancel(self, download_id: str) -> bool: + """Cancel a specific download by ID. + + Handles both in-progress downloads (via task cancellation) and + queued downloads (via cooperative cancellation). + + Args: + download_id: The download ID to cancel + + Returns: + True if download was active or queued and cancelled. + False if download_id is not known to the pool (may be terminal or + never existed - caller should check tracker). + """ + # Check if download is currently in progress + task = self._active_download_tasks.get(download_id) + if task is not None: + task.cancel() + return True + + # Check if download is queued but not yet started + if download_id in self.queue._queued_ids: + self._cancelled_ids.add(download_id) + return True + + # Not in pool's scope so let caller determine why + return False + def _request_shutdown(self) -> None: """Signal workers to stop accepting new work. 
diff --git a/tests/downloads/worker_pool/test_cancel.py b/tests/downloads/worker_pool/test_cancel.py index 48206c6..b27c2a4 100644 --- a/tests/downloads/worker_pool/test_cancel.py +++ b/tests/downloads/worker_pool/test_cancel.py @@ -135,3 +135,140 @@ async def test_cancelled_id_removed_after_processing( assert file_config.id not in pool._cancelled_ids await pool.shutdown(wait_for_current=False) + + +class TestPoolCancel: + """Test WorkerPool.cancel() method.""" + + @pytest.mark.asyncio + async def test_cancel_in_progress_returns_true( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + slow_download_mock, + make_mock_worker_factory: "WorkerFactoryMaker", + ) -> None: + """Cancelling an in-progress download should return True.""" + download = slow_download_mock(download_time=1.0) + worker_factory = make_mock_worker_factory(download_side_effect=download) + pool = make_worker_pool( + worker_factory=worker_factory, + queue=real_priority_queue, + ) + file_config = FileConfig(url="http://example.com/file.txt") + await real_priority_queue.add([file_config]) + + await pool.start(mock_aio_client) + await asyncio.sleep(0.1) # Let worker pick up item + + result = await pool.cancel(file_config.id) + + assert result is True + await pool.shutdown(wait_for_current=False) + + @pytest.mark.asyncio + async def test_cancel_in_progress_cancels_task( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + slow_download_mock, + make_mock_worker_factory: "WorkerFactoryMaker", + ) -> None: + """Cancelling an in-progress download should cancel its task.""" + download = slow_download_mock(download_time=1.0) + worker_factory = make_mock_worker_factory(download_side_effect=download) + pool = make_worker_pool( + worker_factory=worker_factory, + queue=real_priority_queue, + ) + file_config = FileConfig(url="http://example.com/file.txt") + await real_priority_queue.add([file_config]) + + await 
pool.start(mock_aio_client) + await asyncio.sleep(0.1) + + task = pool._active_download_tasks.get(file_config.id) + assert task is not None + + await pool.cancel(file_config.id) + await asyncio.sleep(0.05) + + assert task.cancelled() + await pool.shutdown(wait_for_current=False) + + @pytest.mark.asyncio + async def test_cancel_queued_returns_true( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + slow_download_mock, + make_mock_worker_factory: "WorkerFactoryMaker", + ) -> None: + """Cancelling a queued download should return True.""" + # Use slow worker and add two items - first blocks, second is queued + download = slow_download_mock(download_time=1.0) + worker_factory = make_mock_worker_factory(download_side_effect=download) + pool = make_worker_pool( + worker_factory=worker_factory, + queue=real_priority_queue, + max_workers=1, + ) + file1 = FileConfig(url="http://example.com/file1.txt") + file2 = FileConfig(url="http://example.com/file2.txt") + await real_priority_queue.add([file1, file2]) + + await pool.start(mock_aio_client) + await asyncio.sleep(0.1) # Let worker pick up file1 + + # file2 should still be queued + result = await pool.cancel(file2.id) + + assert result is True + await pool.shutdown(wait_for_current=False) + + @pytest.mark.asyncio + async def test_cancel_queued_adds_to_cancelled_ids( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + real_priority_queue, + slow_download_mock, + make_mock_worker_factory: "WorkerFactoryMaker", + ) -> None: + """Cancelling a queued download should add to _cancelled_ids.""" + download = slow_download_mock(download_time=1.0) + worker_factory = make_mock_worker_factory(download_side_effect=download) + pool = make_worker_pool( + worker_factory=worker_factory, + queue=real_priority_queue, + max_workers=1, + ) + file1 = FileConfig(url="http://example.com/file1.txt") + file2 = FileConfig(url="http://example.com/file2.txt") + await 
real_priority_queue.add([file1, file2]) + + await pool.start(mock_aio_client) + await asyncio.sleep(0.1) + + await pool.cancel(file2.id) + + assert file2.id in pool._cancelled_ids + await pool.shutdown(wait_for_current=False) + + @pytest.mark.asyncio + async def test_cancel_unknown_returns_false( + self, + mock_aio_client, + make_worker_pool: t.Callable[..., WorkerPool], + ) -> None: + """Cancelling an unknown download should return False.""" + pool = make_worker_pool() + await pool.start(mock_aio_client) + + result = await pool.cancel("unknown-id") + + assert result is False + await pool.shutdown(wait_for_current=False) From 59e74f55af2e7fc2bef5b1d351a4f1012720560f Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 17:23:14 +0100 Subject: [PATCH 4/7] Add manager.cancel() method for selective download cancellation Implements cancel(download_id) -> CancelResult on DownloadManager: - Delegates to pool for active/queued downloads - Uses tracker to distinguish ALREADY_TERMINAL vs NOT_FOUND - Workers continue processing remaining queue items after cancellation --- src/rheo/downloads/manager.py | 32 +++++++ tests/downloads/manager/test_cancel.py | 124 +++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 tests/downloads/manager/test_cancel.py diff --git a/src/rheo/downloads/manager.py b/src/rheo/downloads/manager.py index 64bb805..57ab089 100644 --- a/src/rheo/downloads/manager.py +++ b/src/rheo/downloads/manager.py @@ -11,6 +11,7 @@ import aiofiles.os +from ..domain.cancellation import CancelResult from ..domain.downloads import DownloadInfo, DownloadStats from ..domain.exceptions import ManagerNotInitialisedError, PendingDownloadsError from ..domain.file_config import FileConfig, FileExistsStrategy @@ -361,6 +362,37 @@ async def wait_until_complete(self, timeout: float | None = None) -> None: else: await self.queue.join() + async def cancel(self, download_id: str) -> CancelResult: + """Cancel a specific download by ID. 
+ + Cancels both in-progress and queued downloads. Workers continue + processing remaining queue items after cancellation. + + Args: + download_id: The ID of the download to cancel (from FileConfig.id) + + Returns: + CancelResult indicating what happened: + - CANCELLED: Download was found and cancelled + - NOT_FOUND: Download ID was never queued + - ALREADY_TERMINAL: Download already completed/failed/skipped/cancelled + + Example: + result = await manager.cancel(file_config.id) + if result == CancelResult.CANCELLED: + print("Download stopped") + """ + # Try to cancel via pool (handles active and queued) + if await self._worker_pool.cancel(download_id): + return CancelResult.CANCELLED + + # Not active or queued - check if it's in a terminal state + info = self._tracker.get_download_info(download_id) + if info is not None and info.status.is_terminal: + return CancelResult.ALREADY_TERMINAL + + return CancelResult.NOT_FOUND + async def open(self) -> None: """Manually initialise the manager. diff --git a/tests/downloads/manager/test_cancel.py b/tests/downloads/manager/test_cancel.py new file mode 100644 index 0000000..ea048da --- /dev/null +++ b/tests/downloads/manager/test_cancel.py @@ -0,0 +1,124 @@ +"""Unit tests for DownloadManager.cancel() method.""" + +import typing as t +from unittest.mock import Mock + +import pytest + +from rheo.domain.cancellation import CancelResult +from rheo.domain.downloads import DownloadInfo, DownloadStatus +from rheo.downloads.manager import DownloadManager +from rheo.downloads.worker_pool.factory import WorkerPoolFactory + +if t.TYPE_CHECKING: + from loguru import Logger + + from rheo.tracking import DownloadTracker + + +class TestManagerCancel: + """Unit tests for manager.cancel() logic. 
+ + These tests mock the pool and tracker to test the CancelResult mapping: + - Pool returns True → CANCELLED + - Pool returns False + tracker has terminal → ALREADY_TERMINAL + - Pool returns False + tracker has no info → NOT_FOUND + """ + + @pytest.mark.asyncio + async def test_cancel_returns_cancelled_when_pool_cancels( + self, + mock_worker_pool: Mock, + mock_pool_factory: WorkerPoolFactory, + tracker: "DownloadTracker", + mock_logger: "Logger", + ) -> None: + """Returns CANCELLED when pool.cancel() returns True.""" + mock_worker_pool.cancel.return_value = True + + manager = DownloadManager( + worker_pool_factory=mock_pool_factory, + tracker=tracker, + logger=mock_logger, + ) + + result = await manager.cancel("some-id") + + assert result == CancelResult.CANCELLED + mock_worker_pool.cancel.assert_called_once_with("some-id") + + @pytest.mark.asyncio + async def test_cancel_returns_already_terminal_when_completed( + self, + mock_worker_pool: Mock, + mock_pool_factory: WorkerPoolFactory, + tracker: "DownloadTracker", + mock_logger: "Logger", + ) -> None: + """Returns ALREADY_TERMINAL when pool says no but tracker has completed.""" + mock_worker_pool.cancel.return_value = False + + # Set up tracker with completed download + tracker._downloads["completed-id"] = DownloadInfo( + id="completed-id", + url="http://example.com/file.txt", + status=DownloadStatus.COMPLETED, + ) + + manager = DownloadManager( + worker_pool_factory=mock_pool_factory, + tracker=tracker, + logger=mock_logger, + ) + + result = await manager.cancel("completed-id") + + assert result == CancelResult.ALREADY_TERMINAL + + @pytest.mark.asyncio + async def test_cancel_returns_already_terminal_when_failed( + self, + mock_worker_pool: Mock, + mock_pool_factory: WorkerPoolFactory, + tracker: "DownloadTracker", + mock_logger: "Logger", + ) -> None: + """Returns ALREADY_TERMINAL when pool says no but tracker has failed.""" + mock_worker_pool.cancel.return_value = False + + tracker._downloads["failed-id"] = 
DownloadInfo( + id="failed-id", + url="http://example.com/file.txt", + status=DownloadStatus.FAILED, + ) + + manager = DownloadManager( + worker_pool_factory=mock_pool_factory, + tracker=tracker, + logger=mock_logger, + ) + + result = await manager.cancel("failed-id") + + assert result == CancelResult.ALREADY_TERMINAL + + @pytest.mark.asyncio + async def test_cancel_returns_not_found_when_unknown( + self, + mock_worker_pool: Mock, + mock_pool_factory: WorkerPoolFactory, + tracker: "DownloadTracker", + mock_logger: "Logger", + ) -> None: + """Returns NOT_FOUND when pool says no and tracker has no record.""" + mock_worker_pool.cancel.return_value = False + + manager = DownloadManager( + worker_pool_factory=mock_pool_factory, + tracker=tracker, + logger=mock_logger, + ) + + result = await manager.cancel("unknown-id") + + assert result == CancelResult.NOT_FOUND From bfdf262971aa40daab2539572a95c5f4d0dcc780 Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 18:00:46 +0100 Subject: [PATCH 5/7] test(integration): add cancellation integration tests Add comprehensive integration tests for selective download cancellation: - TestCancelInProgress: verify partial file cleanup, tracker status, events - TestCancelQueued: verify queued downloads skipped, correct event state - TestCancelAndContinue: verify workers continue after cancellation - TestCancelAlreadyTerminal: verify ALREADY_TERMINAL for completed downloads - TestCancelEventSubscription: verify external subscribers receive events Uses slow_response callback to control timing for reliable queue state testing. 
--- tests/integration/test_cancellation.py | 436 +++++++++++++++++++++++++ 1 file changed, 436 insertions(+) create mode 100644 tests/integration/test_cancellation.py diff --git a/tests/integration/test_cancellation.py b/tests/integration/test_cancellation.py new file mode 100644 index 0000000..27640c2 --- /dev/null +++ b/tests/integration/test_cancellation.py @@ -0,0 +1,436 @@ +"""Integration tests for selective download cancellation.""" + +import asyncio +from pathlib import Path + +import pytest +from aioresponses import CallbackResult, aioresponses + +from rheo import DownloadManager, DownloadStatus +from rheo.domain import FileConfig +from rheo.domain.cancellation import CancelledFrom, CancelResult +from rheo.events.models import DownloadCancelledEvent + + +def slow_response(delay: float = 0.5, body: bytes = b"content"): + """Create a callback that delays before returning response.""" + + async def callback(url, **kwargs): + await asyncio.sleep(delay) + return CallbackResult(status=200, body=body) + + return callback + + +class TestCancelInProgress: + """Integration tests for cancelling in-progress downloads.""" + + @pytest.mark.asyncio + async def test_cancel_in_progress_cleans_up_partial_file( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelling mid-download should delete partial file.""" + with aioresponses() as mock: + # Slow response to give time to cancel + mock.get( + "http://example.com/large.txt", + callback=slow_response(delay=1.0, body=b"x" * 10000), + repeat=True, + ) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + file_config = FileConfig( + url="http://example.com/large.txt", filename="large.txt" + ) + await manager.add([file_config]) + await asyncio.sleep(0.1) # Let download start + expected_path = tmp_path / "large.txt" + assert expected_path.exists() + + await manager.cancel(file_config.id) + await asyncio.sleep(0.2) # Let cleanup complete + + # Partial file 
should be cleaned up + assert not expected_path.exists() + + @pytest.mark.asyncio + async def test_cancel_in_progress_updates_tracker_status( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelled download should show CANCELLED in tracker.""" + with aioresponses() as mock: + mock.get( + "http://example.com/file.txt", + callback=slow_response(delay=1.0), + repeat=True, + ) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + file_config = FileConfig( + url="http://example.com/file.txt", filename="file.txt" + ) + await manager.add([file_config]) + await asyncio.sleep(0.1) + expected_path = tmp_path / "file.txt" + assert expected_path.exists() + + await manager.cancel(file_config.id) + await asyncio.sleep(0.2) + + info = manager.get_download_info(file_config.id) + assert info is not None + assert info.status == DownloadStatus.CANCELLED + + @pytest.mark.asyncio + async def test_cancel_in_progress_emits_event_with_in_progress_state( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelled in-progress download should emit event with + cancelled_from=IN_PROGRESS.""" + events: list[DownloadCancelledEvent] = [] + + with aioresponses() as mock: + mock.get( + "http://example.com/file.txt", + callback=slow_response(delay=1.0), + repeat=True, + ) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + manager.on("download.cancelled", lambda e: events.append(e)) + + file_config = FileConfig( + url="http://example.com/file.txt", filename="file.txt" + ) + await manager.add([file_config]) + await asyncio.sleep(0.1) + + await manager.cancel(file_config.id) + await asyncio.sleep(0.2) + + assert len(events) == 1 + assert events[0].cancelled_from == CancelledFrom.IN_PROGRESS + assert events[0].download_id == file_config.id + + +class TestCancelQueued: + """Integration tests for cancelling queued downloads.""" + + @pytest.mark.asyncio + async 
def test_cancel_queued_never_starts_download( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelled queued download should never emit started event.""" + started_ids: list[str] = [] + + with aioresponses() as mock: + # slow_file is slow so queued_file stays queued + mock.get( + "http://example.com/slow.txt", + callback=slow_response(delay=0.5, body=b"content1"), + repeat=True, + ) + mock.get("http://example.com/queued.txt", body=b"content2", repeat=True) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + manager.on( + "download.started", lambda e: started_ids.append(e.download_id) + ) + + # slow_file added first - worker grabs it immediately, blocks on + # slow HTTP + slow_file = FileConfig( + url="http://example.com/slow.txt", filename="slow.txt" + ) + # queued_file added second - stays in queue while worker is busy + queued_file = FileConfig( + url="http://example.com/queued.txt", filename="queued.txt" + ) + await manager.add([slow_file, queued_file]) + await asyncio.sleep(0.1) + + # Cancel queued_file while slow_file is downloading + result = await manager.cancel(queued_file.id) + assert result == CancelResult.CANCELLED + + await manager.wait_until_complete() + + # queued_file should never have started + assert slow_file.id in started_ids + assert queued_file.id not in started_ids + + @pytest.mark.asyncio + async def test_cancel_queued_emits_event_with_queued_state( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelled queued download should emit event with cancelled_from=QUEUED.""" + events: list[DownloadCancelledEvent] = [] + + with aioresponses() as mock: + # slow_file blocks worker so queued_file stays queued + mock.get( + "http://example.com/slow.txt", + callback=slow_response(delay=0.5, body=b"content1"), + repeat=True, + ) + mock.get("http://example.com/queued.txt", body=b"content2", repeat=True) + + async with DownloadManager( + client=aio_client, + 
download_dir=tmp_path, + max_concurrent=1, + ) as manager: + manager.on("download.cancelled", lambda e: events.append(e)) + + # slow_file added first - worker grabs it, blocks on slow HTTP + slow_file = FileConfig( + url="http://example.com/slow.txt", filename="slow.txt" + ) + # queued_file added second - stays in queue + queued_file = FileConfig( + url="http://example.com/queued.txt", filename="queued.txt" + ) + await manager.add([slow_file, queued_file]) + await asyncio.sleep(0.1) + + await manager.cancel(queued_file.id) + await manager.wait_until_complete() + + # Should have one cancelled event for queued_file + cancelled_events = [ + e for e in events if e.download_id == queued_file.id + ] + assert len(cancelled_events) == 1 + assert cancelled_events[0].cancelled_from == CancelledFrom.QUEUED + + +class TestCancelAndContinue: + """Test that workers continue after cancellation.""" + + @pytest.mark.asyncio + async def test_workers_continue_after_cancel( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelling one download should not affect others.""" + with aioresponses() as mock: + # file1 is slow so file2 and file3 stay queued + mock.get( + "http://example.com/file1.txt", + callback=slow_response(delay=0.3, body=b"content1"), + repeat=True, + ) + mock.get("http://example.com/file2.txt", body=b"content2", repeat=True) + mock.get("http://example.com/file3.txt", body=b"content3", repeat=True) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + file1 = FileConfig( + url="http://example.com/file1.txt", filename="file1.txt" + ) + file2 = FileConfig( + url="http://example.com/file2.txt", filename="file2.txt" + ) + file3 = FileConfig( + url="http://example.com/file3.txt", filename="file3.txt" + ) + await manager.add([file1, file2, file3]) + await asyncio.sleep(0.1) + + # Cancel file2 (queued) + await manager.cancel(file2.id) + + await manager.wait_until_complete() + + # file1 and file3 should 
complete, file2 cancelled + stats = manager.stats + assert stats.completed == 2 + assert stats.cancelled == 1 + + @pytest.mark.asyncio + async def test_cancel_does_not_block_queue( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelling should not cause queue deadlock.""" + with aioresponses() as mock: + # First file is slow + mock.get( + "http://example.com/file0.txt", + callback=slow_response(delay=0.2, body=b"content"), + repeat=True, + ) + for i in range(1, 5): + mock.get( + f"http://example.com/file{i}.txt", body=b"content", repeat=True + ) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=2, + ) as manager: + files = [ + FileConfig( + url=f"http://example.com/file{i}.txt", filename=f"file{i}.txt" + ) + for i in range(5) + ] + await manager.add(files) + await asyncio.sleep(0.1) + + # Cancel a couple of queued ones + await manager.cancel(files[2].id) + await manager.cancel(files[4].id) + + # Should complete without hanging + await asyncio.wait_for( + manager.wait_until_complete(), + timeout=5.0, + ) + + stats = manager.stats + assert stats.completed + stats.cancelled == 5 + + +class TestCancelAlreadyTerminal: + """Test cancelling downloads that already finished.""" + + @pytest.mark.asyncio + async def test_cancel_completed_returns_already_terminal( + self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelling completed download returns ALREADY_TERMINAL.""" + with aioresponses() as mock: + mock.get("http://example.com/file.txt", body=b"content") + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + file_config = FileConfig( + url="http://example.com/file.txt", filename="file.txt" + ) + await manager.add([file_config]) + await manager.wait_until_complete() + + result = await manager.cancel(file_config.id) + + assert result == CancelResult.ALREADY_TERMINAL + + @pytest.mark.asyncio + async def test_cancel_completed_does_not_delete_file( + 
self, + tmp_path: Path, + aio_client, + ) -> None: + """Cancelling completed download should NOT delete the file.""" + with aioresponses() as mock: + mock.get("https://example.com/file.txt", body=b"downloaded content") + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + expected_path = tmp_path / "file.txt" + assert not expected_path.exists() + file_config = FileConfig( + url="https://example.com/file.txt", + filename="file.txt", + ) + await manager.add([file_config]) + await manager.wait_until_complete() + + # Verify file was actually created by download + assert expected_path.exists(), "File should exist after download" + original_content = expected_path.read_bytes() + + await manager.cancel(file_config.id) + + # File should still exist after cancel + assert expected_path.exists() + assert expected_path.read_bytes() == original_content + + +class TestCancelEventSubscription: + """Test that cancel events work with manager subscriptions.""" + + @pytest.mark.asyncio + async def test_subscriber_receives_cancel_event( + self, + tmp_path: Path, + aio_client, + ) -> None: + """External subscribers should receive cancel events.""" + received_events: list[DownloadCancelledEvent] = [] + + with aioresponses() as mock: + # Slow response so we have time to cancel + mock.get( + "http://example.com/file.txt", + callback=slow_response(delay=0.5), + repeat=True, + ) + + async with DownloadManager( + client=aio_client, + download_dir=tmp_path, + max_concurrent=1, + ) as manager: + # Subscribe before adding downloads + manager.on("download.cancelled", lambda e: received_events.append(e)) + + file_config = FileConfig( + url="http://example.com/file.txt", filename="file.txt" + ) + await manager.add([file_config]) + await asyncio.sleep(0.1) + + await manager.cancel(file_config.id) + await asyncio.sleep(0.2) + + assert len(received_events) == 1 + event = received_events[0] + assert event.download_id == file_config.id + 
assert event.url == str(file_config.url) + assert event.cancelled_from in ( + CancelledFrom.QUEUED, + CancelledFrom.IN_PROGRESS, + ) + + await manager.close() From 5e064d0b25f03455ed5258e84d1f322327ea37d1 Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 18:25:10 +0100 Subject: [PATCH 6/7] refactor(pool): extract download execution into _execute_download helper Move task creation, tracking, and cancellation handling logic from _process_queue into a dedicated helper method. Improves readability and separates concerns for download task lifecycle management. --- src/rheo/downloads/worker_pool/pool.py | 75 ++++++++++++++++++-------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/src/rheo/downloads/worker_pool/pool.py b/src/rheo/downloads/worker_pool/pool.py index 5373569..fc8bd6b 100644 --- a/src/rheo/downloads/worker_pool/pool.py +++ b/src/rheo/downloads/worker_pool/pool.py @@ -291,29 +291,11 @@ async def _process_queue(self, worker: BaseWorker) -> None: f"Downloading {file_config.url} to {destination_path}" ) - # Wrap download in a task to track it and allow cancellation. - download_task = asyncio.create_task( - worker.download( - str(file_config.url), - destination_path, - download_id=file_config.id, - hash_config=file_config.hash_config, - file_exists_strategy=file_config.file_exists_strategy, - ) + cancelled = await self._execute_download( + worker, file_config, destination_path ) - self._active_download_tasks[file_config.id] = download_task - - # Consider moving task creation, execution and cancellation - # handling into it's own helper method. 
- try: - await download_task - except asyncio.CancelledError: - if download_task.cancelled(): - self._logger.debug(f"Download cancelled: {file_config.url}") - continue - raise - finally: - self._active_download_tasks.pop(file_config.id, None) + if cancelled: + continue self._logger.debug( f"Downloaded {file_config.url} to {destination_path}" @@ -399,6 +381,55 @@ async def _handle_cancelled_queued(self, file_config: FileConfig) -> bool: return True + async def _execute_download( + self, + worker: BaseWorker, + file_config: FileConfig, + destination_path: Path, + ) -> bool: + """Execute download as a tracked task with cancellation support. + + Creates an asyncio task for the download, tracks it in _active_download_tasks, + and handles cancellation. Individual download cancellation (via pool.cancel()) + is distinguished from worker shutdown cancellation. + + Args: + worker: The worker instance to use for the download + file_config: Configuration for the file to download + destination_path: Local path to save the file + + Returns: + True if the download was cancelled (caller should continue to next item), + False if completed normally + + Raises: + asyncio.CancelledError: If the worker task itself was cancelled (shutdown) + """ + download_task = asyncio.create_task( + worker.download( + str(file_config.url), + destination_path, + download_id=file_config.id, + hash_config=file_config.hash_config, + file_exists_strategy=file_config.file_exists_strategy, + ) + ) + self._active_download_tasks[file_config.id] = download_task + + try: + await download_task + return False + except asyncio.CancelledError: + # Distinguish between individual download cancellation and worker shutdown. + # If download_task.cancelled() is True, this specific download was cancelled + # via pool.cancel(). Otherwise, the worker task itself was cancelled. 
+ if download_task.cancelled(): + self._logger.debug(f"Download cancelled: {file_config.url}") + return True + raise + finally: + self._active_download_tasks.pop(file_config.id, None) + async def _wait_for_workers_and_clear(self) -> None: """Wait for all worker tasks to complete and clear the task list. From 95310f5da38c1a8ce4f72603f83261d80a77c1a3 Mon Sep 17 00:00:00 2001 From: plutopulp Date: Tue, 16 Dec 2025 18:48:22 +0100 Subject: [PATCH 7/7] docs: document selective download cancellation feature - Add cancellation to feature lists in READMEs - Add Selective Cancellation section with API example - Document CancelResult and CancelledFrom enums in architecture - Add Cancellation Flow to architecture data flows - Update roadmap to mark feature as done --- README.md | 1 + docs/ARCHITECTURE.md | 37 +++++++++++++++++++++++++++++++++++-- docs/README.md | 37 +++++++++++++++++++++++++++++++++++++ docs/ROADMAP.md | 1 + 4 files changed, 74 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index da4af6c..6207e92 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ asyncio.run(main()) - Concurrent downloads with worker pool - Priority queue +- Selective cancellation (cancel individual downloads by ID) - Hash validation (MD5, SHA256, SHA512) - Retry logic with exponential backoff - Real-time speed & ETA tracking diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 180c992..4818a0f 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -67,7 +67,9 @@ Key pieces: - `FileExistsStrategy`: Enum for handling existing files (SKIP, OVERWRITE, ERROR) - `DownloadInfo`: Current state of a download (includes final average speed and validation state) - `DownloadStatus`: Enum for states (queued, pending, in_progress, completed, failed, skipped, cancelled) -- `DownloadStats`: Aggregated statistics +- `DownloadStats`: Aggregated statistics (includes cancelled count) +- `CancelResult`: Enum for cancel operation results (CANCELLED, NOT_FOUND, 
ALREADY_TERMINAL) +- `CancelledFrom`: Enum indicating cancellation source (QUEUED, IN_PROGRESS) - `HashConfig`: Hash validation configuration (algorithm and expected hash) - `HashAlgorithm`: Supported hash algorithms (MD5, SHA256, SHA512) - `ValidationResult`: Hash validation result with `is_valid` property (algorithm, expected/calculated hash, duration) @@ -101,6 +103,7 @@ Key pieces: - Handles graceful vs immediate shutdown semantics - Re-queues unstarted downloads during shutdown - Maintains task lifecycle and cleanup +- Supports selective cancellation of individual downloads (queued or in-progress) **DownloadWorker**: @@ -170,7 +173,7 @@ Key pieces: - `DownloadCompletedEvent` - Success (includes destination_path, elapsed_seconds, average_speed_bps, optional `ValidationResult`) - `DownloadFailedEvent` - Failure (includes `ErrorInfo`, optional `ValidationResult` for hash mismatches) - `DownloadSkippedEvent` - Skipped due to file-exists strategy (includes reason, destination_path) - - `DownloadCancelledEvent` - Cancelled by caller + - `DownloadCancelledEvent` - Cancelled by caller (includes `cancelled_from`: QUEUED or IN_PROGRESS) - `DownloadRetryingEvent` - Before retry (with retry count, delay, `ErrorInfo`) - `DownloadValidatingEvent` - Validation started (algorithm) - `ErrorInfo`: Structured error model with `exc_type`, `message`, optional `traceback` @@ -389,6 +392,36 @@ config4 = FileConfig(url="https://example.com/file.zip", destination_subdir="dir Note: Validation events still use `worker.*` namespace and will be renamed to `download.*` in a future release. +### Cancellation Flow + +Selective cancellation allows stopping individual downloads without affecting others: + +```text +1. User calls manager.cancel(download_id) +2. Manager delegates to pool.cancel(download_id) +3. Pool checks if download is in-progress: + a. If active task exists → cancel task, return True + b. Worker catches CancelledError, cleans up partial file + c. 
Worker emits download.cancelled event (cancelled_from=IN_PROGRESS) +4. If not in-progress, pool checks if queued: + a. If in queue → mark ID in _cancelled_ids set, return True + b. When worker dequeues item, checks _cancelled_ids + c. If cancelled, emits download.cancelled event (cancelled_from=QUEUED) and skips +5. If neither → return False (caller checks tracker for terminal state) +6. Manager maps pool result to CancelResult: + - True → CANCELLED + - False + terminal in tracker → ALREADY_TERMINAL + - False + not in tracker → NOT_FOUND +``` + +**Key features**: + +- Queued downloads use cooperative cancellation (checked when dequeued) +- In-progress downloads use task cancellation with cleanup +- Partial files are automatically deleted on cancellation +- Events distinguish between queued and in-progress cancellation +- Terminal downloads are not affected (files preserved) + ### Shutdown Flow The system uses an event-based shutdown mechanism for clean termination: diff --git a/docs/README.md b/docs/README.md index 25af94e..4d0673f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -46,6 +46,7 @@ That's it. The manager handles worker pools, state tracking, and cleanup automat - **Command-line interface**: Simple `rheo download` command with progress display and hash validation - **Concurrent downloads**: Worker pool manages multiple downloads simultaneously - **Priority queue**: Download urgent files first +- **Selective cancellation**: Cancel individual downloads by ID (queued or in-progress) - **Hash validation**: Verify file integrity with MD5, SHA256, or SHA512 checksums - **Retry logic**: Automatic retry with exponential backoff for transient errors - **Speed & ETA tracking**: Real-time download speed with moving averages and estimated completion time @@ -446,6 +447,41 @@ async with DownloadManager(download_dir=Path("./downloads")) as manager: **Note**: The context manager (`async with`) triggers immediate shutdown on exit. 
It will raise `PendingDownloadsError` if there are pending downloads that weren't handled. Call `wait_until_complete()` or `close()` explicitly to avoid this. +### Selective Cancellation + +Cancel individual downloads by ID while others continue: + +```python +from rheo.domain.cancellation import CancelResult + +async with DownloadManager(download_dir=Path("./downloads")) as manager: + files = [ + FileConfig(url="https://example.com/large.zip"), + FileConfig(url="https://example.com/small.txt"), + ] + await manager.add(files) + + # Cancel specific download (works for queued or in-progress) + result = await manager.cancel(files[0].id) + + match result: + case CancelResult.CANCELLED: + print("Download cancelled successfully") + case CancelResult.ALREADY_TERMINAL: + print("Download already completed/failed") + case CancelResult.NOT_FOUND: + print("Download ID not found") + + await manager.wait_until_complete() +``` + +**Key behaviours**: + +- **Queued downloads**: Removed from queue, never start +- **In-progress downloads**: Task cancelled, partial file cleaned up +- **Terminal downloads**: Returns `ALREADY_TERMINAL` (file not deleted) +- **Events**: Emits `download.cancelled` with `cancelled_from` field (QUEUED or IN_PROGRESS) + ## Security Considerations ### Path Traversal Protection @@ -480,6 +516,7 @@ FileConfig(url="...", destination_subdir="../../../etc") - Concurrent downloads with worker pool - Priority queue system +- Selective cancellation (cancel individual downloads by ID) - Event-driven architecture with `manager.on()` subscription (returns `Subscription` handle) - Full download lifecycle events (queued, started, progress, completed, failed, skipped, cancelled, retrying, validating) - Download tracking and state management diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 2b15f9d..a868fee 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -10,6 +10,7 @@ The library and CLI are working for real use. 
- Concurrent downloads with worker pool - Priority queue system +- Selective cancellation (cancel individual downloads by ID) - Event-driven architecture with `manager.on()`/`off()` subscription - Download tracking and state management - Comprehensive error handling