Skip to content
Closed
179 changes: 63 additions & 116 deletions parquet-variant-compute/src/variant_array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

use crate::VariantArray;
use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray};
use arrow_schema::{DataType, Field, Fields};
use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt};
use arrow_schema::{ArrowError, DataType, Field, Fields};
use parquet_variant::{
ListBuilder, MetadataBuilderXX, ObjectBuilder, ParentState, ValueBuilder, Variant,
VariantBuilderExt,
};
use std::sync::Arc;

/// A builder for [`VariantArray`]
Expand Down Expand Up @@ -73,13 +76,13 @@ pub struct VariantArrayBuilder {
/// Nulls
nulls: NullBufferBuilder,
/// buffer for all the metadata
metadata_buffer: Vec<u8>,
metadata_builder: MetadataBuilderXX,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the metadata builder is now "reusable" (each new finish call just appends another chunk of bytes to the underlying byte buffer), we can instantiate it directly and just pass it by reference to the VariantArrayVariantBuilder helper class.

/// (offset, len) pairs for locations of metadata in the buffer
metadata_locations: Vec<(usize, usize)>,
metadata_offsets: Vec<usize>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Track starting offsets (similar to normal variant code), and infer the length of each entry as the difference between adjacent offsets. Ditto for value_offsets below.

/// buffer for values
value_buffer: Vec<u8>,
value_builder: ValueBuilder,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be called ValueBuffer, and it turns out to have a better API for our purposes than VariantBuilder. So we no longer create a VariantBuilder at all, and just pass the value and metadata builders around by reference as needed.

/// (offset, len) pairs for locations of values in the buffer
value_locations: Vec<(usize, usize)>,
value_offsets: Vec<usize>,
/// The fields of the final `StructArray`
///
/// TODO: 1) Add extension type metadata
Expand All @@ -95,10 +98,10 @@ impl VariantArrayBuilder {

Self {
nulls: NullBufferBuilder::new(row_capacity),
metadata_buffer: Vec::new(), // todo allocation capacity
metadata_locations: Vec::with_capacity(row_capacity),
value_buffer: Vec::new(),
value_locations: Vec::with_capacity(row_capacity),
metadata_builder: Default::default(), // todo allocation capacity
metadata_offsets: Vec::with_capacity(row_capacity),
value_builder: Default::default(),
value_offsets: Vec::with_capacity(row_capacity),
fields: Fields::from(vec![metadata_field, value_field]),
}
}
Expand All @@ -107,16 +110,17 @@ impl VariantArrayBuilder {
pub fn build(self) -> VariantArray {
let Self {
mut nulls,
metadata_buffer,
metadata_locations,
value_buffer,
value_locations,
metadata_builder,
metadata_offsets,
value_builder,
value_offsets,
fields,
} = self;

let metadata_array = binary_view_array_from_buffers(metadata_buffer, metadata_locations);
let metadata_array =
binary_view_array_from_buffers(metadata_builder.into_inner(), metadata_offsets);

let value_array = binary_view_array_from_buffers(value_buffer, value_locations);
let value_array = binary_view_array_from_buffers(value_builder.into_inner(), value_offsets);

// The build the final struct array
let inner = StructArray::new(
Expand All @@ -136,20 +140,15 @@ impl VariantArrayBuilder {
pub fn append_null(&mut self) {
self.nulls.append_null();
// The subfields are expected to be non-nullable according to the parquet variant spec.
let metadata_offset = self.metadata_buffer.len();
let metadata_length = 0;
self.metadata_locations
.push((metadata_offset, metadata_length));
let value_offset = self.value_buffer.len();
let value_length = 0;
self.value_locations.push((value_offset, value_length));
self.metadata_offsets.push(self.metadata_builder.offset());
self.value_offsets.push(self.value_builder.offset());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not as part of this PR, but I think the Null elements are supposed to have a Variant::Null rather than empty for all elements that are marked as null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For top-level (which this is), I believe you are correct. Good catch!

}

/// Append the [`Variant`] to the builder as the next row
pub fn append_variant(&mut self, variant: Variant) {
let mut direct_builder = self.variant_builder();
direct_builder.variant_builder.append_value(variant);
direct_builder.finish()
direct_builder.append_value(variant);
direct_builder.finish();
}

/// Return a `VariantArrayVariantBuilder` that writes directly to the
Expand Down Expand Up @@ -186,10 +185,7 @@ impl VariantArrayBuilder {
/// assert!(variant_array.value(1).as_object().is_some());
/// ```
pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder<'_> {
// append directly into the metadata and value buffers
let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
let value_buffer = std::mem::take(&mut self.value_buffer);
VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer)
VariantArrayVariantBuilder::new(self)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move all the magic into the constructor (this could be done as a separate PR)

}
}

Expand All @@ -212,22 +208,19 @@ pub struct VariantArrayVariantBuilder<'a> {
/// have been moved into the variant builder, and must be returned on
/// drop
array_builder: &'a mut VariantArrayBuilder,
/// Builder for the in progress variant value, temporarily owns the buffers
/// from `array_builder`
variant_builder: VariantBuilder,
}

impl VariantBuilderExt for VariantArrayVariantBuilder<'_> {
fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
self.variant_builder.append_value(value);
ValueBuilder::try_append_variant(self.parent_state(), value.into()).unwrap()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed one

Suggested change
ValueBuilder::try_append_variant(self.parent_state(), value.into()).unwrap()
ValueBuilder::append_variant(self.parent_state(), value.into())

}

fn new_list(&mut self) -> ListBuilder<'_> {
self.variant_builder.new_list()
fn try_new_list(&mut self) -> Result<ListBuilder<'_>, ArrowError> {
Ok(ListBuilder::new(self.parent_state(), false))
}

