Commit e0f9382
feat: support push batch direct to completed and add biggest coalesce batch support (#8146)
# Which issue does this PR close?
Needed for apache/datafusion#17193.
# Rationale for this change
```rust
// Large batch bypass optimization:
// When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
// we can avoid expensive split-and-merge operations by passing it through directly.
//
// IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
// is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
// If not set (None), ALL batches follow normal coalescing behavior regardless of size.
// =============================================================================
// CASE 1: No buffer + large batch → Direct bypass
// =============================================================================
// Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
// Input sequence: [600, 1200, 300]
//
// With biggest_coalesce_batch_size=Some(500) (optimization enabled):
// 600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
// → output: [600] (bypass, preserves large batch)
// 1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
// → output: [1200] (bypass, preserves large batch)
// 300 → normal batch, buffer: [300]
// Result: [600], [1200], [300] - large batches preserved, mixed sizes
// =============================================================================
// CASE 2: Buffer too large + large batch → Flush first, then bypass
// =============================================================================
// This case prevents creating extremely large merged batches that would
// significantly exceed both target_batch_size and biggest_coalesce_batch_size.
//
// Example 1: Buffer exceeds limit before large batch arrives
// target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
// Input: [350, 200, 800]
//
// Step 1: push_batch([350])
// → batch_size=350 <= 400, normal path
// → buffer: [350], buffered_rows=350
//
// Step 2: push_batch([200])
// → batch_size=200 <= 400, normal path
// → buffer: [350, 200], buffered_rows=550
//
// Step 3: push_batch([800])
// → batch_size=800 > 400, large batch path
// → buffered_rows=550 > 400 → Case 2: flush first
// → flush: output [550] (combined [350, 200])
// → then bypass: output [800]
// Result: [550], [800] - buffer flushed to prevent oversized merge
//
// Example 2: Multiple small batches accumulate before large batch
// target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
// Input: [150, 100, 80, 900]
//
// Step 1-3: Accumulate small batches
// 150 → buffer: [150], buffered_rows=150
// 100 → buffer: [150, 100], buffered_rows=250
// 80 → buffer: [150, 100, 80], buffered_rows=330
//
// Step 4: push_batch([900])
// → batch_size=900 > 300, large batch path
// → buffered_rows=330 > 300 → Case 2: flush first
// → flush: output [330] (combined [150, 100, 80])
// → then bypass: output [900]
// Result: [330], [900] - prevents merge into [1230] which would be too large
// =============================================================================
// CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
// =============================================================================
// When the buffer is small enough, we still merge to maintain efficiency.
// Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [300, 1200]
//
// Step 1: push_batch([300])
// → batch_size=300 <= 500, normal path
// → buffer: [300], buffered_rows=300
//
// Step 2: push_batch([1200])
// → batch_size=1200 > 500, large batch path
// → buffered_rows=300 <= 500 → Case 3: normal merge
// → buffer: [300, 1200] (1500 total)
// → 1500 > target_batch_size → split: output [1000], buffer [500]
// Result: [1000], [500] - normal split/merge behavior maintained
// =============================================================================
// Comparison: Default vs Optimized Behavior
// =============================================================================
// target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [600, 1200, 300]
//
// DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
// 600 → buffer: [600]
// 1200 → buffer: [600, 1200] (1800 rows total)
// → split: output [1000 rows], buffer [800 rows remaining]
// 300 → buffer: [800, 300] (1100 rows total)
// → split: output [1000 rows], buffer [100 rows remaining]
// Result: [1000], [1000], [100] - all outputs respect target_batch_size
//
// OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
// 600 → Case 1: direct bypass → output: [600]
// 1200 → Case 1: direct bypass → output: [1200]
// 300 → normal path → buffer: [300]
// Result: [600], [1200], [300] - large batches preserved
// =============================================================================
// Benefits and Trade-offs
// =============================================================================
// Benefits of the optimization:
// - Large batches stay intact (better for downstream vectorized processing)
// - Fewer split/merge operations (better CPU performance)
// - More predictable memory usage patterns
// - Maintains streaming efficiency while preserving batch boundaries
//
// Trade-offs:
// - Output batch sizes become variable (not always target_batch_size)
// - May produce smaller partial batches when flushing before large batches
// - Requires tuning biggest_coalesce_batch_size parameter for optimal performance
// TODO: for unsorted batches, we could pass through all large batches and
// coalesce all the small batches together?
```
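The three cases above can be sketched as a small row-count simulation. This is a hypothetical standalone helper (`simulate` is not the actual `BatchCoalescer` API, which operates on `RecordBatch`es); it only models how many rows each output batch would contain under the rules described in the comment:

```rust
/// Simulate the coalescing rules described above, tracking only row counts.
/// `target` is target_batch_size; `limit` is biggest_coalesce_batch_size.
fn simulate(target: usize, limit: Option<usize>, inputs: &[usize]) -> Vec<usize> {
    let mut out = Vec::new();
    let mut buffered = 0usize;
    for &n in inputs {
        if let Some(limit) = limit {
            if n > limit {
                if buffered == 0 {
                    // Case 1: no buffer -> direct bypass
                    out.push(n);
                    continue;
                } else if buffered > limit {
                    // Case 2: buffer too large -> flush first, then bypass
                    out.push(buffered);
                    buffered = 0;
                    out.push(n);
                    continue;
                }
                // Case 3: small buffer -> fall through to normal coalescing
            }
        }
        // Normal path: accumulate, splitting off target-sized batches
        buffered += n;
        while buffered >= target {
            out.push(target);
            buffered -= target;
        }
    }
    if buffered > 0 {
        out.push(buffered); // finish: flush any remainder
    }
    out
}

fn main() {
    // Optimization enabled: large batches pass through intact
    println!("{:?}", simulate(1000, Some(500), &[600, 1200, 300]));
    // Default behavior: all outputs respect target_batch_size
    println!("{:?}", simulate(1000, None, &[600, 1200, 300]));
}
```

Running the documented scenarios through this sketch reproduces the results in the comment, e.g. `[600, 1200, 300]` with the limit set versus `[1000, 1000, 100]` without it.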
# What changes are included in this PR?
Adds new public APIs needed by Apache DataFusion.
# Are these changes tested?
Yes, unit tests added.
# Are there any user-facing changes?
No
---------
Co-authored-by: Andrew Lamb <[email protected]>
File tree: 1 file changed in arrow-select/src (+623, −3 lines)