Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions encodings/runend/src/compute/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ impl DynAggregateKernel for RunEndMinMaxKernel {
batch: &ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Scalar>> {
if !aggregate_fn.is::<MinMax>() {
let Some(options) = aggregate_fn.as_opt::<MinMax>() else {
return Ok(None);
}
};

let Some(run_end) = batch.as_opt::<RunEnd>() else {
return Ok(None);
};

let struct_dtype = make_minmax_dtype(batch.dtype());
match min_max(run_end.values(), ctx)? {
match min_max(run_end.values(), ctx, *options)? {
Some(result) => Ok(Some(Scalar::struct_(
struct_dtype,
vec![result.min, result.max],
Expand Down
5 changes: 4 additions & 1 deletion encodings/sparse/benches/sparse_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::aggregate_fn::SkipNansOptions;
use vortex_array::aggregate_fn::fns::is_constant::is_constant;
use vortex_array::aggregate_fn::fns::min_max::min_max;
use vortex_array::aggregate_fn::fns::null_count::null_count;
Expand Down Expand Up @@ -106,7 +107,9 @@ fn sparse_min_max(bencher: Bencher) {
bencher
.with_inputs(|| (make_sparse(40_000, false), SESSION.create_execution_ctx()))
.bench_values(|(array, mut ctx)| {
divan::black_box(min_max(&array, &mut ctx).vortex_expect("min_max"))
divan::black_box(
min_max(&array, &mut ctx, SkipNansOptions::default()).vortex_expect("min_max"),
)
});
}

Expand Down
24 changes: 16 additions & 8 deletions encodings/sparse/src/compute/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use vortex_array::IntoArray;
use vortex_array::aggregate_fn::Accumulator;
use vortex_array::aggregate_fn::AggregateFnRef;
use vortex_array::aggregate_fn::DynAccumulator;
use vortex_array::aggregate_fn::EmptyOptions;
use vortex_array::aggregate_fn::fns::min_max::MinMax;
use vortex_array::aggregate_fn::kernels::DynAggregateKernel;
use vortex_array::arrays::ConstantArray;
Expand All @@ -32,17 +31,17 @@ impl DynAggregateKernel for SparseMinMaxKernel {
batch: &ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Scalar>> {
if !aggregate_fn.is::<MinMax>() {
let Some(options) = aggregate_fn.as_opt::<MinMax>() else {
return Ok(None);
}
};

let Some(sparse) = batch.as_opt::<Sparse>() else {
return Ok(None);
};

let patches = sparse.patches();

let mut acc = Accumulator::try_new(MinMax, EmptyOptions, batch.dtype().clone())?;
let mut acc = Accumulator::try_new(MinMax, *options, batch.dtype().clone())?;

if !patches.values().is_empty() {
acc.accumulate(patches.values(), ctx)?;
Expand All @@ -66,6 +65,7 @@ mod tests {
use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::aggregate_fn::SkipNansOptions;
use vortex_array::aggregate_fn::fns::min_max::MinMaxResult;
use vortex_array::aggregate_fn::fns::min_max::min_max;
use vortex_array::scalar::Scalar;
Expand Down Expand Up @@ -100,10 +100,18 @@ mod tests {
#[case(Sparse::try_new(buffer![0u64, 1, 2].into_array(), buffer![7i32, 3, 9].into_array(), 3, Scalar::from(99i32)).unwrap())]
fn min_max_matches_canonical(#[case] array: SparseArray) {
let arr = array.into_array();
let kernel: Option<MinMaxResult> =
min_max(&arr, &mut SESSION.create_execution_ctx()).unwrap();
let canonical: Option<MinMaxResult> =
min_max(&arr, &mut CANONICAL_SESSION.create_execution_ctx()).unwrap();
let kernel: Option<MinMaxResult> = min_max(
&arr,
&mut SESSION.create_execution_ctx(),
SkipNansOptions::default(),
)
.unwrap();
let canonical: Option<MinMaxResult> = min_max(
&arr,
&mut CANONICAL_SESSION.create_execution_ctx(),
SkipNansOptions::default(),
)
.unwrap();
assert_eq!(kernel, canonical);
}
}
9 changes: 4 additions & 5 deletions encodings/sparse/src/compute/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use vortex_array::IntoArray;
use vortex_array::aggregate_fn::Accumulator;
use vortex_array::aggregate_fn::AggregateFnRef;
use vortex_array::aggregate_fn::DynAccumulator;
use vortex_array::aggregate_fn::EmptyOptions;
use vortex_array::aggregate_fn::fns::sum::Sum;
use vortex_array::aggregate_fn::kernels::DynAggregateKernel;
use vortex_array::arrays::ConstantArray;
Expand All @@ -34,9 +33,9 @@ impl DynAggregateKernel for SparseSumKernel {
batch: &ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Scalar>> {
if !aggregate_fn.is::<Sum>() {
let Some(options) = aggregate_fn.as_opt::<Sum>() else {
return Ok(None);
}
};

let Some(sparse) = batch.as_opt::<Sparse>() else {
return Ok(None);
Expand All @@ -47,8 +46,8 @@ impl DynAggregateKernel for SparseSumKernel {

// Build a fresh Sum accumulator over the array dtype and fold in the fill and patch
// contributions. The accumulator's existing semantics (checked overflow → null
// partial) are preserved.
let mut acc = Accumulator::try_new(Sum, EmptyOptions, batch.dtype().clone())?;
// partial, NaN handling per the options) are preserved.
let mut acc = Accumulator::try_new(Sum, *options, batch.dtype().clone())?;

if n_fill > 0 {
let fill_array = ConstantArray::new(sparse.fill_scalar().clone(), n_fill).into_array();
Expand Down
3 changes: 2 additions & 1 deletion fuzz/src/array/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray as _;
use vortex_array::aggregate_fn::SkipNansOptions;
use vortex_array::aggregate_fn::fns::min_max::MinMaxResult;
use vortex_array::aggregate_fn::fns::min_max::min_max;
use vortex_error::VortexResult;
Expand All @@ -13,5 +14,5 @@ pub fn min_max_canonical_array(
canonical: Canonical,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<MinMaxResult>> {
min_max(&canonical.into_array(), ctx)
min_max(&canonical.into_array(), ctx, SkipNansOptions::default())
Comment thread
robert3005 marked this conversation as resolved.
}
3 changes: 2 additions & 1 deletion fuzz/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::aggregate_fn::SkipNansOptions;
use vortex_array::aggregate_fn::fns::all_non_distinct::all_non_distinct;
use vortex_array::aggregate_fn::fns::min_max::MinMaxResult;
use vortex_array::aggregate_fn::fns::min_max::min_max;
Expand Down Expand Up @@ -667,7 +668,7 @@ pub fn run_fuzz_action(fuzz_action: FuzzArrayAction) -> VortexFuzzResult<bool> {
assert_scalar_eq(&expected.scalar(), &sum_result, i)?;
}
Action::MinMax => {
let min_max_result = min_max(&current_array, &mut ctx)
let min_max_result = min_max(&current_array, &mut ctx, SkipNansOptions::default())
.vortex_expect("min_max operation should succeed in fuzz test");
assert_min_max_eq(expected.min_max().as_ref(), min_max_result.as_ref(), i)?;
}
Expand Down
12 changes: 8 additions & 4 deletions vortex-array/benches/aggregate_grouped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::DynGroupedAccumulator;
use vortex_array::aggregate_fn::EmptyOptions;
use vortex_array::aggregate_fn::GroupedAccumulator;
use vortex_array::aggregate_fn::SkipNansOptions;
use vortex_array::aggregate_fn::fns::count::Count;
use vortex_array::aggregate_fn::fns::sum::Sum;
use vortex_array::arrays::ListViewArray;
Expand Down Expand Up @@ -149,10 +149,14 @@ fn list_element_dtype(list_view: &ArrayRef) -> DType {

fn grouped_accumulator<V>(list_view: &ArrayRef, vtable: V) -> ArrayRef
where
V: AggregateFnVTable<Options = EmptyOptions> + Clone,
V: AggregateFnVTable<Options = SkipNansOptions> + Clone,
{
let mut acc =
GroupedAccumulator::try_new(vtable, EmptyOptions, list_element_dtype(list_view)).unwrap();
let mut acc = GroupedAccumulator::try_new(
vtable,
SkipNansOptions::default(),
list_element_dtype(list_view),
)
.unwrap();
acc.accumulate_list(list_view, &mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
divan::black_box(acc.finish().unwrap())
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ mod tests {
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::SkipNansOptions;
use crate::aggregate_fn::combined::Combined;
use crate::aggregate_fn::combined::PairOptions;
use crate::aggregate_fn::fns::mean::Mean;
Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
let dtype = DType::Primitive(PType::F64, Nullability::NonNullable);
Accumulator::try_new(
Mean::combined(),
PairOptions(EmptyOptions, EmptyOptions),
PairOptions(SkipNansOptions::default(), SkipNansOptions::default()),
dtype,
)
}
Expand Down
28 changes: 20 additions & 8 deletions vortex-array/src/aggregate_fn/fns/bounded_max/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnSatisfaction;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::SkipNansOptions;
use crate::aggregate_fn::fns::max::Max;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::min_max::min_max;
Expand Down Expand Up @@ -136,7 +136,11 @@ impl AggregateFnVTable for BoundedMax {
};
}

if requested.is::<Max>() {
// The stored bound skips NaNs, so it cannot stand in for a NaN-including maximum.
if requested
.as_opt::<Max>()
.is_some_and(|options| options.skip_nans)
{
AggregateFnSatisfaction::Approximate
} else {
AggregateFnSatisfaction::No
Expand Down Expand Up @@ -192,7 +196,7 @@ impl AggregateFnVTable for BoundedMax {
Columnar::Canonical(canonical) => canonical.clone().into_array(),
Columnar::Constant(constant) => constant.clone().into_array(),
};
let Some(result) = min_max(&array, ctx)? else {
let Some(result) = min_max(&array, ctx, SkipNansOptions::default())? else {
return Ok(());
};
match truncate_max(result.max, partial.max_bytes.get())? {
Expand All @@ -213,7 +217,7 @@ impl AggregateFnVTable for BoundedMax {

fn supported_dtype<'a>(_options: &BoundedMaxOptions, input_dtype: &'a DType) -> Option<&'a DType> {
MinMax
.return_dtype(&EmptyOptions, input_dtype)
.return_dtype(&SkipNansOptions::default(), input_dtype)
.map(|_| input_dtype)
}

Expand Down Expand Up @@ -253,7 +257,7 @@ mod tests {
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::AggregateFnVTableExt;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::SkipNansOptions;
use crate::aggregate_fn::fns::bounded_max::BoundedMax;
use crate::aggregate_fn::fns::bounded_max::BoundedMaxOptions;
use crate::aggregate_fn::fns::max::Max;
Expand Down Expand Up @@ -416,15 +420,23 @@ mod tests {
AggregateFnSatisfaction::No
);
assert_eq!(
stored.can_satisfy(&Max.bind(EmptyOptions)),
stored.can_satisfy(&Max.bind(SkipNansOptions::default())),
AggregateFnSatisfaction::Approximate
);
assert_eq!(
Max.bind(EmptyOptions).can_satisfy(&stored),
stored.can_satisfy(&Max.bind(SkipNansOptions::include())),
AggregateFnSatisfaction::No
);
assert_eq!(
Max.bind(SkipNansOptions::include()).can_satisfy(&stored),
AggregateFnSatisfaction::No
);
assert_eq!(
Max.bind(SkipNansOptions::default()).can_satisfy(&stored),
AggregateFnSatisfaction::Approximate
);
assert_eq!(
stored.can_satisfy(&Min.bind(EmptyOptions)),
stored.can_satisfy(&Min.bind(SkipNansOptions::default())),
AggregateFnSatisfaction::No
);
}
Expand Down
28 changes: 20 additions & 8 deletions vortex-array/src/aggregate_fn/fns/bounded_min/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnSatisfaction;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::SkipNansOptions;
use crate::aggregate_fn::fns::min::Min;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::min_max::min_max;
Expand Down Expand Up @@ -126,7 +126,11 @@ impl AggregateFnVTable for BoundedMin {
};
}

if requested.is::<Min>() {
// The stored bound skips NaNs, so it cannot stand in for a NaN-including minimum.
if requested
.as_opt::<Min>()
.is_some_and(|options| options.skip_nans)
{
AggregateFnSatisfaction::Approximate
} else {
AggregateFnSatisfaction::No
Expand Down Expand Up @@ -182,7 +186,7 @@ impl AggregateFnVTable for BoundedMin {
Columnar::Canonical(canonical) => canonical.clone().into_array(),
Columnar::Constant(constant) => constant.clone().into_array(),
};
let Some(result) = min_max(&array, ctx)? else {
let Some(result) = min_max(&array, ctx, SkipNansOptions::default())? else {
return Ok(());
};
if let Some(bound) = truncate_min(result.min, partial.max_bytes.get())? {
Expand All @@ -202,7 +206,7 @@ impl AggregateFnVTable for BoundedMin {

fn supported_dtype<'a>(_options: &BoundedMinOptions, input_dtype: &'a DType) -> Option<&'a DType> {
MinMax
.return_dtype(&EmptyOptions, input_dtype)
.return_dtype(&SkipNansOptions::default(), input_dtype)
.map(|_| input_dtype)
}

Expand Down Expand Up @@ -241,7 +245,7 @@ mod tests {
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::AggregateFnVTableExt;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::SkipNansOptions;
use crate::aggregate_fn::fns::bounded_min::BoundedMin;
use crate::aggregate_fn::fns::bounded_min::BoundedMinOptions;
use crate::aggregate_fn::fns::max::Max;
Expand Down Expand Up @@ -350,15 +354,23 @@ mod tests {
AggregateFnSatisfaction::No
);
assert_eq!(
stored.can_satisfy(&Min.bind(EmptyOptions)),
stored.can_satisfy(&Min.bind(SkipNansOptions::default())),
AggregateFnSatisfaction::Approximate
);
assert_eq!(
Min.bind(EmptyOptions).can_satisfy(&stored),
stored.can_satisfy(&Min.bind(SkipNansOptions::include())),
AggregateFnSatisfaction::No
);
assert_eq!(
Min.bind(SkipNansOptions::include()).can_satisfy(&stored),
AggregateFnSatisfaction::No
);
assert_eq!(
Min.bind(SkipNansOptions::default()).can_satisfy(&stored),
AggregateFnSatisfaction::Approximate
);
assert_eq!(
stored.can_satisfy(&Max.bind(EmptyOptions)),
stored.can_satisfy(&Max.bind(SkipNansOptions::default())),
AggregateFnSatisfaction::No
);
}
Expand Down
Loading
Loading