Merged
37 changes: 37 additions & 0 deletions .github/workflows/publish-to-pypi.yml
@@ -0,0 +1,37 @@
name: Publish Python Package to PyPI

on:
  release:
    types: [published]

permissions:
  id-token: write
  contents: read

jobs:
  build-and-publish:
    name: Build and publish Python distributions to PyPI
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/p/toller

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.x'

      - name: Install build dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build

      - name: Build package
        run: python -m build

      - name: Publish package to PyPI using Trusted Publishing
        uses: pypa/gh-action-pypi-publish@release/v1
158 changes: 157 additions & 1 deletion README.md
@@ -8,7 +8,13 @@ Toller is a lightweight Python library designed to make your asynchronous calls

Just as the [Nova Scotia Duck Tolling Retriever](https://www.akc.org/dog-breeds/nova-scotia-duck-tolling-retriever/) lures and guides ducks, Toller "lures" unruly asynchronous tasks into well-managed, predictable flows, guiding the overall execution path and making concurrency easier to reason about.

When deploying applications that rely on numerous external async calls, managing failures (downtime, rate limits, transient errors) in a standard way becomes critical. Toller offers this standard, both for client-side calls and potentially for protecting server-side resources.
## Why Toller?

Modern applications that integrate with numerous LLMs, vector databases, and other microservices face a constant challenge: external services can be unreliable. They might be temporarily down, enforce rate limits, or return transient errors.

Building robust applications in this environment means every external call needs careful handling, but repeating that logic for each API call leads to boilerplate, inconsistency, and, often, poorly managed asynchronous processes. **Toller was built to solve this.** It provides a declarative way to add these resilience patterns.

Toller offers a standard approach, both for client-side calls and potentially for protecting server-side resources.

## Features

@@ -36,3 +42,153 @@ When deploying applications that rely on numerous external async calls, managing
```bash
pip install toller
```

## Usage and Examples

### Example 1: Basic Resilience for Generative AI Calls
<details open>
For a function that calls out to an LLM, we want to handle rate limits, retry on temporary server issues, and stop if the service is truly down.

```python
import asyncio
import random

import toller
from toller import TransientError, FatalError, MaxRetriesExceeded, OpenCircuitError

# Define potential API errors
class LLMRateLimitError(TransientError): pass
class LLMServerError(TransientError): pass
class LLMInputError(FatalError): pass  # e.g., prompt too long

# Simulate an LLM call
LLM_DOWN_FOR_DEMO = 0  # Counter for demoing the circuit breaker
async def call_llm_api(prompt: str):
    global LLM_DOWN_FOR_DEMO
    print(f"LLM API: Processing '{prompt[:20]}...' (Attempt for this task)")
    await asyncio.sleep(random.uniform(0.1, 0.3))  # Network latency

    if LLM_DOWN_FOR_DEMO > 0:
        LLM_DOWN_FOR_DEMO -= 1
        print("LLM API: Simulating 503 Service Unavailable")
        raise LLMServerError("LLM service is temporarily down")
    if random.random() < 0.2:  # 20% chance of a transient rate limit error
        print("LLM API: Simulating 429 Rate Limit")
        raise LLMRateLimitError("Hit LLM rate limit")
    if len(prompt) < 5:
        print("LLM API: Simulating 400 Bad Request (prompt too short)")
        raise LLMInputError("Prompt is too short")

    return f"LLM Response for '{prompt[:20]}...': Generated text."

# Apply Toller
@toller.task(
    # Rate limiter: 60 calls per minute (1 per sec), burst 5
    rl_calls_per_second=1.0,  # 60 RPM / 60 s
    rl_max_burst_calls=5,

    # Retries: 3 attempts on transient LLM errors
    retry_max_attempts=3,
    retry_delay=1.0,  # Start with a 1 s delay for LLM errors
    retry_backoff=2.0,
    retry_on_exception=(LLMRateLimitError, LLMServerError),
    retry_stop_on_exception=(LLMInputError,),  # Don't retry bad input

    # Circuit breaker: opens if retries fail 2 times consecutively
    cb_failure_threshold=2,  # Low threshold for demo
    cb_recovery_timeout=20.0,  # Wait 20 s before one test call
    cb_expected_exception=MaxRetriesExceeded,  # CB trips when all retries are exhausted
)
async def get_llm_completion(prompt: str):
    return await call_llm_api(prompt)

async def run_example1():
    print("Example 1: Basic Resilience for Generative AI Calls")
    prompts = [
        "Tell me a story about a brave duck.",
        "Explain async programming.",
        "Short",  # This will cause a FatalError (LLMInputError)
        "Another valid prompt after fatal.",
        "Prompt to trigger server errors 1",  # Will hit retry then MaxRetriesExceeded
        "Prompt to trigger server errors 2",  # Will hit retry then MaxRetriesExceeded, tripping the CB
        "Prompt after CB should open",  # Should hit OpenCircuitError
    ]

    # Simulated LLM service downtime for the relevant prompts
    global LLM_DOWN_FOR_DEMO
    LLM_DOWN_FOR_DEMO = 4

    for i, p in enumerate(prompts):
        print(f"\nSending request: '{p}'")
        try:
            result = await get_llm_completion(p)
            print(f"Success: {result}")
        except MaxRetriesExceeded as e:
            print(f"Toller: Max retries exceeded. Last error: {type(e.last_exception).__name__}: {e.last_exception}")
        except OpenCircuitError as e:
            print(f"Toller: Circuit is open! {e}. Further calls blocked temporarily.")
            if i == len(prompts) - 2:  # If this is the call just before the last one
                print("Waiting for circuit breaker recovery timeout for demo...")
                await asyncio.sleep(21)  # Wait for the CB to go HALF_OPEN
        except FatalError as e:
            print(f"Toller: Fatal error, no retries. Error: {type(e).__name__}: {e}")
        except Exception as e:
            print(f"Toller: Unexpected error. Type: {type(e).__name__}, Error: {e}")

        await asyncio.sleep(0.3)  # Small pause between top-level requests to see the rate limiter too

if __name__ == "__main__":
    asyncio.run(run_example1())
```
</details>


### Example 2: Shared Rate Limiter for Multiple Related API Calls
<details>
Often, different API endpoints for the same service share an overall rate limit.

```python
import asyncio
import time

import toller
from toller import CallRateLimiter  # For creating a shared instance

# Assume these two functions call endpoints that share a single rate limit pool
shared_api_rl = CallRateLimiter(calls_per_second=2, max_burst_calls=2, name="MyServiceSharedRL")

@toller.task(
    rate_limiter_instance=shared_api_rl,
    # Disable retry/CB for this simple RL demo
    enable_retry=False, enable_circuit_breaker=False,
)
async def call_endpoint_a(item_id: int):
    print(f"Calling A for {item_id}...")
    await asyncio.sleep(0.1)
    return f"A {item_id} done"

@toller.task(
    rate_limiter_instance=shared_api_rl,
    enable_retry=False, enable_circuit_breaker=False,
)
async def call_endpoint_b(item_id: int):
    print(f"Calling B for {item_id}...")
    await asyncio.sleep(0.1)
    return f"B {item_id} done"

async def run_example2():
    print("\nExample 2: Shared Rate Limiter")
    tasks = []
    # These 4 calls exceed the shared limiter's burst of 2 (at 2/sec), so some will be delayed.
    tasks.append(call_endpoint_a(1))
    tasks.append(call_endpoint_b(1))
    tasks.append(call_endpoint_a(2))
    tasks.append(call_endpoint_b(2))

    start_time = time.time()
    results = await asyncio.gather(*tasks)
    duration = time.time() - start_time

    for res in results:
        print(f"Shared RL Result: {res}")
    print(f"Total time for 4 calls with shared RL (2/sec, burst 2): {duration:.2f}s (expected > ~1.0s)")

if __name__ == "__main__":
    asyncio.run(run_example2())
```
</details>
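The ~1 s floor in Example 2 follows from token-bucket arithmetic: with a burst of 2, the first two calls proceed immediately, and at 2 calls/second the third and fourth wait roughly 0.5 s and 1.0 s. A minimal self-contained sketch of that accounting (plain asyncio, not Toller's `CallRateLimiter`):

```python
import asyncio
import time

class TokenBucket:
    """Minimal token bucket: `rate` tokens/second, capacity `burst`."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            # Refill tokens accrued since the last call, capped at the burst size.
            self.tokens = min(self.burst, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            self.tokens -= 1  # May go negative: a debt paid by sleeping.
            wait = max(0.0, -self.tokens / self.rate)
        if wait:
            await asyncio.sleep(wait)

async def main():
    bucket = TokenBucket(rate=2.0, burst=2)
    start = time.monotonic()
    await asyncio.gather(*(bucket.acquire() for _ in range(4)))
    # First two calls pass on the burst; the third waits ~0.5 s, the fourth ~1.0 s.
    print(f"4 acquisitions took {time.monotonic() - start:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())
```

Running this prints a duration of roughly one second, matching the "expected > ~1.0s" note in Example 2.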
3 changes: 1 addition & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "toller"
version = "0.0.1"
version = "0.0.2"
description = "Intelligent async flow controller for Python - making complex asyncio workflows manageable"
readme = "README.md"
requires-python = ">=3.10"
@@ -26,7 +26,6 @@ classifiers = [
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",