
[AURON #2178] [AURON #2179] Implement native support for first_value and last_value window functions #2200

Open
officialasishkumar wants to merge 1 commit into apache:master from officialasishkumar:feat/native-window-first-last-value


@officialasishkumar


Which issue does this PR close?

Closes #2178
Closes #2179

Rationale for this change

Spark first_value(...) and last_value(...) are not supported in Auron's native window execution path, so queries that use them fall back to the Spark path instead of being executed natively.

This PR extends native window coverage to include both functions using the existing aggregate window infrastructure.

What changes are included in this PR?

first_value:

  • adds First and First IGNORE NULLS handling to NativeWindowBase
  • maps to the existing AggFunction::First / AggFunction::FirstIgnoresNull Rust aggregates via AggProcessor
  • the running-aggregate semantics of ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW naturally produce correct first-value behavior through the existing AggProcessor
  • no new Rust aggregate code is needed for first_value
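
To illustrate why a running aggregate over ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW already yields first-value behavior, here is a minimal standalone sketch. It is not Auron's AggProcessor: `running_first` and its `is_set` flag are hypothetical names, and `None` stands in for SQL NULL.

```rust
/// Running `first_value` over one ordered partition, assuming the frame
/// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
/// Hypothetical helper for illustration only; `None` models SQL NULL.
fn running_first<T: Clone>(rows: &[Option<T>], ignore_nulls: bool) -> Vec<Option<T>> {
    let mut acc: Option<T> = None; // value held by the accumulator
    let mut is_set = false; // true once the first qualifying row was seen
    let mut out = Vec::with_capacity(rows.len());
    for row in rows {
        // Only the first qualifying row writes to the accumulator.
        // With IGNORE NULLS, a null row does not qualify.
        if !is_set && (row.is_some() || !ignore_nulls) {
            acc = row.clone();
            is_set = true;
        }
        // The frame ends at the current row, so emit the accumulator as-is.
        out.push(acc.clone());
    }
    out
}
```

Because the frame always starts at the first row of the partition, the accumulator never needs to be revised once it is set, which is why the existing First aggregates can be reused unchanged.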

last_value:

  • introduces AggFunction::Last and AggFunction::LastIgnoresNull to the Rust aggregate infrastructure
  • adds AggLast (always updates accumulator, including with null values) and AggLastIgnoresNull (updates accumulator only for non-null values)
  • extends the protobuf AggFunction enum with LAST = 10 and LAST_IGNORES_NULL = 11
  • adds planner and lib.rs mappings for the new proto values
  • adds Last and Last IGNORE NULLS handling to NativeWindowBase
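
The two update rules can be sketched as follows. This is a simplified stand-in, not the AggLast / AggLastIgnoresNull code in the PR: `LastAcc`, `update`, `update_ignoring_nulls`, and `running_last` are hypothetical names, and a plain `Option<String>` replaces the real accumulator column.

```rust
/// Simplified stand-in for a single "last" accumulator slot.
/// Hypothetical type; `None` models SQL NULL.
#[derive(Debug, Default)]
struct LastAcc {
    value: Option<String>,
}

impl LastAcc {
    /// RESPECT NULLS: every row overwrites the accumulator, nulls included.
    fn update(&mut self, input: Option<&str>) {
        self.value = input.map(str::to_string);
    }

    /// IGNORE NULLS: only non-null rows overwrite the accumulator.
    fn update_ignoring_nulls(&mut self, input: Option<&str>) {
        if let Some(v) = input {
            self.value = Some(v.to_string());
        }
    }
}

/// Running `last_value` over one ordered partition with the frame
/// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
fn running_last(rows: &[Option<&str>], ignore_nulls: bool) -> Vec<Option<String>> {
    let mut acc = LastAcc::default();
    rows.iter()
        .map(|row| {
            if ignore_nulls {
                acc.update_ignoring_nulls(*row);
            } else {
                acc.update(*row);
            }
            acc.value.clone()
        })
        .collect()
}
```

For the rows 'a', null, 'c' this produces a, NULL, c under RESPECT NULLS and a, a, c under IGNORE NULLS.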

Additional changes:

  • adds Last import and conversion case to NativeConverters.scala so last() as a group aggregate also works natively
  • adds Scala regression tests for first_value and last_value (both RESPECT NULLS and IGNORE NULLS variants)

Both functions use ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, consistent with all other aggregate window functions in Auron.

Are there any user-facing changes?

Yes.
Queries using first_value(...) and last_value(...) can now remain on Auron's native window execution path.
Both RESPECT NULLS (default) and IGNORE NULLS variants are supported.

How was this patch tested?

CI.

…first_value and last_value window functions

Spark `first_value(...)` and `last_value(...)` are not supported in
Auron's native window execution path, causing queries using them to fall
back to the Spark path instead of being executed natively.

This PR extends native window coverage to include both functions:

first_value:
- maps `First(child, ignoreNulls)` in the window expression to the
  existing `AggFunction::First` / `AggFunction::FirstIgnoresNull` Rust
  aggregates via `NativeWindowBase`
- the running-aggregate semantics of `ROWS BETWEEN UNBOUNDED PRECEDING
  AND CURRENT ROW` naturally produce the correct first-value behavior
  through the existing `AggProcessor`
- no new Rust aggregate code is required for first_value

last_value:
- introduces `AggFunction::Last` and `AggFunction::LastIgnoresNull`
  to the Rust aggregate infrastructure
- adds `AggLast` (always updates accumulator, including with null values)
  and `AggLastIgnoresNull` (updates accumulator only for non-null values)
- extends the protobuf `AggFunction` enum with `LAST = 10` and
  `LAST_IGNORES_NULL = 11`
- adds planner and lib.rs mappings for the new proto values
- maps `Last(child, ignoreNulls)` in the window expression to the new
  `AggFunction::Last` / `AggFunction::LastIgnoresNull` aggregates
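
As a rough picture of the proto-to-native mapping for the two new values, the sketch below decodes the numeric tags named above. The enum here is a local stand-in with only the two new variants, and `agg_function_from_proto` is a hypothetical function; the real conversion in the planner and lib.rs works on the generated protobuf types and covers the other aggregate functions as well.

```rust
/// Local stand-in listing only the two variants this PR introduces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggFunction {
    Last,
    LastIgnoresNull,
}

/// Hypothetical decoder for the two new protobuf enum values.
/// Tags other than 10 and 11 are outside the scope of this sketch.
fn agg_function_from_proto(tag: i32) -> Option<AggFunction> {
    match tag {
        10 => Some(AggFunction::Last),            // LAST = 10
        11 => Some(AggFunction::LastIgnoresNull), // LAST_IGNORES_NULL = 11
        _ => None,
    }
}
```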

Both functions use the existing `AggProcessor` with frame
`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, consistent with
all other aggregate window functions in Auron.

Additional changes:
- adds `Last` import and conversion case to `NativeConverters.scala`
  so `last()` as a group aggregate also works natively
- adds `First` and `Last` imports and match cases to `NativeWindowBase`
- adds Scala regression tests for first_value and last_value, covering
  both RESPECT NULLS and IGNORE NULLS variants

Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>

Copilot AI left a comment


Pull request overview

Adds native execution support for Spark first_value / last_value window functions (including IGNORE NULLS) by wiring them through Auron’s existing aggregate-window infrastructure and extending the native aggregate set.

Changes:

  • Extend Spark-side window/aggregate conversion to emit FIRST(_IGNORES_NULL) and new LAST(_IGNORES_NULL) agg functions.
  • Add native-engine Rust implementations for Last and LastIgnoresNull, plus factory/planner/proto enum wiring.
  • Add Scala regression tests covering first_value / last_value with RESPECT/IGNORE NULLS.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala | Adds window-function mapping for First/Last aggregates to protobuf agg functions. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala | Adds group-aggregate conversion support for Last / Last IGNORE NULLS. |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronWindowSuite.scala | Adds regression coverage for first_value / last_value (incl. IGNORE NULLS). |
| native-engine/datafusion-ext-plans/src/agg/mod.rs | Registers new Last / LastIgnoresNull modules and enum variants. |
| native-engine/datafusion-ext-plans/src/agg/last.rs | Implements AggLast (RESPECT NULLS) aggregate behavior. |
| native-engine/datafusion-ext-plans/src/agg/last_ignores_null.rs | Implements AggLastIgnoresNull aggregate behavior. |
| native-engine/datafusion-ext-plans/src/agg/agg.rs | Wires new agg functions into the aggregate factory. |
| native-engine/auron-planner/src/planner.rs | Maps protobuf window agg functions to native AggFunction::Last*. |
| native-engine/auron-planner/src/lib.rs | Adds protobuf-to-native enum conversion for Last*. |
| native-engine/auron-planner/proto/auron.proto | Extends protobuf AggFunction enum with LAST and LAST_IGNORES_NULL. |


package org.apache.auron

import org.apache.spark.sql.AuronQueryTest
import org.apache.spark.sql.execution.auron.plan.NativeWindowBase
Comment on lines +152 to +160
let accs = downcast_any!(accs, mut AccScalarValueColumn)?;
idx_for_zipped! {
    ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
        if partial_arg.is_valid(partial_arg_idx) {
            accs.set_value(acc_idx, compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?);
        } else {
            accs.set_value(acc_idx, ScalarValue::try_from(&self.data_type)?);
        }
    }

@weiqingy weiqingy left a comment


Thanks for taking this on — nice that first_value needed zero new Rust by reusing the existing AggProcessor / aggregate-window infrastructure, and AggLast reads cleanly as the "always overwrite" inversion of AggFirst. The tests are well-built too: checkSparkAnswerAndOperator both compares against vanilla Spark and asserts the native window operator actually ran. A few questions inline.

})
aggBuilder.addChildren(convertExpr(child))

case Last(child, ignoresNullExpr) =>


This case Last(child, ignoresNullExpr) adds last as a plain group-aggregate, which is a separate path from the window conversion in NativeWindowBase. All four new tests go through the window operator, so this group-agg branch ships without coverage — the new AggLast / AggLastIgnoresNull aggregates are only ever driven through the window path in CI.

The PR is framed around window functions, so I'm curious where this branch fits: is a SQL last(...) / last_value(...) as a GROUP BY aggregate reachable and intended here? If it is, would one checkSparkAnswerAndOperator test through the group-agg path make sense — Spark's own group-agg Last is nondeterministic, so pinning a tested behavior seems worth doing before relying on it. If it isn't reachable yet, would it be cleaner to defer it so it doesn't ship untested?
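
To make the nondeterminism concern concrete, here is a small standalone sketch of a "last per group" aggregate. It is not Auron or Spark code: `last_per_group` is a hypothetical helper, and the point is only that the result depends on the order in which rows reach the aggregate, which a GROUP BY does not guarantee.

```rust
use std::collections::BTreeMap;

/// Hypothetical "last value per group" over rows of (group key, value).
/// Each row overwrites the entry for its group, so the final result is
/// whatever row happened to arrive last for that group.
fn last_per_group<'a>(rows: &[(i32, Option<&'a str>)]) -> BTreeMap<i32, Option<&'a str>> {
    let mut out = BTreeMap::new();
    for &(grp, v) in rows {
        out.insert(grp, v);
    }
    out
}
```

Feeding the same two rows for group 1 in the order ('a', 'c') gives 'c', while the order ('c', 'a') gives 'a'. A group-aggregate test would therefore need input whose arrival order is pinned, or data where every ordering gives the same answer.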

test("last_value window function") {
  withSQLConf("spark.auron.enable.window" -> "true") {
    withTable("t1") {
      sql("create table t1(id int, grp int, v string) using parquet")


The v column is string across all four tests, so only the DataType::Utf8 / handle_bytes! arm of the new aggregates runs in CI. The primitive (handle_primitive!, e.g. int/long/double), DataType::Boolean, and scalar _other arms of partial_update / partial_merge are never exercised — and the primitive merge arm copies unconditionally (including None), which is the "last wins" inversion of First's guarded write, so it'd be good to have it run at least once.

Would it be worth adding one window test over an int column and one over a boolean column? That'd cover the primitive and boolean arms that are currently dead in CI.
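
The difference between the two merge rules on a primitive accumulator can be sketched like this. Both functions are hypothetical and use `Option<i64>` in place of the real accumulator column; in particular, the guard in `merge_first` ("only write when the accumulator is still empty") is an assumed stand-in for First's guarded write, not a copy of it.

```rust
/// Assumed shape of a guarded "first" merge: keep the existing value
/// and only accept the partial when nothing has been stored yet.
fn merge_first(acc: &mut Option<i64>, partial: Option<i64>) {
    if acc.is_none() {
        *acc = partial;
    }
}

/// "Last wins" merge as described in the comment above: the partial is
/// copied unconditionally, so a `None` partial also replaces the value.
fn merge_last(acc: &mut Option<i64>, partial: Option<i64>) {
    *acc = partial;
}
```

Merging `Some(2)` into an accumulator holding `Some(1)` leaves `Some(1)` under `merge_first` and `Some(2)` under `merge_last`; merging `None` afterwards resets the `merge_last` accumulator to `None`. That last case is the one an int-column window test would exercise.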

withSQLConf("spark.auron.enable.window" -> "true") {
  withTable("t1") {
    sql("create table t1(id int, grp int, v string) using parquet")
    sql("insert into t1 values (1, 1, 'a'), (2, 1, null), (3, 1, 'c'), (4, 2, 'x')")


Group 1 here starts with 'a' (non-null), so the leading value is never null. That means the case that actually distinguishes RESPECT NULLS from IGNORE NULLS — a group whose first ordered row is null, where first_value should return that leading null while ignore nulls skips to the first non-null — isn't hit by this test. The ignore-nulls test does lead with a null, but the plain RESPECT-NULLS test doesn't.

Could group 1 start with a null instead (e.g. (1, 1, null), (2, 1, 'b'), ...)? That makes the test show RESPECT NULLS returning the leading null, which is the most diagnostic shape for first_value.
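
A quick standalone check of why the leading null matters, assuming rows within the group are ordered by id and the frame is ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. `first_value_running` is a hypothetical reference helper, not part of the test suite, and `None` stands in for SQL NULL.

```rust
/// Hypothetical reference implementation: for each row i, evaluate
/// first_value over the frame rows[0..=i].
fn first_value_running<'a>(rows: &[Option<&'a str>], ignore_nulls: bool) -> Vec<Option<&'a str>> {
    (0..rows.len())
        .map(|i| {
            let frame = &rows[..=i];
            if ignore_nulls {
                // First non-null value in the frame, if any.
                frame.iter().copied().flatten().next()
            } else {
                // First row of the frame, null or not.
                frame[0]
            }
        })
        .collect()
}
```

With the current group 1 data ('a', null, 'c'), both variants return 'a' for every row, so the test cannot tell them apart. With a leading null (null, 'b'), RESPECT NULLS returns NULL for both rows while IGNORE NULLS returns NULL and then 'b'.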

if partial_arg.is_valid(partial_arg_idx) {
    accs.set_value(acc_idx, compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?);
} else {
    accs.set_value(acc_idx, ScalarValue::try_from(&self.data_type)?);


Reinforcing the earlier note on this line with one more data point: this scalar (_other) null branch rebuilds ScalarValue::try_from(&self.data_type)? per row, whereas AggFirst's equivalent branch writes the cheap constant ScalarValue::Null (first.rs:178), and AccScalarValueColumn already precomputes this exact value once as null_value (acc.rs:577). It's correct as-is and only affects the scalar path, but the per-row try_from could be hoisted above the idx_for_zipped! loop or replaced with the precomputed null.
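
The hoisting suggestion could look roughly like the sketch below. It uses small local stand-ins rather than DataFusion's types: `Scalar`, `DataType`, `typed_null`, and `update_last` are all hypothetical, and `typed_null` plays the role of the per-row `ScalarValue::try_from(&self.data_type)` call.

```rust
/// Local stand-in for a typed scalar; `None` inside a variant is a typed null.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int64(Option<i64>),
    Utf8(Option<String>),
}

/// Local stand-in for a data type descriptor.
#[derive(Debug, Clone, Copy)]
enum DataType {
    Int64,
    Utf8,
}

/// Builds the typed null for a data type (the work to avoid repeating per row).
fn typed_null(dt: DataType) -> Scalar {
    match dt {
        DataType::Int64 => Scalar::Int64(None),
        DataType::Utf8 => Scalar::Utf8(None),
    }
}

/// "Last" update over zipped accumulators and inputs, with the null
/// value built once above the loop and cloned only when a null row occurs.
fn update_last(accs: &mut [Scalar], inputs: &[Option<i64>], dt: DataType) {
    let null_value = typed_null(dt); // hoisted out of the loop
    for (acc, input) in accs.iter_mut().zip(inputs) {
        *acc = match input {
            Some(v) => Scalar::Int64(Some(*v)),
            None => null_value.clone(),
        };
    }
}
```

Whether cloning a prebuilt value is measurably cheaper than rebuilding it would need to be checked against the real `ScalarValue`; reusing the accumulator column's precomputed null, as suggested above, avoids the question.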

Development

Successfully merging this pull request may close these issues.

Implement native support for last_value window function
Implement native support for first_value window function

3 participants