fn new_object(&mut self) -> ObjectBuilder<'_> {
self.variant_builder.new_object()
fn try_new_object(&mut self) -> Result<ObjectBuilder<'_>, ArrowError> {
Ok(ObjectBuilder::new(self.parent_state(), false))
}
}

Expand All @@ -236,30 +229,22 @@ impl<'a> VariantArrayVariantBuilder<'a> {
///
/// Note this is not public as this is a structure that is logically
/// part of the [`VariantArrayBuilder`] and relies on its internal structure
fn new(
array_builder: &'a mut VariantArrayBuilder,
metadata_buffer: Vec<u8>,
value_buffer: Vec<u8>,
) -> Self {
let metadata_offset = metadata_buffer.len();
let value_offset = value_buffer.len();
fn new(array_builder: &'a mut VariantArrayBuilder) -> Self {
let metadata_offset = array_builder.metadata_builder.offset();
let value_offset = array_builder.value_builder.offset();
VariantArrayVariantBuilder {
finished: false,
metadata_offset,
value_offset,
variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer),
array_builder,
}
}

/// Return a reference to the underlying `VariantBuilder`
pub fn inner(&self) -> &VariantBuilder {
&self.variant_builder
}

/// Return a mutable reference to the underlying `VariantBuilder`
pub fn inner_mut(&mut self) -> &mut VariantBuilder {
&mut self.variant_builder
fn parent_state(&mut self) -> ParentState<'_> {
ParentState::variant(
&mut self.array_builder.value_builder,
&mut self.array_builder.metadata_builder,
Comment on lines +244 to +246
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the important bits of magic -- we leverage the new parent state flavored ValueBuilder API, which allows us to construct an appropriate ParentState directly from our internal builders. No need to pass ownership back and forth between the VariantArrayBuilder and short-lived VariantBuilder instances.

)
}

/// Called to finish the in progress variant and write it to the underlying
Expand All @@ -272,84 +257,46 @@ impl<'a> VariantArrayVariantBuilder<'a> {

let metadata_offset = self.metadata_offset;
let value_offset = self.value_offset;
// get the buffers back from the variant builder
let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish();

let metadata_builder = &mut self.array_builder.metadata_builder;
let value_builder = &mut self.array_builder.value_builder;

// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost)
let metadata_len = metadata_buffer
.len()
.checked_sub(metadata_offset)
.expect("metadata length decreased unexpectedly");
let value_len = value_buffer
.len()
.checked_sub(value_offset)
.expect("value length decreased unexpectedly");
assert!(
metadata_offset <= metadata_builder.offset(),
"metadata length decreased unexpectedly"
);
assert!(
value_offset <= value_builder.offset(),
"value length decreased unexpectedly"
);

metadata_builder.finish();

// commit the changes by putting the
// offsets and lengths into the parent array builder.
self.array_builder
.metadata_locations
.push((metadata_offset, metadata_len));
self.array_builder
.value_locations
.push((value_offset, value_len));
self.array_builder.metadata_offsets.push(metadata_offset);
self.array_builder.value_offsets.push(value_offset);
Comment on lines +278 to +279
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ParentState API doesn't cover these fields, so we take care of them here.

The code would be a bit simpler if we stored end offsets instead of starting offsets, but I couldn't get it to work for some reason (failing unit tests with wrong offsets). See the reverted commit for details.

self.array_builder.nulls.append_non_null();
// put the buffers back into the array builder
self.array_builder.metadata_buffer = metadata_buffer;
self.array_builder.value_buffer = value_buffer;
}
}

