diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index e8e5a62d8c..8ec1c836e5 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -261,6 +261,7 @@ async fn execute_non_empty_expr( None, writer_properties.clone(), writer_stats_config.clone(), + false, ) .await?; @@ -297,6 +298,7 @@ async fn execute_non_empty_expr( None, writer_properties, writer_stats_config, + false, ) .await?; actions.extend(cdc_actions) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 0b38e787c5..eb325a739b 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1394,6 +1394,7 @@ async fn execute( writer_stats_config.clone(), None, should_cdc, // if true, write execution plan splits batches in [normal, cdc] data before writing + false, ) .await?; if let Some(schema_metadata) = schema_action { diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 354ffc511a..321536e9c8 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -512,6 +512,7 @@ impl MergePlan { Some(task_parameters.writer_properties.clone()), Some(task_parameters.input_parameters.target_size as usize), None, + false, )?; let mut writer = PartitionWriter::try_with_config( object_store, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 38d7056188..198cc0c519 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -395,6 +395,7 @@ async fn execute( None, writer_properties.clone(), writer_stats_config.clone(), + false, ) .await?; @@ -457,6 +458,7 @@ async fn execute( None, writer_properties, writer_stats_config, + false, ) .await?; actions.extend(cdc_actions); diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs index c1eac86e86..3925b2477f 100644 --- a/crates/core/src/operations/write/execution.rs +++ b/crates/core/src/operations/write/execution.rs @@ -53,6 +53,7 @@ pub(crate) async fn write_execution_plan_cdc( write_batch_size: Option, writer_properties: Option, writer_stats_config: WriterStatsConfig, + flush_per_batch: bool, ) -> DeltaResult> { let cdc_store = Arc::new(PrefixStore::new(object_store, "_change_data")); @@ -66,6 +67,7 @@ pub(crate) async fn write_execution_plan_cdc( write_batch_size, writer_properties, writer_stats_config, + flush_per_batch, ) .await? .into_iter() @@ -100,6 +102,7 @@ pub(crate) async fn write_execution_plan( write_batch_size: Option, writer_properties: Option, writer_stats_config: WriterStatsConfig, + flush_per_batch: bool, ) -> DeltaResult> { let (actions, _) = write_execution_plan_v2( snapshot, @@ -113,6 +116,7 @@ pub(crate) async fn write_execution_plan( writer_stats_config, None, false, + flush_per_batch, ) .await?; Ok(actions) @@ -172,6 +176,7 @@ pub(crate) async fn execute_non_empty_expr( None, writer_properties.clone(), writer_stats_config.clone(), + false, ) .await?; @@ -257,6 +262,7 @@ pub(crate) async fn write_execution_plan_v2( writer_stats_config: WriterStatsConfig, predicate: Option, contains_cdc: bool, + flush_per_batch: bool, ) -> DeltaResult<(Vec, WriteExecutionPlanMetrics)> { // We always take the plan Schema since the data may contain Large/View arrow types, // the schema and batches were prior constructed with this in mind. @@ -294,6 +300,7 @@ pub(crate) async fn write_execution_plan_v2( write_batch_size, writer_stats_config.num_indexed_cols, writer_stats_config.stats_columns.clone(), + flush_per_batch, ); let mut writer = DeltaWriter::new(object_store.clone(), config); let checker_stream = checker.clone(); @@ -360,6 +367,7 @@ pub(crate) async fn write_execution_plan_v2( write_batch_size, writer_stats_config.num_indexed_cols, writer_stats_config.stats_columns.clone(), + flush_per_batch, ); let cdf_config = WriterConfig::new( @@ -370,6 +378,7 @@ pub(crate) async fn write_execution_plan_v2( write_batch_size, writer_stats_config.num_indexed_cols, writer_stats_config.stats_columns.clone(), + flush_per_batch, ); let mut writer = DeltaWriter::new(object_store.clone(), normal_config); diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 288ff6d8ec..7b6c4eb060 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -147,6 +147,8 @@ pub struct WriteBuilder { target_file_size: Option, /// Number of records to be written in single batch to underlying writer write_batch_size: Option, + /// Write in-memory buffer to disk after each batch if true + flush_per_batch: bool, /// whether to overwrite the schema or to merge it. None means to fail on schmema drift schema_mode: Option, /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false) @@ -201,6 +203,7 @@ impl WriteBuilder { predicate: None, target_file_size: None, write_batch_size: None, + flush_per_batch: false, safe_cast: false, schema_mode: None, writer_properties: None, @@ -264,6 +267,12 @@ impl WriteBuilder { self } + /// If true, write in-memory buffer to disk after each batch + pub fn with_flush_per_batch(mut self, flush_per_batch: bool) -> Self { + self.flush_per_batch = flush_per_batch; + self + } + /// Specify the safety of the casting operation /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false) pub fn with_cast_safety(mut self, safe: bool) -> Self { @@ -701,6 +710,7 @@ impl std::future::IntoFuture for WriteBuilder { writer_stats_config.clone(), predicate.clone(), contains_cdc, + this.flush_per_batch, ) .await?; @@ -1993,5 +2003,23 @@ mod tests { Ok(()) } + + #[test] + fn test_write_builder_flush_per_batch() { + let ops = DeltaOps::new_in_memory(); + let log_store = ops.write(vec![]).log_store().clone(); + + // Default + let builder = WriteBuilder::new(log_store.clone(), None); + assert!(!builder.flush_per_batch); + + // True + let builder = WriteBuilder::new(log_store.clone(), None).with_flush_per_batch(true); + assert!(builder.flush_per_batch); + + // False + let builder = WriteBuilder::new(log_store.clone(), None).with_flush_per_batch(false); + assert!(!builder.flush_per_batch); + } } } diff --git a/crates/core/src/operations/write/writer.rs b/crates/core/src/operations/write/writer.rs index edc5784e6e..f4c0d1a68c 100644 --- a/crates/core/src/operations/write/writer.rs +++ b/crates/core/src/operations/write/writer.rs @@ -110,6 +110,8 @@ pub struct WriterConfig { num_indexed_cols: i32, /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols stats_columns: Option>, + /// Write in-memory buffer to disk after each batch if true + flush_per_batch: bool, } impl WriterConfig { @@ -122,6 +124,7 @@ impl WriterConfig { write_batch_size: Option, num_indexed_cols: i32, stats_columns: Option>, + flush_per_batch: bool, ) -> Self { let writer_properties = writer_properties.unwrap_or_else(|| { WriterProperties::builder() @@ -139,6 +142,7 @@ impl WriterConfig { write_batch_size, num_indexed_cols, stats_columns, + flush_per_batch, } } @@ -210,6 +214,7 @@ impl DeltaWriter { Some(self.config.writer_properties.clone()), Some(self.config.target_file_size), Some(self.config.write_batch_size), + self.config.flush_per_batch, )?; let mut writer = PartitionWriter::try_with_config( self.object_store.clone(), @@ -275,6 +280,8 @@ pub struct PartitionWriterConfig { /// Row chunks passed to parquet writer. This and the internal parquet writer settings /// determine how fine granular we can track / control the size of resulting files. write_batch_size: usize, + /// Write in-memory buffer to disk after each batch if true + flush_per_batch: bool, } impl PartitionWriterConfig { @@ -285,6 +292,7 @@ impl PartitionWriterConfig { writer_properties: Option, target_file_size: Option, write_batch_size: Option, + flush_per_batch: bool, ) -> DeltaResult { let part_path = partition_values.hive_partition_path(); let prefix = Path::parse(part_path)?; @@ -303,6 +311,7 @@ impl PartitionWriterConfig { writer_properties, target_file_size, write_batch_size, + flush_per_batch, }) } } @@ -448,7 +457,8 @@ impl PartitionWriter { } /// Buffers record batches in-memory up to appx. `target_file_size`. - /// Flushes data to storage once a full file can be written. + /// Flushes data to storage once a full file can be written or after + /// each batch if flush_per_batch is true. /// /// The `close` method has to be invoked to write all data still buffered /// and get the list of all written files. @@ -465,11 +475,17 @@ impl PartitionWriter { for offset in (0..max_offset).step_by(self.config.write_batch_size) { let length = usize::min(self.config.write_batch_size, max_offset - offset); self.write_batch(&batch.slice(offset, length)).await?; - // flush currently buffered data to disk once we meet or exceed the target file size. - let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size(); - if estimated_size >= self.config.target_file_size { - debug!("Writing file with estimated size {estimated_size:?} to disk."); + + if self.config.flush_per_batch { + // flush currently buffered data to disk after writing each batch self.flush_arrow_writer().await?; + } else { + // flush currently buffered data to disk once we meet or exceed the target file size. + let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size(); + if estimated_size >= self.config.target_file_size { + debug!("Writing file with estimated size {estimated_size:?} to disk."); + self.flush_arrow_writer().await?; + } } } @@ -500,6 +516,7 @@ mod tests { writer_properties: Option, target_file_size: Option, write_batch_size: Option, + flush_per_batch: bool, ) -> DeltaWriter { let config = WriterConfig::new( batch.schema(), @@ -509,6 +526,7 @@ mod tests { write_batch_size, DEFAULT_NUM_INDEX_COLS, None, + flush_per_batch, ); DeltaWriter::new(object_store, config) } @@ -519,6 +537,7 @@ mod tests { writer_properties: Option, target_file_size: Option, write_batch_size: Option, + flush_per_batch: bool, ) -> PartitionWriter { let config = PartitionWriterConfig::try_new( batch.schema(), @@ -526,6 +545,7 @@ mod tests { writer_properties, target_file_size, write_batch_size, + flush_per_batch, ) .unwrap(); PartitionWriter::try_with_config(object_store, config, DEFAULT_NUM_INDEX_COLS, None) @@ -541,7 +561,8 @@ mod tests { let batch = get_record_batch(None, false); // write single un-partitioned batch - let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None); + let mut writer = + get_partition_writer(object_store.clone(), &batch, None, None, None, false); writer.write(&batch).await.unwrap(); let files = list(object_store.as_ref(), None).await.unwrap(); assert_eq!(files.len(), 0); @@ -574,8 +595,14 @@ mod tests { .set_max_row_group_size(1024) .build(); // configure small target file size and and row group size so we can observe multiple files written - let mut writer = - get_partition_writer(object_store, &batch, Some(properties), Some(10_000), None); + let mut writer = get_partition_writer( + object_store, + &batch, + Some(properties), + Some(10_000), + None, + false, + ); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size @@ -602,7 +629,8 @@ mod tests { .unwrap() .object_store(None); // configure small target file size so we can observe multiple files written - let mut writer = get_partition_writer(object_store, &batch, None, Some(10_000), None); + let mut writer = + get_partition_writer(object_store, &batch, None, Some(10_000), None, false); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size @@ -630,7 +658,8 @@ mod tests { .object_store(None); // configure high batch size and low file size to observe one file written and flushed immediately // upon writing batch, then ensures the buffer is empty upon closing writer - let mut writer = get_partition_writer(object_store, &batch, None, Some(9000), Some(10000)); + let mut writer = + get_partition_writer(object_store, &batch, None, Some(9000), Some(10000), false); writer.write(&batch).await.unwrap(); let adds = writer.close().await.unwrap(); @@ -646,7 +675,7 @@ mod tests { let batch = get_record_batch(None, false); // write single un-partitioned batch - let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None); + let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None, false); writer.write(&batch).await.unwrap(); // Ensure the write hasn't been flushed let files = list(object_store.as_ref(), None).await.unwrap(); @@ -685,4 +714,115 @@ mod tests { } }; } + + #[test] + fn test_writer_config_flush_per_batch() { + let config = WriterConfig::new( + Arc::new(ArrowSchema::empty()), + vec![], + None, + None, + None, + DEFAULT_NUM_INDEX_COLS, + None, + true, // flush_per_batch + ); + assert!(config.flush_per_batch); + + let config = WriterConfig::new( + Arc::new(ArrowSchema::empty()), + vec![], + None, + None, + None, + DEFAULT_NUM_INDEX_COLS, + None, + false, // flush_per_batch + ); + assert!(!config.flush_per_batch); + } + + #[tokio::test] + async fn test_partition_writer_flush_per_batch() { + let base_int = Arc::new(Int32Array::from((0..100).collect::>())); + let base_str = Arc::new(StringArray::from(vec!["test"; 100])); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("value", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new(schema, vec![base_int, base_str]).unwrap(); + + let object_store = DeltaTableBuilder::from_uri("memory:///") + .build_storage() + .unwrap() + .object_store(None); + + // flush_per_batch = true + let mut writer_true = get_partition_writer( + object_store.clone(), + &batch, + None, + None, + Some(20), // Small batch size + true, // flush_per_batch + ); + writer_true.write(&batch).await.unwrap(); + let adds_true = writer_true.close().await.unwrap(); + + // flush_per_batch = false + let object_store2 = DeltaTableBuilder::from_uri("memory:///test2") + .build_storage() + .unwrap() + .object_store(None); + let mut writer_false = get_partition_writer( + object_store2, + &batch, + None, + None, + Some(20), // Small batch size + false, // flush_per_batch + ); + writer_false.write(&batch).await.unwrap(); + let adds_false = writer_false.close().await.unwrap(); + + // Verify the flush behavior: flush_per_batch=true should create more files + // because it flushes (and creates a new file) after each batch + assert!(adds_true.len() > adds_false.len(), "flush_per_batch=true should create more files than flush_per_batch=false. Got {} vs {}", adds_true.len(), adds_false.len()); + + // Verify both wrote the same total number of records + let total_records_true: i64 = adds_true + .iter() + .map(|add| { + if let Some(stats) = &add.stats { + serde_json::from_str::(stats).unwrap()["numRecords"] + .as_i64() + .unwrap_or(0) + } else { + 0 + } + }) + .sum(); + + let total_records_false: i64 = adds_false + .iter() + .map(|add| { + if let Some(stats) = &add.stats { + serde_json::from_str::(stats).unwrap()["numRecords"] + .as_i64() + .unwrap_or(0) + } else { + 0 + } + }) + .sum(); + + assert_eq!( + total_records_true, total_records_false, + "Both flush modes should write same total number of records" + ); + assert_eq!( + total_records_true, 100, + "Should have written all 100 records" + ); + } } diff --git a/python/deltalake/writer/writer.py b/python/deltalake/writer/writer.py index 09741ddf32..3bca83b4d6 100644 --- a/python/deltalake/writer/writer.py +++ b/python/deltalake/writer/writer.py @@ -78,6 +78,7 @@ def write_deltalake( writer_properties: WriterProperties | None = None, post_commithook_properties: PostCommitHookProperties | None = None, commit_properties: CommitProperties | None = None, + flush_per_batch: bool = False, ) -> None: """Write to a Delta Lake table @@ -102,6 +103,7 @@ def write_deltalake( writer_properties: Pass writer properties to the Rust parquet writer. post_commithook_properties: properties for the post commit hook. If None, default values are used. commit_properties: properties of the transaction commit. If None, default values are used. + flush_per_batch: TODO """ table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -136,6 +138,7 @@ def write_deltalake( writer_properties=writer_properties, commit_properties=commit_properties, post_commithook_properties=post_commithook_properties, + flush_per_batch=flush_per_batch, ) else: write_deltalake_rust( @@ -154,4 +157,5 @@ def write_deltalake( writer_properties=writer_properties, commit_properties=commit_properties, post_commithook_properties=post_commithook_properties, + flush_per_batch=flush_per_batch, ) diff --git a/python/src/lib.rs b/python/src/lib.rs index 0a56214feb..6ed20be4be 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1649,7 +1649,7 @@ impl RawDeltaTable { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))] + #[pyo3(signature = (data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None, flush_per_batch=false))] fn write( &self, py: Python, @@ -1666,6 +1666,7 @@ impl RawDeltaTable { writer_properties: Option, commit_properties: Option, post_commithook_properties: Option, + flush_per_batch: bool, ) -> PyResult<()> { let table = py.allow_threads(|| { let save_mode = mode.parse().map_err(PythonError::from)?; @@ -1691,6 +1692,8 @@ impl RawDeltaTable { builder = builder.with_input_execution_plan(Arc::new(plan)); + builder = builder.with_flush_per_batch(flush_per_batch); + if let Some(schema_mode) = schema_mode { builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?); } @@ -1699,6 +1702,11 @@ impl RawDeltaTable { } if let Some(writer_props) = writer_properties { + // Extract write_batch_size and set it on WriteBuilder because WriterConfig::new() + // does not extract write_batch_size from WriterProperties + if let Some(write_batch_size) = writer_props.write_batch_size { + builder = builder.with_write_batch_size(write_batch_size); + } builder = builder.with_writer_properties( set_writer_properties(writer_props).map_err(PythonError::from)?, ); @@ -2262,7 +2270,7 @@ pub struct PyCommitProperties { #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (table_uri, data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))] +#[pyo3(signature = (table_uri, data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None, flush_per_batch=false))] fn write_to_deltalake( py: Python, table_uri: String, @@ -2280,6 +2288,7 @@ fn write_to_deltalake( writer_properties: Option, commit_properties: Option, post_commithook_properties: Option, + flush_per_batch: bool, ) -> PyResult<()> { let raw_table: DeltaResult = py.allow_threads(|| { let options = storage_options.clone().unwrap_or_default(); @@ -2315,6 +2324,7 @@ fn write_to_deltalake( writer_properties, commit_properties, post_commithook_properties, + flush_per_batch, ) } diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index c27626c5cd..77f51e05dc 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -2503,3 +2503,51 @@ def test_polars_write_array(tmp_path: pathlib.Path): df, mode="overwrite", ) + + +def test_write_flush_per_batch_parameter(tmp_path: pathlib.Path): + """Test that flush_per_batch parameter affects file creation behavior""" + from deltalake import WriterProperties + + test_data = Table( + { + "id": Array( + list(range(100)), + ArrowField("id", type=DataType.int32(), nullable=False), + ), + "value": Array( + [f"val_{i}" for i in range(100)], + ArrowField("value", type=DataType.string(), nullable=False), + ), + } + ) + + writer_props = WriterProperties(write_batch_size=20) + + write_deltalake( + tmp_path / "flush_false", + test_data, + mode="overwrite", + flush_per_batch=False, + writer_properties=writer_props, + ) + + table_false = DeltaTable(tmp_path / "flush_false") + files_false = table_false.file_uris() + + write_deltalake( + tmp_path / "flush_true", + test_data, + mode="overwrite", + flush_per_batch=True, + writer_properties=writer_props, + ) + + table_true = DeltaTable(tmp_path / "flush_true") + files_true = table_true.file_uris() + + assert len(files_true) > len(files_false), ( + f"flush_per_batch=True should create more files. Got {len(files_true)} files with True vs {len(files_false)} files with False." + ) + + assert table_false.to_pyarrow_table() == table_true.to_pyarrow_table()