4 changes: 2 additions & 2 deletions crates/core/src/operations/delete.rs
@@ -320,7 +320,7 @@ async fn execute_non_empty_expr(
filter.clone(),
table_partition_cols.clone(),
log_store.object_store(Some(operation_id)),
- Some(snapshot.table_properties().target_file_size().get() as usize),
+ Some(snapshot.table_properties().target_file_size()),
None,
writer_properties.clone(),
writer_stats_config.clone(),
@@ -357,7 +357,7 @@ async fn execute_non_empty_expr(
cdc_filter,
table_partition_cols.clone(),
log_store.object_store(Some(operation_id)),
- Some(snapshot.table_properties().target_file_size().get() as usize),
+ Some(snapshot.table_properties().target_file_size()),
None,
writer_properties,
writer_stats_config,
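For context, a minimal sketch (not code from this PR) of why these call sites no longer need the `.get() as usize` conversion: the writer entry points changed later in this diff now take Option<NonZeroU64> directly, so the hypothetical signatures below stand in for them.

use std::num::NonZeroU64;

// Hypothetical, trimmed-down signatures; the real writer functions take many more arguments.
fn write_old(target_file_size: Option<usize>) { let _ = target_file_size; }
fn write_new(target_file_size: Option<NonZeroU64>) { let _ = target_file_size; }

fn caller(target: NonZeroU64) {
    // Before: every call site had to unwrap the NonZeroU64 and cast it.
    write_old(Some(target.get() as usize));
    // After: the table property is forwarded unchanged, and a zero size is unrepresentable.
    write_new(Some(target));
}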
2 changes: 1 addition & 1 deletion crates/core/src/operations/merge/mod.rs
@@ -1406,7 +1406,7 @@ async fn execute(
write,
table_partition_cols.clone(),
log_store.object_store(Some(operation_id)),
- Some(snapshot.table_properties().target_file_size().get() as usize),
+ Some(snapshot.table_properties().target_file_size()),
None,
writer_properties.clone(),
writer_stats_config.clone(),
17 changes: 0 additions & 17 deletions crates/core/src/operations/mod.rs
@@ -399,23 +399,6 @@ pub fn get_num_idx_cols_and_stats_columns(
)
}

- /// Get the target_file_size from the table configuration in the sates
- /// If table_config does not exist (only can occur in the first write action) it takes
- /// the configuration that was passed to the writerBuilder.
- #[cfg(feature = "datafusion")]
- pub(crate) fn get_target_file_size(
-     config: Option<&TableProperties>,
-     configuration: &HashMap<String, Option<String>>,
- ) -> u64 {
-     match &config {
-         Some(conf) => conf.target_file_size().get(),
-         _ => configuration
-             .get("delta.targetFileSize")
-             .and_then(|v| v.clone().map(|v| v.parse::<u64>().unwrap()))
-             .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
-     }
- }

#[cfg(feature = "datafusion")]
mod datafusion_utils {
use datafusion::logical_expr::Expr;
36 changes: 22 additions & 14 deletions crates/core/src/operations/optimize.rs
@@ -22,6 +22,7 @@

use std::collections::HashMap;
use std::fmt;
+ use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

@@ -206,7 +207,7 @@ pub struct OptimizeBuilder<'a> {
/// Filters to select specific table partitions to be optimized
filters: &'a [PartitionFilter],
/// Desired file size after bin-packing files
- target_size: Option<u64>,
+ target_size: Option<NonZeroU64>,
/// Properties passed to underlying parquet writer
writer_properties: Option<WriterProperties>,
/// Commit properties and configuration
@@ -267,7 +268,7 @@ impl<'a> OptimizeBuilder<'a> {
}

/// Set the target file size
- pub fn with_target_size(mut self, target: u64) -> Self {
+ pub fn with_target_size(mut self, target: NonZeroU64) -> Self {
self.target_size = Some(target);
self
}
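A hedged usage sketch of the updated public API — the DeltaOps entry point and crate paths are assumptions, not part of this diff; the only change taken from the hunk above is that with_target_size now takes a NonZeroU64.

use std::num::NonZeroU64;
use deltalake_core::{DeltaOps, DeltaResult, DeltaTable};

// Illustrative only: compacts a table toward roughly 128 MiB files.
async fn compact(table: DeltaTable) -> DeltaResult<DeltaTable> {
    // NonZeroU64::new returns None for 0, so a zero target size is rejected up front
    // instead of surfacing deep inside the writer.
    let target = NonZeroU64::new(128 * 1024 * 1024).expect("128 MiB is non-zero");
    let (table, _metrics) = DeltaOps(table)
        .optimize()
        .with_target_size(target)
        .await?;
    Ok(table)
}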
@@ -395,14 +396,14 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {

#[derive(Debug, Clone)]
struct OptimizeInput {
- target_size: u64,
+ target_size: NonZeroU64,
predicate: Option<String>,
}

impl From<OptimizeInput> for DeltaOperation {
fn from(opt_input: OptimizeInput) -> Self {
DeltaOperation::Optimize {
- target_size: opt_input.target_size as i64,
+ target_size: opt_input.target_size.get() as i64,
predicate: opt_input.predicate,
}
}
@@ -487,12 +488,12 @@ pub struct MergePlan {
/// Parameters passed to individual merge tasks
#[derive(Debug)]
pub struct MergeTaskParameters {
- /// Parameters passed to optimize operation
- input_parameters: OptimizeInput,
/// Schema of written files
file_schema: SchemaRef,
/// Properties passed to parquet writer
writer_properties: WriterProperties,
+ /// Input parameters for the optimize operation
+ input_parameters: OptimizeInput,
/// Num index cols to collect stats for
num_indexed_cols: DataSkippingNumIndexedCols,
/// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
@@ -513,6 +514,7 @@ impl MergePlan {
files: MergeBin,
object_store: ObjectStoreRef,
read_stream: F,
+ ignore_target_size: bool,
) -> Result<(Vec<Action>, PartialMetrics), DeltaTableError>
where
F: Future<Output = Result<ParquetReadStream, DeltaTableError>> + Send + 'static,
@@ -549,7 +551,12 @@ impl MergePlan {
task_parameters.file_schema.clone(),
partition_values.clone(),
Some(task_parameters.writer_properties.clone()),
- Some(task_parameters.input_parameters.target_size as usize),
+ // Since we know the total size of the bin, we can set the target file size to None.
+ if ignore_target_size {
+     None
+ } else {
+     Some(task_parameters.input_parameters.target_size)
+ },
None,
None,
)?;
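A condensed sketch of the branch above, pulled out of the call for readability; passing None presumably disables size-based rollover, so the whole bin is rewritten into a single output file.

use std::num::NonZeroU64;

// Stand-in for the value handed to the partition writer configuration.
fn effective_target_size(
    ignore_target_size: bool,
    bin_target_size: NonZeroU64,
) -> Option<NonZeroU64> {
    if ignore_target_size {
        // The caller already sized the bin, so no cap is forwarded to the writer.
        None
    } else {
        Some(bin_target_size)
    }
}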
@@ -683,6 +690,7 @@ impl MergePlan {
files,
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
+ true,
));
util::flatten_join_error(rewrite_result)
})
@@ -727,6 +735,7 @@ impl MergePlan {
files,
log_store.object_store(Some(operation_id)),
batch_stream,
+ false,
));
util::flatten_join_error(rewrite_result)
})
@@ -831,12 +840,11 @@ pub async fn create_merge_plan(
optimize_type: OptimizeType,
snapshot: &EagerSnapshot,
filters: &[PartitionFilter],
- target_size: Option<u64>,
+ target_size: Option<NonZeroU64>,
writer_properties: WriterProperties,
session: SessionState,
) -> Result<MergePlan, DeltaTableError> {
- let target_size =
-     target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size().get());
+ let target_size = target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size());
let partitions_keys = snapshot.metadata().partition_columns();

let (operations, metrics) = match optimize_type {
@@ -877,9 +885,9 @@ pub async fn create_merge_plan(
operations,
metrics,
task_parameters: Arc::new(MergeTaskParameters {
- input_parameters,
file_schema,
writer_properties,
+ input_parameters,
num_indexed_cols: snapshot.table_properties().num_indexed_cols(),
stats_columns: snapshot
.table_properties()
@@ -937,7 +945,7 @@ async fn build_compaction_plan(
log_store: &dyn LogStore,
snapshot: &EagerSnapshot,
filters: &[PartitionFilter],
- target_size: u64,
+ target_size: NonZeroU64,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

@@ -947,7 +955,7 @@
let file = file?;
metrics.total_considered_files += 1;
let object_meta = ObjectMeta::try_from(&file)?;
- if object_meta.size > target_size {
+ if object_meta.size > target_size.get() {
metrics.total_files_skipped += 1;
continue;
}
@@ -980,7 +988,7 @@

'files: for file in files {
for bin in merge_bins.iter_mut() {
- if bin.total_file_size() + file.size as u64 <= target_size {
+ if bin.total_file_size() + file.size as u64 <= target_size.get() {
bin.add(file);
// Move to next file
continue 'files;
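For reference, a self-contained sketch of the first-fit bin packing these comparisons feed into; MergeBin and the file listing are stubbed out, and only the .get() comparisons mirror this diff.

use std::num::NonZeroU64;

struct FileMeta {
    size: u64,
}

/// First-fit packing: files already at or above the target are skipped,
/// everything else goes into the first bin with enough remaining room.
fn pack(files: &[FileMeta], target_size: NonZeroU64) -> Vec<Vec<&FileMeta>> {
    let mut bins: Vec<(u64, Vec<&FileMeta>)> = Vec::new();
    'files: for file in files {
        if file.size > target_size.get() {
            continue; // nothing to gain from rewriting this file
        }
        for (total, bin) in bins.iter_mut() {
            if *total + file.size <= target_size.get() {
                *total += file.size;
                bin.push(file);
                continue 'files;
            }
        }
        bins.push((file.size, vec![file]));
    }
    bins.into_iter().map(|(_, files)| files).collect()
}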
4 changes: 2 additions & 2 deletions crates/core/src/operations/update.rs
@@ -407,7 +407,7 @@ async fn execute(
physical_plan.clone(),
table_partition_cols.clone(),
log_store.object_store(Some(operation_id)).clone(),
- Some(snapshot.table_properties().target_file_size().get() as usize),
+ Some(snapshot.table_properties().target_file_size()),
None,
writer_properties.clone(),
writer_stats_config.clone(),
@@ -469,7 +469,7 @@ async fn execute(
df.create_physical_plan().await?,
table_partition_cols,
log_store.object_store(Some(operation_id)),
- Some(snapshot.table_properties().target_file_size().get() as usize),
+ Some(snapshot.table_properties().target_file_size()),
None,
writer_properties,
writer_stats_config,
9 changes: 5 additions & 4 deletions crates/core/src/operations/write/execution.rs
@@ -1,3 +1,4 @@
+ use std::num::NonZeroU64;
use std::sync::{Arc, OnceLock};
use std::vec;

@@ -60,7 +61,7 @@ pub(crate) async fn write_execution_plan_cdc(
plan: Arc<dyn ExecutionPlan>,
partition_columns: Vec<String>,
object_store: ObjectStoreRef,
- target_file_size: Option<usize>,
+ target_file_size: Option<NonZeroU64>,
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
@@ -107,7 +108,7 @@ pub(crate) async fn write_execution_plan(
plan: Arc<dyn ExecutionPlan>,
partition_columns: Vec<String>,
object_store: ObjectStoreRef,
- target_file_size: Option<usize>,
+ target_file_size: Option<NonZeroU64>,
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
@@ -175,7 +176,7 @@ pub(crate) async fn execute_non_empty_expr(
filter,
partition_columns.clone(),
log_store.object_store(Some(operation_id)),
- Some(snapshot.table_properties().target_file_size().get() as usize),
+ Some(snapshot.table_properties().target_file_size()),
None,
writer_properties.clone(),
writer_stats_config.clone(),
@@ -265,7 +266,7 @@ pub(crate) async fn write_execution_plan_v2(
plan: Arc<dyn ExecutionPlan>,
partition_columns: Vec<String>,
object_store: ObjectStoreRef,
- target_file_size: Option<usize>,
+ target_file_size: Option<NonZeroU64>,
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
17 changes: 13 additions & 4 deletions crates/core/src/operations/write/mod.rs
@@ -24,6 +24,7 @@
//! ````

use std::collections::HashMap;
+ use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -68,6 +69,7 @@ use crate::kernel::{
};
use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
+ use crate::table::config::{TablePropertiesExt, DEFAULT_TARGET_FILE_SIZE};
use crate::DeltaTable;

pub mod configs;
@@ -143,7 +145,8 @@ pub struct WriteBuilder {
/// When using `Overwrite` mode, replace data that matches a predicate
predicate: Option<Expression>,
/// Size above which we will write a buffered parquet file to disk.
- target_file_size: Option<usize>,
+ /// If None, the writer will not create a new file until the writer is closed.
+ target_file_size: Option<Option<NonZeroU64>>,
/// Number of records to be written in single batch to underlying writer
write_batch_size: Option<usize>,
/// whether to overwrite the schema or to merge it. None means to fail on schmema drift
@@ -259,7 +262,7 @@ impl WriteBuilder {
}

/// Specify the target file size for data files written to the delta table.
- pub fn with_target_file_size(mut self, target_file_size: usize) -> Self {
+ pub fn with_target_file_size(mut self, target_file_size: Option<NonZeroU64>) -> Self {
self.target_file_size = Some(target_file_size);
self
}
@@ -616,9 +619,15 @@ impl std::future::IntoFuture for WriteBuilder {
.as_ref()
.map(|snapshot| snapshot.table_properties());

- let target_file_size = this.target_file_size.or_else(|| {
-     Some(super::get_target_file_size(config, &this.configuration) as usize)
+ let target_file_size = this.target_file_size.unwrap_or_else(|| {
+     Some(
+         this.snapshot
+             .as_ref()
+             .map(|snapshot| snapshot.table_properties().target_file_size())
+             .unwrap_or(DEFAULT_TARGET_FILE_SIZE),
+     )
});

let (num_indexed_cols, stats_columns) =
super::get_num_idx_cols_and_stats_columns(config, this.configuration);

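To summarize the new resolution in one place, a hedged, self-contained sketch: the 100 MiB constant stands in for the crate's DEFAULT_TARGET_FILE_SIZE, and the snapshot is reduced to an Option of the table property.

use std::num::NonZeroU64;

/// builder_setting mirrors WriteBuilder::target_file_size:
///   None          -> with_target_file_size was never called
///   Some(None)    -> the caller explicitly asked for no size cap
///   Some(Some(n)) -> the caller asked for an explicit cap of n bytes
fn resolve_target_file_size(
    builder_setting: Option<Option<NonZeroU64>>,
    table_property: Option<NonZeroU64>, // None models the very first write, before a snapshot exists
) -> Option<NonZeroU64> {
    // Assumed default of ~100 MiB, standing in for crate::table::config::DEFAULT_TARGET_FILE_SIZE.
    let default = NonZeroU64::new(100 * 1024 * 1024).expect("non-zero");
    builder_setting.unwrap_or_else(|| Some(table_property.unwrap_or(default)))
}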