feat: provide delta writer option to flush buffer after every batch #3675
Conversation
Signed-off-by: Sam Meyer-Reed <[email protected]>
Codecov Report

@@ Coverage Diff @@
##             main    #3675      +/-   ##
==========================================
+ Coverage   75.58%   75.64%   +0.06%
==========================================
  Files         146      146
  Lines       45172    45342     +170
  Branches    45172    45342     +170
==========================================
+ Hits        34141    34299     +158
- Misses       9210     9215       +5
- Partials     1821     1828       +7
```rust
// does not extract write_batch_size from WriterProperties
if let Some(write_batch_size) = writer_props.write_batch_size {
    builder = builder.with_write_batch_size(write_batch_size);
}
```
This feels weird to me, but I wasn't sure how much I wanted to change the write_deltalake function signature here, since we provide this write_batch_size setting in the WriterProperties rather than as a standalone parameter like target_file_size.
Oops, I also realized I had pyarrow in my local uv env, so that failing test passed for me locally but not here.
This has the side effect of creating a lot of small files, right?
I was thinking: couldn't we tie the flushing more closely to file writing, so the arrow writer becomes aware of the amount of flushed data?
Typing this on a phone, so I hope it's clear:
write_batch --> bytes --> flush to disk --> check if the flushed data is greater than file_size_limit; if so, close the multipart upload and start a new one.
The only catch is that a part can only be smaller than the minimum part size when you close the multipart write, so you have to make sure the writer only flushes full parts, and it somehow needs to know to check whether the last part was the final one.
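The flow described above could be sketched roughly as follows. This is a hypothetical simulation with plain byte buffers; `PART_SIZE`, `FILE_SIZE_LIMIT`, and the `MultipartSim` class are all illustrative stand-ins, not the real object-store multipart API:

```python
# Hypothetical simulation of the proposed flow: encode a batch to bytes,
# flush full parts to the current multipart upload, and roll over to a
# new file once the flushed size exceeds the file size limit.

PART_SIZE = 4          # minimum multipart part size (tiny for illustration)
FILE_SIZE_LIMIT = 10   # target file size before starting a new file

class MultipartSim:
    def __init__(self):
        self.files = []        # completed "files" (each a list of parts)
        self.parts = []        # parts flushed for the current file
        self.buffer = b""      # bytes not yet forming a full part
        self.flushed = 0       # bytes flushed for the current file

    def write_batch(self, data: bytes, final: bool = False):
        self.buffer += data
        # Flush only full parts; a short part is allowed only when
        # closing the multipart upload.
        while len(self.buffer) >= PART_SIZE:
            self.parts.append(self.buffer[:PART_SIZE])
            self.flushed += PART_SIZE
            self.buffer = self.buffer[PART_SIZE:]
            if self.flushed >= FILE_SIZE_LIMIT:
                self._close_file()
        if final:
            self._close_file()

    def _close_file(self):
        if self.buffer:                 # short final part permitted on close
            self.parts.append(self.buffer)
            self.buffer = b""
        if self.parts:
            self.files.append(self.parts)
        self.parts, self.flushed = [], 0

sink = MultipartSim()
for batch in [b"aaaa", b"bbbbbb", b"cc", b"ddd"]:
    sink.write_batch(batch)
sink.write_batch(b"", final=True)
# sink.files -> two "files": the first rolled over after exceeding the
# size limit, the second closed with a short final part.
```

The key invariant is that only `_close_file` may emit a part shorter than `PART_SIZE`, which mirrors the constraint that only the final part of a multipart upload may be undersized.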
Yes, this does currently write many more small files. Ahh, ok, I see; yeah, that makes a lot of sense. Ok, I'll work on that, thanks!
@ion-elgreco Ok, follow-up question about this. Currently (see delta-rs/crates/core/src/operations/write/writer.rs, lines 433 to 444 at 2920177) the file metadata comes from the call to writer.close(), but now I think we have to call writer.close() multiple times per file to get the buffered data. The way I'm handling this at the moment is to use the metadata from the first flush of a file and accumulate row counts, but that loses the column statistics for the rest of the batches (we only keep the stats from the first batch), which doesn't seem great.
Another option I can think of is merging the stats from each flush and rebuilding the whole metadata at the end, but that seems pretty heavy for this. I'm also seeing this, which seems potentially helpful: https://github.com/delta-io/delta-rs/blob/2920177ac5215e192e0182bed93c42c0b4a98b6f/crates/core/src/writer/stats.rs#L436C1-L489C2 Just wanted to run this past you and see if you had an opinion on the best course of action, thanks!
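Merging per-flush statistics would amount to taking elementwise min/max and summing counts. A minimal sketch, assuming hypothetical dict-shaped stats (the real implementation would derive these from Parquet file metadata, as in the stats.rs code linked above):

```python
# Hypothetical per-flush column stats shaped like Delta Add-action stats:
# num_records plus per-column min/max/null counts.
def merge_stats(a, b):
    merged = {"num_records": a["num_records"] + b["num_records"],
              "min_values": {}, "max_values": {}, "null_count": {}}
    for col in a["min_values"]:
        # min/max combine elementwise; null counts add up.
        merged["min_values"][col] = min(a["min_values"][col], b["min_values"][col])
        merged["max_values"][col] = max(a["max_values"][col], b["max_values"][col])
        merged["null_count"][col] = a["null_count"][col] + b["null_count"][col]
    return merged

flush1 = {"num_records": 100, "min_values": {"x": 1}, "max_values": {"x": 50},
          "null_count": {"x": 2}}
flush2 = {"num_records": 80, "min_values": {"x": 0}, "max_values": {"x": 90},
          "null_count": {"x": 1}}
total = merge_stats(flush1, flush2)
# total: 180 records, x in [0, 90], 3 nulls
```

Since min, max, and sum are all associative, the merge can be folded across any number of flushes in order, which is what makes "rebuild the metadata at the end" feasible even if it feels heavy.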
I think we should avoid calling AsyncArrowWriter.close() before reaching the max file size. reset_writer currently returns the old buffer and writer and creates a new buffer and writer, but that should perhaps change 🤔
Description
This provides the delta writer with the option to flush the in-memory buffer to disk after each record batch, as opposed to waiting for the targeted file size, by passing the flush_per_batch parameter. This prevents accumulating lots of memory in some cases.
Note: I believe there is still some accumulated memory usage through things like transaction metadata.
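The memory trade-off can be illustrated with a toy buffer model. flush_per_batch is the parameter this PR proposes; everything else below is a hypothetical simulation, not the deltalake API:

```python
def peak_buffer_bytes(batch_sizes, flush_per_batch, target_file_size):
    """Simulate peak in-memory buffer usage for a sequence of batches."""
    peak = buffered = 0
    for size in batch_sizes:
        buffered += size
        peak = max(peak, buffered)
        # Flush after every batch, or only once the target file size is hit.
        if flush_per_batch or buffered >= target_file_size:
            buffered = 0
    return peak

batches = [10, 10, 10, 10]
# Without per-batch flushing the buffer grows toward the target file size;
# with it, peak memory stays bounded by a single batch.
print(peak_buffer_bytes(batches, False, 100))  # grows across batches
print(peak_buffer_bytes(batches, True, 100))   # bounded by one batch
```

This is why the option trades memory for file count: bounding the buffer to one batch necessarily produces smaller (and more) flushed chunks, which motivates the part-aware multipart approach discussed in the conversation.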
Related Issue(s)
Documentation