diff --git a/queue/back_pressure.go b/queue/back_pressure.go
new file mode 100644
index 00000000000..7e916b215a5
--- /dev/null
+++ b/queue/back_pressure.go
@@ -0,0 +1,153 @@
+package queue
+
+import (
+	"context"
+	"errors"
+	"math/rand"
+
+	"github.com/lightningnetwork/lnd/fn/v2"
+)
+
+// DropPredicate decides whether an item should be dropped instead of being
+// enqueued. It receives the current queue length and the item, and returns
+// true to drop, false to enqueue.
+type DropPredicate[T any] func(queueLen int, item T) bool
+
+// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped
+// due to the DropPredicate.
+var ErrQueueFullAndDropped = errors.New("queue full and item dropped")
+
+// BackpressureQueue is a generic, fixed-capacity queue with predicate-based
+// drop behavior. The DropPredicate is consulted on every enqueue, so items
+// may be dropped early (e.g., RED-style) before the queue is full.
+type BackpressureQueue[T any] struct {
+	ch            chan T
+	dropPredicate DropPredicate[T]
+}
+
+// NewBackpressureQueue creates a new BackpressureQueue with the given capacity
+// and drop predicate.
+func NewBackpressureQueue[T any](capacity int,
+	predicate DropPredicate[T]) *BackpressureQueue[T] {
+
+	return &BackpressureQueue[T]{
+		ch:            make(chan T, capacity),
+		dropPredicate: predicate,
+	}
+}
+
+// Enqueue attempts to add an item to the queue, respecting context
+// cancellation. It returns ErrQueueFullAndDropped if the item is dropped, the
+// context error if ctx is done before the enqueue, and `nil` on success.
+func (q *BackpressureQueue[T]) Enqueue(ctx context.Context,
+	item T) error {
+
+	// First, consult the drop predicate based on the current queue length.
+	// If the predicate decides to drop the item, return an error.
+	if q.dropPredicate(len(q.ch), item) {
+		return ErrQueueFullAndDropped
+	}
+
+	// If the predicate decides not to drop, attempt to enqueue the item.
+	select {
+	case q.ch <- item:
+		return nil
+
+	default:
+		// Channel is full, and the predicate decided not to drop. We
+		// must block until space is available or context is cancelled.
+		select {
+		case q.ch <- item:
+			return nil
+
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+// Dequeue retrieves the next item from the queue, blocking until available or
+// context done. Returns the item or an error if ctx is done before an item is
+// available.
+func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] {
+	select {
+
+	case item := <-q.ch:
+		return fn.Ok(item)
+
+	case <-ctx.Done():
+		return fn.Err[T](ctx.Err())
+	}
+}
+
+// redConfig holds configuration for RandomEarlyDrop.
+type redConfig struct {
+	randSrc func() float64
+}
+
+// REDOption is a functional option for configuring RandomEarlyDrop.
+type REDOption func(*redConfig)
+
+// WithRandSource provides a custom random number source (a function that
+// returns a float64 between 0.0 and 1.0).
+func WithRandSource(src func() float64) REDOption {
+	return func(cfg *redConfig) {
+		cfg.randSrc = src
+	}
+}
+
+// RandomEarlyDrop returns a DropPredicate that implements Random Early
+// Detection (RED), inspired by TCP-RED queue management.
+//
+// RED prevents sudden buffer overflows by proactively dropping packets before
+// the queue is full. It establishes two thresholds:
+//
+// 1. minThreshold: queue length below which no drops occur.
+// 2. maxThreshold: queue length at or above which all items are dropped.
+// +// Between these points, the drop probability p increases linearly: +// +// p = (queueLen - minThreshold) / (maxThreshold - minThreshold) +// +// For example, with minThreshold=15 and maxThreshold=35: +// - At queueLen=15, p=0.0 (0% drop chance) +// - At queueLen=25, p=0.5 (50% drop chance) +// - At queueLen=35, p=1.0 (100% drop chance) +// +// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy, +// and gives early back-pressure signals to senders. +func RandomEarlyDrop[T any](minThreshold, maxThreshold int, + opts ...REDOption) DropPredicate[T] { + + cfg := redConfig{ + randSrc: rand.Float64, + } + + for _, opt := range opts { + opt(&cfg) + } + if cfg.randSrc == nil { + panic("randSrc cannot be nil") + } + + return func(queueLen int, _ T) bool { + // If the queue is below the minimum threshold, then we never + // drop. + if queueLen < minThreshold { + return false + } + + // If the queue is at or above the maximum threshold, then we + // always drop. + if queueLen >= maxThreshold { + return true + } + + // In between the thresholds, linearly scale the drop + // probability. + denominator := float64(maxThreshold - minThreshold) + p := float64(queueLen-minThreshold) / denominator + + return cfg.randSrc() < p + } +} diff --git a/queue/back_pressure_test.go b/queue/back_pressure_test.go new file mode 100644 index 00000000000..e24a9b47298 --- /dev/null +++ b/queue/back_pressure_test.go @@ -0,0 +1,366 @@ +package queue + +import ( + "context" + "errors" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "pgregory.net/rapid" +) + +// queueMachine is the generic state machine logic for testing +// BackpressureQueue. T must be comparable for use in assertions. +type queueMachine[T comparable] struct { + tb rapid.TB + + capacity int + + queue *BackpressureQueue[T] + + modelQueue []T + + dropPredicate DropPredicate[T] + + itemGenerator *rapid.Generator[T] +} + +// Enqueue is a state machine action. It enqueues an item and updates the model. +func (m *queueMachine[T]) Enqueue(t *rapid.T) { + item := m.itemGenerator.Draw(t, "item") + + err := m.queue.Enqueue(context.Background(), item) + + actualDrop := false + if errors.Is(err, ErrQueueFullAndDropped) { + actualDrop = true + } else if err != nil { + // If Enqueue with background context returns an error other + // than ErrQueueFullAndDropped, it's unexpected. + m.tb.Fatalf( + "Enqueue with background context returned "+ + "unexpected error: %v", err, + ) + } + + if !actualDrop { + // If the item was not dropped, it must have been enqueued. Add + // it to the model. The modelQueue should not exceed capacity. + // This is also checked in Check(). + m.modelQueue = append(m.modelQueue, item) + } +} + +// Dequeue is a state machine action. It dequeues an item and updates the model. +func (m *queueMachine[T]) Dequeue(t *rapid.T) { + if len(m.modelQueue) == 0 { + // If the model is empty, the actual queue channel should also + // be empty. + require.Zero( + m.tb, len(m.queue.ch), + "actual queue channel not empty when model is empty", + ) + + // Attempting to dequeue from an empty queue should block. We + // verify this by trying to dequeue with a very short timeout. 
+		ctx, cancel := context.WithTimeout(
+			context.Background(), 5*time.Millisecond,
+		)
+		defer cancel()
+
+		result := m.queue.Dequeue(ctx)
+		require.True(
+			m.tb, result.IsErr(),
+			"dequeue should return error on empty queue with "+
+				"timeout",
+		)
+		require.ErrorIs(
+			m.tb, result.Err(), context.DeadlineExceeded,
+			"dequeue should block on empty queue",
+		)
+
+		return
+	}
+
+	// The model is not empty, so we expect to dequeue an item.
+	expectedItem := m.modelQueue[0]
+	m.modelQueue = m.modelQueue[1:]
+
+	// Perform the dequeue operation; this should succeed.
+	result := m.queue.Dequeue(context.Background())
+	actualItem, err := result.Unpack()
+	require.NoError(t, err)
+	require.Equal(
+		m.tb, expectedItem, actualItem,
+		"dequeued item does not match model (FIFO violation or "+
+			"model error)",
+	)
+}
+
+// Check is called by rapid after each action to verify invariants.
+func (m *queueMachine[T]) Check(t *rapid.T) {
+	// Invariant 1: The length of the internal channel must not exceed
+	// capacity.
+	require.LessOrEqual(
+		m.tb, len(m.queue.ch), m.capacity,
+		"queue channel length exceeds capacity",
+	)
+
+	// Invariant 2: The length of our model queue must match the length of
+	// the actual queue's channel.
+	require.Equal(
+		m.tb, len(m.modelQueue), len(m.queue.ch),
+		"model queue length mismatch with actual queue channel "+
+			"length",
+	)
+}
+
+// intQueueMachine is a concrete wrapper for queueMachine[int] for rapid.
+type intQueueMachine struct {
+	*queueMachine[int]
+}
+
+// NewIntqueueMachine creates a new queueMachine specialized for int items.
+func NewIntqueueMachine(rt *rapid.T) *intQueueMachine {
+	// Draw the main parameters of our queue from rapid's generated
+	// distributions.
+	capacity := rapid.IntRange(1, 50).Draw(rt, "capacity")
+	minThreshold := rapid.IntRange(0, capacity).Draw(rt, "minThreshold")
+	maxThreshold := rapid.IntRange(
+		minThreshold, capacity,
+	).Draw(rt, "maxThreshold")
+
+	// Draw a seed for this machine's local RNG using rapid. This makes
+	// the predicate's randomness part of rapid's generated test case.
+	machineSeed := rapid.Int64().Draw(rt, "machine_rng_seed")
+	localRngFixed := rand.New(rand.NewSource(machineSeed))
+
+	rt.Logf(
+		"NewIntqueueMachine: capacity=%d, minT=%d, maxT=%d, "+
+			"machineSeed=%d",
+		capacity, minThreshold, maxThreshold, machineSeed,
+	)
+
+	predicate := RandomEarlyDrop[int](
+		minThreshold, maxThreshold,
+		WithRandSource(localRngFixed.Float64),
+	)
+
+	q := NewBackpressureQueue(capacity, predicate)
+
+	return &intQueueMachine{
+		queueMachine: &queueMachine[int]{
+			tb:            rt,
+			capacity:      capacity,
+			queue:         q,
+			modelQueue:    make([]int, 0, capacity),
+			dropPredicate: predicate,
+			itemGenerator: rapid.IntRange(-1000, 1000),
+		},
+	}
+}
+
+// Enqueue forwards the call to the generic queueMachine.
+func (m *intQueueMachine) Enqueue(t *rapid.T) { m.queueMachine.Enqueue(t) }
+
+// Dequeue forwards the call to the generic queueMachine.
+func (m *intQueueMachine) Dequeue(t *rapid.T) { m.queueMachine.Dequeue(t) }
+
+// Check forwards the call to the generic queueMachine.
+func (m *intQueueMachine) Check(t *rapid.T) { m.queueMachine.Check(t) }
+
+// TestBackpressureQueueRapidInt is the main property-based test for
+// BackpressureQueue using the IntqueueMachine state machine.
+func TestBackpressureQueueRapidInt(t *testing.T) {
+	rapid.Check(t, func(rt *rapid.T) {
+		// Initialize the state machine instance within the
+		// property function. NewIntqueueMachine expects *rapid.T,
+		// which rt is.
+ machine := NewIntqueueMachine(rt) + + // Generate the actions map from the machine's methods. + // Rapid will randomly call the methods, and then use the + // `Check` method to verify invariants. + rt.Repeat(rapid.StateMachineActions(machine)) + }) +} + +// TestBackpressureQueueEnqueueCancellation tests that Enqueue respects +// context cancellation when it would otherwise block. +func TestBackpressureQueueEnqueueCancellation(t *testing.T) { + rapid.Check(t, func(rt *rapid.T) { + capacity := rapid.IntRange(1, 20).Draw(rt, "capacity") + + // Use a predicate that never drops when full, to force blocking + // behavior. + q := NewBackpressureQueue( + capacity, + func(_ int, _ int) bool { return false }, + ) + + // Fill the queue to its capacity. The predicate always returns + // false, so no drops expected. + for i := 0; i < capacity; i++ { + err := q.Enqueue(context.Background(), i) + require.NoError( + rt, err, "enqueue failed during setup: %v", err, + ) + } + require.Equal( + rt, capacity, len(q.ch), + "queue should be full after setup", + ) + + // Attempt to enqueue one more item with an immediately cancelled + // context. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := q.Enqueue(ctx, 999) + require.Error( + rt, err, + "enqueue should have returned an error for cancelled "+ + "context", + ) + require.ErrorIs( + rt, err, context.Canceled, + "error should be context.Canceled", + ) + + // Ensure the queue state (length) is unchanged. + require.Equal( + rt, capacity, len(q.ch), + "queue length changed after cancelled enqueue attempt", + ) + }) +} + +// TestBackpressureQueueDequeueCancellation tests that Dequeue respects +// context cancellation when the queue is empty and it would otherwise block. +func TestBackpressureQueueDequeueCancellation(t *testing.T) { + rapid.Check(t, func(rt *rapid.T) { + capacity := rapid.IntRange(0, 20).Draw(rt, "capacity") + + // The predicate doesn't matter much here as the queue will be + // empty. + q := NewBackpressureQueue( + capacity, RandomEarlyDrop[int](0, capacity), + ) + + require.Zero( + rt, len(q.ch), + "queue should be empty initially for Dequeue "+ + "cancellation test", + ) + + // Attempt to dequeue from the empty queue with an immediately + // cancelled context. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + result := q.Dequeue(ctx) + require.True( + rt, result.IsErr(), + "dequeue should have returned an error for cancelled "+ + "context", + ) + require.ErrorIs( + rt, result.Err(), context.Canceled, + "error should be context.Canceled", + ) + }) +} + +// TestBackpressureQueueComposedPredicate demonstrates testing with a +// composed predicate. This is a scenario-based test rather than a full +// property-based state machine. +func TestBackpressureQueueComposedPredicate(t *testing.T) { + capacity := 10 + minThresh, maxThresh := 3, 7 + + // Use a deterministic random source for this specific test case + // to ensure predictable behavior of RandomEarlyDrop. + const testSeed = int64(12345) + localRng := rand.New(rand.NewSource(testSeed)) + + redPred := RandomEarlyDrop[int]( + minThresh, maxThresh, WithRandSource(localRng.Float64), + ) + + // Next, we'll define a custom predicate: drop items with value 42. + customValuePredicate := func(_ int, item int) bool { + return item == 42 + } + + // We'll also make a composed predicate: drop if RED says so OR if + // item is 42. 
+	composedPredicate := func(queueLen int, item int) bool {
+		isRedDrop := redPred(queueLen, item)
+		isCustomDrop := customValuePredicate(queueLen, item)
+		return isRedDrop || isCustomDrop
+	}
+
+	q := NewBackpressureQueue(capacity, composedPredicate)
+
+	// Scenario 1: fill the queue to minThresh, then enqueue item 42.
+	// During setup the queue length stays below the min threshold, so
+	// RED never drops and no setup item should be rejected.
+	for i := 0; i < minThresh; i++ {
+		// None of these items is 42, and the queue is below the min
+		// threshold, so neither predicate drops.
+		err := q.Enqueue(context.Background(), i)
+		require.NoErrorf(
+			t, err,
+			"enqueue S1 setup item %d (qLen before: %d) should "+
+				"not be dropped. Predicate was "+
+				"redPred(%d,%d) || customPred(%d,%d)",
+			i, len(q.ch)-1, len(q.ch)-1, i, len(q.ch)-1, i,
+		)
+
+	}
+
+	currentLen := len(q.ch)
+	require.Equal(t, minThresh, currentLen, "queue length after S1 setup")
+
+	// Enqueue item 42. customValuePredicate is true, so
+	// composedPredicate is true. Item 42 should be dropped regardless
+	// of what redPred decides.
+	err := q.Enqueue(context.Background(), 42)
+	require.ErrorIs(
+		t, err, ErrQueueFullAndDropped,
+		"item 42 should have been dropped by composed predicate",
+	)
+	require.Equal(
+		t, currentLen, len(q.ch),
+		"queue length should not change after dropping 42",
+	)
+
+	// Re-create the main SUT queue with the composedPredicate. We will
+	// manually fill its channel to capacity to bypass Enqueue logic
+	// for setup.
+	q = NewBackpressureQueue(capacity, composedPredicate)
+	for i := 0; i < capacity; i++ {
+		q.ch <- i
+	}
+	require.Equal(
+		t, capacity, len(q.ch),
+		"queue manually filled to capacity for S2 test",
+	)
+
+	err = q.Enqueue(context.Background(), 100)
+
+	// Expect drop because queue is full (len=capacity), so
+	// redPred(capacity, 100) is true. customValuePredicate(capacity,
+	// 100) is false. Thus, composedPredicate should be true.
+ require.ErrorIs( + t, err, ErrQueueFullAndDropped, + "item 100 should be dropped (due to RED part of composed "+ + "predicate) when queue full", + ) + require.Equal( + t, capacity, len(q.ch), + "queue length should not change after dropping 100", + ) +} diff --git a/queue/go.mod b/queue/go.mod index 58267e27606..7dd9930d31b 100644 --- a/queue/go.mod +++ b/queue/go.mod @@ -1,6 +1,19 @@ module github.com/lightningnetwork/lnd/queue -require github.com/lightningnetwork/lnd/ticker v1.0.0 +require ( + github.com/lightningnetwork/lnd/fn/v2 v2.0.8 + github.com/lightningnetwork/lnd/ticker v1.0.0 + github.com/stretchr/testify v1.8.1 + pgregory.net/rapid v1.2.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/exp v0.0.0-20231226003508-02704c960a9b // indirect + golang.org/x/sync v0.7.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) replace github.com/lightningnetwork/lnd/ticker v1.0.0 => ../ticker diff --git a/queue/go.sum b/queue/go.sum index e69de29bb2d..575b2bc6777 100644 --- a/queue/go.sum +++ b/queue/go.sum @@ -0,0 +1,25 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8 h1:r2SLz7gZYQPVc3IZhU82M66guz3Zk2oY+Rlj9QN5S3g= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4= +golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= +pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
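To make the new API easier to review, here is a minimal, self-contained usage sketch; it is not part of the diff. It wires RandomEarlyDrop into a BackpressureQueue, pushes a burst of items, counts RED's early drops, and drains the queue. The capacity of 50, the 15/35 thresholds, the burst of 100 items, and the one-second deadline are illustrative values chosen for the example; only NewBackpressureQueue, RandomEarlyDrop, Enqueue, Dequeue, and ErrQueueFullAndDropped come from the code above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/queue"
)

func main() {
	// Capacity 50 with RED thresholds of 15/35: below 15 queued items
	// nothing is dropped, between 15 and 35 the drop probability ramps
	// up linearly, and at 35 or more every enqueue is dropped.
	q := queue.NewBackpressureQueue(
		50, queue.RandomEarlyDrop[int](15, 35),
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Producer: push a burst of 100 items and count how many RED drops
	// early. Because the max threshold (35) is below the capacity (50),
	// the predicate always drops once the queue is that full, so Enqueue
	// never hits its blocking path here.
	var dropped int
	for i := 0; i < 100; i++ {
		err := q.Enqueue(ctx, i)
		switch {
		case errors.Is(err, queue.ErrQueueFullAndDropped):
			dropped++

		case err != nil:
			// Context cancelled or deadline exceeded.
			fmt.Println("enqueue aborted:", err)
			return
		}
	}
	fmt.Printf("RED dropped %d of 100 items\n", dropped)

	// Consumer: drain exactly the items that made it into the queue.
	for i := 0; i < 100-dropped; i++ {
		item, err := q.Dequeue(ctx).Unpack()
		if err != nil {
			fmt.Println("dequeue aborted:", err)
			return
		}
		fmt.Println("got", item)
	}
}
```

Note that with a predicate that can return false while the queue is full (for example, a max threshold above the capacity), Enqueue falls through to its blocking select and waits until space frees up or the context is done, which is the behavior the cancellation tests above exercise.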