From 43a74b1c23fbf22f203a2253fa9527c3252c1604 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 17 Jun 2026 14:34:45 +0100 Subject: [PATCH] remove duckdbfs Signed-off-by: Mikhail Kot --- vortex-duckdb/cpp/copy_function.cpp | 5 +- vortex-duckdb/cpp/table_function.cpp | 6 +- vortex-duckdb/include/vortex.h | 6 +- vortex-duckdb/src/copy.rs | 22 +- vortex-duckdb/src/duckdb/file_system.rs | 198 --------- vortex-duckdb/src/duckdb/mod.rs | 2 - .../src/e2e_test/vortex_scan_test.rs | 54 --- vortex-duckdb/src/ffi.rs | 9 +- vortex-duckdb/src/filesystem.rs | 384 ------------------ vortex-duckdb/src/lib.rs | 1 - vortex-duckdb/src/multi_file.rs | 33 +- vortex-duckdb/src/table_function.rs | 9 +- 12 files changed, 46 insertions(+), 683 deletions(-) delete mode 100644 vortex-duckdb/src/duckdb/file_system.rs delete mode 100644 vortex-duckdb/src/filesystem.rs diff --git a/vortex-duckdb/cpp/copy_function.cpp b/vortex-duckdb/cpp/copy_function.cpp index 0dda88ffee5..89d07aca462 100644 --- a/vortex-duckdb/cpp/copy_function.cpp +++ b/vortex-duckdb/cpp/copy_function.cpp @@ -58,13 +58,12 @@ unique_ptr copy_to_bind(ClientContext &, } unique_ptr -copy_to_initialize_global(ClientContext &context, FunctionData &bind_data, const string &file_path) { +copy_to_initialize_global(ClientContext &, FunctionData &bind_data, const string &file_path) { void *const ffi_bind = bind_data.Cast().ffi_data->DataPtr(); - const auto ffi_ctx = reinterpret_cast(&context); duckdb_vx_error error_out = nullptr; const duckdb_vx_data ffi_global = - duckdb_copy_function_copy_to_initialize_global(ffi_ctx, ffi_bind, file_path.c_str(), &error_out); + duckdb_copy_function_copy_to_initialize_global(ffi_bind, file_path.c_str(), &error_out); if (error_out) { throw ExecutorException(IntoErrString(error_out)); } diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index 2ea65f839b0..f18557a2d11 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -197,16 +197,14 @@ bool projection_expression_pushdown(ClientContext &, const TableFunctionProjecti * and after a query another file is added matching the glob, for second query * bind() will be called again. */ -unique_ptr duckdb_vx_table_function_bind(ClientContext &context, +unique_ptr duckdb_vx_table_function_bind(ClientContext &, TableFunctionBindInput &input, vector &return_types, vector &names) { CTableBindResult result = {return_types, names}; duckdb_vx_error error_out = nullptr; - auto ctx = reinterpret_cast(&context); - auto ffi_bind_data = duckdb_table_function_bind(ctx, - reinterpret_cast(&input), + auto ffi_bind_data = duckdb_table_function_bind(reinterpret_cast(&input), reinterpret_cast(&result), &error_out); if (error_out) { diff --git a/vortex-duckdb/include/vortex.h b/vortex-duckdb/include/vortex.h index 3480e625fb9..1a30478d0a6 100644 --- a/vortex-duckdb/include/vortex.h +++ b/vortex-duckdb/include/vortex.h @@ -86,8 +86,7 @@ duckdb_vx_data duckdb_table_function_init_global(const duckdb_vx_tfunc_init_inpu extern duckdb_vx_data duckdb_table_function_init_local(void *global_init_data); extern -duckdb_vx_data duckdb_table_function_bind(duckdb_client_context ctx, - duckdb_vx_tfunc_bind_input bind_input, +duckdb_vx_data duckdb_table_function_bind(duckdb_vx_tfunc_bind_input bind_input, duckdb_vx_tfunc_bind_result bind_result, duckdb_vx_error *error_out); @@ -101,8 +100,7 @@ duckdb_vx_data duckdb_copy_function_copy_to_bind(const char *const *column_names duckdb_vx_error *error_out); extern -duckdb_vx_data duckdb_copy_function_copy_to_initialize_global(duckdb_client_context client_context, - const void *bind_data, +duckdb_vx_data duckdb_copy_function_copy_to_initialize_global(const void *bind_data, const char *file_path, duckdb_vx_error *error_out); diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 14cdbc0cd58..ec1d3b30775 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use async_fs::OpenOptions; use futures::SinkExt; use futures::TryStreamExt; use futures::channel::mpsc; @@ -28,9 +29,7 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::FromLogicalType; use crate::convert::data_chunk_to_vortex; -use crate::duckdb::ClientContextRef; use crate::duckdb::DataChunkRef; -use crate::duckdb::DuckDbFsWriter; use crate::duckdb::LogicalTypeRef; #[derive(Clone)] @@ -111,7 +110,6 @@ pub fn copy_to_finalize(init_global: &mut CopyFunctionGlobal) -> VortexResult<() } pub fn copy_to_initialize_global( - client_context: &ClientContextRef, bind_data: &CopyFunctionBind, file_path: String, ) -> VortexResult { @@ -120,16 +118,16 @@ pub fn copy_to_initialize_global( let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); - // SAFETY: The ClientContext is owned by the Connection and lives for the duration of - // query execution. DuckDB keeps the connection alive while this copy function runs. - let ctx = unsafe { client_context.erase_lifetime() }; - // Use DuckDB FS exclusively to match the DuckDB client context configuration. - let writer = DuckDbFsWriter::new(ctx, &file_path) - .map_err(|e| vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}"))?; - - let write_task = - handle.spawn(async move { SESSION.write_options().write(writer, array_stream).await }); + let write_task = handle.spawn(async move { + let writer = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(file_path) + .await?; + SESSION.write_options().write(writer, array_stream).await + }); let worker_pool = RUNTIME.new_pool(); worker_pool.set_workers_to_available_parallelism(); diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs deleted file mode 100644 index a3c76af633d..00000000000 --- a/vortex-duckdb/src/duckdb/file_system.rs +++ /dev/null @@ -1,198 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::ffi::CStr; -use std::ffi::CString; -use std::ptr; -use std::sync::Arc; - -use vortex::error::VortexError; -use vortex::error::VortexResult; -use vortex::error::vortex_err; -use vortex::io::IoBuf; -use vortex::io::VortexWrite; -use vortex::io::runtime::BlockingRuntime; - -use crate::RUNTIME; -use crate::cpp; -use crate::duckdb::ClientContextRef; -use crate::lifetime_wrapper; - -lifetime_wrapper!( - FsFileHandle, - cpp::duckdb_vx_file_handle, - cpp::duckdb_vx_fs_close -); -unsafe impl Send for FsFileHandle {} -unsafe impl Sync for FsFileHandle {} - -pub(crate) fn fs_error(err: cpp::duckdb_vx_error) -> VortexError { - if err.is_null() { - return vortex_err!("DuckDB filesystem error (unknown)"); - } - let message = unsafe { CStr::from_ptr(cpp::duckdb_vx_error_value(err)) } - .to_string_lossy() - .to_string(); - unsafe { cpp::duckdb_vx_error_free(err) }; - vortex_err!("{message}") -} - -/// An entry returned by [`duckdb_fs_list_dir`]. -pub(crate) struct DirEntry { - /// Full path for S3 files, relative path for local files - pub name: String, - pub is_dir: bool, -} - -/// Non-recursively list entries in `directory` via DuckDB's filesystem. -/// -/// Returns full paths. The caller is responsible for joining paths and -/// recursing into subdirectories. -pub(crate) fn duckdb_fs_list_dir( - ctx: &ClientContextRef, - directory: &str, -) -> VortexResult> { - let c_directory = - CString::new(directory).map_err(|e| vortex_err!("Invalid directory path: {e}"))?; - - let mut entries: Vec = Vec::new(); - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - - let status = unsafe { - cpp::duckdb_vx_fs_list_files( - ctx.as_ptr(), - c_directory.as_ptr(), - Some(list_files_callback), - (&raw mut entries).cast(), - &raw mut err, - ) - }; - - if status != cpp::duckdb_state::DuckDBSuccess { - return Err(fs_error(err)); - } - - Ok(entries) -} - -/// FFI callback invoked by `duckdb_vx_fs_list_files` for each directory entry. -unsafe extern "C-unwind" fn list_files_callback( - name: *const std::ffi::c_char, - is_dir: bool, - user_data: *mut std::ffi::c_void, -) { - let entries = unsafe { &mut *user_data.cast::>() }; - let name = unsafe { CStr::from_ptr(name) } - .to_string_lossy() - .into_owned(); - entries.push(DirEntry { name, is_dir }); -} - -pub(crate) struct DuckDbFsWriter { - handle: Arc, - pos: u64, -} - -impl DuckDbFsWriter { - pub(crate) fn new(ctx: &ClientContextRef, path: &str) -> VortexResult { - let c_path = CString::new(path).map_err(|e| vortex_err!("Invalid path: {e}"))?; - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let file_handle = - unsafe { cpp::duckdb_vx_fs_create(ctx.as_ptr(), c_path.as_ptr(), &raw mut err) }; - if file_handle.is_null() { - return Err(fs_error(err)); - } - - Ok(Self { - handle: Arc::new(unsafe { FsFileHandle::own(file_handle) }), - pos: 0, - }) - } -} - -impl VortexWrite for DuckDbFsWriter { - async fn write_all(&mut self, buffer: B) -> std::io::Result { - let len = buffer.bytes_init(); - let offset = self.pos; - let handle = Arc::clone(&self.handle); - - let runtime = RUNTIME.handle(); - let buffer = runtime - .spawn_blocking(move || { - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let mut out_len: cpp::idx_t = 0; - let status = unsafe { - cpp::duckdb_vx_fs_write( - handle.as_ptr(), - offset as cpp::idx_t, - len as cpp::idx_t, - buffer.read_ptr() as *mut u8, - &raw mut out_len, - &raw mut err, - ) - }; - - if status != cpp::duckdb_state::DuckDBSuccess { - return Err(std::io::Error::other(fs_error(err).to_string())); - } - - Ok(buffer) - }) - .await?; - - self.pos = offset + len as u64; - Ok(buffer) - } - - async fn flush(&mut self) -> std::io::Result<()> { - let handle = Arc::clone(&self.handle); - - let runtime = RUNTIME.handle(); - runtime - .spawn_blocking(move || { - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let status = unsafe { cpp::duckdb_vx_fs_sync(handle.as_ptr(), &raw mut err) }; - if status != cpp::duckdb_state::DuckDBSuccess { - return Err(std::io::Error::other(fs_error(err).to_string())); - } - Ok(()) - }) - .await - } - - async fn shutdown(&mut self) -> std::io::Result<()> { - self.flush().await - } -} - -#[cfg(test)] -mod tests { - use std::fs; - use std::path::PathBuf; - - use super::*; - use crate::duckdb::Database; - - #[test] - fn test_writer_roundtrip_local() { - let db = Database::open_in_memory().unwrap(); - let conn = db.connect().unwrap(); - let ctx = conn.client_context().unwrap(); - - let dir = tempfile::tempdir().unwrap(); - let path: PathBuf = dir.path().join("writer_local.vortex"); - let path_str = path.to_string_lossy(); - - let mut writer = DuckDbFsWriter::new(ctx, &path_str).unwrap(); - - futures::executor::block_on(async { - VortexWrite::write_all(&mut writer, vec![1_u8, 2, 3]) - .await - .unwrap(); - VortexWrite::flush(&mut writer).await.unwrap(); - }); - - let data = fs::read(path).unwrap(); - assert_eq!(data, vec![1, 2, 3]); - } -} diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index ac653e22813..c393df8dff7 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -10,7 +10,6 @@ mod data_chunk; mod database; mod ddb_string; mod expr; -mod file_system; mod logical_type; mod macro_; mod query_result; @@ -36,7 +35,6 @@ pub use data_chunk::*; pub use database::*; pub use ddb_string::*; pub use expr::*; -pub use file_system::*; pub use logical_type::*; pub use query_result::*; pub use reusable_dict::*; diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index e0705e07de0..d94efb7a13e 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -395,60 +395,6 @@ fn test_vortex_scan_multiple_globs() { assert_eq!(total_sum, 55); } -#[test] -fn test_vortex_scan_over_http() { - let file = RUNTIME.block_on(async { - let strings = VarBinArray::from(vec!["a", "b", "c"]); - write_single_column_vortex_file("strings", strings).await - }); - - let file_bytes = std::fs::read(file.path()).unwrap(); - let listener = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = listener.local_addr().unwrap(); - - // Spawn 10 threads because DuckDB does HEAD and GET requests with retries, - // thus 2 threads, one for each implementation, aren't enough - std::thread::spawn(move || { - for _ in 0..10 { - if let Ok((mut stream, _)) = listener.accept() { - let response = format!( - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", - file_bytes.len() - ); - stream.write_all(response.as_bytes()).unwrap(); - stream.write_all(&file_bytes).unwrap(); - } - } - }); - - let conn = database_connection(); - conn.query("SET vortex_filesystem = 'duckdb';").unwrap(); - for httpfs_impl in ["httplib", "curl"] { - println!("Testing httpfs client implementation: {httpfs_impl}"); - conn.query(&format!( - "SET httpfs_client_implementation = '{httpfs_impl}';" - )) - .unwrap(); - - let url = format!( - "http://{}/{}", - addr, - file.path().file_name().unwrap().to_string_lossy() - ); - println!("url={url}, file={}", file.path().display()); - - let result = conn - .query(&format!("SELECT COUNT(*) FROM read_vortex('{url}')")) - .unwrap(); - let chunk = result.into_iter().next().unwrap(); - let count = chunk - .get_vector(0) - .as_slice_with_len::(chunk.len().as_())[0]; - - assert_eq!(count, 3); - } -} - #[test] fn test_write_file() { let conn = database_connection(); diff --git a/vortex-duckdb/src/ffi.rs b/vortex-duckdb/src/ffi.rs index de243365efc..a07ae3b17a4 100644 --- a/vortex-duckdb/src/ffi.rs +++ b/vortex-duckdb/src/ffi.rs @@ -19,7 +19,6 @@ use crate::copy::copy_to_sink; use crate::cpp; use crate::duckdb::BindInput; use crate::duckdb::BindResult; -use crate::duckdb::ClientContext; use crate::duckdb::Data; use crate::duckdb::DataChunk; use crate::duckdb::DuckdbStringMap; @@ -213,17 +212,15 @@ pub unsafe extern "C-unwind" fn duckdb_table_function_init_local( #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn duckdb_table_function_bind( - ctx: cpp::duckdb_client_context, bind_input: cpp::duckdb_vx_tfunc_bind_input, bind_result: cpp::duckdb_vx_tfunc_bind_result, error_out: *mut cpp::duckdb_vx_error, ) -> cpp::duckdb_vx_data { - let client_context = unsafe { ClientContext::borrow(ctx) }; let bind_input = unsafe { BindInput::own(bind_input) }; let mut bind_result = unsafe { BindResult::own(bind_result) }; try_or_null(error_out, || { - let bind_data = bind(client_context, &bind_input, &mut bind_result)?; + let bind_data = bind(&bind_input, &mut bind_result)?; Ok(Data::from(Box::new(bind_data)).as_ptr()) }) } @@ -270,7 +267,6 @@ pub unsafe extern "C-unwind" fn duckdb_copy_function_copy_to_bind( #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn duckdb_copy_function_copy_to_initialize_global( - client_context: cpp::duckdb_client_context, bind_data: *const c_void, file_path: *const c_char, error_out: *mut cpp::duckdb_vx_error, @@ -280,9 +276,8 @@ pub unsafe extern "C-unwind" fn duckdb_copy_function_copy_to_initialize_global( .into_owned(); let bind_data = unsafe { bind_data.cast::().as_ref() } .vortex_expect("bind_data null pointer"); - let ctx = unsafe { ClientContext::borrow(client_context) }; try_or_null(error_out, || { - let bind_data = copy_to_initialize_global(ctx, bind_data, file_path)?; + let bind_data = copy_to_initialize_global(bind_data, file_path)?; Ok(Data::from(Box::new(bind_data)).as_ptr()) }) } diff --git a/vortex-duckdb/src/filesystem.rs b/vortex-duckdb/src/filesystem.rs deleted file mode 100644 index b9c09dfe9d3..00000000000 --- a/vortex-duckdb/src/filesystem.rs +++ /dev/null @@ -1,384 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::ffi::CString; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::ptr; -use std::sync::Arc; -use std::sync::OnceLock; - -use async_trait::async_trait; -use futures::FutureExt; -use futures::StreamExt; -use futures::future::BoxFuture; -use futures::stream; -use futures::stream::BoxStream; -use object_store::ObjectStore; -use object_store::aws::AmazonS3Builder; -use object_store::local::LocalFileSystem; -use url::Url; -use vortex::array::buffer::BufferHandle; -use vortex::buffer::Alignment; -use vortex::buffer::ByteBufferMut; -use vortex::error::VortexError; -use vortex::error::VortexResult; -use vortex::error::vortex_bail; -use vortex::error::vortex_err; -use vortex::io::CoalesceConfig; -use vortex::io::VortexReadAt; -use vortex::io::compat::Compat; -use vortex::io::filesystem::FileListing; -use vortex::io::filesystem::FileSystem; -use vortex::io::filesystem::FileSystemRef; -use vortex::io::object_store::DEFAULT_CONCURRENCY as OBJECT_STORE_DEFAULT_CONCURRENCY; -use vortex::io::object_store::ObjectStoreFileSystem; -use vortex::io::runtime::BlockingRuntime; -use vortex::io::std_file::DEFAULT_CONCURRENCY as STD_FILE_DEFAULT_CONCURRENCY; - -use crate::RUNTIME; -use crate::cpp; -use crate::duckdb::ClientContextRef; -use crate::duckdb::FsFileHandle; -use crate::duckdb::duckdb_fs_list_dir; -use crate::duckdb::fs_error; - -pub(super) fn resolve_filesystem( - base_url: &Url, - ctx: &ClientContextRef, -) -> VortexResult { - let fs_config = ctx - .try_get_current_setting(c"vortex_filesystem") - .ok_or_else(|| { - vortex_err!("Failed to read 'vortex_filesystem' setting from DuckDB config") - })?; - let fs_config = fs_config.as_string(); - - Ok(if fs_config.as_str() == "duckdb" { - tracing::debug!( - "Using DuckDB's built-in filesystem for URL scheme '{}'", - base_url.scheme() - ); - // SAFETY: The ClientContext is owned by the Connection and lives for the duration of - // query execution. DuckDB keeps the connection alive while the filesystem is in use. - Arc::new(DuckDbFileSystem::new(base_url.clone(), unsafe { - ctx.erase_lifetime() - })) - } else if fs_config.as_str() == "vortex" { - tracing::debug!( - "Using Vortex's object store filesystem for URL scheme '{}'", - base_url.scheme() - ); - object_store_fs(base_url)? - } else { - vortex_bail!( - "Unsupported filesystem '{}', vortex_filesystem setting must be set to either 'duckdb' or 'vortex'", - fs_config.as_str() - ); - }) -} - -fn object_store_fs(base_url: &Url) -> VortexResult { - let object_store: Arc = if base_url.scheme() == "file" { - Arc::new(LocalFileSystem::new()) - } else if base_url.scheme() == "s3" { - Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(base_url.host_str().ok_or_else(|| { - vortex_err!("Failed to extract bucket name from URL: {base_url}") - })?) - .build()?, - ) - } else { - vortex_bail!( - "Unsupported URL scheme '{}', only 'file' and 's3' are supported with vortex_filesystem='vortex'", - base_url.scheme() - ); - }; - - let object_store = Arc::new(Compat::new(object_store)) as Arc; - - Ok(Arc::new(ObjectStoreFileSystem::new( - object_store, - RUNTIME.handle(), - ))) -} - -struct DuckDbFileSystem { - base_url: Url, - ctx: &'static ClientContextRef, -} - -impl Debug for DuckDbFileSystem { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DuckDbFileSystem") - .field("base_url", &self.base_url) - .finish() - } -} - -impl DuckDbFileSystem { - pub fn new(base_url: Url, ctx: &'static ClientContextRef) -> Self { - Self { base_url, ctx } - } -} - -#[async_trait] -impl FileSystem for DuckDbFileSystem { - fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult> { - let mut directory_url = self.base_url.clone(); - if !prefix.is_empty() { - directory_url.set_path(prefix); - } - - let ctx = self.ctx; - - let base_url = self.base_url.clone(); - stream::once(async move { - RUNTIME - .handle() - .spawn_blocking(move || list_recursive(ctx, &directory_url, &base_url)) - .await - }) - .flat_map(|result| match result { - Ok(listings) => stream::iter(listings.into_iter().map(Ok)).boxed(), - Err(e) => stream::once(async move { Err(e) }).boxed(), - }) - .boxed() - } - - async fn head(&self, path: &str) -> VortexResult> { - // DuckDB's filesystem exposes no stat/exists call, so probe the path by opening it and - // reading its size. DuckDB does not distinguish "not found" from other open failures, so - // any open error is treated as a missing file (logged for diagnosability). - match self.open_read(path).await { - Ok(reader) => Ok(Some(FileListing { - path: path.to_string(), - size: Some(reader.size().await?), - })), - Err(e) => { - tracing::debug!("head({path}): treating open error as not-found: {e}"); - Ok(None) - } - } - } - - async fn open_read(&self, path: &str) -> VortexResult> { - let mut url = self.base_url.clone(); - url.set_path(path); - let reader = unsafe { DuckDbFsReader::open_url(self.ctx.as_ptr(), &url)? }; - Ok(Arc::new(reader)) - } - - async fn delete(&self, path: &str) -> VortexResult<()> { - let mut url = self.base_url.clone(); - url.set_path(path); - let c_path = CString::new(url.as_str()).map_err(|e| vortex_err!("Invalid URL: {e}"))?; - let ctx = self.ctx; - - RUNTIME - .handle() - .spawn_blocking(move || { - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let status = unsafe { - cpp::duckdb_vx_fs_remove(ctx.as_ptr(), c_path.as_ptr(), &raw mut err) - }; - if status != cpp::duckdb_state::DuckDBSuccess { - return Err(fs_error(err)); - } - Ok::<_, VortexError>(()) - }) - .await - } -} - -/// Recursively list all files under `directory`, stripping `base_path` from each -/// returned URL to produce relative paths. -fn list_recursive( - ctx: &ClientContextRef, - directory_url: &Url, - base_url: &Url, -) -> VortexResult> { - // DuckDB's ListFiles expects bare paths for local files, but full URLs - // for remote schemes (s3://, etc.). - let directory = if directory_url.scheme() == "file" { - directory_url.path().to_string() - } else { - directory_url.to_string() - }; - - let (base_path, is_remote_path) = if base_url.scheme() == "file" { - (base_url.path().to_string(), false) - } else { - // This is really ugly. As we operate on Strings and not on urls, we - // must produce a base path with / so as relative url would not have - // the / and thus match the glob - (format!("{base_url}/"), true) - }; - - let mut results = Vec::new(); - let mut stack = vec![directory]; - - while let Some(dir) = stack.pop() { - // TODO(myrrc) this doesn't work with curl backend in v1.4, producing - // "URL using bad/illegal format or missing URL error", see - // https://github.com/duckdb/duckdb-httpfs/pull/265 - for entry in duckdb_fs_list_dir(ctx, &dir)? { - // duckdb_fs_list_dir returns relative paths for local files but full - // paths for s3 files. - let full_path = if is_remote_path { - entry.name - } else { - format!("{}/{}", dir.trim_end_matches('/'), entry.name) - }; - if entry.is_dir { - stack.push(full_path); - } else { - let relative_path = full_path - .strip_prefix(&base_path) - .unwrap_or_else(|| &full_path) - .to_string(); - results.push(FileListing { - path: relative_path, - size: None, - }); - } - } - } - - Ok(results) -} - -/// A VortexReadAt implementation backed by DuckDB's filesystem (e.g., httpfs/s3). -pub(crate) struct DuckDbFsReader { - handle: Arc, - uri: Arc, - is_local: bool, - size: Arc>, -} - -impl DuckDbFsReader { - pub(crate) unsafe fn open_url( - ctx: cpp::duckdb_client_context, - url: &Url, - ) -> VortexResult { - let c_path = CString::new(url.as_str()).map_err(|e| vortex_err!("Invalid URL: {e}"))?; - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let handle = unsafe { cpp::duckdb_vx_fs_open(ctx, c_path.as_ptr(), &raw mut err) }; - if handle.is_null() { - return Err(fs_error(err)); - } - - let is_local = url.scheme() == "file"; - - Ok(Self { - handle: Arc::new(unsafe { FsFileHandle::own(handle) }), - uri: Arc::from(url.as_str()), - is_local, - size: Arc::new(OnceLock::new()), - }) - } -} - -impl VortexReadAt for DuckDbFsReader { - fn uri(&self) -> Option<&Arc> { - Some(&self.uri) - } - - fn coalesce_config(&self) -> Option { - Some(if self.is_local { - CoalesceConfig::file() - } else { - CoalesceConfig::object_storage() - }) - } - - fn concurrency(&self) -> usize { - if self.is_local { - STD_FILE_DEFAULT_CONCURRENCY - } else { - OBJECT_STORE_DEFAULT_CONCURRENCY - } - } - - fn size(&self) -> BoxFuture<'static, VortexResult> { - let handle = Arc::clone(&self.handle); - let size_cell = Arc::clone(&self.size); - - async move { - if let Some(size) = size_cell.get() { - return Ok(*size); - } - - let runtime = RUNTIME.handle(); - let size = runtime - .spawn_blocking(move || { - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let mut size_out: cpp::idx_t = 0; - let status = unsafe { - cpp::duckdb_vx_fs_get_size(handle.as_ptr(), &raw mut size_out, &raw mut err) - }; - if status != cpp::duckdb_state::DuckDBSuccess { - return Err(fs_error(err)); - } - Ok::<_, VortexError>(size_out as u64) - }) - .await?; - - let _ = size_cell.set(size); - Ok(size) - } - .boxed() - } - - fn read_at( - &self, - offset: u64, - length: usize, - alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { - let handle = Arc::clone(&self.handle); - - async move { - let runtime = RUNTIME.handle(); - let result: VortexResult = runtime - .spawn_blocking(move || -> VortexResult { - let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); - unsafe { buffer.set_len(length) }; - - let mut err: cpp::duckdb_vx_error = ptr::null_mut(); - let mut out_len: cpp::idx_t = 0; - let status = unsafe { - cpp::duckdb_vx_fs_read( - handle.as_ptr(), - offset as cpp::idx_t, - length as cpp::idx_t, - buffer.as_mut_slice().as_mut_ptr(), - &raw mut out_len, - &raw mut err, - ) - }; - - if status != cpp::duckdb_state::DuckDBSuccess { - return Err(fs_error(err)); - } - - let used = usize::try_from(out_len) - .map_err(|e| vortex_err!("Invalid read len: {e}"))?; - unsafe { buffer.set_len(used) }; - - let frozen = buffer.freeze(); - Ok::<_, VortexError>(BufferHandle::new_host(frozen)) - }) - .await; - result - } - .boxed() - } -} - -// SAFETY: DuckDB file handles can be used across threads when operations are position-based. The -// C++ bridge opens handles with FILE_FLAGS_PARALLEL_ACCESS, and writes use explicit offsets, so -// there is no shared cursor state. -unsafe impl Send for DuckDbFsReader {} -unsafe impl Sync for DuckDbFsReader {} diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index b21801b4564..0abfdd8b8f2 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -26,7 +26,6 @@ mod convert; pub mod duckdb; mod exporter; mod ffi; -mod filesystem; mod multi_file; mod projection; mod table_function; diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs index 165bcad6677..bb9e015af5c 100644 --- a/vortex-duckdb/src/multi_file.rs +++ b/vortex-duckdb/src/multi_file.rs @@ -6,12 +6,17 @@ use std::path::absolute; use std::sync::Arc; use itertools::Itertools; +use object_store::ObjectStore; +use object_store::aws::AmazonS3Builder; +use object_store::local::LocalFileSystem; use url::Url; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; use vortex::file::multi::MultiFileDataSource; +use vortex::io::compat::Compat; use vortex::io::filesystem::FileSystemRef; +use vortex::io::object_store::ObjectStoreFileSystem; use vortex::io::runtime::BlockingRuntime; use vortex::layout::scan::multi::MultiLayoutDataSource; use vortex_utils::aliases::hash_map::HashMap; @@ -19,9 +24,7 @@ use vortex_utils::aliases::hash_map::HashMap; use crate::RUNTIME; use crate::SESSION; use crate::duckdb::BindInputRef; -use crate::duckdb::ClientContextRef; use crate::duckdb::ExtractedValue; -use crate::filesystem::resolve_filesystem; /// Parse a glob string into a [`Url`]. /// @@ -55,11 +58,27 @@ fn normalize_path(path: std::path::PathBuf) -> std::path::PathBuf { normalized } +fn resolve_filesystem(base_url: &Url) -> VortexResult { + let object_store: Arc = match base_url.scheme() { + "file" => Arc::new(LocalFileSystem::new()), + "s3" => Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(base_url.host_str().ok_or_else(|| { + vortex_err!("Failed to extract bucket name from URL: {base_url}") + })?) + .build()?, + ), + other => vortex_bail!("Unsupported URL scheme '{other}'"), + }; + + Ok(Arc::new(ObjectStoreFileSystem::new( + Arc::new(Compat::new(object_store)), + RUNTIME.handle(), + ))) +} + /// Shared bind logic for both single-glob and multi-glob variants. -pub fn bind_multi_file_scan( - ctx: &ClientContextRef, - input: &BindInputRef, -) -> VortexResult { +pub fn bind_multi_file_scan(input: &BindInputRef) -> VortexResult { let glob_url_parameter = input .get_parameter(0) .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; @@ -94,7 +113,7 @@ pub fn bind_multi_file_scan( let mut base_url = glob_url.clone(); base_url.set_path(""); if !fs_cache.contains_key(&base_url) { - let fs = resolve_filesystem(&base_url, ctx)?; + let fs = resolve_filesystem(&base_url)?; fs_cache.insert(base_url, fs); } } diff --git a/vortex-duckdb/src/table_function.rs b/vortex-duckdb/src/table_function.rs index b5e8c3ff7e8..503c840bf8d 100644 --- a/vortex-duckdb/src/table_function.rs +++ b/vortex-duckdb/src/table_function.rs @@ -49,7 +49,6 @@ use crate::convert::try_from_bound_expression; use crate::convert::try_from_projection_expression; use crate::duckdb::BindInputRef; use crate::duckdb::BindResultRef; -use crate::duckdb::ClientContextRef; use crate::duckdb::DataChunkRef; use crate::duckdb::DuckdbStringMapRef; use crate::duckdb::ExpressionRef; @@ -143,12 +142,8 @@ pub enum Cardinality { Estimate(u64), } -pub fn bind( - ctx: &ClientContextRef, - input: &BindInputRef, - result: &mut BindResultRef, -) -> VortexResult { - let data_source = bind_multi_file_scan(ctx, input)?; +pub fn bind(input: &BindInputRef, result: &mut BindResultRef) -> VortexResult { + let data_source = bind_multi_file_scan(input)?; let column_fields = extract_schema_from_dtype(data_source.dtype())?; for fields in &column_fields { result.add_result_column(&fields.name, &fields.logical_type);