Skip to content

Commit 4b9a3e5

Browse files
authored
📝 Improve cookbook documentation (#224)
2 parents 2c59251 + 3a508f9 commit 4b9a3e5

2 files changed

Lines changed: 108 additions & 115 deletions

File tree

docs/cookbook/rabbitmq_retries.md

Lines changed: 89 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,61 @@
11
# RabbitMQ Retries with Exponential Backoff
22

3-
When a task fails, you often want to retry it automatically, but with increasing delays
4-
between attempts (exponential backoff). This prevents your system from being overwhelmed
5-
by a failing dependency and avoids wasting resources on immediate, likely-to-fail retries.
6-
7-
This cookbook explains how to implement exponential backoff using **RabbitMQ 4.x** (or
8-
3.10+), utilizing **Quorum Queues** to prevent poison messages and a smart routing
9-
topology that allows **one set of backoff queues** for all your work queues.
10-
11-
This guide draws inspiration from
12-
[Brian Storti's excellent article on the topic](https://www.brianstorti.com/rabbitmq-exponential-backoff/).
13-
14-
## The Architecture
15-
16-
RabbitMQ does not support natively delaying messages per-message without custom plugins.
17-
Instead, we simulate delays by placing a message in a queue with no consumers and a
18-
Time-To-Live (`x-message-ttl`). When the TTL expires, the message is dead-lettered to an
19-
exchange that routes it back to its original work queue.
20-
21-
To support multiple backoff delays (e.g., 10s, 60s, 300s) for multiple different work
22-
queues (e.g., `emails`, `reports`) **without** creating a separate delay queue for every
23-
work queue, we use **Topic Exchanges**.
24-
25-
1. **`retry_exchange` (Topic)**: Fails are published here. Routing key format:
26-
`delay.<delay_ms>.<original_queue_name>`.
27-
2. **Delay Queues**: E.g., `delay_10s`. Bound to `retry_exchange` with routing key
28-
`delay.10000.#`. Has `x-message-ttl=10000` and its `x-dead-letter-exchange` is set to
29-
`requeue_exchange`.
30-
3. **`requeue_exchange` (Topic)**: Receives expired messages from delay queues.
31-
4. **Work Queues**: E.g., `emails`. Bound to `requeue_exchange` with routing key
32-
`*.*.emails`. Configured as a **Quorum Queue** with `x-delivery-limit` to prevent
33-
immediate poison message loops (e.g. if the worker hard crashes).
3+
When a task fails, retrying it with increasing delays (exponential backoff) prevents your
4+
system from being overwhelmed by failing dependencies.
5+
6+
This cookbook explains how to implement exponential backoff using **RabbitMQ 4.x**
7+
(or 3.13+ by enabling plugin `rabbitmq-plugins enable rabbitmq_amqp1_0`).
8+
It uses **Quorum Queues** to prevent poison messages and a smart **Topic Exchange** topology
9+
that shares a single set of delay queues across all your work queues.
10+
11+
_(Inspired by [Brian Storti's article](https://www.brianstorti.com/rabbitmq-exponential-backoff/))_
12+
13+
## The Architecture & Message Flow
14+
15+
RabbitMQ lacks native per-message delays. We simulate delays using queues with a Time-To-Live
16+
(`x-message-ttl`). When the TTL expires, messages are dead-lettered back to an exchange.
17+
18+
Creating a dedicated delay queue for _every_ work queue and delay interval is inefficient.
19+
Instead, we use **Topic Exchanges** to share delay queues.
20+
21+
Here is the lifecycle of a retried message:
22+
23+
```mermaid
24+
graph TD
25+
WQ[(my_work_queue)] -->|Consumes| W[Worker]
26+
27+
W -->|1st Fail Routing Key:<br/>delay.10000.my_work_queue| RX{{retry_exchange}}
28+
W -->|2nd Fail Routing Key:<br/>delay.60000.my_work_queue| RX
29+
30+
RX -->|Binding:<br/>delay.10000.#| DQ1[(delay_10s<br/>TTL: 10s)]
31+
RX -->|Binding:<br/>delay.60000.#| DQ2[(delay_60s<br/>TTL: 60s)]
32+
33+
DQ1 -.->|Dead-letter<br/>Routing Key preserved| RQX{{requeue_exchange}}
34+
DQ2 -.->|Dead-letter<br/>Routing Key preserved| RQX
35+
36+
RQX -->|Binding:<br/>*.*.my_work_queue| WQ
37+
```
38+
39+
1. **Fail**: A worker fails to process a message. The application calculates the delay, increments
40+
the retry counter, and publishes it to `retry_exchange` with a routing key like
41+
`delay.10000.my_work_queue` (or `delay.60000.my_work_queue` for the next attempt).
42+
2. **Wait**: The exchange routes it to the corresponding shared delay queue (e.g., `delay_10s` or
43+
`delay_60s`) via the wildcard bindings. These queues have no consumers and a matching
44+
`x-message-ttl`.
45+
3. **Expire**: The TTL expires. RabbitMQ dead-letters the message to `requeue_exchange`,
46+
preserving the original routing key (e.g., `delay.10000.my_work_queue`).
47+
4. **Requeue**: `requeue_exchange` routes the message back to `my_work_queue` via the wildcard
48+
binding `*.*.my_work_queue`.
3449

3550
## Setting Up the Topology
3651

37-
Because Repid's AMQP server is designed for message processing rather than broker
38-
administration, you should set up your RabbitMQ topology using your preferred tool. For
39-
example, you can use RabbitMQ's native Definitions JSON format, which can be applied via
40-
the HTTP API.
52+
Configure this topology using your preferred administration tool. Below is an example using
53+
RabbitMQ's native Definitions JSON format.
4154

42-
First, create a `definitions.json` file. This declarative format defines the exchanges,
43-
queues, and bindings exactly as described above:
55+
Create a `definitions.json` file:
4456

4557
```json
4658
{
47-
"rabbit_version": "4.0.0",
4859
"vhosts": [{ "name": "/" }],
4960
"exchanges": [
5061
{
@@ -126,7 +137,7 @@ You can upload this configuration using the RabbitMQ Management HTTP API via `cu
126137
(assuming the management plugin `rabbitmq_management` is enabled):
127138

128139
```bash
129-
curl -i -u guest:guest -H "content-type:application/json" -X POST \ # (1)
140+
curl -i -u guest:guest -H "content-type:application/json" -X POST \ # (1)!
130141
-d @definitions.json http://localhost:15672/api/definitions
131142
```
132143

@@ -137,24 +148,12 @@ configuration file in `rabbitmq.conf` to this JSON file)._
137148

138149
## The Decorator Implementation
139150

140-
To keep our actors clean and reusable, we can encapsulate the entire retry, delay
141-
calculation, and republishing logic into a Python decorator.
142-
143-
Because Repid relies on `inspect.signature()` to resolve Dependency Injection arguments
144-
(like parsing your JSON payload into Pydantic models or injecting `Message`), we **must**
145-
explicitly manipulate the `__signature__` attribute of our wrapper. If we don't, Repid
146-
either won't see your original arguments (breaking payload parsing) or won't see the
147-
injected `Message` parameter.
148-
149-
By using `typing.Concatenate` and `ParamSpec`, we ensure the wrapper is perfectly
150-
type-safe for your IDE, while dynamically injecting the `message` parameter so Repid knows
151-
to provide it at runtime.
151+
To keep our actors clean and reusable, we can encapsulate the entire retry, delay calculation, and
152+
republishing logic into a single Python decorator.
152153

153-
By using Repid's default `confirmation_mode="auto"`, this decorator works elegantly: if it
154-
catches an exception, publishes a retry message, and suppresses the error by returning
155-
normally, Repid will automatically `ack` the original message. If retries are exhausted,
156-
it simply re-raises the exception so Repid can handle it according to the `on_error`
157-
policy (e.g., rejecting or dead-lettering it).
154+
This decorator dynamically merges the `Message` dependency into your actor's signature so Repid can
155+
inject it at runtime. It leverages Repid's default `confirmation_mode="auto"` to elegantly handle
156+
successes (auto-ack) and exhausted retries (re-raise for `on_error` handling).
158157

159158
```python
160159
import inspect
@@ -182,18 +181,14 @@ def with_rabbitmq_retries(
182181
backoff_delays = [10_000, 60_000, 300_000, 600_000, 1_800_000]
183182

184183
def decorator(
185-
func: Callable[P, Awaitable[R]]
184+
func: Callable[P, Awaitable[R]],
186185
) -> Callable[Concatenate[Message, P], Awaitable[R]]:
187-
# 1. We must dynamically merge the signature so Repid injects BOTH
188-
# your custom payload arguments AND the `Message` dependency.
189-
# Without this, @wraps hides `message`, or removing @wraps hides your payload args!
190-
sig = inspect.signature(func)
186+
sig = inspect.signature(func) # (1)!
191187

192-
# Add `message: Message` as a required parameter if it isn't already there
193188
if "message" not in sig.parameters:
194189
new_params = [
195-
inspect.Parameter("message", inspect.Parameter.KEYWORD_ONLY, annotation=Message),
196-
*list(sig.parameters.values())
190+
*list(sig.parameters.values()),
191+
inspect.Parameter("message", inspect.Parameter.KEYWORD_ONLY, annotation=Message)
197192
]
198193
new_sig = sig.replace(parameters=new_params)
199194
else:
@@ -202,60 +197,65 @@ def with_rabbitmq_retries(
202197
@wraps(func)
203198
async def wrapper(message: Message, *args: P.args, **kwargs: P.kwargs) -> R:
204199
try:
205-
# 2. Execute the original actor code
206-
# If the original func didn't explicitly request 'message', we don't pass it down.
207200
if "message" in sig.parameters:
208201
kw = cast(dict[str, Any], kwargs)
209202
kw["message"] = message
210-
return await func(*args, **kw) # type: ignore[arg-type]
203+
return await func(*args, **kw) # (2)!
211204
else:
212205
return await func(*args, **kwargs)
213206

214-
except retry_exceptions as e:
215-
# 3. Handle the failure and calculate backoff
216-
# Any exception NOT in retry_exceptions will bypass this block and be
217-
# immediately handled by Repid's on_error policy (no retries).
207+
except retry_exceptions as e: # (3)!
218208
headers = message.headers or {}
219209
retry_count = int(headers.get("x-retry-count", 0))
220210

221211
if retry_count >= max_retries:
222212
print(f"Max retries ({max_retries}) reached for message {message.message_id}")
223-
# Re-raise the exception so Repid's auto mode acts on it
224-
raise
213+
raise # (4)!
225214

226215
delay_index = min(retry_count, len(backoff_delays) - 1)
227216
delay_ms = backoff_delays[delay_index]
228217

229218
print(f"Task failed: {e}. Retrying in {delay_ms}ms (Attempt {retry_count + 1})")
230219

231-
# 4. Republish to the delay exchange
232220
new_headers = headers.copy()
233221
new_headers["x-retry-count"] = str(retry_count + 1)
234222

235-
# We explicitly construct the AMQP 1.0 address for a RabbitMQ exchange
236-
# Format: /exchanges/<exchange_name>/<routing_key>
237-
# message.channel contains the original queue name
238-
amqp_to_address = f"/exchanges/{retry_exchange}/delay.{delay_ms}.{message.channel}"
223+
amqp_to_address = (
224+
f"/exchanges/{retry_exchange}/delay.{delay_ms}.{message.channel}" # (5)!
225+
)
239226

240227
await message.send_message(
241-
channel=message.channel, # The channel doesn't matter since `to` overrides it
228+
channel=message.channel,
242229
payload=message.payload,
243230
content_type=message.content_type,
244231
headers=new_headers,
245232
server_specific_parameters={"to": amqp_to_address},
246233
)
247234

248-
# Return normally to suppress the exception.
249-
# In auto mode, this causes Repid to ACK the original message!
250-
return None # type: ignore[return-value]
235+
return None # (6)!
251236

252-
# Apply the merged signature to the wrapper so Repid's DI can parse it
253-
wrapper.__signature__ = new_sig # type: ignore[attr-defined]
237+
wrapper.__signature__ = new_sig # (7)!
254238
return wrapper
255239

256240
return decorator
257241
```
258242

243+
1. We must dynamically merge the signature so Repid injects BOTH your custom payload arguments
244+
AND the `Message` dependency. Without this, `@wraps` hides `message`,
245+
or removing `@wraps` hides your payload args!
246+
2. Execute the original actor code, safely omitting `message` if they didn't explicitly request
247+
it in their signature.
248+
3. Handle the failure and calculate backoff. Any exception NOT in `retry_exceptions` will bypass
249+
this block and be immediately handled by Repid's `on_error` policy (no retries).
250+
4. Re-raise the exception so Repid's auto mode catches it and naturally nacks/rejects
251+
it based on your actor settings.
252+
5. Explicitly construct the AMQP 1.0 address for a RabbitMQ exchange
253+
(format: `/exchanges/<exchange_name>/<routing_key>`).
254+
6. Return normally to suppress the exception. In auto mode, this causes Repid
255+
to automatically `ack` the original message!
256+
7. Apply the merged signature to the wrapper so Repid's DI parser knows
257+
`message` needs to be provided.
258+
259259
### Using the Decorator
260260

261261
Now, your actual actor implementation becomes incredibly clean. You only need to focus on
@@ -270,20 +270,11 @@ async def process_task(data: dict) -> None:
270270
print(f"Processing data: {data}")
271271

272272
if data.get("bad_payload"):
273-
# This will NOT be retried. It immediately propagates to Repid and gets NACK-ed.
274-
raise ValueError("Invalid data format")
273+
raise ValueError("Invalid data format") # (1)!
275274

276-
# This WILL be caught by the decorator and retried with backoff.
277-
raise ConnectionError("Temporary API failure")
275+
raise ConnectionError("Temporary API failure") # (2)!
278276
```
279277

280-
## Summary
281-
282-
By combining **Topic Exchanges**, **Message TTLs**, and **Repid's extensible Server
283-
architecture**:
284-
285-
1. You can exponentially back off failed jobs without overwhelming your consumers.
286-
2. A **single** set of backoff queues handles delays for **all** your work queues
287-
automatically.
288-
3. You maintain safety against unhandled application crashes via Quorum Queue
289-
`x-delivery-limit`.
278+
1. This will NOT be retried. It bypasses our decorator exception block
279+
and immediately propagates to Repid to be handled (e.g. NACK-ed or Dead Lettered).
280+
2. This WILL be caught by the decorator and retried with exponential backoff!

docs/cookbook/sentry_middleware.md

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,25 @@ async def sentry_actor_middleware(
2828
message: ReceivedMessageT,
2929
actor: ActorData,
3030
) -> T:
31-
# 1. Isolate the context for this specific task execution
32-
with sentry_sdk.new_scope() as scope:
33-
# 2. Add useful contextual tags
34-
scope.set_tag("repid.channel", message.channel)
31+
with sentry_sdk.new_scope() as scope: # (1)!
32+
scope.set_tag("repid.channel", message.channel) # (2)!
3533
scope.set_tag("repid.actor", actor.name)
3634
if message.message_id:
3735
scope.set_tag("repid.message_id", message.message_id)
3836

3937
try:
40-
# 3. Execute the actor
41-
return await call_next(message, actor)
38+
return await call_next(message, actor) # (3)!
4239

4340
except Exception as e:
44-
# 4. Capture the exception before letting it bubble up to Repid
45-
sentry_sdk.capture_exception(e)
41+
sentry_sdk.capture_exception(e) # (4)!
4642
raise
4743
```
4844

45+
1. Isolate the context for this specific task execution so tags don't bleed into other concurrent tasks.
46+
2. Add useful contextual tags that will appear in your Sentry dashboard.
47+
3. Execute the actor (and any subsequent middlewares).
48+
4. Capture the exception before letting it bubble up to Repid's internal error handler.
49+
4950
## Registering the Middleware
5051

5152
Once defined, you apply the middleware to your Repid application, router, or specific
@@ -56,20 +57,21 @@ errors.
5657
import sentry_sdk
5758
from repid import Repid, Router
5859

59-
# Initialize Sentry
60-
sentry_sdk.init()
60+
sentry_sdk.init() # (1)!
6161

62-
# Register the middleware globally
63-
app = Repid(actor_middlewares=[sentry_actor_middleware])
62+
app = Repid(actor_middlewares=[sentry_actor_middleware]) # (2)!
6463

6564
router = Router()
6665

67-
# Now any actor on this router will automatically report errors!
68-
@router.actor(channel="my_queue")
66+
@router.actor(channel="my_queue") # (3)!
6967
async def process_task(data: dict) -> None:
7068
print("Processing task...")
7169

72-
# Any exceptions here will immediately show up in your Sentry dashboard!
73-
# They will include the channel, actor name, and message ID tags!
74-
raise ValueError("Something went wrong!")
70+
raise ValueError("Something went wrong!") # (4)!
7571
```
72+
73+
1. Initialize Sentry.
74+
2. Register the middleware globally on the Repid app so all routers inherit it.
75+
3. Any actor on this router will now automatically report errors.
76+
4. Any exceptions here will immediately show up in your Sentry dashboard!
77+
They will include the channel, actor name, and message ID tags.

0 commit comments

Comments
 (0)