Skip to content

Commit 7b9f2f8

Browse files
roeaprtyler
authored andcommitted
refactor!: use kernel TableProperties
Signed-off-by: Robert Pack <[email protected]>
1 parent fdb5ea4 commit 7b9f2f8

File tree

25 files changed

+237
-432
lines changed

25 files changed

+237
-432
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ use crate::kernel::{
9595
};
9696
use crate::logstore::LogStoreRef;
9797
use crate::table::builder::ensure_table_uri;
98+
use crate::table::config::TablePropertiesExt as _;
9899
use crate::table::state::DeltaTableState;
99100
use crate::table::{Constraint, GeneratedColumn};
100101
use crate::{open_table, open_table_with_storage_options, DeltaTable};

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use delta_kernel::path::{LogPathFileType, ParsedLogPath};
2626
use delta_kernel::scan::scan_row_schema;
2727
use delta_kernel::schema::SchemaRef;
2828
use delta_kernel::snapshot::Snapshot as KernelSnapshot;
29+
use delta_kernel::table_properties::TableProperties;
2930
use delta_kernel::{PredicateRef, Version};
3031
use futures::stream::BoxStream;
3132
use futures::{StreamExt, TryStreamExt};
@@ -42,7 +43,6 @@ use crate::kernel::parse::read_removes;
4243
use crate::kernel::transaction::CommitData;
4344
use crate::kernel::{ActionType, Add, StructType};
4445
use crate::logstore::{LogStore, LogStoreExt};
45-
use crate::table::config::TableConfig;
4646
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
4747

4848
pub use self::log_data::*;
@@ -208,8 +208,8 @@ impl Snapshot {
208208
}
209209

210210
/// Well known table configuration
211-
pub fn table_config(&self) -> TableConfig<'_> {
212-
TableConfig(self.inner.metadata().configuration())
211+
pub fn table_config(&self) -> &TableProperties {
212+
self.inner.table_properties()
213213
}
214214

215215
/// Get the active files for the current snapshot.
@@ -514,7 +514,7 @@ impl EagerSnapshot {
514514
}
515515

516516
/// Well known table configuration
517-
pub fn table_config(&self) -> TableConfig<'_> {
517+
pub fn table_config(&self) -> &TableProperties {
518518
self.snapshot.table_config()
519519
}
520520

crates/core/src/kernel/transaction/conflict_checker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Helper module to check if a transaction can be committed in case of conflicting commits.
22
use std::collections::HashSet;
33

4+
use delta_kernel::table_properties::IsolationLevel;
5+
46
use super::CommitInfo;
57
#[cfg(feature = "datafusion")]
68
use crate::delta_datafusion::DataFusionMixins;
@@ -10,7 +12,7 @@ use crate::kernel::Transaction;
1012
use crate::kernel::{Action, Add, Metadata, Protocol, Remove};
1113
use crate::logstore::{get_actions, LogStore};
1214
use crate::protocol::DeltaOperation;
13-
use crate::table::config::IsolationLevel;
15+
use crate::table::config::TablePropertiesExt as _;
1416
use crate::DeltaTableError;
1517

1618
#[cfg(feature = "datafusion")]

crates/core/src/kernel/transaction/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ use std::sync::Arc;
7979
use bytes::Bytes;
8080
use chrono::Utc;
8181
use conflict_checker::ConflictChecker;
82+
use delta_kernel::table_properties::TableProperties;
8283
use futures::future::BoxFuture;
8384
use object_store::path::Path;
8485
use object_store::Error as ObjectStoreError;
@@ -97,7 +98,7 @@ use crate::logstore::{CommitOrBytes, LogStoreRef};
9798
use crate::operations::CustomExecuteHandler;
9899
use crate::protocol::DeltaOperation;
99100
use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
100-
use crate::table::config::TableConfig;
101+
use crate::table::config::TablePropertiesExt as _;
101102
use crate::table::state::DeltaTableState;
102103
use crate::{crate_version, DeltaResult};
103104

