Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ asyncio.run(main())

- Concurrent downloads with worker pool
- Priority queue
- Selective cancellation (cancel individual downloads by ID)
- Hash validation (MD5, SHA256, SHA512)
- Retry logic with exponential backoff
- Real-time speed & ETA tracking
37 changes: 35 additions & 2 deletions docs/ARCHITECTURE.md
@@ -67,7 +67,9 @@ Key pieces:
- `FileExistsStrategy`: Enum for handling existing files (SKIP, OVERWRITE, ERROR)
- `DownloadInfo`: Current state of a download (includes final average speed and validation state)
- `DownloadStatus`: Enum for states (queued, pending, in_progress, completed, failed, skipped, cancelled)
- `DownloadStats`: Aggregated statistics
- `DownloadStats`: Aggregated statistics (includes cancelled count)
- `CancelResult`: Enum for cancel operation results (CANCELLED, NOT_FOUND, ALREADY_TERMINAL)
- `CancelledFrom`: Enum indicating cancellation source (QUEUED, IN_PROGRESS)
- `HashConfig`: Hash validation configuration (algorithm and expected hash)
- `HashAlgorithm`: Supported hash algorithms (MD5, SHA256, SHA512)
- `ValidationResult`: Hash validation result with `is_valid` property (algorithm, expected/calculated hash, duration)
@@ -101,6 +103,7 @@ Key pieces:
- Handles graceful vs immediate shutdown semantics
- Re-queues unstarted downloads during shutdown
- Maintains task lifecycle and cleanup
- Supports selective cancellation of individual downloads (queued or in-progress)

**DownloadWorker**:

@@ -170,7 +173,7 @@ Key pieces:
- `DownloadCompletedEvent` - Success (includes destination_path, elapsed_seconds, average_speed_bps, optional `ValidationResult`)
- `DownloadFailedEvent` - Failure (includes `ErrorInfo`, optional `ValidationResult` for hash mismatches)
- `DownloadSkippedEvent` - Skipped due to file-exists strategy (includes reason, destination_path)
- `DownloadCancelledEvent` - Cancelled by caller
- `DownloadCancelledEvent` - Cancelled by caller (includes `cancelled_from`: QUEUED or IN_PROGRESS)
- `DownloadRetryingEvent` - Before retry (with retry count, delay, `ErrorInfo`)
- `DownloadValidatingEvent` - Validation started (algorithm)
- `ErrorInfo`: Structured error model with `exc_type`, `message`, optional `traceback`
@@ -389,6 +392,36 @@ config4 = FileConfig(url="https://example.com/file.zip", destination_subdir="dir

Note: Validation events still use `worker.*` namespace and will be renamed to `download.*` in a future release.

### Cancellation Flow

Selective cancellation allows stopping individual downloads without affecting others:

```text
1. User calls manager.cancel(download_id)
2. Manager delegates to pool.cancel(download_id)
3. Pool checks if download is in-progress:
a. If active task exists → cancel task, return True
b. Worker catches CancelledError, cleans up partial file
c. Worker emits download.cancelled event (cancelled_from=IN_PROGRESS)
4. If not in-progress, pool checks if queued:
a. If in queue → mark ID in _cancelled_ids set, return True
b. When worker dequeues item, checks _cancelled_ids
c. If cancelled, emits download.cancelled event (cancelled_from=QUEUED) and skips
5. If neither → return False (caller checks tracker for terminal state)
6. Manager maps pool result to CancelResult:
- True → CANCELLED
- False + terminal in tracker → ALREADY_TERMINAL
- False + not in tracker → NOT_FOUND
```
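Step 6's result mapping can be sketched in isolation. This is a standalone illustration, not rheo's implementation: the real enum lives in `rheo.domain.cancellation`, and the terminal-state lookup goes through the tracker rather than a boolean parameter.

```python
from enum import Enum, auto


class CancelResult(Enum):
    """Mirrors rheo's CancelResult for illustration."""

    CANCELLED = auto()
    NOT_FOUND = auto()
    ALREADY_TERMINAL = auto()


def map_cancel_result(pool_cancelled: bool, is_terminal_in_tracker: bool) -> CancelResult:
    # True from the pool means the download was active or queued and is now cancelled.
    if pool_cancelled:
        return CancelResult.CANCELLED
    # False: either it already finished, or the pool never saw it.
    # The tracker distinguishes the two cases.
    if is_terminal_in_tracker:
        return CancelResult.ALREADY_TERMINAL
    return CancelResult.NOT_FOUND
```

The two-step lookup keeps the pool ignorant of terminal states: it only answers "did I cancel something?", and the manager consults the tracker to explain a `False`.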

**Key features**:

- Queued downloads use cooperative cancellation (checked when dequeued)
- In-progress downloads use task cancellation with cleanup
- Partial files are automatically deleted on cancellation
- Events distinguish between queued and in-progress cancellation
- Terminal downloads are not affected (files preserved)
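The queued path is plain cooperative cancellation: the ID goes into a set, and the consumer checks the set before starting work. A minimal stdlib-only sketch of the pattern (names such as `cancelled_ids` and `consume` are illustrative, not rheo's API):

```python
import asyncio


async def consume(
    queue: asyncio.Queue,
    cancelled_ids: set[str],
    done: list[str],
    skipped: list[str],
) -> None:
    while not queue.empty():
        item_id = await queue.get()
        if item_id in cancelled_ids:
            cancelled_ids.discard(item_id)
            skipped.append(item_id)  # would emit download.cancelled (QUEUED) here
            continue
        done.append(item_id)  # would run the actual download here


async def main() -> tuple[list[str], list[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    for item_id in ("a", "b", "c"):
        queue.put_nowait(item_id)
    cancelled = {"b"}  # "b" was cancelled while still queued
    done: list[str] = []
    skipped: list[str] = []
    await consume(queue, cancelled, done, skipped)
    return done, skipped


done, skipped = asyncio.run(main())
```

Because the check happens at dequeue time, cancelling a queued item costs one set insertion and never touches the queue's internal ordering.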

### Shutdown Flow

The system uses an event-based shutdown mechanism for clean termination:
37 changes: 37 additions & 0 deletions docs/README.md
@@ -46,6 +46,7 @@ That's it. The manager handles worker pools, state tracking, and cleanup automat
- **Command-line interface**: Simple `rheo download` command with progress display and hash validation
- **Concurrent downloads**: Worker pool manages multiple downloads simultaneously
- **Priority queue**: Download urgent files first
- **Selective cancellation**: Cancel individual downloads by ID (queued or in-progress)
- **Hash validation**: Verify file integrity with MD5, SHA256, or SHA512 checksums
- **Retry logic**: Automatic retry with exponential backoff for transient errors
- **Speed & ETA tracking**: Real-time download speed with moving averages and estimated completion time
@@ -446,6 +447,41 @@ async with DownloadManager(download_dir=Path("./downloads")) as manager:

**Note**: The context manager (`async with`) triggers immediate shutdown on exit. It will raise `PendingDownloadsError` if there are pending downloads that weren't handled. Call `wait_until_complete()` or `close()` explicitly to avoid this.

### Selective Cancellation

Cancel individual downloads by ID while others continue:

```python
from rheo.domain.cancellation import CancelResult

async with DownloadManager(download_dir=Path("./downloads")) as manager:
files = [
FileConfig(url="https://example.com/large.zip"),
FileConfig(url="https://example.com/small.txt"),
]
await manager.add(files)

# Cancel specific download (works for queued or in-progress)
result = await manager.cancel(files[0].id)

match result:
case CancelResult.CANCELLED:
print("Download cancelled successfully")
case CancelResult.ALREADY_TERMINAL:
print("Download already completed/failed")
case CancelResult.NOT_FOUND:
print("Download ID not found")

await manager.wait_until_complete()
```

**Key behaviours**:

- **Queued downloads**: Removed from queue, never start
- **In-progress downloads**: Task cancelled, partial file cleaned up
- **Terminal downloads**: Returns `ALREADY_TERMINAL` (file not deleted)
- **Events**: Emits `download.cancelled` with `cancelled_from` field (QUEUED or IN_PROGRESS)
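The in-progress behaviour boils down to catching `asyncio.CancelledError`, deleting the partial file, and re-raising. A stdlib-only sketch of that cleanup pattern (the `slow_write` helper and file name are illustrative, not rheo's implementation):

```python
import asyncio
from pathlib import Path


async def slow_write(path: Path) -> None:
    """Stand-in for a download: writes a partial file, then blocks indefinitely."""
    path.write_bytes(b"partial data")
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        path.unlink(missing_ok=True)  # clean up the partial file on cancellation
        raise  # always re-raise so the task is marked cancelled


async def main() -> bool:
    path = Path("partial.tmp")
    task = asyncio.create_task(slow_write(path))
    await asyncio.sleep(0)  # yield so the task runs up to its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return path.exists()


still_exists = asyncio.run(main())
```

Re-raising after cleanup matters: swallowing `CancelledError` would leave the task in a "completed" rather than "cancelled" state and confuse any caller awaiting it.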

## Security Considerations

### Path Traversal Protection
@@ -480,6 +516,7 @@ FileConfig(url="...", destination_subdir="../../../etc")

- Concurrent downloads with worker pool
- Priority queue system
- Selective cancellation (cancel individual downloads by ID)
- Event-driven architecture with `manager.on()` subscription (returns `Subscription` handle)
- Full download lifecycle events (queued, started, progress, completed, failed, skipped, cancelled, retrying, validating)
- Download tracking and state management
1 change: 1 addition & 0 deletions docs/ROADMAP.md
@@ -10,6 +10,7 @@ The library and CLI are working for real use.

- Concurrent downloads with worker pool
- Priority queue system
- Selective cancellation (cancel individual downloads by ID)
- Event-driven architecture with `manager.on()`/`off()` subscription
- Download tracking and state management
- Comprehensive error handling
32 changes: 32 additions & 0 deletions src/rheo/downloads/manager.py
@@ -11,6 +11,7 @@

import aiofiles.os

from ..domain.cancellation import CancelResult
from ..domain.downloads import DownloadInfo, DownloadStats
from ..domain.exceptions import ManagerNotInitialisedError, PendingDownloadsError
from ..domain.file_config import FileConfig, FileExistsStrategy
@@ -361,6 +362,37 @@ async def wait_until_complete(self, timeout: float | None = None) -> None:
else:
await self.queue.join()

async def cancel(self, download_id: str) -> CancelResult:
"""Cancel a specific download by ID.

Cancels both in-progress and queued downloads. Workers continue
processing remaining queue items after cancellation.

Args:
download_id: The ID of the download to cancel (from FileConfig.id)

Returns:
CancelResult indicating what happened:
- CANCELLED: Download was found and cancelled
- NOT_FOUND: Download ID was never queued
- ALREADY_TERMINAL: Download already completed/failed/skipped/cancelled

Example:
result = await manager.cancel(file_config.id)
if result == CancelResult.CANCELLED:
print("Download stopped")
"""
# Try to cancel via pool (handles active and queued)
if await self._worker_pool.cancel(download_id):
return CancelResult.CANCELLED

# Not active or queued - check if it's in a terminal state
info = self._tracker.get_download_info(download_id)
if info is not None and info.status.is_terminal:
return CancelResult.ALREADY_TERMINAL

return CancelResult.NOT_FOUND

async def open(self) -> None:
"""Manually initialise the manager.

8 changes: 8 additions & 0 deletions src/rheo/downloads/worker_pool/base.py
@@ -41,3 +41,11 @@ def is_running(self) -> bool:
@abstractmethod
def create_worker(self, client: BaseHttpClient) -> BaseWorker:
"""Create a fresh worker with isolated dependencies for each task."""

@abstractmethod
async def cancel(self, download_id: str) -> bool:
"""Cancel a specific download if it is active or queued.

Returns True if found and cancelled, False if the download is not
in the pool's scope (caller should check tracker).
"""
139 changes: 131 additions & 8 deletions src/rheo/downloads/worker_pool/pool.py
@@ -5,9 +5,10 @@
from enum import StrEnum
from pathlib import Path

from ...domain.cancellation import CancelledFrom
from ...domain.exceptions import WorkerPoolAlreadyStartedError
from ...domain.file_config import FileConfig, FileExistsStrategy
from ...events import EventEmitter
from ...events import DownloadCancelledEvent, EventEmitter
from ...events.base import BaseEmitter
from ...infrastructure.http import BaseHttpClient
from ..queue import PriorityDownloadQueue
@@ -113,6 +114,10 @@ def __init__(
}
self._file_exists_strategy = file_exists_strategy
self._emitter = emitter or EventEmitter(self._logger)
# Track active download tasks by download_id, used for cancellation.
self._active_download_tasks: dict[str, asyncio.Task[None]] = {}
# IDs of downloads cancelled while queued, checked before starting download.
self._cancelled_ids: set[str] = set()

@property
def active_tasks(self) -> tuple[asyncio.Task[None], ...]:
@@ -122,6 +127,11 @@ def active_tasks(self) -> tuple[asyncio.Task[None], ...]:
"""
return tuple(self._worker_tasks)

@property
def active_download_tasks(self) -> dict[str, asyncio.Task[None]]:
"""Snapshot of currently active download tasks by download_id."""
return self._active_download_tasks.copy()

@property
def is_running(self) -> bool:
"""True if pool has been started and not yet stopped."""
@@ -165,14 +175,44 @@ async def shutdown(self, wait_for_current: bool = True) -> None:
self._request_shutdown()

if not wait_for_current:
# Cancel all worker tasks for immediate shutdown
# Cancel all tasks (downloads and workers) for immediate shutdown
for task in self._active_download_tasks.values():
task.cancel()
for task in self._worker_tasks:
task.cancel()

# Wait for workers to finish (either gracefully or after cancellation)
# and clean up task references
await self._wait_for_workers_and_clear()

async def cancel(self, download_id: str) -> bool:
"""Cancel a specific download by ID.

Handles both in-progress downloads (via task cancellation) and
queued downloads (via cooperative cancellation).

Args:
download_id: The download ID to cancel

Returns:
True if download was active or queued and cancelled.
False if download_id is not known to the pool (may be terminal or
never existed - caller should check tracker).
"""
# Check if download is currently in progress
task = self._active_download_tasks.get(download_id)
if task is not None:
task.cancel()
return True

# Check if download is queued but not yet started
if download_id in self.queue._queued_ids:
self._cancelled_ids.add(download_id)
return True

# Not in the pool's scope, so let the caller determine why
return False

def _request_shutdown(self) -> None:
"""Signal workers to stop accepting new work.

@@ -243,16 +283,20 @@ async def _process_queue(self, worker: BaseWorker) -> None:
if await self._handle_shutdown_and_requeue(file_config):
break

# Check if this download was cancelled while queued
if await self._handle_cancelled_queued(file_config):
continue

self._logger.debug(
f"Downloading {file_config.url} to {destination_path}"
)
await worker.download(
str(file_config.url),
destination_path,
download_id=file_config.id,
hash_config=file_config.hash_config,
file_exists_strategy=file_config.file_exists_strategy,
)
cancelled = await self._execute_download(
worker, file_config, destination_path
)
if cancelled:
continue

self._logger.debug(
f"Downloaded {file_config.url} to {destination_path}"
)
@@ -307,6 +351,85 @@ async def _handle_shutdown_and_requeue(self, file_config: FileConfig) -> bool:
self.queue.task_done(file_config.id)
return shutdown_is_set

async def _handle_cancelled_queued(self, file_config: FileConfig) -> bool:
"""Check if download was cancelled while queued and handle if so.

If the download ID is in _cancelled_ids, emits a cancelled event
and signals the caller to skip this download.

Args:
file_config: The file configuration to check

Returns:
True if download was cancelled (caller should continue), False otherwise
"""
if file_config.id not in self._cancelled_ids:
return False

self._cancelled_ids.discard(file_config.id)

self._logger.debug(f"Skipping cancelled download: {file_config.url}")

await self._emitter.emit(
"download.cancelled",
DownloadCancelledEvent(
download_id=file_config.id,
url=str(file_config.url),
cancelled_from=CancelledFrom.QUEUED,
),
)

return True

async def _execute_download(
self,
worker: BaseWorker,
file_config: FileConfig,
destination_path: Path,
) -> bool:
"""Execute download as a tracked task with cancellation support.

Creates an asyncio task for the download, tracks it in _active_download_tasks,
and handles cancellation. Individual download cancellation (via pool.cancel())
is distinguished from worker shutdown cancellation.

Args:
worker: The worker instance to use for the download
file_config: Configuration for the file to download
destination_path: Local path to save the file

Returns:
True if the download was cancelled (caller should continue to next item),
False if completed normally

Raises:
asyncio.CancelledError: If the worker task itself was cancelled (shutdown)
"""
download_task = asyncio.create_task(
worker.download(
str(file_config.url),
destination_path,
download_id=file_config.id,
hash_config=file_config.hash_config,
file_exists_strategy=file_config.file_exists_strategy,
)
)
self._active_download_tasks[file_config.id] = download_task

try:
await download_task
return False
except asyncio.CancelledError:
# Distinguish between individual download cancellation and worker shutdown.
# If download_task.cancelled() is True, this specific download was cancelled
# via pool.cancel(). Otherwise, the worker task itself was cancelled.
if download_task.cancelled():
self._logger.debug(f"Download cancelled: {file_config.url}")
return True
raise
finally:
self._active_download_tasks.pop(file_config.id, None)

async def _wait_for_workers_and_clear(self) -> None:
"""Wait for all worker tasks to complete and clear the task list.
