Skip to content

Commit d2f7474

Browse files
adriangbclaude
andcommitted
feat(parquet): observer-driven pruning in row-group and page filters
`RowGroupAccessPlanFilter::prune_by_statistics_with_observer` takes a `PruningConjunction` plus `&mut dyn PruningObserver`. The untagged `prune_by_statistics` wraps the predicate via `PruningConjunction::single` and passes `NoopObserver` — one code path, two API shapes. `PagePruningAccessPlanFilter`: * Predicates carry an optional caller `Tag` internally (`TaggedPagePredicate`); untagged callers see no behavior change. * `new_tagged(&[(Tag, expr)], &schema)` for tagged construction. * The page-index loop emits one observer event per leaf actually evaluated; leaves cut off by the existing `!selects_any` short-circuit are not observed (resolves reviewer Q2 on #22235: per-conjunct stats are not biased by predicates that never ran). * `prune_plan_with_page_index` and the new `prune_plan_with_observer` share a single body — no duplicated method as in the prior draft. Fully-matched detection still consumes a flat `PruningPredicate` (it inverts the combined expression); the observer path receives that combined predicate explicitly and fires only for the per-leaf pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 258f650 commit d2f7474

3 files changed

Lines changed: 253 additions & 37 deletions

File tree

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,13 +1103,15 @@ impl RowGroupsPrunedParquetOpen {
11031103
&& !access_plan.is_empty()
11041104
&& let Some(page_pruning_predicate) = page_pruning_predicate
11051105
{
1106+
let mut obs = datafusion_pruning::NoopObserver;
11061107
let page_pruning_result = page_pruning_predicate
11071108
.prune_plan_with_page_index_and_metrics(
11081109
access_plan,
11091110
&prepared.physical_file_schema,
11101111
reader_metadata.parquet_schema(),
11111112
file_metadata.as_ref(),
11121113
&prepared.file_metrics,
1114+
&mut obs,
11131115
);
11141116
access_plan = page_pruning_result.access_plan;
11151117
ParquetFileMetrics::add_page_index_pages_skipped_by_fully_matched(

datafusion/datasource-parquet/src/page_filter.rs

Lines changed: 128 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use arrow::{
3131
use datafusion_common::ScalarValue;
3232
use datafusion_common::pruning::PruningStatistics;
3333
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
34-
use datafusion_pruning::PruningPredicate;
34+
use datafusion_pruning::{NoopObserver, PruningObserver, PruningPredicate, Tag};
3535

3636
use log::{debug, trace};
3737
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
@@ -110,9 +110,20 @@ use parquet::{
110110
/// row selection that is added to the [`ParquetAccessPlan`].
111111
#[derive(Debug)]
112112
pub struct PagePruningAccessPlanFilter {
113-
/// single column predicates (e.g. (`col = 5`) extracted from the overall
114-
/// predicate. Must all be true for a row to be included in the result.
115-
predicates: Vec<PruningPredicate>,
113+
/// Single-column predicates (e.g. `col = 5`) extracted from the
114+
/// overall predicate. Must all be true for a row to be included.
115+
/// Each carries an optional caller [`Tag`]; when present, the
116+
/// page-index evaluation loop fires `observer.on_leaf(tag, ..)`
117+
/// after evaluating that predicate. Untagged predicates produce
118+
/// `None` tags and no-op against `NoopObserver`.
119+
predicates: Vec<TaggedPagePredicate>,
120+
}
121+
122+
/// Single-column predicate paired with an optional caller tag.
123+
#[derive(Debug)]
124+
struct TaggedPagePredicate {
125+
tag: Option<Tag>,
126+
predicate: PruningPredicate,
116127
}
117128

118129
/// Result of applying page-index pruning to a [`ParquetAccessPlan`].
@@ -137,40 +148,55 @@ impl PagePruningResult {
137148

138149
impl PagePruningAccessPlanFilter {
139150
/// Create a new [`PagePruningAccessPlanFilter`] from a physical
140-
/// expression.
151+
/// expression. Predicates created this way have no caller tag.
141152
#[expect(clippy::needless_pass_by_value)]
142153
pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
143-
// extract any single column predicates
144154
let predicates = split_conjunction(expr)
145155
.into_iter()
146-
.filter_map(|predicate| {
147-
let pp = match PruningPredicate::try_new(
148-
Arc::clone(predicate),
149-
Arc::clone(&schema),
150-
) {
151-
Ok(pp) => pp,
152-
Err(e) => {
153-
debug!("Ignoring error creating page pruning predicate: {e}");
154-
return None;
155-
}
156-
};
157-
158-
if pp.always_true() {
159-
debug!("Ignoring always true page pruning predicate: {predicate}");
160-
return None;
161-
}
162-
163-
if pp.required_columns().single_column().is_none() {
164-
debug!("Ignoring multi-column page pruning predicate: {predicate}");
165-
return None;
166-
}
156+
.filter_map(|predicate| Self::build_one(predicate, &schema, None))
157+
.collect::<Vec<_>>();
158+
Self { predicates }
159+
}
167160

168-
Some(pp)
169-
})
161+
/// Like [`Self::new`], but each conjunct carries a caller-supplied
162+
/// [`Tag`]. The page-index evaluation loop fires
163+
/// `observer.on_leaf(tag, ..)` once per leaf actually evaluated;
164+
/// leaves cut off by the existing AND short-circuit on row
165+
/// selection (`!selects_any`) are not observed.
166+
pub fn new_tagged(
167+
tagged: &[(Tag, Arc<dyn PhysicalExpr>)],
168+
schema: &SchemaRef,
169+
) -> Self {
170+
let predicates = tagged
171+
.iter()
172+
.filter_map(|(tag, expr)| Self::build_one(expr, schema, Some(*tag)))
170173
.collect::<Vec<_>>();
171174
Self { predicates }
172175
}
173176

177+
fn build_one(
178+
expr: &Arc<dyn PhysicalExpr>,
179+
schema: &SchemaRef,
180+
tag: Option<Tag>,
181+
) -> Option<TaggedPagePredicate> {
182+
let pp = match PruningPredicate::try_new(Arc::clone(expr), Arc::clone(schema)) {
183+
Ok(pp) => pp,
184+
Err(e) => {
185+
debug!("Ignoring error creating page pruning predicate: {e}");
186+
return None;
187+
}
188+
};
189+
if pp.always_true() {
190+
debug!("Ignoring always true page pruning predicate: {expr}");
191+
return None;
192+
}
193+
if pp.required_columns().single_column().is_none() {
194+
debug!("Ignoring multi-column page pruning predicate: {expr}");
195+
return None;
196+
}
197+
Some(TaggedPagePredicate { tag, predicate: pp })
198+
}
199+
174200
/// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
175201
/// parquet page index, if any
176202
pub fn prune_plan_with_page_index(
@@ -181,26 +207,64 @@ impl PagePruningAccessPlanFilter {
181207
parquet_metadata: &ParquetMetaData,
182208
file_metrics: &ParquetFileMetrics,
183209
) -> ParquetAccessPlan {
210+
let mut obs = NoopObserver;
184211
self.prune_plan_with_page_index_and_metrics(
185212
access_plan,
186213
arrow_schema,
187214
parquet_schema,
188215
parquet_metadata,
189216
file_metrics,
217+
&mut obs,
218+
)
219+
.access_plan
220+
}
221+
222+
/// Tagged variant. Fires `observer.on_leaf(tag, mask)` once per
223+
/// predicate that is actually evaluated against page-index
224+
/// statistics. The `mask` is per-row-group at this layer:
225+
/// `mask[i] = true` means row group `i` still has at least one
226+
/// page that may match this leaf. Predicates skipped by the
227+
/// existing per-row-group `!selects_any` short-circuit are not
228+
/// observed — so per-conjunct stats are not biased by predicates
229+
/// that never ran (resolves the reviewer's Q2 concern on
230+
/// PR #22235).
231+
pub fn prune_plan_with_observer<O>(
232+
&self,
233+
access_plan: ParquetAccessPlan,
234+
arrow_schema: &Schema,
235+
parquet_schema: &SchemaDescriptor,
236+
parquet_metadata: &ParquetMetaData,
237+
file_metrics: &ParquetFileMetrics,
238+
observer: &mut O,
239+
) -> ParquetAccessPlan
240+
where
241+
O: PruningObserver + ?Sized,
242+
{
243+
self.prune_plan_with_page_index_and_metrics(
244+
access_plan,
245+
arrow_schema,
246+
parquet_schema,
247+
parquet_metadata,
248+
file_metrics,
249+
observer,
190250
)
191251
.access_plan
192252
}
193253

194254
/// Returns an updated [`ParquetAccessPlan`] and metrics by applying predicates
195255
/// to the parquet page index, if any.
196-
pub(crate) fn prune_plan_with_page_index_and_metrics(
256+
pub(crate) fn prune_plan_with_page_index_and_metrics<O>(
197257
&self,
198258
mut access_plan: ParquetAccessPlan,
199259
arrow_schema: &Schema,
200260
parquet_schema: &SchemaDescriptor,
201261
parquet_metadata: &ParquetMetaData,
202262
file_metrics: &ParquetFileMetrics,
203-
) -> PagePruningResult {
263+
observer: &mut O,
264+
) -> PagePruningResult
265+
where
266+
O: PruningObserver + ?Sized,
267+
{
204268
// scoped timer updates on drop
205269
let _timer_guard = file_metrics.page_index_eval_time.timer();
206270
if self.predicates.is_empty() {
@@ -210,6 +274,16 @@ impl PagePruningAccessPlanFilter {
210274
let page_index_predicates = &self.predicates;
211275
let groups = parquet_metadata.row_groups();
212276

277+
// Per-leaf "did this row group still have any matching pages
278+
// after the leaf alone?" mask. Built across all row groups,
279+
// emitted via `observer.on_leaf` at the end so each leaf gets
280+
// exactly one observation per call to this function. Leaves
281+
// never evaluated (because an earlier conjunct emptied the
282+
// running row selection — `!selects_any` break below) end up
283+
// with `None` here and are intentionally not observed.
284+
let mut per_leaf_mask: Vec<Option<Vec<bool>>> =
285+
(0..page_index_predicates.len()).map(|_| None).collect();
286+
213287
if groups.is_empty() {
214288
return PagePruningResult::new(access_plan, 0);
215289
}
@@ -262,7 +336,8 @@ impl PagePruningAccessPlanFilter {
262336
let mut matched_pages_in_group: HashSet<usize> =
263337
HashSet::from_iter(0..total_pages_in_group);
264338

265-
for predicate in page_index_predicates {
339+
for (leaf_idx, tagged) in page_index_predicates.iter().enumerate() {
340+
let predicate = &tagged.predicate;
266341
let Some(column) = predicate.required_columns().single_column() else {
267342
debug!(
268343
"Ignoring multi-column page pruning predicate: {:?}",
@@ -307,6 +382,15 @@ impl PagePruningAccessPlanFilter {
307382
predicate.predicate_expr(),
308383
);
309384

385+
// Per-leaf observation: this leaf ran for `row_group_index`
386+
// and produced `selection`. Whether this row group is
387+
// "kept" by the leaf is `selection.selects_any()`. The
388+
// entry is created lazily so leaves never observed (due
389+
// to short-circuit) stay `None`.
390+
let mask = per_leaf_mask[leaf_idx]
391+
.get_or_insert_with(|| vec![false; groups.len()]);
392+
mask[row_group_index] = selection.selects_any();
393+
310394
let matched_pages_indexes: HashSet<_> = pages
311395
.into_iter()
312396
.enumerate()
@@ -369,6 +453,18 @@ impl PagePruningAccessPlanFilter {
369453
file_metrics
370454
.page_index_pages_pruned
371455
.add_matched(total_pages_select);
456+
457+
// Emit one observer event per leaf that was actually
458+
// evaluated against at least one row group. Leaves that the
459+
// outer `!selects_any` short-circuit prevented from running
460+
// stay `None` here and are correctly absent from the stats.
461+
for (leaf_idx, mask_opt) in per_leaf_mask.into_iter().enumerate() {
462+
if let Some(mask) = mask_opt {
463+
let tag = page_index_predicates[leaf_idx].tag;
464+
observer.on_leaf(tag, &mask);
465+
}
466+
}
467+
372468
PagePruningResult::new(access_plan, total_pages_skipped_by_fully_matched)
373469
}
374470

0 commit comments

Comments
 (0)