
Conversation

smeyerre
Contributor

Description

This adds a flush_per_batch parameter that tells the delta writer to flush its in-memory buffer to disk after each record batch instead of waiting until the target file size is reached. This prevents large amounts of memory from accumulating in some cases.

Note: I believe there is still some accumulated memory usage through things like transaction metadata.
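
For context, here is a minimal sketch of the behavior this adds. The BatchWriter type, its fields, and flush_to_disk are hypothetical stand-ins for the delta writer internals, not the actual delta-rs API:

// Illustrative only: flush the buffer after every batch instead of waiting
// for it to reach the target file size.
struct BatchWriter {
    buffer: Vec<u8>,
    target_file_size: usize,
    flush_per_batch: bool,
}

impl BatchWriter {
    fn write_batch(&mut self, encoded_batch: &[u8]) -> std::io::Result<()> {
        self.buffer.extend_from_slice(encoded_batch);
        // Without flush_per_batch, the buffer only drains once it reaches
        // target_file_size; with it, the memory is released after every batch.
        if self.flush_per_batch || self.buffer.len() >= self.target_file_size {
            self.flush_to_disk()?;
        }
        Ok(())
    }

    fn flush_to_disk(&mut self) -> std::io::Result<()> {
        // The real writer hands these bytes to object storage; here we just
        // drop them to show where the memory is released.
        self.buffer.clear();
        Ok(())
    }
}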

Related Issue(s)

Documentation

  • N/A

github-actions bot added the binding/python (Issues for the Python package) and binding/rust (Issues for the Rust crate) labels on Aug 12, 2025
@ion-elgreco ion-elgreco self-assigned this Aug 12, 2025

codecov bot commented Aug 12, 2025

Codecov Report

❌ Patch coverage is 92.04545% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.64%. Comparing base (3354e73) to head (1ba0e07).
⚠️ Report is 8 commits behind head on main.

Files with missing lines                   | Patch % | Lines
crates/core/src/operations/write/writer.rs | 94.85%  | 0 Missing and 7 partials ⚠️
python/src/lib.rs                          | 0.00%   | 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3675      +/-   ##
==========================================
+ Coverage   75.58%   75.64%   +0.06%     
==========================================
  Files         146      146              
  Lines       45172    45342     +170     
  Branches    45172    45342     +170     
==========================================
+ Hits        34141    34299     +158     
- Misses       9210     9215       +5     
- Partials     1821     1828       +7     


// does not extract write_batch_size from WriterProperties
if let Some(write_batch_size) = writer_props.write_batch_size {
    builder = builder.with_write_batch_size(write_batch_size);
}
Contributor Author


This feels weird to me, but I wasn't sure how much I wanted to change the write_deltalake function signature here since we provide this write_batch_size setting in the WriterProperties as opposed to a standalone parameter like target_file_size.

@smeyerre
Contributor Author

Oops, also realized I had pyarrow in my local uv env, so that failing test passed for me locally but not here.

Collaborator

@ion-elgreco ion-elgreco left a comment


This has the side effect of creating a lot of small files, right?

I was thinking: couldn't we tie the flushing more closely to file-writing, so the arrow writer's flush becomes aware of the amount of data already flushed?

Typing this on a phone so hope it's clear:

write_batch --> bytes --> flush to disk --> check if the flushed data is greater than file_size_limit; if so, close the multipart upload and start a new one

The only catch is that the last part of a multipart write is the only one allowed to be smaller, so you have to make sure the writer only flushes full parts and somehow knows when the last part is the final one.
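
A rough sketch of that flow, assuming a hypothetical PartUpload abstraction; the real writer builds on AsyncArrowWriter and multipart uploads to object storage, so the names and types here are illustrative only:

// Track how much data has already been flushed to storage and roll over to a
// new file once the flushed size (not the in-memory size) passes the limit.
struct PartUpload {
    bytes_flushed: usize,
    parts: usize,
}

impl PartUpload {
    fn new() -> Self {
        Self { bytes_flushed: 0, parts: 0 }
    }

    // Upload one full part and record how many bytes have reached storage.
    fn put_part(&mut self, data: &[u8]) {
        self.parts += 1;
        self.bytes_flushed += data.len();
    }

    // Finish the multipart upload; only this final part may be undersized.
    fn complete(self) {
        println!("closed file: {} bytes in {} parts", self.bytes_flushed, self.parts);
    }
}

fn write_batches(encoded_batches: Vec<Vec<u8>>, file_size_limit: usize) {
    let mut upload = PartUpload::new();
    for bytes in encoded_batches {
        // write_batch --> bytes --> flush to disk
        upload.put_part(&bytes);
        // Check the flushed size; once it passes the limit, close this
        // multipart upload and start a new one for the next file.
        if upload.bytes_flushed >= file_size_limit {
            upload.complete();
            upload = PartUpload::new();
        }
    }
    // Whatever is left becomes the (possibly smaller) final file.
    upload.complete();
}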

@smeyerre
Contributor Author

> This has the side effect of creating a lot of small files, right?
>
> I was thinking: couldn't we tie the flushing more closely to file-writing, so the arrow writer's flush becomes aware of the amount of data already flushed?
>
> Typing this on a phone so hope it's clear:
>
> write_batch --> bytes --> flush to disk --> check if the flushed data is greater than file_size_limit; if so, close the multipart upload and start a new one
>
> The only catch is that the last part of a multipart write is the only one allowed to be smaller, so you have to make sure the writer only flushes full parts and somehow knows when the last part is the final one.

Yes this does currently write many more small files.

Ahhh ok I see, yeah that makes a lot of sense. Ok I'll work on that, thanks!

@smeyerre
Contributor Author

@ion-elgreco Ok, follow-up question about this:

Currently create_add() expects complete metadata from writer.close():

self.files_written.push(
    create_add(
        &self.config.partition_values,
        path.to_string(),
        file_size,
        &metadata,
        self.num_indexed_cols,
        &self.stats_columns,
    )
    .map_err(|err| WriteError::CreateAdd {
        source: Box::new(err),
    })?,
)

but now I think we have to call writer.close() multiple times per file to get the buffered data. The way I'm currently handling this is to use the metadata from the first flush of a file and accumulate row counts, but that loses the column statistics for the remaining batches (only the stats from the first batch are kept), which doesn't seem great.

Another option I can think of is merging the stats from each flush and rebuilding the whole metadata at the end, but that seems pretty intense for this.
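
A rough sketch of what that merge could look like, simplified to scalar min/max/null-count stats; the real delta-rs statistics are derived from Parquet file metadata, so the types and fields here are illustrative:

// Per-column stats accumulated across flushes of the same file.
#[derive(Clone, Copy)]
struct ColumnStats {
    min: i64,
    max: i64,
    null_count: u64,
    num_records: u64,
}

fn merge(a: ColumnStats, b: ColumnStats) -> ColumnStats {
    ColumnStats {
        min: a.min.min(b.min),                   // min of mins
        max: a.max.max(b.max),                   // max of maxes
        null_count: a.null_count + b.null_count, // null counts add up
        num_records: a.num_records + b.num_records,
    }
}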

I'm also seeing this which seems potentially helpful: https://github.com/delta-io/delta-rs/blob/2920177ac5215e192e0182bed93c42c0b4a98b6f/crates/core/src/writer/stats.rs#L436C1-L489C2

Just wanted to run this past you and see if you had an opinion on the best course of action, thanks!

@ion-elgreco
Collaborator

I think we should avoid calling AsyncArrowWriter.close() before reaching the max file size.

reset_writer currently returns the old buffer and writer and creates a new buffer and writer, but that should perhaps change 🤔

Development

Successfully merging this pull request may close these issues.

Refactor delta writer to not use buffer