Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
934 changes: 637 additions & 297 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ fuzzy-matcher = "0.3"
geo-traits = "0.3.0"
geo-types = "0.7.19"
geoarrow = "0.8.0"
geoarrow-cast = "0.8.0"
# Temporary fork bumped to DataFusion 54 until the upstream PR lands; pinned to an exact rev.
geodatafusion = { git = "https://github.com/HarukiMoriarty/geodatafusion", rev = "3d50d7e549df720707133852848edd1ecff89265" }
get_dir = "0.5.0"
Expand Down Expand Up @@ -241,6 +242,14 @@ similar = "3.0.0"
sketches-ddsketch = "0.4.0"
smallvec = "1.15.1"
smol = "2.0.2"
spatialbench = "0.2"
spatialbench-arrow = "0.2"
# spatialbench still pins arrow 56, two majors behind the workspace arrow. Until upstream
# catches up, write its generated batches with a matching parquet instead of converting
# arrow versions at the boundary.
spatialbench-parquet = { package = "parquet", version = "56", features = [
"async",
] }
static_assertions = "1.1"
strum = "0.28"
syn = { version = "2.0.117", features = ["full"] }
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/datafusion-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ datafusion = { workspace = true, features = [
datafusion-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures.workspace = true
geodatafusion = { workspace = true }
itertools.workspace = true
object_store = { workspace = true, features = ["aws", "gcp"] }
opentelemetry.workspace = true
Expand All @@ -39,6 +40,7 @@ vortex = { workspace = true, features = ["object_store", "files", "tokio"] }
vortex-bench = { workspace = true }
vortex-cuda = { workspace = true, optional = true }
vortex-datafusion = { workspace = true }
vortex-geo = { workspace = true }
vortex-metrics = { workspace = true }

[build-dependencies]
Expand Down
41 changes: 32 additions & 9 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::local::LocalFileSystem;
use url::Url;
use vortex_bench::BenchmarkArg;
use vortex_bench::Format;
use vortex_bench::SESSION;
use vortex_datafusion::VortexFormat;
use vortex_datafusion::VortexFormatFactory;
use vortex_datafusion::VortexTableOptions;

#[expect(clippy::expect_used)]
pub fn get_session_context() -> SessionContext {
pub fn get_session_context(benchmark: BenchmarkArg) -> SessionContext {
let mut rt_builder = RuntimeEnvBuilder::new();

let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
Expand All @@ -45,13 +46,27 @@ pub fn get_session_context() -> SessionContext {
.build_arc()
.expect("could not build runtime environment");

let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
projection_pushdown: true,
..Default::default()
});
let factory = VortexFormatFactory::new_with_options(
SESSION.clone(),
VortexTableOptions {
projection_pushdown: true,
..Default::default()
},
);

let mut config = SessionConfig::from_env().expect("shouldn't fail");
// SpatialBench reads geoarrow.point Parquet and benchmarks an ST_* predicate, so it needs
// Parquet-specific tuning.
if matches!(benchmark, BenchmarkArg::SpatialBench) {
// Keep Parquet field metadata so the geoarrow.point extension survives the read.
config.options_mut().execution.parquet.skip_metadata = false;
// Evaluate (and reorder) the filter inside the parquet scan -- fairest parquet baseline.
config.options_mut().execution.parquet.pushdown_filters = true;
config.options_mut().execution.parquet.reorder_filters = true;
}

let mut session_state_builder = SessionStateBuilder::new()
.with_config(SessionConfig::from_env().expect("shouldn't fail"))
.with_config(config)
.with_runtime_env(rt)
.with_default_features();

Expand All @@ -66,7 +81,10 @@ pub fn get_session_context() -> SessionContext {
file_formats.push(Arc::new(factory));
}

SessionContext::new_with_state(session_state_builder.build())
let ctx = SessionContext::new_with_state(session_state_builder.build());
// Register geodatafusion's PostGIS-style ST_* UDFs so SpatialBench SQL plans.
geodatafusion::register(&ctx);
ctx
}

pub fn make_object_store(
Expand Down Expand Up @@ -109,11 +127,16 @@ pub fn make_object_store(
}
}

pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
pub fn format_to_df_format(format: Format, benchmark: BenchmarkArg) -> Arc<dyn FileFormat> {
match format {
Format::Csv => Arc::new(CsvFormat::default()) as _,
Format::Arrow => Arc::new(ArrowFormat),
Format::Parquet => Arc::new(ParquetFormat::new()),
Format::Parquet => {
// SpatialBench needs Parquet field metadata to rebuild the geoarrow.point extension
// during schema inference; other benchmarks keep the DataFusion default.
let skip_metadata = !matches!(benchmark, BenchmarkArg::SpatialBench);
Arc::new(ParquetFormat::new().with_skip_metadata(skip_metadata))
}
Format::OnDiskVortex | Format::VortexCompact => {
Arc::new(VortexFormat::new(SESSION.clone()))
}
Expand Down
14 changes: 8 additions & 6 deletions benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_physical_plan::collect;
use futures::StreamExt;
use parking_lot::Mutex;
use tokio::fs::File;
use vortex::array::arrow::ArrowSessionExt;
use vortex::io::filesystem::FileSystemRef;
use vortex::scan::DataSourceRef;
use vortex_bench::Benchmark;
Expand Down Expand Up @@ -188,9 +189,9 @@ async fn main() -> anyhow::Result<()> {
|format| {
let benchmark = &*benchmark;
async move {
let session = datafusion_bench::get_session_context();
let session = datafusion_bench::get_session_context(args.benchmark);
datafusion_bench::make_object_store(&session, benchmark.data_url())?;
register_benchmark_tables(&session, benchmark, format).await?;
register_benchmark_tables(&session, benchmark, format, args.benchmark).await?;
Ok((session, format))
}
},
Expand Down Expand Up @@ -254,15 +255,16 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
session: &SessionContext,
benchmark: &B,
format: Format,
benchmark_arg: BenchmarkArg,
) -> anyhow::Result<()> {
match format {
Format::Arrow => register_arrow_tables(session, benchmark).await,
_ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => {
register_v2_tables(session, benchmark, format).await
}
_ => {
let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;
let file_format = format_to_df_format(format);
let benchmark_base = benchmark.format_path(format, benchmark.data_url())?;
let file_format = format_to_df_format(format, benchmark_arg);

for table in benchmark.table_specs().iter() {
let pattern = benchmark.pattern(table.name, format);
Expand Down Expand Up @@ -307,7 +309,7 @@ async fn register_v2_tables<B: Benchmark + ?Sized>(
use vortex::scan::DataSource as _;
use vortex_datafusion::v2::VortexTable;

let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;
let benchmark_base = benchmark.format_path(format, benchmark.data_url())?;

for table in benchmark.table_specs().iter() {
let pattern = benchmark.pattern(table.name, format);
Expand All @@ -334,7 +336,7 @@ async fn register_v2_tables<B: Benchmark + ?Sized>(
.build()
.await?;

let arrow_schema = Arc::new(multi_ds.dtype().to_arrow_schema()?);
let arrow_schema = Arc::new(SESSION.arrow().to_arrow_schema(multi_ds.dtype())?);
let data_source: DataSourceRef = Arc::new(multi_ds);

let table_provider = Arc::new(VortexTable::new(data_source, SESSION.clone(), arrow_schema));
Expand Down
6 changes: 6 additions & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ vortex = { workspace = true, features = [
"tokio",
"zstd",
] }
vortex-geo = { workspace = true }
vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex.

anyhow = { workspace = true }
Expand All @@ -33,6 +34,8 @@ async-trait = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
geoarrow = { workspace = true }
geoarrow-cast = { workspace = true }
get_dir = { workspace = true }
glob = { workspace = true }
humansize = { workspace = true }
Expand All @@ -48,6 +51,9 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
spatialbench = { workspace = true }
spatialbench-arrow = { workspace = true }
spatialbench-parquet = { workspace = true }
sysinfo = { workspace = true }
tabled = { workspace = true, features = ["std"] }
target-lexicon = { workspace = true }
Expand Down
Loading
Loading