From 2e0d72e4c3a2402c7899b7d6e5e2e5ddef78970c Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Thu, 13 Apr 2023 20:19:19 -0700
Subject: [PATCH 1/3] Introduce a test function to make it easier to create test tables with schemas

---
 rust/src/writer/test_utils.rs | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/rust/src/writer/test_utils.rs b/rust/src/writer/test_utils.rs
index 62684c3f05..b4c3b1a8fe 100644
--- a/rust/src/writer/test_utils.rs
+++ b/rust/src/writer/test_utils.rs
@@ -164,6 +164,10 @@ pub fn get_delta_metadata(partition_cols: &[String]) -> DeltaTableMetaData {
     )
 }
 
+/*
+ * Create a bare table without any commits; this just creates a temporary
+ * directory and initializes a DeltaTable from it.
+ */
 pub fn create_bare_table() -> DeltaTable {
     let table_dir = tempfile::tempdir().unwrap();
     let table_path = table_dir.path();
@@ -172,10 +176,14 @@ pub fn create_bare_table() -> DeltaTable {
         .unwrap()
 }
 
-pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
+/*
+ * Create an initialized table with the given schema.
+ */
+pub async fn create_initialized_table_with(
+    schema: Schema,
+    partition_cols: &[String],
+) -> DeltaTable {
     let mut table = create_bare_table();
-    let table_schema = get_delta_schema();
-
     let mut commit_info = serde_json::Map::<String, Value>::new();
     commit_info.insert(
         "operation".to_string(),
@@ -195,7 +203,7 @@
         None,
         None,
         None,
-        table_schema,
+        schema,
         partition_cols.to_vec(),
         HashMap::new(),
     );
@@ -207,3 +215,11 @@
 
     table
 }
+
+/*
+ * Create an initialized table in a temp directory with a test schema.
+ */
+pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
+    let table_schema = get_delta_schema();
+    create_initialized_table_with(table_schema, partition_cols).await
+}
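
(Not part of the patch above — for context only: a minimal sketch of how the new helper might be exercised with a custom schema. It assumes the `Schema`, `SchemaField`, `SchemaDataType`, and `HashMap` imports that test_utils.rs already has in scope; the test name and the "value" field are made up for illustration.)

    #[tokio::test]
    async fn example_initialized_table_with_custom_schema() {
        // Build a one-column Delta schema; "value" is an illustrative field name.
        let schema = Schema::new(vec![SchemaField::new(
            "value".to_string(),
            SchemaDataType::primitive("integer".to_string()),
            true,
            HashMap::new(),
        )]);
        // Initialize an unpartitioned table in a temp directory with that schema.
        let table = create_initialized_table_with(schema, &[]).await;
        // The initializing commit should leave the table at version 0.
        assert_eq!(0, table.version());
    }

Callers that don't care about the schema can keep using create_initialized_table, which now simply delegates to this helper with get_delta_schema().
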
From e1b2b64c45d854e0de6976f9e7655d187af6c3fa Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Thu, 13 Apr 2023 20:19:45 -0700
Subject: [PATCH 2/3] Introduce a failing test for #1286

This test currently fails because the RecordBatchWriter rejects a
RecordBatch whose timestamp column carries a different time unit
(nanoseconds) than the microseconds the table schema expects:

---- writer::record_batch::tests::test_write_batch_with_timestamps stdout ----
thread 'writer::record_batch::tests::test_write_batch_with_timestamps' panicked at 'called `Result::unwrap()` on an `Err` value: InvalidArgumentError("column types must match schema types, expected Timestamp(Microsecond, None) but found Timestamp(Nanosecond, None) at column index 1")', rust/src/writer/record_batch.rs:507:101
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
---
 rust/src/writer/record_batch.rs | 45 ++++++++++++++++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs
index 4e0bd38833..bd5cdb308b 100644
--- a/rust/src/writer/record_batch.rs
+++ b/rust/src/writer/record_batch.rs
@@ -413,7 +413,7 @@ pub(crate) fn divide_by_partition_values(
 mod tests {
     use super::*;
     use crate::writer::{
-        test_utils::{create_initialized_table, get_record_batch},
+        test_utils::{create_initialized_table, create_initialized_table_with, get_record_batch},
         utils::PartitionPath,
     };
     use std::path::Path;
@@ -477,6 +477,49 @@ mod tests {
         assert_eq!(adds.len(), 1);
     }
 
+    /*
+     * This is a test case to address:
+     * https://github.com/delta-io/delta-rs/issues/1286
+     */
+    #[tokio::test]
+    async fn test_write_batch_with_timestamps() {
+        use crate::{SchemaDataType, SchemaField};
+        use arrow::array::*;
+        use arrow::datatypes::{DataType as ArrowDataType, Field, TimeUnit};
+
+        let schema = Schema::new(vec![
+            SchemaField::new(
+                "id".to_string(),
+                SchemaDataType::primitive("string".to_string()),
+                true,
+                HashMap::new(),
+            ),
+            SchemaField::new(
+                "timestamp".to_string(),
+                SchemaDataType::primitive("timestamp".to_string()),
+                true,
+                HashMap::new(),
+            ),
+        ]);
+
+        let batch_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", ArrowDataType::Utf8, true),
+            Field::new("timestamp", ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), true),
+        ]));
+
+        let table = create_initialized_table_with(schema, &[]).await;
+
+        let id_values = Arc::new(StringArray::from(vec![Some("Hi")]));
+        let timestamp_values = Arc::new(TimestampNanosecondArray::from(vec![1]));
+        let batch = RecordBatch::try_new(batch_schema, vec![id_values, timestamp_values])
+            .unwrap();
+        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
+
+        writer.write(batch).await.unwrap();
+        let adds = writer.flush().await.unwrap();
+        assert_eq!(adds.len(), 1);
+    }
+
     #[tokio::test]
     async fn test_write_multiple_partitions() {
         let batch = get_record_batch(None, false);
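
(Not part of these patches — the wip change below only adds diagnostics. An eventual fix for #1286 would presumably coerce the mismatched batch rather than reject it; one possible shape for that, using arrow's `cast` compute kernel. This is a sketch only: `coerce_batch` is a hypothetical helper, not delta-rs API, and it assumes the batch's columns already line up one-to-one with the target schema's fields.)

    use arrow::compute::cast;
    use arrow::datatypes::SchemaRef as ArrowSchemaRef;
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    /// Cast each column of `batch` to the type declared in `schema`, e.g.
    /// Timestamp(Nanosecond, None) -> Timestamp(Microsecond, None).
    fn coerce_batch(
        batch: &RecordBatch,
        schema: ArrowSchemaRef,
    ) -> Result<RecordBatch, ArrowError> {
        let columns = schema
            .fields()
            .iter()
            .zip(batch.columns())
            // cast() errors on unsupported conversions, so a genuinely
            // incompatible column still surfaces as an Err.
            .map(|(field, column)| cast(column, field.data_type()))
            .collect::<Result<Vec<_>, _>>()?;
        RecordBatch::try_new(schema, columns)
    }
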
From 2d21532f2c9c9615f9612cc34fa579815a98d9e6 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Thu, 13 Apr 2023 21:57:33 -0700
Subject: [PATCH 3/3] wip

---
 rust/src/writer/record_batch.rs | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs
index bd5cdb308b..fb5a24a73e 100644
--- a/rust/src/writer/record_batch.rs
+++ b/rust/src/writer/record_batch.rs
@@ -45,6 +45,7 @@ use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
+use log::*;
 use object_store::ObjectStore;
 use parquet::{arrow::ArrowWriter, errors::ParquetError};
 use parquet::{basic::Compression, file::properties::WriterProperties};
@@ -297,7 +298,15 @@ impl PartitionWriter {
     /// This method buffers the write stream internally so it can be invoked for many
     /// record batches and flushed after the appropriate number of bytes has been written.
     pub fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DeltaWriterError> {
+        /*
+         * This schema check is redundant with the one that ArrowWriter
+         * performs itself.
+         */
         if record_batch.schema() != self.arrow_schema {
+            debug!("Attempting to write a RecordBatch which does not match self.arrow_schema");
+            let new_batch = RecordBatch::try_new(self.arrow_schema.clone(), record_batch.columns().to_vec())?;
+
+            println!("record batch mismatch, new: {:?}", new_batch);
             return Err(DeltaWriterError::SchemaMismatch {
                 record_batch_schema: record_batch.schema(),
                 expected_schema: self.arrow_schema.clone(),