// Make it harder for people to accidentally forget to `finish` the builder.
impl Drop for VariantArrayVariantBuilder<'_> {
/// If the builder was not finished, roll back any changes made to the
/// underlying buffers (by truncating them)
fn drop(&mut self) {
if self.finished {
return;
}

// if the object was not finished, need to rollback any changes by
// truncating the buffers to the original offsets
let metadata_offset = self.metadata_offset;
let value_offset = self.value_offset;

// get the buffers back from the variant builder
let (mut metadata_buffer, mut value_buffer) =
std::mem::take(&mut self.variant_builder).into_buffers();

// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately
metadata_buffer
.len()
.checked_sub(metadata_offset)
.expect("metadata length decreased unexpectedly");
value_buffer
.len()
.checked_sub(value_offset)
.expect("value length decreased unexpectedly");

// Note this truncate is fast because truncate doesn't free any memory:
// it just has to drop elements (and u8 doesn't have a destructor)
metadata_buffer.truncate(metadata_offset);
value_buffer.truncate(value_offset);

// put the buffers back into the array builder
self.array_builder.metadata_buffer = metadata_buffer;
self.array_builder.value_buffer = value_buffer;
Comment on lines -311 to -337
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new ParentState approach eliminated all this code.

}
fn drop(&mut self) {}
}

fn binary_view_array_from_buffers(
buffer: Vec<u8>,
locations: Vec<(usize, usize)>,
) -> BinaryViewArray {
let mut builder = BinaryViewBuilder::with_capacity(locations.len());
fn binary_view_array_from_buffers(buffer: Vec<u8>, mut offsets: Vec<usize>) -> BinaryViewArray {
let mut builder = BinaryViewBuilder::with_capacity(offsets.len());
offsets.push(buffer.len());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
offsets.push(buffer.len());
// Push the final end offset only to simplify the loop below
offsets.push(buffer.len());

let block = builder.append_block(buffer.into());
// TODO this can be much faster if it creates the views directly during append
for (offset, length) in locations {
let offset = offset.try_into().expect("offset should fit in u32");
let length = length.try_into().expect("length should fit in u32");
for i in 1..offsets.len() {
let start = u32::try_from(offsets[i - 1]).expect("offset should fit in u32");
let end = u32::try_from(offsets[i]).expect("offset should fit in u32");
let length = end - start;
builder
.try_append_view(block, offset, length)
.try_append_view(block, start, length)
.expect("Failed to append view");
}
builder.finish()
Expand Down
20 changes: 10 additions & 10 deletions parquet-variant-json/src/from_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(),
}
Value::String(s) => builder.append_value(s.as_str()),
Value::Array(arr) => {
let mut list_builder = builder.new_list();
let mut list_builder = builder.try_new_list()?;
for val in arr {
append_json(val, &mut list_builder)?;
}
list_builder.finish();
}
Value::Object(obj) => {
let mut obj_builder = builder.new_object();
let mut obj_builder = builder.try_new_object()?;
for (key, value) in obj.iter() {
let mut field_builder = ObjectFieldBuilder {
key,
Expand All @@ -145,12 +145,12 @@ impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> {
self.builder.insert(self.key, value);
}

fn new_list(&mut self) -> ListBuilder<'_> {
self.builder.new_list(self.key)
fn try_new_list(&mut self) -> Result<ListBuilder<'_>, ArrowError> {
self.builder.try_new_list(self.key)
}

fn new_object(&mut self) -> ObjectBuilder<'_> {
self.builder.new_object(self.key)
fn try_new_object(&mut self) -> Result<ObjectBuilder<'_>, ArrowError> {
self.builder.try_new_object(self.key)
}
}

Expand Down Expand Up @@ -630,10 +630,10 @@ mod test {
// Verify metadata size = 1 + 2 + 2 * 497 + 3 * 496
assert_eq!(metadata.len(), 2485);
// Verify value size.
// Size of innermost_list: 1 + 1 + 258 + 256 = 516
// Size of inner object: 1 + 4 + 256 + 257 * 3 + 256 * 516 = 133128
// Size of json: 1 + 4 + 512 + 1028 + 256 * 133128 = 34082313
assert_eq!(value.len(), 34082313);
// Size of innermost_list: 1 + 1 + 2*(128 + 1) + 2*128 = 516
// Size of inner object: 1 + 4 + 2*256 + 3*(256 + 1) + 256 * 516 = 133384
// Size of json: 1 + 4 + 2*256 + 4*(256 + 1) + 256 * 133384 = 34147849
assert_eq!(value.len(), 34147849);
Comment on lines +633 to +636
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I changed the ParentState protocol to eagerly insert new field names, it upset the delicate balance in this test:

  • Previously, it would start the outer object, then register the first inner object without registering its field name, and then register 256 fields; that meant the dictionary's first 256 entries were all contained inside the first inner object, and could be encoded as a single byte.
  • Now, the first inner object's field name is registered immediately upon creation, which means the dictionary contains 257 entries by the time the inner object gets finalized. Which requires 2 bytes to encode each field id.

The size change is tiny -- less than 0.2% -- and the original optimized layout requires an extremely precise choreography that is unlikely to arise often in the wild. So IMO the change is acceptable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this analysis -- the test seems overly sensitive to small pturbatons


let mut variant_builder = VariantBuilder::new();
let mut object_builder = variant_builder.new_object();
Expand Down
Loading
Loading