[AURON #2177] Implement native support for lag window function#2199
[AURON #2177] Implement native support for lag window function#2199officialasishkumar wants to merge 1 commit into
Conversation
Spark `lag(...)` is not supported in Auron's native window execution
path, causing queries using it to fall back to the Spark path instead
of being executed natively.
This PR extends native window coverage to include `lag(...)`:
- adds `Lag` handling in `NativeWindowBase`
- extends the protobuf/planner window function enum with `LAG`
- adds native planner support to decode `LAG` into the native window plan
- introduces a native `LagProcessor` in `datafusion-ext-plans`
- evaluates `lag` using Spark-compatible offset/default/null behavior
- adds a full-partition processing path for `lag` so that lookback
works correctly across input batches
- adds Rust regression coverage for cross-batch `lag`
- adds Scala regression tests for:
- native `lag(...)` execution
- Spark fallback for `lag(...) IGNORE NULLS`
The native implementation supports Spark semantics for:
- `lag(input)`
- default offset is 1
- default value is null
- `lag(input, offset, default)`
- returns the value of `input` at the `offset`th row before the
current row in the same window partition
- if the target row exists and `input` there is null, returns null
- if the target row does not exist, returns `default`
Supported scope in this PR:
- standard `RESPECT NULLS` behavior
Not supported natively in this PR:
- `IGNORE NULLS`
Unsupported `IGNORE NULLS` queries continue to fall back to Spark
to preserve correctness.
The full-partition processing infrastructure added here mirrors the
approach used for `lead` offset functions, ensuring all rows in a
partition are available before computing lag values across batch
boundaries.
Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
There was a problem hiding this comment.
Pull request overview
Adds native execution support for Spark’s lag(...) window function in Auron’s native window path, including planner/protobuf wiring and regression tests, while explicitly falling back for unsupported IGNORE NULLS semantics.
Changes:
- Add
Laghandling in Spark-side native window plan construction (with Spark fallback forIGNORE NULLS). - Extend native planner + protobuf to represent
LAG, and implement a native RustLagProcessor. - Add Rust + Scala regression tests, including cross-batch correctness and Spark fallback coverage.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala | Adds Spark-side detection/planning for LAG window expressions and blocks IGNORE NULLS. |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronWindowSuite.scala | Adds Scala tests for native lag and Spark fallback for IGNORE NULLS. |
| native-engine/datafusion-ext-plans/src/window_exec.rs | Adds “full-partition” execution path (concat all batches) to support cross-batch lag evaluation. |
| native-engine/datafusion-ext-plans/src/window/window_context.rs | Adds requires_full_partition() helper to drive execution strategy. |
| native-engine/datafusion-ext-plans/src/window/processors/mod.rs | Exposes new lag_processor module. |
| native-engine/datafusion-ext-plans/src/window/processors/lag_processor.rs | Implements LagProcessor computing lag with offset/default semantics. |
| native-engine/datafusion-ext-plans/src/window/mod.rs | Adds WindowFunction::Lag wiring and a requires_full_partition() marker on expressions. |
| native-engine/auron-planner/src/planner.rs | Decodes protobuf LAG into native WindowFunction::Lag. |
| native-engine/auron-planner/proto/auron.proto | Extends protobuf enum with LAG. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| private def lagIgnoreNulls(expr: Lag): Boolean = | ||
| expr.getClass.getMethods | ||
| .find(method => method.getName == "ignoreNulls" && method.getParameterCount == 0) | ||
| .exists(method => method.invoke(expr).asInstanceOf[Boolean]) | ||
|
|
| if window_ctx.requires_full_partition() { | ||
| let mut staging_batches = vec![]; | ||
| while let Some(batch) = input.next().await.transpose()? { | ||
| staging_batches.push(batch); | ||
| } | ||
|
|
||
| let outputs: Vec<ArrayRef> = batch | ||
| .columns() | ||
| .iter() | ||
| .cloned() | ||
| .chain(if window_ctx.output_window_cols { | ||
| window_cols | ||
| } else { | ||
| vec![] | ||
| }) | ||
| .zip(window_ctx.output_schema.fields()) | ||
| .map(|(array, field)| { | ||
| if array.data_type() != field.data_type() { | ||
| return cast(&array, field.data_type()); | ||
| } | ||
| Ok(array.clone()) | ||
| }) | ||
| .collect::<Result<_>>()?; | ||
| let output_batch = RecordBatch::try_new_with_options( | ||
| window_ctx.output_schema.clone(), | ||
| outputs, | ||
| &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), | ||
| )?; | ||
| if !staging_batches.is_empty() { | ||
| let _timer = elapsed_compute.timer(); | ||
| let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?; | ||
| let output_batch = | ||
| process_window_batch(batch, &window_ctx, processors.as_mut_slice())?; | ||
| exec_ctx | ||
| .baseline_metrics() | ||
| .record_output(output_batch.num_rows()); | ||
| sender.send(output_batch).await; | ||
| } | ||
| return Ok(()); | ||
| } |
There was a problem hiding this comment.
+1 to this, the full-partition path buffers all input batches, then concat_batches allocates another contiguous copy. Peak memory is ~3x partition size all live simultaneously
| let input_values = self.children[0] | ||
| .evaluate(batch) | ||
| .and_then(|v| v.into_array(batch.num_rows()))?; | ||
|
|
||
| let offset_values = self.children[1] | ||
| .evaluate(batch) | ||
| .and_then(|v| v.into_array(batch.num_rows()))?; | ||
| let offset_values = if offset_values.data_type() == &DataType::Int32 { | ||
| offset_values | ||
| } else { | ||
| cast(&offset_values, &DataType::Int32)? | ||
| }; | ||
| let offset = match ScalarValue::try_from_array(&offset_values, 0)? { | ||
| ScalarValue::Int32(Some(offset)) => offset as i64, | ||
| other => { | ||
| return Err(DataFusionError::Execution(format!( | ||
| "lag offset must be a non-null foldable integer, got {other:?}", | ||
| ))); | ||
| } | ||
| }; | ||
|
|
| assert_eq!( | ||
| self.children.len(), | ||
| 3, | ||
| "lag expects input/offset/default children", | ||
| ); |
| if window_ctx.requires_full_partition() { | ||
| let mut staging_batches = vec![]; | ||
| while let Some(batch) = input.next().await.transpose()? { | ||
| staging_batches.push(batch); | ||
| } | ||
|
|
||
| let outputs: Vec<ArrayRef> = batch | ||
| .columns() | ||
| .iter() | ||
| .cloned() | ||
| .chain(if window_ctx.output_window_cols { | ||
| window_cols | ||
| } else { | ||
| vec![] | ||
| }) | ||
| .zip(window_ctx.output_schema.fields()) | ||
| .map(|(array, field)| { | ||
| if array.data_type() != field.data_type() { | ||
| return cast(&array, field.data_type()); | ||
| } | ||
| Ok(array.clone()) | ||
| }) | ||
| .collect::<Result<_>>()?; | ||
| let output_batch = RecordBatch::try_new_with_options( | ||
| window_ctx.output_schema.clone(), | ||
| outputs, | ||
| &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), | ||
| )?; | ||
| if !staging_batches.is_empty() { | ||
| let _timer = elapsed_compute.timer(); | ||
| let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?; | ||
| let output_batch = | ||
| process_window_batch(batch, &window_ctx, processors.as_mut_slice())?; | ||
| exec_ctx | ||
| .baseline_metrics() | ||
| .record_output(output_batch.num_rows()); | ||
| sender.send(output_batch).await; | ||
| } | ||
| return Ok(()); | ||
| } |
There was a problem hiding this comment.
+1 to this, the full-partition path buffers all input batches, then concat_batches allocates another contiguous copy. Peak memory is ~3x partition size all live simultaneously
| let value = if target_idx >= partition_start && target_idx < partition_end { | ||
| ScalarValue::try_from_array(&input_values, target_idx as usize)? | ||
| } else { | ||
| ScalarValue::try_from_array(&default_values, row_idx)? |
There was a problem hiding this comment.
If I understand correctly, the row-by-row ScalarValue::try_from_array + ScalarValue::iter_to_array pattern creates N heap-allocated scalar objects. For large partitions that full-partition buffering implies, this is will become an issue. Can we use arrow::compute::take with a pre-computed indices array to gather values in O(1) allocations?
| )?; | ||
| if !staging_batches.is_empty() { | ||
| let _timer = elapsed_compute.timer(); | ||
| let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?; |
There was a problem hiding this comment.
The full-partition buffering approach works okay but introduces unbounded memory risk for skewed partitions. Since the input is guaranteed sorted by partition keys, lag can be implemented as a streaming processor (like Rank/Agg) with an O(offset) ring buffer. This would eliminate the need for requires_full_partition(), concat_batches, and the dual code paths in execute_window. Wanted to check with you and see if you had considered a streaming approach here?
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for taking this on — the Spark-semantics handling reads carefully (the offset sign, null-in-target vs default, and the IGNORE NULLS fallback are all faithful), and the fallback test that asserts no NativeWindowBase survives is a nice way to lock the boundary down. My main feedback is about the base this is built on rather than the logic itself — a few questions inline.
| windowExprBuilder.setFuncType(pb.WindowFunctionType.Window) | ||
| windowExprBuilder.setWindowFunc(pb.WindowFunction.LAG) | ||
| windowExprBuilder.addChildren(NativeConverters.convertExpr(e.input)) | ||
| windowExprBuilder.addChildren(NativeConverters.convertExpr(e.inputOffset)) |
There was a problem hiding this comment.
Since this opened in April, native lead(...) landed on master — the mirror offset function — and with it lag may not need a parallel native path at all. lag_processor.rs here is nearly identical to the merged lead_processor.rs; the only real difference is target_idx = row_idx - offset instead of + offset. Spark models the two as the same thing: Lag and Lead both extend FrameLessOffsetWindowFunction, and Lag carries both an inputOffset() and a separate derived offset() that negates it. The merged Lead case feeds e.offset into WindowFunction.LEAD → LeadProcessor (row_idx + offset). So a Lag case passing e.offset (rather than e.inputOffset as here) and mapping to WindowFunction.LEAD would reuse LeadProcessor and produce correct lag — with no new proto value, planner arm, or processor. Did you consider folding lag into the Lead path? If you'd rather keep them separate (anticipated divergence, readability), that's fair — worth capturing the rationale, since otherwise it's two copies of the same logic to maintain. Same goes for lagIgnoreNulls just above, which duplicates the existing leadIgnoreNulls / invokeNoArg[Boolean] helper.
| )?; | ||
| window_cols[0] = arrow::compute::filter(&window_cols[0], &limited)?; | ||
| batch = arrow::compute::filter_record_batch(&batch, &limited)?; | ||
| if window_ctx.requires_full_partition() { |
There was a problem hiding this comment.
This full-partition staging path already exists on master — requires_full_partition(), process_window_batch(), and the concat_batches staging were added when Lead merged, so this part of the diff largely duplicates what's there and will conflict on rebase (the requires_full_partition() helpers on WindowExpr/WindowContext are already defined too). Could you rebase onto current master? The real delta should shrink a lot. It also means the memory/streaming questions on the existing threads can lean on the precedent the merged Lead path already set — full-partition buffering is now the established approach for offset functions, rather than something this PR has to settle.
| ROW_NUMBER = 0; | ||
| RANK = 1; | ||
| DENSE_RANK = 2; | ||
| LAG = 3; |
There was a problem hiding this comment.
LAG = 3 collides with LEAD = 3, which is already on master. Two values sharing the same ordinal in one enum won't compile without allow_alias, so on rebase this needs a fresh number (8 is the next free one). Flagging because a textual merge can hide it — the conflict is on the comment/whitespace, not the = 3, so it can resolve cleanly and still break the build. (If lag ends up reusing the LEAD path per the other comment, this enum entry goes away entirely.)
Which issue does this PR close?
Closes #2177
Rationale for this change
Auron's native window support previously covered rank-like functions and a subset of aggregate window functions, but did not support offset-based window functions such as
lag(...).This PR extends native window coverage to include
lag(...):lag(...)input,inputOffset, anddefaultWhat changes are included in this PR?
This PR:
Laghandling inNativeWindowBaseLAGLAGinto the native window planLagProcessorindatafusion-ext-planslagusing Spark-compatible offset/default/null behaviorlagso that lookback works correctly across input batcheslaglag(...)executionlag(...) IGNORE NULLSThe native implementation supports Spark semantics for:
lag(input)1nulllag(input, offset, default)inputat theoffsetth row before the current row in the same window partitioninputthere isnull, returnsnulldefaultSupported scope in this PR:
RESPECT NULLSbehaviorNot supported natively in this PR:
IGNORE NULLSUnsupported
IGNORE NULLSqueries continue to fall back to Spark to preserve correctness.The full-partition processing infrastructure added here mirrors the approach used for offset-based window functions, ensuring all rows in a partition are available before computing lag values across batch boundaries.
Are there any user-facing changes?
Yes.
Queries using
lag(...)can now remain on Auron's native window execution path when they use supported semantics.Queries using unsupported
lag(...) IGNORE NULLSbehavior will continue to fall back to Spark.How was this patch tested?
CI.