diff --git a/Cargo.lock b/Cargo.lock index 2f0965f5a8e..d4507ec3403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9437,6 +9437,7 @@ dependencies = [ "vortex-onpair", "vortex-pco", "vortex-runend", + "vortex-runend-bool", "vortex-sequence", "vortex-session", "vortex-sparse", @@ -9796,6 +9797,7 @@ dependencies = [ "vortex-onpair", "vortex-pco", "vortex-runend", + "vortex-runend-bool", "vortex-scan", "vortex-sequence", "vortex-session", @@ -10157,6 +10159,22 @@ dependencies = [ "vortex-session", ] +[[package]] +name = "vortex-runend-bool" +version = "0.1.0" +dependencies = [ + "itertools 0.14.0", + "num-traits", + "prost 0.14.4", + "rstest", + "vortex-array", + "vortex-buffer", + "vortex-error", + "vortex-mask", + "vortex-runend", + "vortex-session", +] + [[package]] name = "vortex-scan" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a587ef1db16..7631d7779f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ members = [ "encodings/fastlanes", "encodings/decimal-byte-parts", "encodings/runend", + "encodings/runend-bool", "encodings/sequence", "encodings/alp", "encodings/datetime-parts", @@ -303,6 +304,7 @@ vortex-pco = { version = "0.1.0", path = "./encodings/pco", default-features = f vortex-proto = { version = "0.1.0", path = "./vortex-proto", default-features = false } vortex-row = { version = "0.1.0", path = "./vortex-row", default-features = false } vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-features = false } +vortex-runend-bool = { version = "0.1.0", path = "./encodings/runend-bool", default-features = false } vortex-scan = { version = "0.1.0", path = "./vortex-scan", default-features = false } vortex-sequence = { version = "0.1.0", path = "encodings/sequence", default-features = false } vortex-session = { version = "0.1.0", path = "./vortex-session", default-features = false } diff --git a/encodings/runend-bool/Cargo.toml b/encodings/runend-bool/Cargo.toml new file mode 100644 index 00000000000..794d88df4fa --- /dev/null +++ b/encodings/runend-bool/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "vortex-runend-bool" +authors = { workspace = true } +categories = { workspace = true } +description = "Vortex run-end encoded bool array" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +readme = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[dependencies] +itertools = { workspace = true } +num-traits = { workspace = true } +prost = { workspace = true } +vortex-array = { workspace = true } +vortex-buffer = { workspace = true } +vortex-error = { workspace = true } +vortex-mask = { workspace = true } +vortex-runend = { workspace = true } +vortex-session = { workspace = true } + +[lints] +workspace = true + +[dev-dependencies] +rstest = { workspace = true } +vortex-array = { workspace = true, features = ["_test-harness"] } diff --git a/encodings/runend-bool/goldenfiles/runend_bool.metadata b/encodings/runend-bool/goldenfiles/runend_bool.metadata new file mode 100644 index 00000000000..eb9aaf1d358 --- /dev/null +++ b/encodings/runend-bool/goldenfiles/runend_bool.metadata @@ -0,0 +1 @@ +ÿÿÿÿÿÿÿÿÿÿÿÿÿÿÿÿÿÿ  \ No newline at end of file diff --git a/encodings/runend-bool/src/array.rs b/encodings/runend-bool/src/array.rs new file mode 100644 index 00000000000..9965a03e02e --- /dev/null +++ b/encodings/runend-bool/src/array.rs @@ -0,0 +1,467 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Debug; +use std::fmt::Display; +use std::fmt::Formatter; +use std::hash::Hash; +use std::hash::Hasher; + +use prost::Message; +use vortex_array::Array; +use vortex_array::ArrayEq; +use vortex_array::ArrayHash; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::EqMode; +use vortex_array::ExecutionCtx; +use vortex_array::ExecutionResult; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::TypedArrayRef; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::Bool; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::bool::BoolArray; +use vortex_array::buffer::BufferHandle; +use vortex_array::child_to_validity; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::match_each_unsigned_integer_ptype; +use vortex_array::serde::ArrayChildren; +use vortex_array::smallvec::smallvec; +use vortex_array::validity::Validity; +use vortex_array::validity_to_child; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; +use vortex_error::VortexExpect as _; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_panic; +use vortex_runend::find_physical_index; +use vortex_runend::trimmed_ends_iter; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::compress::encode_runend_bool; +use crate::compress::runend_bool_decode_slice; +use crate::kernel::PARENT_KERNELS; + +/// A [`RunEndBool`]-encoded Vortex array. +pub type RunEndBoolArray = Array; + +#[derive(Clone, prost::Message)] +pub struct RunEndBoolMetadata { + #[prost(enumeration = "PType", tag = "1")] + pub ends_ptype: i32, + #[prost(uint64, tag = "2")] + pub num_runs: u64, + #[prost(uint64, tag = "3")] + pub offset: u64, + #[prost(bool, tag = "4")] + pub start: bool, +} + +impl ArrayHash for RunEndBoolData { + fn array_hash(&self, state: &mut H, _accuracy: EqMode) { + self.offset.hash(state); + self.start.hash(state); + } +} + +impl ArrayEq for RunEndBoolData { + fn array_eq(&self, other: &Self, _accuracy: EqMode) -> bool { + self.offset == other.offset && self.start == other.start + } +} + +impl VTable for RunEndBool { + type TypedArrayData = RunEndBoolData; + + type OperationsVTable = Self; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.runend_bool"); + *ID + } + + fn validate( + &self, + data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + let DType::Bool(nullability) = dtype else { + vortex_bail!("Expected bool dtype, got {dtype:?}"); + }; + let ends = slots[ENDS_SLOT] + .as_ref() + .vortex_expect("RunEndBoolArray ends slot"); + // TODO(ctx): trait fixes - VTable::validate has a fixed signature. + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + RunEndBoolData::validate_parts(ends, data.offset, len, &mut ctx)?; + + let validity = child_to_validity(slots[VALIDITY_SLOT].as_ref(), *nullability); + if let Some(validity_len) = validity.maybe_len() { + vortex_ensure!( + validity_len == len, + "RunEndBoolArray validity len {} does not match outer length {}", + validity_len, + len + ); + } + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("RunEndBoolArray buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option { + vortex_panic!("RunEndBoolArray buffer_name index {idx} out of bounds") + } + + fn serialize( + array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(Some( + RunEndBoolMetadata { + ends_ptype: PType::try_from(array.ends().dtype()) + .vortex_expect("Must be a valid PType") as i32, + num_runs: array.ends().len() as u64, + offset: array.offset() as u64, + start: array.start(), + } + .encode_to_vec(), + )) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + _buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + let metadata = RunEndBoolMetadata::decode(metadata)?; + let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable); + let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize"); + let ends = children.get(0, &ends_dtype, runs)?; + + // Validity is an optional child whose index depends on whether ends consumed a slot. + let validity = if children.len() <= 1 { + Validity::from(dtype.nullability()) + } else { + Validity::Array(children.get(1, &Validity::DTYPE, len)?) + }; + + let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"); + let slots = smallvec![Some(ends), validity_to_child(&validity, len)]; + let data = RunEndBoolData::new(offset, metadata.start); + Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots)) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + SLOT_NAMES[idx].to_string() + } + + fn execute_parent( + array: ArrayView<'_, Self>, + parent: &ArrayRef, + child_idx: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + PARENT_KERNELS.execute(array, parent, child_idx, ctx) + } + + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { + run_end_bool_canonicalize(&array, ctx).map(ExecutionResult::done) + } +} + +/// The run-end positions marking where each run terminates. +pub(super) const ENDS_SLOT: usize = 0; +/// The optional validity child. +pub(super) const VALIDITY_SLOT: usize = 1; +pub(super) const NUM_SLOTS: usize = 2; +pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "validity"]; + +#[derive(Clone, Debug)] +pub struct RunEndBoolData { + offset: usize, + start: bool, +} + +impl Display for RunEndBoolData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "offset: {}, start: {}", self.offset, self.start) + } +} + +/// Extension methods for [`RunEndBoolArray`]. +pub trait RunEndBoolArrayExt: TypedArrayRef { + /// The logical offset into the first run. + fn offset(&self) -> usize { + self.offset + } + + /// The boolean value of the first (run index 0) run. + fn start(&self) -> bool { + self.start + } + + /// The primitive array of strictly-increasing run end positions. + fn ends(&self) -> &ArrayRef { + self.as_ref().slots()[ENDS_SLOT] + .as_ref() + .vortex_expect("RunEndBoolArray ends slot") + } + + /// The array's validity. + fn bool_validity(&self) -> Validity { + child_to_validity( + self.as_ref().slots()[VALIDITY_SLOT].as_ref(), + self.nullability(), + ) + } + + /// The array's nullability. + fn nullability(&self) -> Nullability { + match self.as_ref().dtype() { + DType::Bool(nullability) => *nullability, + _ => unreachable!("RunEndBoolArray requires a bool dtype"), + } + } + + /// Find the physical run index containing the given logical `index`. + fn find_physical_index(&self, index: usize) -> VortexResult { + find_physical_index(self.ends(), self.offset(), index) + } +} +impl> RunEndBoolArrayExt for T {} + +#[derive(Clone, Debug)] +pub struct RunEndBool; + +impl RunEndBool { + /// Build a new [`RunEndBoolArray`] without validation. + /// + /// # Safety + /// The caller must ensure `ends` are strictly increasing unsigned integers and that the last + /// run end is `>= offset + length`. + pub unsafe fn new_unchecked( + ends: ArrayRef, + start: bool, + offset: usize, + length: usize, + validity: Validity, + ) -> RunEndBoolArray { + let dtype = DType::Bool(validity.nullability()); + let slots = smallvec![Some(ends), validity_to_child(&validity, length)]; + let data = unsafe { RunEndBoolData::new_unchecked(offset, start) }; + unsafe { + Array::from_parts_unchecked( + ArrayParts::new(RunEndBool, dtype, length, data).with_slots(slots), + ) + } + } + + /// Build a new [`RunEndBoolArray`] from ends, start, and validity. + pub fn try_new( + ends: ArrayRef, + start: bool, + validity: Validity, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let len = RunEndBoolData::logical_len_from_ends(&ends, ctx)?; + Self::try_new_offset_length(ends, start, 0, len, validity, ctx) + } + + /// Build a new [`RunEndBoolArray`] from ends, start, offset, length, and validity. + pub fn try_new_offset_length( + ends: ArrayRef, + start: bool, + offset: usize, + length: usize, + validity: Validity, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + RunEndBoolData::validate_parts(&ends, offset, length, ctx)?; + if let Some(validity_len) = validity.maybe_len() { + vortex_ensure!( + validity_len == length, + "validity len {validity_len} does not match length {length}" + ); + } + let dtype = DType::Bool(validity.nullability()); + let slots = smallvec![Some(ends), validity_to_child(&validity, length)]; + let data = RunEndBoolData::new(offset, start); + Array::try_from_parts(ArrayParts::new(RunEndBool, dtype, length, data).with_slots(slots)) + } + + /// Build a new [`RunEndBoolArray`] from ends, start, and validity (panics on invalid input). + /// + /// # Examples + /// + /// ``` + /// # use vortex_array::IntoArray; + /// # use vortex_array::{LEGACY_SESSION, VortexSessionExecute}; + /// # use vortex_array::validity::Validity; + /// # use vortex_buffer::buffer; + /// # use vortex_error::VortexResult; + /// # use vortex_runend_bool::RunEndBool; + /// # fn main() -> VortexResult<()> { + /// let mut ctx = LEGACY_SESSION.create_execution_ctx(); + /// // start = false, so runs alternate false, true: [false, false, true] + /// let ends = buffer![2u8, 3u8].into_array(); + /// let run_end = RunEndBool::new(ends, false, Validity::NonNullable, &mut ctx); + /// + /// assert_eq!(run_end.execute_scalar(0, &mut ctx)?, false.into()); + /// assert_eq!(run_end.execute_scalar(1, &mut ctx)?, false.into()); + /// assert_eq!(run_end.execute_scalar(2, &mut ctx)?, true.into()); + /// # Ok(()) + /// # } + /// ``` + pub fn new( + ends: ArrayRef, + start: bool, + validity: Validity, + ctx: &mut ExecutionCtx, + ) -> RunEndBoolArray { + Self::try_new(ends, start, validity, ctx).vortex_expect("RunEndBoolData is always valid") + } + + /// Run-end encode a boolean array. + pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + if let Some(barray) = array.as_opt::() { + encode_runend_bool(barray, ctx) + } else { + vortex_bail!("RunEndBool can only encode bool arrays") + } + } +} + +impl RunEndBoolData { + fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + vortex_runend::logical_len_from_ends(ends, ctx) + } + + pub(crate) fn validate_parts( + ends: &ArrayRef, + offset: usize, + length: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + vortex_runend::validate_ends(ends, offset, length, ctx) + } + + /// Build new inner data from an offset and the value of the first run. + pub fn new(offset: usize, start: bool) -> Self { + Self { offset, start } + } + + /// Build new inner data without validation. + /// + /// # Safety + /// + /// See [`RunEndBool::try_new_offset_length`] for the required preconditions. + pub unsafe fn new_unchecked(offset: usize, start: bool) -> Self { + Self { offset, start } + } +} + +impl ValidityVTable for RunEndBool { + fn validity(array: ArrayView<'_, RunEndBool>) -> VortexResult { + Ok(array.bool_validity()) + } +} + +pub(super) fn run_end_bool_canonicalize( + array: &RunEndBoolArray, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let ends = array.ends().clone().execute::(ctx)?; + let offset = array.offset(); + let length = array.as_ref().len(); + let start = array.start(); + + let bits = match_each_unsigned_integer_ptype!(ends.ptype(), |E| { + runend_bool_decode_slice( + trimmed_ends_iter(ends.as_slice::(), offset, length), + start, + length, + ) + }); + + let validity = array.bool_validity().execute_mask(length, ctx)?; + let validity = Validity::from_mask(validity, array.nullability()); + + Ok(BoolArray::new(bits, validity).into_array()) +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::assert_arrays_eq; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::RunEndBool; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + #[test] + fn test_constructor() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let arr = RunEndBool::new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + ); + assert_eq!(arr.len(), 10); + assert_eq!(arr.dtype(), &DType::Bool(Nullability::NonNullable)); + + let expected = BoolArray::from(BitBuffer::from(vec![ + true, true, false, false, false, true, true, true, true, true, + ])); + assert_arrays_eq!(arr.into_array(), expected); + Ok(()) + } + + #[test] + fn test_encode_roundtrip() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let bits = vec![ + true, true, false, false, false, true, true, true, true, true, + ]; + let bool_array = BoolArray::from(BitBuffer::from(bits)); + let encoded = RunEndBool::encode(bool_array.clone().into_array(), &mut ctx)?; + assert_arrays_eq!(encoded.into_array(), bool_array); + Ok(()) + } +} diff --git a/encodings/runend-bool/src/compress.rs b/encodings/runend-bool/src/compress.rs new file mode 100644 index 00000000000..25f3fd98049 --- /dev/null +++ b/encodings/runend-bool/src/compress.rs @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Run-end encoding and decoding routines specialized for boolean arrays. +//! +//! Boolean runs strictly alternate, so a run-end encoded bool array stores only the run `ends` +//! plus the value of the first run (`start`). The value of run `i` is then +//! [`value_at_index`]`(i, start)`. + +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::Bool; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::bool::BoolArrayExt; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::validity::Validity; +use vortex_buffer::BitBuffer; +use vortex_buffer::BitBufferMut; +use vortex_buffer::Buffer; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; + +use crate::array::RunEndBool; +use crate::array::RunEndBoolArray; + +/// Returns the boolean value of the run with index `idx` given the value of run 0 (`start`). +/// +/// Runs strictly alternate, so even-indexed runs equal `start` and odd-indexed runs equal `!start`. +pub fn value_at_index(idx: usize, start: bool) -> bool { + if idx.is_multiple_of(2) { start } else { !start } +} + +/// Run-end encode a [`BitBuffer`], returning the run `ends` and the value of the first run. +/// +/// `start` is the value of run 0. The returned `ends` are the exclusive end positions of each run. +pub fn runend_bool_encode_slice(elements: &BitBuffer) -> (Vec, bool) { + let mut iter = elements.set_slices(); + let Some((start, end)) = iter.next() else { + return (vec![elements.len() as u64], false); + }; + let mut ends = Vec::new(); + let first_bool = start == 0; + if !first_bool { + ends.push(start as u64) + } + ends.push(end as u64); + for (s, e) in iter { + ends.push(s as u64); + ends.push(e as u64); + } + let last_end = *ends.last().vortex_expect("ends is non-empty"); + if last_end != elements.len() as u64 { + ends.push(elements.len() as u64) + } + (ends, first_bool) +} + +/// Decode run-end encoded boolean values into a flat [`BitBuffer`]. +pub fn runend_bool_decode_slice( + run_ends_iter: impl Iterator, + start: bool, + length: usize, +) -> BitBuffer { + let mut decoded = BitBufferMut::with_capacity(length); + for (idx, end) in run_ends_iter.enumerate() { + decoded.append_n(value_at_index(idx, start), end - decoded.len()); + } + decoded.freeze() +} + +/// Run-end encode a [`BoolArray`] into a [`RunEndBoolArray`]. +/// +/// The run `ends` are narrowed to the smallest unsigned integer type that can hold them. +pub fn encode_runend_bool( + array: ArrayView<'_, Bool>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let length = array.as_ref().len(); + let validity = array.as_ref().validity()?; + let bits = array.to_bit_buffer(); + let (ends, start) = runend_bool_encode_slice(&bits); + + let ends = PrimitiveArray::new(Buffer::from(ends), Validity::NonNullable) + .narrow(ctx) + .vortex_expect("ends must succeed downcasting"); + + // SAFETY: runend_bool_encode_slice produces strictly-increasing ends with last == length. + Ok(unsafe { RunEndBool::new_unchecked(ends.into_array(), start, 0, length, validity) }) +} + +#[cfg(test)] +mod tests { + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_buffer::BitBuffer; + use vortex_error::VortexResult; + use vortex_runend::trimmed_ends_iter; + + use super::runend_bool_decode_slice; + use super::runend_bool_encode_slice; + use super::value_at_index; + + #[test] + fn encode_decode_roundtrip() -> VortexResult<()> { + let bits = BitBuffer::from(vec![ + true, true, false, false, false, true, true, true, true, true, + ]); + let (ends, start) = runend_bool_encode_slice(&bits); + assert_eq!(ends, vec![2, 5, 10]); + assert!(start); + + let decoded = runend_bool_decode_slice(trimmed_ends_iter(&ends, 0, 10), start, 10); + assert_eq!(decoded, bits); + Ok(()) + } + + #[test] + fn encode_all_false() { + let bits = BitBuffer::from(vec![false, false, false]); + let (ends, start) = runend_bool_encode_slice(&bits); + assert_eq!(ends, vec![3]); + assert!(!start); + } + + #[test] + fn encode_leading_false() { + let bits = BitBuffer::from(vec![false, true, true, false]); + let (ends, start) = runend_bool_encode_slice(&bits); + assert_eq!(ends, vec![1, 3, 4]); + assert!(!start); + } + + #[test] + fn value_at_index_alternates() { + assert!(value_at_index(0, true)); + assert!(!value_at_index(1, true)); + assert!(value_at_index(2, true)); + assert!(!value_at_index(0, false)); + } + + #[test] + fn decode_with_offset() -> VortexResult<()> { + // [T,T,F,F,F,T,T,T,T,T] sliced 2..8 => [F,F,F,T,T,T] + let ends: Vec = vec![2, 5, 10]; + let decoded = runend_bool_decode_slice(trimmed_ends_iter(&ends, 2, 6), true, 6); + assert_eq!( + decoded, + BitBuffer::from(vec![false, false, false, true, true, true]) + ); + Ok(()) + } + + #[test] + fn encode_array_roundtrip() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let array = BoolArray::from(BitBuffer::from(vec![ + true, true, false, false, false, true, true, true, true, true, + ])); + let encoded = super::encode_runend_bool(array.as_view(), &mut ctx)?; + assert_eq!(encoded.as_ref().len(), 10); + Ok(()) + } +} diff --git a/encodings/runend-bool/src/compute/filter.rs b/encodings/runend-bool/src/compute/filter.rs new file mode 100644 index 00000000000..fad76f22df2 --- /dev/null +++ b/encodings/runend-bool/src/compute/filter.rs @@ -0,0 +1,242 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::cmp::min; + +use num_traits::AsPrimitive; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::BoolArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::filter::FilterKernel; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::dtype::NativePType; +use vortex_array::match_each_unsigned_integer_ptype; +use vortex_array::validity::Validity; +use vortex_buffer::BitBuffer; +use vortex_buffer::BitBufferMut; +use vortex_buffer::Buffer; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::RunEndBool; +use crate::array::RunEndBoolArrayExt; +use crate::compress::value_at_index; + +/// Ratio of kept elements to runs below which we decode via per-index lookup instead of a linear +/// run-preserving scan. Mirrors the threshold used by `vortex-runend`. +const FILTER_TAKE_THRESHOLD: f64 = 0.1; + +impl FilterKernel for RunEndBool { + fn filter( + array: ArrayView<'_, Self>, + mask: &Mask, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let mask_values = mask + .values() + .vortex_expect("FilterKernel precondition: mask is Mask::Values"); + + let runs_ratio = mask_values.true_count() as f64 / array.ends().len() as f64; + + // Sparse masks: decode the kept elements directly. Each kept index needs a single binary + // search, which is cheaper than a full linear scan when few elements survive. + if runs_ratio < FILTER_TAKE_THRESHOLD || mask_values.true_count() < 25 { + let start = array.start(); + let mut bits = BitBufferMut::with_capacity(mask_values.true_count()); + for idx in mask_values.indices().iter().copied() { + let run_index = array.find_physical_index(idx)?; + bits.append(value_at_index(run_index, start)); + } + let validity = filter_validity(&array.bool_validity(), mask)?; + return Ok(Some(BoolArray::new(bits.freeze(), validity).into_array())); + } + + // Dense masks: scan the run ends once, accumulating the kept length of each run. This avoids + // a per-element binary search and preserves the run-end encoding in the output. + let ends = array.ends().clone().execute::(ctx)?; + let start = array.start(); + let (new_ends, new_start, kept) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| { + filter_run_end_bool( + ends.as_slice::(), + array.offset(), + array.as_ref().len(), + start, + mask_values.bit_buffer(), + ) + }); + + let validity = filter_validity(&array.bool_validity(), mask)?; + let new_ends = PrimitiveArray::new(Buffer::from(new_ends), Validity::NonNullable) + .narrow(ctx) + .vortex_expect("ends must succeed downcasting"); + + // SAFETY: filter_run_end_bool produces strictly-increasing ends whose last value is `kept`, + // the length of the filtered array. + Ok(Some( + unsafe { + RunEndBool::new_unchecked(new_ends.into_array(), new_start, 0, kept, validity) + } + .into_array(), + )) + } +} + +/// Linear run-preserving filter over boolean run ends. +/// +/// Scans each run once, counting the elements kept by `mask`, and emits run ends for the filtered +/// array. Because boolean runs strictly alternate, dropping an entire run can leave two kept runs +/// with the same value adjacent; these are merged so the output still alternates and can be encoded +/// by a single `start` flag. Returns `(ends, start, length)` of the filtered array. +fn filter_run_end_bool>( + run_ends: &[E], + offset: usize, + length: usize, + start: bool, + mask: &BitBuffer, +) -> (Vec, bool, usize) { + let mut new_ends: Vec = Vec::new(); + let mut prev = 0usize; + let mut count = 0usize; + let mut cur_value = false; + let mut new_start = false; + + for (run_idx, &end) in run_ends.iter().enumerate() { + let raw: usize = end.as_(); + let end = min(raw.saturating_sub(offset), length); + + let mut kept = 0usize; + for i in prev..end { + // SAFETY: i < end <= length == mask.len() + kept += usize::from(unsafe { mask.value_unchecked(i) }); + } + + if kept > 0 { + let value = value_at_index(run_idx, start); + count += kept; + if new_ends.is_empty() { + new_start = value; + cur_value = value; + new_ends.push(count as u64); + } else if value == cur_value { + // Same value as the previous kept run: merge by extending its end. + *new_ends.last_mut().vortex_expect("new_ends is non-empty") = count as u64; + } else { + cur_value = value; + new_ends.push(count as u64); + } + } + + prev = end; + } + + (new_ends, new_start, count) +} + +fn filter_validity(validity: &Validity, mask: &Mask) -> VortexResult { + Ok(match validity { + Validity::NonNullable => Validity::NonNullable, + Validity::AllValid => Validity::AllValid, + Validity::AllInvalid => Validity::AllInvalid, + Validity::Array(a) => Validity::Array(a.filter(mask.clone())?), + }) +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::assert_arrays_eq; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::RunEndBool; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + #[test] + fn filter_runend_bool() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // [T,T,F,F,F,T,T,T,T,T] + let arr = RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + )?; + let filtered = arr.filter(Mask::from_iter([ + true, false, true, false, true, false, true, false, true, false, + ]))?; + // keep indices 0,2,4,6,8 => [T,F,F,T,T] + let expected = BoolArray::from(BitBuffer::from(vec![true, false, false, true, true])); + assert_arrays_eq!(filtered, expected); + Ok(()) + } + + /// 4 runs of 32, dense mask: exercises the linear run-preserving path and asserts the output is + /// still run-end encoded. + #[test] + fn filter_dense_preserves_runend() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // ends = [32, 64, 96, 128], start = true => runs T,F,T,F of length 32. + let arr = RunEndBool::try_new( + buffer![32u32, 64, 96, 128].into_array(), + true, + Validity::NonNullable, + &mut ctx, + )?; + // Keep every other element (64 kept, ratio 16 => dense path). + let mask = Mask::from_iter((0..128).map(|i| i % 2 == 0)); + let filtered = arr.into_array().filter(mask)?; + + let executed = filtered.execute_until::(&mut ctx)?; + assert_eq!( + executed.encoding_id().as_ref(), + "vortex.runend_bool", + "dense filter should preserve run-end encoding" + ); + + // Within each run every kept element shares the run value: 16 T, 16 F, 16 T, 16 F. + let mut expected_bits = Vec::new(); + for run in 0..4 { + expected_bits.extend(std::iter::repeat_n(run % 2 == 0, 16)); + } + assert_arrays_eq!(executed, BoolArray::from(BitBuffer::from(expected_bits))); + Ok(()) + } + + /// Dropping an entire run leaves two same-valued runs adjacent; they must merge. + #[rstest] + #[case::merge_true(true)] + #[case::merge_false(false)] + fn filter_dense_merges_runs(#[case] start: bool) -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // 4 runs of 32. Keep runs 0 and 2 fully, drop runs 1 and 3. + let arr = RunEndBool::try_new( + buffer![32u32, 64, 96, 128].into_array(), + start, + Validity::NonNullable, + &mut ctx, + )?; + let mask = Mask::from_iter((0..128).map(|i| (0..32).contains(&i) || (64..96).contains(&i))); + let filtered = arr.into_array().filter(mask)?; + + // Runs 0 and 2 share the same value `start`, so the result is a single run of 64. + let expected = BoolArray::from(BitBuffer::from(vec![start; 64])); + assert_arrays_eq!(filtered, expected); + Ok(()) + } +} diff --git a/encodings/runend-bool/src/compute/invert.rs b/encodings/runend-bool/src/compute/invert.rs new file mode 100644 index 00000000000..556b2328c17 --- /dev/null +++ b/encodings/runend-bool/src/compute/invert.rs @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::scalar_fn::fns::not::NotKernel; +use vortex_error::VortexResult; + +use crate::RunEndBool; +use crate::array::RunEndBoolArrayExt; + +impl NotKernel for RunEndBool { + fn invert( + array: ArrayView<'_, RunEndBool>, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + // Inverting a boolean run-end array negates the value of every run, which is equivalent to + // negating the `start` flag. Ends and validity are unchanged. + // SAFETY: ends and offset are copied unchanged from a valid array. + let inverted = unsafe { + RunEndBool::new_unchecked( + array.ends().clone(), + !array.start(), + array.offset(), + array.as_ref().len(), + array.bool_validity(), + ) + }; + Ok(Some(inverted.into_array())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::assert_arrays_eq; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::RunEndBool; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + #[test] + fn invert_runend_bool() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // [T,T,F,F,F,T,T,T,T,T] + let arr = RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + )?; + let inverted = arr.into_array().not()?; + let expected = BoolArray::from(BitBuffer::from(vec![ + false, false, true, true, true, false, false, false, false, false, + ])); + assert_arrays_eq!(inverted, expected); + Ok(()) + } +} diff --git a/encodings/runend-bool/src/compute/is_constant.rs b/encodings/runend-bool/src/compute/is_constant.rs new file mode 100644 index 00000000000..cbe9c3300c1 --- /dev/null +++ b/encodings/runend-bool/src/compute/is_constant.rs @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::RunEndBool; +use crate::array::RunEndBoolArrayExt; + +/// RunEndBool-specific is_constant kernel. +/// +/// A non-nullable run-end bool array with a single run is constant. Other cases defer to the +/// default canonicalization path by returning `None`. +#[derive(Debug)] +pub(crate) struct RunEndBoolIsConstantKernel; + +impl DynAggregateKernel for RunEndBoolIsConstantKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(array) = batch.as_opt::() else { + return Ok(None); + }; + + // Single physical run with no validity child => every element is identical. + if array.ends().len() == 1 + && matches!( + array.bool_validity(), + vortex_array::validity::Validity::NonNullable + | vortex_array::validity::Validity::AllValid + ) + { + return Ok(Some(IsConstant::make_partial(batch, true, ctx)?)); + } + + Ok(None) + } +} diff --git a/encodings/runend-bool/src/compute/is_sorted.rs b/encodings/runend-bool/src/compute/is_sorted.rs new file mode 100644 index 00000000000..7fedaa92b6f --- /dev/null +++ b/encodings/runend-bool/src/compute/is_sorted.rs @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray as _; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; +use vortex_array::aggregate_fn::fns::is_sorted::is_sorted; +use vortex_array::aggregate_fn::fns::is_sorted::is_strict_sorted; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::RunEndBool; + +/// RunEndBool-specific is_sorted kernel. +/// +/// Sortedness depends on the decoded values, so we canonicalize and delegate. +#[derive(Debug)] +pub(crate) struct RunEndBoolIsSortedKernel; + +impl DynAggregateKernel for RunEndBoolIsSortedKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let Some(options) = aggregate_fn.as_opt::() else { + return Ok(None); + }; + + if batch.as_opt::().is_none() { + return Ok(None); + } + + let canonical = batch.clone().execute::(ctx)?.into_array(); + let result = if options.strict { + is_strict_sorted(&canonical, ctx)? + } else { + is_sorted(&canonical, ctx)? + }; + + Ok(Some(IsSorted::make_partial( + batch, + result, + options.strict, + ctx, + )?)) + } +} diff --git a/encodings/runend-bool/src/compute/min_max.rs b/encodings/runend-bool/src/compute/min_max.rs new file mode 100644 index 00000000000..2076e0bb780 --- /dev/null +++ b/encodings/runend-bool/src/compute/min_max.rs @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray as _; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::min_max::MinMax; +use vortex_array::aggregate_fn::fns::min_max::make_minmax_dtype; +use vortex_array::aggregate_fn::fns::min_max::min_max; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::RunEndBool; + +/// RunEndBool-specific min/max kernel. +/// +/// Min/max for a boolean array depends only on the decoded values, so we canonicalize to a +/// `BoolArray` and delegate. This still benefits from the optimized bool min/max path. +#[derive(Debug)] +pub(crate) struct RunEndBoolMinMaxKernel; + +impl DynAggregateKernel for RunEndBoolMinMaxKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + if batch.as_opt::().is_none() { + return Ok(None); + } + + let canonical = batch.clone().execute::(ctx)?.into_array(); + let struct_dtype = make_minmax_dtype(batch.dtype()); + match min_max(&canonical, ctx)? { + Some(result) => Ok(Some(Scalar::struct_( + struct_dtype, + vec![result.min, result.max], + ))), + None => Ok(Some(Scalar::null(struct_dtype))), + } + } +} diff --git a/encodings/runend-bool/src/compute/mod.rs b/encodings/runend-bool/src/compute/mod.rs new file mode 100644 index 00000000000..f413c340ce4 --- /dev/null +++ b/encodings/runend-bool/src/compute/mod.rs @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub(crate) mod filter; +pub(crate) mod invert; +pub(crate) mod is_constant; +pub(crate) mod is_sorted; +pub(crate) mod min_max; +pub(crate) mod take; + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::compute::conformance::consistency::test_array_consistency; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::RunEndBool; + use crate::RunEndBoolArray; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn alternating() -> RunEndBoolArray { + let mut ctx = SESSION.create_execution_ctx(); + RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + ) + .unwrap() + } + + fn single_run() -> RunEndBoolArray { + let mut ctx = SESSION.create_execution_ctx(); + RunEndBool::try_new( + buffer![6u32].into_array(), + false, + Validity::NonNullable, + &mut ctx, + ) + .unwrap() + } + + fn nullable() -> RunEndBoolArray { + let mut ctx = SESSION.create_execution_ctx(); + RunEndBool::try_new( + buffer![2u32, 4].into_array(), + true, + Validity::from(BitBuffer::from(vec![true, false, false, true])), + &mut ctx, + ) + .unwrap() + } + + #[rstest] + #[case::alternating(alternating())] + #[case::single_run(single_run())] + #[case::nullable(nullable())] + fn test_runend_bool_consistency(#[case] array: RunEndBoolArray) { + test_array_consistency(&array.into_array()); + } +} diff --git a/encodings/runend-bool/src/compute/take.rs b/encodings/runend-bool/src/compute/take.rs new file mode 100644 index 00000000000..7a850c090a1 --- /dev/null +++ b/encodings/runend-bool/src/compute/take.rs @@ -0,0 +1,110 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::BoolArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::dict::TakeExecute; +use vortex_array::match_each_integer_ptype; +use vortex_buffer::BitBufferMut; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; + +use crate::RunEndBool; +use crate::array::RunEndBoolArrayExt; +use crate::compress::value_at_index; + +impl TakeExecute for RunEndBool { + #[expect( + clippy::cast_possible_truncation, + reason = "index cast to usize inside macro" + )] + fn take( + array: ArrayView<'_, Self>, + indices: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let primitive_indices = indices.clone().execute::(ctx)?; + let len = array.as_ref().len(); + let start = array.start(); + + let mut bits = BitBufferMut::with_capacity(primitive_indices.len()); + match_each_integer_ptype!(primitive_indices.ptype(), |P| { + for idx in primitive_indices.as_slice::

().iter().copied() { + let usize_idx = idx as usize; + if usize_idx >= len { + vortex_bail!(OutOfBounds: usize_idx, 0, len); + } + let run_index = array.find_physical_index(usize_idx)?; + bits.append(value_at_index(run_index, start)); + } + }); + + let validity = array + .bool_validity() + .take(&primitive_indices.into_array())?; + Ok(Some(BoolArray::new(bits.freeze(), validity).into_array())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::assert_arrays_eq; + use vortex_array::compute::conformance::take::test_take_conformance; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::RunEndBool; + use crate::RunEndBoolArray; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn ree_array() -> RunEndBoolArray { + let mut ctx = SESSION.create_execution_ctx(); + // [T,T,F,F,F,T,T,T,T,T] + RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + ) + .unwrap() + } + + #[test] + fn ree_take() -> VortexResult<()> { + let taken = ree_array().take(buffer![0u32, 2, 5, 9].into_array())?; + let expected = BoolArray::from(BitBuffer::from(vec![true, false, true, true])); + assert_arrays_eq!(taken, expected); + Ok(()) + } + + #[rstest] + #[case(ree_array())] + #[case({ + let mut ctx = SESSION.create_execution_ctx(); + RunEndBool::try_new( + buffer![1u32, 3, 4].into_array(), + false, + Validity::from(BitBuffer::from(vec![true, false, false, true])), + &mut ctx, + ).unwrap() + })] + fn test_take_conformance_runend_bool(#[case] array: RunEndBoolArray) { + test_take_conformance(&array.into_array()); + } +} diff --git a/encodings/runend-bool/src/kernel.rs b/encodings/runend-bool/src/kernel.rs new file mode 100644 index 00000000000..a06e7bf28a4 --- /dev/null +++ b/encodings/runend-bool/src/kernel.rs @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::Slice; +use vortex_array::arrays::dict::TakeExecuteAdaptor; +use vortex_array::arrays::filter::FilterExecuteAdaptor; +use vortex_array::dtype::Nullability; +use vortex_array::kernel::ExecuteParentKernel; +use vortex_array::kernel::ParentKernelSet; +use vortex_array::scalar::Scalar; +use vortex_array::scalar_fn::fns::not::NotExecuteAdaptor; +use vortex_error::VortexResult; +use vortex_runend::find_slice_end_index; + +use crate::RunEndBool; +use crate::array::RunEndBoolArrayExt; +use crate::compress::value_at_index; + +pub(super) const PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ + ParentKernelSet::lift(&RunEndBoolSliceKernel), + ParentKernelSet::lift(&FilterExecuteAdaptor(RunEndBool)), + ParentKernelSet::lift(&TakeExecuteAdaptor(RunEndBool)), + ParentKernelSet::lift(&NotExecuteAdaptor(RunEndBool)), +]); + +/// Kernel to execute slicing on a [`RunEndBool`] array. +#[derive(Debug)] +struct RunEndBoolSliceKernel; + +impl ExecuteParentKernel for RunEndBoolSliceKernel { + type Parent = Slice; + + fn execute_parent( + &self, + array: ArrayView<'_, RunEndBool>, + parent: ArrayView<'_, Slice>, + _child_idx: usize, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + slice(array, parent.slice_range().clone()).map(Some) + } +} + +fn slice(array: ArrayView<'_, RunEndBool>, range: Range) -> VortexResult { + let new_length = range.len(); + + let slice_begin = array.find_physical_index(range.start)?; + let slice_end = find_slice_end_index(array.ends(), range.end + array.offset())?; + + let nullability = array.nullability(); + let sliced_validity = array.bool_validity().slice(range.start..range.end)?; + + // If the sliced range contains only a single run and the array is non-nullable, opt to return a + // ConstantArray. When nullable we keep the run-end structure so per-element validity is + // preserved by canonicalization. + if slice_begin + 1 == slice_end && nullability == Nullability::NonNullable { + let value = value_at_index(slice_begin, array.start()); + return Ok(ConstantArray::new(Scalar::bool(value, nullability), new_length).into_array()); + } + + let new_start = value_at_index(slice_begin, array.start()); + + // SAFETY: slicing preserves the strictly-increasing ends invariant. + Ok(unsafe { + RunEndBool::new_unchecked( + array.ends().slice(slice_begin..slice_end)?, + new_start, + range.start + array.offset(), + new_length, + sliced_validity, + ) + .into_array() + }) +} diff --git a/encodings/runend-bool/src/lib.rs b/encodings/runend-bool/src/lib.rs new file mode 100644 index 00000000000..81d5759e916 --- /dev/null +++ b/encodings/runend-bool/src/lib.rs @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Run-end encoding specialized for boolean arrays. +//! +//! Boolean runs strictly alternate, so a [`RunEndBoolArray`] stores only the run `ends`, the value +//! of the first run (`start`), and an optional validity child, rather than a separate values array. + +pub use array::*; + +mod array; +pub mod compress; +mod compute; +mod kernel; +mod ops; + +use vortex_array::ArrayVTable; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; +use vortex_array::aggregate_fn::fns::min_max::MinMax; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; + +/// Initialize run-end bool encoding in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(RunEndBool); + + session.aggregate_fns().register_aggregate_kernel( + RunEndBool.id(), + Some(MinMax.id()), + &compute::min_max::RunEndBoolMinMaxKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + RunEndBool.id(), + Some(IsConstant.id()), + &compute::is_constant::RunEndBoolIsConstantKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + RunEndBool.id(), + Some(IsSorted.id()), + &compute::is_sorted::RunEndBoolIsSortedKernel, + ); +} + +#[cfg(test)] +mod tests { + use prost::Message; + use vortex_array::dtype::PType; + use vortex_array::test_harness::check_metadata; + + use crate::RunEndBoolMetadata; + + #[cfg_attr(miri, ignore)] + #[test] + fn test_runend_bool_metadata() { + check_metadata( + "runend_bool.metadata", + &RunEndBoolMetadata { + ends_ptype: PType::U64 as i32, + num_runs: u64::MAX, + offset: u64::MAX, + start: true, + } + .encode_to_vec(), + ); + } +} diff --git a/encodings/runend-bool/src/ops.rs b/encodings/runend-bool/src/ops.rs new file mode 100644 index 00000000000..34c3a5273bc --- /dev/null +++ b/encodings/runend-bool/src/ops.rs @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::scalar::Scalar; +use vortex_array::vtable::OperationsVTable; +use vortex_error::VortexResult; + +use crate::RunEndBool; +use crate::array::RunEndBoolArrayExt; +use crate::compress::value_at_index; + +impl OperationsVTable for RunEndBool { + fn scalar_at( + array: ArrayView<'_, RunEndBool>, + index: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + // Honor validity: a null logical element produces a null scalar. + let validity_mask = array + .bool_validity() + .execute_mask(array.as_ref().len(), ctx)?; + if !validity_mask.value(index) { + return Ok(Scalar::null(array.as_ref().dtype().clone())); + } + + let run_index = array.find_physical_index(index)?; + let value = value_at_index(run_index, array.start()); + Ok(Scalar::bool(value, array.nullability())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::assert_arrays_eq; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::RunEndBool; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + #[test] + fn slice_array() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // [T,T,F,F,F,T,T,T,T,T] sliced 3..8 => [F,F,T,T,T] + let arr = RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + )? + .slice(3..8)?; + assert_eq!(arr.len(), 5); + let expected = BoolArray::from(BitBuffer::from(vec![false, false, true, true, true])); + assert_arrays_eq!(arr, expected); + Ok(()) + } + + #[test] + fn double_slice() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let arr = RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + )? + .slice(3..8)?; + let doubly_sliced = arr.slice(0..3)?; + let expected = BoolArray::from(BitBuffer::from(vec![false, false, true])); + assert_arrays_eq!(doubly_sliced, expected); + Ok(()) + } + + #[test] + fn slice_to_empty() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let arr = RunEndBool::try_new( + buffer![2u32, 5, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + )?; + let sliced = arr.slice(arr.len()..arr.len())?; + assert!(sliced.is_empty()); + Ok(()) + } +} diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index 7bb069bb458..d01bf93dce5 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -28,9 +28,6 @@ use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; -use vortex_array::scalar::PValue; -use vortex_array::search_sorted::SearchSorted; -use vortex_array::search_sorted::SearchSortedSide; use vortex_array::serde::ArrayChildren; use vortex_array::smallvec::smallvec; use vortex_array::validity::Validity; @@ -44,6 +41,7 @@ use vortex_error::vortex_panic; use vortex_session::VortexSession; use vortex_session::registry::CachedId; +use crate::RunEndIndex; use crate::compress::runend_decode_primitive; use crate::compress::runend_decode_varbinview; use crate::compress::runend_encode; @@ -210,17 +208,7 @@ pub struct RunEndDataParts { pub offset: usize, } -pub trait RunEndArrayExt: TypedArrayRef { - fn offset(&self) -> usize { - self.offset - } - - fn ends(&self) -> &ArrayRef { - self.as_ref().slots()[ENDS_SLOT] - .as_ref() - .vortex_expect("RunEndArray ends slot") - } - +pub trait RunEndArrayExt: TypedArrayRef + RunEndIndex { fn values(&self) -> &ArrayRef { self.as_ref().slots()[VALUES_SLOT] .as_ref() @@ -230,19 +218,20 @@ pub trait RunEndArrayExt: TypedArrayRef { fn dtype(&self) -> &DType { self.values().dtype() } +} +impl> RunEndArrayExt for T {} + +impl> RunEndIndex for T { + fn ends(&self) -> &ArrayRef { + self.as_ref().slots()[ENDS_SLOT] + .as_ref() + .vortex_expect("RunEndArray ends slot") + } - fn find_physical_index(&self, index: usize) -> VortexResult { - Ok(self - .ends() - .as_primitive_typed() - .search_sorted( - &PValue::from(index + self.offset()), - SearchSortedSide::Right, - )? - .to_ends_index(self.ends().len())) + fn offset(&self) -> usize { + self.offset } } -impl> RunEndArrayExt for T {} #[derive(Clone, Debug)] pub struct RunEnd; @@ -320,11 +309,7 @@ impl RunEnd { impl RunEndData { fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { - if ends.is_empty() { - Ok(0) - } else { - usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?) - } + crate::shared::logical_len_from_ends(ends, ctx) } pub(crate) fn validate_parts( @@ -334,69 +319,13 @@ impl RunEndData { length: usize, ctx: &mut ExecutionCtx, ) -> VortexResult<()> { - // DType validation - vortex_ensure!( - ends.dtype().is_unsigned_int(), - "run ends must be unsigned integers, was {}", - ends.dtype(), - ); vortex_ensure!( ends.len() == values.len(), "run ends len != run values len, {} != {}", ends.len(), values.len() ); - - // Handle empty run-ends - if ends.is_empty() { - vortex_ensure!( - offset == 0, - "non-zero offset provided for empty RunEndArray" - ); - return Ok(()); - } - - // Zero-length logical slices may retain run metadata from the source array. - if length == 0 { - return Ok(()); - } - - #[cfg(debug_assertions)] - { - // Run ends must be strictly sorted for binary search to work correctly. - let pre_validation = ends.statistics().to_owned(); - - let is_sorted = ends - .statistics() - .compute_is_strict_sorted(ctx) - .unwrap_or(false); - - // Preserve the original statistics since compute_is_strict_sorted may have mutated them. - // We don't want to run with different stats in debug mode and outside. - ends.statistics().inherit(pre_validation.iter()); - debug_assert!(is_sorted); - } - - // Skip host-only validation when ends are not host-resident. - if !ends.is_host() { - return Ok(()); - } - - // Validate the offset and length are valid for the given ends and values - if offset != 0 && length != 0 { - let first_run_end = usize::try_from(&ends.execute_scalar(0, ctx)?)?; - if first_run_end < offset { - vortex_bail!("First run end {first_run_end} must be >= offset {offset}"); - } - } - - let last_run_end = usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)?; - let min_required_end = offset + length; - if last_run_end < min_required_end { - vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}"); - } - - Ok(()) + crate::shared::validate_ends(ends, offset, length, ctx) } } diff --git a/encodings/runend/src/compute/cast.rs b/encodings/runend/src/compute/cast.rs index f70bb0becb5..5b464f2605d 100644 --- a/encodings/runend/src/compute/cast.rs +++ b/encodings/runend/src/compute/cast.rs @@ -10,6 +10,7 @@ use vortex_array::scalar_fn::fns::cast::CastReduce; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; impl CastReduce for RunEnd { fn cast(array: ArrayView<'_, Self>, dtype: &DType) -> VortexResult> { diff --git a/encodings/runend/src/compute/compare.rs b/encodings/runend/src/compute/compare.rs index 4da7a453000..ba4aa1648c2 100644 --- a/encodings/runend/src/compute/compare.rs +++ b/encodings/runend/src/compute/compare.rs @@ -15,6 +15,7 @@ use vortex_array::scalar_fn::fns::operators::Operator; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; use crate::decompress_bool::runend_decode_bools; diff --git a/encodings/runend/src/compute/fill_null.rs b/encodings/runend/src/compute/fill_null.rs index ddce31184eb..9135ea65ee3 100644 --- a/encodings/runend/src/compute/fill_null.rs +++ b/encodings/runend/src/compute/fill_null.rs @@ -10,6 +10,7 @@ use vortex_array::scalar_fn::fns::fill_null::FillNullReduce; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; impl FillNullReduce for RunEnd { diff --git a/encodings/runend/src/compute/filter.rs b/encodings/runend/src/compute/filter.rs index 60644e2bced..3df7613a9e4 100644 --- a/encodings/runend/src/compute/filter.rs +++ b/encodings/runend/src/compute/filter.rs @@ -21,6 +21,7 @@ use vortex_error::VortexResult; use vortex_mask::Mask; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; use crate::compute::take::take_indices_unchecked; const FILTER_TAKE_THRESHOLD: f64 = 0.1; diff --git a/encodings/runend/src/compute/take.rs b/encodings/runend/src/compute/take.rs index 7100faf9eac..d7063f77fd3 100644 --- a/encodings/runend/src/compute/take.rs +++ b/encodings/runend/src/compute/take.rs @@ -19,6 +19,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; impl TakeExecute for RunEnd { diff --git a/encodings/runend/src/compute/take_from.rs b/encodings/runend/src/compute/take_from.rs index 32b22eb96c5..1ea08707e8d 100644 --- a/encodings/runend/src/compute/take_from.rs +++ b/encodings/runend/src/compute/take_from.rs @@ -12,6 +12,7 @@ use vortex_array::kernel::ExecuteParentKernel; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; #[derive(Debug)] @@ -65,6 +66,7 @@ mod tests { use crate::RunEnd; use crate::RunEndArray; + use crate::RunEndIndex; use crate::array::RunEndArrayExt; use crate::compute::take_from::RunEndTakeFrom; diff --git a/encodings/runend/src/kernel.rs b/encodings/runend/src/kernel.rs index 73f6ebc524e..e22529cfe4b 100644 --- a/encodings/runend/src/kernel.rs +++ b/encodings/runend/src/kernel.rs @@ -17,6 +17,7 @@ use vortex_array::scalar_fn::fns::binary::CompareExecuteAdaptor; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; use crate::compute::take_from::RunEndTakeFrom; diff --git a/encodings/runend/src/lib.rs b/encodings/runend/src/lib.rs index 8770dbbf58e..8ebec51c208 100644 --- a/encodings/runend/src/lib.rs +++ b/encodings/runend/src/lib.rs @@ -7,6 +7,7 @@ mod arbitrary; pub use arbitrary::ArbitraryRunEndArray; pub use array::*; pub use iter::trimmed_ends_iter; +pub use shared::*; mod array; #[cfg(feature = "arrow")] @@ -18,6 +19,7 @@ mod iter; mod kernel; mod ops; mod rules; +mod shared; #[doc(hidden)] pub mod _benchmarking { diff --git a/encodings/runend/src/ops.rs b/encodings/runend/src/ops.rs index 41a9e69419c..03b5f40fdae 100644 --- a/encodings/runend/src/ops.rs +++ b/encodings/runend/src/ops.rs @@ -1,19 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_array::ArrayRef; use vortex_array::ArrayView; use vortex_array::ExecutionCtx; -use vortex_array::scalar::PValue; use vortex_array::scalar::Scalar; -use vortex_array::search_sorted::SearchResult; -use vortex_array::search_sorted::SearchSorted; -use vortex_array::search_sorted::SearchSortedSide; use vortex_array::vtable::OperationsVTable; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; +pub(crate) use crate::shared::find_slice_end_index; impl OperationsVTable for RunEnd { fn scalar_at( @@ -27,26 +24,6 @@ impl OperationsVTable for RunEnd { } } -/// Find the physical offset for and index that would be an end of the slice i.e., one past the last element. -/// -/// If the index exists in the array we want to take that position (as we are searching from the right) -/// otherwise we want to take the next one -pub(crate) fn find_slice_end_index(array: &ArrayRef, index: usize) -> VortexResult { - let result = array - .as_primitive_typed() - .search_sorted(&PValue::from(index), SearchSortedSide::Right)?; - Ok(match result { - SearchResult::Found(i) => i, - SearchResult::NotFound(i) => { - if i == array.len() { - i - } else { - i + 1 - } - } - }) -} - #[cfg(test)] mod tests { diff --git a/encodings/runend/src/rules.rs b/encodings/runend/src/rules.rs index ac7d6038146..ed145f46039 100644 --- a/encodings/runend/src/rules.rs +++ b/encodings/runend/src/rules.rs @@ -18,6 +18,7 @@ use vortex_array::scalar_fn::fns::fill_null::FillNullReduceAdaptor; use vortex_error::VortexResult; use crate::RunEnd; +use crate::RunEndIndex; use crate::array::RunEndArrayExt; pub(super) const RULES: ParentRuleSet = ParentRuleSet::new(&[ diff --git a/encodings/runend/src/shared.rs b/encodings/runend/src/shared.rs new file mode 100644 index 00000000000..fb866156c04 --- /dev/null +++ b/encodings/runend/src/shared.rs @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Shared run-end "ends indexing" logic reused by both the `RunEnd` and `RunEndBool` encodings. + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::scalar::PValue; +use vortex_array::search_sorted::SearchResult; +use vortex_array::search_sorted::SearchSorted; +use vortex_array::search_sorted::SearchSortedSide; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; + +/// Shared run-end index math over a strictly-increasing unsigned `ends` child. +pub trait RunEndIndex { + /// The strictly-increasing run-end positions. + fn ends(&self) -> &ArrayRef; + + /// The logical offset into the first run. + fn offset(&self) -> usize; + + /// Find the physical run index containing logical `index`. + fn find_physical_index(&self, index: usize) -> VortexResult { + find_physical_index(self.ends(), self.offset(), index) + } +} + +/// Find the physical run index containing logical `index` for the given `ends` child and `offset`. +/// +/// This is the free-function form of [`RunEndIndex::find_physical_index`], usable where the orphan +/// rule prevents implementing [`RunEndIndex`] for a foreign array type. +pub fn find_physical_index(ends: &ArrayRef, offset: usize, index: usize) -> VortexResult { + Ok(ends + .as_primitive_typed() + .search_sorted(&PValue::from(index + offset), SearchSortedSide::Right)? + .to_ends_index(ends.len())) +} + +/// Find the physical offset for an index that would be an end of the slice, i.e., one past the last +/// element. +/// +/// If the index exists in the array we take that position (as we are searching from the right); +/// otherwise we take the next one. +pub fn find_slice_end_index(ends: &ArrayRef, index: usize) -> VortexResult { + let result = ends + .as_primitive_typed() + .search_sorted(&PValue::from(index), SearchSortedSide::Right)?; + Ok(match result { + SearchResult::Found(i) => i, + SearchResult::NotFound(i) => { + if i == ends.len() { + i + } else { + i + 1 + } + } + }) +} + +/// Validate the common invariants of a run-end `ends` child for the given `offset` and `length`. +/// +/// This checks that the ends are unsigned integers, that an empty ends child implies a zero offset, +/// and (for host-resident, non-empty, non-zero-length arrays) that the offset and last run end are +/// within bounds. In debug builds it additionally asserts that the ends are strictly sorted. +pub fn validate_ends( + ends: &ArrayRef, + offset: usize, + length: usize, + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + vortex_ensure!( + ends.dtype().is_unsigned_int(), + "run ends must be unsigned integers, was {}", + ends.dtype(), + ); + + // Handle empty run-ends + if ends.is_empty() { + vortex_ensure!( + offset == 0, + "non-zero offset provided for empty RunEndArray" + ); + return Ok(()); + } + + // Zero-length logical slices may retain run metadata from the source array. + if length == 0 { + return Ok(()); + } + + #[cfg(debug_assertions)] + { + // Run ends must be strictly sorted for binary search to work correctly. + let pre_validation = ends.statistics().to_owned(); + + let is_sorted = ends + .statistics() + .compute_is_strict_sorted(ctx) + .unwrap_or(false); + + // Preserve the original statistics since compute_is_strict_sorted may have mutated them. + // We don't want to run with different stats in debug mode and outside. + ends.statistics().inherit(pre_validation.iter()); + debug_assert!(is_sorted); + } + + // Skip host-only validation when ends are not host-resident. + if !ends.is_host() { + return Ok(()); + } + + // Validate the offset and length are valid for the given ends. + if offset != 0 && length != 0 { + let first_run_end = usize::try_from(&ends.execute_scalar(0, ctx)?)?; + if first_run_end < offset { + vortex_bail!("First run end {first_run_end} must be >= offset {offset}"); + } + } + + let last_run_end = usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)?; + let min_required_end = offset + length; + if last_run_end < min_required_end { + vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}"); + } + + Ok(()) +} + +/// Compute the logical length of a run-end array from its `ends` child. +/// +/// The logical length is the value of the last run end, or `0` if there are no runs. +pub fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + if ends.is_empty() { + Ok(0) + } else { + usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?) + } +} diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 34a98f85b73..b6a468ad26b 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -30,6 +30,7 @@ vortex-fsst = { workspace = true } vortex-onpair = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } +vortex-runend-bool = { workspace = true } vortex-sequence = { workspace = true } vortex-sparse = { workspace = true } vortex-utils = { workspace = true } diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 43df788473a..5115cc6c0b3 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -27,6 +27,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ // Bool schemes. //////////////////////////////////////////////////////////////////////////////////////////////// &bool::BoolConstantScheme, + &bool::BoolRunEndScheme, //////////////////////////////////////////////////////////////////////////////////////////////// // Integer schemes. //////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/vortex-btrblocks/src/canonical_compressor.rs b/vortex-btrblocks/src/canonical_compressor.rs index 37e736fb550..17396a9f05a 100644 --- a/vortex-btrblocks/src/canonical_compressor.rs +++ b/vortex-btrblocks/src/canonical_compressor.rs @@ -198,6 +198,30 @@ mod tests { Ok(()) } + #[test] + fn test_run_heavy_bool_uses_runend() -> VortexResult<()> { + use vortex_runend_bool::RunEndBool; + + // 10 runs of length 100 each = 1000 rows, alternating true/false. + let mut bits = Vec::with_capacity(1000); + for run in 0..10 { + bits.extend(std::iter::repeat_n(run % 2 == 0, 100)); + } + let array = BoolArray::from(BitBuffer::from(bits)); + let btr = BtrBlocksCompressor::default(); + let compressed = btr.compress( + &array.clone().into_array(), + &mut SESSION.create_execution_ctx(), + )?; + assert!( + compressed.is::(), + "expected RunEndBool, got {}", + compressed.encoding_id() + ); + assert_arrays_eq!(compressed, array); + Ok(()) + } + #[test] fn test_binary_constant_compressed() -> VortexResult<()> { let values = vec![Some(b"constant-bytes".as_slice()); 100]; diff --git a/vortex-btrblocks/src/schemes/bool.rs b/vortex-btrblocks/src/schemes/bool/mod.rs similarity index 83% rename from vortex-btrblocks/src/schemes/bool.rs rename to vortex-btrblocks/src/schemes/bool/mod.rs index c27251a8599..7aee11e828a 100644 --- a/vortex-btrblocks/src/schemes/bool.rs +++ b/vortex-btrblocks/src/schemes/bool/mod.rs @@ -3,5 +3,8 @@ //! Bool compression schemes. +mod runend; + +pub use runend::BoolRunEndScheme; pub use vortex_compressor::builtins::BoolConstantScheme; pub use vortex_compressor::stats::BoolStats; diff --git a/vortex-btrblocks/src/schemes/bool/runend.rs b/vortex-btrblocks/src/schemes/bool/runend.rs new file mode 100644 index 00000000000..b80b0de2ae6 --- /dev/null +++ b/vortex-btrblocks/src/schemes/bool/runend.rs @@ -0,0 +1,131 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Run-end encoding for boolean arrays. + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::Bool; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::bool::BoolArrayExt; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::estimate::EstimateVerdict; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_runend_bool::RunEndBool; +use vortex_runend_bool::RunEndBoolArrayExt; +use vortex_runend_bool::compress::encode_runend_bool; + +use crate::ArrayAndStats; +use crate::CascadingCompressor; +use crate::CompressorContext; +use crate::Scheme; +use crate::SchemeExt; + +/// Minimum average run length before run-end encoding bool arrays is considered worthwhile. +const RUN_END_THRESHOLD: usize = 8; + +/// Run-end encoding for boolean arrays. +/// +/// Boolean runs strictly alternate, so the encoded form stores only the run ends (plus a `start` +/// flag and optional validity). This is profitable when runs are long. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BoolRunEndScheme; + +impl BoolRunEndScheme { + /// Count the number of boolean runs in the canonical bool array. + fn run_count(data: &ArrayAndStats) -> usize { + let bool_array = data + .array() + .as_opt::() + .vortex_expect("BoolRunEndScheme matches only canonical bool arrays"); + let bits = bool_array.to_bit_buffer(); + if bits.is_empty() { + return 0; + } + // Each transition between true and false starts a new run; the number of runs is the number + // of `true` slices plus the number of `false` gaps, which equals transitions + 1. + let mut runs = 1usize; + let mut prev = bits.value(0); + for i in 1..bits.len() { + let cur = bits.value(i); + if cur != prev { + runs += 1; + prev = cur; + } + } + runs + } +} + +impl Scheme for BoolRunEndScheme { + fn scheme_name(&self) -> &'static str { + "vortex.bool.runend" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches!(canonical, Canonical::Bool(_)) + } + + /// Children: ends=0. + fn num_children(&self) -> usize { + 1 + } + + fn expected_compression_ratio( + &self, + data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + let length = data.array_len(); + if length == 0 { + return CompressionEstimate::Verdict(EstimateVerdict::Skip); + } + + let runs = Self::run_count(data).max(1); + let average_run_length = length / runs; + if average_run_length < RUN_END_THRESHOLD { + return CompressionEstimate::Verdict(EstimateVerdict::Skip); + } + + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let bool_array = data + .array() + .as_opt::() + .vortex_expect("BoolRunEndScheme matches only canonical bool arrays"); + let encoded = encode_runend_bool(bool_array, exec_ctx)?; + + let ends = encoded.ends().clone(); + let start = encoded.start(); + let offset = encoded.offset(); + let validity = encoded.bool_validity(); + let length = encoded.as_ref().len(); + + let ends_primitive = ends.execute::(exec_ctx)?; + let compressed_ends = compressor.compress_child( + &ends_primitive.into_array(), + &compress_ctx, + self.id(), + 0, + exec_ctx, + )?; + + // SAFETY: compression of the ends preserves the strictly-increasing invariant. + Ok(unsafe { + RunEndBool::new_unchecked(compressed_ends, start, offset, length, validity).into_array() + }) + } +} diff --git a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs index d7c5a58712d..e9700d6a254 100644 --- a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs +++ b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs @@ -34,6 +34,7 @@ use vortex::encodings::fastlanes::FoR; use vortex::encodings::fastlanes::FoRArrayExt; use vortex::encodings::runend::RunEnd; use vortex::encodings::runend::RunEndArrayExt; +use vortex::encodings::runend::RunEndIndex; use vortex::encodings::sequence::Sequence; use vortex::encodings::zigzag::ZigZag; use vortex::encodings::zigzag::ZigZagArrayExt; diff --git a/vortex-cuda/src/kernel/encodings/runend.rs b/vortex-cuda/src/kernel/encodings/runend.rs index fca435478d8..7242c056120 100644 --- a/vortex-cuda/src/kernel/encodings/runend.rs +++ b/vortex-cuda/src/kernel/encodings/runend.rs @@ -22,6 +22,7 @@ use vortex::dtype::PType; use vortex::encodings::runend::RunEnd; use vortex::encodings::runend::RunEndArray; use vortex::encodings::runend::RunEndArrayExt; +use vortex::encodings::runend::RunEndIndex; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_ensure; diff --git a/vortex-duckdb/src/exporter/run_end.rs b/vortex-duckdb/src/exporter/run_end.rs index de3c08c4759..525a5fd8f0d 100644 --- a/vortex-duckdb/src/exporter/run_end.rs +++ b/vortex-duckdb/src/exporter/run_end.rs @@ -13,6 +13,7 @@ use vortex::array::search_sorted::SearchSortedSide; use vortex::dtype::IntegerPType; use vortex::encodings::runend::RunEndArray; use vortex::encodings::runend::RunEndArrayExt; +use vortex::encodings::runend::RunEndIndex; use vortex::error::VortexExpect; use vortex::error::VortexResult; diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index c4bf980d683..edbaf3d8107 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -49,6 +49,7 @@ vortex-metrics = { workspace = true } vortex-onpair = { workspace = true, optional = true } vortex-pco = { workspace = true } vortex-runend = { workspace = true } +vortex-runend-bool = { workspace = true } vortex-scan = { workspace = true } vortex-sequence = { workspace = true } vortex-session = { workspace = true } diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 9b927abff54..e67de5e7009 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -184,6 +184,7 @@ pub fn register_default_encodings(session: &VortexSession) { vortex_decimal_byte_parts::initialize(session); vortex_fastlanes::initialize(session); vortex_runend::initialize(session); + vortex_runend_bool::initialize(session); vortex_sequence::initialize(session); vortex_sparse::initialize(session); diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index e320cf2e9d9..312f5761ade 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -217,6 +217,61 @@ async fn test_round_trip_many_types() { assert_eq!(read.len(), 3); } +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_round_trip_runend_bool() { + use vortex_array::arrays::BoolArray; + use vortex_buffer::BitBuffer; + use vortex_runend_bool::RunEndBool; + + let mut ctx = SESSION.create_execution_ctx(); + + // A run-heavy bool array encoded as RunEndBool. + let runend = RunEndBool::try_new( + buffer![3u32, 6, 10].into_array(), + true, + Validity::NonNullable, + &mut ctx, + ) + .unwrap(); + + let expected = BoolArray::from(BitBuffer::from(vec![ + true, true, true, false, false, false, true, true, true, true, + ])) + .into_array(); + + let st = StructArray::from_fields(&[("bools", runend.into_array())]).unwrap(); + let mut buf = ByteBufferMut::empty(); + + SESSION + .write_options() + .write(&mut buf, st.into_array().to_array_stream()) + .await + .unwrap(); + + let chunks: Vec<_> = SESSION + .open_options() + .open_buffer(buf) + .unwrap() + .scan() + .unwrap() + .into_array_stream() + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(chunks.len(), 1); + let read_field = chunks[0] + .clone() + .execute::(&mut ctx) + .unwrap() + .unmasked_field_by_name("bools") + .unwrap() + .clone(); + assert_arrays_eq!(read_field, expected); +} + #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_read_simple_with_spawn() { diff --git a/vortex/benches/common_encoding_tree_throughput.rs b/vortex/benches/common_encoding_tree_throughput.rs index 1507e0fb813..ba1f44b261a 100644 --- a/vortex/benches/common_encoding_tree_throughput.rs +++ b/vortex/benches/common_encoding_tree_throughput.rs @@ -42,6 +42,7 @@ use vortex::encodings::fsst::fsst_compress; use vortex::encodings::fsst::fsst_train_compressor; use vortex::encodings::runend::RunEnd; use vortex::encodings::runend::RunEndArrayExt; +use vortex::encodings::runend::RunEndIndex; use vortex::error::VortexExpect; use vortex::extension::datetime::TimeUnit; use vortex_session::VortexSession;