-
Notifications
You must be signed in to change notification settings - Fork 1k
[Variant] [Pathfinding] Variant builder rollback, variant array builder, and read-only metadata builder #8167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bdb0c99
9e502d5
78cba4d
984dc0b
55e4cc8
b1478c2
c82b7f0
f551f9b
714c5a0
8da01b4
bf39e9c
00ba715
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,8 +19,11 @@ | |||||||
|
|
||||||||
| use crate::VariantArray; | ||||||||
| use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; | ||||||||
| use arrow_schema::{DataType, Field, Fields}; | ||||||||
| use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; | ||||||||
| use arrow_schema::{ArrowError, DataType, Field, Fields}; | ||||||||
| use parquet_variant::{ | ||||||||
| ListBuilder, MetadataBuilderXX, ObjectBuilder, ParentState, ValueBuilder, Variant, | ||||||||
| VariantBuilderExt, | ||||||||
| }; | ||||||||
| use std::sync::Arc; | ||||||||
|
|
||||||||
| /// A builder for [`VariantArray`] | ||||||||
|
|
@@ -73,13 +76,13 @@ pub struct VariantArrayBuilder { | |||||||
| /// Nulls | ||||||||
| nulls: NullBufferBuilder, | ||||||||
| /// buffer for all the metadata | ||||||||
| metadata_buffer: Vec<u8>, | ||||||||
| metadata_builder: MetadataBuilderXX, | ||||||||
| /// (offset, len) pairs for locations of metadata in the buffer | ||||||||
| metadata_locations: Vec<(usize, usize)>, | ||||||||
| metadata_offsets: Vec<usize>, | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Track starting offsets (similar to normal variant code), and infer the length of each entry as the difference between adjacent offsets. Ditto for |
||||||||
| /// buffer for values | ||||||||
| value_buffer: Vec<u8>, | ||||||||
| value_builder: ValueBuilder, | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This used to be called |
||||||||
| /// (offset, len) pairs for locations of values in the buffer | ||||||||
| value_locations: Vec<(usize, usize)>, | ||||||||
| value_offsets: Vec<usize>, | ||||||||
| /// The fields of the final `StructArray` | ||||||||
| /// | ||||||||
| /// TODO: 1) Add extension type metadata | ||||||||
|
|
@@ -95,10 +98,10 @@ impl VariantArrayBuilder { | |||||||
|
|
||||||||
| Self { | ||||||||
| nulls: NullBufferBuilder::new(row_capacity), | ||||||||
| metadata_buffer: Vec::new(), // todo allocation capacity | ||||||||
| metadata_locations: Vec::with_capacity(row_capacity), | ||||||||
| value_buffer: Vec::new(), | ||||||||
| value_locations: Vec::with_capacity(row_capacity), | ||||||||
| metadata_builder: Default::default(), // todo allocation capacity | ||||||||
| metadata_offsets: Vec::with_capacity(row_capacity), | ||||||||
| value_builder: Default::default(), | ||||||||
| value_offsets: Vec::with_capacity(row_capacity), | ||||||||
| fields: Fields::from(vec![metadata_field, value_field]), | ||||||||
| } | ||||||||
| } | ||||||||
|
|
@@ -107,16 +110,17 @@ impl VariantArrayBuilder { | |||||||
| pub fn build(self) -> VariantArray { | ||||||||
| let Self { | ||||||||
| mut nulls, | ||||||||
| metadata_buffer, | ||||||||
| metadata_locations, | ||||||||
| value_buffer, | ||||||||
| value_locations, | ||||||||
| metadata_builder, | ||||||||
| metadata_offsets, | ||||||||
| value_builder, | ||||||||
| value_offsets, | ||||||||
| fields, | ||||||||
| } = self; | ||||||||
|
|
||||||||
| let metadata_array = binary_view_array_from_buffers(metadata_buffer, metadata_locations); | ||||||||
| let metadata_array = | ||||||||
| binary_view_array_from_buffers(metadata_builder.into_inner(), metadata_offsets); | ||||||||
|
|
||||||||
| let value_array = binary_view_array_from_buffers(value_buffer, value_locations); | ||||||||
| let value_array = binary_view_array_from_buffers(value_builder.into_inner(), value_offsets); | ||||||||
|
|
||||||||
| // The build the final struct array | ||||||||
| let inner = StructArray::new( | ||||||||
|
|
@@ -136,20 +140,15 @@ impl VariantArrayBuilder { | |||||||
| pub fn append_null(&mut self) { | ||||||||
| self.nulls.append_null(); | ||||||||
| // The subfields are expected to be non-nullable according to the parquet variant spec. | ||||||||
| let metadata_offset = self.metadata_buffer.len(); | ||||||||
| let metadata_length = 0; | ||||||||
| self.metadata_locations | ||||||||
| .push((metadata_offset, metadata_length)); | ||||||||
| let value_offset = self.value_buffer.len(); | ||||||||
| let value_length = 0; | ||||||||
| self.value_locations.push((value_offset, value_length)); | ||||||||
| self.metadata_offsets.push(self.metadata_builder.offset()); | ||||||||
| self.value_offsets.push(self.value_builder.offset()); | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not as part of this PR, but I think the Null elements are supposed to have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For top-level (which this is), I believe you are correct. Good catch! |
||||||||
| } | ||||||||
|
|
||||||||
| /// Append the [`Variant`] to the builder as the next row | ||||||||
| pub fn append_variant(&mut self, variant: Variant) { | ||||||||
| let mut direct_builder = self.variant_builder(); | ||||||||
| direct_builder.variant_builder.append_value(variant); | ||||||||
| direct_builder.finish() | ||||||||
| direct_builder.append_value(variant); | ||||||||
| direct_builder.finish(); | ||||||||
| } | ||||||||
|
|
||||||||
| /// Return a `VariantArrayVariantBuilder` that writes directly to the | ||||||||
|
|
@@ -186,10 +185,7 @@ impl VariantArrayBuilder { | |||||||
| /// assert!(variant_array.value(1).as_object().is_some()); | ||||||||
| /// ``` | ||||||||
| pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder<'_> { | ||||||||
| // append directly into the metadata and value buffers | ||||||||
| let metadata_buffer = std::mem::take(&mut self.metadata_buffer); | ||||||||
| let value_buffer = std::mem::take(&mut self.value_buffer); | ||||||||
| VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer) | ||||||||
| VariantArrayVariantBuilder::new(self) | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move all the magic into the constructor (this could be done as a separate PR) |
||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -212,22 +208,19 @@ pub struct VariantArrayVariantBuilder<'a> { | |||||||
| /// have been moved into the variant builder, and must be returned on | ||||||||
| /// drop | ||||||||
| array_builder: &'a mut VariantArrayBuilder, | ||||||||
| /// Builder for the in progress variant value, temporarily owns the buffers | ||||||||
| /// from `array_builder` | ||||||||
| variant_builder: VariantBuilder, | ||||||||
| } | ||||||||
|
|
||||||||
| impl VariantBuilderExt for VariantArrayVariantBuilder<'_> { | ||||||||
| fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) { | ||||||||
| self.variant_builder.append_value(value); | ||||||||
| ValueBuilder::try_append_variant(self.parent_state(), value.into()).unwrap() | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missed one
Suggested change
|
||||||||
| } | ||||||||
|
|
||||||||
| fn new_list(&mut self) -> ListBuilder<'_> { | ||||||||
| self.variant_builder.new_list() | ||||||||
| fn try_new_list(&mut self) -> Result<ListBuilder<'_>, ArrowError> { | ||||||||
| Ok(ListBuilder::new(self.parent_state(), false)) | ||||||||
| } | ||||||||
|
|
||||||||
| fn new_object(&mut self) -> ObjectBuilder<'_> { | ||||||||
| self.variant_builder.new_object() | ||||||||
| fn try_new_object(&mut self) -> Result<ObjectBuilder<'_>, ArrowError> { | ||||||||
| Ok(ObjectBuilder::new(self.parent_state(), false)) | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -236,30 +229,22 @@ impl<'a> VariantArrayVariantBuilder<'a> { | |||||||
| /// | ||||||||
| /// Note this is not public as this is a structure that is logically | ||||||||
| /// part of the [`VariantArrayBuilder`] and relies on its internal structure | ||||||||
| fn new( | ||||||||
| array_builder: &'a mut VariantArrayBuilder, | ||||||||
| metadata_buffer: Vec<u8>, | ||||||||
| value_buffer: Vec<u8>, | ||||||||
| ) -> Self { | ||||||||
| let metadata_offset = metadata_buffer.len(); | ||||||||
| let value_offset = value_buffer.len(); | ||||||||
| fn new(array_builder: &'a mut VariantArrayBuilder) -> Self { | ||||||||
| let metadata_offset = array_builder.metadata_builder.offset(); | ||||||||
| let value_offset = array_builder.value_builder.offset(); | ||||||||
| VariantArrayVariantBuilder { | ||||||||
| finished: false, | ||||||||
| metadata_offset, | ||||||||
| value_offset, | ||||||||
| variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer), | ||||||||
| array_builder, | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| /// Return a reference to the underlying `VariantBuilder` | ||||||||
| pub fn inner(&self) -> &VariantBuilder { | ||||||||
| &self.variant_builder | ||||||||
| } | ||||||||
|
|
||||||||
| /// Return a mutable reference to the underlying `VariantBuilder` | ||||||||
| pub fn inner_mut(&mut self) -> &mut VariantBuilder { | ||||||||
| &mut self.variant_builder | ||||||||
| fn parent_state(&mut self) -> ParentState<'_> { | ||||||||
| ParentState::variant( | ||||||||
| &mut self.array_builder.value_builder, | ||||||||
| &mut self.array_builder.metadata_builder, | ||||||||
|
Comment on lines
+244
to
+246
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one of the important bits of magic -- we leverage the new parent state flavored |
||||||||
| ) | ||||||||
| } | ||||||||
|
|
||||||||
| /// Called to finish the in progress variant and write it to the underlying | ||||||||
|
|
@@ -272,84 +257,46 @@ impl<'a> VariantArrayVariantBuilder<'a> { | |||||||
|
|
||||||||
| let metadata_offset = self.metadata_offset; | ||||||||
| let value_offset = self.value_offset; | ||||||||
| // get the buffers back from the variant builder | ||||||||
| let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish(); | ||||||||
|
|
||||||||
| let metadata_builder = &mut self.array_builder.metadata_builder; | ||||||||
| let value_builder = &mut self.array_builder.value_builder; | ||||||||
|
|
||||||||
| // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) | ||||||||
| let metadata_len = metadata_buffer | ||||||||
| .len() | ||||||||
| .checked_sub(metadata_offset) | ||||||||
| .expect("metadata length decreased unexpectedly"); | ||||||||
| let value_len = value_buffer | ||||||||
| .len() | ||||||||
| .checked_sub(value_offset) | ||||||||
| .expect("value length decreased unexpectedly"); | ||||||||
| assert!( | ||||||||
| metadata_offset <= metadata_builder.offset(), | ||||||||
| "metadata length decreased unexpectedly" | ||||||||
| ); | ||||||||
| assert!( | ||||||||
| value_offset <= value_builder.offset(), | ||||||||
| "value length decreased unexpectedly" | ||||||||
| ); | ||||||||
|
|
||||||||
| metadata_builder.finish(); | ||||||||
|
|
||||||||
| // commit the changes by putting the | ||||||||
| // offsets and lengths into the parent array builder. | ||||||||
| self.array_builder | ||||||||
| .metadata_locations | ||||||||
| .push((metadata_offset, metadata_len)); | ||||||||
| self.array_builder | ||||||||
| .value_locations | ||||||||
| .push((value_offset, value_len)); | ||||||||
| self.array_builder.metadata_offsets.push(metadata_offset); | ||||||||
| self.array_builder.value_offsets.push(value_offset); | ||||||||
|
Comment on lines
+278
to
+279
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The The code would be a bit simpler if we stored end offsets instead of starting offsets, but I couldn't get it to work for some reason (failing unit tests with wrong offsets). See the reverted commit for details. |
||||||||
| self.array_builder.nulls.append_non_null(); | ||||||||
| // put the buffers back into the array builder | ||||||||
| self.array_builder.metadata_buffer = metadata_buffer; | ||||||||
| self.array_builder.value_buffer = value_buffer; | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| // Make it harder for people to accidentally forget to `finish` the builder. | ||||||||
| impl Drop for VariantArrayVariantBuilder<'_> { | ||||||||
| /// If the builder was not finished, roll back any changes made to the | ||||||||
| /// underlying buffers (by truncating them) | ||||||||
| fn drop(&mut self) { | ||||||||
| if self.finished { | ||||||||
| return; | ||||||||
| } | ||||||||
|
|
||||||||
| // if the object was not finished, need to rollback any changes by | ||||||||
| // truncating the buffers to the original offsets | ||||||||
| let metadata_offset = self.metadata_offset; | ||||||||
| let value_offset = self.value_offset; | ||||||||
|
|
||||||||
| // get the buffers back from the variant builder | ||||||||
| let (mut metadata_buffer, mut value_buffer) = | ||||||||
| std::mem::take(&mut self.variant_builder).into_buffers(); | ||||||||
|
|
||||||||
| // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately | ||||||||
| metadata_buffer | ||||||||
| .len() | ||||||||
| .checked_sub(metadata_offset) | ||||||||
| .expect("metadata length decreased unexpectedly"); | ||||||||
| value_buffer | ||||||||
| .len() | ||||||||
| .checked_sub(value_offset) | ||||||||
| .expect("value length decreased unexpectedly"); | ||||||||
|
|
||||||||
| // Note this truncate is fast because truncate doesn't free any memory: | ||||||||
| // it just has to drop elements (and u8 doesn't have a destructor) | ||||||||
| metadata_buffer.truncate(metadata_offset); | ||||||||
| value_buffer.truncate(value_offset); | ||||||||
|
|
||||||||
| // put the buffers back into the array builder | ||||||||
| self.array_builder.metadata_buffer = metadata_buffer; | ||||||||
| self.array_builder.value_buffer = value_buffer; | ||||||||
|
Comment on lines
-311
to
-337
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new |
||||||||
| } | ||||||||
| fn drop(&mut self) {} | ||||||||
| } | ||||||||
|
|
||||||||
| fn binary_view_array_from_buffers( | ||||||||
| buffer: Vec<u8>, | ||||||||
| locations: Vec<(usize, usize)>, | ||||||||
| ) -> BinaryViewArray { | ||||||||
| let mut builder = BinaryViewBuilder::with_capacity(locations.len()); | ||||||||
| fn binary_view_array_from_buffers(buffer: Vec<u8>, mut offsets: Vec<usize>) -> BinaryViewArray { | ||||||||
| let mut builder = BinaryViewBuilder::with_capacity(offsets.len()); | ||||||||
| offsets.push(buffer.len()); | ||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| let block = builder.append_block(buffer.into()); | ||||||||
| // TODO this can be much faster if it creates the views directly during append | ||||||||
| for (offset, length) in locations { | ||||||||
| let offset = offset.try_into().expect("offset should fit in u32"); | ||||||||
| let length = length.try_into().expect("length should fit in u32"); | ||||||||
| for i in 1..offsets.len() { | ||||||||
| let start = u32::try_from(offsets[i - 1]).expect("offset should fit in u32"); | ||||||||
| let end = u32::try_from(offsets[i]).expect("offset should fit in u32"); | ||||||||
| let length = end - start; | ||||||||
| builder | ||||||||
| .try_append_view(block, offset, length) | ||||||||
| .try_append_view(block, start, length) | ||||||||
| .expect("Failed to append view"); | ||||||||
| } | ||||||||
| builder.finish() | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,14 +114,14 @@ fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), | |
| } | ||
| Value::String(s) => builder.append_value(s.as_str()), | ||
| Value::Array(arr) => { | ||
| let mut list_builder = builder.new_list(); | ||
| let mut list_builder = builder.try_new_list()?; | ||
| for val in arr { | ||
| append_json(val, &mut list_builder)?; | ||
| } | ||
| list_builder.finish(); | ||
| } | ||
| Value::Object(obj) => { | ||
| let mut obj_builder = builder.new_object(); | ||
| let mut obj_builder = builder.try_new_object()?; | ||
| for (key, value) in obj.iter() { | ||
| let mut field_builder = ObjectFieldBuilder { | ||
| key, | ||
|
|
@@ -145,12 +145,12 @@ impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> { | |
| self.builder.insert(self.key, value); | ||
| } | ||
|
|
||
| fn new_list(&mut self) -> ListBuilder<'_> { | ||
| self.builder.new_list(self.key) | ||
| fn try_new_list(&mut self) -> Result<ListBuilder<'_>, ArrowError> { | ||
| self.builder.try_new_list(self.key) | ||
| } | ||
|
|
||
| fn new_object(&mut self) -> ObjectBuilder<'_> { | ||
| self.builder.new_object(self.key) | ||
| fn try_new_object(&mut self) -> Result<ObjectBuilder<'_>, ArrowError> { | ||
| self.builder.try_new_object(self.key) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -630,10 +630,10 @@ mod test { | |
| // Verify metadata size = 1 + 2 + 2 * 497 + 3 * 496 | ||
| assert_eq!(metadata.len(), 2485); | ||
| // Verify value size. | ||
| // Size of innermost_list: 1 + 1 + 258 + 256 = 516 | ||
| // Size of inner object: 1 + 4 + 256 + 257 * 3 + 256 * 516 = 133128 | ||
| // Size of json: 1 + 4 + 512 + 1028 + 256 * 133128 = 34082313 | ||
| assert_eq!(value.len(), 34082313); | ||
| // Size of innermost_list: 1 + 1 + 2*(128 + 1) + 2*128 = 516 | ||
| // Size of inner object: 1 + 4 + 2*256 + 3*(256 + 1) + 256 * 516 = 133384 | ||
| // Size of json: 1 + 4 + 2*256 + 4*(256 + 1) + 256 * 133384 = 34147849 | ||
| assert_eq!(value.len(), 34147849); | ||
|
Comment on lines
+633
to
+636
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I changed the
The size change is tiny -- less than 0.2% -- and the original optimized layout requires an extremely precise choreography that is unlikely to arise often in the wild. So IMO the change is acceptable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with this analysis -- the test seems overly sensitive to small pturbatons |
||
|
|
||
| let mut variant_builder = VariantBuilder::new(); | ||
| let mut object_builder = variant_builder.new_object(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the metadata builder is now "reusable" (each new
finishcall just appends another chunk of bytes to the underlying byte buffer), we can instantiate it directly and just pass it by reference to theVariantArrayVariantBuilderhelper class.