@@ -235,7 +236,7 @@ impl From<CommitBuilderError> for DeltaTableError {
235236
/// Reference to some structure that contains mandatory attributes for performing a commit.
236237
pub trait TableReference: Send + Sync {
237238
/// Well known table configuration
238-
fn config(&self) -> TableConfig;
239+
fn config(&self) -> &TableProperties;
239240

240241
/// Get the table protocol of the snapshot
241242
fn protocol(&self) -> &Protocol;
@@ -256,7 +257,7 @@ impl TableReference for EagerSnapshot {
256257
EagerSnapshot::metadata(self)
257258
}
258259

259-
fn config(&self) -> TableConfig {
260+
fn config(&self) -> &TableProperties {
260261
self.table_config()
261262
}
262263

@@ -266,7 +267,7 @@ impl TableReference for EagerSnapshot {
266267
}
267268

268269
impl TableReference for DeltaTableState {
269-
fn config(&self) -> TableConfig {
270+
fn config(&self) -> &TableProperties {
270271
self.snapshot.config()
271272
}
272273

@@ -857,7 +858,7 @@ impl PostCommit {
857858
return Ok(false);
858859
}
859860

860-
let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
861+
let checkpoint_interval = table_state.config().checkpoint_interval().get() as i64;
861862
if ((version + 1) % checkpoint_interval) == 0 {
862863
create_checkpoint_for(version as u64, log_store.as_ref(), Some(operation_id)).await?;
863864
Ok(true)

crates/core/src/kernel/transaction/protocol.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::kernel::{
88
contains_timestampntz, Action, EagerSnapshot, Protocol, ProtocolExt as _, Schema,
99
};
1010
use crate::protocol::DeltaOperation;
11+
use crate::table::config::TablePropertiesExt as _;
1112
use crate::table::state::DeltaTableState;
1213

1314
static READER_V2: LazyLock<HashSet<ReaderFeature>> =

crates/core/src/operations/cdc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! The CDC module contains private tools for managing CDC files
33
//!
44
5+
use crate::table::config::TablePropertiesExt as _;
56
use crate::table::state::DeltaTableState;
67
use crate::DeltaResult;
78

crates/core/src/operations/constraints.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ mod tests {
242242
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
243243
use datafusion::logical_expr::{col, lit};
244244

245+
use crate::table::config::TablePropertiesExt as _;
245246
use crate::writer::test_utils::{create_bare_table, get_arrow_schema, get_record_batch};
246247
use crate::{DeltaOps, DeltaResult, DeltaTable};
247248

crates/core/src/operations/delete.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use crate::operations::write::execution::{write_execution_plan, write_execution_
5959
use crate::operations::write::WriterStatsConfig;
6060
use crate::operations::CustomExecuteHandler;
6161
use crate::protocol::DeltaOperation;
62+
use crate::table::config::TablePropertiesExt as _;
6263
use crate::table::state::DeltaTableState;
6364
use crate::{DeltaTable, DeltaTableError};
6465

@@ -237,7 +238,8 @@ async fn execute_non_empty_expr(
237238
snapshot.table_config().num_indexed_cols(),
238239
snapshot
239240
.table_config()
240-
.stats_columns()
241+
.data_skipping_stats_columns
242+
.as_ref()
241243
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
242244
);
243245

@@ -257,7 +259,7 @@ async fn execute_non_empty_expr(
257259
filter.clone(),
258260
table_partition_cols.clone(),
259261
log_store.object_store(Some(operation_id)),
260-
Some(snapshot.table_config().target_file_size() as usize),
262+
Some(snapshot.table_config().target_file_size().get() as usize),
261263
None,
262264
writer_properties.clone(),
263265
writer_stats_config.clone(),
@@ -293,7 +295,7 @@ async fn execute_non_empty_expr(
293295
cdc_filter,
294296
table_partition_cols.clone(),
295297
log_store.object_store(Some(operation_id)),
296-
Some(snapshot.table_config().target_file_size() as usize),
298+
Some(snapshot.table_config().target_file_size().get() as usize),
297299
None,
298300
writer_properties,
299301
writer_stats_config,

crates/core/src/operations/merge/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ use crate::operations::write::generated_columns::{
9191
};
9292
use crate::operations::write::WriterStatsConfig;
9393
use crate::protocol::{DeltaOperation, MergePredicate};
94+
use crate::table::config::TablePropertiesExt as _;
9495
use crate::table::state::DeltaTableState;
9596
use crate::{DeltaResult, DeltaTable, DeltaTableError};
9697

@@ -1378,7 +1379,8 @@ async fn execute(
13781379
snapshot.table_config().num_indexed_cols(),
13791380
snapshot
13801381
.table_config()
1381-
.stats_columns()
1382+
.data_skipping_stats_columns
1383+
.as_ref()
13821384
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
13831385
);
13841386

@@ -1388,7 +1390,7 @@ async fn execute(
13881390
write,
13891391
table_partition_cols.clone(),
13901392
log_store.object_store(Some(operation_id)),
1391-
Some(snapshot.table_config().target_file_size() as usize),
1393+
Some(snapshot.table_config().target_file_size().get() as usize),
13921394
None,
13931395
writer_properties.clone(),
13941396
writer_stats_config.clone(),

crates/core/src/operations/mod.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream],
88
//! if the operation returns data as well.
99
use async_trait::async_trait;
10+
use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
1011
use std::collections::HashMap;
1112
use std::sync::Arc;
1213
use update_field_metadata::UpdateFieldMetadataBuilder;
@@ -36,6 +37,7 @@ use self::{
3637
use crate::errors::{DeltaResult, DeltaTableError};
3738
use crate::logstore::LogStoreRef;
3839
use crate::table::builder::DeltaTableBuilder;
40+
use crate::table::config::{TablePropertiesExt as _, DEFAULT_NUM_INDEX_COLS};
3941
use crate::DeltaTable;
4042

4143
pub mod add_column;
@@ -333,19 +335,33 @@ impl AsRef<DeltaTable> for DeltaOps {
333335
/// If table_config does not exist (only can occur in the first write action) it takes
334336
/// the configuration that was passed to the writerBuilder.
335337
pub fn get_num_idx_cols_and_stats_columns(
336-
config: Option<crate::table::config::TableConfig<'_>>,
338+
config: Option<&TableProperties>,
337339
configuration: HashMap<String, Option<String>>,
338-
) -> (i32, Option<Vec<String>>) {
340+
) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
339341
let (num_index_cols, stats_columns) = match &config {
340-
Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
342+
Some(conf) => (
343+
conf.num_indexed_cols(),
344+
conf.data_skipping_stats_columns
345+
.clone()
346+
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
347+
),
341348
_ => (
342349
configuration
343350
.get("delta.dataSkippingNumIndexedCols")
344-
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
345-
.unwrap_or(crate::table::config::DEFAULT_NUM_INDEX_COLS),
351+
.and_then(|v| {
352+
v.as_ref()
353+
.and_then(|vv| vv.parse::<u64>().ok())
354+
.map(DataSkippingNumIndexedCols::NumColumns)
355+
})
356+
.unwrap_or(DataSkippingNumIndexedCols::NumColumns(
357+
DEFAULT_NUM_INDEX_COLS,
358+
)),
346359
configuration
347360
.get("delta.dataSkippingStatsColumns")
348-
.and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
361+
.and_then(|v| {
362+
v.as_ref()
363+
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
364+
}),
349365
),
350366
};
351367
(
@@ -360,14 +376,14 @@ pub fn get_num_idx_cols_and_stats_columns(
360376
/// If table_config does not exist (only can occur in the first write action) it takes
361377
/// the configuration that was passed to the writerBuilder.
362378
pub(crate) fn get_target_file_size(
363-
config: &Option<crate::table::config::TableConfig<'_>>,
379+
config: Option<&TableProperties>,
364380
configuration: &HashMap<String, Option<String>>,
365-
) -> i64 {
381+
) -> u64 {
366382
match &config {
367-
Some(conf) => conf.target_file_size(),
383+
Some(conf) => conf.target_file_size().get(),
368384
_ => configuration
369385
.get("delta.targetFileSize")
370-
.and_then(|v| v.clone().map(|v| v.parse::<i64>().unwrap()))
386+
.and_then(|v| v.clone().map(|v| v.parse::<u64>().unwrap()))
371387
.unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
372388
}
373389
}

0 commit comments

Comments
 (0)