Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions vortex-duckdb/cpp/copy_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ unique_ptr<FunctionData> copy_to_bind(ClientContext &,
}

unique_ptr<GlobalFunctionData>
copy_to_initialize_global(ClientContext &context, FunctionData &bind_data, const string &file_path) {
copy_to_initialize_global(ClientContext &, FunctionData &bind_data, const string &file_path) {
void *const ffi_bind = bind_data.Cast<CopyBindData>().ffi_data->DataPtr();
const auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context);

duckdb_vx_error error_out = nullptr;
const duckdb_vx_data ffi_global =
duckdb_copy_function_copy_to_initialize_global(ffi_ctx, ffi_bind, file_path.c_str(), &error_out);
duckdb_copy_function_copy_to_initialize_global(ffi_bind, file_path.c_str(), &error_out);
if (error_out) {
throw ExecutorException(IntoErrString(error_out));
}
Expand Down
6 changes: 2 additions & 4 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,14 @@ bool projection_expression_pushdown(ClientContext &, const TableFunctionProjecti
* and after a query another file is added matching the glob, for the second query
* bind() will be called again.
*/
unique_ptr<FunctionData> duckdb_vx_table_function_bind(ClientContext &context,
unique_ptr<FunctionData> duckdb_vx_table_function_bind(ClientContext &,
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
vector<string> &names) {
CTableBindResult result = {return_types, names};

duckdb_vx_error error_out = nullptr;
auto ctx = reinterpret_cast<duckdb_client_context>(&context);
auto ffi_bind_data = duckdb_table_function_bind(ctx,
reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input),
auto ffi_bind_data = duckdb_table_function_bind(reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input),
reinterpret_cast<duckdb_vx_tfunc_bind_result>(&result),
&error_out);
if (error_out) {
Expand Down
6 changes: 2 additions & 4 deletions vortex-duckdb/include/vortex.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ duckdb_vx_data duckdb_table_function_init_global(const duckdb_vx_tfunc_init_inpu
extern duckdb_vx_data duckdb_table_function_init_local(void *global_init_data);

extern
duckdb_vx_data duckdb_table_function_bind(duckdb_client_context ctx,
duckdb_vx_tfunc_bind_input bind_input,
duckdb_vx_data duckdb_table_function_bind(duckdb_vx_tfunc_bind_input bind_input,
duckdb_vx_tfunc_bind_result bind_result,
duckdb_vx_error *error_out);

Expand All @@ -101,8 +100,7 @@ duckdb_vx_data duckdb_copy_function_copy_to_bind(const char *const *column_names
duckdb_vx_error *error_out);

extern
duckdb_vx_data duckdb_copy_function_copy_to_initialize_global(duckdb_client_context client_context,
const void *bind_data,
duckdb_vx_data duckdb_copy_function_copy_to_initialize_global(const void *bind_data,
const char *file_path,
duckdb_vx_error *error_out);

Expand Down
22 changes: 10 additions & 12 deletions vortex-duckdb/src/copy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use async_fs::OpenOptions;
use futures::SinkExt;
use futures::TryStreamExt;
use futures::channel::mpsc;
Expand Down Expand Up @@ -28,9 +29,7 @@ use crate::RUNTIME;
use crate::SESSION;
use crate::convert::FromLogicalType;
use crate::convert::data_chunk_to_vortex;
use crate::duckdb::ClientContextRef;
use crate::duckdb::DataChunkRef;
use crate::duckdb::DuckDbFsWriter;
use crate::duckdb::LogicalTypeRef;

#[derive(Clone)]
Expand Down Expand Up @@ -111,7 +110,6 @@ pub fn copy_to_finalize(init_global: &mut CopyFunctionGlobal) -> VortexResult<()
}

pub fn copy_to_initialize_global(
client_context: &ClientContextRef,
bind_data: &CopyFunctionBind,
file_path: String,
) -> VortexResult<CopyFunctionGlobal> {
Expand All @@ -120,16 +118,16 @@ pub fn copy_to_initialize_global(
let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream());

let handle = SESSION.handle();
// SAFETY: The ClientContext is owned by the Connection and lives for the duration of
// query execution. DuckDB keeps the connection alive while this copy function runs.
let ctx = unsafe { client_context.erase_lifetime() };

// Use DuckDB FS exclusively to match the DuckDB client context configuration.
let writer = DuckDbFsWriter::new(ctx, &file_path)
.map_err(|e| vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}"))?;

let write_task =
handle.spawn(async move { SESSION.write_options().write(writer, array_stream).await });
let write_task = handle.spawn(async move {
let writer = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(file_path)
.await?;
SESSION.write_options().write(writer, array_stream).await
});

let worker_pool = RUNTIME.new_pool();
worker_pool.set_workers_to_available_parallelism();
Expand Down
198 changes: 0 additions & 198 deletions vortex-duckdb/src/duckdb/file_system.rs

This file was deleted.

2 changes: 0 additions & 2 deletions vortex-duckdb/src/duckdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod data_chunk;
mod database;
mod ddb_string;
mod expr;
mod file_system;
mod logical_type;
mod macro_;
mod query_result;
Expand All @@ -36,7 +35,6 @@ pub use data_chunk::*;
pub use database::*;
pub use ddb_string::*;
pub use expr::*;
pub use file_system::*;
pub use logical_type::*;
pub use query_result::*;
pub use reusable_dict::*;
Expand Down
54 changes: 0 additions & 54 deletions vortex-duckdb/src/e2e_test/vortex_scan_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,60 +395,6 @@ fn test_vortex_scan_multiple_globs() {
assert_eq!(total_sum, 55);
}

/// End-to-end check that `read_vortex` can scan a Vortex file served over plain HTTP.
///
/// A single string column with three values is written to a file, the file's bytes are
/// served from a throwaway listener on the loopback interface, and
/// `SELECT COUNT(*) FROM read_vortex(<url>)` is run once for each
/// `httpfs_client_implementation` value (`httplib`, `curl`). Every run must report 3 rows.
#[test]
fn test_vortex_scan_over_http() {
let file = RUNTIME.block_on(async {
let strings = VarBinArray::from(vec!["a", "b", "c"]);
write_single_column_vortex_file("strings", strings).await
});

// Read the file back as raw bytes so the server thread can own a copy of the payload.
let file_bytes = std::fs::read(file.path()).unwrap();
// Port 0 asks the OS for any free port; the real address is read back via `local_addr`.
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();

// A single detached background thread serves up to 10 connections, one after another.
// DuckDB does HEAD and GET requests with retries, so accepting only 2 connections
// (one for each client implementation) isn't enough.
// Every accepted connection gets the same `200 OK` response with the whole file as the
// body; the incoming request is never read or inspected.
std::thread::spawn(move || {
for _ in 0..10 {
if let Ok((mut stream, _)) = listener.accept() {
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
file_bytes.len()
);
stream.write_all(response.as_bytes()).unwrap();
stream.write_all(&file_bytes).unwrap();
}
}
});

let conn = database_connection();
// NOTE(review): presumably this routes Vortex file reads through DuckDB's own filesystem
// layer (and therefore httpfs) - confirm against the setting's definition.
conn.query("SET vortex_filesystem = 'duckdb';").unwrap();
for httpfs_impl in ["httplib", "curl"] {
println!("Testing httpfs client implementation: {httpfs_impl}");
conn.query(&format!(
"SET httpfs_client_implementation = '{httpfs_impl}';"
))
.unwrap();

// The URL path is just the file's name; the server above ignores the requested path.
let url = format!(
"http://{}/{}",
addr,
file.path().file_name().unwrap().to_string_lossy()
);
println!("url={url}, file={}", file.path().display());

let result = conn
.query(&format!("SELECT COUNT(*) FROM read_vortex('{url}')"))
.unwrap();
// Take the first result chunk and read the first value of column 0 as an `i64` count.
let chunk = result.into_iter().next().unwrap();
let count = chunk
.get_vector(0)
.as_slice_with_len::<i64>(chunk.len().as_())[0];

// Three strings were written, so three rows must come back over HTTP.
assert_eq!(count, 3);
}
}

#[test]
fn test_write_file() {
let conn = database_connection();
Expand Down
Loading
Loading