Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax.

Closes #NNN.
- Closes #NNN.

# Rationale for this change

Expand All @@ -13,6 +13,14 @@ Explaining clearly why changes are proposed helps reviewers understand your chan

There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.

# Are these changes tested?

We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why — for example, are the changes already covered by existing tests?

# Are there any user-facing changes?

If there are user-facing changes, then we may require documentation to be updated before approving the PR.
Expand Down
128 changes: 104 additions & 24 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
use crate::decoder::{VariantBasicType, VariantPrimitiveType};
use crate::{ShortString, Variant};
use std::collections::HashMap;
use crate::{ShortString, Variant, VariantDecimal16, VariantDecimal4, VariantDecimal8};
use std::collections::BTreeMap;

const BASIC_TYPE_BITS: u8 = 2;
const UNIX_EPOCH_DATE: chrono::NaiveDate = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
Expand Down Expand Up @@ -166,15 +166,15 @@ fn make_room_for_header(buffer: &mut Vec<u8>, start_pos: usize, header_size: usi
///
pub struct VariantBuilder {
buffer: Vec<u8>,
dict: HashMap<String, u32>,
dict: BTreeMap<String, u32>,
dict_keys: Vec<String>,
}

impl VariantBuilder {
pub fn new() -> Self {
Self {
buffer: Vec::new(),
dict: HashMap::new(),
dict: BTreeMap::new(),
dict_keys: Vec::new(),
}
}
Expand Down Expand Up @@ -296,7 +296,7 @@ impl VariantBuilder {

/// Add key to dictionary, return its ID
fn add_key(&mut self, key: &str) -> u32 {
use std::collections::hash_map::Entry;
use std::collections::btree_map::Entry;
match self.dict.entry(key.to_string()) {
Entry::Occupied(entry) => *entry.get(),
Entry::Vacant(entry) => {
Expand Down Expand Up @@ -384,9 +384,15 @@ impl VariantBuilder {
Variant::Date(v) => self.append_date(v),
Variant::TimestampMicros(v) => self.append_timestamp_micros(v),
Variant::TimestampNtzMicros(v) => self.append_timestamp_ntz_micros(v),
Variant::Decimal4 { integer, scale } => self.append_decimal4(integer, scale),
Variant::Decimal8 { integer, scale } => self.append_decimal8(integer, scale),
Variant::Decimal16 { integer, scale } => self.append_decimal16(integer, scale),
Variant::Decimal4(VariantDecimal4 { integer, scale }) => {
self.append_decimal4(integer, scale)
}
Variant::Decimal8(VariantDecimal8 { integer, scale }) => {
self.append_decimal8(integer, scale)
}
Variant::Decimal16(VariantDecimal16 { integer, scale }) => {
self.append_decimal16(integer, scale)
}
Variant::Float(v) => self.append_float(v),
Variant::Double(v) => self.append_double(v),
Variant::Binary(v) => self.append_binary(v),
Expand Down Expand Up @@ -482,7 +488,7 @@ impl<'a> ListBuilder<'a> {
pub struct ObjectBuilder<'a> {
parent: &'a mut VariantBuilder,
start_pos: usize,
fields: Vec<(u32, usize)>, // (field_id, offset)
fields: BTreeMap<u32, usize>, // (field_id, offset)
}

impl<'a> ObjectBuilder<'a> {
Expand All @@ -491,7 +497,7 @@ impl<'a> ObjectBuilder<'a> {
Self {
parent,
start_pos,
fields: Vec::new(),
fields: BTreeMap::new(),
}
}

Expand All @@ -500,25 +506,27 @@ impl<'a> ObjectBuilder<'a> {
let id = self.parent.add_key(key);
let field_start = self.parent.offset() - self.start_pos;
self.parent.append_value(value);
self.fields.push((id, field_start));
let res = self.fields.insert(id, field_start);
debug_assert!(res.is_none());
}

/// Finalize object with sorted fields
pub fn finish(mut self) {
// Sort fields by key name
self.fields.sort_by(|a, b| {
let key_a = &self.parent.dict_keys[a.0 as usize];
let key_b = &self.parent.dict_keys[b.0 as usize];
key_a.cmp(key_b)
});

pub fn finish(self) {
let data_size = self.parent.offset() - self.start_pos;
let num_fields = self.fields.len();
let is_large = num_fields > u8::MAX as usize;
let size_bytes = if is_large { 4 } else { 1 };

let max_id = self.fields.iter().map(|&(id, _)| id).max().unwrap_or(0);
let id_size = int_size(max_id as usize);
let field_ids_by_sorted_field_name = self
.parent
.dict
.iter()
.filter_map(|(_, id)| self.fields.contains_key(id).then_some(*id))
.collect::<Vec<_>>();

let max_id = self.fields.keys().last().copied().unwrap_or(0) as usize;

let id_size = int_size(max_id);
let offset_size = int_size(data_size);

let header_size = 1
Expand All @@ -542,17 +550,18 @@ impl<'a> ObjectBuilder<'a> {
}

// Write field IDs (sorted order)
for &(id, _) in &self.fields {
for id in &field_ids_by_sorted_field_name {
write_offset(
&mut self.parent.buffer[pos..pos + id_size as usize],
id as usize,
*id as usize,
id_size,
);
pos += id_size as usize;
}

// Write field offsets
for &(_, offset) in &self.fields {
for id in &field_ids_by_sorted_field_name {
let &offset = self.fields.get(id).unwrap();
write_offset(
&mut self.parent.buffer[pos..pos + offset_size as usize],
offset,
Expand Down Expand Up @@ -749,6 +758,77 @@ mod tests {
assert_eq!(field_ids, vec![1, 2, 0]);
}

#[test]
fn test_object_and_metadata_ordering() {
    let mut builder = VariantBuilder::new();
    let mut obj = builder.new_object();

    // Insert keys out of alphabetical order; field IDs are assigned in insertion order.
    obj.append_value("zebra", "stripes"); // assigned ID 0
    obj.append_value("apple", "red"); // assigned ID 1

    {
        // `fields` iterates in field-ID (insertion) order.
        let ids: Vec<u32> = obj.fields.keys().map(|id| *id).collect();
        assert_eq!(ids, [0, 1]);

        // `dict` iterates alphabetically by field name.
        // NOTE: when we support nested objects, we'll want to perform a filter by fields_map field ids
        let name_to_id: Vec<(&str, u32)> = obj
            .parent
            .dict
            .iter()
            .map(|(name, id)| (name.as_str(), *id))
            .collect();
        assert_eq!(name_to_id, [("apple", 1), ("zebra", 0)]);

        // `dict_keys` preserves insertion (field-ID) order.
        let insertion_order: Vec<&str> =
            obj.parent.dict_keys.iter().map(String::as_str).collect();
        assert_eq!(insertion_order, ["zebra", "apple"]);
    }

    obj.append_value("banana", "yellow"); // assigned ID 2

    {
        // Still in field-ID (insertion) order after the new key.
        let ids: Vec<u32> = obj.fields.keys().map(|id| *id).collect();
        assert_eq!(ids, [0, 1, 2]);

        // Still alphabetical by field name.
        // NOTE: when we support nested objects, we'll want to perform a filter by fields_map field ids
        let name_to_id: Vec<(&str, u32)> = obj
            .parent
            .dict
            .iter()
            .map(|(name, id)| (name.as_str(), *id))
            .collect();
        assert_eq!(name_to_id, [("apple", 1), ("banana", 2), ("zebra", 0)]);

        // Still insertion order.
        let insertion_order: Vec<&str> =
            obj.parent.dict_keys.iter().map(String::as_str).collect();
        assert_eq!(insertion_order, ["zebra", "apple", "banana"]);
    }

    obj.finish();
    builder.finish();
}

#[test]
fn test_append_object() {
let (object_metadata, object_value) = {
Expand Down
Loading
Loading