Skip to content

Commit 3dbc7ab

Browse files
authored
Merge branch 'main' into manndp/write_domain_metadata
2 parents 6f13346 + a5705c5 commit 3dbc7ab

File tree

43 files changed

+1242
-390
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1242
-390
lines changed

acceptance/src/data.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ pub async fn assert_scan_metadata(
115115
test_case: &TestCaseInfo,
116116
) -> TestResult<()> {
117117
let table_root = test_case.table_root()?;
118-
let snapshot = Snapshot::builder(table_root).build(engine.as_ref())?;
119-
let scan = snapshot.into_scan_builder().build()?;
118+
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
119+
let scan = snapshot.scan_builder().build()?;
120120
let mut schema = None;
121121
let batches: Vec<RecordBatch> = scan
122122
.execute(engine)?

acceptance/src/meta.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ impl TestCaseInfo {
103103
let engine = engine.as_ref();
104104
let (latest, versions) = self.versions().await?;
105105

106-
let snapshot = Snapshot::builder(self.table_root()?).build(engine)?;
106+
let snapshot = Snapshot::builder_for(self.table_root()?).build(engine)?;
107107
self.assert_snapshot_meta(&latest, &snapshot)?;
108108

109109
for table_version in versions {
110-
let snapshot = Snapshot::builder(self.table_root()?)
110+
let snapshot = Snapshot::builder_for(self.table_root()?)
111111
.at_version(table_version.version)
112112
.build(engine)?;
113113
self.assert_snapshot_meta(&table_version, &snapshot)?;

ffi/examples/read-table/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ include(CTest)
1414
set(TestRunner "../../../tests/read-table-testing/run_test.sh")
1515
set(DatPath "../../../../acceptance/tests/dat/out/reader_tests/generated")
1616
set(ExpectedPath "../../../tests/read-table-testing/expected-data")
17+
set(KernelTestPath "../../../../kernel/tests/data")
1718
add_test(NAME read_and_print_all_prim COMMAND ${TestRunner} ${DatPath}/all_primitive_types/delta/ ${ExpectedPath}/all-prim-types.expected)
1819
add_test(NAME read_and_print_basic_partitioned COMMAND ${TestRunner} ${DatPath}/basic_partitioned/delta/ ${ExpectedPath}/basic-partitioned.expected)
20+
add_test(NAME read_and_print_with_dv_small COMMAND ${TestRunner} ${KernelTestPath}/table-with-dv-small/ ${ExpectedPath}/table-with-dv-small.expected)
1921

2022
if(WIN32)
2123
set(CMAKE_C_FLAGS_DEBUG "/MT")

ffi/examples/read-table/read_table.c

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ void scan_row_callback(
4949
KernelStringSlice path,
5050
int64_t size,
5151
const Stats* stats,
52-
const DvInfo* dv_info,
52+
const CDvInfo* cdv_info,
5353
const Expression* transform,
5454
const CStringMap* partition_values)
5555
{
@@ -62,18 +62,25 @@ void scan_row_callback(
6262
print_diag(" [no stats])\n");
6363
}
6464
KernelStringSlice table_root_slice = { context->table_root, strlen(context->table_root) };
65-
ExternResultKernelBoolSlice selection_vector_res =
66-
selection_vector_from_dv(dv_info, context->engine, table_root_slice);
67-
if (selection_vector_res.tag != OkKernelBoolSlice) {
68-
printf("Could not get selection vector from kernel\n");
69-
exit(-1);
70-
}
71-
KernelBoolSlice selection_vector = selection_vector_res.ok;
72-
if (selection_vector.len > 0) {
73-
print_diag(" Selection vector for this file:\n");
74-
print_selection_vector(" ", &selection_vector);
65+
KernelBoolSlice selection_vector;
66+
67+
if (cdv_info->has_vector) {
68+
ExternResultKernelBoolSlice selection_vector_res =
69+
selection_vector_from_dv(cdv_info->info, context->engine, table_root_slice);
70+
if (selection_vector_res.tag != OkKernelBoolSlice) {
71+
printf("Could not get selection vector from kernel\n");
72+
exit(-1);
73+
}
74+
selection_vector = selection_vector_res.ok;
75+
if (selection_vector.len > 0) {
76+
print_diag(" Selection vector for this file:\n");
77+
print_selection_vector(" ", &selection_vector);
78+
} else {
79+
print_diag(" No selection vector for this file\n");
80+
}
7581
} else {
7682
print_diag(" No selection vector for this file\n");
83+
selection_vector.len = 0;
7784
}
7885
context->partition_values = partition_values;
7986
print_partition_info(context, partition_values);

ffi/src/ffi_test_utils.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ pub(crate) fn ok_or_panic<T>(result: ExternResult<T>) -> T {
4747
match result {
4848
ExternResult::Ok(t) => t,
4949
ExternResult::Err(e) => unsafe {
50-
panic!("Got engine error with type {:?}", (*e).etype);
50+
let error = recover_error(e);
51+
panic!(
52+
"Got engine error with type {:?} message: {}",
53+
error.etype, error.message
54+
);
5155
},
5256
}
5357
}
@@ -67,3 +71,62 @@ pub(crate) fn assert_extern_result_error_with_message<T>(
6771
_ => panic!("Expected error of type '{expected_etype:?}' and message '{expected_message}'"),
6872
}
6973
}
74+
75+
#[cfg(test)]
76+
mod tests {
77+
use super::*;
78+
use std::panic;
79+
80+
#[test]
81+
fn test_ok_or_panic_with_error() {
82+
// Create a test error
83+
let message = "Test error message";
84+
let error_ptr = allocate_err(
85+
KernelError::GenericError,
86+
KernelStringSlice {
87+
ptr: message.as_ptr() as *const i8,
88+
len: message.len(),
89+
},
90+
);
91+
let result = ExternResult::<i32>::Err(error_ptr);
92+
93+
// Test that ok_or_panic panics with the expected message
94+
let panic_result = panic::catch_unwind(|| {
95+
ok_or_panic(result);
96+
});
97+
98+
assert!(panic_result.is_err(), "Expected ok_or_panic to panic");
99+
100+
// Check that the panic message contains the error type and message
101+
let panic_message = panic_result.unwrap_err();
102+
let panic_str = if let Some(s) = panic_message.downcast_ref::<String>() {
103+
s.clone()
104+
} else {
105+
"Unknown panic type".to_string()
106+
};
107+
108+
assert!(
109+
panic_str.contains("Got engine error with type"),
110+
"Panic message should contain 'Got engine error with type', got: {}",
111+
panic_str
112+
);
113+
assert!(
114+
panic_str.contains("GenericError"),
115+
"Panic message should contain error type 'GenericError', got: {}",
116+
panic_str
117+
);
118+
assert!(
119+
panic_str.contains(message),
120+
"Panic message should contain error message 'Test error message', got: {}",
121+
panic_str
122+
);
123+
}
124+
125+
#[test]
126+
fn test_ok_or_panic_with_ok() {
127+
// Test that ok_or_panic returns the value when the result is Ok
128+
let result = ExternResult::<i32>::Ok(42);
129+
let value = ok_or_panic(result);
130+
assert_eq!(value, 42);
131+
}
132+
}

ffi/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,15 +601,15 @@ fn snapshot_impl(
601601
extern_engine: &dyn ExternEngine,
602602
version: Option<Version>,
603603
) -> DeltaResult<Handle<SharedSnapshot>> {
604-
let builder = Snapshot::builder(url?);
604+
let builder = Snapshot::builder_for(url?);
605605
let builder = if let Some(v) = version {
606606
// TODO: should we include a `with_version_opt` method for the builder?
607607
builder.at_version(v)
608608
} else {
609609
builder
610610
};
611611
let snapshot = builder.build(extern_engine.engine().as_ref())?;
612-
Ok(Arc::new(snapshot).into())
612+
Ok(snapshot.into())
613613
}
614614

615615
/// # Safety

ffi/src/scan.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,13 +275,27 @@ pub struct Stats {
275275
pub num_records: u64,
276276
}
277277

278+
/// Contains information that can be used to get a selection vector. If `has_vector` is false, that
279+
/// indicates there is no selection vector to consider. It is always possible to get a vector out of
280+
/// a `DvInfo`, but if `has_vector` is false it will just be an empty vector (indicating all
281+
/// selected). Without this there's no way for a connector using ffi to know if a &DvInfo actually
282+
/// has a vector in it. We have has_vector() on the rust side, but this isn't exposed via ffi. So
283+
/// this just wraps the &DvInfo in another struct which includes a boolean that says if there is a
284+
/// dv to consider or not. This allows engines to ignore dv info if there isn't any without needing
285+
/// to make another ffi call at all.
286+
#[repr(C)]
287+
pub struct CDvInfo<'a> {
288+
info: &'a DvInfo,
289+
has_vector: bool,
290+
}
291+
278292
/// This callback will be invoked for each valid file that needs to be read for a scan.
279293
///
280294
/// The arguments to the callback are:
281295
/// * `context`: a `void*` context this can be anything that engine needs to pass through to each call
282296
/// * `path`: a `KernelStringSlice` which is the path to the file
283297
/// * `size`: an `i64` which is the size of the file
284-
/// * `dv_info`: a [`DvInfo`] struct, which allows getting the selection vector for this file
298+
/// * `dv_info`: a [`CDvInfo`] struct, which allows getting the selection vector for this file
285299
/// * `transform`: An optional expression that, if not `NULL`, _must_ be applied to physical data to
286300
/// convert it to the correct logical format. If this is `NULL`, no transform is needed.
287301
/// * `partition_values`: [DEPRECATED] a `HashMap<String, String>` which are partition values
@@ -290,7 +304,7 @@ type CScanCallback = extern "C" fn(
290304
path: KernelStringSlice,
291305
size: i64,
292306
stats: Option<&Stats>,
293-
dv_info: &DvInfo,
307+
dv_info: &CDvInfo,
294308
transform: Option<&Expression>,
295309
partition_map: &CStringMap,
296310
);
@@ -430,12 +444,16 @@ fn rust_callback(
430444
let stats = kernel_stats.map(|ks| Stats {
431445
num_records: ks.num_records,
432446
});
447+
let cdv_info = CDvInfo {
448+
info: &dv_info,
449+
has_vector: dv_info.has_vector(),
450+
};
433451
(context.callback)(
434452
context.engine_context,
435453
kernel_string_slice!(path),
436454
size,
437455
stats.as_ref(),
438-
&dv_info,
456+
&cdv_info,
439457
transform.as_ref(),
440458
&partition_map,
441459
);

ffi/src/transaction/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::{DeltaResult, ExternEngine, Snapshot, Url};
99
use crate::{ExclusiveEngineData, SharedExternEngine};
1010
use delta_kernel::transaction::{CommitResult, Transaction};
1111
use delta_kernel_ffi_macros::handle_descriptor;
12-
use std::sync::Arc;
1312

1413
/// A handle representing an exclusive transaction on a Delta table. (Similar to a Box<_>)
1514
///
@@ -38,7 +37,7 @@ fn transaction_impl(
3837
url: DeltaResult<Url>,
3938
extern_engine: &dyn ExternEngine,
4039
) -> DeltaResult<Handle<ExclusiveTransaction>> {
41-
let snapshot = Arc::new(Snapshot::builder(url?).build(extern_engine.engine().as_ref())?);
40+
let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?;
4241
let transaction = snapshot.transaction();
4342
Ok(Box::new(transaction?).into())
4443
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
Reading table at ../../../../kernel/tests/data/table-with-dv-small/
2+
version: 1
3+
4+
Schema:
5+
└─ value: integer
6+
7+
value: [
8+
1,
9+
2,
10+
3,
11+
4,
12+
5,
13+
6,
14+
7,
15+
8
16+
]

kernel/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
152152
[[bench]]
153153
name = "metadata_bench"
154154
harness = false
155+
156+
[[bench]]
157+
name = "expression_bench"
158+
harness = false

0 commit comments

Comments
 (0)