Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion encodings/parquet-variant/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ prost = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-error = { workspace = true }
vortex-json = { workspace = true }
vortex-mask = { workspace = true }
vortex-proto = { workspace = true }
vortex-proto = { workspace = true, features = ["expr"] }
vortex-session = { workspace = true }

[dev-dependencies]
Expand Down
175 changes: 175 additions & 0 deletions encodings/parquet-variant/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use vortex_array::arrays::List;
use vortex_array::arrays::ListArray;
use vortex_array::arrays::Struct;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::Variant;
use vortex_array::arrays::VariantArray;
use vortex_array::arrays::list::ListArrayExt;
use vortex_array::arrays::struct_::StructArrayExt;
use vortex_array::arrays::variant::VariantArrayExt;
#[expect(
deprecated,
reason = "TODO(aduffy): figure out what to do with Parquet Variant"
Expand All @@ -42,6 +44,7 @@ use vortex_array::vtable::validity_to_child;
use vortex_buffer::BitBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;

use crate::ParquetVariant;
use crate::ParquetVariantArray;
Expand Down Expand Up @@ -307,6 +310,111 @@ fn inferred_shredded_field_validity(
Ok(Validity::from_mask(validity, Nullability::Nullable))
}

/// Reconstructs a Parquet `typed_value` tree from the storage-agnostic canonical shredded tree.
///
/// This is the inverse of [`logical_shredded_from_parquet_typed_value`]: canonicalization strips
/// the Parquet `value`/`typed_value` wrapper shells out of the shredded tree (representing
/// partially shredded fields as nested [`VariantArray`]s), and this re-adds them. The result is a
/// valid Parquet `typed_value` that can be reattached to a [`ParquetVariant`] and serialized to
/// Arrow, where [`parquet_variant_compute::unshred_variant`] merges it back with the residual
/// `value`.
///
/// `forward` then `inverse` is structure-preserving for the shapes canonicalization produces;
/// fields the forward transform omitted (Parquet wrapper shells with no `typed_value`) are served
/// from the residual `value` and are intentionally not reconstructed here.
pub(crate) fn parquet_typed_value_from_logical_shredded(
shredded: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
if let Some(list_array) = shredded.as_opt::<List>() {
let elements = parquet_shredded_field_from_logical(list_array.elements().clone(), ctx)?;
return Ok(ListArray::try_new(
elements,
list_array.offsets().clone(),
list_array.list_validity(),
)?
.into_array());
}

let Some(struct_array) = shredded.as_opt::<Struct>() else {
// A bare typed leaf (a fully shredded scalar) is already a valid Parquet `typed_value`.
return Ok(shredded);
};

let mut names = Vec::with_capacity(struct_array.names().len());
let mut fields = Vec::with_capacity(struct_array.names().len());
for (name, field) in struct_array
.names()
.iter()
.zip(struct_array.iter_unmasked_fields())
{
names.push(FieldName::from(name.as_ref()));
fields.push(parquet_shredded_field_from_logical(field.clone(), ctx)?);
}

Ok(StructArray::try_new(
FieldNames::from_iter(names),
fields,
struct_array.len(),
struct_array.validity()?,
)?
.into_array())
}

