From 3998b9883872b35e47a59be4359db3ec8b3df696 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 19 May 2025 17:02:07 -0700 Subject: [PATCH] queue: add new BackpressureQueue[T] variant In this commit, we add a new type of queue: the back pressure queue. This is a bounded queue based on a simple channel, that will consult a predicate to decide if we should preemptively drop a message or not. We then provide a sample predicate for this use case, based on random early dropping. Given a min and max threshold, we'll start to drop message randomly once we get past the min threshold, ramping up to the max threshold where we'll start to always drop the message. --- queue/back_pressure.go | 153 +++++++++++++++ queue/back_pressure_test.go | 366 ++++++++++++++++++++++++++++++++++++ queue/go.mod | 15 +- queue/go.sum | 25 +++ 4 files changed, 558 insertions(+), 1 deletion(-) create mode 100644 queue/back_pressure.go create mode 100644 queue/back_pressure_test.go diff --git a/queue/back_pressure.go b/queue/back_pressure.go new file mode 100644 index 00000000000..7e916b215a5 --- /dev/null +++ b/queue/back_pressure.go @@ -0,0 +1,153 @@ +package queue + +import ( + "context" + "errors" + "math/rand" + + "github.com/lightningnetwork/lnd/fn/v2" +) + +// DropPredicate decides whether to drop an item when the queue is full. +// It receives the current queue length and the item, and returns true to drop, +// false to enqueue. +type DropPredicate[T any] func(queueLen int, item T) bool + +// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped +// due to the DropPredicate. +var ErrQueueFullAndDropped = errors.New("queue full and item dropped") + +// BackpressureQueue is a generic, fixed-capacity queue with predicate-based +// drop behavior. When full, it uses the DropPredicate to perform early drops +// (e.g., RED-style). +type BackpressureQueue[T any] struct { + ch chan T + dropPredicate DropPredicate[T] +} + +// NewBackpressureQueue creates a new BackpressureQueue with the given capacity +// and drop predicate. +func NewBackpressureQueue[T any](capacity int, + predicate DropPredicate[T]) *BackpressureQueue[T] { + + return &BackpressureQueue[T]{ + ch: make(chan T, capacity), + dropPredicate: predicate, + } +} + +// Enqueue attempts to add an item to the queue, respecting context +// cancellation. Returns ErrQueueFullAndDropped if dropped, or context error if +// ctx is done before enqueue. Otherwise, `nil` is returned on success. +func (q *BackpressureQueue[T]) Enqueue(ctx context.Context, + item T) error { + + // First, consult the drop predicate based on the current queue length. + // If the predicate decides to drop the item, return an error. + if q.dropPredicate(len(q.ch), item) { + return ErrQueueFullAndDropped + } + + // If the predicate decides not to drop, attempt to enqueue the item. + select { + case q.ch <- item: + return nil + + default: + // Channel is full, and the predicate decided not to drop. We + // must block until space is available or context is cancelled. + select { + case q.ch <- item: + return nil + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Dequeue retrieves the next item from the queue, blocking until available or +// context done. Returns the item or an error if ctx is done before an item is +// available. +func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] { + select { + + case item := <-q.ch: + return fn.Ok(item) + + case <-ctx.Done(): + return fn.Err[T](ctx.Err()) + } +} + +// redConfig holds configuration for RandomEarlyDrop. +type redConfig struct { + randSrc func() float64 +} + +// REDOption is a functional option for configuring RandomEarlyDrop. +type REDOption func(*redConfig) + +// WithRandSource provides a custom random number source (a function that +// returns a float64 between 0.0 and 1.0). +func WithRandSource(src func() float64) REDOption { + return func(cfg *redConfig) { + cfg.randSrc = src + } +} + +// RandomEarlyDrop returns a DropPredicate that implements Random Early +// Detection (RED), inspired by TCP-RED queue management. +// +// RED prevents sudden buffer overflows by proactively dropping packets before +// the queue is full. It establishes two thresholds: +// +// 1. minThreshold: queue length below which no drops occur. +// 2. maxThreshold: queue length at or above which all items are dropped. +// +// Between these points, the drop probability p increases linearly: +// +// p = (queueLen - minThreshold) / (maxThreshold - minThreshold) +// +// For example, with minThreshold=15 and maxThreshold=35: +// - At queueLen=15, p=0.0 (0% drop chance) +// - At queueLen=25, p=0.5 (50% drop chance) +// - At queueLen=35, p=1.0 (100% drop chance) +// +// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy, +// and gives early back-pressure signals to senders. +func RandomEarlyDrop[T any](minThreshold, maxThreshold int, + opts ...REDOption) DropPredicate[T] { + + cfg := redConfig{ + randSrc: rand.Float64, + } + + for _, opt := range opts { + opt(&cfg) + } + if cfg.randSrc == nil { + panic("randSrc cannot be nil") + } + + return func(queueLen int, _ T) bool { + // If the queue is below the minimum threshold, then we never + // drop. + if queueLen < minThreshold { + return false + } + + // If the queue is at or above the maximum threshold, then we + // always drop. + if queueLen >= maxThreshold { + return true + } + + // In between the thresholds, linearly scale the drop + // probability. + denominator := float64(maxThreshold - minThreshold) + p := float64(queueLen-minThreshold) / denominator + + return cfg.randSrc() < p + } +} diff --git a/queue/back_pressure_test.go b/queue/back_pressure_test.go new file mode 100644 index 00000000000..e24a9b47298 --- /dev/null +++ b/queue/back_pressure_test.go @@ -0,0 +1,366 @@ +package queue + +import ( + "context" + "errors" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "pgregory.net/rapid" +) + +// queueMachine is the generic state machine logic for testing +// BackpressureQueue. T must be comparable for use in assertions. +type queueMachine[T comparable] struct { + tb rapid.TB + + capacity int + + queue *BackpressureQueue[T] + + modelQueue []T + + dropPredicate DropPredicate[T] + + itemGenerator *rapid.Generator[T] +} + +// Enqueue is a state machine action. It enqueues an item and updates the model. +func (m *queueMachine[T]) Enqueue(t *rapid.T) { + item := m.itemGenerator.Draw(t, "item") + + err := m.queue.Enqueue(context.Background(), item) + + actualDrop := false + if errors.Is(err, ErrQueueFullAndDropped) { + actualDrop = true + } else if err != nil { + // If Enqueue with background context returns an error other + // than ErrQueueFullAndDropped, it's unexpected. + m.tb.Fatalf( + "Enqueue with background context returned "+ + "unexpected error: %v", err, + ) + } + + if !actualDrop { + // If the item was not dropped, it must have been enqueued. Add + // it to the model. The modelQueue should not exceed capacity. + // This is also checked in Check(). + m.modelQueue = append(m.modelQueue, item) + } +} + +// Dequeue is a state machine action. It dequeues an item and updates the model. +func (m *queueMachine[T]) Dequeue(t *rapid.T) { + if len(m.modelQueue) == 0 { + // If the model is empty, the actual queue channel should also + // be empty. + require.Zero( + m.tb, len(m.queue.ch), + "actual queue channel not empty when model is empty", + ) + + // Attempting to dequeue from an empty queue should block. We + // verify this by trying to dequeue with a very short timeout. + ctx, cancel := context.WithTimeout( + context.Background(), 5*time.Millisecond, + ) + defer cancel() + + result := m.queue.Dequeue(ctx) + require.True( + m.tb, result.IsErr(), + "dequeue should return error on empty queue with "+ + "timeout", + ) + require.ErrorIs( + m.tb, result.Err(), context.DeadlineExceeded, + "dequeue should block on empty queue", + ) + + return + } + + // The model is not empty, so we expect to dequeue an item. + expectedItem := m.modelQueue[0] + m.modelQueue = m.modelQueue[1:] + + // Perform the dequeue operation, this should succeed. + result := m.queue.Dequeue(context.Background()) + actualItem, err := result.Unpack() + require.NoError(t, err) + require.Equal( + m.tb, expectedItem, actualItem, + "dequeued item does not match model (FIFO violation or "+ + "model error)", + ) +} + +// Check is called by rapid after each action to verify invariants. +func (m *queueMachine[T]) Check(t *rapid.T) { + // Invariant 1: The length of the internal channel must not exceed + // capacity. + require.LessOrEqual( + m.tb, len(m.queue.ch), m.capacity, + "queue channel length exceeds capacity", + ) + + // Invariant 2: The length of our model queue must match the length of + // the actual queue's channel. + require.Equal( + m.tb, len(m.modelQueue), len(m.queue.ch), + "model queue length mismatch with actual queue channel "+ + "length", + ) +} + +// intQueueMachine is a concrete wrapper for queueMachine[int] for rapid. +type intQueueMachine struct { + *queueMachine[int] +} + +// NewIntqueueMachine creates a new queueMachine specialized for int items. +func NewIntqueueMachine(rt *rapid.T) *intQueueMachine { + // Draw from the rapid distribution for the made params of our + // queue. + capacity := rapid.IntRange(1, 50).Draw(rt, "capacity") + minThreshold := rapid.IntRange(0, capacity).Draw(rt, "minThreshold") + maxThreshold := rapid.IntRange( + minThreshold, capacity, + ).Draw(rt, "maxThreshold") + + // Draw a seed for this machine's local RNG using rapid. This makes + // the predicate's randomness part of rapid's generated test case. + machineSeed := rapid.Int64().Draw(rt, "machine_rng_seed") + localRngFixed := rand.New(rand.NewSource(machineSeed)) + + rt.Logf( + "NewIntqueueMachine: capacity=%d, minT=%d, maxT=%d, "+ + "machineSeed=%d", + capacity, minThreshold, maxThreshold, machineSeed, + ) + + predicate := RandomEarlyDrop[int]( + minThreshold, maxThreshold, + WithRandSource(localRngFixed.Float64), + ) + + q := NewBackpressureQueue(capacity, predicate) + + return &intQueueMachine{ + queueMachine: &queueMachine[int]{ + tb: rt, + capacity: capacity, + queue: q, + modelQueue: make([]int, 0, capacity), + dropPredicate: predicate, + itemGenerator: rapid.IntRange(-1000, 1000), + }, + } +} + +// Enqueue forwards the call to the generic queueMachine. +func (m *intQueueMachine) Enqueue(t *rapid.T) { m.queueMachine.Enqueue(t) } + +// Dequeue forwards the call to the generic queueMachine. +func (m *intQueueMachine) Dequeue(t *rapid.T) { m.queueMachine.Dequeue(t) } + +// Check forwards the call to the generic queueMachine. +func (m *intQueueMachine) Check(t *rapid.T) { m.queueMachine.Check(t) } + +// TestBackpressureQueueRapidInt is the main property-based test for +// BackpressureQueue using the IntqueueMachine state machine. +func TestBackpressureQueueRapidInt(t *testing.T) { + rapid.Check(t, func(rt *rapid.T) { + // Initialize the state machine instance within the + // property function. NewIntqueueMachine expects *rapid.T, + // which rt is. + machine := NewIntqueueMachine(rt) + + // Generate the actions map from the machine's methods. + // Rapid will randomly call the methods, and then use the + // `Check` method to verify invariants. + rt.Repeat(rapid.StateMachineActions(machine)) + }) +} + +// TestBackpressureQueueEnqueueCancellation tests that Enqueue respects +// context cancellation when it would otherwise block. +func TestBackpressureQueueEnqueueCancellation(t *testing.T) { + rapid.Check(t, func(rt *rapid.T) { + capacity := rapid.IntRange(1, 20).Draw(rt, "capacity") + + // Use a predicate that never drops when full, to force blocking + // behavior. + q := NewBackpressureQueue( + capacity, + func(_ int, _ int) bool { return false }, + ) + + // Fill the queue to its capacity. The predicate always returns + // false, so no drops expected. + for i := 0; i < capacity; i++ { + err := q.Enqueue(context.Background(), i) + require.NoError( + rt, err, "enqueue failed during setup: %v", err, + ) + } + require.Equal( + rt, capacity, len(q.ch), + "queue should be full after setup", + ) + + // Attempt to enqueue one more item with an immediately cancelled + // context. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := q.Enqueue(ctx, 999) + require.Error( + rt, err, + "enqueue should have returned an error for cancelled "+ + "context", + ) + require.ErrorIs( + rt, err, context.Canceled, + "error should be context.Canceled", + ) + + // Ensure the queue state (length) is unchanged. + require.Equal( + rt, capacity, len(q.ch), + "queue length changed after cancelled enqueue attempt", + ) + }) +} + +// TestBackpressureQueueDequeueCancellation tests that Dequeue respects +// context cancellation when the queue is empty and it would otherwise block. +func TestBackpressureQueueDequeueCancellation(t *testing.T) { + rapid.Check(t, func(rt *rapid.T) { + capacity := rapid.IntRange(0, 20).Draw(rt, "capacity") + + // The predicate doesn't matter much here as the queue will be + // empty. + q := NewBackpressureQueue( + capacity, RandomEarlyDrop[int](0, capacity), + ) + + require.Zero( + rt, len(q.ch), + "queue should be empty initially for Dequeue "+ + "cancellation test", + ) + + // Attempt to dequeue from the empty queue with an immediately + // cancelled context. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + result := q.Dequeue(ctx) + require.True( + rt, result.IsErr(), + "dequeue should have returned an error for cancelled "+ + "context", + ) + require.ErrorIs( + rt, result.Err(), context.Canceled, + "error should be context.Canceled", + ) + }) +} + +// TestBackpressureQueueComposedPredicate demonstrates testing with a +// composed predicate. This is a scenario-based test rather than a full +// property-based state machine. +func TestBackpressureQueueComposedPredicate(t *testing.T) { + capacity := 10 + minThresh, maxThresh := 3, 7 + + // Use a deterministic random source for this specific test case + // to ensure predictable behavior of RandomEarlyDrop. + const testSeed = int64(12345) + localRng := rand.New(rand.NewSource(testSeed)) + + redPred := RandomEarlyDrop[int]( + minThresh, maxThresh, WithRandSource(localRng.Float64), + ) + + // Next, we'll define a custom predicate: drop items with value 42. + customValuePredicate := func(_ int, item int) bool { + return item == 42 + } + + // We'll also make a composed predicate: drop if RED says so OR if + // item is 42. + composedPredicate := func(queueLen int, item int) bool { + isRedDrop := redPred(queueLen, item) + isCustomDrop := customValuePredicate(queueLen, item) + return isRedDrop || isCustomDrop + } + + q := NewBackpressureQueue(capacity, composedPredicate) + + // Scenario 1: Enqueue item 42 when queue length is between min/max + // thresholds. As we're below the max threshold, we shouldn't drop + // anything. + for i := 0; i < minThresh; i++ { + // All items aren't 42, and queue is not full enough for RED + // to drop. + err := q.Enqueue(context.Background(), i) + require.NoErrorf( + t, err, + "enqueue S1 setup item %d (qLen before: %d) should "+ + "not be dropped. Predicate was "+ + "redPred(%d,%d) || customPred(%d,%d)", + i, len(q.ch)-1, len(q.ch)-1, i, len(q.ch)-1, i, + ) + + } + + currentLen := len(q.ch) + require.Equal(t, minThresh, currentLen, "queue length after S1 setup") + + // Enqueue item 42. customValuePredicate is true, so + // composedPredicate is true. Item 42 should be dropped regardless + // of what redPred decides. + err := q.Enqueue(context.Background(), 42) + require.ErrorIs( + t, err, ErrQueueFullAndDropped, + "item 42 should have been dropped by composed predicate", + ) + require.Equal( + t, currentLen, len(q.ch), + "queue length should not change after dropping 42", + ) + + // Re-create the main SUT queue with the composedPredicate. We will + // manually fill its channel to capacity to bypass Enqueue logic + // for setup. + q = NewBackpressureQueue(capacity, composedPredicate) + for i := 0; i < capacity; i++ { + q.ch <- i + } + require.Equal( + t, capacity, len(q.ch), + "queue manually filled to capacity for S2 test", + ) + + err = q.Enqueue(context.Background(), 100) + + // Expect drop because queue is full (len=capacity), so + // redPred(capacity, 100) is true. customValuePredicate(capacity, + // 100) is false. Thus, composedPredicate should be true. + require.ErrorIs( + t, err, ErrQueueFullAndDropped, + "item 100 should be dropped (due to RED part of composed "+ + "predicate) when queue full", + ) + require.Equal( + t, capacity, len(q.ch), + "queue length should not change after dropping 100", + ) +} diff --git a/queue/go.mod b/queue/go.mod index 58267e27606..7dd9930d31b 100644 --- a/queue/go.mod +++ b/queue/go.mod @@ -1,6 +1,19 @@ module github.com/lightningnetwork/lnd/queue -require github.com/lightningnetwork/lnd/ticker v1.0.0 +require ( + github.com/lightningnetwork/lnd/fn/v2 v2.0.8 + github.com/lightningnetwork/lnd/ticker v1.0.0 + github.com/stretchr/testify v1.8.1 + pgregory.net/rapid v1.2.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/exp v0.0.0-20231226003508-02704c960a9b // indirect + golang.org/x/sync v0.7.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) replace github.com/lightningnetwork/lnd/ticker v1.0.0 => ../ticker diff --git a/queue/go.sum b/queue/go.sum index e69de29bb2d..575b2bc6777 100644 --- a/queue/go.sum +++ b/queue/go.sum @@ -0,0 +1,25 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8 h1:r2SLz7gZYQPVc3IZhU82M66guz3Zk2oY+Rlj9QN5S3g= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4= +golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= +pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=