diff --git a/Cargo.lock b/Cargo.lock index 693082a77f8..1c659c7dd60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10082,6 +10082,7 @@ dependencies = [ "vortex-error", "vortex-file", "vortex-io", + "vortex-json", "vortex-layout", "vortex-mask", "vortex-proto", diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index 6b3900abb27..d0c0a0d418e 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -27,8 +27,9 @@ prost = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-error = { workspace = true } +vortex-json = { workspace = true } vortex-mask = { workspace = true } -vortex-proto = { workspace = true } +vortex-proto = { workspace = true, features = ["expr"] } vortex-session = { workspace = true } [dev-dependencies] diff --git a/encodings/parquet-variant/src/array.rs b/encodings/parquet-variant/src/array.rs index 37247430c37..18361f39334 100644 --- a/encodings/parquet-variant/src/array.rs +++ b/encodings/parquet-variant/src/array.rs @@ -19,9 +19,11 @@ use vortex_array::arrays::List; use vortex_array::arrays::ListArray; use vortex_array::arrays::Struct; use vortex_array::arrays::StructArray; +use vortex_array::arrays::Variant; use vortex_array::arrays::VariantArray; use vortex_array::arrays::list::ListArrayExt; use vortex_array::arrays::struct_::StructArrayExt; +use vortex_array::arrays::variant::VariantArrayExt; #[expect( deprecated, reason = "TODO(aduffy): figure out what to do with Parquet Variant" @@ -42,6 +44,7 @@ use vortex_array::vtable::validity_to_child; use vortex_buffer::BitBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_err; use crate::ParquetVariant; use crate::ParquetVariantArray; @@ -307,6 +310,111 @@ fn inferred_shredded_field_validity( Ok(Validity::from_mask(validity, Nullability::Nullable)) } +/// Reconstructs a Parquet `typed_value` tree from the storage-agnostic canonical shredded tree. +/// +/// This is the inverse of [`logical_shredded_from_parquet_typed_value`]: canonicalization strips +/// the Parquet `value`/`typed_value` wrapper shells out of the shredded tree (representing +/// partially shredded fields as nested [`VariantArray`]s), and this re-adds them. The result is a +/// valid Parquet `typed_value` that can be reattached to a [`ParquetVariant`] and serialized to +/// Arrow, where [`parquet_variant_compute::unshred_variant`] merges it back with the residual +/// `value`. +/// +/// `forward` then `inverse` is structure-preserving for the shapes canonicalization produces; +/// fields the forward transform omitted (Parquet wrapper shells with no `typed_value`) are served +/// from the residual `value` and are intentionally not reconstructed here. +pub(crate) fn parquet_typed_value_from_logical_shredded( + shredded: ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { + if let Some(list_array) = shredded.as_opt::() { + let elements = parquet_shredded_field_from_logical(list_array.elements().clone(), ctx)?; + return Ok(ListArray::try_new( + elements, + list_array.offsets().clone(), + list_array.list_validity(), + )? + .into_array()); + } + + let Some(struct_array) = shredded.as_opt::() else { + // A bare typed leaf (a fully shredded scalar) is already a valid Parquet `typed_value`. + return Ok(shredded); + }; + + let mut names = Vec::with_capacity(struct_array.names().len()); + let mut fields = Vec::with_capacity(struct_array.names().len()); + for (name, field) in struct_array + .names() + .iter() + .zip(struct_array.iter_unmasked_fields()) + { + names.push(FieldName::from(name.as_ref())); + fields.push(parquet_shredded_field_from_logical(field.clone(), ctx)?); + } + + Ok(StructArray::try_new( + FieldNames::from_iter(names), + fields, + struct_array.len(), + struct_array.validity()?, + )? + .into_array()) +} + +/// Reconstructs one Parquet shredded field shell (`{value?, typed_value}`) from its canonical +/// representation, the inverse of [`logical_shredded_from_parquet_field`]. +fn parquet_shredded_field_from_logical( + logical_field: ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let len = logical_field.len(); + + // Partially shredded fields canonicalize to a nested Variant whose core storage holds the + // residual `value` and whose own shredded tree holds the typed children. + if let Some(variant) = logical_field.as_opt::() { + let core = variant + .core_storage() + .as_opt::() + .ok_or_else(|| { + vortex_err!( + "cannot rebuild Parquet shredded field: nested Variant lacks Parquet Variant core storage" + ) + })?; + let value = core.value_array().cloned().ok_or_else(|| { + vortex_err!("cannot rebuild Parquet shredded field: partially shredded Variant has no residual value") + })?; + let typed_value = variant + .shredded() + .cloned() + .map(|shredded| parquet_typed_value_from_logical_shredded(shredded, ctx)) + .transpose()?; + + let mut names = vec![FieldName::from("value")]; + let mut fields = vec![value]; + if let Some(typed_value) = typed_value { + names.push(FieldName::from("typed_value")); + fields.push(typed_value); + } + return Ok(StructArray::try_new( + FieldNames::from_iter(names), + fields, + len, + Validity::NonNullable, + )? + .into_array()); + } + + // Fully shredded field: rebuild its typed subtree and wrap it in a `typed_value`-only shell. + let typed_value = parquet_typed_value_from_logical_shredded(logical_field, ctx)?; + Ok(StructArray::try_new( + FieldNames::from_iter([FieldName::from("typed_value")]), + vec![typed_value], + len, + Validity::NonNullable, + )? + .into_array()) +} + /// Accessors and Arrow conversion for Parquet Variant storage arrays. pub trait ParquetVariantArrayExt: TypedArrayRef { /// Returns the non-nullable Parquet Variant metadata child. @@ -389,6 +497,7 @@ mod tests { use arrow_array::Array as _; use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::Int32Array; + use arrow_array::StringArray; use arrow_array::StructArray; use arrow_array::builder::BinaryViewBuilder; use arrow_buffer::NullBuffer; @@ -396,13 +505,18 @@ mod tests { use arrow_schema::Field; use arrow_schema::Fields; use parquet_variant::Variant as PqVariant; + use parquet_variant_compute::ShreddedSchemaBuilder; use parquet_variant_compute::VariantArray as ArrowVariantArray; use parquet_variant_compute::VariantArrayBuilder; + use parquet_variant_compute::json_to_variant; + use parquet_variant_compute::shred_variant; + use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::variant::VariantArrayExt; use vortex_array::assert_arrays_eq; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; @@ -413,6 +527,7 @@ mod tests { use crate::ParquetVariant; use crate::array::ParquetVariantArrayExt; + use crate::array::parquet_typed_value_from_logical_shredded; fn assert_arrow_variant_storage_roundtrip(struct_array: StructArray) -> VortexResult<()> { let arrow_variant = ArrowVariantArray::try_new(&struct_array)?; @@ -678,4 +793,64 @@ mod tests { assert_arrow_variant_storage_roundtrip(struct_array) } + + /// `parquet_typed_value_from_logical_shredded` must invert the wrapper-stripping that + /// canonicalization performs: an object-shredded Parquet variant, once canonicalized and then + /// rebuilt, must produce the same per-row values as the original. + #[test] + fn parquet_typed_value_inverse_roundtrips_object_shredding() -> VortexResult<()> { + // Shred `$.a` as Int32 over conforming, non-conforming, and missing-field rows. + let json: ArrowArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"{"a":1,"b":"x"}"#), + Some(r#"{"a":"not-a-number","b":"y"}"#), + Some(r#"{"b":"z"}"#), + ])); + let shredding = ShreddedSchemaBuilder::new() + .with_path("a", &DataType::Int32)? + .build(); + let shredded = shred_variant(&json_to_variant(&json)?, &shredding)?; + let original = ParquetVariant::from_arrow_variant(&shredded)?; + assert!( + original + .as_opt::() + .ok_or_else(|| vortex_err!("expected parquet variant"))? + .typed_value_array() + .is_some(), + "fixture must be shredded" + ); + + // Canonicalize: the forward transform lifts `typed_value` into a logical shredded child. + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let Canonical::Variant(canonical) = original.clone().execute::(&mut ctx)? else { + return Err(vortex_err!("expected canonical variant")); + }; + let core = canonical + .core_storage() + .as_opt::() + .ok_or_else(|| vortex_err!("expected parquet variant core storage"))?; + let logical = canonical + .shredded() + .ok_or_else(|| vortex_err!("expected canonical shredded child"))? + .clone(); + + // Inverse transform: rebuild a Parquet `typed_value` and reattach it. + let rebuilt_typed_value = parquet_typed_value_from_logical_shredded(logical, &mut ctx)?; + let rebuilt = ParquetVariant::try_new( + ParquetVariantArrayExt::validity(&core), + core.metadata_array().clone(), + core.value_array().cloned(), + Some(rebuilt_typed_value), + )? + .into_array(); + + assert_eq!(rebuilt.len(), original.len()); + for idx in 0..original.len() { + assert_eq!( + rebuilt.execute_scalar(idx, &mut ctx)?, + original.execute_scalar(idx, &mut ctx)?, + "row {idx}" + ); + } + Ok(()) + } } diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index 760cd1fb1aa..d5aca1e2e87 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -36,6 +36,7 @@ use vortex_session::registry::Id; use crate::ParquetVariant; use crate::ParquetVariantArrayExt; +use crate::array::parquet_typed_value_from_logical_shredded; /// Arrow canonical extension name for Parquet Variant storage. const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant"; @@ -58,7 +59,7 @@ fn parquet_variant_storage_request(fields: &Fields) -> Option<(bool, bool)> { (has_metadata && (has_value || has_typed_value)).then_some((has_value, has_typed_value)) } -fn export_storage_to_target( +pub(crate) fn export_storage_to_target( parquet_array: &T, target_fields: &Fields, ctx: &mut ExecutionCtx, @@ -99,7 +100,7 @@ fn export_storage_to_target( )?)) } -fn export_unshredded_storage_to_target( +pub(crate) fn export_unshredded_storage_to_target( parquet_array: &T, target_fields: &Fields, ctx: &mut ExecutionCtx, @@ -115,7 +116,10 @@ fn export_unshredded_storage_to_target( export_storage_to_target(&unshredded_parquet, target_fields, ctx) } -fn parquet_variant_for_export(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { +pub(crate) fn parquet_variant_for_export( + array: ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { let executed = array.execute_until::(ctx)?; if executed.is::() { return Ok(executed); @@ -135,11 +139,16 @@ fn parquet_variant_for_export(array: ArrayRef, ctx: &mut ExecutionCtx) -> Vortex return Ok(core_storage); }; + // The canonical shredded child has had its Parquet `value`/`typed_value` wrapper shells + // stripped; rebuild them so the reattached `typed_value` is valid Parquet storage that + // `to_arrow` and `unshred_variant` can consume. + let typed_value = parquet_typed_value_from_logical_shredded(shredded.clone(), ctx)?; + ParquetVariant::try_new( ParquetVariantArrayExt::validity(&parquet_core), parquet_core.metadata_array().clone(), parquet_core.value_array().cloned(), - Some(shredded.clone()), + Some(typed_value), ) .map(IntoArray::into_array) } diff --git a/encodings/parquet-variant/src/fns/json_to_variant.rs b/encodings/parquet-variant/src/fns/json_to_variant.rs new file mode 100644 index 00000000000..0e47f465dd5 --- /dev/null +++ b/encodings/parquet-variant/src/fns/json_to_variant.rs @@ -0,0 +1,570 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! The `vortex.json_to_variant` scalar function. + +use std::fmt; +use std::fmt::Display; +use std::fmt::Formatter; +use std::sync::Arc; + +use arrow_schema::FieldRef; +use parquet_variant_compute::ShreddedSchemaBuilder; +use parquet_variant_compute::shred_variant; +use prost::Message; +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::dtype::DType; +use vortex_array::scalar_fn::Arity; +use vortex_array::scalar_fn::ChildName; +use vortex_array::scalar_fn::ExecutionArgs; +use vortex_array::scalar_fn::ScalarFnId; +use vortex_array::scalar_fn::ScalarFnVTable; +use vortex_array::scalar_fn::fns::variant_get::VariantPath; +use vortex_array::scalar_fn::fns::variant_get::VariantPathElement; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_json::Json; +use vortex_proto::expr as pb; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::ParquetVariant; +use crate::kernel::to_parquet_variant_path; + +/// Parses JSON strings into Parquet Variant values, optionally shredding fields. +/// +/// Accepts `Utf8` inputs or [`Json`] extension inputs and returns `Variant` values with the +/// input's nullability. Null rows stay null, the JSON literal `null` becomes a variant-null +/// value, and any row that fails to parse as JSON fails the whole expression. +/// +/// A non-empty [`ShreddingSpec`] additionally shreds the selected paths into a typed +/// `typed_value` child, following the [Parquet Variant shredding] rules: rows whose value does +/// not match the requested type stay readable through the residual variant value. +/// +/// # Not an inverse of `variant_to_json` +/// +/// `json_to_variant` and [`VariantToJson`](crate::VariantToJson) are lossy, normalizing +/// conversions and are NOT inverses of each other: +/// - JSON whitespace is not preserved. +/// - Object keys may be reordered: Variant metadata stores keys in sorted order, so +/// `variant_to_json` emits fields in a canonical order, not source order. +/// - Number formatting and precision change: e.g. `1.0` may render as `1`, exponent forms and +/// very large numbers are re-rendered, and floating-point values are re-encoded. +/// - Duplicate object keys are collapsed to a single entry. +/// - Unicode escape sequences are normalized (e.g. `\u0041` becomes `A`). +/// - `variant_to_json` stringifies Variant-only types — date, timestamp, UUID, binary, +/// decimal — so `json_to_variant(variant_to_json(v))` yields plain strings/numbers and loses +/// the original type information. +/// - Shredding structure is not recoverable from JSON: `variant_to_json` unshreds its input +/// first, and re-parsing produces an unshredded Variant unless a new [`ShreddingSpec`] is +/// supplied. +/// +/// [Parquet Variant shredding]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md +#[derive(Clone)] +pub struct JsonToVariant; + +impl ScalarFnVTable for JsonToVariant { + type Options = JsonToVariantOptions; + + fn id(&self) -> ScalarFnId { + static ID: CachedId = CachedId::new("vortex.json_to_variant"); + *ID + } + + fn serialize(&self, options: &Self::Options) -> VortexResult>> { + let shredding = options + .shredding() + .fields() + .iter() + .map(|(path, dtype)| { + Ok(pb::ShreddingSpecField { + path: path + .elements() + .iter() + .map(VariantPathElement::to_proto) + .collect(), + dtype: Some(dtype.try_into()?), + }) + }) + .collect::>()?; + + Ok(Some(pb::JsonToVariantOpts { shredding }.encode_to_vec())) + } + + fn deserialize(&self, metadata: &[u8], session: &VortexSession) -> VortexResult { + let opts = pb::JsonToVariantOpts::decode(metadata)?; + let fields = opts + .shredding + .into_iter() + .map(|field| { + let path = field + .path + .into_iter() + .map(VariantPathElement::from_proto) + .collect::>()?; + let dtype = field + .dtype + .as_ref() + .ok_or_else(|| vortex_err!("ShreddingSpecField missing dtype")) + .and_then(|dtype| DType::from_proto(dtype, session))?; + Ok((path, dtype)) + }) + .collect::>>()?; + + Ok(JsonToVariantOptions::new(ShreddingSpec::try_new(fields)?)) + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _options: &Self::Options, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("input"), + _ => unreachable!("Invalid child index {child_idx} for JsonToVariant expression"), + } + } + + fn return_dtype(&self, _options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult { + let input_dtype = &arg_dtypes[0]; + vortex_ensure!( + input_dtype.is_utf8() + || input_dtype + .as_extension_opt() + .is_some_and(|ext_dtype| ext_dtype.is::()), + "JsonToVariant input must be Utf8 or a Json extension, found {input_dtype}" + ); + + Ok(DType::Variant(input_dtype.nullability())) + } + + fn execute( + &self, + options: &Self::Options, + args: &dyn ExecutionArgs, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let input = args.get(0)?; + let input_nullable = input.dtype().is_nullable(); + + let storage = if input.dtype().is_utf8() { + input + } else if input + .dtype() + .as_extension_opt() + .is_some_and(|ext_dtype| ext_dtype.is::()) + { + input + .execute::(ctx)? + .storage_array() + .clone() + } else { + vortex_bail!( + "JsonToVariant input must be Utf8 or a Json extension, found {}", + input.dtype() + ); + }; + + let session = ctx.session().clone(); + let arrow_strings = session.arrow().execute_arrow(storage, None, ctx)?; + // Any row that fails to parse as JSON fails the whole conversion. + let arrow_variant = parquet_variant_compute::json_to_variant(&arrow_strings)?; + + let arrow_variant = if options.shredding().is_empty() { + arrow_variant + } else { + let mut builder = ShreddedSchemaBuilder::new(); + for (path, dtype) in options.shredding().fields() { + let field: FieldRef = Arc::new(session.arrow().to_arrow_field("shredded", dtype)?); + builder = builder.with_path(to_parquet_variant_path(path)?, field)?; + } + shred_variant(&arrow_variant, &builder.build())? + }; + + if input_nullable { + ParquetVariant::from_arrow_variant_nullable(&arrow_variant) + } else { + ParquetVariant::from_arrow_variant(&arrow_variant) + } + } +} + +/// A list of `(path, dtype)` directives describing which Variant paths to shred and as what +/// type. +/// +/// Paths must contain only object-field elements; list index elements are rejected because +/// Parquet Variant shredding schemas cannot express list element shredding yet. The root path +/// (`$`) shreds the top-level value itself. When entries overlap (e.g. `$.a` and `$.a.b`), +/// later entries overwrite earlier ones. +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct ShreddingSpec(Vec<(VariantPath, DType)>); + +impl ShreddingSpec { + /// Creates an empty spec, meaning no shredding is performed. + pub fn empty() -> Self { + Self::default() + } + + /// Creates a spec from `(path, dtype)` directives. + /// + /// # Errors + /// + /// Returns an error if any path contains a list index element. + pub fn try_new(fields: impl IntoIterator) -> VortexResult { + let fields = fields.into_iter().collect::>(); + for (path, _) in &fields { + vortex_ensure!( + path.elements() + .iter() + .all(|element| matches!(element, VariantPathElement::Field(_))), + "ShreddingSpec paths must only contain object fields, found list index in {path}" + ); + } + Ok(Self(fields)) + } + + /// Returns the `(path, dtype)` directives. + pub fn fields(&self) -> &[(VariantPath, DType)] { + &self.0 + } + + /// Returns whether this spec contains no directives. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl Display for ShreddingSpec { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + for (idx, (path, dtype)) in self.0.iter().enumerate() { + if idx > 0 { + write!(f, ", ")?; + } + write!(f, "{path} as {dtype}")?; + } + Ok(()) + } +} + +/// Options for [`JsonToVariant`]. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct JsonToVariantOptions { + /// The paths to shred into typed storage, if any. + shredding: ShreddingSpec, +} + +impl JsonToVariantOptions { + /// Creates options that shred the paths selected by `shredding`. + pub fn new(shredding: ShreddingSpec) -> Self { + Self { shredding } + } + + /// Creates options that perform no shredding. + pub fn unshredded() -> Self { + Self::new(ShreddingSpec::empty()) + } + + /// Returns the shredding spec. + pub fn shredding(&self) -> &ShreddingSpec { + &self.shredding + } +} + +impl Display for JsonToVariantOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.shredding.fmt(f) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::Canonical; + use vortex_array::EmptyMetadata; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::StructArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::struct_::StructArrayExt; + use vortex_array::arrays::variant::VariantArrayExt; + use vortex_array::arrow::ArrowSession; + use vortex_array::assert_arrays_eq; + use vortex_array::assert_nth_scalar_is_null; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::dtype::session::DTypeSession; + use vortex_array::expr::Expression; + use vortex_array::expr::proto::ExprSerializeProtoExt; + use vortex_array::expr::root; + use vortex_array::expr::variant_get; + use vortex_array::scalar_fn::session::ScalarFnSession; + use vortex_array::session::ArraySession; + use vortex_error::vortex_bail; + + use super::*; + use crate::ParquetVariantArrayExt; + use crate::fns::json_to_variant; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty() + .with::() + .with::() + .with::() + .with::(); + crate::initialize(&session); + session + }); + + fn i64_dtype() -> DType { + DType::Primitive(PType::I64, Nullability::Nullable) + } + + fn shred_field_as_i64(field: &str) -> VortexResult { + ShreddingSpec::try_new([(VariantPath::field(field), i64_dtype())]) + } + + fn execute_json_to_variant( + input: ArrayRef, + shredding: ShreddingSpec, + ) -> VortexResult { + let expr = json_to_variant(root(), shredding); + input + .apply(&expr)? + .execute::(&mut SESSION.create_execution_ctx()) + } + + fn assert_variant_i64_rows(array: &ArrayRef, expected: &[Option]) -> VortexResult<()> { + assert_eq!(array.len(), expected.len()); + let mut ctx = SESSION.create_execution_ctx(); + for (idx, expected) in expected.iter().enumerate() { + let scalar = array.execute_scalar(idx, &mut ctx)?; + let variant = scalar.as_variant(); + match expected { + Some(expected) => { + let value = variant + .value() + .ok_or_else(|| vortex_err!("expected non-null variant at row {idx}"))? + .cast(&i64_dtype())?; + assert_eq!(value.as_primitive().typed_value::(), Some(*expected)); + } + None => assert!(scalar.is_null(), "expected null row {idx}"), + } + } + Ok(()) + } + + #[test] + fn expression_roundtrip_serialization() -> VortexResult<()> { + let expr: Expression = json_to_variant(root(), shred_field_as_i64("a")?); + let proto = expr.serialize_proto()?; + let actual = Expression::from_proto(&proto, &SESSION)?; + + assert_eq!(actual, expr); + Ok(()) + } + + #[test] + fn converts_utf8_json_rows() -> VortexResult<()> { + let input = + VarBinViewArray::from_iter_str([r#"{"a": 1}"#, "2", r#"{"a": 3}"#]).into_array(); + + let result = execute_json_to_variant(input, ShreddingSpec::empty())?; + + assert_eq!(result.dtype(), &DType::Variant(Nullability::NonNullable)); + let mut ctx = SESSION.create_execution_ctx(); + let row0 = result.execute_scalar(0, &mut ctx)?; + let object = row0 + .as_variant() + .value() + .ok_or_else(|| vortex_err!("expected non-null variant"))?; + let field = object + .as_struct() + .field("a") + .ok_or_else(|| vortex_err!("expected field a"))? + .as_variant() + .value() + .ok_or_else(|| vortex_err!("expected non-null field a"))? + .cast(&i64_dtype())?; + assert_eq!(field.as_primitive().typed_value::(), Some(1)); + + let row1 = result.execute_scalar(1, &mut ctx)?; + let value = row1 + .as_variant() + .value() + .ok_or_else(|| vortex_err!("expected non-null variant"))? + .cast(&i64_dtype())?; + assert_eq!(value.as_primitive().typed_value::(), Some(2)); + Ok(()) + } + + #[test] + fn converts_json_extension_input() -> VortexResult<()> { + let storage = VarBinViewArray::from_iter_str(["1", "2"]).into_array(); + let input = ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array(); + + let result = execute_json_to_variant(input, ShreddingSpec::empty())?; + + assert_eq!(result.dtype(), &DType::Variant(Nullability::NonNullable)); + assert_variant_i64_rows(&result, &[Some(1), Some(2)]) + } + + #[test] + fn null_rows_stay_null_and_json_null_becomes_variant_null() -> VortexResult<()> { + let input = + VarBinViewArray::from_iter_nullable_str([Some("1"), None, Some("null")]).into_array(); + + let result = execute_json_to_variant(input, ShreddingSpec::empty())?; + + assert_eq!(result.dtype(), &DType::Variant(Nullability::Nullable)); + let mut ctx = SESSION.create_execution_ctx(); + assert!(!result.execute_scalar(0, &mut ctx)?.is_null()); + assert_nth_scalar_is_null!(result, 1); + let row2 = result.execute_scalar(2, &mut ctx)?; + assert!(!row2.is_null(), "JSON null must not be a row null"); + assert_eq!(row2.as_variant().is_variant_null(), Some(true)); + Ok(()) + } + + #[test] + fn invalid_json_errors() { + let input = VarBinViewArray::from_iter_str([r#"{"a": 1}"#, r#"{"a":"#]).into_array(); + + let err = execute_json_to_variant(input, ShreddingSpec::empty()).unwrap_err(); + assert!(!err.to_string().is_empty()); + } + + #[test] + fn shredding_spec_rejects_index_paths() { + let err = ShreddingSpec::try_new([( + VariantPath::new([VariantPathElement::field("a"), VariantPathElement::index(0)]), + i64_dtype(), + )]) + .unwrap_err(); + + assert!( + err.to_string() + .contains("ShreddingSpec paths must only contain object fields"), + "unexpected error: {err}" + ); + } + + #[test] + fn shredding_produces_typed_value_child() -> VortexResult<()> { + let input = VarBinViewArray::from_iter_str([ + r#"{"a": 1, "b": "x"}"#, + r#"{"a": 2, "b": "y"}"#, + r#"{"a": "not-a-number", "b": "z"}"#, + r#"{"b": "missing-a"}"#, + ]) + .into_array(); + + let result = execute_json_to_variant(input, shred_field_as_i64("a")?)?; + + assert!( + result.as_::().typed_value_array().is_some(), + "expected shredded typed_value child" + ); + + // The canonical form must expose field `a` through the shredded tree. + let mut ctx = SESSION.create_execution_ctx(); + let Canonical::Variant(canonical) = result.clone().execute::(&mut ctx)? else { + vortex_bail!("expected canonical variant array"); + }; + let shredded = canonical + .shredded() + .ok_or_else(|| vortex_err!("expected canonical shredded child"))? + .clone() + .execute::(&mut ctx)?; + assert!(shredded.unmasked_field_by_name_opt("a").is_some()); + + // Typed extraction must serve shredded rows and fall back for mismatched rows. + let typed = result + .clone() + .apply(&variant_get( + root(), + VariantPath::field("a"), + Some(i64_dtype()), + ))? + .execute::(&mut ctx)?; + assert_arrays_eq!( + typed, + PrimitiveArray::from_option_iter([Some(1i64), Some(2), None, None]) + ); + + // Mismatched rows keep their original value through the variant fallback. + let untyped = result + .apply(&variant_get( + root(), + VariantPath::field("a"), + Some(DType::Utf8(Nullability::Nullable)), + ))? + .execute::(&mut ctx)?; + let row2 = untyped.execute_scalar(2, &mut ctx)?; + assert_eq!( + row2.as_utf8().value().map(|value| value.to_string()), + Some("not-a-number".to_string()) + ); + Ok(()) + } + + #[test] + fn shredding_preserves_null_rows() -> VortexResult<()> { + let input = VarBinViewArray::from_iter_nullable_str([ + Some(r#"{"a": 1}"#), + None, + Some(r#"{"a": 3}"#), + ]) + .into_array(); + + let result = execute_json_to_variant(input, shred_field_as_i64("a")?)?; + + assert_eq!(result.dtype(), &DType::Variant(Nullability::Nullable)); + assert_nth_scalar_is_null!(result, 1); + let mut ctx = SESSION.create_execution_ctx(); + let typed = result + .apply(&variant_get( + root(), + VariantPath::field("a"), + Some(i64_dtype()), + ))? + .execute::(&mut ctx)?; + assert_arrays_eq!( + typed, + PrimitiveArray::from_option_iter([Some(1i64), None, Some(3)]) + ); + Ok(()) + } + + #[test] + fn shredding_root_path_shreds_top_level_values() -> VortexResult<()> { + let input = VarBinViewArray::from_iter_str(["1", "2", r#""not-a-number""#]).into_array(); + let spec = ShreddingSpec::try_new([(VariantPath::root(), i64_dtype())])?; + + let result = execute_json_to_variant(input, spec)?; + + assert!( + result.as_::().typed_value_array().is_some(), + "expected shredded typed_value child" + ); + assert_variant_i64_rows(&result.slice(0..2)?, &[Some(1), Some(2)])?; + let mut ctx = SESSION.create_execution_ctx(); + let row2 = result.execute_scalar(2, &mut ctx)?; + let value = row2 + .as_variant() + .value() + .ok_or_else(|| vortex_err!("expected non-null variant"))?; + assert_eq!( + value.as_utf8().value().map(|value| value.to_string()), + Some("not-a-number".to_string()) + ); + Ok(()) + } +} diff --git a/encodings/parquet-variant/src/fns/mod.rs b/encodings/parquet-variant/src/fns/mod.rs new file mode 100644 index 00000000000..b1958a24000 --- /dev/null +++ b/encodings/parquet-variant/src/fns/mod.rs @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Scalar functions converting between JSON strings and Variant values. + +mod json_to_variant; +mod variant_to_json; + +pub use json_to_variant::JsonToVariant; +pub use json_to_variant::JsonToVariantOptions; +pub use json_to_variant::ShreddingSpec; +pub use variant_to_json::VariantToJson; +use vortex_array::expr::Expression; +use vortex_array::scalar_fn::EmptyOptions; +use vortex_array::scalar_fn::ScalarFnVTableExt; + +/// Creates a [`JsonToVariant`] expression that parses `child`'s JSON strings into Variant +/// values, shredding the paths selected by `shredding`. +/// +/// `child` must produce `Utf8` or [`Json`](vortex_json::Json) extension values; the result is +/// `Variant` with the input's nullability. Rows containing invalid JSON fail the expression. +/// +/// Note that this is NOT an inverse of [`variant_to_json()`]: both conversions normalize their +/// input. See [`JsonToVariant`] for the full list of caveats. +pub fn json_to_variant(child: Expression, shredding: ShreddingSpec) -> Expression { + JsonToVariant.new_expr(JsonToVariantOptions::new(shredding), [child]) +} + +/// Creates a [`VariantToJson`] expression that renders `child`'s Variant values as JSON +/// strings with the [`Json`](vortex_json::Json) extension dtype. +/// +/// Shredded inputs are unshredded before rendering, and the result keeps the input's +/// nullability. +/// +/// Note that this is NOT an inverse of [`json_to_variant()`]: both conversions normalize their +/// input, and Variant-only types (dates, timestamps, UUIDs, binary, decimals) are rendered as +/// plain JSON strings or numbers. See [`VariantToJson`] for the full list of caveats. +pub fn variant_to_json(child: Expression) -> Expression { + VariantToJson.new_expr(EmptyOptions, [child]) +} diff --git a/encodings/parquet-variant/src/fns/variant_to_json.rs b/encodings/parquet-variant/src/fns/variant_to_json.rs new file mode 100644 index 00000000000..2ed09239565 --- /dev/null +++ b/encodings/parquet-variant/src/fns/variant_to_json.rs @@ -0,0 +1,503 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! The `vortex.variant_to_json` scalar function. + +use std::fmt; +use std::fmt::Formatter; +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Fields; +use vortex_array::ArrayRef; +use vortex_array::EmptyMetadata; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrow::FromArrowArray; +use vortex_array::dtype::DType; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::expr::Expression; +use vortex_array::scalar_fn::Arity; +use vortex_array::scalar_fn::ChildName; +use vortex_array::scalar_fn::EmptyOptions; +use vortex_array::scalar_fn::ExecutionArgs; +use vortex_array::scalar_fn::ScalarFnId; +use vortex_array::scalar_fn::ScalarFnVTable; +use vortex_error::VortexResult; +use vortex_error::vortex_ensure; +use vortex_json::Json; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::ParquetVariant; +use crate::ParquetVariantArrayExt; +use crate::arrow::export_storage_to_target; +use crate::arrow::export_unshredded_storage_to_target; +use crate::arrow::parquet_variant_for_export; + +/// Renders Variant values as JSON strings with the [`Json`] extension dtype. +/// +/// Accepts `Variant` inputs backed by Parquet Variant storage, including shredded storage +/// (top-level, object, and nested fields), which is unshredded before rendering. Null rows stay +/// null, while variant-null values render as the JSON literal `null`. The output nullability +/// matches the input's. +/// +/// Inputs are exported through their Parquet Variant storage, so a `Variant` whose core storage +/// is not Parquet Variant-backed is not supported and fails. +/// +/// # Not an inverse of `json_to_variant` +/// +/// [`JsonToVariant`](crate::JsonToVariant) and `variant_to_json` are lossy, normalizing +/// conversions and are NOT inverses of each other: +/// - JSON whitespace is not preserved. +/// - Object keys may be reordered: Variant metadata stores keys in sorted order, so +/// `variant_to_json` emits fields in a canonical order, not source order. +/// - Number formatting and precision change: e.g. `1.0` may render as `1`, exponent forms and +/// very large numbers are re-rendered, and floating-point values are re-encoded. +/// - Duplicate object keys are collapsed to a single entry. +/// - Unicode escape sequences are normalized (e.g. `\u0041` becomes `A`). +/// - `variant_to_json` stringifies Variant-only types — date, timestamp, UUID, binary, +/// decimal — so `json_to_variant(variant_to_json(v))` yields plain strings/numbers and loses +/// the original type information. +/// - Shredding structure is not recoverable from JSON: `variant_to_json` unshreds its input +/// first, and re-parsing produces an unshredded Variant unless a new +/// [`ShreddingSpec`](crate::ShreddingSpec) is supplied. +#[derive(Clone)] +pub struct VariantToJson; + +impl ScalarFnVTable for VariantToJson { + type Options = EmptyOptions; + + fn id(&self) -> ScalarFnId { + static ID: CachedId = CachedId::new("vortex.variant_to_json"); + *ID + } + + fn serialize(&self, _options: &Self::Options) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _options: &Self::Options, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("input"), + _ => unreachable!("Invalid child index {child_idx} for VariantToJson expression"), + } + } + + fn fmt_sql( + &self, + _options: &Self::Options, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> fmt::Result { + write!(f, "variant_to_json(")?; + expr.child(0).fmt_sql(f)?; + write!(f, ")") + } + + fn return_dtype(&self, _options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult { + let input_dtype = &arg_dtypes[0]; + vortex_ensure!( + matches!(input_dtype, DType::Variant(_)), + "VariantToJson input must be Variant, found {input_dtype}" + ); + + let storage_dtype = DType::Utf8(input_dtype.nullability()); + Ok(DType::Extension( + ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased(), + )) + } + + fn execute( + &self, + _options: &Self::Options, + args: &dyn ExecutionArgs, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let input = args.get(0)?; + let input_nullable = input.dtype().is_nullable(); + + let parquet_array = parquet_variant_for_export(input, ctx)?; + let parquet_array = parquet_array.as_::(); + + // `parquet_variant_compute::variant_to_json` only accepts unshredded + // `STRUCT` storage, so request exactly that shape and + // unshred any typed values first. + let target_fields: Fields = vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, true)), + ] + .into(); + let arrow_storage = if parquet_array.typed_value_array().is_some() { + export_unshredded_storage_to_target(&parquet_array, &target_fields, ctx)? + } else { + export_storage_to_target(&parquet_array, &target_fields, ctx)? + }; + + let arrow_json = parquet_variant_compute::variant_to_json(&arrow_storage)?; + let storage = ArrayRef::from_arrow(&arrow_json, input_nullable)?; + + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage).map(IntoArray::into_array) + } + + fn is_null_sensitive(&self, _options: &Self::Options) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use chrono::NaiveDate; + use parquet_variant::Variant as PqVariant; + use parquet_variant::VariantBuilder; + use parquet_variant_compute::VariantArrayBuilder; + use vortex_array::Canonical; + use vortex_array::VortexSessionExecute; + use vortex_array::accessor::ArrayAccessor; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::Variant; + use vortex_array::arrays::extension::ExtensionArrayExt; + use vortex_array::arrays::variant::VariantArrayExt; + use vortex_array::arrow::ArrowSession; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::dtype::session::DTypeSession; + use vortex_array::expr::root; + use vortex_array::scalar_fn::fns::variant_get::VariantPath; + use vortex_array::scalar_fn::fns::variant_get::VariantPathElement; + use vortex_array::scalar_fn::session::ScalarFnSession; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_error::vortex_bail; + use vortex_error::vortex_err; + + use super::*; + use crate::ShreddingSpec; + use crate::fns::json_to_variant; + use crate::fns::variant_to_json; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty() + .with::() + .with::() + .with::() + .with::(); + crate::initialize(&session); + session + }); + + fn json_dtype(nullability: Nullability) -> VortexResult { + Ok(DType::Extension( + ExtDType::::try_new(EmptyMetadata, DType::Utf8(nullability))?.erased(), + )) + } + + fn json_strings(array: &ArrayRef) -> VortexResult>> { + let mut ctx = SESSION.create_execution_ctx(); + let ext = array.clone().execute::(&mut ctx)?; + let storage = ext + .storage_array() + .clone() + .execute::(&mut ctx)?; + Ok(storage.with_iterator(|iter| { + iter.map(|value| value.map(|bytes| String::from_utf8_lossy(bytes).into_owned())) + .collect() + })) + } + + fn unshredded_variant( + rows: impl IntoIterator>, + ) -> VortexResult { + let rows = rows.into_iter().collect::>(); + let mut builder = VariantArrayBuilder::new(rows.len()); + for row in rows { + builder.append_variant(row); + } + ParquetVariant::from_arrow_variant(&builder.build()) + } + + fn json_rows_to_variant( + rows: Vec>, + shredding: ShreddingSpec, + ) -> VortexResult { + let input = VarBinViewArray::from_iter_nullable_str(rows).into_array(); + input + .apply(&json_to_variant(root(), shredding))? + .execute::(&mut SESSION.create_execution_ctx()) + } + + #[test] + fn renders_unshredded_values() -> VortexResult<()> { + let input = unshredded_variant([ + PqVariant::from(42i32), + PqVariant::from("hello"), + PqVariant::from(true), + PqVariant::Null, + ])?; + + let result = input + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?; + + assert_eq!(result.dtype(), &json_dtype(Nullability::NonNullable)?); + assert_eq!( + json_strings(&result)?, + vec![ + Some("42".to_string()), + Some(r#""hello""#.to_string()), + Some("true".to_string()), + Some("null".to_string()), + ] + ); + Ok(()) + } + + #[test] + fn null_rows_stay_null_and_variant_null_renders_as_json_null() -> VortexResult<()> { + let input = + json_rows_to_variant(vec![Some("1"), None, Some("null")], ShreddingSpec::empty())?; + + let result = input + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?; + + assert_eq!(result.dtype(), &json_dtype(Nullability::Nullable)?); + assert_eq!( + json_strings(&result)?, + vec![Some("1".to_string()), None, Some("null".to_string())] + ); + Ok(()) + } + + fn typed_value_only_variant() -> VortexResult { + let rows = [ + VariantBuilder::new().with_value(10i32).finish(), + VariantBuilder::new().with_value(20i32).finish(), + VariantBuilder::new().with_value(30i32).finish(), + ]; + let metadata = + VarBinViewArray::from_iter_bin(rows.iter().map(|(metadata, _)| metadata.as_slice())) + .into_array(); + let typed_value = PrimitiveArray::from_iter([10i32, 20, 30]).into_array(); + Ok( + ParquetVariant::try_new(Validity::NonNullable, metadata, None, Some(typed_value))? + .into_array(), + ) + } + + #[test] + fn unshreds_typed_value_only_storage() -> VortexResult<()> { + let result = { + let input = typed_value_only_variant()?; + input + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx()) + }?; + + assert_eq!( + json_strings(&result)?, + vec![ + Some("10".to_string()), + Some("20".to_string()), + Some("30".to_string()), + ] + ); + Ok(()) + } + + #[test] + fn unshreds_partially_shredded_storage() -> VortexResult<()> { + let spec = ShreddingSpec::try_new([( + VariantPath::field("a"), + DType::Primitive(PType::I64, Nullability::Nullable), + )])?; + let input = json_rows_to_variant( + vec![ + Some(r#"{"a": 1, "b": "x"}"#), + Some(r#"{"a": "not-a-number", "b": "y"}"#), + Some(r#"{"b": "z"}"#), + ], + spec, + )?; + assert!( + input.as_::().typed_value_array().is_some(), + "fixture must be shredded" + ); + + let result = input + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?; + + assert_eq!( + json_strings(&result)?, + vec![ + Some(r#"{"a":1,"b":"x"}"#.to_string()), + Some(r#"{"a":"not-a-number","b":"y"}"#.to_string()), + Some(r#"{"b":"z"}"#.to_string()), + ] + ); + Ok(()) + } + + /// Shreds `rows` per `spec`, then canonicalizes so the typed values are lifted into a logical + /// shredded child (as a file read-back would produce). + fn canonical_shredded(rows: Vec>, spec: ShreddingSpec) -> VortexResult { + let shredded = json_rows_to_variant(rows, spec)?; + let Canonical::Variant(canonical) = + shredded.execute::(&mut SESSION.create_execution_ctx())? + else { + vortex_bail!("expected canonical variant"); + }; + Ok(canonical.into_array()) + } + + /// Rendering a canonicalized shredded variant must match rendering the same data unshredded: + /// the shredding is a storage optimization and is invisible to JSON output. + fn assert_canonical_matches_unshredded( + rows: Vec>, + spec: ShreddingSpec, + ) -> VortexResult<()> { + let unshredded = json_rows_to_variant(rows.clone(), ShreddingSpec::empty())?; + let want = json_strings( + &unshredded + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?, + )?; + + let canonical = canonical_shredded(rows, spec)?; + assert!( + canonical.as_::().shredded().is_some(), + "fixture must carry a canonical shredded child" + ); + let got = json_strings( + &canonical + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?, + )?; + + assert_eq!(got, want); + Ok(()) + } + + /// Regression for the canonicalization bug: object-field shredding lifted into a logical + /// shredded child must round-trip back to the full object, including the shredded field and + /// the non-conforming/missing-field fallbacks. + #[test] + fn renders_canonical_object_shredded_variant() -> VortexResult<()> { + let spec = ShreddingSpec::try_new([( + VariantPath::field("a"), + DType::Primitive(PType::I64, Nullability::Nullable), + )])?; + let canonical = canonical_shredded( + vec![ + Some(r#"{"a": 1, "b": "x"}"#), + Some(r#"{"a": "not-a-number", "b": "y"}"#), + Some(r#"{"b": "z"}"#), + ], + spec, + )?; + assert!( + canonical.as_::().shredded().is_some(), + "fixture must carry a canonical shredded child" + ); + + let result = canonical + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?; + + assert_eq!( + json_strings(&result)?, + vec![ + Some(r#"{"a":1,"b":"x"}"#.to_string()), + Some(r#"{"a":"not-a-number","b":"y"}"#.to_string()), + Some(r#"{"b":"z"}"#.to_string()), + ] + ); + Ok(()) + } + + #[test] + fn canonical_object_shredding_matches_unshredded() -> VortexResult<()> { + assert_canonical_matches_unshredded( + vec![ + Some(r#"{"a": 1, "b": "x", "c": 3}"#), + Some(r#"{"a": 2, "b": "y", "c": 4}"#), + None, + ], + ShreddingSpec::try_new([ + ( + VariantPath::field("a"), + DType::Primitive(PType::I64, Nullability::Nullable), + ), + ( + VariantPath::field("c"), + DType::Primitive(PType::I64, Nullability::Nullable), + ), + ])?, + ) + } + + #[test] + fn canonical_nested_dotted_shredding_matches_unshredded() -> VortexResult<()> { + assert_canonical_matches_unshredded( + vec![ + Some(r#"{"a": {"b": 100}, "c": "keep"}"#), + Some(r#"{"a": {"b": 200}, "c": "keep2"}"#), + ], + ShreddingSpec::try_new([( + VariantPath::new([ + VariantPathElement::field("a"), + VariantPathElement::field("b"), + ]), + DType::Primitive(PType::I64, Nullability::Nullable), + )])?, + ) + } + + #[test] + fn variant_only_types_are_stringified_so_reparsing_loses_types() -> VortexResult<()> { + let date = + NaiveDate::from_ymd_opt(2026, 6, 11).ok_or_else(|| vortex_err!("invalid test date"))?; + let input = unshredded_variant([PqVariant::from(date)])?; + + let rendered = input + .apply(&variant_to_json(root()))? + .execute::(&mut SESSION.create_execution_ctx())?; + let json = json_strings(&rendered)?; + assert_eq!(json, vec![Some(r#""2026-06-11""#.to_string())]); + + // Re-parsing the rendered JSON yields a plain string variant, not a date: the type + // information is lost, demonstrating that the conversions are not inverses. + let reparsed = rendered + .apply(&json_to_variant(root(), ShreddingSpec::empty()))? + .execute::(&mut SESSION.create_execution_ctx())?; + let mut ctx = SESSION.create_execution_ctx(); + let row0 = reparsed.execute_scalar(0, &mut ctx)?; + let value = row0 + .as_variant() + .value() + .ok_or_else(|| vortex_err!("expected non-null variant"))?; + assert_eq!( + value.as_utf8().value().map(|value| value.to_string()), + Some("2026-06-11".to_string()) + ); + Ok(()) + } +} diff --git a/encodings/parquet-variant/src/kernel.rs b/encodings/parquet-variant/src/kernel.rs index a136b483678..8b8e298af4c 100644 --- a/encodings/parquet-variant/src/kernel.rs +++ b/encodings/parquet-variant/src/kernel.rs @@ -87,7 +87,7 @@ impl ExecuteParentKernel for VariantGetKernel { } } -fn to_parquet_variant_path(path: &VariantPath) -> VortexResult> { +pub(crate) fn to_parquet_variant_path(path: &VariantPath) -> VortexResult> { path.elements() .iter() .map(|element| match element { diff --git a/encodings/parquet-variant/src/lib.rs b/encodings/parquet-variant/src/lib.rs index 03d2a046442..3020db32440 100644 --- a/encodings/parquet-variant/src/lib.rs +++ b/encodings/parquet-variant/src/lib.rs @@ -26,6 +26,7 @@ mod array; mod arrow; +mod fns; mod kernel; mod operations; mod validity; @@ -34,15 +35,29 @@ mod vtable; use std::sync::Arc; pub use array::ParquetVariantArrayExt; +pub use fns::JsonToVariant; +pub use fns::JsonToVariantOptions; +pub use fns::ShreddingSpec; +pub use fns::VariantToJson; +pub use fns::json_to_variant; +pub use fns::variant_to_json; use vortex_array::arrow::ArrowSessionExt; +use vortex_array::scalar_fn::session::ScalarFnSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; pub use vtable::ParquetVariant; pub use vtable::ParquetVariantArray; -/// Register Parquet Variant array and Arrow extension support with a session. +/// Register Parquet Variant array, Arrow extension, and scalar function support with a +/// session. +/// +/// This also registers the [`Json`](vortex_json::Json) extension dtype so that sessions able +/// to execute [`VariantToJson`] can serialize and deserialize its output dtype. pub fn initialize(session: &VortexSession) { session.arrays().register(ParquetVariant); session.arrow().register_exporter(Arc::new(ParquetVariant)); session.arrow().register_importer(Arc::new(ParquetVariant)); + vortex_json::initialize(session); + session.scalar_fns().register(JsonToVariant); + session.scalar_fns().register(VariantToJson); } diff --git a/vortex-array/src/scalar_fn/fns/variant_get/mod.rs b/vortex-array/src/scalar_fn/fns/variant_get/mod.rs index ac6aa562aa6..13e283204bb 100644 --- a/vortex-array/src/scalar_fn/fns/variant_get/mod.rs +++ b/vortex-array/src/scalar_fn/fns/variant_get/mod.rs @@ -55,16 +55,7 @@ impl ScalarFnVTable for VariantGet { .path() .elements() .iter() - .map(|element| match element { - VariantPathElement::Field(name) => pb::VariantPathElement { - element: Some(variant_path_element::Element::Field( - name.as_ref().to_string(), - )), - }, - VariantPathElement::Index(index) => pb::VariantPathElement { - element: Some(variant_path_element::Element::Index(*index)), - }, - }) + .map(VariantPathElement::to_proto) .collect(); let dtype = options.dtype().map(TryInto::try_into).transpose()?; @@ -355,15 +346,30 @@ impl VariantPathElement { Self::Index(index) } - fn from_proto(value: pb::VariantPathElement) -> VortexResult { + /// Decodes a path element from its protobuf representation. + pub fn from_proto(value: pb::VariantPathElement) -> VortexResult { match value .element - .ok_or_else(|| vortex_err!("VariantGet path element missing value"))? + .ok_or_else(|| vortex_err!("Variant path element missing value"))? { variant_path_element::Element::Field(field) => Ok(Self::field(field)), variant_path_element::Element::Index(index) => Ok(Self::index(index)), } } + + /// Encodes this path element into its protobuf representation. + pub fn to_proto(&self) -> pb::VariantPathElement { + match self { + VariantPathElement::Field(name) => pb::VariantPathElement { + element: Some(variant_path_element::Element::Field( + name.as_ref().to_string(), + )), + }, + VariantPathElement::Index(index) => pb::VariantPathElement { + element: Some(variant_path_element::Element::Index(*index)), + }, + } + } } impl From for VariantPathElement { diff --git a/vortex-proto/proto/expr.proto b/vortex-proto/proto/expr.proto index 3263352cc07..76fd3dbc4a7 100644 --- a/vortex-proto/proto/expr.proto +++ b/vortex-proto/proto/expr.proto @@ -55,6 +55,17 @@ message VariantPathElement { } } +// Options for `vortex.json_to_variant` +message JsonToVariantOpts { + repeated ShreddingSpecField shredding = 1; +} + +// One (path, dtype) shredding directive for `vortex.json_to_variant`. +message ShreddingSpecField { + repeated VariantPathElement path = 1; + vortex.dtype.DType dtype = 2; +} + // Options for `vortex.binary` message BinaryOpts { BinaryOp op = 1; diff --git a/vortex-proto/src/generated/vortex.expr.rs b/vortex-proto/src/generated/vortex.expr.rs index c153ce5433b..a93a8ac91ca 100644 --- a/vortex-proto/src/generated/vortex.expr.rs +++ b/vortex-proto/src/generated/vortex.expr.rs @@ -62,6 +62,20 @@ pub mod variant_path_element { Index(u64), } } +/// Options for `vortex.json_to_variant` +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JsonToVariantOpts { + #[prost(message, repeated, tag = "1")] + pub shredding: ::prost::alloc::vec::Vec, +} +/// One (path, dtype) shredding directive for `vortex.json_to_variant`. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ShreddingSpecField { + #[prost(message, repeated, tag = "1")] + pub path: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub dtype: ::core::option::Option, +} /// Options for `vortex.binary` #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct BinaryOpts {