[AURON #2178] [AURON #2179] Implement native support for first_value and last_value window functions#2200
Conversation
…first_value and last_value window functions Spark `first_value(...)` and `last_value(...)` are not supported in Auron's native window execution path, causing queries using them to fall back to the Spark path instead of being executed natively. This PR extends native window coverage to include both functions: first_value: - maps `First(child, ignoreNulls)` in the window expression to the existing `AggFunction::First` / `AggFunction::FirstIgnoresNull` Rust aggregates via `NativeWindowBase` - the running-aggregate semantics of `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` naturally produce the correct first-value behavior through the existing `AggProcessor` - no new Rust aggregate code is required for first_value last_value: - introduces `AggFunction::Last` and `AggFunction::LastIgnoresNull` to the Rust aggregate infrastructure - adds `AggLast` (always updates accumulator, including with null values) and `AggLastIgnoresNull` (updates accumulator only for non-null values) - extends the protobuf `AggFunction` enum with `LAST = 10` and `LAST_IGNORES_NULL = 11` - adds planner and lib.rs mappings for the new proto values - maps `Last(child, ignoreNulls)` in the window expression to the new `AggFunction::Last` / `AggFunction::LastIgnoresNull` aggregates Both functions use the existing `AggProcessor` with frame `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, consistent with all other aggregate window functions in Auron. Additional changes: - adds `Last` import and conversion case to `NativeConverters.scala` so `last()` as a group aggregate also works natively - adds `First` and `Last` imports and match cases to `NativeWindowBase` - adds Scala regression tests for first_value and last_value, covering both RESPECT NULLS and IGNORE NULLS variants Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
There was a problem hiding this comment.
Pull request overview
Adds native execution support for Spark first_value / last_value window functions (including IGNORE NULLS) by wiring them through Auron’s existing aggregate-window infrastructure and extending the native aggregate set.
Changes:
- Extend Spark-side window/aggregate conversion to emit
FIRST(_IGNORES_NULL)and newLAST(_IGNORES_NULL)agg functions. - Add native-engine Rust implementations for
LastandLastIgnoresNull, plus factory/planner/proto enum wiring. - Add Scala regression tests covering
first_value/last_valuewith RESPECT/IGNORE NULLS.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala | Adds window-function mapping for First/Last aggregates to protobuf agg functions. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala | Adds group-aggregate conversion support for Last / Last IGNORE NULLS. |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronWindowSuite.scala | Adds regression coverage for first_value / last_value (incl. IGNORE NULLS). |
| native-engine/datafusion-ext-plans/src/agg/mod.rs | Registers new Last / LastIgnoresNull modules and enum variants. |
| native-engine/datafusion-ext-plans/src/agg/last.rs | Implements AggLast (RESPECT NULLS) aggregate behavior. |
| native-engine/datafusion-ext-plans/src/agg/last_ignores_null.rs | Implements AggLastIgnoresNull aggregate behavior. |
| native-engine/datafusion-ext-plans/src/agg/agg.rs | Wires new agg functions into the aggregate factory. |
| native-engine/auron-planner/src/planner.rs | Maps protobuf window agg functions to native AggFunction::Last*. |
| native-engine/auron-planner/src/lib.rs | Adds protobuf-to-native enum conversion for Last*. |
| native-engine/auron-planner/proto/auron.proto | Extends protobuf AggFunction enum with LAST and LAST_IGNORES_NULL. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| package org.apache.auron | ||
|
|
||
| import org.apache.spark.sql.AuronQueryTest | ||
| import org.apache.spark.sql.execution.auron.plan.NativeWindowBase |
| let accs = downcast_any!(accs, mut AccScalarValueColumn)?; | ||
| idx_for_zipped! { | ||
| ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { | ||
| if partial_arg.is_valid(partial_arg_idx) { | ||
| accs.set_value(acc_idx, compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?); | ||
| } else { | ||
| accs.set_value(acc_idx, ScalarValue::try_from(&self.data_type)?); | ||
| } | ||
| } |
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for taking this on — nice that first_value needed zero new Rust by reusing the existing AggProcessor / aggregate-window infrastructure, and AggLast reads cleanly as the "always overwrite" inversion of AggFirst. The tests are well-built too: checkSparkAnswerAndOperator both compares against vanilla Spark and asserts the native window operator actually ran. A few questions inline.
| }) | ||
| aggBuilder.addChildren(convertExpr(child)) | ||
|
|
||
| case Last(child, ignoresNullExpr) => |
There was a problem hiding this comment.
This case Last(child, ignoresNullExpr) adds last as a plain group-aggregate, which is a separate path from the window conversion in NativeWindowBase. All four new tests go through the window operator, so this group-agg branch ships without coverage — the new AggLast / AggLastIgnoresNull aggregates are only ever driven through the window path in CI.
The PR is framed around window functions, so I'm curious where this branch fits: is a SQL last(...) / last_value(...) as a GROUP BY aggregate reachable and intended here? If it is, would one checkSparkAnswerAndOperator test through the group-agg path make sense — Spark's own group-agg Last is nondeterministic, so pinning a tested behavior seems worth doing before relying on it. If it isn't reachable yet, would it be cleaner to defer it so it doesn't ship untested?
| test("last_value window function") { | ||
| withSQLConf("spark.auron.enable.window" -> "true") { | ||
| withTable("t1") { | ||
| sql("create table t1(id int, grp int, v string) using parquet") |
There was a problem hiding this comment.
The v column is string across all four tests, so only the DataType::Utf8 / handle_bytes! arm of the new aggregates runs in CI. The primitive (handle_primitive!, e.g. int/long/double), DataType::Boolean, and scalar _other arms of partial_update / partial_merge are never exercised — and the primitive merge arm copies unconditionally (including None), which is the "last wins" inversion of First's guarded write, so it'd be good to have it run at least once.
Would it be worth adding one window test over an int column and one over a boolean column? That'd cover the primitive and boolean arms that are currently dead in CI.
| withSQLConf("spark.auron.enable.window" -> "true") { | ||
| withTable("t1") { | ||
| sql("create table t1(id int, grp int, v string) using parquet") | ||
| sql("insert into t1 values (1, 1, 'a'), (2, 1, null), (3, 1, 'c'), (4, 2, 'x')") |
There was a problem hiding this comment.
Group 1 here starts with 'a' (non-null), so the leading value is never null. That means the case that actually distinguishes RESPECT NULLS from IGNORE NULLS — a group whose first ordered row is null, where first_value should return that leading null while ignore nulls skips to the first non-null — isn't hit by this test. The ignore-nulls test does lead with a null, but the plain RESPECT-NULLS test doesn't.
Could group 1 start with a null instead (e.g. (1, 1, null), (2, 1, 'b'), ...)? That makes the test show RESPECT NULLS returning the leading null, which is the most diagnostic shape for first_value.
| if partial_arg.is_valid(partial_arg_idx) { | ||
| accs.set_value(acc_idx, compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?); | ||
| } else { | ||
| accs.set_value(acc_idx, ScalarValue::try_from(&self.data_type)?); |
There was a problem hiding this comment.
Reinforcing the earlier note on this line with one more data point: this scalar (_other) null branch rebuilds ScalarValue::try_from(&self.data_type)? per row, whereas AggFirst's equivalent branch writes the cheap constant ScalarValue::Null (first.rs:178), and AccScalarValueColumn already precomputes this exact value once as null_value (acc.rs:577). It's correct as-is and only affects the scalar path, but the per-row try_from could be hoisted above the idx_for_zipped! loop or replaced with the precomputed null.
Which issue does this PR close?
Closes #2178
Closes #2179
Rationale for this change
Spark
first_value(...)andlast_value(...)are not supported in Auron's native window execution path, causing queries using them to fall back instead of being executed natively.This PR extends native window coverage to include both functions using the existing aggregate window infrastructure.
What changes are included in this PR?
first_value:
FirstandFirst IGNORE NULLShandling toNativeWindowBaseAggFunction::First/AggFunction::FirstIgnoresNullRust aggregates viaAggProcessorROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROWnaturally produce correct first-value behavior through the existingAggProcessorfirst_valuelast_value:
AggFunction::LastandAggFunction::LastIgnoresNullto the Rust aggregate infrastructureAggLast(always updates accumulator, including with null values) andAggLastIgnoresNull(updates accumulator only for non-null values)AggFunctionenum withLAST = 10andLAST_IGNORES_NULL = 11LastandLast IGNORE NULLShandling toNativeWindowBaseAdditional changes:
Lastimport and conversion case toNativeConverters.scalasolast()as a group aggregate also works nativelyfirst_valueandlast_value(both RESPECT NULLS and IGNORE NULLS variants)Both functions use
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, consistent with all other aggregate window functions in Auron.Are there any user-facing changes?
Yes.
Queries using
first_value(...)andlast_value(...)can now remain on Auron's native window execution path.Both RESPECT NULLS (default) and IGNORE NULLS variants are supported.
How was this patch tested?
CI.