Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions vortex-array/src/arrays/constant/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ fn append_value_or_nulls<B: ArrayBuilder + 'static>(
mod tests {
use rstest::rstest;
use vortex_error::VortexResult;
use vortex_session::VortexSession;

use crate::IntoArray;
use crate::VortexSessionExecute;
use crate::array_session;
use crate::arrays::ConstantArray;
use crate::arrays::constant::vtable::canonical::constant_canonicalize;
use crate::assert_arrays_eq;
Expand All @@ -286,7 +286,7 @@ mod tests {

/// Appends `array` into a fresh builder and asserts the result matches `constant_canonicalize`.
fn assert_append_matches_canonical(array: ConstantArray) -> VortexResult<()> {
let mut ctx = VortexSession::empty().create_execution_ctx();
let mut ctx = array_session().create_execution_ctx();

let expected = constant_canonicalize(array.as_view(), &mut ctx)?.into_array();
let mut builder = builder_with_capacity(array.dtype(), array.len());
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/arrays/patched/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ mod tests {

use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_session::VortexSession;

use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::array_session;
use crate::arrays::Patched;
use crate::arrays::PrimitiveArray;
use crate::assert_arrays_eq;
Expand All @@ -158,7 +158,7 @@ mod tests {
None,
)?;

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

Patched::from_array_and_patches(values, &patches, &mut ctx)?
Expand Down
18 changes: 9 additions & 9 deletions vortex-array/src/arrays/patched/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ mod tests {
use vortex_buffer::buffer;
use vortex_buffer::buffer_mut;
use vortex_error::VortexResult;
use vortex_session::VortexSession;
use vortex_session::registry::ReadContext;

use crate::ArrayContext;
Expand All @@ -363,6 +362,7 @@ mod tests {
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::array_session;
use crate::arrays::Patched;
use crate::arrays::PatchedArray;
use crate::arrays::PrimitiveArray;
Expand All @@ -389,7 +389,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -422,7 +422,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -455,7 +455,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -488,7 +488,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -525,7 +525,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -569,7 +569,7 @@ mod tests {

let patches = Patches::new(len, 0, indices, patch_vals, None)?;

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

Patched::from_array_and_patches(array, &patches, &mut ctx)
Expand Down Expand Up @@ -646,7 +646,7 @@ mod tests {
assert_eq!(array_ref.dtype(), new_array.dtype());

// Execute both and compare results
let mut ctx = ExecutionCtx::new(VortexSession::empty());
let mut ctx = ExecutionCtx::new(array_session());
let original_executed = array_ref.execute::<Canonical>(&mut ctx)?.into_primitive();
let new_executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

Expand All @@ -672,7 +672,7 @@ mod tests {
let new_array = array_ref.with_slots(slots.into_slots())?;

// Execute and verify the inner values changed (except at patch positions)
let mut ctx = ExecutionCtx::new(VortexSession::empty());
let mut ctx = ExecutionCtx::new(array_session());
let executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

// Expected: all 5s except indices 1, 2, 3 which are patched to 10, 20, 30
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/arrays/patched/vtable/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ impl OperationsVTable<Patched> for Patched {
#[cfg(test)]
mod tests {
use vortex_buffer::buffer;
use vortex_session::VortexSession;

use crate::ExecutionCtx;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::array_session;
use crate::arrays::Patched;
use crate::dtype::Nullability;
use crate::optimizer::ArrayOptimizer;
Expand All @@ -82,7 +82,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -127,7 +127,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
)
.unwrap();

let session = VortexSession::empty();
let session = array_session();
let mut ctx = ExecutionCtx::new(session);

let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
Expand Down
144 changes: 78 additions & 66 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ impl ArrayRef {
let mut stack: Vec<StackFrame> = Vec::new();
let max_iterations = max_iterations();

let session = ctx.session().clone();
let fallback_kernels;
let kernels = match session.get_opt::<ArrayKernels>() {
Some(kernels) => kernels,
None => {
fallback_kernels = ArrayKernels::empty();
&fallback_kernels
}
};

for _ in 0..max_iterations {
let is_done = stack
.last()
Expand Down Expand Up @@ -196,13 +206,13 @@ impl ArrayRef {
// would be lost when we restore frame.parent_builder.
if current_builder.is_none()
&& let Some(frame) = stack.last()
&& let Some(result) =
current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)?
&& let Some(result) = try_execute_parent(
&frame.parent_array,
std::iter::once((frame.slot_idx, &current_array)),
kernels,
ctx,
)?
{
ctx.log(format_args!(
"execute_parent (stack) rewrote {} -> {}",
current_array, result
));
let frame = stack.pop().vortex_expect("just peeked");
current_array = result.optimize_ctx(ctx.session())?;
current_builder = frame.parent_builder;
Expand All @@ -211,12 +221,13 @@ impl ArrayRef {

// Step 2b: execute_parent against current_array's own children.
if current_builder.is_none()
&& let Some(rewritten) = try_execute_parent(&current_array, ctx)?
&& let Some(rewritten) = try_execute_parent(
&current_array,
occupied_slots(&current_array),
kernels,
ctx,
)?
{
ctx.log(format_args!(
"execute_parent rewrote {} -> {}",
current_array, rewritten
));
current_array = rewritten.optimize_ctx(ctx.session())?;
continue;
}
Expand Down Expand Up @@ -422,26 +433,20 @@ impl Executable for ArrayRef {
}
}

let tmp_session = ctx.session().clone();
let kernels = tmp_session.get_opt::<ArrayKernels>();

for (slot_idx, slot) in array.slots().iter().enumerate() {
let Some(child) = slot else { continue };
if let Some(executed_parent) =
execute_parent_for_child(&array, child, slot_idx, kernels, ctx)?
{
ctx.log(format_args!(
"execute_parent: slot[{}]({}) rewrote {} -> {}",
slot_idx,
child.encoding_id(),
array,
executed_parent
));
executed_parent
.statistics()
.inherit_from(array.statistics());
return Ok(executed_parent);
let session = ctx.session().clone();
let fallback_kernels;
let kernels = match session.get_opt::<ArrayKernels>() {
Some(kernels) => kernels,
None => {
fallback_kernels = ArrayKernels::empty();
&fallback_kernels
}
};

if let Some(executed_parent) =
try_execute_parent(&array, occupied_slots(&array), kernels, ctx)?
{
return Ok(executed_parent);
}

ctx.log(format_args!("executing {}", array));
Expand Down Expand Up @@ -536,51 +541,58 @@ fn finalize_done(
Ok((output, None))
}

fn execute_parent_for_child(
/// Offer `parent` to each `(slot_idx, child)` pair, consulting the [`ArrayKernels`] registry
/// first and then the child's `execute_parent` method, and return the first successful rewrite
/// with `parent`'s statistics inherited onto it.
///
/// Step 2a of [`ArrayRef::execute_until`] passes a single pair (the suspended stack parent and
/// the focused child); Step 2b and the single-step [`Executable`] pass every occupied slot of an
/// array via [`occupied_slots`].
fn try_execute_parent<'a>(
parent: &ArrayRef,
child: &ArrayRef,
slot_idx: usize,
kernels: Option<&ArrayKernels>,
children: impl IntoIterator<Item = (usize, &'a ArrayRef)>,
kernels: &ArrayKernels,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
if let Some(kernels) = kernels
&& let Some(plugins) =
for (slot_idx, child) in children {
let mut rewritten = None;
if let Some(plugins) =
kernels.find_execute_parent(parent.encoding_id(), child.encoding_id())
{
for plugin in plugins.as_ref() {
if let Some(result) = plugin(child, parent, slot_idx, ctx)? {
return Ok(Some(result));
{
for plugin in plugins.as_ref() {
if let Some(result) = plugin(child, parent, slot_idx, ctx)? {
rewritten = Some(result);
break;
}
}
}
let rewritten = match rewritten {
Some(result) => result,
None => match child.execute_parent(parent, slot_idx, ctx)? {
Some(result) => result,
None => continue,
},
};
ctx.log(format_args!(
"execute_parent: slot[{}]({}) rewrote {} -> {}",
slot_idx,
child.encoding_id(),
parent,
rewritten
));
rewritten.statistics().inherit_from(parent.statistics());
return Ok(Some(rewritten));
}

child.execute_parent(parent, slot_idx, ctx)
Ok(None)
}

/// Try execute_parent on each occupied slot of the array.
fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
let tmp_session = ctx.session().clone();
let kernels = tmp_session.get_opt::<ArrayKernels>();

for (slot_idx, slot) in array.slots().iter().enumerate() {
let Some(child) = slot else { continue };
if let Some(executed_parent) =
execute_parent_for_child(array, child, slot_idx, kernels, ctx)?
{
ctx.log(format_args!(
"execute_parent: slot[{}]({}) rewrote {} -> {}",
slot_idx,
child.encoding_id(),
array,
executed_parent
));
executed_parent
.statistics()
.inherit_from(array.statistics());
return Ok(Some(executed_parent));
}
}
Ok(None)
/// Iterator over an array's occupied (`Some`) slots paired with their slot index.
fn occupied_slots(array: &ArrayRef) -> impl Iterator<Item = (usize, &ArrayRef)> {
array
.slots()
.iter()
.enumerate()
.filter_map(|(idx, slot)| slot.as_ref().map(|child| (idx, child)))
}

/// A predicate that determines when an array has reached a desired form during execution.
Expand Down
Loading