Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 118 additions & 15 deletions vortex-array/src/aggregate_fn/fns/bounded_max/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
use std::fmt::Display;
use std::fmt::Formatter;
use std::num::NonZeroUsize;
use std::sync::LazyLock;

use vortex_buffer::BufferString;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_session::VortexSession;

Expand All @@ -24,12 +26,24 @@ use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::max::Max;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::min_max::min_max;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::FieldNames;
use crate::dtype::Nullability;
use crate::dtype::StructFields;
use crate::partial_ord::partial_max;
use crate::scalar::Scalar;
use crate::scalar::ScalarTruncation;
use crate::scalar::upper_bound;

/// Field name for the bounded maximum upper-bound value in the partial state.
pub const BOUNDED_MAX_BOUND: &str = "bound";
/// Field name for whether the partial state represents an unknown upper bound.
pub const BOUNDED_MAX_UNKNOWN: &str = "unknown";

/// Field names of the serialized partial struct, built lazily on first access.
///
/// The order is significant: `combine_partials` reads the bound at field index 0 and the
/// unknown flag at field index 1.
static NAMES: LazyLock<FieldNames> =
LazyLock::new(|| FieldNames::from([BOUNDED_MAX_BOUND, BOUNDED_MAX_UNKNOWN]));

/// Options for [`BoundedMax`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BoundedMaxOptions {
Expand Down Expand Up @@ -61,12 +75,8 @@ pub struct BoundedMaxPartial {
}

impl BoundedMaxPartial {
fn merge(&mut self, max: Scalar) {
fn merge_bound(&mut self, max: Scalar) {
if max.is_null() {
// Serialized partials encode both empty input and unknown upper bounds as null.
// Treat null as unknown when merging; this may lose a bound from an empty shard, but
// it preserves pruning soundness.
self.state = BoundedMaxState::Unknown;
return;
}

Expand All @@ -82,6 +92,32 @@ impl BoundedMaxPartial {
fn unknown(&mut self) {
self.state = BoundedMaxState::Unknown;
}

/// Produce the finalized scalar for this partial.
///
/// The result always has the nullable form of the element dtype. A known bound is cast to
/// that dtype; an empty or unknown state finalizes to a null of that dtype.
///
/// # Errors
///
/// Returns an error if casting the stored bound to the nullable element dtype fails.
fn final_scalar(&self) -> VortexResult<Scalar> {
    let nullable_dtype = self.element_dtype.as_nullable();
    match &self.state {
        // Neither state carries a usable bound, so both collapse to null.
        BoundedMaxState::Empty | BoundedMaxState::Unknown => Ok(Scalar::null(nullable_dtype)),
        BoundedMaxState::Value(bound) => bound.cast(&nullable_dtype),
    }
}
}

/// Build the dtype used to serialize a [`BoundedMax`] partial state.
///
/// The partial is a nullable struct with two fields: a nullable `bound` of the element
/// dtype and a non-nullable boolean `unknown`.
///
/// - A null struct encodes an empty partial.
/// - A non-null struct with a null `bound` and `unknown = true` encodes an input that has a
///   non-null maximum for which no finite upper bound could be represented within the
///   configured byte limit.
pub fn make_bounded_max_partial_dtype(element_dtype: &DType) -> DType {
    let field_names = NAMES.clone();
    let field_dtypes = vec![
        element_dtype.as_nullable(),
        DType::Bool(Nullability::NonNullable),
    ];
    let fields = StructFields::new(field_names, field_dtypes);
    DType::Struct(fields, Nullability::Nullable)
}

impl AggregateFnVTable for BoundedMax {
Expand Down Expand Up @@ -144,7 +180,7 @@ impl AggregateFnVTable for BoundedMax {
}

fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
self.return_dtype(options, input_dtype)
supported_dtype(options, input_dtype).map(make_bounded_max_partial_dtype)
}

fn empty_partial(
Expand All @@ -160,15 +196,50 @@ impl AggregateFnVTable for BoundedMax {
}

fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
partial.merge(other);
if other.is_null() {
return Ok(());
}

let Some(other) = other.as_struct_opt() else {
vortex_bail!("BoundedMax partial must be a struct, got {}", other.dtype());
};
let Some(bound) = other.field_by_idx(0) else {
vortex_bail!("BoundedMax partial is missing its bound field");
};
let Some(unknown) = other
.field_by_idx(1)
.and_then(|unknown| unknown.as_bool().value())
else {
vortex_bail!("BoundedMax partial is missing its non-null unknown field");
};

if unknown {
partial.unknown();
} else {
partial.merge_bound(bound);
}
Ok(())
}

fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
let dtype = partial.element_dtype.as_nullable();
let dtype = make_bounded_max_partial_dtype(&partial.element_dtype);
let bound_dtype = partial.element_dtype.as_nullable();
match &partial.state {
BoundedMaxState::Value(max) => max.cast(&dtype),
BoundedMaxState::Empty | BoundedMaxState::Unknown => Ok(Scalar::null(dtype)),
BoundedMaxState::Empty => Ok(Scalar::null(dtype)),
BoundedMaxState::Value(max) => Ok(Scalar::struct_(
dtype,
vec![
max.cast(&bound_dtype)?,
Scalar::bool(false, Nullability::NonNullable),
],
)),
BoundedMaxState::Unknown => Ok(Scalar::struct_(
dtype,
vec![
Scalar::null(bound_dtype),
Scalar::bool(true, Nullability::NonNullable),
],
)),
}
}

Expand Down Expand Up @@ -196,18 +267,18 @@ impl AggregateFnVTable for BoundedMax {
return Ok(());
};
match truncate_max(result.max, partial.max_bytes.get())? {
Some(bound) => partial.merge(bound),
Some(bound) => partial.merge_bound(bound),
None => partial.unknown(),
}
Ok(())
}

fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
Ok(partials)
partials.get_item(BOUNDED_MAX_BOUND)
}

fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
self.to_scalar(partial)
partial.final_scalar()
}
}

Expand Down Expand Up @@ -256,6 +327,7 @@ mod tests {
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::bounded_max::BoundedMax;
use crate::aggregate_fn::fns::bounded_max::BoundedMaxOptions;
use crate::aggregate_fn::fns::bounded_max::make_bounded_max_partial_dtype;
use crate::aggregate_fn::fns::max::Max;
use crate::aggregate_fn::fns::min::Min;
use crate::arrays::PrimitiveArray;
Expand Down Expand Up @@ -352,7 +424,29 @@ mod tests {
}

#[test]
fn bounded_max_null_partial_poisons_existing_bound() -> VortexResult<()> {
fn bounded_max_empty_partial_does_not_poison_existing_bound() -> VortexResult<()> {
let mut ctx = fresh_session().create_execution_ctx();
let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
let mut acc = Accumulator::try_new(
BoundedMax,
BoundedMaxOptions {
max_bytes: max_bytes(2),
},
values.dtype().clone(),
)?;

acc.accumulate(&values, &mut ctx)?;
acc.combine_partials(Scalar::null(make_bounded_max_partial_dtype(values.dtype())))?;

assert_eq!(
acc.finish()?,
Scalar::binary(buffer![1u8], Nullability::Nullable)
);
Ok(())
}

#[test]
fn bounded_max_unknown_partial_poisons_existing_bound() -> VortexResult<()> {
let mut ctx = fresh_session().create_execution_ctx();
let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
let mut acc = Accumulator::try_new(
Expand All @@ -363,8 +457,17 @@ mod tests {
values.dtype().clone(),
)?;

let partial_dtype = make_bounded_max_partial_dtype(values.dtype());
let unknown = Scalar::struct_(
partial_dtype,
vec![
Scalar::null(values.dtype().as_nullable()),
Scalar::bool(true, Nullability::NonNullable),
],
);

acc.accumulate(&values, &mut ctx)?;
acc.combine_partials(Scalar::null(values.dtype().as_nullable()))?;
acc.combine_partials(unknown)?;

assert_eq!(acc.finish()?, Scalar::null(values.dtype().as_nullable()));
Ok(())
Expand Down
78 changes: 78 additions & 0 deletions vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1988,3 +1988,81 @@ async fn test_can_prune_composite_predicates() -> VortexResult<()> {

Ok(())
}

// Regression test (the name suggests issue 8166): writes a three-chunk binary column to an
// in-memory file, scans it back with a `>` filter, and expects exactly one surviving row.
//
// NOTE(review): presumably this guards against a chunk being wrongly pruned when its
// truncated max statistic has no finite upper bound (the long 0xff-prefixed value in
// `chunk2`) — confirm against the linked issue.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn repro_8166_binary_gt_all_ff_max() -> VortexResult<()> {
use vortex_buffer::ByteBuffer;

let mut ctx = SESSION.create_execution_ctx();

// Chunks 0 and 1 hold only empty values or short values whose first byte is below 0x6f,
// the first byte of the filter literal built further down.
let empty: Vec<u8> = vec![];
let chunk0: Vec<Vec<u8>> = vec![
vec![0x1d, 0x00],
empty.clone(),
vec![0x1d, 0x10, 0x9d, 0x08],
empty.clone(),
empty.clone(),
empty.clone(),
empty.clone(),
empty.clone(),
empty.clone(),
];
let chunk1: Vec<Vec<u8>> = vec![
empty.clone(),
empty.clone(),
vec![0x40],
empty.clone(),
empty.clone(),
empty.clone(),
empty.clone(),
empty.clone(),
vec![0x24],
vec![0x43, 0xff],
];
// A 112-byte value that is 0xff everywhere except index 89; it is the only non-empty
// value in chunk 2.
let mut big = vec![0xffu8; 112];
big[89] = 0x03;
let mut chunk2: Vec<Vec<u8>> = vec![empty.clone(); 10];
chunk2[8] = big;

// Wrap each chunk's values as a single-field struct (`yyw`) of non-nullable binary.
let bin = DType::Binary(Nullability::NonNullable);
let mk_struct = |vals: Vec<Vec<u8>>| -> VortexResult<ArrayRef> {
let yyw = VarBinArray::from_vec(vals, bin.clone()).into_array();
Ok(StructArray::from_fields(&[("yyw", yyw)])?.into_array())
};
let array =
ChunkedArray::from_iter([mk_struct(chunk0)?, mk_struct(chunk1)?, mk_struct(chunk2)?])
.into_array();

// Round-trip through the file writer so the scan goes through the written file.
let mut buf = ByteBufferMut::empty();
SESSION
.write_options()
.write(&mut buf, array.to_array_stream())
.await?;

// 63-byte literal: five 0x6f bytes, fifty-seven 0xff bytes, then 0x98.
let mut literal = vec![0x6fu8; 5];
literal.extend(iter::repeat_n(0xffu8, 57));
literal.push(0x98);
assert_eq!(literal.len(), 63);

let filter = gt(
get_item("yyw", root()),
lit(Scalar::binary(
ByteBuffer::from(literal),
Nullability::NonNullable,
)),
);

let result = SESSION
.open_options()
.open_buffer(buf)?
.scan()?
.with_filter(filter)
.into_array_stream()?
.read_all()
.await?
.execute::<StructArray>(&mut ctx)?;

// Assuming lexicographic byte ordering, only `big` (leading 0xff > 0x6f) exceeds the
// literal, so exactly one row must come back.
assert_eq!(result.len(), 1);
Ok(())
}
9 changes: 9 additions & 0 deletions vortex-layout/src/layouts/file_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ impl StatsAccumulator {
};

match stat {
Stat::Max if is_varlen_dtype(array.dtype()) && !array.all_valid(ctx)? => {
// A null truncated varlen max can mean either an empty chunk or no finite
// upper bound, so aggregating by skipping nulls would be unsound.
continue;
}
Stat::Min | Stat::Max | Stat::Sum => {
if let Some(s) = array.statistics().compute_stat(stat, ctx)?
&& let Some(v) = s.into_value()
Expand All @@ -182,6 +187,10 @@ fn supports_file_stats(dtype: &DType) -> bool {
!matches!(dtype, DType::Variant(_))
}

/// Report whether `dtype` is one of the variable-length byte types (`Binary` or `Utf8`).
fn is_varlen_dtype(dtype: &DType) -> bool {
    matches!(dtype, DType::Binary(_) | DType::Utf8(_))
}

fn stats_builder_with_capacity(
stat: Stat,
dtype: &DType,
Expand Down
Loading
Loading