Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-array/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ pub trait HostAllocatorExt: HostAllocator {
impl<A: HostAllocator + ?Sized> HostAllocatorExt for A {}

/// Session-scoped memory configuration for Vortex arrays.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MemorySession {
allocator: HostAllocatorRef,
}
Expand Down
1 change: 1 addition & 0 deletions vortex-io/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex_session::SessionVar;
use crate::runtime::Handle;

/// Session state for Vortex async runtimes.
#[derive(Clone)]
pub struct RuntimeSession {
handle: Option<Handle>,
}
Expand Down
1 change: 1 addition & 0 deletions vortex-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ all-features = true
workspace = true

[dependencies]
arc-swap = { workspace = true }
dashmap = { workspace = true }
lasso = { workspace = true }
parking_lot = { workspace = true }
Expand Down
204 changes: 112 additions & 92 deletions vortex-session/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ pub mod registry;
use std::any::Any;
use std::any::TypeId;
use std::any::type_name;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::BuildHasherDefault;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;

use dashmap::DashMap;
use dashmap::Entry;
use dashmap::mapref::one::MappedRef;
use dashmap::mapref::one::MappedRefMut;
use arc_swap::ArcSwap;
use vortex_error::VortexExpect;
use vortex_error::vortex_panic;

Expand Down Expand Up @@ -50,17 +49,20 @@ impl VortexSession {
///
/// If a variable of that type already exists.
pub fn with_some<V: SessionVar>(self, var: V) -> Self {
match self.0.entry(TypeId::of::<V>()) {
Entry::Occupied(_) => {
let tid = TypeId::of::<V>();
// Wrap once: the rcu closure may run multiple times under contention.
let arc: Arc<dyn SessionVar> = Arc::new(var);
self.0.rcu(|map| {
if map.contains_key(&tid) {
vortex_panic!(
"Session variable of type {} already exists",
type_name::<V>()
);
}
Entry::Vacant(e) => {
e.insert(Box::new(var));
}
}
let mut new = HashMap::clone(map);
new.insert(tid, arc.clone());
new
});
self
}

Expand Down Expand Up @@ -106,15 +108,17 @@ pub trait SessionExt: Sized + private::Sealed {
/// Returns the scope variable of type `V` if it exists.
fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>>;

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
///
/// Note that the returned value internally holds a lock on the variable.
fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V>;

/// Returns the scope variable of type `V`, if it exists.
/// Returns mutable access to the scope variable of type `V`, inserting a default if it does
/// not exist.
///
/// Note that the returned value internally holds a lock on the variable.
fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>>;
/// The store keeps variables behind shared snapshots, so this returns a *copy* that is
/// written back into the session when the [`RefMut`] is dropped (read-copy-update). The
/// mutation is therefore not observable through other handles until the returned guard drops —
/// fine for the setup-time builders that use it. Hence the `Clone` bound.
fn get_mut<V: SessionVar + Default + Clone>(&self) -> RefMut<'_, V>;

/// Like [`get_mut`](Self::get_mut), but returns `None` if the variable does not exist.
fn get_mut_opt<V: SessionVar + Clone>(&self) -> Option<RefMut<'_, V>>;
}

mod private {
Expand All @@ -129,73 +133,62 @@ impl SessionExt for VortexSession {

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
// NOTE(ngates): we don't use `entry().or_insert_with_key()` here because the DashMap
// would immediately acquire an exclusive write lock.
if let Some(v) = self.0.get(&TypeId::of::<V>()) {
return Ref(v.map(|v| {
(**v)
.as_any()
.downcast_ref::<V>()
.vortex_expect("Type mismatch - this is a bug")
}));
if let Some(r) = self.get_opt::<V>() {
return r;
}

// If we get here, the value was not present, so we insert the default with a write lock.
Ref(self
.0
.entry(TypeId::of::<V>())
.or_insert_with(|| Box::new(V::default()))
.downgrade()
.map(|v| {
(**v)
.as_any()
.downcast_ref::<V>()
.vortex_expect("Type mismatch - this is a bug")
}))
// Not present: insert a default via a copy-on-write swap, then read it back. The rcu may
// race with other inserters; whoever wins, the value is present afterwards.
let tid = TypeId::of::<V>();
let arc: Arc<dyn SessionVar> = Arc::new(V::default());
self.0.rcu(|map| {
let mut new = HashMap::clone(map);
new.entry(tid).or_insert_with(|| arc.clone());
new
});
self.get_opt::<V>()
.vortex_expect("default was just inserted")
}

fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>> {
self.0.get(&TypeId::of::<V>()).map(|v| {
Ref(v.map(|v| {
(**v)
.as_any()
.downcast_ref::<V>()
.vortex_expect("Type mismatch - this is a bug")
}))
// Lock-free read: load the current map snapshot (a plain atomic load — no shard RwLock),
// clone the value's Arc so it outlives the load guard, and downcast.
let map = self.0.load();
let arc = map.get(&TypeId::of::<V>())?.clone();
let ptr = (*arc)
.as_any()
.downcast_ref::<V>()
.vortex_expect("Type mismatch - this is a bug") as *const V;
Some(Ref {
_owner: arc,
ptr,
_marker: PhantomData,
})
}

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
///
/// Note that the returned value internally holds a lock on the variable.
fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V> {
RefMut(
self.0
.entry(TypeId::of::<V>())
.or_insert_with(|| Box::new(V::default()))
.map(|v| {
(**v)
.as_any_mut()
.downcast_mut::<V>()
.vortex_expect("Type mismatch - this is a bug")
}),
)
fn get_mut<V: SessionVar + Default + Clone>(&self) -> RefMut<'_, V> {
// Read-copy-update: hand back a copy, written back into the store on drop.
let value = self.get::<V>().clone();
RefMut {
session: self.session(),
value,
_marker: PhantomData,
}
}

fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>> {
self.0.get_mut(&TypeId::of::<V>()).map(|v| {
RefMut(v.map(|v| {
(**v)
.as_any_mut()
.downcast_mut::<V>()
.vortex_expect("Type mismatch - this is a bug")
}))
fn get_mut_opt<V: SessionVar + Clone>(&self) -> Option<RefMut<'_, V>> {
let value = self.get_opt::<V>()?.clone();
Some(RefMut {
session: self.session(),
value,
_marker: PhantomData,
})
}
}

/// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`.
type SessionVars = DashMap<TypeId, Box<dyn SessionVar>, BuildHasherDefault<IdHasher>>;
/// A read-lock-free typemap: writes (rare, setup-time) copy-on-write swap the whole map; reads
/// are plain atomic loads of the current snapshot, so concurrent per-node lookups never contend
/// a shard lock the way a `DashMap` keyed by a constant `TypeId` does.
type SessionVars = ArcSwap<HashMap<TypeId, Arc<dyn SessionVar>, BuildHasherDefault<IdHasher>>>;

/// With TypeIds as keys, there's no need to hash them. They are already hashes
/// themselves, coming from the compiler. The IdHasher just holds the u64 of
Expand Down Expand Up @@ -227,46 +220,73 @@ pub trait SessionVar: Any + Send + Sync + Debug + 'static {
fn as_any_mut(&mut self) -> &mut dyn Any;
}

// NOTE(ngates): we don't want to expose that the internals of a session is a DashMap, so we have
// our own wrapped Ref type.
pub struct Ref<'a, T>(MappedRef<'a, TypeId, Box<dyn SessionVar>, T>);
impl<'a, T> Deref for Ref<'a, T> {
// An owned, session-var-lifetime reference. Holds the value's `Arc` alive and a pointer into it,
// so it outlives the (temporary) `ArcSwap` load guard. The `'a` is vestigial (kept for API
// compatibility); the `Arc` is what actually keeps the borrow valid.
pub struct Ref<'a, T: ?Sized> {
_owner: Arc<dyn SessionVar>,
ptr: *const T,
_marker: PhantomData<&'a T>,
}
// SAFETY: `ptr` points into `_owner`, a `Send + Sync` `SessionVar`; we only expose `&T`, so this
// is sound whenever `T: Sync` (shared `&T` across threads) and `T: Send` is not required.
unsafe impl<T: ?Sized + Sync> Send for Ref<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for Ref<'_, T> {}
impl<T: ?Sized> Deref for Ref<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
// SAFETY: `ptr` points into `_owner`, which is kept alive for `self`'s lifetime.
unsafe { &*self.ptr }
}
}
impl<'a, T> Ref<'a, T> {
/// Map this reference to a different target.
/// Map this reference to a different target within the same owning value.
pub fn map<F, U>(self, f: F) -> Ref<'a, U>
where
F: FnOnce(&T) -> &U,
{
Ref(self.0.map(f))
// SAFETY: `ptr` is valid for the borrow passed to `f`; the result points into `_owner`.
let ptr = f(unsafe { &*self.ptr }) as *const U;
Ref {
_owner: self._owner,
ptr,
_marker: PhantomData,
}
}
}

pub struct RefMut<'a, T>(MappedRefMut<'a, TypeId, Box<dyn SessionVar>, T>);
impl<'a, T> Deref for RefMut<'a, T> {
type Target = T;
/// A mutable handle to a session variable.
///
/// Because the store keeps variables behind shared, copy-on-write snapshots, this owns a working
/// copy of the variable and writes it back into the session on drop. Mutations are not visible to
/// other handles until this guard is dropped.
pub struct RefMut<'a, V: SessionVar + Clone> {
session: VortexSession,
value: V,
_marker: PhantomData<&'a mut V>,
}
impl<V: SessionVar + Clone> Deref for RefMut<'_, V> {
type Target = V;

fn deref(&self) -> &Self::Target {
&self.0
&self.value
}
}
impl<'a, T> DerefMut for RefMut<'a, T> {
impl<V: SessionVar + Clone> DerefMut for RefMut<'_, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
&mut self.value
}
}
impl<'a, T> RefMut<'a, T> {
/// Map this mutable reference to a different target.
pub fn map<F, U>(self, f: F) -> RefMut<'a, U>
where
F: FnOnce(&mut T) -> &mut U,
{
RefMut(self.0.map(f))
impl<V: SessionVar + Clone> Drop for RefMut<'_, V> {
fn drop(&mut self) {
let tid = TypeId::of::<V>();
let arc: Arc<dyn SessionVar> = Arc::new(self.value.clone());
self.session.0.rcu(|map| {
let mut new = HashMap::clone(map);
new.insert(tid, arc.clone());
new
});
}
}

Expand Down
Loading