Testing: Try test optimize performance for coalesce #17193
base: main
Conversation
…eam_arrow_coalesce
Sounds good to me. I am sorry I have somewhat lost track of the current status. Shall we polish up this PR then?
Thank you @zhuqi-lucas
// Limit not reached, push the entire batch
self.total_rows += batch.num_rows();

if batch.num_rows() >= self.biggest_coalesce_size {
or maybe we should migrate the `biggest_coalesce_size` setting to the upstream coalescer 🤔
Good suggestion!
Thank you @alamb, polished the upstream PR to support it now:
Thank you @alamb, right, let me polish the upstream PR before this PR.
Polished the upstream PR to support it now:
Very cool! Added a comment on the upstream PR; I think it makes sense to see if we can avoid the (small) regressions.
Thank you @Dandandan for review!
…row-datafusion into test_optimize_performance
Updated to use the latest upstream code. Maybe we can trigger a new benchmark to compare the performance results. cc @alamb @Dandandan, thanks!
Done. I really need to find some better way to get the benchmarks triggered other than having to do it manually.
🤖: Benchmark completed
Thank you @alamb, I agree, if we can run it via CI it will be perfect!
The result is similar for the latest change, @alamb @Dandandan: the average time is better, the Q1 and Q18 improvements are good, and some queries show a very small regression.
Results look good to me -- I think the small changes are likely measurement noise. Thank you for pushing this through.
… batch support (#8146)

# Which issue does this PR close?

needed for: apache/datafusion#17193

# Rationale for this change

```rust
// Large batch bypass optimization:
// When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
// we can avoid expensive split-and-merge operations by passing it through directly.
//
// IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
// is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
// If not set (None), ALL batches follow normal coalescing behavior regardless of size.

// =============================================================================
// CASE 1: No buffer + large batch → Direct bypass
// =============================================================================
// Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
// Input sequence: [600, 1200, 300]
//
// With biggest_coalesce_batch_size=Some(500) (optimization enabled):
//   600  → large batch detected! buffered_rows=0 → Case 1: direct bypass
//        → output: [600] (bypass, preserves large batch)
//   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
//        → output: [1200] (bypass, preserves large batch)
//   300  → normal batch, buffer: [300]
// Result: [600], [1200], [300] - large batches preserved, mixed sizes

// =============================================================================
// CASE 2: Buffer too large + large batch → Flush first, then bypass
// =============================================================================
// This case prevents creating extremely large merged batches that would
// significantly exceed both target_batch_size and biggest_coalesce_batch_size.
//
// Example 1: Buffer exceeds limit before large batch arrives
//   target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
//   Input: [350, 200, 800]
//
//   Step 1: push_batch([350])
//     → batch_size=350 <= 400, normal path
//     → buffer: [350], buffered_rows=350
//
//   Step 2: push_batch([200])
//     → batch_size=200 <= 400, normal path
//     → buffer: [350, 200], buffered_rows=550
//
//   Step 3: push_batch([800])
//     → batch_size=800 > 400, large batch path
//     → buffered_rows=550 > 400 → Case 2: flush first
//     → flush: output [550] (combined [350, 200])
//     → then bypass: output [800]
//   Result: [550], [800] - buffer flushed to prevent oversized merge
//
// Example 2: Multiple small batches accumulate before large batch
//   target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
//   Input: [150, 100, 80, 900]
//
//   Steps 1-3: Accumulate small batches
//     150 → buffer: [150], buffered_rows=150
//     100 → buffer: [150, 100], buffered_rows=250
//     80  → buffer: [150, 100, 80], buffered_rows=330
//
//   Step 4: push_batch([900])
//     → batch_size=900 > 300, large batch path
//     → buffered_rows=330 > 300 → Case 2: flush first
//     → flush: output [330] (combined [150, 100, 80])
//     → then bypass: output [900]
//   Result: [330], [900] - prevents merge into [1230] which would be too large

// =============================================================================
// CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
// =============================================================================
// When the buffer is small enough, we still merge to maintain efficiency.
// Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
//   Input: [300, 1200]
//
//   Step 1: push_batch([300])
//     → batch_size=300 <= 500, normal path
//     → buffer: [300], buffered_rows=300
//
//   Step 2: push_batch([1200])
//     → batch_size=1200 > 500, large batch path
//     → buffered_rows=300 <= 500 → Case 3: normal merge
//     → buffer: [300, 1200] (1500 total)
//     → 1500 > target_batch_size → split: output [1000], buffer [500]
//   Result: [1000], [500] - normal split/merge behavior maintained

// =============================================================================
// Comparison: Default vs Optimized Behavior
// =============================================================================
// target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [600, 1200, 300]
//
// DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
//   600  → buffer: [600]
//   1200 → buffer: [600, 1200] (1800 rows total)
//        → split: output [1000 rows], buffer [800 rows remaining]
//   300  → buffer: [800, 300] (1100 rows total)
//        → split: output [1000 rows], buffer [100 rows remaining]
//   Result: [1000], [1000], [100] - all outputs respect target_batch_size
//
// OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
//   600  → Case 1: direct bypass → output: [600]
//   1200 → Case 1: direct bypass → output: [1200]
//   300  → normal path → buffer: [300]
//   Result: [600], [1200], [300] - large batches preserved

// =============================================================================
// Benefits and Trade-offs
// =============================================================================
// Benefits of the optimization:
// - Large batches stay intact (better for downstream vectorized processing)
// - Fewer split/merge operations (better CPU performance)
// - More predictable memory usage patterns
// - Maintains streaming efficiency while preserving batch boundaries
//
// Trade-offs:
// - Output batch sizes become variable (not always target_batch_size)
// - May produce smaller partial batches when flushing before large batches
// - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance

// TODO: for unsorted batches, maybe we can filter out all large batches and
// coalesce all the small batches together?
```

# What changes are included in this PR?

Adds more public API, which is needed for Apache DataFusion.

# Are these changes tested?

Yes, added a unit test.

# Are there any user-facing changes?

No

---------

Co-authored-by: Andrew Lamb <[email protected]>
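As a rough, self-contained sketch of how the new option might be exercised (illustrative code, not taken from either PR; it assumes the arrow-rs `BatchCoalescer` API with the `with_biggest_coalesce_batch_size` builder method described above, and that `BatchCoalescer` is importable from `arrow::compute`):

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
// Assumption: BatchCoalescer is re-exported here; it may also be reachable
// via the arrow-select crate as arrow_select::coalesce::BatchCoalescer.
use arrow::compute::BatchCoalescer;
use arrow::datatypes::{DataType, Field, Schema};

/// Build a single-column Int32 batch with `n` rows.
fn batch_of(schema: &Arc<Schema>, n: usize) -> RecordBatch {
    let values = Int32Array::from_iter_values(0..n as i32);
    RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap()
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));

    // target_batch_size = 1000, bypass limit = 500, matching the CASE 1
    // walkthrough above. with_biggest_coalesce_batch_size is the builder
    // method added in apache/arrow-rs#8146.
    let mut coalescer = BatchCoalescer::new(schema.clone(), 1000)
        .with_biggest_coalesce_batch_size(Some(500));

    // 600 rows: exceeds the 500-row limit with an empty buffer, so per
    // CASE 1 it should be emitted as-is instead of being split and merged.
    coalescer.push_batch(batch_of(&schema, 600)).unwrap();
    // 300 rows: below the limit, buffered normally.
    coalescer.push_batch(batch_of(&schema, 300)).unwrap();
    // Flush whatever is still buffered.
    coalescer.finish_buffered_batch().unwrap();

    // Per the description above, this should print a 600-row batch
    // followed by a 300-row batch.
    while let Some(batch) = coalescer.next_completed_batch() {
        println!("output batch with {} rows", batch.num_rows());
    }
}
```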
I have updated my benchmark machine on GCP so it supposedly is more consistent -- I am going to rerun the benchmarks on this PR to see if it looks better. Also, I think once the following PR is merged, we can do this one:
Thank you @alamb, it makes sense, I will refactor after the PR is merged.
🤖: Benchmark completed
Thank you @alamb, the result is amazing, 16 queries faster for ClickBench! But one query has a regression:

│ QQuery 23 │ 14761.76 ms │ 19908.31 ms │ 1.35x slower │

I need to investigate this one. 🤔
I merged upstream/main into this branch first, before investigating.
I also made some changes to my benchmark machine that hopefully will result in less noise. I'll rerun the benchmarks for this one.
🤖: Benchmark completed
Thank you @alamb, it seems there is a regression on ClickBench for this PR. 🤔 I updated the branch again now, since some PRs were merged to the main branch.
Which issue does this PR close?
Try test optimize performance for coalesce.

This is follow-up testing for #17105.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?