Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datasketches/src/frequencies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
//! <https://apache.github.io/datasketches-java/9.0.0/org/apache/datasketches/frequencies/FrequentItemsSketch.html>

mod reverse_purge_item_hash_map;
mod serde;
mod serialization;
mod sketch;

Expand Down
98 changes: 0 additions & 98 deletions datasketches/src/frequencies/serde.rs

This file was deleted.

121 changes: 57 additions & 64 deletions datasketches/src/frequencies/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.

//! Serialization constants and helpers for frequency sketches.
use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::error::Error;

/// Family ID for frequency sketches.
pub const FAMILY_ID: u8 = 10;
/// Serialization version.
pub const SER_VER: u8 = 1;
pub const SERIAL_VERSION: u8 = 1;

/// Preamble longs for empty sketch.
pub const PREAMBLE_LONGS_EMPTY: u8 = 1;
Expand All @@ -30,75 +32,66 @@ pub const PREAMBLE_LONGS_NONEMPTY: u8 = 4;
/// Empty flag mask (both bits for compatibility).
pub const EMPTY_FLAG_MASK: u8 = 5;

/// Offset of preamble longs byte.
pub const PREAMBLE_LONGS_BYTE: usize = 0;
/// Offset of serialization version byte.
pub const SER_VER_BYTE: usize = 1;
/// Offset of family ID byte.
pub const FAMILY_BYTE: usize = 2;
/// Offset of lg_max_map_size byte.
pub const LG_MAX_MAP_SIZE_BYTE: usize = 3;
/// Offset of lg_cur_map_size byte.
pub const LG_CUR_MAP_SIZE_BYTE: usize = 4;
/// Offset of flags byte.
pub const FLAGS_BYTE: usize = 5;

/// Offset of active items int (low 32 bits of second pre-long).
pub const ACTIVE_ITEMS_INT: usize = 8;
/// Offset of stream weight (third pre-long).
pub const STREAM_WEIGHT_LONG: usize = 16;
/// Offset of offset (fourth pre-long).
pub const OFFSET_LONG: usize = 24;
pub(crate) fn count_string_items_bytes(items: &[String]) -> usize {
items.iter().map(|item| 4 + item.len()).sum()
}

/// Read an u32 value from bytes at the given offset (little-endian).
#[inline]
pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
u32::from_le_bytes([
bytes[offset],
bytes[offset + 1],
bytes[offset + 2],
bytes[offset + 3],
])
pub(crate) fn serialize_string_items(bytes: &mut SketchBytes, items: &[String]) {
for item in items {
let bs = item.as_bytes();
bytes.write_u32_le(bs.len() as u32);
bytes.write(bs);
}
}

/// Read an i64 value from bytes at the given offset (little-endian).
#[inline]
pub fn read_i64_le(bytes: &[u8], offset: usize) -> i64 {
i64::from_le_bytes([
bytes[offset],
bytes[offset + 1],
bytes[offset + 2],
bytes[offset + 3],
bytes[offset + 4],
bytes[offset + 5],
bytes[offset + 6],
bytes[offset + 7],
])
pub(crate) fn deserialize_string_items(
mut cursor: SketchSlice<'_>,
num_items: usize,
) -> Result<Vec<String>, Error> {
let mut items = Vec::with_capacity(num_items);
for i in 0..num_items {
let len = cursor.read_u32_le().map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} string items, failed to read len at index {i}"
))
})?;

let mut slice = vec![0; len as usize];
cursor.read_exact(&mut slice).map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} string items, failed to read slice at index {i}"
))
})?;

let value = String::from_utf8(slice)
.map_err(|_| Error::deserial(format!("invalid UTF-8 string payload at index {i}")))?;
items.push(value);
}
Ok(items)
}

/// Read an u64 value from bytes at the given offset (little-endian).
#[inline]
pub fn read_u64_le(bytes: &[u8], offset: usize) -> u64 {
u64::from_le_bytes([
bytes[offset],
bytes[offset + 1],
bytes[offset + 2],
bytes[offset + 3],
bytes[offset + 4],
bytes[offset + 5],
bytes[offset + 6],
bytes[offset + 7],
])
pub(crate) fn count_i64_items_bytes(items: &[i64]) -> usize {
items.len() * 8
}

/// Write a u32 value to bytes at the given offset (little-endian).
#[inline]
pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) {
bytes[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
pub(crate) fn serialize_i64_items(bytes: &mut SketchBytes, items: &[i64]) {
for item in items.iter().copied() {
bytes.write_i64_le(item);
}
}

/// Write an u64 value to bytes at the given offset (little-endian).
#[inline]
pub fn write_u64_le(bytes: &mut [u8], offset: usize, value: u64) {
bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
pub(crate) fn deserialize_i64_items(
mut cursor: SketchSlice<'_>,
num_items: usize,
) -> Result<Vec<i64>, Error> {
let mut items = Vec::with_capacity(num_items);
for i in 0..num_items {
let value = cursor.read_i64_le().map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} i64 items, failed at index {i}"
))
})?;
items.push(value);
}
Ok(items)
}
Loading