From b97efadbbccdbe0b816b99f5558ed303a138c280 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 31 Dec 2025 14:33:14 +0800 Subject: [PATCH 1/4] refactor: use new codec utils in TDigest and Frequencies sketches Signed-off-by: tison --- datasketches/src/tdigest/sketch.rs | 36 +++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/datasketches/src/tdigest/sketch.rs b/datasketches/src/tdigest/sketch.rs index c9ec382..05bdb44 100644 --- a/datasketches/src/tdigest/sketch.rs +++ b/datasketches/src/tdigest/sketch.rs @@ -19,7 +19,7 @@ use std::cmp::Ordering; use std::convert::identity; use std::num::NonZeroU64; -use crate::codec::SketchSlice; +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::error::ErrorKind; use crate::tdigest::serialization::*; @@ -316,16 +316,16 @@ impl TDigestMut { total_size += self.centroids.len() * (size_of::<f64>() + size_of::<u64>()); } - let mut bytes = Vec::with_capacity(total_size); - bytes.push(match self.total_weight() { + let mut bytes = SketchBytes::with_capacity(total_size); + bytes.write_u8(match self.total_weight() { 0 => PREAMBLE_LONGS_EMPTY_OR_SINGLE, 1 => PREAMBLE_LONGS_EMPTY_OR_SINGLE, _ => PREAMBLE_LONGS_MULTIPLE, }); - bytes.push(SERIAL_VERSION); - bytes.push(TDIGEST_FAMILY_ID); - bytes.extend_from_slice(&self.k.to_le_bytes()); - bytes.push({ + bytes.write_u8(SERIAL_VERSION); + bytes.write_u8(TDIGEST_FAMILY_ID); + bytes.write_u16_le(self.k); + bytes.write_u8({ let mut flags = 0; if self.is_empty() { flags |= FLAGS_IS_EMPTY; @@ -338,23 +338,23 @@ impl TDigestMut { } flags }); - bytes.extend_from_slice(&0u16.to_le_bytes()); // unused + bytes.write_u16_le(0); // unused if self.is_empty() { - return bytes; + return bytes.into_bytes(); } if self.is_single_value() { - bytes.extend_from_slice(&self.min.to_le_bytes()); - return bytes; + bytes.write_f64_le(self.min); + return bytes.into_bytes(); } - bytes.extend_from_slice(&(self.centroids.len() as u32).to_le_bytes()); - 
bytes.extend_from_slice(&0u32.to_le_bytes()); // unused - bytes.extend_from_slice(&self.min.to_le_bytes()); - bytes.extend_from_slice(&self.max.to_le_bytes()); + bytes.write_u32_le(self.centroids.len() as u32); + bytes.write_u32_le(0); // unused + bytes.write_f64_le(self.min); + bytes.write_f64_le(self.max); for centroid in &self.centroids { - bytes.extend_from_slice(&centroid.mean.to_le_bytes()); - bytes.extend_from_slice(&centroid.weight.get().to_le_bytes()); + bytes.write_f64_le(centroid.mean); + bytes.write_u64_le(centroid.weight.get()); } - bytes + bytes.into_bytes() } /// Deserializes a TDigest from bytes. From 4b8b1f9dfb03acebc49e94ac4546e02ae4e4639a Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 31 Dec 2025 15:09:57 +0800 Subject: [PATCH 2/4] for frequencies Signed-off-by: tison --- datasketches/src/frequencies/serde.rs | 99 +++++------ datasketches/src/frequencies/serialization.rs | 2 +- datasketches/src/frequencies/sketch.rs | 158 ++++++++++-------- datasketches/src/hll/array4.rs | 2 +- datasketches/src/hll/array6.rs | 2 +- datasketches/src/hll/array8.rs | 2 +- datasketches/src/hll/hash_set.rs | 2 +- datasketches/src/hll/list.rs | 2 +- datasketches/src/hll/serialization.rs | 2 +- datasketches/src/hll/sketch.rs | 4 +- 10 files changed, 140 insertions(+), 135 deletions(-) diff --git a/datasketches/src/frequencies/serde.rs b/datasketches/src/frequencies/serde.rs index a44e6b0..171aea0 100644 --- a/datasketches/src/frequencies/serde.rs +++ b/datasketches/src/frequencies/serde.rs @@ -17,82 +17,69 @@ //! Serialization helpers for frequent items sketches. 
-use std::str; - +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; -use crate::frequencies::serialization::read_i64_le; -use crate::frequencies::serialization::read_u32_le; -pub(crate) fn serialize_string_items(items: &[String]) -> Vec { - let total_len: usize = items.iter().map(|item| 4 + item.len()).sum(); - let mut out = Vec::with_capacity(total_len); +pub(crate) fn count_string_items_bytes(items: &[String]) -> usize { + items.iter().map(|item| 4 + item.len()).sum() +} + +pub(crate) fn serialize_string_items(bytes: &mut SketchBytes, items: &[String]) { for item in items { - let bytes = item.as_bytes(); - let len = bytes.len() as u32; - out.extend_from_slice(&len.to_le_bytes()); - out.extend_from_slice(bytes); + let bs = item.as_bytes(); + bytes.write_u32_le(bs.len() as u32); + bytes.write(bs); } - out } pub(crate) fn deserialize_string_items( - bytes: &[u8], + mut cursor: SketchSlice<'_>, num_items: usize, -) -> Result<(Vec, usize), Error> { - if num_items == 0 { - return Ok((Vec::new(), 0)); - } +) -> Result, Error> { let mut items = Vec::with_capacity(num_items); - let mut offset = 0usize; - for _ in 0..num_items { - if offset + 4 > bytes.len() { - return Err(Error::insufficient_data( - "not enough bytes for string length", - )); - } - let len = read_u32_le(bytes, offset) as usize; - offset += 4; - if offset + len > bytes.len() { - return Err(Error::insufficient_data( - "not enough bytes for string payload", - )); - } - let slice = &bytes[offset..offset + len]; - let value = match str::from_utf8(slice) { - Ok(s) => s.to_string(), - Err(_) => { - return Err(Error::deserial("invalid UTF-8 string payload")); - } - }; + for i in 0..num_items { + let len = cursor.read_u32_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {num_items} string items, failed to read len at index {i}" + )) + })?; + + let mut slice = Vec::with_capacity(len as usize); + cursor.read_exact(&mut slice).map_err(|_| { + Error::insufficient_data(format!( + 
"expected {num_items} string items, failed to read slice at index {i}" + )) + })?; + + let value = String::from_utf8(slice) + .map_err(|_| Error::deserial(format!("invalid UTF-8 string payload at index {i}")))?; items.push(value); - offset += len; } - Ok((items, offset)) + Ok(items) } -pub(crate) fn serialize_i64_items(items: &[i64]) -> Vec { - let mut out = Vec::with_capacity(items.len() * 8); - for item in items { - out.extend_from_slice(&item.to_le_bytes()); +pub(crate) fn count_i64_items_bytes(items: &[i64]) -> usize { + items.len() * 8 +} + +pub(crate) fn serialize_i64_items(bytes: &mut SketchBytes, items: &[i64]) { + for item in items.iter().copied() { + bytes.write_i64_le(item); } - out } pub(crate) fn deserialize_i64_items( - bytes: &[u8], + mut cursor: SketchSlice<'_>, num_items: usize, -) -> Result<(Vec, usize), Error> { - let needed = num_items - .checked_mul(8) - .ok_or_else(|| Error::deserial("items size overflow"))?; - if bytes.len() < needed { - return Err(Error::insufficient_data("not enough bytes for i64 items")); - } +) -> Result, Error> { let mut items = Vec::with_capacity(num_items); for i in 0..num_items { - let offset = i * 8; - let value = read_i64_le(bytes, offset); + let value = cursor.read_i64_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {num_items} i64 items, failed at index {i}" + )) + })?; items.push(value); } - Ok((items, needed)) + Ok(items) } diff --git a/datasketches/src/frequencies/serialization.rs b/datasketches/src/frequencies/serialization.rs index 43003c9..b17b1ec 100644 --- a/datasketches/src/frequencies/serialization.rs +++ b/datasketches/src/frequencies/serialization.rs @@ -20,7 +20,7 @@ /// Family ID for frequency sketches. pub const FAMILY_ID: u8 = 10; /// Serialization version. -pub const SER_VER: u8 = 1; +pub const SERIAL_VERSION: u8 = 1; /// Preamble longs for empty sketch. 
pub const PREAMBLE_LONGS_EMPTY: u8 = 1; diff --git a/datasketches/src/frequencies/sketch.rs b/datasketches/src/frequencies/sketch.rs index ee4552b..0efc778 100644 --- a/datasketches/src/frequencies/sketch.rs +++ b/datasketches/src/frequencies/sketch.rs @@ -17,17 +17,19 @@ //! Frequent items sketch implementations. -use std::hash::Hash; - +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap; -use crate::frequencies::serde::deserialize_i64_items; -use crate::frequencies::serde::deserialize_string_items; use crate::frequencies::serde::serialize_i64_items; use crate::frequencies::serde::serialize_string_items; +use crate::frequencies::serde::{count_i64_items_bytes, deserialize_string_items}; +use crate::frequencies::serde::{count_string_items_bytes, deserialize_i64_items}; use crate::frequencies::serialization::*; +use std::hash::Hash; -type DeserializeItems = fn(&[u8], usize) -> Result<(Vec, usize), Error>; +type CountSerializeSize = fn(&[T]) -> usize; +type SerializeItems = fn(&mut SketchBytes, &[T]); +type DeserializeItems = fn(SketchSlice<'_>, usize) -> Result, Error>; const LG_MIN_MAP_SIZE: u8 = 3; const SAMPLE_SIZE: usize = 1024; @@ -322,63 +324,78 @@ impl FrequentItemsSketch { } } - fn serialize_inner(&self, serialize_items: fn(&[T]) -> Vec) -> Vec + fn serialize_inner( + &self, + count_serialize_size: CountSerializeSize, + serialize_items: SerializeItems, + ) -> Vec where - T: Clone, + T: Clone, // for self.hash_map.active_keys() { if self.is_empty() { - let mut out = vec![0u8; 8]; - out[PREAMBLE_LONGS_BYTE] = PREAMBLE_LONGS_EMPTY; - out[SER_VER_BYTE] = SER_VER; - out[FAMILY_BYTE] = FAMILY_ID; - out[LG_MAX_MAP_SIZE_BYTE] = self.lg_max_map_size; - out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.lg_length(); - out[FLAGS_BYTE] = EMPTY_FLAG_MASK; - return out; + let mut bytes = SketchBytes::with_capacity(8); + bytes.write_u8(PREAMBLE_LONGS_EMPTY); + 
bytes.write_u8(SERIAL_VERSION); + bytes.write_u8(FAMILY_ID); + bytes.write_u8(self.lg_max_map_size); + bytes.write_u8(self.hash_map.lg_length()); + bytes.write_u8(EMPTY_FLAG_MASK); + return bytes.into_bytes(); } + let active_items = self.num_active_items(); let values = self.hash_map.active_values(); let keys = self.hash_map.active_keys(); - let items_bytes = serialize_items(&keys); let total_bytes = - PREAMBLE_LONGS_NONEMPTY as usize * 8 + (active_items * 8) + items_bytes.len(); - let mut out = vec![0u8; total_bytes]; - out[PREAMBLE_LONGS_BYTE] = PREAMBLE_LONGS_NONEMPTY; - out[SER_VER_BYTE] = SER_VER; - out[FAMILY_BYTE] = FAMILY_ID; - out[LG_MAX_MAP_SIZE_BYTE] = self.lg_max_map_size; - out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.lg_length(); - out[FLAGS_BYTE] = 0; - write_u32_le(&mut out, ACTIVE_ITEMS_INT, active_items as u32); - write_u64_le(&mut out, STREAM_WEIGHT_LONG, self.stream_weight); - write_u64_le(&mut out, OFFSET_LONG, self.offset); - - let mut offset = PREAMBLE_LONGS_NONEMPTY as usize * 8; + PREAMBLE_LONGS_NONEMPTY as usize * 8 + (active_items * 8) + count_serialize_size(&keys); + + let mut bytes = SketchBytes::with_capacity(total_bytes); + bytes.write_u8(PREAMBLE_LONGS_NONEMPTY); + bytes.write_u8(SERIAL_VERSION); + bytes.write_u8(FAMILY_ID); + bytes.write_u8(self.lg_max_map_size); + bytes.write_u8(self.hash_map.lg_length()); + bytes.write_u8(0); // flags + bytes.write_u16_le(0); // unused + + bytes.write_u32_le(active_items as u32); + bytes.write_u32_le(0); // unused + bytes.write_u64_le(self.stream_weight); + bytes.write_u64_le(self.offset); + for value in values { - write_u64_le(&mut out, offset, value); - offset += 8; + bytes.write_u64_le(value); } - out[offset..offset + items_bytes.len()].copy_from_slice(&items_bytes); - out + serialize_items(&mut bytes, &keys); + + bytes.into_bytes() } fn deserialize_inner( bytes: &[u8], deserialize_items: DeserializeItems, ) -> Result { - if bytes.len() < 8 { - return Err(Error::insufficient_data("preamble")); + 
fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) } - let pre_longs = bytes[PREAMBLE_LONGS_BYTE] & 0x3f; - let ser_ver = bytes[SER_VER_BYTE]; - let family = bytes[FAMILY_BYTE]; - let lg_max = bytes[LG_MAX_MAP_SIZE_BYTE]; - let lg_cur = bytes[LG_CUR_MAP_SIZE_BYTE]; - let flags = bytes[FLAGS_BYTE]; - let is_empty = (flags & EMPTY_FLAG_MASK) != 0; - if ser_ver != SER_VER { - return Err(Error::unsupported_serial_version(SER_VER, ser_ver)); + + let mut cursor = SketchSlice::new(bytes); + let pre_longs = cursor.read_u8().map_err(make_error("pre_longs"))?; + let pre_longs = pre_longs & 0x3F; + let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?; + let family = cursor.read_u8().map_err(make_error("family"))?; + let lg_max = cursor.read_u8().map_err(make_error("lg_max_map_size"))?; + let lg_cur = cursor.read_u8().map_err(make_error("lg_cur_map_size"))?; + let flags = cursor.read_u8().map_err(make_error("flags"))?; + cursor.read_u16_le().map_err(make_error(""))?; + + if serial_version != SERIAL_VERSION { + return Err(Error::unsupported_serial_version( + SERIAL_VERSION, + serial_version, + )); } + if family != FAMILY_ID { return Err(Error::invalid_family( FAMILY_ID, @@ -386,51 +403,52 @@ impl FrequentItemsSketch { "FrequentItemsSketch", )); } + if lg_cur > lg_max { return Err(Error::deserial("lg_cur_map_size exceeds lg_max_map_size")); } + + let is_empty = (flags & EMPTY_FLAG_MASK) != 0; if is_empty { - if pre_longs != PREAMBLE_LONGS_EMPTY { - return Err(Error::invalid_preamble_longs( + return if pre_longs != PREAMBLE_LONGS_EMPTY { + Err(Error::invalid_preamble_longs( PREAMBLE_LONGS_EMPTY, pre_longs, - )); - } - return Ok(Self::with_lg_map_sizes(lg_max, lg_cur)); + )) + } else { + Ok(Self::with_lg_map_sizes(lg_max, lg_cur)) + }; } + if pre_longs != PREAMBLE_LONGS_NONEMPTY { return Err(Error::invalid_preamble_longs( PREAMBLE_LONGS_NONEMPTY, pre_longs, )); } - if bytes.len() < 
PREAMBLE_LONGS_NONEMPTY as usize * 8 { - return Err(Error::insufficient_data("full preamble")); - } - let active_items = read_u32_le(bytes, ACTIVE_ITEMS_INT) as usize; - let stream_weight = read_u64_le(bytes, STREAM_WEIGHT_LONG); - let offset_val = read_u64_le(bytes, OFFSET_LONG); - let values_offset = PREAMBLE_LONGS_NONEMPTY as usize * 8; - let values_bytes = active_items - .checked_mul(8) - .ok_or_else(|| Error::deserial("values size overflow"))?; - let items_offset = values_offset + values_bytes; - if bytes.len() < items_offset { - return Err(Error::insufficient_data("values")); - } + + let active_items = cursor.read_u32_le().map_err(make_error("active_items"))?; + let active_items = active_items as usize; + cursor.read_u32_le().map_err(make_error(""))?; + let stream_weight = cursor.read_u64_le().map_err(make_error("stream_weight"))?; + let offset_val = cursor.read_u64_le().map_err(make_error("offset"))?; + let mut values = Vec::with_capacity(active_items); for i in 0..active_items { - values.push(read_u64_le(bytes, values_offset + i * 8)); + values.push(cursor.read_u64_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {active_items} weights, failed at index {i}" + )) + })?); } - let (items, consumed) = deserialize_items(&bytes[items_offset..], active_items)?; + + let items = deserialize_items(cursor, active_items)?; if items.len() != active_items { return Err(Error::deserial( "item count mismatch during deserialization", )); } - if consumed > bytes.len() - items_offset { - return Err(Error::insufficient_data("items")); - } + let mut sketch = Self::with_lg_map_sizes(lg_max, lg_cur); for (item, value) in items.into_iter().zip(values) { sketch.update_with_count(item, value); @@ -444,7 +462,7 @@ impl FrequentItemsSketch { impl FrequentItemsSketch { /// Serializes this sketch into a byte vector. 
pub fn serialize(&self) -> Vec { - self.serialize_inner(serialize_i64_items) + self.serialize_inner(count_i64_items_bytes, serialize_i64_items) } /// Deserializes a sketch from bytes. @@ -456,7 +474,7 @@ impl FrequentItemsSketch { impl FrequentItemsSketch { /// Serializes this sketch into a byte vector. pub fn serialize(&self) -> Vec { - self.serialize_inner(serialize_string_items) + self.serialize_inner(count_string_items_bytes, serialize_string_items) } /// Deserializes a sketch from bytes. diff --git a/datasketches/src/hll/array4.rs b/datasketches/src/hll/array4.rs index 3944a41..5bf809f 100644 --- a/datasketches/src/hll/array4.rs +++ b/datasketches/src/hll/array4.rs @@ -375,7 +375,7 @@ impl Array4 { // Write standard header bytes.write_u8(HLL_PREINTS); - bytes.write_u8(SERIAL_VER); + bytes.write_u8(SERIAL_VERSION); bytes.write_u8(HLL_FAMILY_ID); bytes.write_u8(lg_config_k); bytes.write_u8(0); // unused for HLL mode diff --git a/datasketches/src/hll/array6.rs b/datasketches/src/hll/array6.rs index f247a67..223424d 100644 --- a/datasketches/src/hll/array6.rs +++ b/datasketches/src/hll/array6.rs @@ -228,7 +228,7 @@ impl Array6 { // Write standard header bytes.write_u8(HLL_PREINTS); - bytes.write_u8(SERIAL_VER); + bytes.write_u8(SERIAL_VERSION); bytes.write_u8(HLL_FAMILY_ID); bytes.write_u8(lg_config_k); bytes.write_u8(0); // unused for HLL mode diff --git a/datasketches/src/hll/array8.rs b/datasketches/src/hll/array8.rs index 3ac1f0c..9f0f849 100644 --- a/datasketches/src/hll/array8.rs +++ b/datasketches/src/hll/array8.rs @@ -300,7 +300,7 @@ impl Array8 { // Write standard header bytes.write_u8(HLL_PREINTS); - bytes.write_u8(SERIAL_VER); + bytes.write_u8(SERIAL_VERSION); bytes.write_u8(HLL_FAMILY_ID); bytes.write_u8(lg_config_k); bytes.write_u8(0); // unused for HLL mode diff --git a/datasketches/src/hll/hash_set.rs b/datasketches/src/hll/hash_set.rs index 1a31031..874d3a4 100644 --- a/datasketches/src/hll/hash_set.rs +++ b/datasketches/src/hll/hash_set.rs @@ 
-148,7 +148,7 @@ impl HashSet { // Write preamble bytes.write_u8(HASH_SET_PREINTS); - bytes.write_u8(SERIAL_VER); + bytes.write_u8(SERIAL_VERSION); bytes.write_u8(HLL_FAMILY_ID); bytes.write_u8(lg_config_k); bytes.write_u8(lg_arr as u8); diff --git a/datasketches/src/hll/list.rs b/datasketches/src/hll/list.rs index 2fa9173..1abf699 100644 --- a/datasketches/src/hll/list.rs +++ b/datasketches/src/hll/list.rs @@ -110,7 +110,7 @@ impl List { // Write preamble bytes.write_u8(LIST_PREINTS); - bytes.write_u8(SERIAL_VER); + bytes.write_u8(SERIAL_VERSION); bytes.write_u8(HLL_FAMILY_ID); bytes.write_u8(lg_config_k); bytes.write_u8(lg_arr as u8); diff --git a/datasketches/src/hll/serialization.rs b/datasketches/src/hll/serialization.rs index 30f034f..5fdb2b3 100644 --- a/datasketches/src/hll/serialization.rs +++ b/datasketches/src/hll/serialization.rs @@ -24,7 +24,7 @@ pub const HLL_FAMILY_ID: u8 = 7; /// Current serialization version -pub const SERIAL_VER: u8 = 1; +pub const SERIAL_VERSION: u8 = 1; /// Flag indicating sketch is empty (no values inserted) pub const EMPTY_FLAG_MASK: u8 = 4; diff --git a/datasketches/src/hll/sketch.rs b/datasketches/src/hll/sketch.rs index ef76647..64626cd 100644 --- a/datasketches/src/hll/sketch.rs +++ b/datasketches/src/hll/sketch.rs @@ -241,9 +241,9 @@ impl HllSketch { } // Verify serialization version - if serial_version != SERIAL_VER { + if serial_version != SERIAL_VERSION { return Err(Error::unsupported_serial_version( - SERIAL_VER, + SERIAL_VERSION, serial_version, )); } From 45c0c2cb5730e32358fa79350a44e65f1a55978f Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 31 Dec 2025 15:18:25 +0800 Subject: [PATCH 3/4] fixup Signed-off-by: tison --- datasketches/src/frequencies/serde.rs | 5 +- datasketches/src/frequencies/serialization.rs | 73 ------------------- datasketches/src/frequencies/sketch.rs | 12 ++- datasketches/src/tdigest/sketch.rs | 3 +- 4 files changed, 13 insertions(+), 80 deletions(-) diff --git 
a/datasketches/src/frequencies/serde.rs b/datasketches/src/frequencies/serde.rs index 171aea0..83c6d54 100644 --- a/datasketches/src/frequencies/serde.rs +++ b/datasketches/src/frequencies/serde.rs @@ -17,7 +17,8 @@ //! Serialization helpers for frequent items sketches. -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; pub(crate) fn count_string_items_bytes(items: &[String]) -> usize { @@ -44,7 +45,7 @@ pub(crate) fn deserialize_string_items( )) })?; - let mut slice = Vec::with_capacity(len as usize); + let mut slice = vec![0; len as usize]; cursor.read_exact(&mut slice).map_err(|_| { Error::insufficient_data(format!( "expected {num_items} string items, failed to read slice at index {i}" diff --git a/datasketches/src/frequencies/serialization.rs b/datasketches/src/frequencies/serialization.rs index b17b1ec..c629f5a 100644 --- a/datasketches/src/frequencies/serialization.rs +++ b/datasketches/src/frequencies/serialization.rs @@ -29,76 +29,3 @@ pub const PREAMBLE_LONGS_NONEMPTY: u8 = 4; /// Empty flag mask (both bits for compatibility). pub const EMPTY_FLAG_MASK: u8 = 5; - -/// Offset of preamble longs byte. -pub const PREAMBLE_LONGS_BYTE: usize = 0; -/// Offset of serialization version byte. -pub const SER_VER_BYTE: usize = 1; -/// Offset of family ID byte. -pub const FAMILY_BYTE: usize = 2; -/// Offset of lg_max_map_size byte. -pub const LG_MAX_MAP_SIZE_BYTE: usize = 3; -/// Offset of lg_cur_map_size byte. -pub const LG_CUR_MAP_SIZE_BYTE: usize = 4; -/// Offset of flags byte. -pub const FLAGS_BYTE: usize = 5; - -/// Offset of active items int (low 32 bits of second pre-long). -pub const ACTIVE_ITEMS_INT: usize = 8; -/// Offset of stream weight (third pre-long). -pub const STREAM_WEIGHT_LONG: usize = 16; -/// Offset of offset (fourth pre-long). -pub const OFFSET_LONG: usize = 24; - -/// Read an u32 value from bytes at the given offset (little-endian). 
-#[inline] -pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 { - u32::from_le_bytes([ - bytes[offset], - bytes[offset + 1], - bytes[offset + 2], - bytes[offset + 3], - ]) -} - -/// Read an i64 value from bytes at the given offset (little-endian). -#[inline] -pub fn read_i64_le(bytes: &[u8], offset: usize) -> i64 { - i64::from_le_bytes([ - bytes[offset], - bytes[offset + 1], - bytes[offset + 2], - bytes[offset + 3], - bytes[offset + 4], - bytes[offset + 5], - bytes[offset + 6], - bytes[offset + 7], - ]) -} - -/// Read an u64 value from bytes at the given offset (little-endian). -#[inline] -pub fn read_u64_le(bytes: &[u8], offset: usize) -> u64 { - u64::from_le_bytes([ - bytes[offset], - bytes[offset + 1], - bytes[offset + 2], - bytes[offset + 3], - bytes[offset + 4], - bytes[offset + 5], - bytes[offset + 6], - bytes[offset + 7], - ]) -} - -/// Write a u32 value to bytes at the given offset (little-endian). -#[inline] -pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) { - bytes[offset..offset + 4].copy_from_slice(&value.to_le_bytes()); -} - -/// Write an u64 value to bytes at the given offset (little-endian). -#[inline] -pub fn write_u64_le(bytes: &mut [u8], offset: usize, value: u64) { - bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes()); -} diff --git a/datasketches/src/frequencies/sketch.rs b/datasketches/src/frequencies/sketch.rs index 0efc778..e4900d6 100644 --- a/datasketches/src/frequencies/sketch.rs +++ b/datasketches/src/frequencies/sketch.rs @@ -17,15 +17,19 @@ //! Frequent items sketch implementations. 
-use crate::codec::{SketchBytes, SketchSlice}; +use std::hash::Hash; + +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap; +use crate::frequencies::serde::count_i64_items_bytes; +use crate::frequencies::serde::count_string_items_bytes; +use crate::frequencies::serde::deserialize_i64_items; +use crate::frequencies::serde::deserialize_string_items; use crate::frequencies::serde::serialize_i64_items; use crate::frequencies::serde::serialize_string_items; -use crate::frequencies::serde::{count_i64_items_bytes, deserialize_string_items}; -use crate::frequencies::serde::{count_string_items_bytes, deserialize_i64_items}; use crate::frequencies::serialization::*; -use std::hash::Hash; type CountSerializeSize = fn(&[T]) -> usize; type SerializeItems = fn(&mut SketchBytes, &[T]); diff --git a/datasketches/src/tdigest/sketch.rs b/datasketches/src/tdigest/sketch.rs index 05bdb44..ddf440f 100644 --- a/datasketches/src/tdigest/sketch.rs +++ b/datasketches/src/tdigest/sketch.rs @@ -19,7 +19,8 @@ use std::cmp::Ordering; use std::convert::identity; use std::num::NonZeroU64; -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::error::ErrorKind; use crate::tdigest::serialization::*; From 6e10e39580715c68a871370f9908d31003198790 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 31 Dec 2025 15:33:00 +0800 Subject: [PATCH 4/4] simplify code Signed-off-by: tison --- datasketches/src/frequencies/mod.rs | 1 - datasketches/src/frequencies/serde.rs | 86 ------------------- datasketches/src/frequencies/serialization.rs | 68 ++++++++++++++- datasketches/src/frequencies/sketch.rs | 6 -- 4 files changed, 67 insertions(+), 94 deletions(-) delete mode 100644 datasketches/src/frequencies/serde.rs diff --git a/datasketches/src/frequencies/mod.rs b/datasketches/src/frequencies/mod.rs index 
d04343a..e461b61 100644 --- a/datasketches/src/frequencies/mod.rs +++ b/datasketches/src/frequencies/mod.rs @@ -25,7 +25,6 @@ //! mod reverse_purge_item_hash_map; -mod serde; mod serialization; mod sketch; diff --git a/datasketches/src/frequencies/serde.rs b/datasketches/src/frequencies/serde.rs deleted file mode 100644 index 83c6d54..0000000 --- a/datasketches/src/frequencies/serde.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Serialization helpers for frequent items sketches. 
- -use crate::codec::SketchBytes; -use crate::codec::SketchSlice; -use crate::error::Error; - -pub(crate) fn count_string_items_bytes(items: &[String]) -> usize { - items.iter().map(|item| 4 + item.len()).sum() -} - -pub(crate) fn serialize_string_items(bytes: &mut SketchBytes, items: &[String]) { - for item in items { - let bs = item.as_bytes(); - bytes.write_u32_le(bs.len() as u32); - bytes.write(bs); - } -} - -pub(crate) fn deserialize_string_items( - mut cursor: SketchSlice<'_>, - num_items: usize, -) -> Result, Error> { - let mut items = Vec::with_capacity(num_items); - for i in 0..num_items { - let len = cursor.read_u32_le().map_err(|_| { - Error::insufficient_data(format!( - "expected {num_items} string items, failed to read len at index {i}" - )) - })?; - - let mut slice = vec![0; len as usize]; - cursor.read_exact(&mut slice).map_err(|_| { - Error::insufficient_data(format!( - "expected {num_items} string items, failed to read slice at index {i}" - )) - })?; - - let value = String::from_utf8(slice) - .map_err(|_| Error::deserial(format!("invalid UTF-8 string payload at index {i}")))?; - items.push(value); - } - Ok(items) -} - -pub(crate) fn count_i64_items_bytes(items: &[i64]) -> usize { - items.len() * 8 -} - -pub(crate) fn serialize_i64_items(bytes: &mut SketchBytes, items: &[i64]) { - for item in items.iter().copied() { - bytes.write_i64_le(item); - } -} - -pub(crate) fn deserialize_i64_items( - mut cursor: SketchSlice<'_>, - num_items: usize, -) -> Result, Error> { - let mut items = Vec::with_capacity(num_items); - for i in 0..num_items { - let value = cursor.read_i64_le().map_err(|_| { - Error::insufficient_data(format!( - "expected {num_items} i64 items, failed at index {i}" - )) - })?; - items.push(value); - } - Ok(items) -} diff --git a/datasketches/src/frequencies/serialization.rs b/datasketches/src/frequencies/serialization.rs index c629f5a..3f8600b 100644 --- a/datasketches/src/frequencies/serialization.rs +++ 
b/datasketches/src/frequencies/serialization.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Serialization constants and helpers for frequency sketches. +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; +use crate::error::Error; /// Family ID for frequency sketches. pub const FAMILY_ID: u8 = 10; @@ -29,3 +31,67 @@ pub const PREAMBLE_LONGS_NONEMPTY: u8 = 4; /// Empty flag mask (both bits for compatibility). pub const EMPTY_FLAG_MASK: u8 = 5; + +pub(crate) fn count_string_items_bytes(items: &[String]) -> usize { + items.iter().map(|item| 4 + item.len()).sum() +} + +pub(crate) fn serialize_string_items(bytes: &mut SketchBytes, items: &[String]) { + for item in items { + let bs = item.as_bytes(); + bytes.write_u32_le(bs.len() as u32); + bytes.write(bs); + } +} + +pub(crate) fn deserialize_string_items( + mut cursor: SketchSlice<'_>, + num_items: usize, +) -> Result, Error> { + let mut items = Vec::with_capacity(num_items); + for i in 0..num_items { + let len = cursor.read_u32_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {num_items} string items, failed to read len at index {i}" + )) + })?; + + let mut slice = vec![0; len as usize]; + cursor.read_exact(&mut slice).map_err(|_| { + Error::insufficient_data(format!( + "expected {num_items} string items, failed to read slice at index {i}" + )) + })?; + + let value = String::from_utf8(slice) + .map_err(|_| Error::deserial(format!("invalid UTF-8 string payload at index {i}")))?; + items.push(value); + } + Ok(items) +} + +pub(crate) fn count_i64_items_bytes(items: &[i64]) -> usize { + items.len() * 8 +} + +pub(crate) fn serialize_i64_items(bytes: &mut SketchBytes, items: &[i64]) { + for item in items.iter().copied() { + bytes.write_i64_le(item); + } +} + +pub(crate) fn deserialize_i64_items( + mut cursor: SketchSlice<'_>, + num_items: usize, +) -> Result, Error> { + let mut items = Vec::with_capacity(num_items); + for i in 
0..num_items { + let value = cursor.read_i64_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {num_items} i64 items, failed at index {i}" + )) + })?; + items.push(value); + } + Ok(items) +} diff --git a/datasketches/src/frequencies/sketch.rs b/datasketches/src/frequencies/sketch.rs index e4900d6..28f3325 100644 --- a/datasketches/src/frequencies/sketch.rs +++ b/datasketches/src/frequencies/sketch.rs @@ -23,12 +23,6 @@ use crate::codec::SketchBytes; use crate::codec::SketchSlice; use crate::error::Error; use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap; -use crate::frequencies::serde::count_i64_items_bytes; -use crate::frequencies::serde::count_string_items_bytes; -use crate::frequencies::serde::deserialize_i64_items; -use crate::frequencies::serde::deserialize_string_items; -use crate::frequencies::serde::serialize_i64_items; -use crate::frequencies::serde::serialize_string_items; use crate::frequencies::serialization::*; type CountSerializeSize = fn(&[T]) -> usize;