/// Reconstructs one Parquet shredded field shell (`{value?, typed_value}`) from its canonical
/// representation, the inverse of [`logical_shredded_from_parquet_field`].
fn parquet_shredded_field_from_logical(
logical_field: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let len = logical_field.len();

// Partially shredded fields canonicalize to a nested Variant whose core storage holds the
// residual `value` and whose own shredded tree holds the typed children.
if let Some(variant) = logical_field.as_opt::<Variant>() {
let core = variant
.core_storage()
.as_opt::<ParquetVariant>()
.ok_or_else(|| {
vortex_err!(
"cannot rebuild Parquet shredded field: nested Variant lacks Parquet Variant core storage"
)
})?;
let value = core.value_array().cloned().ok_or_else(|| {
vortex_err!("cannot rebuild Parquet shredded field: partially shredded Variant has no residual value")
})?;
let typed_value = variant
.shredded()
.cloned()
.map(|shredded| parquet_typed_value_from_logical_shredded(shredded, ctx))
.transpose()?;

let mut names = vec![FieldName::from("value")];
let mut fields = vec![value];
if let Some(typed_value) = typed_value {
names.push(FieldName::from("typed_value"));
fields.push(typed_value);
}
return Ok(StructArray::try_new(
FieldNames::from_iter(names),
fields,
len,
Validity::NonNullable,
)?
.into_array());
}

// Fully shredded field: rebuild its typed subtree and wrap it in a `typed_value`-only shell.
let typed_value = parquet_typed_value_from_logical_shredded(logical_field, ctx)?;
Ok(StructArray::try_new(
FieldNames::from_iter([FieldName::from("typed_value")]),
vec![typed_value],
len,
Validity::NonNullable,
)?
.into_array())
}

