From 6dbd9e778591be23bd7faa135c0124015bc2f4f9 Mon Sep 17 00:00:00 2001 From: klion26 Date: Tue, 15 Jul 2025 20:06:12 +0800 Subject: [PATCH 01/11] [Variant] Avoid extra allocation in object builder This commit will reuse the parent buffer for object builder. It can avoid the extra allocation for the object and the later buffer copy. --- parquet-variant/src/builder.rs | 170 ++++++++++++++++++++++++++------- 1 file changed, 137 insertions(+), 33 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index ae82cfec9d3a..9273277ae000 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1028,7 +1028,11 @@ impl Drop for ListBuilder<'_> { pub struct ObjectBuilder<'a> { parent_state: ParentState<'a>, fields: IndexMap, // (field_id, offset) - buffer: ValueBuffer, + /// the starting offset in the parent's buffer where this object starts + object_start_offset: usize, + /// whether the object has been finished, the written content of the current object + /// will be truncated in `drop` if `has_been_finished` is false + has_been_finished: bool, validate_unique_fields: bool, /// Set of duplicate fields to report for errors duplicate_fields: HashSet, @@ -1036,10 +1040,16 @@ pub struct ObjectBuilder<'a> { impl<'a> ObjectBuilder<'a> { fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self { + let start_offset = match &parent_state { + ParentState::Variant { buffer, .. } => buffer.offset(), + ParentState::List { buffer, .. } => buffer.offset(), + ParentState::Object { buffer, .. } => buffer.offset(), + }; Self { parent_state, fields: IndexMap::new(), - buffer: ValueBuffer::default(), + object_start_offset: start_offset, + has_been_finished: false, validate_unique_fields, duplicate_fields: HashSet::new(), } @@ -1068,13 +1078,14 @@ impl<'a> ObjectBuilder<'a> { let metadata_builder = self.parent_state.metadata_builder(); let field_id = metadata_builder.upsert_field_name(key); - let field_start = self.buffer.offset(); + // field_start is a relevant offset from the buffer this object is being built in. + let field_start = self.parent_state.buffer().offset() - self.object_start_offset; if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { self.duplicate_fields.insert(field_id); } - self.buffer + self.parent_state.buffer() .try_append_variant(value.into(), metadata_builder)?; Ok(()) @@ -1091,13 +1102,48 @@ impl<'a> ObjectBuilder<'a> { // Returns validate_unique_fields because we can no longer reference self once this method returns. fn parent_state<'b>(&'b mut self, key: &'b str) -> (ParentState<'b>, bool) { - let state = ParentState::Object { - buffer: &mut self.buffer, - metadata_builder: self.parent_state.metadata_builder(), - fields: &mut self.fields, - field_name: key, - }; - (state, self.validate_unique_fields) + let validate_unique_fields = self.validate_unique_fields; + + match &mut self.parent_state { + ParentState::Variant { + buffer, + metadata_builder, + } => { + let state = ParentState::Object { + buffer, + metadata_builder, + fields: &mut self.fields, + field_name: key, + }; + (state, validate_unique_fields) + } + ParentState::List { + buffer, + metadata_builder, + .. + } => { + let state = ParentState::Object { + buffer, + metadata_builder, + fields: &mut self.fields, + field_name: key, + }; + (state, validate_unique_fields) + } + ParentState::Object { + buffer, + metadata_builder, + .. + } => { + let state = ParentState::Object { + buffer, + metadata_builder, + fields: &mut self.fields, + field_name: key, + }; + (state, validate_unique_fields) + } + } } /// Returns an object builder that can be used to append a new (nested) object to this object. @@ -1118,8 +1164,8 @@ impl<'a> ObjectBuilder<'a> { /// Finalizes this object and appends it to its parent, which otherwise remains unmodified. pub fn finish(mut self) -> Result<(), ArrowError> { - let metadata_builder = self.parent_state.metadata_builder(); if self.validate_unique_fields && !self.duplicate_fields.is_empty() { + let metadata_builder = self.parent_state.metadata_builder(); let mut names = self .duplicate_fields .iter() @@ -1134,39 +1180,89 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; + let metadata_builder = match &self.parent_state { + ParentState::Variant { + metadata_builder, .. + } => metadata_builder, + ParentState::List { + metadata_builder, .. + } => metadata_builder, + ParentState::Object { + metadata_builder, .. + } => metadata_builder, + }; self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // current object starts from `object_start_offset` + let data_size = current_offset - self.object_start_offset; + let offset_size = int_size(data_size); - // Write header + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; + + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size + + let starting_offset = self.object_start_offset; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); + + // Write header at the original start position + let mut header_pos = starting_offset; + + // Write header byte let header = object_header(is_large, id_size, offset_size); - parent_buffer.append_header(header, is_large, num_fields); + buffer[header_pos] = header; + header_pos += 1; - // Write field IDs (sorted order) - let ids = self.fields.keys().map(|id| *id as usize); - parent_buffer.append_offset_array(ids, None, id_size); + // Write number of fields + if is_large { + buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); + header_pos += 4; + } else { + buffer[header_pos] = num_fields as u8; + header_pos += 1; + } + + // Write field IDs + for (&field_id, _) in &self.fields { + let id_bytes = (field_id as usize).to_le_bytes(); + buffer[header_pos..header_pos + id_size as usize] + .copy_from_slice(&id_bytes[..id_size as usize]); + header_pos += id_size as usize; + } + + // Write field offsets (adjusted for header) + for (_, &relative_offset) in &self.fields { + let offset_bytes = relative_offset.to_le_bytes(); + buffer[header_pos..header_pos + offset_size as usize] + .copy_from_slice(&offset_bytes[..offset_size as usize]); + header_pos += offset_size as usize; + } + + // Write data_size + let data_size_bytes = data_size.to_le_bytes(); + buffer[header_pos..header_pos + offset_size as usize] + .copy_from_slice(&data_size_bytes[..offset_size as usize]); - // Write the field offset array, followed by the value bytes - let offsets = std::mem::take(&mut self.fields).into_values(); - parent_buffer.append_offset_array(offsets, Some(data_size), offset_size); - parent_buffer.append_slice(self.buffer.inner()); self.parent_state.finish(starting_offset); + // mark that this object has been finished + self.has_been_finished = true; + Ok(()) } } @@ -1176,7 +1272,15 @@ impl<'a> ObjectBuilder<'a> { /// This is to ensure that the object is always finalized before its parent builder /// is finalized. impl Drop for ObjectBuilder<'_> { - fn drop(&mut self) {} + fn drop(&mut self) { + // truncate the buffer if the `finish` method has not been called. + if !self.has_been_finished { + self.parent_state + .buffer() + .inner_mut() + .truncate(self.object_start_offset); + } + } } /// Extends [`VariantBuilder`] to help building nested [`Variant`]s From 55def1d40fa71e673f88208c6002e2fcc69a34c9 Mon Sep 17 00:00:00 2001 From: klion26 Date: Wed, 16 Jul 2025 18:11:43 +0800 Subject: [PATCH 02/11] fixup! [Variant] Avoid extra allocation in object builder trigger ci --- parquet-variant/src/builder.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 9273277ae000..89d3b47422ee 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -506,6 +506,7 @@ enum ParentState<'a> { metadata_builder: &'a mut MetadataBuilder, fields: &'a mut IndexMap, field_name: &'a str, + object_start_offset: usize, }, } @@ -999,8 +1000,17 @@ impl<'a> ListBuilder<'a> { let offset_size = int_size(data_size); // Get parent's buffer + let offset_shift = match &self.parent_state { + ParentState::Object { + object_start_offset, + .. + } => *object_start_offset, + _ => 0, + }; let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + // as object builder has been reused the parent buffer, + // we need to shift the offset by the starting offset of the parent object + let starting_offset = parent_buffer.offset() - offset_shift; // Write header let header = array_header(is_large, offset_size); @@ -1114,6 +1124,7 @@ impl<'a> ObjectBuilder<'a> { metadata_builder, fields: &mut self.fields, field_name: key, + object_start_offset: self.object_start_offset, }; (state, validate_unique_fields) } @@ -1127,6 +1138,7 @@ impl<'a> ObjectBuilder<'a> { metadata_builder, fields: &mut self.fields, field_name: key, + object_start_offset: self.object_start_offset, }; (state, validate_unique_fields) } @@ -1140,6 +1152,7 @@ impl<'a> ObjectBuilder<'a> { metadata_builder, fields: &mut self.fields, field_name: key, + object_start_offset: self.object_start_offset, }; (state, validate_unique_fields) } @@ -1258,7 +1271,15 @@ impl<'a> ObjectBuilder<'a> { buffer[header_pos..header_pos + offset_size as usize] .copy_from_slice(&data_size_bytes[..offset_size as usize]); - self.parent_state.finish(starting_offset); + let start_offset_shift = match &self.parent_state { + ParentState::Object { + object_start_offset, + .. + } => *object_start_offset, + _ => 0, + }; + self.parent_state + .finish(starting_offset - start_offset_shift); // mark that this object has been finished self.has_been_finished = true; From 442c935c3414eecbd813494ea003726ffb8cf539 Mon Sep 17 00:00:00 2001 From: klion26 Date: Thu, 17 Jul 2025 11:47:31 +0800 Subject: [PATCH 03/11] fixup! [Variant] Avoid extra allocation in object builder --- parquet-variant/src/builder.rs | 101 ++++++++++++++++++++++++++++----- 1 file changed, 87 insertions(+), 14 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 89d3b47422ee..5306c97a57e2 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1084,21 +1084,58 @@ impl<'a> ObjectBuilder<'a> { key: &str, value: T, ) -> Result<(), ArrowError> { - // Get metadata_builder from parent state - let metadata_builder = self.parent_state.metadata_builder(); + match &mut self.parent_state { + ParentState::Variant { + buffer, + metadata_builder, + } => { + let field_id = metadata_builder.upsert_field_name(key); + let field_start = buffer.offset() - self.object_start_offset; - let field_id = metadata_builder.upsert_field_name(key); - // field_start is a relevant offset from the buffer this object is being built in. - let field_start = self.parent_state.buffer().offset() - self.object_start_offset; + if self.fields.insert(field_id, field_start).is_some() + && self.validate_unique_fields + { + self.duplicate_fields.insert(field_id); + } - if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { - self.duplicate_fields.insert(field_id); - } + buffer.try_append_variant(value.into(), metadata_builder)?; + Ok(()) + } + ParentState::List { + buffer, + metadata_builder, + .. + } => { + let field_id = metadata_builder.upsert_field_name(key); + let field_start = buffer.offset() - self.object_start_offset; - self.parent_state.buffer() - .try_append_variant(value.into(), metadata_builder)?; + if self.fields.insert(field_id, field_start).is_some() + && self.validate_unique_fields + { + self.duplicate_fields.insert(field_id); + } - Ok(()) + buffer.try_append_variant(value.into(), metadata_builder)?; + Ok(()) + } + ParentState::Object { + buffer, + metadata_builder, + .. + } => { + let field_id = metadata_builder.upsert_field_name(key); + let field_start = buffer.offset() - self.object_start_offset; + + if self.fields.insert(field_id, field_start).is_some() + && self.validate_unique_fields + { + self.duplicate_fields.insert(field_id); + } + + buffer.try_append_variant(value.into(), metadata_builder)?; + Ok(()) + } + } } /// Enables validation for unique field keys when inserting into this object. @@ -1936,7 +1973,13 @@ mod tests { { "a": false, "c": { - "b": "a" + "b": "a", + "c": { + "aa": "bb", + }, + "d": { + "cc": "dd" + } } "b": true, } @@ -1951,6 +1994,18 @@ mod tests { { let mut inner_object_builder = outer_object_builder.new_object("c"); inner_object_builder.insert("b", "a"); + + { + let mut inner_inner_object_builder = inner_object_builder.new_object("c"); + inner_inner_object_builder.insert("aa", "bb"); + let _ = inner_inner_object_builder.finish(); + } + + { + let mut inner_inner_object_builder = inner_object_builder.new_object("d"); + inner_inner_object_builder.insert("cc", "dd"); + let _ = inner_inner_object_builder.finish(); + } let _ = inner_object_builder.finish(); } @@ -1967,7 +2022,13 @@ mod tests { "a": false, "b": true, "c": { - "b": "a" + "b": "a", + "c": { + "aa": "bb", + }, + "d": { + "cc": "dd" + } } } */ @@ -1985,10 +2046,22 @@ mod tests { let inner_object_variant = outer_object.field(2).unwrap(); let inner_object = inner_object_variant.as_object().unwrap(); - assert_eq!(inner_object.len(), 1); + assert_eq!(inner_object.len(), 3); assert_eq!(inner_object.field_name(0).unwrap(), "b"); assert_eq!(inner_object.field(0).unwrap(), Variant::from("a")); + let inner_iner_object_variant_c = inner_object.field(1).unwrap(); + let inner_inner_object_c = inner_iner_object_variant_c.as_object().unwrap(); + assert_eq!(inner_inner_object_c.len(), 1); + assert_eq!(inner_inner_object_c.field_name(0).unwrap(), "aa"); + assert_eq!(inner_inner_object_c.field(0).unwrap(), Variant::from("bb")); + + let inner_iner_object_variant_d = inner_object.field(2).unwrap(); + let inner_inner_object_d = inner_iner_object_variant_d.as_object().unwrap(); + assert_eq!(inner_inner_object_d.len(), 1); + assert_eq!(inner_inner_object_d.field_name(0).unwrap(), "cc"); + assert_eq!(inner_inner_object_d.field(0).unwrap(), Variant::from("dd")); + assert_eq!(outer_object.field_name(1).unwrap(), "b"); assert_eq!(outer_object.field(1).unwrap(), Variant::from(true)); } From f5b04653db8f5c995c7a9e1103ca37e010d7cc5c Mon Sep 17 00:00:00 2001 From: klion26 Date: Fri, 18 Jul 2025 13:07:31 +0800 Subject: [PATCH 04/11] fixup! [Variant] Avoid extra allocation in object builder --- parquet-variant/src/builder.rs | 350 +++++++++++++++++++++------------ 1 file changed, 226 insertions(+), 124 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 5306c97a57e2..7271dab7e34a 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -552,6 +552,49 @@ impl ParentState<'_> { } } } + + // returns the beginning offset of buffer for the parent if it is object builder, else 0. + // for object builder will reuse the buffer from the parent, this is needed for `finish` + // which needs the relative offset from the current variant. + fn object_start_offset(&self) -> usize { + match self { + ParentState::Object { + object_start_offset, + .. + } => *object_start_offset, + _ => 0, + } + } + + /// Return mutable references to the buffer and metadata builder that this + /// parent state is using. + fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) { + match self { + ParentState::Variant { + buffer, + metadata_builder, + } => (buffer, metadata_builder), + ParentState::List { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + ParentState::Object { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + } + } + + // return the offset of the underlying buffer at the time of calling this method. + fn buffer_current_offset(&self) -> usize { + match self { + ParentState::Variant { buffer, .. } => buffer.offset(), + ParentState::Object { buffer, .. } => buffer.offset(), + ParentState::List { buffer, .. } => buffer.offset(), + } + } } /// Top level builder for [`Variant`] values @@ -1000,13 +1043,7 @@ impl<'a> ListBuilder<'a> { let offset_size = int_size(data_size); // Get parent's buffer - let offset_shift = match &self.parent_state { - ParentState::Object { - object_start_offset, - .. - } => *object_start_offset, - _ => 0, - }; + let offset_shift = self.parent_state.object_start_offset(); let parent_buffer = self.parent_state.buffer(); // as object builder has been reused the parent buffer, // we need to shift the offset by the starting offset of the parent object @@ -1050,11 +1087,7 @@ pub struct ObjectBuilder<'a> { impl<'a> ObjectBuilder<'a> { fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self { - let start_offset = match &parent_state { - ParentState::Variant { buffer, .. } => buffer.offset(), - ParentState::List { buffer, .. } => buffer.offset(), - ParentState::Object { buffer, .. } => buffer.offset(), - }; + let start_offset = parent_state.buffer_current_offset(); Self { parent_state, fields: IndexMap::new(), @@ -1084,58 +1117,17 @@ impl<'a> ObjectBuilder<'a> { key: &str, value: T, ) -> Result<(), ArrowError> { - match &mut self.parent_state { - ParentState::Variant { - buffer, - metadata_builder, - } => { - let field_id = metadata_builder.upsert_field_name(key); - let field_start = buffer.offset() - self.object_start_offset; + let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); - if self.fields.insert(field_id, field_start).is_some() - && self.validate_unique_fields - { - self.duplicate_fields.insert(field_id); - } + let field_id = metadata_builder.upsert_field_name(key); + let field_start = buffer.offset() - self.object_start_offset; - buffer.try_append_variant(value.into(), metadata_builder)?; - Ok(()) - } - ParentState::List { - buffer, - metadata_builder, - .. - } => { - let field_id = metadata_builder.upsert_field_name(key); - let field_start = buffer.offset() - self.object_start_offset; - - if self.fields.insert(field_id, field_start).is_some() - && self.validate_unique_fields - { - self.duplicate_fields.insert(field_id); - } - - buffer.try_append_variant(value.into(), metadata_builder)?; - Ok(()) - } - ParentState::Object { - buffer, - metadata_builder, - .. - } => { - let field_id = metadata_builder.upsert_field_name(key); - let field_start = buffer.offset() - self.object_start_offset; - - if self.fields.insert(field_id, field_start).is_some() - && self.validate_unique_fields - { - self.duplicate_fields.insert(field_id); - } - - buffer.try_append_variant(value.into(), metadata_builder)?; - Ok(()) - } + if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { + self.duplicate_fields.insert(field_id); } + + buffer.try_append_variant(value.into(), metadata_builder)?; + Ok(()) } /// Enables validation for unique field keys when inserting into this object. @@ -1151,49 +1143,16 @@ impl<'a> ObjectBuilder<'a> { fn parent_state<'b>(&'b mut self, key: &'b str) -> (ParentState<'b>, bool) { let validate_unique_fields = self.validate_unique_fields; - match &mut self.parent_state { - ParentState::Variant { - buffer, - metadata_builder, - } => { - let state = ParentState::Object { - buffer, - metadata_builder, - fields: &mut self.fields, - field_name: key, - object_start_offset: self.object_start_offset, - }; - (state, validate_unique_fields) - } - ParentState::List { - buffer, - metadata_builder, - .. - } => { - let state = ParentState::Object { - buffer, - metadata_builder, - fields: &mut self.fields, - field_name: key, - object_start_offset: self.object_start_offset, - }; - (state, validate_unique_fields) - } - ParentState::Object { - buffer, - metadata_builder, - .. - } => { - let state = ParentState::Object { - buffer, - metadata_builder, - fields: &mut self.fields, - field_name: key, - object_start_offset: self.object_start_offset, - }; - (state, validate_unique_fields) - } - } + let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); + + let state = ParentState::Object { + buffer, + metadata_builder, + fields: &mut self.fields, + field_name: key, + object_start_offset: self.object_start_offset, + }; + (state, validate_unique_fields) } /// Returns an object builder that can be used to append a new (nested) object to this object. @@ -1230,17 +1189,7 @@ impl<'a> ObjectBuilder<'a> { ))); } - let metadata_builder = match &self.parent_state { - ParentState::Variant { - metadata_builder, .. - } => metadata_builder, - ParentState::List { - metadata_builder, .. - } => metadata_builder, - ParentState::Object { - metadata_builder, .. - } => metadata_builder, - }; + let metadata_builder = self.parent_state.metadata_builder(); self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { let field_a_name = metadata_builder.field_name(field_a_id as usize); @@ -1308,13 +1257,7 @@ impl<'a> ObjectBuilder<'a> { buffer[header_pos..header_pos + offset_size as usize] .copy_from_slice(&data_size_bytes[..offset_size as usize]); - let start_offset_shift = match &self.parent_state { - ParentState::Object { - object_start_offset, - .. - } => *object_start_offset, - _ => 0, - }; + let start_offset_shift = self.parent_state.object_start_offset(); self.parent_state .finish(starting_offset - start_offset_shift); @@ -1980,8 +1923,13 @@ mod tests { "d": { "cc": "dd" } - } + }, "b": true, + "d": { + "e": 1, + "f": [1, true], + "g": ["tree", false], + } } */ @@ -2011,6 +1959,28 @@ mod tests { outer_object_builder.insert("b", true); + { + let mut inner_object_builder = outer_object_builder.new_object("d"); + inner_object_builder.insert("e", 1); + { + let mut inner_list_builder = inner_object_builder.new_list("f"); + inner_list_builder.append_value(1); + inner_list_builder.append_value(true); + + inner_list_builder.finish(); + } + + { + let mut inner_list_builder = inner_object_builder.new_list("g"); + inner_list_builder.append_value("tree"); + inner_list_builder.append_value(false); + + inner_list_builder.finish(); + } + + let _ = inner_object_builder.finish(); + } + let _ = outer_object_builder.finish(); } @@ -2029,6 +1999,11 @@ mod tests { "d": { "cc": "dd" } + }, + "d": { + "e": 1, + "f": [1, true], + "g": ["tree", false], } } */ @@ -2036,7 +2011,7 @@ mod tests { let variant = Variant::try_new(&metadata, &value).unwrap(); let outer_object = variant.as_object().unwrap(); - assert_eq!(outer_object.len(), 3); + assert_eq!(outer_object.len(), 4); assert_eq!(outer_object.field_name(0).unwrap(), "a"); assert_eq!(outer_object.field(0).unwrap(), Variant::from(false)); @@ -2064,6 +2039,133 @@ mod tests { assert_eq!(outer_object.field_name(1).unwrap(), "b"); assert_eq!(outer_object.field(1).unwrap(), Variant::from(true)); + + let out_object_variant_d = outer_object.field(3).unwrap(); + let out_object_d = out_object_variant_d.as_object().unwrap(); + assert_eq!(out_object_d.len(), 3); + assert_eq!("e", out_object_d.field_name(0).unwrap()); + assert_eq!(Variant::from(1), out_object_d.field(0).unwrap()); + assert_eq!("f", out_object_d.field_name(1).unwrap()); + + let first_inner_list_variant_f = out_object_d.field(1).unwrap(); + let first_inner_list_f = first_inner_list_variant_f.as_list().unwrap(); + assert_eq!(2, first_inner_list_f.len()); + assert_eq!(Variant::from(1), first_inner_list_f.get(0).unwrap()); + assert_eq!(Variant::from(true), first_inner_list_f.get(1).unwrap()); + + let second_inner_list_variant_g = out_object_d.field(2).unwrap(); + let second_inner_list_g = second_inner_list_variant_g.as_list().unwrap(); + assert_eq!(2, second_inner_list_g.len()); + assert_eq!(Variant::from("tree"), second_inner_list_g.get(0).unwrap()); + assert_eq!(Variant::from(false), second_inner_list_g.get(1).unwrap()); + } + + // this test wants to cover the logic for reuse parent buffer for list builder + // the builder looks like + // [ "apple", "false", [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}], [[1, true], ["tree", false]], 1] + #[test] + fn test_nested_list_with_heterogeneous_fields_for_buffer_reuse() { + let mut builder = VariantBuilder::new(); + + { + let mut outer_list_builder = builder.new_list(); + + outer_list_builder.append_value("apple"); + outer_list_builder.append_value(false); + + { + // the list here wants to cover the logic object builder inside list builder + let mut inner_list_builder = outer_list_builder.new_list(); + + { + let mut inner_object_builder = inner_list_builder.new_object(); + inner_object_builder.insert("a", "b"); + inner_object_builder.insert("b", "c"); + let _ = inner_object_builder.finish(); + } + + { + // the seconde object builder here wants to cover the logic for + // list builder resue the parent buffer. + let mut inner_object_builder = inner_list_builder.new_object(); + inner_object_builder.insert("c", "d"); + inner_object_builder.insert("d", "e"); + let _ = inner_object_builder.finish(); + } + + inner_list_builder.finish(); + } + + { + // the list here wants to cover the logic list builder inside list builder + let mut inner_list_builder = outer_list_builder.new_list(); + + { + let mut double_inner_list_builder = inner_list_builder.new_list(); + double_inner_list_builder.append_value(1); + double_inner_list_builder.append_value(true); + + double_inner_list_builder.finish(); + } + + { + let mut double_inner_list_builder = inner_list_builder.new_list(); + double_inner_list_builder.append_value("tree"); + double_inner_list_builder.append_value(false); + + double_inner_list_builder.finish(); + } + inner_list_builder.finish(); + } + + outer_list_builder.append_value(1); + + outer_list_builder.finish(); + } + + let (metadata, value) = builder.finish(); + + let variant = Variant::try_new(&metadata, &value).unwrap(); + let outer_list = variant.as_list().unwrap(); + + assert_eq!(5, outer_list.len()); + + // primitive value + assert_eq!(Variant::from("apple"), outer_list.get(0).unwrap()); + assert_eq!(Variant::from(false), outer_list.get(1).unwrap()); + assert_eq!(Variant::from(1), outer_list.get(4).unwrap()); + + // the first inner list [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}] + let list1_variant = outer_list.get(2).unwrap(); + let list1 = list1_variant.as_list().unwrap(); + assert_eq!(2, list1.len()); + + let list1_obj1_variant = list1.get(0).unwrap(); + let list1_obj1 = list1_obj1_variant.as_object().unwrap(); + assert_eq!("a", list1_obj1.field_name(0).unwrap()); + assert_eq!(Variant::from("b"), list1_obj1.field(0).unwrap()); + + assert_eq!("b", list1_obj1.field_name(1).unwrap()); + assert_eq!(Variant::from("c"), list1_obj1.field(1).unwrap()); + + // the second inner list [[1, true], ["tree", false]] + let list2_variant = outer_list.get(3).unwrap(); + let list2 = list2_variant.as_list().unwrap(); + assert_eq!(2, list2.len()); + + // the list [1, true] + let list2_list1_variant = list2.get(0).unwrap(); + let list2_list1 = list2_list1_variant.as_list().unwrap(); + assert_eq!(2, list2_list1.len()); + assert_eq!(Variant::from(1), list2_list1.get(0).unwrap()); + assert_eq!(Variant::from(true), list2_list1.get(1).unwrap()); + + // the list ["true", false] + let list2_list2_variant = list2.get(1).unwrap(); + let list2_list2 = list2_list2_variant.as_list().unwrap(); + assert_eq!(2, list2_list2.len()); + assert_eq!(Variant::from("tree"), list2_list2.get(0).unwrap()); + assert_eq!(Variant::from(false), list2_list2.get(1).unwrap()); } #[test] From 2b1edde68ce3f70c29290de00d9d07e39f58109d Mon Sep 17 00:00:00 2001 From: klion26 Date: Mon, 21 Jul 2025 00:06:30 +0800 Subject: [PATCH 05/11] fixup! Add tests for `BatchCoalescer::push_batch_with_filter`, fix bug (#7774) --- parquet-variant/src/builder.rs | 49 +++++++++++++--------------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 7271dab7e34a..ddad91f62e90 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -545,27 +545,16 @@ impl ParentState<'_> { metadata_builder, fields, field_name, + object_start_offset, .. } => { let field_id = metadata_builder.upsert_field_name(field_name); - fields.insert(field_id, starting_offset); + let shifted_start_offset = starting_offset - *object_start_offset; + fields.insert(field_id, shifted_start_offset); } } } - // returns the beginning offset of buffer for the parent if it is object builder, else 0. - // for object builder will reuse the buffer from the parent, this is needed for `finish` - // which needs the relative offset from the current variant. - fn object_start_offset(&self) -> usize { - match self { - ParentState::Object { - object_start_offset, - .. - } => *object_start_offset, - _ => 0, - } - } - /// Return mutable references to the buffer and metadata builder that this /// parent state is using. fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) { @@ -573,13 +562,13 @@ impl ParentState<'_> { ParentState::Variant { buffer, metadata_builder, - } => (buffer, metadata_builder), - ParentState::List { + } + | ParentState::List { buffer, metadata_builder, .. - } => (buffer, metadata_builder), - ParentState::Object { + } + | ParentState::Object { buffer, metadata_builder, .. @@ -590,9 +579,9 @@ impl ParentState<'_> { // return the offset of the underlying buffer at the time of calling this method. fn buffer_current_offset(&self) -> usize { match self { - ParentState::Variant { buffer, .. } => buffer.offset(), - ParentState::Object { buffer, .. } => buffer.offset(), - ParentState::List { buffer, .. } => buffer.offset(), + ParentState::Variant { buffer, .. } + | ParentState::Object { buffer, .. } + | ParentState::List { buffer, .. } => buffer.offset(), } } } @@ -1043,11 +1032,10 @@ impl<'a> ListBuilder<'a> { let offset_size = int_size(data_size); // Get parent's buffer - let offset_shift = self.parent_state.object_start_offset(); let parent_buffer = self.parent_state.buffer(); // as object builder has been reused the parent buffer, // we need to shift the offset by the starting offset of the parent object - let starting_offset = parent_buffer.offset() - offset_shift; + let starting_offset = parent_buffer.offset(); // Write header let header = array_header(is_large, offset_size); @@ -1217,7 +1205,10 @@ impl<'a> ObjectBuilder<'a> { // Shift existing data to make room for the header let buffer = parent_buffer.inner_mut(); - buffer.splice(starting_offset..starting_offset, vec![0u8; header_size]); + buffer.splice( + starting_offset..starting_offset, + std::iter::repeat_n(0u8, header_size), + ); // Write header at the original start position let mut header_pos = starting_offset; @@ -1237,15 +1228,15 @@ impl<'a> ObjectBuilder<'a> { } // Write field IDs - for (&field_id, _) in &self.fields { - let id_bytes = (field_id as usize).to_le_bytes(); + for field_id in self.fields.keys() { + let id_bytes = field_id.to_le_bytes(); buffer[header_pos..header_pos + id_size as usize] .copy_from_slice(&id_bytes[..id_size as usize]); header_pos += id_size as usize; } // Write field offsets (adjusted for header) - for (_, &relative_offset) in &self.fields { + for relative_offset in self.fields.values() { let offset_bytes = relative_offset.to_le_bytes(); buffer[header_pos..header_pos + offset_size as usize] .copy_from_slice(&offset_bytes[..offset_size as usize]); @@ -1257,9 +1248,7 @@ impl<'a> ObjectBuilder<'a> { buffer[header_pos..header_pos + offset_size as usize] .copy_from_slice(&data_size_bytes[..offset_size as usize]); - let start_offset_shift = self.parent_state.object_start_offset(); - self.parent_state - .finish(starting_offset - start_offset_shift); + self.parent_state.finish(starting_offset); // mark that this object has been finished self.has_been_finished = true; From 690dc35baeaa802f22d07cad517227e944dd1202 Mon Sep 17 00:00:00 2001 From: klion26 Date: Mon, 21 Jul 2025 00:14:59 +0800 Subject: [PATCH 06/11] truncate metadata when drop object builder --- parquet-variant/src/builder.rs | 53 ++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index ddad91f62e90..20fa4de741a4 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -584,6 +584,21 @@ impl ParentState<'_> { | ParentState::List { buffer, .. } => buffer.offset(), } } + + // return the current index of the undelying metadata buffer at the time of calling this method. + fn metadata_current_offset(&self) -> usize { + match self { + ParentState::Variant { + metadata_builder, .. + } + | ParentState::Object { + metadata_builder, .. + } + | ParentState::List { + metadata_builder, .. + } => metadata_builder.metadata_buffer.len(), + } + } } /// Top level builder for [`Variant`] values @@ -1065,6 +1080,9 @@ pub struct ObjectBuilder<'a> { fields: IndexMap, // (field_id, offset) /// the starting offset in the parent's buffer where this object starts object_start_offset: usize, + /// the starting offset in the parent's metadata buffer where this object starts + /// used to truncate the written fields in `drop` if the current object has not been finished + object_meta_start_offset: usize, /// whether the object has been finished, the written content of the current object /// will be truncated in `drop` if `has_been_finished` is false has_been_finished: bool, @@ -1076,11 +1094,13 @@ pub struct ObjectBuilder<'a> { impl<'a> ObjectBuilder<'a> { fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self { let start_offset = parent_state.buffer_current_offset(); + let meta_start_offset = parent_state.metadata_current_offset(); Self { parent_state, fields: IndexMap::new(), object_start_offset: start_offset, has_been_finished: false, + object_meta_start_offset: meta_start_offset, validate_unique_fields, duplicate_fields: HashSet::new(), } @@ -1269,6 +1289,11 @@ impl Drop for ObjectBuilder<'_> { .buffer() .inner_mut() .truncate(self.object_start_offset); + + self.parent_state + .metadata_builder() + .field_names + .truncate(self.object_meta_start_offset); } } } @@ -2542,8 +2567,7 @@ mod tests { // The original builder should be unchanged let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 1); - assert_eq!(&metadata[0], "name"); // not rolled back + assert!(metadata.is_empty()); // rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(42)); @@ -2617,8 +2641,7 @@ mod tests { list_builder.finish(); let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 1); - assert_eq!(&metadata[0], "name"); // not rolled back + assert!(metadata.is_empty()); let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); let list = variant.as_list().unwrap(); @@ -2646,8 +2669,8 @@ mod tests { // Only the second attempt should appear in the final variant let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 1); - assert_eq!(&metadata[0], "name"); // not rolled back + assert_eq!(metadata.len(), 1); // rolled back + assert_eq!(&metadata[0], "name"); let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(2)); @@ -2700,9 +2723,7 @@ mod tests { // Only the second attempt should appear in the final variant let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 2); - assert_eq!(&metadata[0], "first"); - assert_eq!(&metadata[1], "nested"); // not rolled back + assert!(metadata.is_empty()); // rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(2)); @@ -2725,15 +2746,12 @@ mod tests { object_builder.finish().unwrap(); let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 3); - assert_eq!(&metadata[0], "first"); - assert_eq!(&metadata[1], "name"); // not rolled back - assert_eq!(&metadata[2], "second"); + assert_eq!(metadata.len(), 1); // the fields of nested_object_builder has been rolled back + assert_eq!(&metadata[0], "second"); let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); let obj = variant.as_object().unwrap(); - assert_eq!(obj.len(), 2); - assert_eq!(obj.get("first"), Some(Variant::Int8(1))); + assert_eq!(obj.len(), 1); assert_eq!(obj.get("second"), Some(Variant::Int8(2))); } @@ -2756,10 +2774,7 @@ mod tests { // Only the second attempt should appear in the final variant let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 3); - assert_eq!(&metadata[0], "first"); // not rolled back - assert_eq!(&metadata[1], "name"); // not rolled back - assert_eq!(&metadata[2], "nested"); // not rolled back + assert_eq!(metadata.len(), 0); // rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(2)); From bdf1f2dda4693263d59c27d04439995b9c49138a Mon Sep 17 00:00:00 2001 From: klion26 Date: Mon, 21 Jul 2025 00:21:30 +0800 Subject: [PATCH 07/11] use metadata lenght as the max_id for field id --- parquet-variant/src/builder.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 20fa4de741a4..25c136320a11 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1204,8 +1204,12 @@ impl<'a> ObjectBuilder<'a> { let field_b_name = metadata_builder.field_name(field_b_id as usize); field_a_name.cmp(field_b_name) }); - let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); + + // the length of the metadata's field names is a very cheap to compute the upper bound. + // it will almost always be a tight upper bound as well -- it would take a pretty + // carefully crafted object to use only the early field ids of a large dictionary. + let max_id = metadata_builder.field_names.len(); + let id_size = int_size(max_id); let parent_buffer = self.parent_state.buffer(); let current_offset = parent_buffer.offset(); From c76e612205b53c07f5db5eb4d63384670c21fcb0 Mon Sep 17 00:00:00 2001 From: klion26 Date: Mon, 21 Jul 2025 14:12:32 +0800 Subject: [PATCH 08/11] address comments from viirya --- parquet-variant/src/builder.rs | 182 +++++++++++++++++++++------------ 1 file changed, 117 insertions(+), 65 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 25c136320a11..82f2f60cf8db 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -63,6 +63,12 @@ fn write_offset(buf: &mut Vec, value: usize, nbytes: u8) { buf.extend_from_slice(&bytes[..nbytes as usize]); } +/// Write little-endian integer to buffer at a specific position +fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) { + let bytes = value.to_le_bytes(); + buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]); +} + /// Wrapper around a `Vec` that provides methods for appending /// primitive values, variant types, and metadata. /// @@ -342,6 +348,63 @@ impl ValueBuffer { write_offset(buf, data_size, nbytes); } } + + /// Writes out the header byte for a variant object or list, from the starting position + /// of the buffer, will return the position after this write + fn append_header_start_from_buf_pos( + &mut self, + start_pos: usize, // the start position where the header will be inserted + header_byte: u8, + is_large: bool, + num_fields: usize, + ) -> usize { + let buffer = self.inner_mut(); + + // Write header at the original start position + let mut header_pos = start_pos; + + // Write header byte + buffer[header_pos] = header_byte; + header_pos += 1; + + // Write number of fields + if is_large { + buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); + header_pos += 4; + } else { + buffer[header_pos] = num_fields as u8; + header_pos += 1; + } + + header_pos + } + + /// Writes out the offsets for an array of offsets, including the final offset (data size). + /// from the starting position of the buffer, will return the position after this write + fn append_offset_array_start_from_buf_pos( + &mut self, + start_pos: usize, + offsets: impl IntoIterator, + data_size: Option, + nbytes: u8, + ) -> usize { + let buf = self.inner_mut(); + + let mut current_pos = start_pos; + for relative_offset in offsets { + write_offset_at_pos(buf, current_pos, relative_offset, nbytes); + current_pos += nbytes as usize; + } + + // Write data_size + if let Some(data_size) = data_size { + // Write data_size at the end of the offsets + write_offset_at_pos(buf, current_pos, data_size, nbytes); + current_pos += nbytes as usize; + } + + current_pos + } } /// Builder for constructing metadata for [`Variant`] values. @@ -506,7 +569,7 @@ enum ParentState<'a> { metadata_builder: &'a mut MetadataBuilder, fields: &'a mut IndexMap, field_name: &'a str, - object_start_offset: usize, + parent_offset_base: usize, }, } @@ -545,7 +608,7 @@ impl ParentState<'_> { metadata_builder, fields, field_name, - object_start_offset, + parent_offset_base: object_start_offset, .. } => { let field_id = metadata_builder.upsert_field_name(field_name); @@ -576,7 +639,7 @@ impl ParentState<'_> { } } - // return the offset of the underlying buffer at the time of calling this method. + // Return the offset of the underlying buffer at the time of calling this method. fn buffer_current_offset(&self) -> usize { match self { ParentState::Variant { buffer, .. } @@ -585,7 +648,7 @@ impl ParentState<'_> { } } - // return the current index of the undelying metadata buffer at the time of calling this method. + // Return the current index of the undelying metadata buffer at the time of calling this method. fn metadata_current_offset(&self) -> usize { match self { ParentState::Variant { @@ -1048,8 +1111,6 @@ impl<'a> ListBuilder<'a> { // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - // as object builder has been reused the parent buffer, - // we need to shift the offset by the starting offset of the parent object let starting_offset = parent_buffer.offset(); // Write header @@ -1078,12 +1139,12 @@ impl Drop for ListBuilder<'_> { pub struct ObjectBuilder<'a> { parent_state: ParentState<'a>, fields: IndexMap, // (field_id, offset) - /// the starting offset in the parent's buffer where this object starts - object_start_offset: usize, - /// the starting offset in the parent's metadata buffer where this object starts + /// The starting offset in the parent's buffer where this object starts + parent_offset_base: usize, + /// The starting offset in the parent's metadata buffer where this object starts /// used to truncate the written fields in `drop` if the current object has not been finished - object_meta_start_offset: usize, - /// whether the object has been finished, the written content of the current object + parent_metadata_offset_base: usize, + /// Whether the object has been finished, the written content of the current object /// will be truncated in `drop` if `has_been_finished` is false has_been_finished: bool, validate_unique_fields: bool, @@ -1093,14 +1154,14 @@ pub struct ObjectBuilder<'a> { impl<'a> ObjectBuilder<'a> { fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self { - let start_offset = parent_state.buffer_current_offset(); - let meta_start_offset = parent_state.metadata_current_offset(); + let offset_base = parent_state.buffer_current_offset(); + let meta_offset_base = parent_state.metadata_current_offset(); Self { parent_state, fields: IndexMap::new(), - object_start_offset: start_offset, + parent_offset_base: offset_base, has_been_finished: false, - object_meta_start_offset: meta_start_offset, + parent_metadata_offset_base: meta_offset_base, validate_unique_fields, duplicate_fields: HashSet::new(), } @@ -1128,7 +1189,7 @@ impl<'a> ObjectBuilder<'a> { let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); let field_id = metadata_builder.upsert_field_name(key); - let field_start = buffer.offset() - self.object_start_offset; + let field_start = buffer.offset() - self.parent_offset_base; if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { self.duplicate_fields.insert(field_id); @@ -1158,7 +1219,7 @@ impl<'a> ObjectBuilder<'a> { metadata_builder, fields: &mut self.fields, field_name: key, - object_start_offset: self.object_start_offset, + parent_offset_base: self.parent_offset_base, }; (state, validate_unique_fields) } @@ -1207,14 +1268,14 @@ impl<'a> ObjectBuilder<'a> { // the length of the metadata's field names is a very cheap to compute the upper bound. // it will almost always be a tight upper bound as well -- it would take a pretty - // carefully crafted object to use only the early field ids of a large dictionary. + // carefully crafted object to use only the early field ids of a large dictionary. let max_id = metadata_builder.field_names.len(); let id_size = int_size(max_id); let parent_buffer = self.parent_state.buffer(); let current_offset = parent_buffer.offset(); - // current object starts from `object_start_offset` - let data_size = current_offset - self.object_start_offset; + // Current object starts from `object_start_offset` + let data_size = current_offset - self.parent_offset_base; let offset_size = int_size(data_size); let num_fields = self.fields.len(); @@ -1225,7 +1286,7 @@ impl<'a> ObjectBuilder<'a> { (num_fields * id_size as usize) + // field IDs ((num_fields + 1) * offset_size as usize); // field offsets + data_size - let starting_offset = self.object_start_offset; + let starting_offset = self.parent_offset_base; // Shift existing data to make room for the header let buffer = parent_buffer.inner_mut(); @@ -1239,42 +1300,33 @@ impl<'a> ObjectBuilder<'a> { // Write header byte let header = object_header(is_large, id_size, offset_size); - buffer[header_pos] = header; - header_pos += 1; - - // Write number of fields - if is_large { - buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); - header_pos += 4; - } else { - buffer[header_pos] = num_fields as u8; - header_pos += 1; - } - - // Write field IDs - for field_id in self.fields.keys() { - let id_bytes = field_id.to_le_bytes(); - buffer[header_pos..header_pos + id_size as usize] - .copy_from_slice(&id_bytes[..id_size as usize]); - header_pos += id_size as usize; - } - - // Write field offsets (adjusted for header) - for relative_offset in self.fields.values() { - let offset_bytes = relative_offset.to_le_bytes(); - buffer[header_pos..header_pos + offset_size as usize] - .copy_from_slice(&offset_bytes[..offset_size as usize]); - header_pos += offset_size as usize; - } - - // Write data_size - let data_size_bytes = data_size.to_le_bytes(); - buffer[header_pos..header_pos + offset_size as usize] - .copy_from_slice(&data_size_bytes[..offset_size as usize]); + header_pos = self + .parent_state + .buffer() + .append_header_start_from_buf_pos(header_pos, header, is_large, num_fields); + + header_pos = self + .parent_state + .buffer() + .append_offset_array_start_from_buf_pos( + header_pos, + self.fields.keys().copied().map(|id| id as usize), + None, + id_size, + ); + + self.parent_state + .buffer() + .append_offset_array_start_from_buf_pos( + header_pos, + self.fields.values().copied(), + Some(data_size), + offset_size, + ); self.parent_state.finish(starting_offset); - // mark that this object has been finished + // Mark that this object has been finished self.has_been_finished = true; Ok(()) @@ -1287,17 +1339,17 @@ impl<'a> ObjectBuilder<'a> { /// is finalized. impl Drop for ObjectBuilder<'_> { fn drop(&mut self) { - // truncate the buffer if the `finish` method has not been called. + // Truncate the buffer if the `finish` method has not been called. if !self.has_been_finished { self.parent_state .buffer() .inner_mut() - .truncate(self.object_start_offset); + .truncate(self.parent_offset_base); self.parent_state .metadata_builder() .field_names - .truncate(self.object_meta_start_offset); + .truncate(self.parent_metadata_offset_base); } } } @@ -2078,7 +2130,7 @@ mod tests { assert_eq!(Variant::from(false), second_inner_list_g.get(1).unwrap()); } - // this test wants to cover the logic for reuse parent buffer for list builder + // This test wants to cover the logic for reuse parent buffer for list builder // the builder looks like // [ "apple", "false", [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}], [[1, true], ["tree", false]], 1] #[test] @@ -2148,12 +2200,12 @@ mod tests { assert_eq!(5, outer_list.len()); - // primitive value + // Primitive value assert_eq!(Variant::from("apple"), outer_list.get(0).unwrap()); assert_eq!(Variant::from(false), outer_list.get(1).unwrap()); assert_eq!(Variant::from(1), outer_list.get(4).unwrap()); - // the first inner list [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}] + // The first inner list [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}] let list1_variant = outer_list.get(2).unwrap(); let list1 = list1_variant.as_list().unwrap(); assert_eq!(2, list1.len()); @@ -2166,19 +2218,19 @@ mod tests { assert_eq!("b", list1_obj1.field_name(1).unwrap()); assert_eq!(Variant::from("c"), list1_obj1.field(1).unwrap()); - // the second inner list [[1, true], ["tree", false]] + // The second inner list [[1, true], ["tree", false]] let list2_variant = outer_list.get(3).unwrap(); let list2 = list2_variant.as_list().unwrap(); assert_eq!(2, list2.len()); - // the list [1, true] + // The list [1, true] let list2_list1_variant = list2.get(0).unwrap(); let list2_list1 = list2_list1_variant.as_list().unwrap(); assert_eq!(2, list2_list1.len()); assert_eq!(Variant::from(1), list2_list1.get(0).unwrap()); assert_eq!(Variant::from(true), list2_list1.get(1).unwrap()); - // the list ["true", false] + // The list ["true", false] let list2_list2_variant = list2.get(1).unwrap(); let list2_list2 = list2_list2_variant.as_list().unwrap(); assert_eq!(2, list2_list2.len()); @@ -2673,8 +2725,8 @@ mod tests { // Only the second attempt should appear in the final variant let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 1); // rolled back - assert_eq!(&metadata[0], "name"); + assert_eq!(metadata.len(), 1); + assert_eq!(&metadata[0], "name"); // not rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(2)); From f0d35de7ddea94d03c9b8808c077cce6a7045dc9 Mon Sep 17 00:00:00 2001 From: klion26 Date: Mon, 21 Jul 2025 14:22:05 +0800 Subject: [PATCH 09/11] calculate max_id with for-loop to see if we can pass all tests, will revert if needed --- parquet-variant/src/builder.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 82f2f60cf8db..08080551f621 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1266,11 +1266,8 @@ impl<'a> ObjectBuilder<'a> { field_a_name.cmp(field_b_name) }); - // the length of the metadata's field names is a very cheap to compute the upper bound. - // it will almost always be a tight upper bound as well -- it would take a pretty - // carefully crafted object to use only the early field ids of a large dictionary. - let max_id = metadata_builder.field_names.len(); - let id_size = int_size(max_id); + let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); + let id_size = int_size(max_id as usize); let parent_buffer = self.parent_state.buffer(); let current_offset = parent_buffer.offset(); From d9a61d4ad1b5b9cdf0e698b7d4d40f6dc38cab30 Mon Sep 17 00:00:00 2001 From: klion26 Date: Tue, 22 Jul 2025 13:47:46 +0800 Subject: [PATCH 10/11] rename parent_offset_base to parent_value_offset_base --- parquet-variant/src/builder.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 08080551f621..cd42be1988fe 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1140,7 +1140,7 @@ pub struct ObjectBuilder<'a> { parent_state: ParentState<'a>, fields: IndexMap, // (field_id, offset) /// The starting offset in the parent's buffer where this object starts - parent_offset_base: usize, + parent_value_offset_base: usize, /// The starting offset in the parent's metadata buffer where this object starts /// used to truncate the written fields in `drop` if the current object has not been finished parent_metadata_offset_base: usize, @@ -1159,7 +1159,7 @@ impl<'a> ObjectBuilder<'a> { Self { parent_state, fields: IndexMap::new(), - parent_offset_base: offset_base, + parent_value_offset_base: offset_base, has_been_finished: false, parent_metadata_offset_base: meta_offset_base, validate_unique_fields, @@ -1189,7 +1189,7 @@ impl<'a> ObjectBuilder<'a> { let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); let field_id = metadata_builder.upsert_field_name(key); - let field_start = buffer.offset() - self.parent_offset_base; + let field_start = buffer.offset() - self.parent_value_offset_base; if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { self.duplicate_fields.insert(field_id); @@ -1219,7 +1219,7 @@ impl<'a> ObjectBuilder<'a> { metadata_builder, fields: &mut self.fields, field_name: key, - parent_offset_base: self.parent_offset_base, + parent_offset_base: self.parent_value_offset_base, }; (state, validate_unique_fields) } @@ -1272,7 +1272,7 @@ impl<'a> ObjectBuilder<'a> { let parent_buffer = self.parent_state.buffer(); let current_offset = parent_buffer.offset(); // Current object starts from `object_start_offset` - let data_size = current_offset - self.parent_offset_base; + let data_size = current_offset - self.parent_value_offset_base; let offset_size = int_size(data_size); let num_fields = self.fields.len(); @@ -1283,7 +1283,7 @@ impl<'a> ObjectBuilder<'a> { (num_fields * id_size as usize) + // field IDs ((num_fields + 1) * offset_size as usize); // field offsets + data_size - let starting_offset = self.parent_offset_base; + let starting_offset = self.parent_value_offset_base; // Shift existing data to make room for the header let buffer = parent_buffer.inner_mut(); @@ -1341,7 +1341,7 @@ impl Drop for ObjectBuilder<'_> { self.parent_state .buffer() .inner_mut() - .truncate(self.parent_offset_base); + .truncate(self.parent_value_offset_base); self.parent_state .metadata_builder() From 19bb544be4b049c4fb0ad7ad0d35149655959f4e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Jul 2025 06:49:01 -0400 Subject: [PATCH 11/11] revert unecessary refactor --- parquet-variant/src/builder.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 7e200ca9e78d..dc66865e68ac 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1383,8 +1383,8 @@ impl<'a> ObjectBuilder<'a> { /// Finalizes this object and appends it to its parent, which otherwise remains unmodified. pub fn finish(mut self) -> Result<(), ArrowError> { + let metadata_builder = self.parent_state.metadata_builder(); if self.validate_unique_fields && !self.duplicate_fields.is_empty() { - let metadata_builder = self.parent_state.metadata_builder(); let mut names = self .duplicate_fields .iter() @@ -1399,8 +1399,6 @@ impl<'a> ObjectBuilder<'a> { ))); } - let metadata_builder = self.parent_state.metadata_builder(); - self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { let field_a_name = metadata_builder.field_name(field_a_id as usize); let field_b_name = metadata_builder.field_name(field_b_id as usize);