-
Notifications
You must be signed in to change notification settings - Fork 2.2k
queue: add new BackpressureQueue[T] variant #9838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Roasbeef
wants to merge
1
commit into
master
Choose a base branch
from
backpressure-queue
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
package queue | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"math/rand" | ||
|
||
"github.com/lightningnetwork/lnd/fn/v2" | ||
) | ||
|
||
// DropPredicate decides whether to drop an item when the queue is full. | ||
// It receives the current queue length and the item, and returns true to drop, | ||
// false to enqueue. | ||
type DropPredicate[T any] func(queueLen int, item T) bool | ||
|
||
// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped | ||
// due to the DropPredicate. | ||
var ErrQueueFullAndDropped = errors.New("queue full and item dropped") | ||
|
||
// BackpressureQueue is a generic, fixed-capacity queue with predicate-based | ||
// drop behavior. When full, it uses the DropPredicate to perform early drops | ||
// (e.g., RED-style). | ||
type BackpressureQueue[T any] struct { | ||
ch chan T | ||
dropPredicate DropPredicate[T] | ||
} | ||
|
||
// NewBackpressureQueue creates a new BackpressureQueue with the given capacity | ||
// and drop predicate. | ||
func NewBackpressureQueue[T any](capacity int, | ||
predicate DropPredicate[T]) *BackpressureQueue[T] { | ||
|
||
return &BackpressureQueue[T]{ | ||
ch: make(chan T, capacity), | ||
dropPredicate: predicate, | ||
} | ||
} | ||
|
||
// Enqueue attempts to add an item to the queue, respecting context | ||
// cancellation. Returns ErrQueueFullAndDropped if dropped, or context error if | ||
// ctx is done before enqueue. Otherwise, `nil` is returned on success. | ||
func (q *BackpressureQueue[T]) Enqueue(ctx context.Context, | ||
item T) error { | ||
|
||
// First, consult the drop predicate based on the current queue length. | ||
// If the predicate decides to drop the item, return an error. | ||
if q.dropPredicate(len(q.ch), item) { | ||
return ErrQueueFullAndDropped | ||
} | ||
|
||
// If the predicate decides not to drop, attempt to enqueue the item. | ||
select { | ||
case q.ch <- item: | ||
return nil | ||
|
||
default: | ||
// Channel is full, and the predicate decided not to drop. We | ||
// must block until space is available or context is cancelled. | ||
select { | ||
case q.ch <- item: | ||
return nil | ||
|
||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
// Dequeue retrieves the next item from the queue, blocking until available or | ||
// context done. Returns the item or an error if ctx is done before an item is | ||
// available. | ||
func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] { | ||
select { | ||
|
||
case item := <-q.ch: | ||
return fn.Ok(item) | ||
|
||
case <-ctx.Done(): | ||
return fn.Err[T](ctx.Err()) | ||
} | ||
} | ||
|
||
// redConfig holds configuration for RandomEarlyDrop. | ||
type redConfig struct { | ||
randSrc func() float64 | ||
} | ||
|
||
// REDOption is a functional option for configuring RandomEarlyDrop. | ||
type REDOption func(*redConfig) | ||
|
||
// WithRandSource provides a custom random number source (a function that | ||
// returns a float64 between 0.0 and 1.0). | ||
func WithRandSource(src func() float64) REDOption { | ||
return func(cfg *redConfig) { | ||
cfg.randSrc = src | ||
} | ||
} | ||
|
||
// RandomEarlyDrop returns a DropPredicate that implements Random Early | ||
// Detection (RED), inspired by TCP-RED queue management. | ||
// | ||
// RED prevents sudden buffer overflows by proactively dropping packets before | ||
// the queue is full. It establishes two thresholds: | ||
// | ||
// 1. minThreshold: queue length below which no drops occur. | ||
// 2. maxThreshold: queue length at or above which all items are dropped. | ||
// | ||
// Between these points, the drop probability p increases linearly: | ||
// | ||
// p = (queueLen - minThreshold) / (maxThreshold - minThreshold) | ||
// | ||
// For example, with minThreshold=15 and maxThreshold=35: | ||
// - At queueLen=15, p=0.0 (0% drop chance) | ||
// - At queueLen=25, p=0.5 (50% drop chance) | ||
// - At queueLen=35, p=1.0 (100% drop chance) | ||
// | ||
// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy, | ||
// and gives early back-pressure signals to senders. | ||
func RandomEarlyDrop[T any](minThreshold, maxThreshold int, | ||
opts ...REDOption) DropPredicate[T] { | ||
|
||
cfg := redConfig{ | ||
randSrc: rand.Float64, | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(&cfg) | ||
} | ||
if cfg.randSrc == nil { | ||
panic("randSrc cannot be nil") | ||
} | ||
|
||
return func(queueLen int, _ T) bool { | ||
// If the queue is below the minimum threshold, then we never | ||
// drop. | ||
if queueLen < minThreshold { | ||
return false | ||
} | ||
|
||
// If the queue is at or above the maximum threshold, then we | ||
// always drop. | ||
if queueLen >= maxThreshold { | ||
return true | ||
} | ||
|
||
// In between the thresholds, linearly scale the drop | ||
// probability. | ||
denominator := float64(maxThreshold - minThreshold) | ||
p := float64(queueLen-minThreshold) / denominator | ||
|
||
return cfg.randSrc() < p | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.