diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index ed58f32e11d..eed31d66f4b 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -149,6 +149,7 @@ pub struct WriteStrategyBuilder { field_writers: HashMap>, allow_encodings: Option>, flat_strategy: Option>, + probe_compressor: Option>, } impl Default for WriteStrategyBuilder { @@ -161,6 +162,7 @@ impl Default for WriteStrategyBuilder { field_writers: HashMap::new(), allow_encodings: Some(ALLOWED_ENCODINGS.clone()), flat_strategy: None, + probe_compressor: None, } } } @@ -215,6 +217,12 @@ impl WriteStrategyBuilder { self } + /// Override the compressor used to probe whether a column is dict-eligible. + pub fn with_probe_compressor(mut self, compressor: C) -> Self { + self.probe_compressor = Some(Arc::new(compressor)); + self + } + /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides /// applied. pub fn build(self) -> Arc { @@ -268,14 +276,20 @@ impl WriteStrategyBuilder { CompressorConfig::BtrBlocks(builder) => Arc::new(builder.build()), CompressorConfig::Opaque(compressor) => compressor, }; - let compress_then_flat = CompressingStrategy::new(flat, stats_compressor); + let compress_then_flat = CompressingStrategy::new(flat, Arc::clone(&stats_compressor)); // 3. apply dict encoding or fallback + let probe_compressor = if let Some(probe_compressor) = self.probe_compressor { + probe_compressor + } else { + Arc::clone(&stats_compressor) + }; let dict = DictStrategy::new( coalescing.clone(), compress_then_flat.clone(), coalescing, Default::default(), + probe_compressor, ); // 2. calculate stats for each row group diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index e320cf2e9d9..e6d6c48605f 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -59,6 +59,9 @@ use vortex_array::stats::PRUNING_STATS; use vortex_array::stream::ArrayStreamAdapter; use vortex_array::stream::ArrayStreamExt; use vortex_array::validity::Validity; +use vortex_btrblocks::BtrBlocksCompressorBuilder; +use vortex_btrblocks::SchemeExt; +use vortex_btrblocks::schemes::string::StringDictScheme; use vortex_buffer::Buffer; use vortex_buffer::ByteBufferMut; use vortex_buffer::buffer; @@ -1813,6 +1816,16 @@ fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) { } } +/// Whether any node in the layout tree is a dict layout. +fn layout_has_dict(layout: &dyn Layout) -> bool { + layout.encoding_id().as_ref() == "vortex.dict" + || layout + .children() + .unwrap() + .iter() + .any(|child| layout_has_dict(child.as_ref())) +} + #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> { @@ -1861,6 +1874,75 @@ async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> { Ok(()) } +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn dict_probe_honours_configured_compressor() -> VortexResult<()> { + // Low-cardinality strings so the default cascade picks a dictionary. + let n = 32_768; + let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(values).into_array(); + + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .with_strategy(crate::strategy::WriteStrategyBuilder::default().build()) + .write(&mut buf, strings.clone().to_array_stream()) + .await?; + assert!( + layout_has_dict(summary.footer().layout().as_ref()), + "default builder should produce a dict layout for low-cardinality strings" + ); + + let no_string_dict = + BtrBlocksCompressorBuilder::default().exclude_schemes([StringDictScheme.id()]); + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .with_strategy( + crate::strategy::WriteStrategyBuilder::default() + .with_btrblocks_builder(no_string_dict) + .build(), + ) + .write(&mut buf, strings.to_array_stream()) + .await?; + assert!( + !layout_has_dict(summary.footer().layout().as_ref()), + "excluding StringDict from the configured compressor should disable the dict layout" + ); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn probe_compressor_override_is_independent() -> VortexResult<()> { + // Low-cardinality strings the default cascade would dict-encode. + let n = 32_768; + let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(values).into_array(); + + let probe_without_dict = BtrBlocksCompressorBuilder::default() + .exclude_schemes([StringDictScheme.id()]) + .build(); + + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .with_strategy( + crate::strategy::WriteStrategyBuilder::default() + .with_probe_compressor(probe_without_dict) + .build(), + ) + .write(&mut buf, strings.to_array_stream()) + .await?; + assert!( + !layout_has_dict(summary.footer().layout().as_ref()), + "probe override should disable the dict layout independently of the data/stats compressor" + ); + + Ok(()) +} + #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> { diff --git a/vortex-layout/src/layouts/dict/reader.rs b/vortex-layout/src/layouts/dict/reader.rs index 002b4b1e902..d6d2a30fbbb 100644 --- a/vortex-layout/src/layouts/dict/reader.rs +++ b/vortex-layout/src/layouts/dict/reader.rs @@ -289,6 +289,7 @@ mod tests { use vortex_array::scalar_fn::session::ScalarFnSession; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; + use vortex_btrblocks::BtrBlocksCompressor; use vortex_error::VortexExpect; use vortex_io::runtime::Handle; use vortex_io::runtime::single::block_on; @@ -329,6 +330,7 @@ mod tests { FlatLayoutStrategy::default(), FlatLayoutStrategy::default(), DictLayoutOptions::default(), + Arc::new(BtrBlocksCompressor::default()), ); let array = VarBinArray::from_iter( @@ -428,6 +430,7 @@ mod tests { FlatLayoutStrategy::default(), FlatLayoutStrategy::default(), DictLayoutOptions::default(), + Arc::new(BtrBlocksCompressor::default()), ); let array = @@ -479,6 +482,7 @@ mod tests { FlatLayoutStrategy::default(), FlatLayoutStrategy::default(), DictLayoutOptions::default(), + Arc::new(BtrBlocksCompressor::default()), ); let array = VarBinArray::from_iter( diff --git a/vortex-layout/src/layouts/dict/writer.rs b/vortex-layout/src/layouts/dict/writer.rs index 7013becd4df..efb06241743 100644 --- a/vortex-layout/src/layouts/dict/writer.rs +++ b/vortex-layout/src/layouts/dict/writer.rs @@ -28,7 +28,6 @@ use vortex_array::builders::dict::dict_encoder; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; -use vortex_btrblocks::BtrBlocksCompressor; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -42,6 +41,7 @@ use crate::LayoutRef; use crate::LayoutStrategy; use crate::OwnedLayoutChildren; use crate::layouts::chunked::ChunkedLayout; +use crate::layouts::compressed::CompressorPlugin; use crate::layouts::dict::DictLayout; use crate::segments::SegmentSinkRef; use crate::sequence::SendableSequentialStream; @@ -106,6 +106,7 @@ pub struct DictStrategy { values: Arc, fallback: Arc, options: DictLayoutOptions, + probe_compressor: Arc, } impl DictStrategy { @@ -114,12 +115,14 @@ impl DictStrategy { values: Values, fallback: Fallback, options: DictLayoutOptions, + probe_compressor: Arc, ) -> Self { Self { codes: Arc::new(codes), values: Arc::new(values), fallback: Arc::new(fallback), options, + probe_compressor, } } } @@ -153,7 +156,9 @@ impl LayoutStrategy for DictStrategy { None => true, // empty stream Some(chunk) => { let mut exec_ctx = session.create_execution_ctx(); - let compressed = BtrBlocksCompressor::default().compress(&chunk, &mut exec_ctx)?; + let compressed = self + .probe_compressor + .compress_chunk(&chunk, &mut exec_ctx)?; !compressed.is::() } };