Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async fn execute_non_empty_expr(
None,
writer_properties.clone(),
writer_stats_config.clone(),
false,
)
.await?;

Expand Down Expand Up @@ -297,6 +298,7 @@ async fn execute_non_empty_expr(
None,
writer_properties,
writer_stats_config,
false,
)
.await?;
actions.extend(cdc_actions)
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,7 @@ async fn execute(
writer_stats_config.clone(),
None,
should_cdc, // if true, write execution plan splits batches in [normal, cdc] data before writing
false,
)
.await?;
if let Some(schema_metadata) = schema_action {
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ impl MergePlan {
Some(task_parameters.writer_properties.clone()),
Some(task_parameters.input_parameters.target_size as usize),
None,
false,
)?;
let mut writer = PartitionWriter::try_with_config(
object_store,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ async fn execute(
None,
writer_properties.clone(),
writer_stats_config.clone(),
false,
)
.await?;

Expand Down Expand Up @@ -457,6 +458,7 @@ async fn execute(
None,
writer_properties,
writer_stats_config,
false,
)
.await?;
actions.extend(cdc_actions);
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/operations/write/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub(crate) async fn write_execution_plan_cdc(
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
flush_per_batch: bool,
) -> DeltaResult<Vec<Action>> {
let cdc_store = Arc::new(PrefixStore::new(object_store, "_change_data"));

Expand All @@ -66,6 +67,7 @@ pub(crate) async fn write_execution_plan_cdc(
write_batch_size,
writer_properties,
writer_stats_config,
flush_per_batch,
)
.await?
.into_iter()
Expand Down Expand Up @@ -100,6 +102,7 @@ pub(crate) async fn write_execution_plan(
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
flush_per_batch: bool,
) -> DeltaResult<Vec<Action>> {
let (actions, _) = write_execution_plan_v2(
snapshot,
Expand All @@ -113,6 +116,7 @@ pub(crate) async fn write_execution_plan(
writer_stats_config,
None,
false,
flush_per_batch,
)
.await?;
Ok(actions)
Expand Down Expand Up @@ -172,6 +176,7 @@ pub(crate) async fn execute_non_empty_expr(
None,
writer_properties.clone(),
writer_stats_config.clone(),
false,
)
.await?;

Expand Down Expand Up @@ -257,6 +262,7 @@ pub(crate) async fn write_execution_plan_v2(
writer_stats_config: WriterStatsConfig,
predicate: Option<Expr>,
contains_cdc: bool,
flush_per_batch: bool,
) -> DeltaResult<(Vec<Action>, WriteExecutionPlanMetrics)> {
// We always take the plan Schema since the data may contain Large/View arrow types,
// the schema and batches were prior constructed with this in mind.
Expand Down Expand Up @@ -294,6 +300,7 @@ pub(crate) async fn write_execution_plan_v2(
write_batch_size,
writer_stats_config.num_indexed_cols,
writer_stats_config.stats_columns.clone(),
flush_per_batch,
);
let mut writer = DeltaWriter::new(object_store.clone(), config);
let checker_stream = checker.clone();
Expand Down Expand Up @@ -360,6 +367,7 @@ pub(crate) async fn write_execution_plan_v2(
write_batch_size,
writer_stats_config.num_indexed_cols,
writer_stats_config.stats_columns.clone(),
flush_per_batch,
);

let cdf_config = WriterConfig::new(
Expand All @@ -370,6 +378,7 @@ pub(crate) async fn write_execution_plan_v2(
write_batch_size,
writer_stats_config.num_indexed_cols,
writer_stats_config.stats_columns.clone(),
flush_per_batch,
);

let mut writer = DeltaWriter::new(object_store.clone(), normal_config);
Expand Down
28 changes: 28 additions & 0 deletions crates/core/src/operations/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ pub struct WriteBuilder {
target_file_size: Option<usize>,
/// Number of records to be written in single batch to underlying writer
write_batch_size: Option<usize>,
/// Write in-memory buffer to disk after each batch if true
flush_per_batch: bool,
/// whether to overwrite the schema or to merge it. None means to fail on schmema drift
schema_mode: Option<SchemaMode>,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
Expand Down Expand Up @@ -201,6 +203,7 @@ impl WriteBuilder {
predicate: None,
target_file_size: None,
write_batch_size: None,
flush_per_batch: false,
safe_cast: false,
schema_mode: None,
writer_properties: None,
Expand Down Expand Up @@ -264,6 +267,12 @@ impl WriteBuilder {
self
}

/// If true, write in-memory buffer to disk after each batch
pub fn with_flush_per_batch(mut self, flush_per_batch: bool) -> Self {
self.flush_per_batch = flush_per_batch;
self
}

/// Specify the safety of the casting operation
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
pub fn with_cast_safety(mut self, safe: bool) -> Self {
Expand Down Expand Up @@ -701,6 +710,7 @@ impl std::future::IntoFuture for WriteBuilder {
writer_stats_config.clone(),
predicate.clone(),
contains_cdc,
this.flush_per_batch,
)
.await?;

Expand Down Expand Up @@ -1993,5 +2003,23 @@ mod tests {

Ok(())
}

#[test]
fn test_write_builder_flush_per_batch() {
let ops = DeltaOps::new_in_memory();
let log_store = ops.write(vec![]).log_store().clone();

// Default
let builder = WriteBuilder::new(log_store.clone(), None);
assert!(!builder.flush_per_batch);

// True
let builder = WriteBuilder::new(log_store.clone(), None).with_flush_per_batch(true);
assert!(builder.flush_per_batch);

// False
let builder = WriteBuilder::new(log_store.clone(), None).with_flush_per_batch(false);
assert!(!builder.flush_per_batch);
}
}
}
Loading
Loading