[RFC] Multi-shard cluster support and recovery fixes
Summary
Three targeted changes to the recovery path that fix a duplicate processing bug in the current version and enable multi-shard cluster support (Redis Enterprise and OSS Cluster):
- Replace XCLAIM/XAUTOCLAIM with XACK + XADD to LBS in both the keyspace listener and periodic recovery scan.
- Add mutex verification (EXISTS on lock key) in the periodic recovery scan before re-queuing, to prevent stealing messages from slow-but-alive consumers.
- Add `_retry_count` in message payload for poison pill detection, since XADD creates new messages that lose PEL `times_delivered` tracking.
These changes preserve the existing two-component architecture: keyspace notifications for fast recovery, periodic scan for completeness. No new recovery layers are introduced.
What's broken today
Bug: duplicate processing of slow consumers (all topologies)
The current recoverUnackedMessages uses XAUTOCLAIM with a MinIdle threshold. XAUTOCLAIM only checks PEL idle time — it does not verify whether the consumer holding the message is still alive (i.e., whether the lock is still held). A consumer's lock heartbeat keeps the lock alive but does not reset PEL idle time.
- T=0ms: Consumer A reads M1, acquires lock, starts processing
- T=0-8s: Consumer A is legitimately slow (large task)
- T=5s: Consumer B starts, runs recoverUnackedMessages
  - XAUTOCLAIM sees M1 idle for 5 seconds — transfers to Consumer B
  - Consumer B starts processing M1
- T=8s: Consumer A finishes M1
- Result: both consumers processed the same task
Fix: Add EXISTS check on the lock key before re-queuing. If lock exists, consumer is alive — skip.
Limitation: keyspace notifications are shard-local (multi-shard only)
Keyspace notifications are not broadcast across shards. A subscriber on shard X only sees expirations on shard X. With N master shards, ~(N-1)/N of lock expirations are invisible. Stuck messages on unobserved shards remain in the PEL until a consumer restarts.
Fix: The periodic recovery scan (with the new mutex check) already covers this. It uses XPENDING which is key-addressed on the stream and works correctly in all topologies. No shard-specific subscriber infrastructure needed.
Changes
Change 1: XACK + XADD to LBS replaces XCLAIM/XAUTOCLAIM
When recovering a stuck message, instead of XCLAIM (which transfers ownership and creates contention), the library:
- XACKs the original message (atomic — returns 1 if first, 0 if already handled)
- If XACK returned 1: XADDs the task back to LBS as a new message
The recovered task re-enters the consumer group's normal round-robin distribution. The consumer processing it is unaware it's a recovered task.
```go
func (c *Client) requeueMessage(ctx context.Context, msgID string) error {
	acked, err := c.client.XAck(ctx, c.stream, c.group, msgID).Result()
	if err != nil {
		return err
	}
	if acked == 0 {
		return nil // Another consumer/component already handled this
	}
	// XACK only removes the PEL entry; the message body is still in the stream.
	msgs, err := c.client.XRange(ctx, c.stream, msgID, msgID).Result()
	if err != nil || len(msgs) == 0 {
		c.logger.Warn("message trimmed before re-queue", "msg_id", msgID)
		return nil
	}
	newValues := copyValues(msgs[0].Values)
	retryCount := getRetryCount(msgs[0]) + 1
	newValues["_retry_count"] = strconv.Itoa(retryCount)
	if retryCount > c.config.Recovery.MaxRetries {
		return c.sendToDLQ(ctx, msgID, newValues)
	}
	return c.client.XAdd(ctx, &redis.XAddArgs{
		Stream: c.stream,
		Values: newValues,
	}).Err()
}
```
Why not XCLAIM:
- XCLAIM creates contention — multiple consumers race to claim the same message.
- XCLAIM (and XAUTOCLAIM) only checks idle time, not lock state — root cause of the duplicate processing bug.
- XADD to LBS lets the consumer group distribute the work. No races, no special re-delivery code path.
XACK-first provides idempotency: XACK is atomic. When multiple consumers try to re-queue the same message simultaneously, only one gets return value 1. All others get 0 and skip. Exactly one re-queue per stuck message.
Change 2: Mutex verification in periodic recovery scan
The periodic scan adds an EXISTS check on the lock key before calling requeueMessage:
```go
func (c *Client) scanAndRecover(ctx context.Context) {
	pending, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: c.stream,
		Group:  c.group,
		Start:  "-",
		End:    "+",
		Count:  c.config.Recovery.BatchSize,
		Idle:   c.config.Recovery.MinIdleTime,
	}).Result()
	if err != nil {
		c.logger.Warn("pending scan failed", "err", err)
		return
	}
	for _, msg := range pending {
		lockKey := fmt.Sprintf("lock:{%s}:%s", c.stream, msg.ID)
		exists, err := c.client.Exists(ctx, lockKey).Result()
		if err != nil {
			continue // Transient error: leave for the next scan cycle
		}
		if exists == 1 {
			continue // Consumer is alive, just slow
		}
		if err := c.requeueMessage(ctx, msg.ID); err != nil {
			c.logger.Warn("re-queue failed", "msg_id", msg.ID, "err", err)
		}
	}
}
```
The keyspace listener does NOT need a mutex check. The notification itself is proof the lock expired — there is no ambiguity about consumer liveness.
The periodic scan DOES need the mutex check. It scans messages belonging to other consumers that may still be alive and processing. PEL idle time alone cannot distinguish dead consumers from slow ones.
Change 3: _retry_count for poison pill detection
Since XADD creates new messages, PEL times_delivered tracking is lost. A _retry_count field in the message payload tracks re-queue attempts. When it exceeds MaxRetries, the message is routed to a DLQ stream instead of back to LBS.
The DLQ is a Redis Stream (e.g., {stream_name}:dlq). Hash-tagged name ensures co-location. The library writes to the DLQ but does not provide a built-in DLQ consumer.
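The co-location claim rests on the Redis Cluster hash-tag rule: when a key contains a non-empty `{...}` section, only the substring between the first `{` and the next `}` is hashed. A sketch of that rule (`hashTag` is an illustrative helper, not part of the library), showing that the stream, its DLQ, and a lock key all hash the same tag:

```go
package main

import (
	"fmt"
	"strings"
)

// hashTag returns the portion of a key that Redis Cluster actually hashes:
// the substring between the first '{' and the next '}', if that substring
// is non-empty; otherwise the whole key. This mirrors the keyslot rule in
// the Redis Cluster specification.
func hashTag(key string) string {
	open := strings.IndexByte(key, '{')
	if open == -1 {
		return key
	}
	end := strings.IndexByte(key[open+1:], '}')
	if end <= 0 { // no '}' after '{', or an empty "{}" tag
		return key
	}
	return key[open+1 : open+1+end]
}

func main() {
	// For a stream named "tasks", all three keys hash "tasks" and
	// therefore land on the same shard.
	fmt.Println(hashTag("tasks"))           // tasks
	fmt.Println(hashTag("{tasks}:dlq"))     // tasks
	fmt.Println(hashTag("lock:{tasks}:M1")) // tasks
}
```

This is also why option B under "Known tradeoffs" (a Lua script spanning the lock key and the stream) is even possible: a script may only touch keys in one slot.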
How multi-shard support works with these changes
No shard-specific infrastructure is needed. The existing two components cover all topologies:
Keyspace listener (existing): Subscribes through whatever connection is available. In single-shard: complete coverage, sub-second recovery. In multi-shard: partial coverage (~1/N), still sub-second for what it sees. No change in behavior.
Periodic recovery scan (existing, now with mutex check): Uses XPENDING and EXISTS — both key-addressed commands that route correctly through Redis Enterprise proxy and OSS ClusterClient. Catches everything the keyspace listener missed, regardless of shard topology. Runs on timer AND once on startup.
The only topology-specific concern: in OSS Cluster mode, the keyspace subscriber needs a direct connection to a master node (Pub/Sub doesn't aggregate across nodes in OSS Cluster). This requires a ClusterMode configuration option:
```go
type ClusterMode int

const (
	ClusterModeNone ClusterMode = iota // Single node / Enterprise single endpoint
	ClusterModeOSS                     // OSS Cluster — direct node subscription for the keyspace listener
)
```
In ClusterModeOSS, each consumer subscribes to one master node (deterministic assignment). ResetTopology() is exposed for the client to call on topology changes. This only affects the keyspace listener — the periodic scan works identically in all modes.
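One way to get the deterministic assignment, assuming consumer names and an ordered master list are available (a sketch; `assignMaster` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// assignMaster deterministically maps a consumer name to one of n master
// nodes, so each consumer always subscribes to the same node for a given
// topology without any coordination between pods.
func assignMaster(consumerID string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(consumerID))
	return int(h.Sum32() % uint32(n))
}

func main() {
	// Stable: the same consumer always gets the same index for a fixed topology.
	fmt.Println(assignMaster("pod-7", 3) == assignMaster("pod-7", 3)) // true
	fmt.Println(assignMaster("pod-7", 3) < 3)                        // true
}
```

After a topology change, `ResetTopology()` would re-run this mapping against the new master list.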
Configuration
```go
type RecoveryConfig struct {
	// Interval between periodic reconciliation scans.
	// Also runs once on startup before processing new messages.
	// Default: 60 seconds.
	ReconciliationInterval time.Duration

	// Messages idle longer than this are candidates for recovery.
	// Should be significantly larger than the lock TTL.
	// Default: 30 seconds.
	MinIdleTime time.Duration

	// Maximum messages to recover per scan cycle.
	// Default: 50.
	BatchSize int64

	// Maximum re-queues before routing to DLQ.
	// Default: 3.
	MaxRetries int

	// DLQ stream name. Empty disables the DLQ (messages are logged and dropped).
	// Recommended: "{stream_name}:dlq"
	DLQStream string

	// Prefix for library-managed message fields (_retry_count, _original_id).
	// Default: "_"
	FieldPrefix string
}
```
Known tradeoffs
XACK contention in periodic scan. With 20 pods scanning simultaneously and 10 stuck messages, 200 XACKs execute with 190 returning 0. XACK is O(1) and cheap, but the fan-out exists. Mitigated by natural stagger (pods boot at different times) and jitter on the scan timer. No correctness issue — just wasted reads during failure events.
XACK-XADD atomicity gap. If a consumer crashes between XACK and XADD, the message is ACKed but never re-queued — message loss. The window is microseconds. Mitigation options: (A) accept the risk, (B) Lua script for atomicity (requires lock key and stream on same shard via hash tags). Recommended: start with A, instrument with metrics, upgrade to B if observed in production.
Periodic scan latency. Expirations on unobserved shards are recovered within the reconciliation interval (default 60 seconds), not sub-second. Acceptable as a backstop — the keyspace listener handles the fast path.
Breaking changes
- XCLAIM/XAUTOCLAIM replaced with XACK + XADD to LBS.
- `RecoveryConfig` added to `StreamClientConfig`.
- `ClusterMode` added for OSS Cluster support.
- Lock keys must contain the LBS message ID (e.g., `lock:{stream}:M1`).
- Re-queued messages contain `_retry_count` and `_original_id` fields.
- Periodic scan now checks lock existence (fixes duplicate processing bug — behavioral change).
Metrics
- `recovery_keyspace_requeued_total`: messages re-queued by the keyspace listener
- `recovery_scan_requeued_total`: messages re-queued by the periodic scan
- `recovery_scan_skipped_alive_total`: messages skipped because the lock was still held
- `recovery_duplicate_ack_total`: XACK returned 0 (already handled)
- `recovery_dlq_total`: messages routed to the DLQ
- `recovery_scan_duration_seconds`: time per reconciliation cycle
- `pel_depth`: current PEL size (gauge)
Tasks
Appendix: design exploration
This RFC is the result of an extensive design exploration. The following alternatives were considered and rejected. They are documented here for contributors who may propose similar approaches.
Alternative: dedicated aggregation stream (ksp_stream)
Per-shard subscribers write keyspace events to a dedicated stream, consumed by a separate consumer group for recovery processing.
Rejected because: If subscribers also process locally, the aggregation stream becomes a dead-letter log nobody reads. If subscribers don't process locally, ~1/N of messages take an unnecessary roundabout (delivered back to the originating subscriber). LBS itself serves as the redistribution mechanism — no separate stream needed.
Alternative: PEL scanning as sole recovery mechanism (remove keyspace notifications)
Remove keyspace notifications entirely. Use only periodic PEL scanning with mutex check for all recovery.
Rejected because: Sub-second recovery is a genuine requirement for real-time processing workloads. PEL scanning at 60-second intervals is unacceptable as the only recovery path. Keyspace notifications provide surgical, event-driven recovery with zero overhead during normal operation.
Alternative: XAUTOCLAIM polling as stopgap
Add XAUTOCLAIM-based polling alongside keyspace notifications as an interim solution before building the proper fix.
Rejected because: Implementation effort is comparable to the proper fix. XAUTOCLAIM has the same duplicate processing bug (no mutex check). It would produce throwaway code and require maintaining two recovery mechanisms temporarily.
Alternative: leader election for single scanner
Elect a single pod to run the periodic scan via distributed lock (SET NX), avoiding XACK contention across all pods.
Rejected because: Leader election introduces its own edge cases (lock holder GC pauses, split-brain, lock expiry during scanning). The complexity rivals the problem it solves. XACK contention with 20 pods is cheap (~200 O(1) commands per scan cycle during failures) and has no correctness issues.
Alternative: per-shard keyspace subscribers with topology management
Each consumer subscribes to all master nodes for full keyspace coverage. Library manages subscriber lifecycle and watches for topology changes.
Rejected because: Massive operational complexity (per-node connections, topology watching, subscriber lifecycle). The periodic scan with mutex check provides complete coverage using only standard key-addressed commands. Per-shard subscription is unnecessary when the scan backstop exists.
Alternative: XCLAIM in self-PEL recovery for efficiency
Use XCLAIM when a consumer recovers its own PEL on startup (no contention since it's self-recovery), while using XACK + XADD elsewhere.
Rejected because: The efficiency gain (avoiding one XADD per message on startup, a rare event) doesn't justify two code paths. Consistency of XACK + XADD everywhere reduces testing surface and cognitive overhead.