/// Accessors and Arrow conversion for Parquet Variant storage arrays.
pub trait ParquetVariantArrayExt: TypedArrayRef<ParquetVariant> {
/// Returns the non-nullable Parquet Variant metadata child.
Expand Down Expand Up @@ -389,20 +497,26 @@ mod tests {
use arrow_array::Array as _;
use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::Int32Array;
use arrow_array::StringArray;
use arrow_array::StructArray;
use arrow_array::builder::BinaryViewBuilder;
use arrow_buffer::NullBuffer;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Fields;
use parquet_variant::Variant as PqVariant;
use parquet_variant_compute::ShreddedSchemaBuilder;
use parquet_variant_compute::VariantArray as ArrowVariantArray;
use parquet_variant_compute::VariantArrayBuilder;
use parquet_variant_compute::json_to_variant;
use parquet_variant_compute::shred_variant;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::variant::VariantArrayExt;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
Expand All @@ -413,6 +527,7 @@ mod tests {

use crate::ParquetVariant;
use crate::array::ParquetVariantArrayExt;
use crate::array::parquet_typed_value_from_logical_shredded;

fn assert_arrow_variant_storage_roundtrip(struct_array: StructArray) -> VortexResult<()> {
let arrow_variant = ArrowVariantArray::try_new(&struct_array)?;
Expand Down Expand Up @@ -678,4 +793,64 @@ mod tests {

assert_arrow_variant_storage_roundtrip(struct_array)
}

/// Round-trip check for `parquet_typed_value_from_logical_shredded`: it has to undo the
/// wrapper-stripping done by canonicalization. An object-shredded Parquet variant that is
/// canonicalized and then rebuilt must yield, row for row, the same values as the original.
#[test]
fn parquet_typed_value_inverse_roundtrips_object_shredding() -> VortexResult<()> {
    // Fixture: shred `$.a` as Int32. The three rows cover a conforming value, a non-conforming
    // value, and a row where the field is missing.
    let rows = vec![
        Some(r#"{"a":1,"b":"x"}"#),
        Some(r#"{"a":"not-a-number","b":"y"}"#),
        Some(r#"{"b":"z"}"#),
    ];
    let json: ArrowArrayRef = Arc::new(StringArray::from(rows));
    let schema = ShreddedSchemaBuilder::new()
        .with_path("a", &DataType::Int32)?
        .build();
    let unshredded = json_to_variant(&json)?;
    let shredded = shred_variant(&unshredded, &schema)?;
    let original = ParquetVariant::from_arrow_variant(&shredded)?;

    let has_typed_value = original
        .as_opt::<ParquetVariant>()
        .ok_or_else(|| vortex_err!("expected parquet variant"))?
        .typed_value_array()
        .is_some();
    assert!(has_typed_value, "fixture must be shredded");

    // Forward transform: canonicalization lifts `typed_value` into a logical shredded child.
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let canonical = match original.clone().execute::<Canonical>(&mut ctx)? {
        Canonical::Variant(variant) => variant,
        _ => return Err(vortex_err!("expected canonical variant")),
    };
    let core = canonical
        .core_storage()
        .as_opt::<ParquetVariant>()
        .ok_or_else(|| vortex_err!("expected parquet variant core storage"))?;
    let logical = canonical
        .shredded()
        .ok_or_else(|| vortex_err!("expected canonical shredded child"))?
        .clone();

    // Inverse transform: rebuild a Parquet `typed_value`, then reattach it to the core's
    // validity, metadata and residual value.
    let rebuilt_typed_value = parquet_typed_value_from_logical_shredded(logical, &mut ctx)?;
    let rebuilt = ParquetVariant::try_new(
        ParquetVariantArrayExt::validity(&core),
        core.metadata_array().clone(),
        core.value_array().cloned(),
        Some(rebuilt_typed_value),
    )?
    .into_array();

    // Every row of the rebuilt array must match the original.
    assert_eq!(rebuilt.len(), original.len());
    for idx in 0..original.len() {
        let actual = rebuilt.execute_scalar(idx, &mut ctx)?;
        let expected = original.execute_scalar(idx, &mut ctx)?;
        assert_eq!(actual, expected, "row {idx}");
    }
    Ok(())
}
}
17 changes: 13 additions & 4 deletions encodings/parquet-variant/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use vortex_session::registry::Id;

use crate::ParquetVariant;
use crate::ParquetVariantArrayExt;
use crate::array::parquet_typed_value_from_logical_shredded;

/// Arrow canonical extension name for Parquet Variant storage.
const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant";
Expand All @@ -58,7 +59,7 @@ fn parquet_variant_storage_request(fields: &Fields) -> Option<(bool, bool)> {
(has_metadata && (has_value || has_typed_value)).then_some((has_value, has_typed_value))
}

fn export_storage_to_target<T: ParquetVariantArrayExt>(
pub(crate) fn export_storage_to_target<T: ParquetVariantArrayExt>(
parquet_array: &T,
target_fields: &Fields,
ctx: &mut ExecutionCtx,
Expand Down Expand Up @@ -99,7 +100,7 @@ fn export_storage_to_target<T: ParquetVariantArrayExt>(
)?))
}

fn export_unshredded_storage_to_target<T: ParquetVariantArrayExt>(
pub(crate) fn export_unshredded_storage_to_target<T: ParquetVariantArrayExt>(
parquet_array: &T,
target_fields: &Fields,
ctx: &mut ExecutionCtx,
Expand All @@ -115,7 +116,10 @@ fn export_unshredded_storage_to_target<T: ParquetVariantArrayExt>(
export_storage_to_target(&unshredded_parquet, target_fields, ctx)
}

fn parquet_variant_for_export(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
pub(crate) fn parquet_variant_for_export(
array: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let executed = array.execute_until::<ParquetVariant>(ctx)?;
if executed.is::<ParquetVariant>() {
return Ok(executed);
Expand All @@ -135,11 +139,16 @@ fn parquet_variant_for_export(array: ArrayRef, ctx: &mut ExecutionCtx) -> Vortex
return Ok(core_storage);
};

// The canonical shredded child has had its Parquet `value`/`typed_value` wrapper shells
// stripped; rebuild them so the reattached `typed_value` is valid Parquet storage that
// `to_arrow` and `unshred_variant` can consume.
let typed_value = parquet_typed_value_from_logical_shredded(shredded.clone(), ctx)?;

ParquetVariant::try_new(
ParquetVariantArrayExt::validity(&parquet_core),
parquet_core.metadata_array().clone(),
parquet_core.value_array().cloned(),
Some(shredded.clone()),
Some(typed_value),
)
.map(IntoArray::into_array)
}
Expand Down
Loading
Loading