From 1cf64628a6515d1f8d4312bda49c3fafa9c9cae6 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 30 Dec 2025 20:38:50 +0800 Subject: [PATCH 1/5] refactor: replace byteorder with SketchBytes and SketchSlice Signed-off-by: tison --- Cargo.lock | 22 +-- Cargo.toml | 2 - datasketches/Cargo.toml | 2 - datasketches/src/codec.rs | 239 ++++++++++++++++++++++++++++ datasketches/src/countmin/sketch.rs | 57 +++---- datasketches/src/hash/mod.rs | 11 ++ datasketches/src/hash/murmurhash.rs | 20 +-- datasketches/src/hash/xxhash.rs | 15 +- datasketches/src/lib.rs | 1 + datasketches/src/tdigest/sketch.rs | 96 +++++------ 10 files changed, 331 insertions(+), 134 deletions(-) create mode 100644 datasketches/src/codec.rs diff --git a/Cargo.lock b/Cargo.lock index 66212a0..f2aefc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,12 +61,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "anyhow" -version = "1.0.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" - [[package]] name = "autocfg" version = "1.5.0" @@ -79,12 +73,6 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "clap" version = "4.5.53" @@ -135,8 +123,6 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" name = "datasketches" version = "0.1.0" dependencies = [ - "anyhow", - "byteorder", "googletest", ] @@ -233,9 +219,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "proc-macro2" -version = "1.0.103" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0" dependencies = [ "unicode-ident", ] @@ -280,9 +266,9 @@ checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "rustix" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ "bitflags", "errno", diff --git a/Cargo.toml b/Cargo.toml index 4c93ee2..30c7a35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,6 @@ rust-version = "1.85.0" datasketches = { path = "datasketches" } # Crates.io dependencies -anyhow = { version = "1.0.100" } -byteorder = { version = "1.5.0" } clap = { version = "4.5.20", features = ["derive"] } googletest = { version = "0.14.2" } which = { version = "8.0.0" } diff --git a/datasketches/Cargo.toml b/datasketches/Cargo.toml index 8a150ff..7d70103 100644 --- a/datasketches/Cargo.toml +++ b/datasketches/Cargo.toml @@ -35,8 +35,6 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] -anyhow = { workspace = true } -byteorder = { workspace = true } [dev-dependencies] googletest = { workspace = true } diff --git a/datasketches/src/codec.rs b/datasketches/src/codec.rs new file mode 100644 index 0000000..94f2439 --- /dev/null +++ b/datasketches/src/codec.rs @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io; +use std::io::{Cursor, Read}; + +pub(crate) struct SketchBytes { + bytes: Vec, +} + +impl SketchBytes { + pub fn new() -> Self { + Self { bytes: vec![] } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + bytes: Vec::with_capacity(capacity), + } + } + + pub fn into_bytes(self) -> Vec { + self.bytes + } + + pub fn write(&mut self, buf: &[u8]) { + self.bytes.extend_from_slice(buf); + } + + pub fn write_u8(&mut self, n: u8) { + self.bytes.push(n); + } + + pub fn write_i8(&mut self, n: i8) { + self.bytes.push(n as u8); + } + + pub fn write_u16_le(&mut self, n: u16) { + self.write(&n.to_le_bytes()); + } + + pub fn write_u16_be(&mut self, n: u16) { + self.write(&n.to_be_bytes()); + } + + pub fn write_i16_le(&mut self, n: i16) { + self.write(&n.to_le_bytes()); + } + + pub fn write_i16_be(&mut self, n: i16) { + self.write(&n.to_be_bytes()); + } + + pub fn write_u32_le(&mut self, n: u32) { + self.write(&n.to_le_bytes()); + } + + pub fn write_u32_be(&mut self, n: u32) { + self.write(&n.to_be_bytes()); + } + + pub fn write_i32_le(&mut self, n: i32) { + self.write(&n.to_le_bytes()); + } + + pub fn write_i32_be(&mut self, n: i32) { + self.write(&n.to_be_bytes()); + } + + pub fn write_u64_le(&mut self, n: u64) { + self.write(&n.to_le_bytes()); + } + + pub fn write_u64_be(&mut self, n: u64) { + self.write(&n.to_be_bytes()); + } + + pub fn write_i64_le(&mut self, n: i64) { + self.write(&n.to_le_bytes()); + } + + pub fn write_i64_be(&mut self, n: i64) { + self.write(&n.to_be_bytes()); + } + + pub fn write_f32_le(&mut self, n: f32) { + self.write(&n.to_le_bytes()); + } + + pub fn write_f32_be(&mut self, n: f32) { + self.write(&n.to_be_bytes()); + } + + pub fn write_f64_le(&mut self, n: f64) { + self.write(&n.to_le_bytes()); + } + + pub fn write_f64_be(&mut self, n: f64) { + self.write(&n.to_be_bytes()); + } +} + +pub(crate) struct SketchSlice<'a> { + slice: Cursor<&'a [u8]>, +} + +impl SketchSlice<'_> { + pub fn new(slice: &[u8]) -> SketchSlice { + SketchSlice { + slice: Cursor::new(slice), + } + } + + pub fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + self.slice.read_exact(buf) + } + + pub fn read_u8(&mut self) -> io::Result { + let mut buf = [0u8; 1]; + self.read_exact(&mut buf)?; + Ok(buf[0]) + } + + pub fn read_i8(&mut self) -> io::Result { + let mut buf = [0u8; 1]; + self.read_exact(&mut buf)?; + Ok(buf[0] as i8) + } + + pub fn read_u16_le(&mut self) -> io::Result { + let mut buf = [0u8; 2]; + self.read_exact(&mut buf)?; + Ok(u16::from_le_bytes(buf)) + } + + pub fn read_u16_be(&mut self) -> io::Result { + let mut buf = [0u8; 2]; + self.read_exact(&mut buf)?; + Ok(u16::from_be_bytes(buf)) + } + + pub fn read_i16_le(&mut self) -> io::Result { + let mut buf = [0u8; 2]; + self.read_exact(&mut buf)?; + Ok(i16::from_le_bytes(buf)) + } + + pub fn read_i16_be(&mut self) -> io::Result { + let mut buf = [0u8; 2]; + self.read_exact(&mut buf)?; + Ok(i16::from_be_bytes(buf)) + } + + pub fn read_u32_le(&mut self) -> io::Result { + let mut buf = [0u8; 4]; + self.read_exact(&mut buf)?; + Ok(u32::from_le_bytes(buf)) + } + + pub fn read_u32_be(&mut self) -> io::Result { + let mut buf = [0u8; 4]; + self.read_exact(&mut buf)?; + Ok(u32::from_be_bytes(buf)) + } + + pub fn read_i32_le(&mut self) -> io::Result { + let mut buf = [0u8; 4]; + self.read_exact(&mut buf)?; + Ok(i32::from_le_bytes(buf)) + } + + pub fn read_i32_be(&mut self) -> io::Result { + let mut buf = [0u8; 4]; + self.read_exact(&mut buf)?; + Ok(i32::from_be_bytes(buf)) + } + + pub fn read_u64_le(&mut self) -> io::Result { + let mut buf = [0u8; 8]; + self.read_exact(&mut buf)?; + Ok(u64::from_le_bytes(buf)) + } + + pub fn read_u64_be(&mut self) -> io::Result { + let mut buf = [0u8; 8]; + self.read_exact(&mut buf)?; + Ok(u64::from_be_bytes(buf)) + } + + pub fn read_i64_le(&mut self) -> io::Result { + let mut buf = [0u8; 8]; + self.read_exact(&mut buf)?; + Ok(i64::from_le_bytes(buf)) + } + + pub fn read_i64_be(&mut self) -> io::Result { + let mut buf = [0u8; 8]; + self.read_exact(&mut buf)?; + Ok(i64::from_be_bytes(buf)) + } + + pub fn read_f32_le(&mut self) -> io::Result { + let mut buf = [0u8; 4]; + self.read_exact(&mut buf)?; + Ok(f32::from_le_bytes(buf)) + } + + pub fn read_f32_be(&mut self) -> io::Result { + let mut buf = [0u8; 4]; + self.read_exact(&mut buf)?; + Ok(f32::from_be_bytes(buf)) + } + + pub fn read_f64_le(&mut self) -> io::Result { + let mut buf = [0u8; 8]; + self.read_exact(&mut buf)?; + Ok(f64::from_le_bytes(buf)) + } + + pub fn read_f64_be(&mut self) -> io::Result { + let mut buf = [0u8; 8]; + self.read_exact(&mut buf)?; + Ok(f64::from_be_bytes(buf)) + } +} diff --git a/datasketches/src/countmin/sketch.rs b/datasketches/src/countmin/sketch.rs index 3df99f3..dae4ad4 100644 --- a/datasketches/src/countmin/sketch.rs +++ b/datasketches/src/countmin/sketch.rs @@ -15,14 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::hash::Hash; -use std::hash::Hasher; -use std::io::Cursor; -use std::mem::size_of; - -use byteorder::LE; -use byteorder::ReadBytesExt; - +use crate::codec::{SketchBytes, SketchSlice}; use crate::countmin::serialization::COUNTMIN_FAMILY_ID; use crate::countmin::serialization::FLAGS_IS_EMPTY; use crate::countmin::serialization::LONG_SIZE_BYTES; @@ -33,6 +26,10 @@ use crate::error::Error; use crate::hash::DEFAULT_UPDATE_SEED; use crate::hash::MurmurHash3X64128; +use std::hash::Hash; +use std::hash::Hasher; +use std::mem::size_of; + const MAX_TABLE_ENTRIES: usize = 1 << 30; /// Count-Min sketch for estimating item frequencies. @@ -204,28 +201,28 @@ impl CountMinSketch { } else { LONG_SIZE_BYTES + (self.counts.len() * size_of::()) }; - let mut bytes = Vec::with_capacity(header_size + payload_size); + let mut bytes = SketchBytes::with_capacity(header_size + payload_size); - bytes.push(PREAMBLE_LONGS_SHORT); - bytes.push(SERIAL_VERSION); - bytes.push(COUNTMIN_FAMILY_ID); - bytes.push(if self.is_empty() { FLAGS_IS_EMPTY } else { 0 }); - bytes.extend_from_slice(&0u32.to_le_bytes()); + bytes.write_u8(PREAMBLE_LONGS_SHORT); + bytes.write_u8(SERIAL_VERSION); + bytes.write_u8(COUNTMIN_FAMILY_ID); + bytes.write_u8(if self.is_empty() { FLAGS_IS_EMPTY } else { 0 }); + bytes.write_u32_le(0); // unused - bytes.extend_from_slice(&self.num_buckets.to_le_bytes()); - bytes.push(self.num_hashes); - bytes.extend_from_slice(&compute_seed_hash(self.seed).to_le_bytes()); - bytes.push(0u8); + bytes.write_u32_le(self.num_buckets); + bytes.write_u8(self.num_hashes); + bytes.write_u16_le(compute_seed_hash(self.seed)); + bytes.write_u8(0); if self.is_empty() { - return bytes; + return bytes.into_bytes(); } - bytes.extend_from_slice(&self.total_weight.to_le_bytes()); - for count in &self.counts { - bytes.extend_from_slice(&count.to_le_bytes()); + bytes.write_i64_le(self.total_weight); + for count in self.counts.iter().copied() { + bytes.write_i64_le(count); } - bytes + bytes.into_bytes() } /// Deserializes a sketch from bytes using the default seed. @@ -239,12 +236,12 @@ impl CountMinSketch { move |_| Error::insufficient_data(tag) } - let mut cursor = Cursor::new(bytes); + let mut cursor = SketchSlice::new(bytes); let preamble_longs = cursor.read_u8().map_err(make_error("preamble_longs"))?; let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?; let family_id = cursor.read_u8().map_err(make_error("family_id"))?; let flags = cursor.read_u8().map_err(make_error("flags"))?; - cursor.read_u32::().map_err(make_error("unused32"))?; + cursor.read_u32_le().map_err(make_error(""))?; if family_id != COUNTMIN_FAMILY_ID { return Err(Error::invalid_family( @@ -266,9 +263,9 @@ impl CountMinSketch { )); } - let num_buckets = cursor.read_u32::().map_err(make_error("num_buckets"))?; + let num_buckets = cursor.read_u32_le().map_err(make_error("num_buckets"))?; let num_hashes = cursor.read_u8().map_err(make_error("num_hashes"))?; - let seed_hash = cursor.read_u16::().map_err(make_error("seed_hash"))?; + let seed_hash = cursor.read_u16_le().map_err(make_error("seed_hash"))?; cursor.read_u8().map_err(make_error("unused8"))?; let expected_seed_hash = compute_seed_hash(seed); @@ -284,11 +281,9 @@ impl CountMinSketch { return Ok(sketch); } - sketch.total_weight = cursor - .read_i64::() - .map_err(make_error("total_weight"))?; + sketch.total_weight = cursor.read_i64_le().map_err(make_error("total_weight"))?; for count in sketch.counts.iter_mut() { - *count = cursor.read_i64::().map_err(make_error("counts"))?; + *count = cursor.read_i64_le().map_err(make_error("counts"))?; } Ok(sketch) } diff --git a/datasketches/src/hash/mod.rs b/datasketches/src/hash/mod.rs index a094f39..d9df24a 100644 --- a/datasketches/src/hash/mod.rs +++ b/datasketches/src/hash/mod.rs @@ -36,3 +36,14 @@ pub(crate) use self::xxhash::XxHash64; /// original source key value and the hashed bit string would be violated. Once you have developed /// a history of stored sketches you are stuck with it. pub(crate) const DEFAULT_UPDATE_SEED: u64 = 9001; + +/// Reads an u64 from a byte slice in little-endian order. +/// +/// # Panics +/// +/// Panics if `bytes.len()` is greater than 8. +fn read_u64_le(bytes: &[u8]) -> u64 { + let mut buf = [0u8; 8]; + buf[..bytes.len()].copy_from_slice(bytes); + u64::from_le_bytes(buf) +} diff --git a/datasketches/src/hash/murmurhash.rs b/datasketches/src/hash/murmurhash.rs index e275e36..a99f5eb 100644 --- a/datasketches/src/hash/murmurhash.rs +++ b/datasketches/src/hash/murmurhash.rs @@ -17,9 +17,6 @@ use std::hash::Hasher; -use byteorder::ByteOrder; -use byteorder::LE; - use crate::hash::DEFAULT_UPDATE_SEED; const C1: u64 = 0x87c37b91114253d5; @@ -58,11 +55,8 @@ impl MurmurHash3X64128 { if rem > 0 { if rem > 8 { // read k2 little endian - let mut buf = [0u8; 8]; - let k2_len = rem - 8; - buf[..k2_len].copy_from_slice(&self.buf[8..rem]); + let mut k2 = super::read_u64_le(&self.buf[8..rem]); // mix k2 - let mut k2 = u64::from_le_bytes(buf); k2 = k2.wrapping_mul(C2); k2 = k2.rotate_left(33); k2 = k2.wrapping_mul(C1); @@ -70,11 +64,9 @@ impl MurmurHash3X64128 { } // read k1 little endian - let mut buf = [0u8; 8]; let k1_len = rem.min(8); - buf[..k1_len].copy_from_slice(&self.buf[..k1_len]); + let mut k1 = super::read_u64_le(&self.buf[..k1_len]); // mix k1 - let mut k1 = u64::from_le_bytes(buf); k1 = k1.wrapping_mul(C1); k1 = k1.rotate_left(31); k1 = k1.wrapping_mul(C2); @@ -143,8 +135,8 @@ impl Hasher for MurmurHash3X64128 { let wanted = 16 - self.buf_len; self.buf[self.buf_len..].copy_from_slice(&bytes[..wanted]); - let k1 = LE::read_u64(&self.buf[0..8]); - let k2 = LE::read_u64(&self.buf[8..16]); + let k1 = super::read_u64_le(&self.buf[0..8]); + let k2 = super::read_u64_le(&self.buf[8..16]); self.update(k1, k2); bytes = &bytes[wanted..]; @@ -160,8 +152,8 @@ impl Hasher for MurmurHash3X64128 { let lo = i << 4; let mi = lo + 8; let hi = mi + 8; - let k1 = LE::read_u64(&bytes[lo..mi]); - let k2 = LE::read_u64(&bytes[mi..hi]); + let k1 = super::read_u64_le(&bytes[lo..mi]); + let k2 = super::read_u64_le(&bytes[mi..hi]); self.update(k1, k2); } diff --git a/datasketches/src/hash/xxhash.rs b/datasketches/src/hash/xxhash.rs index d1f0eed..6232c04 100644 --- a/datasketches/src/hash/xxhash.rs +++ b/datasketches/src/hash/xxhash.rs @@ -17,9 +17,6 @@ use std::hash::Hasher; -use byteorder::ByteOrder; -use byteorder::LE; - const DEFAULT_SEED: u64 = 0; // Unsigned 64-bit primes from xxhash64. @@ -79,7 +76,7 @@ impl XxHash64 { let mut idx = 0; let buf = &self.buffer[..self.buffer_len]; while idx + 8 <= buf.len() { - let mut k1 = LE::read_u64(&buf[idx..idx + 8]); + let mut k1 = super::read_u64_le(&buf[idx..idx + 8]); k1 = k1.wrapping_mul(P2); k1 = k1.rotate_left(31); k1 = k1.wrapping_mul(P1); @@ -89,7 +86,7 @@ impl XxHash64 { } if idx + 4 <= buf.len() { - let k1 = LE::read_u32(&buf[idx..idx + 4]) as u64; + let k1 = super::read_u64_le(&buf[idx..idx + 4]); hash ^= k1.wrapping_mul(P1); hash = hash.rotate_left(23).wrapping_mul(P2).wrapping_add(P3); idx += 4; @@ -119,10 +116,10 @@ impl XxHash64 { #[inline] fn update(&mut self, chunk: &[u8]) { - self.v1 = round(self.v1, LE::read_u64(&chunk[0..8])); - self.v2 = round(self.v2, LE::read_u64(&chunk[8..16])); - self.v3 = round(self.v3, LE::read_u64(&chunk[16..24])); - self.v4 = round(self.v4, LE::read_u64(&chunk[24..32])); + self.v1 = round(self.v1, super::read_u64_le(&chunk[0..8])); + self.v2 = round(self.v2, super::read_u64_le(&chunk[8..16])); + self.v3 = round(self.v3, super::read_u64_le(&chunk[16..24])); + self.v4 = round(self.v4, super::read_u64_le(&chunk[24..32])); } } diff --git a/datasketches/src/lib.rs b/datasketches/src/lib.rs index c7c2a14..7c18cfb 100644 --- a/datasketches/src/lib.rs +++ b/datasketches/src/lib.rs @@ -35,4 +35,5 @@ pub mod error; pub mod hll; pub mod tdigest; +mod codec; mod hash; diff --git a/datasketches/src/tdigest/sketch.rs b/datasketches/src/tdigest/sketch.rs index a0f3883..458d1c6 100644 --- a/datasketches/src/tdigest/sketch.rs +++ b/datasketches/src/tdigest/sketch.rs @@ -15,19 +15,15 @@ // specific language governing permissions and limitations // under the License. +use crate::codec::SketchSlice; +use crate::error::Error; +use crate::error::ErrorKind; +use crate::tdigest::serialization::*; use std::cmp::Ordering; use std::convert::identity; use std::io::Cursor; use std::num::NonZeroU64; -use byteorder::BE; -use byteorder::LE; -use byteorder::ReadBytesExt; - -use crate::error::Error; -use crate::error::ErrorKind; -use crate::tdigest::serialization::*; - /// The default value of K if one is not specified. const DEFAULT_K: u16 = 200; /// Multiplier for buffer size relative to centroids capacity. @@ -375,7 +371,7 @@ impl TDigestMut { move |_| Error::insufficient_data(tag) } - let mut cursor = Cursor::new(bytes); + let mut cursor = SketchSlice::new(bytes); let preamble_longs = cursor.read_u8().map_err(make_error("preamble_longs"))?; let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?; @@ -397,7 +393,7 @@ impl TDigestMut { serial_version, )); } - let k = cursor.read_u16::().map_err(make_error("k"))?; + let k = cursor.read_u16_le().map_err(make_error("k"))?; if k < 10 { return Err(Error::deserial(format!("k must be at least 10, got {k}"))); } @@ -415,7 +411,7 @@ impl TDigestMut { preamble_longs, )); } - cursor.read_u16::().map_err(make_error(""))?; // unused + cursor.read_u16_le().map_err(make_error(""))?; // unused if is_empty { return Ok(TDigestMut::new(k)); } @@ -423,13 +419,9 @@ impl TDigestMut { let reverse_merge = (flags & FLAGS_REVERSE_MERGE) != 0; if is_single_value { let value = if is_f32 { - cursor - .read_f32::() - .map_err(make_error("single_value"))? as f64 + cursor.read_f32_le().map_err(make_error("single_value"))? as f64 } else { - cursor - .read_f64::() - .map_err(make_error("single_value"))? + cursor.read_f64_le().map_err(make_error("single_value"))? }; check_non_nan(value, "single_value")?; check_finite(value, "single_value")?; @@ -446,21 +438,17 @@ impl TDigestMut { vec![], )); } - let num_centroids = cursor - .read_u32::() - .map_err(make_error("num_centroids"))? as usize; - let num_buffered = cursor - .read_u32::() - .map_err(make_error("num_buffered"))? as usize; + let num_centroids = cursor.read_u32_le().map_err(make_error("num_centroids"))? as usize; + let num_buffered = cursor.read_u32_le().map_err(make_error("num_buffered"))? as usize; let (min, max) = if is_f32 { ( - cursor.read_f32::().map_err(make_error("min"))? as f64, - cursor.read_f32::().map_err(make_error("max"))? as f64, + cursor.read_f32_le().map_err(make_error("min"))? as f64, + cursor.read_f32_le().map_err(make_error("max"))? as f64, ) } else { ( - cursor.read_f64::().map_err(make_error("min"))?, - cursor.read_f64::().map_err(make_error("max"))?, + cursor.read_f64_le().map_err(make_error("min"))?, + cursor.read_f64_le().map_err(make_error("max"))?, ) }; check_non_nan(min, "min")?; @@ -470,13 +458,13 @@ impl TDigestMut { for _ in 0..num_centroids { let (mean, weight) = if is_f32 { ( - cursor.read_f32::().map_err(make_error("mean"))? as f64, - cursor.read_u32::().map_err(make_error("weight"))? as u64, + cursor.read_f32_le().map_err(make_error("mean"))? as f64, + cursor.read_u32_le().map_err(make_error("weight"))? as u64, ) } else { ( - cursor.read_f64::().map_err(make_error("mean"))?, - cursor.read_u64::().map_err(make_error("weight"))?, + cursor.read_f64_le().map_err(make_error("mean"))?, + cursor.read_u64_le().map_err(make_error("weight"))?, ) }; check_non_nan(mean, "centroid mean")?; @@ -488,13 +476,9 @@ impl TDigestMut { let mut buffer = Vec::with_capacity(num_buffered); for _ in 0..num_buffered { let value = if is_f32 { - cursor - .read_f32::() - .map_err(make_error("buffered_value"))? as f64 + cursor.read_f32_le().map_err(make_error("buffered_value"))? as f64 } else { - cursor - .read_f64::() - .map_err(make_error("buffered_value"))? + cursor.read_f64_le().map_err(make_error("buffered_value"))? }; check_non_nan(value, "buffered_value mean")?; check_finite(value, "buffered_value mean")?; @@ -518,34 +502,32 @@ impl TDigestMut { move |_| Error::insufficient_data_of("compat format", tag) } - let mut cursor = Cursor::new(bytes); + let mut cursor = SketchSlice::new(bytes); - let ty = cursor.read_u32::().map_err(make_error("type"))?; + let ty = cursor.read_u32_be().map_err(make_error("type"))?; match ty { COMPAT_DOUBLE => { fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { move |_| Error::insufficient_data_of("compat double format", tag) } // compatibility with asBytes() - let min = cursor.read_f64::().map_err(make_error("min"))?; - let max = cursor.read_f64::().map_err(make_error("max"))?; + let min = cursor.read_f64_be().map_err(make_error("min"))?; + let max = cursor.read_f64_be().map_err(make_error("max"))?; check_non_nan(min, "min in compat double format")?; check_non_nan(max, "max in compat double format")?; - let k = cursor.read_f64::().map_err(make_error("k"))? as u16; + let k = cursor.read_f64_be().map_err(make_error("k"))? as u16; if k < 10 { return Err(Error::deserial(format!( "k must be at least 10 in compat double format, got {k}" ))); } - let num_centroids = cursor - .read_u32::() - .map_err(make_error("num_centroids"))? - as usize; + let num_centroids = + cursor.read_u32_be().map_err(make_error("num_centroids"))? as usize; let mut total_weight = 0u64; let mut centroids = Vec::with_capacity(num_centroids); for _ in 0..num_centroids { - let weight = cursor.read_f64::().map_err(make_error("weight"))? as u64; - let mean = cursor.read_f64::().map_err(make_error("mean"))?; + let weight = cursor.read_f64_be().map_err(make_error("weight"))? as u64; + let mean = cursor.read_f64_be().map_err(make_error("mean"))?; let weight = check_nonzero(weight, "centroid weight in compat double format")?; check_non_nan(mean, "centroid mean in compat double format")?; check_finite(mean, "centroid mean in compat double format")?; @@ -568,11 +550,11 @@ impl TDigestMut { } // COMPAT_FLOAT: compatibility with asSmallBytes() // reference implementation uses doubles for min and max - let min = cursor.read_f64::().map_err(make_error("min"))?; - let max = cursor.read_f64::().map_err(make_error("max"))?; + let min = cursor.read_f64_be().map_err(make_error("min"))?; + let max = cursor.read_f64_be().map_err(make_error("max"))?; check_non_nan(min, "min in compat float format")?; check_non_nan(max, "max in compat float format")?; - let k = cursor.read_f32::().map_err(make_error("k"))? as u16; + let k = cursor.read_f32_be().map_err(make_error("k"))? as u16; if k < 10 { return Err(Error::deserial(format!( "k must be at least 10 in compat float format, got {k}" @@ -580,16 +562,14 @@ impl TDigestMut { } // reference implementation stores capacities of the array of centroids and the // buffer as shorts they can be derived from k in the constructor - cursor.read_u32::().map_err(make_error(""))?; - let num_centroids = cursor - .read_u16::() - .map_err(make_error("num_centroids"))? - as usize; + cursor.read_u32_be().map_err(make_error(""))?; + let num_centroids = + cursor.read_u16_be().map_err(make_error("num_centroids"))? as usize; let mut total_weight = 0u64; let mut centroids = Vec::with_capacity(num_centroids); for _ in 0..num_centroids { - let weight = cursor.read_f32::().map_err(make_error("weight"))? as u64; - let mean = cursor.read_f32::().map_err(make_error("mean"))? as f64; + let weight = cursor.read_f32_be().map_err(make_error("weight"))? as u64; + let mean = cursor.read_f32_be().map_err(make_error("mean"))? as f64; let weight = check_nonzero(weight, "centroid weight in compat float format")?; check_non_nan(mean, "centroid mean in compat float format")?; check_finite(mean, "centroid mean in compat float format")?; From ade728e4a60ec971f952f15f8c0497f9123abcc3 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 30 Dec 2025 20:52:56 +0800 Subject: [PATCH 2/5] for hll Signed-off-by: tison --- datasketches/src/hll/array4.rs | 41 ++++++++++++++++---------------- datasketches/src/hll/array6.rs | 33 ++++++++++++------------- datasketches/src/hll/array8.rs | 33 ++++++++++++------------- datasketches/src/hll/hash_set.rs | 31 ++++++++++++------------ datasketches/src/hll/list.rs | 28 +++++++++++----------- 5 files changed, 83 insertions(+), 83 deletions(-) diff --git a/datasketches/src/hll/array4.rs b/datasketches/src/hll/array4.rs index fbef5e4..0014ad4 100644 --- a/datasketches/src/hll/array4.rs +++ b/datasketches/src/hll/array4.rs @@ -21,6 +21,7 @@ //! When values exceed 4 bits after cur_min offset, they're stored in an auxiliary hash map. use super::aux_map::AuxMap; +use crate::codec::SketchBytes; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; @@ -388,51 +389,49 @@ impl Array4 { let aux_count = aux_entries.len() as u32; let total_size = HLL_PREAMBLE_SIZE + num_bytes + (aux_count as usize * COUPON_SIZE_BYTES); - let mut bytes = vec![0u8; total_size]; + let mut bytes = SketchBytes::with_capacity(total_size); // Write standard header - bytes[PREAMBLE_INTS_BYTE] = HLL_PREINTS; - bytes[SER_VER_BYTE] = SERIAL_VER; - bytes[FAMILY_BYTE] = HLL_FAMILY_ID; - bytes[LG_K_BYTE] = lg_config_k; - bytes[LG_ARR_BYTE] = 0; // Not used for HLL mode + bytes.write_u8(HLL_PREINTS); + bytes.write_u8(SERIAL_VER); + bytes.write_u8(HLL_FAMILY_ID); + bytes.write_u8(lg_config_k); + bytes.write_u8(0); // unused for HLL mode // Write flags let mut flags = 0u8; if self.estimator.is_out_of_order() { flags |= OUT_OF_ORDER_FLAG_MASK; } - bytes[FLAGS_BYTE] = flags; + bytes.write_u8(flags); // Write cur_min - bytes[HLL_CUR_MIN_BYTE] = self.cur_min; + bytes.write_u8(self.cur_min); // Mode byte: HLL mode with HLL4 type - bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_HLL, TGT_HLL4); + bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL4)); // Write HIP estimator values - write_f64_le(&mut bytes, HIP_ACCUM_DOUBLE, self.estimator.hip_accum()); - write_f64_le(&mut bytes, KXQ0_DOUBLE, self.estimator.kxq0()); - write_f64_le(&mut bytes, KXQ1_DOUBLE, self.estimator.kxq1()); + bytes.write_f64_le(self.estimator.hip_accum()); + bytes.write_f64_le(self.estimator.kxq0()); + bytes.write_f64_le(self.estimator.kxq1()); // Write num_at_cur_min - write_u32_le(&mut bytes, CUR_MIN_COUNT_INT, self.num_at_cur_min); + bytes.write_u32_le(self.num_at_cur_min); // Write aux_count - write_u32_le(&mut bytes, AUX_COUNT_INT, aux_count); + bytes.write_u32_le(aux_count); // Write packed 4-bit byte array - bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START + num_bytes].copy_from_slice(&self.bytes); + bytes.write(&self.bytes); // Write aux map entries if present - let aux_start = HLL_BYTE_ARR_START + num_bytes; - for (i, (slot, value)) in aux_entries.iter().enumerate() { - let offset = aux_start + (i * COUPON_SIZE_BYTES); - let coupon = pack_coupon(*slot, *value); - write_u32_le(&mut bytes, offset, coupon); + for (slot, value) in aux_entries.iter().copied() { + let coupon = pack_coupon(slot, value); + bytes.write_u32_le(coupon); } - bytes + bytes.into_bytes() } } diff --git a/datasketches/src/hll/array6.rs b/datasketches/src/hll/array6.rs index 5d36cb5..a11276e 100644 --- a/datasketches/src/hll/array6.rs +++ b/datasketches/src/hll/array6.rs @@ -21,6 +21,7 @@ //! This is sufficient for most HLL use cases without needing exception handling or //! cur_min optimization like Array4. +use crate::codec::SketchBytes; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; @@ -230,43 +231,43 @@ impl Array6 { let k = 1 << lg_config_k; let num_bytes = num_bytes_for_k(k); let total_size = HLL_PREAMBLE_SIZE + num_bytes; - let mut bytes = vec![0u8; total_size]; + let mut bytes = SketchBytes::with_capacity(total_size); // Write standard header - bytes[PREAMBLE_INTS_BYTE] = HLL_PREINTS; - bytes[SER_VER_BYTE] = SERIAL_VER; - bytes[FAMILY_BYTE] = HLL_FAMILY_ID; - bytes[LG_K_BYTE] = lg_config_k; - bytes[LG_ARR_BYTE] = 0; // Not used for HLL mode + bytes.write_u8(HLL_PREINTS); + bytes.write_u8(SERIAL_VER); + bytes.write_u8(HLL_FAMILY_ID); + bytes.write_u8(lg_config_k); + bytes.write_u8(0); // unused for HLL mode // Write flags let mut flags = 0u8; if self.estimator.is_out_of_order() { flags |= OUT_OF_ORDER_FLAG_MASK; } - bytes[FLAGS_BYTE] = flags; + bytes.write_u8(flags); // cur_min is always 0 for Array6 - bytes[HLL_CUR_MIN_BYTE] = 0; + bytes.write_u8(0); // Mode byte: HLL mode with HLL6 type - bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_HLL, TGT_HLL6); + bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL6)); // Write HIP estimator values - write_f64_le(&mut bytes, HIP_ACCUM_DOUBLE, self.estimator.hip_accum()); - write_f64_le(&mut bytes, KXQ0_DOUBLE, self.estimator.kxq0()); - write_f64_le(&mut bytes, KXQ1_DOUBLE, self.estimator.kxq1()); + bytes.write_f64_le(self.estimator.hip_accum()); + bytes.write_f64_le(self.estimator.kxq0()); + bytes.write_f64_le(self.estimator.kxq1()); // Write num_at_cur_min (num_zeros for Array6) - write_u32_le(&mut bytes, CUR_MIN_COUNT_INT, self.num_zeros); + bytes.write_u32_le(self.num_zeros); // Write aux_count (always 0 for Array6) - write_u32_le(&mut bytes, AUX_COUNT_INT, 0); + bytes.write_u32_le(0); // Write packed byte array - bytes[HLL_BYTE_ARR_START..].copy_from_slice(&self.bytes); + bytes.write(&self.bytes); - bytes + bytes.into_bytes() } } diff --git a/datasketches/src/hll/array8.rs b/datasketches/src/hll/array8.rs index dd3b556..e1cadb6 100644 --- a/datasketches/src/hll/array8.rs +++ b/datasketches/src/hll/array8.rs @@ -20,6 +20,7 @@ //! Array8 is the simplest HLL array implementation, storing one byte per slot. //! This provides the maximum value range (0-255) with no bit-packing complexity. +use crate::codec::SketchBytes; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; @@ -302,43 +303,43 @@ impl Array8 { let k = 1 << lg_config_k; let total_size = HLL_PREAMBLE_SIZE + k as usize; - let mut bytes = vec![0u8; total_size]; + let mut bytes = SketchBytes::with_capacity(total_size); // Write standard header - bytes[PREAMBLE_INTS_BYTE] = HLL_PREINTS; - bytes[SER_VER_BYTE] = SERIAL_VER; - bytes[FAMILY_BYTE] = HLL_FAMILY_ID; - bytes[LG_K_BYTE] = lg_config_k; - bytes[LG_ARR_BYTE] = 0; // Not used for HLL mode + bytes.write_u8(HLL_PREINTS); + bytes.write_u8(SERIAL_VER); + bytes.write_u8(HLL_FAMILY_ID); + bytes.write_u8(lg_config_k); + bytes.write_u8(0); // unused for HLL mode // Write flags let mut flags = 0u8; if self.estimator.is_out_of_order() { flags |= OUT_OF_ORDER_FLAG_MASK; } - bytes[FLAGS_BYTE] = flags; + bytes.write_u8(flags); // cur_min is always 0 for Array8 - bytes[HLL_CUR_MIN_BYTE] = 0; + bytes.write_u8(0); // Mode byte: HLL mode with HLL8 type - bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_HLL, TGT_HLL8); + bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL8)); // Write HIP estimator values - write_f64_le(&mut bytes, HIP_ACCUM_DOUBLE, self.estimator.hip_accum()); - write_f64_le(&mut bytes, KXQ0_DOUBLE, self.estimator.kxq0()); - write_f64_le(&mut bytes, KXQ1_DOUBLE, self.estimator.kxq1()); + bytes.write_f64_le(self.estimator.hip_accum()); + bytes.write_f64_le(self.estimator.kxq0()); + bytes.write_f64_le(self.estimator.kxq1()); // Write num_at_cur_min (num_zeros for Array8) - write_u32_le(&mut bytes, CUR_MIN_COUNT_INT, self.num_zeros); + bytes.write_u32_le(self.num_zeros); // Write aux_count (always 0 for Array8) - write_u32_le(&mut bytes, AUX_COUNT_INT, 0); + bytes.write_u32_le(0); // Write byte array - bytes[HLL_BYTE_ARR_START..].copy_from_slice(&self.bytes); + bytes.write(&self.bytes); - bytes + bytes.into_bytes() } } diff --git a/datasketches/src/hll/hash_set.rs b/datasketches/src/hll/hash_set.rs index f6bff05..0cee817 100644 --- a/datasketches/src/hll/hash_set.rs +++ b/datasketches/src/hll/hash_set.rs @@ -20,6 +20,7 @@ //! Uses open addressing with a custom stride function to handle collisions. //! Provides better performance than List when many coupons are stored. +use crate::codec::SketchBytes; use crate::error::Error; use crate::hll::HllType; use crate::hll::KEY_MASK_26; @@ -149,27 +150,27 @@ impl HashSet { let array_size = if compact { coupon_count } else { 1 << lg_arr }; let total_size = HASH_SET_INT_ARR_START + (array_size * 4); - let mut bytes = vec![0u8; total_size]; + let mut bytes = SketchBytes::with_capacity(total_size); // Write preamble - bytes[PREAMBLE_INTS_BYTE] = HASH_SET_PREINTS; - bytes[SER_VER_BYTE] = SERIAL_VER; - bytes[FAMILY_BYTE] = HLL_FAMILY_ID; - bytes[LG_K_BYTE] = lg_config_k; - bytes[LG_ARR_BYTE] = lg_arr as u8; + bytes.write_u8(HASH_SET_PREINTS); + bytes.write_u8(SERIAL_VER); + bytes.write_u8(HLL_FAMILY_ID); + bytes.write_u8(lg_config_k); + bytes.write_u8(lg_arr as u8); // Write flags let mut flags = 0u8; if compact { flags |= COMPACT_FLAG_MASK; } - bytes[FLAGS_BYTE] = flags; + bytes.write_u8(flags); // Write mode byte: SET mode with target HLL type - bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_SET, hll_type as u8); + bytes.write_u8(encode_mode_byte(CUR_MODE_SET, hll_type as u8)); // Write coupon count - write_u32_le(&mut bytes, HASH_SET_COUNT_INT, coupon_count as u32); + bytes.write_u32_le(coupon_count as u32); // Write coupons if compact { @@ -183,18 +184,16 @@ impl HashSet { .collect(); coupons_vec.sort_unstable(); - for (i, coupon) in coupons_vec.iter().enumerate() { - let offset = HASH_SET_INT_ARR_START + i * 4; - bytes[offset..offset + 4].copy_from_slice(&coupon.to_le_bytes()); + for coupon in coupons_vec.iter().copied() { + bytes.write_u32_le(coupon); } } else { // Non-compact mode: write entire hash table - for (i, coupon) in self.container.coupons.iter().enumerate() { - let offset = HASH_SET_INT_ARR_START + i * 4; - bytes[offset..offset + 4].copy_from_slice(&coupon.to_le_bytes()); + for coupon in self.container.coupons.iter().copied() { + bytes.write_u32_le(coupon); } } - bytes + bytes.into_bytes() } } diff --git a/datasketches/src/hll/list.rs b/datasketches/src/hll/list.rs index c705383..affba4a 100644 --- a/datasketches/src/hll/list.rs +++ b/datasketches/src/hll/list.rs @@ -20,6 +20,7 @@ //! Provides sequential storage with linear search for duplicates. //! Efficient for small numbers of coupons before transitioning to HashSet. +use crate::codec::SketchBytes; use crate::error::Error; use crate::hll::HllType; use crate::hll::container::COUPON_EMPTY; @@ -109,14 +110,14 @@ impl List { let array_size = if compact { coupon_count } else { 1 << lg_arr }; let total_size = LIST_INT_ARR_START + (array_size * 4); - let mut bytes = vec![0u8; total_size]; + let mut bytes = SketchBytes::with_capacity(total_size); // Write preamble - bytes[PREAMBLE_INTS_BYTE] = LIST_PREINTS; - bytes[SER_VER_BYTE] = SERIAL_VER; - bytes[FAMILY_BYTE] = HLL_FAMILY_ID; - bytes[LG_K_BYTE] = lg_config_k; - bytes[LG_ARR_BYTE] = lg_arr as u8; + bytes.write_u8(LIST_PREINTS); + bytes.write_u8(SERIAL_VER); + bytes.write_u8(HLL_FAMILY_ID); + bytes.write_u8(lg_config_k); + bytes.write_u8(lg_arr as u8); // Write flags let mut flags = 0u8; @@ -126,23 +127,22 @@ impl List { if compact { flags |= COMPACT_FLAG_MASK; } - bytes[FLAGS_BYTE] = flags; + bytes.write_u8(flags); // Write count - bytes[LIST_COUNT_BYTE] = coupon_count as u8; + bytes.write_u8(coupon_count as u8); // Write mode byte: LIST mode with target HLL type - bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_LIST, hll_type as u8); + bytes.write_u8(encode_mode_byte(CUR_MODE_LIST, hll_type as u8)); // Write coupons (only non-empty ones if compact) if !empty { let mut write_idx = 0; - for coupon in &self.container.coupons { - if compact && *coupon == 0 { + for coupon in self.container.coupons.iter().copied() { + if compact && coupon == 0 { continue; // Skip empty coupons in compact mode } - let offset = LIST_INT_ARR_START + write_idx * 4; - write_u32_le(&mut bytes, offset, *coupon); + bytes.write_u32_le(coupon); write_idx += 1; if write_idx >= array_size { break; @@ -150,6 +150,6 @@ impl List { } } - bytes + bytes.into_bytes() } } From 4612c424b378a2dd0359fecb9602ff9562a26618 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 30 Dec 2025 22:01:58 +0800 Subject: [PATCH 3/5] deser hll Signed-off-by: tison --- datasketches/src/codec.rs | 11 +- datasketches/src/hll/array4.rs | 57 ++++------ datasketches/src/hll/array6.rs | 33 +++--- datasketches/src/hll/array8.rs | 33 +++--- datasketches/src/hll/hash_set.rs | 49 ++++----- datasketches/src/hll/list.rs | 33 +++--- datasketches/src/hll/serialization.rs | 106 +------------------ datasketches/src/hll/sketch.rs | 56 ++++++---- datasketches/src/tdigest/sketch.rs | 1 - datasketches/tests/hll_serialization_test.rs | 8 +- 10 files changed, 132 insertions(+), 255 deletions(-) diff --git a/datasketches/src/codec.rs b/datasketches/src/codec.rs index 94f2439..a057f86 100644 --- a/datasketches/src/codec.rs +++ b/datasketches/src/codec.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#![allow(dead_code)] + use std::io; use std::io::{Cursor, Read}; @@ -23,10 +25,6 @@ pub(crate) struct SketchBytes { } impl SketchBytes { - pub fn new() -> Self { - Self { bytes: vec![] } - } - pub fn with_capacity(capacity: usize) -> Self { Self { bytes: Vec::with_capacity(capacity), @@ -125,6 +123,11 @@ impl SketchSlice<'_> { } } + pub fn advance(&mut self, n: u64) { + let pos = self.slice.position(); + self.slice.set_position(pos + n); + } + pub fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { self.slice.read_exact(buf) } diff --git a/datasketches/src/hll/array4.rs b/datasketches/src/hll/array4.rs index 0014ad4..85f8b26 100644 --- a/datasketches/src/hll/array4.rs +++ b/datasketches/src/hll/array4.rs @@ -21,7 +21,7 @@ //! When values exceed 4 bits after cur_min offset, they're stored in an auxiliary hash map. use super::aux_map::AuxMap; -use crate::codec::SketchBytes; +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; @@ -286,67 +286,48 @@ impl Array4 { /// /// Expects full HLL preamble (40 bytes) followed by packed 4-bit data and optional aux map. pub fn deserialize( - bytes: &[u8], + mut cursor: SketchSlice, + cur_min: u8, lg_config_k: u8, compact: bool, ooo: bool, ) -> Result { use crate::hll::get_slot; use crate::hll::get_value; - use crate::hll::serialization::*; - if bytes.len() < HLL_PREAMBLE_SIZE { - return Err(Error::insufficient_data(format!( - "expected at least {}, got {}", - HLL_PREAMBLE_SIZE, - bytes.len() - ))); + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) } let num_bytes = 1 << (lg_config_k - 1); // k/2 bytes for 4-bit packing - // Read cur_min from header - let cur_min = bytes[HLL_CUR_MIN_BYTE]; - // Read HIP estimator values from preamble - let hip_accum = read_f64_le(bytes, HIP_ACCUM_DOUBLE); - let kxq0 = read_f64_le(bytes, KXQ0_DOUBLE); - let kxq1 = read_f64_le(bytes, KXQ1_DOUBLE); + let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?; + let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?; + let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?; // Read num_at_cur_min and aux_count - let num_at_cur_min = read_u32_le(bytes, CUR_MIN_COUNT_INT); - let aux_count = read_u32_le(bytes, AUX_COUNT_INT); - - // Calculate expected length - let expected_len = if compact { - HLL_PREAMBLE_SIZE // Just preamble for compact empty sketch - } else { - HLL_PREAMBLE_SIZE + num_bytes + (aux_count as usize * COUPON_SIZE_BYTES) - }; + let num_at_cur_min = cursor.read_u32_le().map_err(make_error("num_at_cur_min"))?; + let aux_count = cursor.read_u32_le().map_err(make_error("aux_count"))?; - if bytes.len() < expected_len { - return Err(Error::insufficient_data(format!( - "expected {}, got {}", - expected_len, - bytes.len() - ))); - } - - // Read packed 4-bit byte array from HLL_BYTE_ARR_START + // Read packed 4-bit byte array let mut data = vec![0u8; num_bytes]; if !compact { - data.copy_from_slice(&bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START + num_bytes]); + cursor.read_exact(&mut data).map_err(make_error("data"))?; + } else { + cursor.advance(num_bytes as u64); } // Read aux map if present let mut aux_map = None; if aux_count > 0 { let mut aux = AuxMap::new(lg_config_k); - let aux_start = HLL_BYTE_ARR_START + num_bytes; - for i in 0..aux_count { - let offset = aux_start + (i as usize * COUPON_SIZE_BYTES); - let coupon = read_u32_le(bytes, offset); + let coupon = cursor.read_u32_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {aux_count} aux coupons, failed at index {i}", + )) + })?; let slot = get_slot(coupon) & ((1 << lg_config_k) - 1); let value = get_value(coupon); aux.insert(slot, value); diff --git a/datasketches/src/hll/array6.rs b/datasketches/src/hll/array6.rs index a11276e..0bba28a 100644 --- a/datasketches/src/hll/array6.rs +++ b/datasketches/src/hll/array6.rs @@ -21,7 +21,7 @@ //! This is sufficient for most HLL use cases without needing exception handling or //! cur_min optimization like Array4. -use crate::codec::SketchBytes; +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; @@ -170,41 +170,32 @@ impl Array6 { /// /// Expects full HLL preamble (40 bytes) followed by packed 6-bit data. pub fn deserialize( - bytes: &[u8], + mut cursor: SketchSlice, lg_config_k: u8, compact: bool, ooo: bool, ) -> Result { - use crate::hll::serialization::*; + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } let k = 1 << lg_config_k; let num_bytes = num_bytes_for_k(k); - let expected_len = if compact { - HLL_PREAMBLE_SIZE // Just preamble for compact empty sketch - } else { - HLL_PREAMBLE_SIZE + num_bytes - }; - - if bytes.len() < expected_len { - return Err(Error::insufficient_data(format!( - "expected {}, got {}", - expected_len, - bytes.len() - ))); - } // Read HIP estimator values from preamble - let hip_accum = read_f64_le(bytes, HIP_ACCUM_DOUBLE); - let kxq0 = read_f64_le(bytes, KXQ0_DOUBLE); - let kxq1 = read_f64_le(bytes, KXQ1_DOUBLE); + let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?; + let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?; + let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?; // Read num_at_cur_min (for Array6, this is num_zeros since cur_min=0) - let num_zeros = read_u32_le(bytes, CUR_MIN_COUNT_INT); + let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?; // Read packed byte array from offset HLL_BYTE_ARR_START let mut data = vec![0u8; num_bytes]; if !compact { - data.copy_from_slice(&bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START + num_bytes]); + cursor.read_exact(&mut data).map_err(make_error("data"))?; + } else { + cursor.advance(num_bytes as u64); } // Create estimator and restore state diff --git a/datasketches/src/hll/array8.rs b/datasketches/src/hll/array8.rs index e1cadb6..1f1d8f4 100644 --- a/datasketches/src/hll/array8.rs +++ b/datasketches/src/hll/array8.rs @@ -20,7 +20,7 @@ //! Array8 is the simplest HLL array implementation, storing one byte per slot. //! This provides the maximum value range (0-255) with no bit-packing complexity. -use crate::codec::SketchBytes; +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; @@ -244,40 +244,31 @@ impl Array8 { /// /// Expects full HLL preamble (40 bytes) followed by k bytes of data. pub fn deserialize( - bytes: &[u8], + mut cursor: SketchSlice, lg_config_k: u8, compact: bool, ooo: bool, ) -> Result { - use crate::hll::serialization::*; + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } let k = 1 << lg_config_k; - let expected_len = if compact { - HLL_PREAMBLE_SIZE // Just preamble for compact empty sketch - } else { - HLL_PREAMBLE_SIZE + k as usize - }; - - if bytes.len() < expected_len { - return Err(Error::insufficient_data(format!( - "expected {}, got {}", - expected_len, - bytes.len() - ))); - } // Read HIP estimator values from preamble - let hip_accum = read_f64_le(bytes, HIP_ACCUM_DOUBLE); - let kxq0 = read_f64_le(bytes, KXQ0_DOUBLE); - let kxq1 = read_f64_le(bytes, KXQ1_DOUBLE); + let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?; + let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?; + let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?; // Read num_at_cur_min (for Array8, this is num_zeros since cur_min=0) - let num_zeros = read_u32_le(bytes, CUR_MIN_COUNT_INT); + let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?; // Read byte array from offset HLL_BYTE_ARR_START let mut data = vec![0u8; k as usize]; if !compact { - data.copy_from_slice(&bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START + k as usize]); + cursor.read_exact(&mut data).map_err(make_error("data"))?; + } else { + cursor.advance(k as u64); } // Create estimator and restore state diff --git a/datasketches/src/hll/hash_set.rs b/datasketches/src/hll/hash_set.rs index 0cee817..9e7f1af 100644 --- a/datasketches/src/hll/hash_set.rs +++ b/datasketches/src/hll/hash_set.rs @@ -20,7 +20,7 @@ //! Uses open addressing with a custom stride function to handle collisions. //! Provides better performance than List when many coupons are stored. -use crate::codec::SketchBytes; +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::hll::HllType; use crate::hll::KEY_MASK_26; @@ -85,49 +85,42 @@ impl HashSet { } /// Deserialize a HashSet from bytes - pub fn deserialize(bytes: &[u8], compact: bool) -> Result { + pub fn deserialize( + mut cursor: SketchSlice, + lg_arr: usize, + compact: bool, + ) -> Result { // Read coupon count from bytes 8-11 - let coupon_count = read_u32_le(bytes, HASH_SET_COUNT_INT) as usize; - - // Compute array size - let lg_arr = bytes[LG_ARR_BYTE] as usize; + let coupon_count = cursor + .read_u32_le() + .map_err(|_| Error::insufficient_data("coupon_count"))?; + let coupon_count = coupon_count as usize; if compact { // Compact mode: only couponCount coupons are stored - let expected_len = HASH_SET_INT_ARR_START + (coupon_count * 4); - if bytes.len() < expected_len { - return Err(Error::insufficient_data(format!( - "expected {}, got {}", - expected_len, - bytes.len() - ))); - } - // Create a new hash set and insert coupons one by one let mut hash_set = HashSet::new(lg_arr); for i in 0..coupon_count { - let offset = HASH_SET_INT_ARR_START + i * COUPON_SIZE_BYTES; - let coupon = read_u32_le(bytes, offset); + let coupon = cursor.read_u32_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {coupon_count} coupons, failed at index {i}" + )) + })?; hash_set.update(coupon); } Ok(hash_set) } else { // Non-compact mode: full hash table with empty slots let array_size = 1 << lg_arr; - let expected_len = HASH_SET_INT_ARR_START + (array_size * 4); - if bytes.len() < expected_len { - return Err(Error::insufficient_data(format!( - "expected {}, got {}", - expected_len, - bytes.len() - ))); - } // Read entire hash table including empty slots let mut coupons = vec![0u32; array_size]; for (i, coupon) in coupons.iter_mut().enumerate() { - let offset = HASH_SET_INT_ARR_START + i * COUPON_SIZE_BYTES; - *coupon = read_u32_le(bytes, offset); + *coupon = cursor.read_u32_le().map_err(|_| { + Error::insufficient_data(format!( + "expected {array_size} coupons, failed at index {i}" + )) + })?; } Ok(Self { @@ -148,7 +141,7 @@ impl HashSet { // Compute size let array_size = if compact { coupon_count } else { 1 << lg_arr }; - let total_size = HASH_SET_INT_ARR_START + (array_size * 4); + let total_size = SET_PREAMBLE_SIZE + (array_size * 4); let mut bytes = SketchBytes::with_capacity(total_size); diff --git a/datasketches/src/hll/list.rs b/datasketches/src/hll/list.rs index affba4a..9e40b68 100644 --- a/datasketches/src/hll/list.rs +++ b/datasketches/src/hll/list.rs @@ -20,7 +20,7 @@ //! Provides sequential storage with linear search for duplicates. //! Efficient for small numbers of coupons before transitioning to HashSet. -use crate::codec::SketchBytes; +use crate::codec::{SketchBytes, SketchSlice}; use crate::error::Error; use crate::hll::HllType; use crate::hll::container::COUPON_EMPTY; @@ -67,30 +67,25 @@ impl List { } /// Deserialize a List from bytes - pub fn deserialize(bytes: &[u8], empty: bool, compact: bool) -> Result { - // Read coupon count from byte 6 - let coupon_count = bytes[LIST_COUNT_BYTE] as usize; - + pub fn deserialize( + mut cursor: SketchSlice, + lg_arr: usize, + coupon_count: usize, + empty: bool, + compact: bool, + ) -> Result { // Compute array size - let lg_arr = bytes[LG_ARR_BYTE] as usize; let array_size = if compact { coupon_count } else { 1 << lg_arr }; - // Validate length - let expected_len = LIST_INT_ARR_START + (array_size * 4); - if bytes.len() < expected_len { - return Err(Error::insufficient_data(format!( - "expected {}, got {}", - expected_len, - bytes.len() - ))); - } - // Read coupons let mut coupons = vec![0u32; array_size]; if !empty && coupon_count > 0 { for (i, coupon) in coupons.iter_mut().enumerate() { - let offset = LIST_INT_ARR_START + i * COUPON_SIZE_BYTES; - *coupon = read_u32_le(bytes, offset); + *coupon = cursor.read_u32_le().map_err(|_| { + Error::insufficient_data(format!( + "expect {coupon_count} coupons, failed at index {i}" + )) + })?; } } @@ -108,7 +103,7 @@ impl List { // Compute size let array_size = if compact { coupon_count } else { 1 << lg_arr }; - let total_size = LIST_INT_ARR_START + (array_size * 4); + let total_size = LIST_PREAMBLE_SIZE + (array_size * 4); let mut bytes = SketchBytes::with_capacity(total_size); diff --git a/datasketches/src/hll/serialization.rs b/datasketches/src/hll/serialization.rs index b99a262..b3d22fe 100644 --- a/datasketches/src/hll/serialization.rs +++ b/datasketches/src/hll/serialization.rs @@ -28,76 +28,22 @@ pub const SERIAL_VER: u8 = 1; /// Flag indicating sketch is empty (no values inserted) pub const EMPTY_FLAG_MASK: u8 = 4; - /// Flag indicating compact serialization (no empty slots stored) pub const COMPACT_FLAG_MASK: u8 = 8; - /// Flag indicating out-of-order mode (HIP estimator invalid) pub const OUT_OF_ORDER_FLAG_MASK: u8 = 16; -/// Offset of preamble size field (in 4-byte ints) -pub const PREAMBLE_INTS_BYTE: usize = 0; - -/// Offset of serialization version byte -pub const SER_VER_BYTE: usize = 1; - -/// Offset of family ID byte -pub const FAMILY_BYTE: usize = 2; - -/// Offset of lg_config_k byte -pub const LG_K_BYTE: usize = 3; - -/// Offset of lg_arr (array size) byte -pub const LG_ARR_BYTE: usize = 4; - -/// Offset of flags byte -pub const FLAGS_BYTE: usize = 5; - -/// Offset of mode byte (current mode in low 2 bits, target type in bits 2-3) -pub const MODE_BYTE: usize = 7; - /// Preamble size for LIST mode (8 bytes = 2 ints) pub const LIST_PREINTS: u8 = 2; - /// Preamble size for SET mode (12 bytes = 3 ints) pub const HASH_SET_PREINTS: u8 = 3; - /// Preamble size for HLL mode (40 bytes = 10 ints) pub const HLL_PREINTS: u8 = 10; -/// Offset of coupon count byte in LIST mode -pub const LIST_COUNT_BYTE: usize = 6; - -/// Offset where coupon array starts in LIST mode -pub const LIST_INT_ARR_START: usize = 8; - -/// Offset of coupon count in SET mode (4-byte int) -pub const HASH_SET_COUNT_INT: usize = 8; - -/// Offset where coupon array starts in SET mode -pub const HASH_SET_INT_ARR_START: usize = 12; - -/// Offset of cur_min byte in HLL mode header -pub const HLL_CUR_MIN_BYTE: usize = 6; - -/// Offset of HIP accumulator (8-byte double) in HLL preamble -pub const HIP_ACCUM_DOUBLE: usize = 8; - -/// Offset of KxQ0 register (8-byte double) in HLL preamble -pub const KXQ0_DOUBLE: usize = 16; - -/// Offset of KxQ1 register (8-byte double) in HLL preamble -pub const KXQ1_DOUBLE: usize = 24; - -/// Offset of num_at_cur_min (4-byte int) in HLL preamble -pub const CUR_MIN_COUNT_INT: usize = 32; - -/// Offset of aux_count (4-byte int) in HLL preamble -pub const AUX_COUNT_INT: usize = 36; - -/// Offset where HLL byte array data starts -pub const HLL_BYTE_ARR_START: usize = 40; - +/// Total size of LIST preamble in bytes +pub const LIST_PREAMBLE_SIZE: usize = 8; +/// Total size of LIST preamble in bytes +pub const SET_PREAMBLE_SIZE: usize = 12; /// Total size of HLL preamble in bytes pub const HLL_PREAMBLE_SIZE: usize = 40; @@ -148,47 +94,3 @@ pub const TGT_HLL8: u8 = 2; /// Size of a single coupon in bytes (u32) pub const COUPON_SIZE_BYTES: usize = 4; - -/// Size of a double (f64) in bytes -pub const DOUBLE_SIZE_BYTES: usize = 8; - -/// Size of an int (u32) in bytes -pub const INT_SIZE_BYTES: usize = 4; - -/// Read an u32 value from bytes at the given offset (little-endian) -#[inline] -pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 { - u32::from_le_bytes([ - bytes[offset], - bytes[offset + 1], - bytes[offset + 2], - bytes[offset + 3], - ]) -} - -/// Read a f64 value from bytes at the given offset (little-endian) -#[inline] -pub fn read_f64_le(bytes: &[u8], offset: usize) -> f64 { - f64::from_le_bytes([ - bytes[offset], - bytes[offset + 1], - bytes[offset + 2], - bytes[offset + 3], - bytes[offset + 4], - bytes[offset + 5], - bytes[offset + 6], - bytes[offset + 7], - ]) -} - -/// Write an u32 value to bytes at the given offset (little-endian) -#[inline] -pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) { - bytes[offset..offset + INT_SIZE_BYTES].copy_from_slice(&value.to_le_bytes()); -} - -/// Write a f64 value to bytes at the given offset (little-endian) -#[inline] -pub fn write_f64_le(bytes: &mut [u8], offset: usize, value: f64) { - bytes[offset..offset + DOUBLE_SIZE_BYTES].copy_from_slice(&value.to_le_bytes()); -} diff --git a/datasketches/src/hll/sketch.rs b/datasketches/src/hll/sketch.rs index a5f0aac..5109f34 100644 --- a/datasketches/src/hll/sketch.rs +++ b/datasketches/src/hll/sketch.rs @@ -20,8 +20,7 @@ //! This module provides the main [`HllSketch`] struct, which is the primary interface //! for creating and using HLL sketches for cardinality estimation. -use std::hash::Hash; - +use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::HllType; use crate::hll::NumStdDev; @@ -36,6 +35,7 @@ use crate::hll::hash_set::HashSet; use crate::hll::list::List; use crate::hll::mode::Mode; use crate::hll::serialization::*; +use std::hash::Hash; /// A HyperLogLog sketch. /// @@ -213,19 +213,26 @@ impl HllSketch { /// Deserializes an HLL sketch from bytes pub fn deserialize(bytes: &[u8]) -> Result { - if bytes.len() < 8 { - return Err(Error::insufficient_data( - "sketch data too short (< 8 bytes)", - )); + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) } + let mut cursor = SketchSlice::new(bytes); + // Read and validate preamble - let preamble_ints = bytes[PREAMBLE_INTS_BYTE]; - let serial_ver = bytes[SER_VER_BYTE]; - let family_id = bytes[FAMILY_BYTE]; - let lg_config_k = bytes[LG_K_BYTE]; - let flags = bytes[FLAGS_BYTE]; - let mode_byte = bytes[MODE_BYTE]; + let preamble_ints = cursor.read_u8().map_err(make_error("preamble_ints"))?; + let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?; + let family_id = cursor.read_u8().map_err(make_error("family_id"))?; + let lg_config_k = cursor.read_u8().map_err(make_error("lg_config_k"))?; + // lg_arr used in List/Set modes + let lg_arr = cursor.read_u8().map_err(make_error("lg_arr"))?; + let flags = cursor.read_u8().map_err(make_error("flags"))?; + // The contextual state byte: + // * coupon count in LIST mode + // * cur_min in HLL mode + // * unused in other modes + let state = cursor.read_u8().map_err(make_error("state"))?; + let mode_byte = cursor.read_u8().map_err(make_error("mode"))?; // Verify family ID if family_id != HLL_FAMILY_ID { @@ -233,8 +240,11 @@ impl HllSketch { } // Verify serialization version - if serial_ver != SERIAL_VER { - return Err(Error::unsupported_serial_version(SERIAL_VER, serial_ver)); + if serial_version != SERIAL_VER { + return Err(Error::unsupported_serial_version( + SERIAL_VER, + serial_version, + )); } // Verify lg_k range (4-21 are valid) @@ -268,7 +278,9 @@ impl HllSketch { ))); } - let list = List::deserialize(bytes, empty, compact)?; + let lg_arr = lg_arr as usize; + let coupon_count = state as usize; + let list = List::deserialize(cursor, lg_arr, coupon_count, empty, compact)?; Mode::List { list, hll_type } } CUR_MODE_SET => { @@ -279,7 +291,8 @@ impl HllSketch { ))); } - let set = HashSet::deserialize(bytes, compact)?; + let lg_arr = lg_arr as usize; + let set = HashSet::deserialize(cursor, lg_arr, compact)?; Mode::Set { set, hll_type } } CUR_MODE_HLL => { @@ -291,11 +304,14 @@ impl HllSketch { } match hll_type { - HllType::Hll4 => Array4::deserialize(bytes, lg_config_k, compact, ooo) - .map(Mode::Array4)?, - HllType::Hll6 => Array6::deserialize(bytes, lg_config_k, compact, ooo) + HllType::Hll4 => { + let cur_min = state; + Array4::deserialize(cursor, cur_min, lg_config_k, compact, ooo) + .map(Mode::Array4)? + } + HllType::Hll6 => Array6::deserialize(cursor, lg_config_k, compact, ooo) .map(Mode::Array6)?, - HllType::Hll8 => Array8::deserialize(bytes, lg_config_k, compact, ooo) + HllType::Hll8 => Array8::deserialize(cursor, lg_config_k, compact, ooo) .map(Mode::Array8)?, } } diff --git a/datasketches/src/tdigest/sketch.rs b/datasketches/src/tdigest/sketch.rs index 458d1c6..0bb5f7f 100644 --- a/datasketches/src/tdigest/sketch.rs +++ b/datasketches/src/tdigest/sketch.rs @@ -21,7 +21,6 @@ use crate::error::ErrorKind; use crate::tdigest::serialization::*; use std::cmp::Ordering; use std::convert::identity; -use std::io::Cursor; use std::num::NonZeroU64; /// The default value of K if one is not specified. diff --git a/datasketches/tests/hll_serialization_test.rs b/datasketches/tests/hll_serialization_test.rs index fc1969c..a703a23 100644 --- a/datasketches/tests/hll_serialization_test.rs +++ b/datasketches/tests/hll_serialization_test.rs @@ -77,7 +77,13 @@ fn test_sketch_file(path: PathBuf, expected_cardinality: usize, expected_lg_k: u // Serialize and deserialize again to test round-trip let serialized_bytes = sketch1.serialize(); - let sketch2 = HllSketch::deserialize(&serialized_bytes).unwrap(); + let sketch2 = HllSketch::deserialize(&serialized_bytes).unwrap_or_else(|err| { + panic!( + "Deserialization failed after round-trip for {}: {}", + path.display(), + err + ) + }); // Check that both sketches are functionally equivalent assert_eq!( From 3e8848fde5b5a8e39f6694d5b2e3b52d4002d88e Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 30 Dec 2025 22:10:47 +0800 Subject: [PATCH 4/5] fixup Signed-off-by: tison --- datasketches/src/hll/array6.rs | 1 + datasketches/src/hll/array8.rs | 5 +++-- datasketches/src/hll/hash_set.rs | 3 +++ datasketches/src/hll/sketch.rs | 2 +- datasketches/tests/hll_serialization_test.rs | 2 +- 5 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datasketches/src/hll/array6.rs b/datasketches/src/hll/array6.rs index 0bba28a..c78d2bd 100644 --- a/datasketches/src/hll/array6.rs +++ b/datasketches/src/hll/array6.rs @@ -189,6 +189,7 @@ impl Array6 { // Read num_at_cur_min (for Array6, this is num_zeros since cur_min=0) let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?; + let _aux_count = cursor.read_u32_le().map_err(make_error("aux_count"))?; // always 0 // Read packed byte array from offset HLL_BYTE_ARR_START let mut data = vec![0u8; num_bytes]; diff --git a/datasketches/src/hll/array8.rs b/datasketches/src/hll/array8.rs index 1f1d8f4..4512dd7 100644 --- a/datasketches/src/hll/array8.rs +++ b/datasketches/src/hll/array8.rs @@ -253,7 +253,7 @@ impl Array8 { move |_| Error::insufficient_data(tag) } - let k = 1 << lg_config_k; + let k = 1usize << lg_config_k; // Read HIP estimator values from preamble let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?; @@ -262,9 +262,10 @@ impl Array8 { // Read num_at_cur_min (for Array8, this is num_zeros since cur_min=0) let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?; + let _aux_count = cursor.read_u32_le().map_err(make_error("aux_count"))?; // always 0 // Read byte array from offset HLL_BYTE_ARR_START - let mut data = vec![0u8; k as usize]; + let mut data = vec![0u8; k]; if !compact { cursor.read_exact(&mut data).map_err(make_error("data"))?; } else { diff --git a/datasketches/src/hll/hash_set.rs b/datasketches/src/hll/hash_set.rs index 9e7f1af..98b9935 100644 --- a/datasketches/src/hll/hash_set.rs +++ b/datasketches/src/hll/hash_set.rs @@ -159,6 +159,9 @@ impl HashSet { } bytes.write_u8(flags); + // Write unused byte + bytes.write_u8(0); + // Write mode byte: SET mode with target HLL type bytes.write_u8(encode_mode_byte(CUR_MODE_SET, hll_type as u8)); diff --git a/datasketches/src/hll/sketch.rs b/datasketches/src/hll/sketch.rs index 5109f34..9073d7b 100644 --- a/datasketches/src/hll/sketch.rs +++ b/datasketches/src/hll/sketch.rs @@ -230,7 +230,7 @@ impl HllSketch { // The contextual state byte: // * coupon count in LIST mode // * cur_min in HLL mode - // * unused in other modes + // * unused in SET mode let state = cursor.read_u8().map_err(make_error("state"))?; let mode_byte = cursor.read_u8().map_err(make_error("mode"))?; diff --git a/datasketches/tests/hll_serialization_test.rs b/datasketches/tests/hll_serialization_test.rs index a703a23..9c8200f 100644 --- a/datasketches/tests/hll_serialization_test.rs +++ b/datasketches/tests/hll_serialization_test.rs @@ -87,8 +87,8 @@ fn test_sketch_file(path: PathBuf, expected_cardinality: usize, expected_lg_k: u // Check that both sketches are functionally equivalent assert_eq!( - sketch2.lg_config_k(), sketch1.lg_config_k(), + sketch2.lg_config_k(), "lg_config_k mismatch after round-trip for {}", path.display() ); From 12e36c32115d3621851081ded87f69796b73ba63 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 30 Dec 2025 22:14:01 +0800 Subject: [PATCH 5/5] fmt Signed-off-by: tison --- datasketches/src/codec.rs | 5 +++-- datasketches/src/countmin/sketch.rs | 11 ++++++----- datasketches/src/hll/array4.rs | 3 ++- datasketches/src/hll/array6.rs | 3 ++- datasketches/src/hll/array8.rs | 3 ++- datasketches/src/hll/hash_set.rs | 3 ++- datasketches/src/hll/list.rs | 3 ++- datasketches/src/hll/serialization.rs | 2 +- datasketches/src/hll/sketch.rs | 3 ++- datasketches/src/tdigest/sketch.rs | 7 ++++--- 10 files changed, 26 insertions(+), 17 deletions(-) diff --git a/datasketches/src/codec.rs b/datasketches/src/codec.rs index a057f86..4df7b22 100644 --- a/datasketches/src/codec.rs +++ b/datasketches/src/codec.rs @@ -18,7 +18,8 @@ #![allow(dead_code)] use std::io; -use std::io::{Cursor, Read}; +use std::io::Cursor; +use std::io::Read; pub(crate) struct SketchBytes { bytes: Vec, @@ -117,7 +118,7 @@ pub(crate) struct SketchSlice<'a> { } impl SketchSlice<'_> { - pub fn new(slice: &[u8]) -> SketchSlice { + pub fn new(slice: &[u8]) -> SketchSlice<'_> { SketchSlice { slice: Cursor::new(slice), } diff --git a/datasketches/src/countmin/sketch.rs b/datasketches/src/countmin/sketch.rs index dae4ad4..ca08bff 100644 --- a/datasketches/src/countmin/sketch.rs +++ b/datasketches/src/countmin/sketch.rs @@ -15,7 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::codec::{SketchBytes, SketchSlice}; +use std::hash::Hash; +use std::hash::Hasher; +use std::mem::size_of; + +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::countmin::serialization::COUNTMIN_FAMILY_ID; use crate::countmin::serialization::FLAGS_IS_EMPTY; use crate::countmin::serialization::LONG_SIZE_BYTES; @@ -26,10 +31,6 @@ use crate::error::Error; use crate::hash::DEFAULT_UPDATE_SEED; use crate::hash::MurmurHash3X64128; -use std::hash::Hash; -use std::hash::Hasher; -use std::mem::size_of; - const MAX_TABLE_ENTRIES: usize = 1 << 30; /// Count-Min sketch for estimating item frequencies. diff --git a/datasketches/src/hll/array4.rs b/datasketches/src/hll/array4.rs index 85f8b26..3944a41 100644 --- a/datasketches/src/hll/array4.rs +++ b/datasketches/src/hll/array4.rs @@ -21,7 +21,8 @@ //! When values exceed 4 bits after cur_min offset, they're stored in an auxiliary hash map. use super::aux_map::AuxMap; -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; diff --git a/datasketches/src/hll/array6.rs b/datasketches/src/hll/array6.rs index c78d2bd..f247a67 100644 --- a/datasketches/src/hll/array6.rs +++ b/datasketches/src/hll/array6.rs @@ -21,7 +21,8 @@ //! This is sufficient for most HLL use cases without needing exception handling or //! cur_min optimization like Array4. -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; diff --git a/datasketches/src/hll/array8.rs b/datasketches/src/hll/array8.rs index 4512dd7..3ac1f0c 100644 --- a/datasketches/src/hll/array8.rs +++ b/datasketches/src/hll/array8.rs @@ -20,7 +20,8 @@ //! Array8 is the simplest HLL array implementation, storing one byte per slot. //! This provides the maximum value range (0-255) with no bit-packing complexity. -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::NumStdDev; use crate::hll::estimator::HipEstimator; diff --git a/datasketches/src/hll/hash_set.rs b/datasketches/src/hll/hash_set.rs index 98b9935..1a31031 100644 --- a/datasketches/src/hll/hash_set.rs +++ b/datasketches/src/hll/hash_set.rs @@ -20,7 +20,8 @@ //! Uses open addressing with a custom stride function to handle collisions. //! Provides better performance than List when many coupons are stored. -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::HllType; use crate::hll::KEY_MASK_26; diff --git a/datasketches/src/hll/list.rs b/datasketches/src/hll/list.rs index 9e40b68..2fa9173 100644 --- a/datasketches/src/hll/list.rs +++ b/datasketches/src/hll/list.rs @@ -20,7 +20,8 @@ //! Provides sequential storage with linear search for duplicates. //! Efficient for small numbers of coupons before transitioning to HashSet. -use crate::codec::{SketchBytes, SketchSlice}; +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::HllType; use crate::hll::container::COUPON_EMPTY; diff --git a/datasketches/src/hll/serialization.rs b/datasketches/src/hll/serialization.rs index b3d22fe..30f034f 100644 --- a/datasketches/src/hll/serialization.rs +++ b/datasketches/src/hll/serialization.rs @@ -42,7 +42,7 @@ pub const HLL_PREINTS: u8 = 10; /// Total size of LIST preamble in bytes pub const LIST_PREAMBLE_SIZE: usize = 8; -/// Total size of LIST preamble in bytes +/// Total size of SET preamble in bytes pub const SET_PREAMBLE_SIZE: usize = 12; /// Total size of HLL preamble in bytes pub const HLL_PREAMBLE_SIZE: usize = 40; diff --git a/datasketches/src/hll/sketch.rs b/datasketches/src/hll/sketch.rs index 9073d7b..ef76647 100644 --- a/datasketches/src/hll/sketch.rs +++ b/datasketches/src/hll/sketch.rs @@ -20,6 +20,8 @@ //! This module provides the main [`HllSketch`] struct, which is the primary interface //! for creating and using HLL sketches for cardinality estimation. +use std::hash::Hash; + use crate::codec::SketchSlice; use crate::error::Error; use crate::hll::HllType; @@ -35,7 +37,6 @@ use crate::hll::hash_set::HashSet; use crate::hll::list::List; use crate::hll::mode::Mode; use crate::hll::serialization::*; -use std::hash::Hash; /// A HyperLogLog sketch. /// diff --git a/datasketches/src/tdigest/sketch.rs b/datasketches/src/tdigest/sketch.rs index 0bb5f7f..c9ec382 100644 --- a/datasketches/src/tdigest/sketch.rs +++ b/datasketches/src/tdigest/sketch.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::Ordering; +use std::convert::identity; +use std::num::NonZeroU64; + use crate::codec::SketchSlice; use crate::error::Error; use crate::error::ErrorKind; use crate::tdigest::serialization::*; -use std::cmp::Ordering; -use std::convert::identity; -use std::num::NonZeroU64; /// The default value of K if one is not specified. const DEFAULT_K: u16 = 200;