From d8ccc0070aa246bf8487c5b1f62de00582491881 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 19 Feb 2026 14:14:15 -0800 Subject: [PATCH 1/3] support `arrays_zip` function --- datafusion/functions-nested/src/lib.rs | 2 + datafusion/functions-nested/src/set_ops.rs | 262 ++++++++++++++++++- datafusion/sqllogictest/test_files/array.slt | 146 +++++++++++ 3 files changed, 407 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs index 9ac6911236e40..c92c99486e8bc 100644 --- a/datafusion/functions-nested/src/lib.rs +++ b/datafusion/functions-nested/src/lib.rs @@ -118,6 +118,7 @@ pub mod expr_fn { pub use super::set_ops::array_distinct; pub use super::set_ops::array_intersect; pub use super::set_ops::array_union; + pub use super::set_ops::arrays_zip; pub use super::sort::array_sort; pub use super::string::array_to_string; pub use super::string::string_to_array; @@ -159,6 +160,7 @@ pub fn all_default_nested_functions() -> Vec> { set_ops::array_distinct_udf(), set_ops::array_intersect_udf(), set_ops::array_union_udf(), + set_ops::arrays_zip_udf(), position::array_position_udf(), position::array_positions_udf(), remove::array_remove_udf(), diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 2348b3c530c53..5fea722791a54 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -15,15 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! [`ScalarUDFImpl`] definitions for array_union, array_intersect and array_distinct functions. +//! [`ScalarUDFImpl`] definitions for array_union, array_intersect, array_distinct and arrays_zip functions. use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array, + Array, ArrayRef, Capacities, GenericListArray, ListArray, MutableArrayData, + OffsetSizeTrait, StructArray, new_empty_array, new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::DataType::{LargeList, List, Null}; -use arrow::datatypes::{DataType, Field, FieldRef}; +use arrow::datatypes::{DataType, Field, FieldRef, Fields}; use arrow::row::{RowConverter, SortField}; use datafusion_common::cast::{as_large_list_array, as_list_array}; use datafusion_common::utils::ListCoercion; @@ -64,6 +65,13 @@ make_udf_expr_and_func!( array_distinct_udf ); +make_udf_expr_and_func!( + ArraysZip, + arrays_zip, + "combines multiple arrays into a single array of structs.", + arrays_zip_udf +); + #[user_doc( doc_section(label = "Array Functions"), description = "Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.", @@ -582,6 +590,254 @@ fn general_array_distinct( )?)) } +#[user_doc( + doc_section(label = "Array Functions"), + description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.", + syntax_example = "arrays_zip(array1, array2[, ..., array_n])", + sql_example = r#"```sql +> select arrays_zip([1, 2, 3], ['a', 'b', 'c']); ++---------------------------------------------------+ +| arrays_zip([1, 2, 3], ['a', 'b', 'c']) | ++---------------------------------------------------+ +| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] | ++---------------------------------------------------+ +> select arrays_zip([1, 2], [3, 4, 5]); ++---------------------------------------------------+ +| arrays_zip([1, 2], [3, 4, 5]) | ++---------------------------------------------------+ +| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] | ++---------------------------------------------------+ +```"#, + argument(name = "array1", description = "First array expression."), + argument(name = "array2", description = "Second array expression."), + argument(name = "array_n", description = "Subsequent array expressions.") +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ArraysZip { + signature: Signature, + aliases: Vec, +} + +impl Default for ArraysZip { + fn default() -> Self { + Self::new() + } +} + +impl ArraysZip { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![String::from("list_zip")], + } + } +} + +impl ScalarUDFImpl for ArraysZip { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "arrays_zip" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types.is_empty() { + return exec_err!("arrays_zip requires at least two arguments"); + } + + let mut fields = Vec::with_capacity(arg_types.len()); + for (i, arg_type) in arg_types.iter().enumerate() { + let element_type = match arg_type { + List(field) | LargeList(field) => field.data_type().clone(), + Null => Null, + dt => { + return exec_err!("arrays_zip expects array arguments, got {dt}"); + } + }; + fields.push(Field::new(format!("c{i}"), element_type, true)); + } + + Ok(List(Arc::new(Field::new_list_field( + DataType::Struct(Fields::from(fields)), + true, + )))) + } + + fn invoke_with_args( + &self, + args: datafusion_expr::ScalarFunctionArgs, + ) -> Result { + make_scalar_function(arrays_zip_inner)(&args.args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +/// Core implementation for arrays_zip. +/// +/// Takes N list arrays and produces a list of structs where each struct +/// has one field per input array. If arrays within a row have different +/// lengths, shorter arrays are padded with NULLs. +fn arrays_zip_inner(args: &[ArrayRef]) -> Result { + if args.len() < 2 { + return exec_err!("arrays_zip requires at least two arguments"); + } + + let num_rows = args[0].len(); + + // Extract element type from each list argument + let mut list_arrays: Vec> = Vec::with_capacity(args.len()); + let mut element_types: Vec = Vec::with_capacity(args.len()); + + for (i, arg) in args.iter().enumerate() { + match arg.data_type() { + List(field) => { + let list_arr = as_list_array(arg)?; + element_types.push(field.data_type().clone()); + list_arrays.push(Some(list_arr)); + } + Null => { + element_types.push(Null); + list_arrays.push(None); + } + dt => { + return exec_err!("arrays_zip argument {i} expected list type, got {dt}"); + } + } + } + + // Collect per-column values data for MutableArrayData builders. + let values_data: Vec<_> = list_arrays + .iter() + .map(|la| la.map(|arr| arr.values().to_data())) + .collect(); + + let struct_fields: Fields = element_types + .iter() + .enumerate() + .map(|(i, dt)| Field::new(format!("c{i}"), dt.clone(), true)) + .collect::>() + .into(); + + // Create a MutableArrayData builder per column. For None (Null-typed) + // args we only need extend_nulls, so we track them separately. + let mut builders: Vec> = values_data + .iter() + .map(|vd| { + vd.as_ref().map(|data| { + MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0)) + }) + }) + .collect(); + + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + offsets.push(0); + let mut null_mask: Vec = Vec::with_capacity(num_rows); + let mut total_values: usize = 0; + + // Process each row: compute per-array lengths, detect if aligned, + // then either copy values directly or pad shorter arrays with NULLs. + for row_idx in 0..num_rows { + // Compute per-array length for this row. + let mut max_len: usize = 0; + let mut all_null = true; + + for la in list_arrays.iter().flatten() { + if !la.is_null(row_idx) { + all_null = false; + let len = (la.value_offsets()[row_idx + 1] - la.value_offsets()[row_idx]) + as usize; + max_len = max_len.max(len); + } + } + + if all_null { + null_mask.push(true); + offsets.push(*offsets.last().unwrap()); + continue; + } + null_mask.push(false); + + // Extend each column builder for this row. + for (col_idx, la) in list_arrays.iter().enumerate() { + match la { + Some(arr) if !arr.is_null(row_idx) => { + let start = arr.value_offsets()[row_idx] as usize; + let end = arr.value_offsets()[row_idx + 1] as usize; + let len = end - start; + let builder = builders[col_idx].as_mut().unwrap(); + builder.extend(0, start, end); + if len < max_len { + builder.extend_nulls(max_len - len); + } + } + _ => { + // Null list entry or None (Null-typed) arg — all nulls. + if let Some(builder) = builders[col_idx].as_mut() { + builder.extend_nulls(max_len); + } + // None columns are handled after the loop. + } + } + } + + total_values += max_len; + let last = *offsets.last().unwrap(); + offsets.push(last + max_len as i32); + } + + // Assemble struct columns from builders. + let struct_columns: Vec = builders + .into_iter() + .zip(element_types.iter()) + .map(|(builder, elem_type)| match builder { + Some(b) => arrow::array::make_array(b.freeze()), + None => new_null_array( + if elem_type.is_null() { + &Null + } else { + elem_type + }, + total_values, + ), + }) + .collect(); + + let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?; + + let null_buffer = if null_mask.iter().any(|&v| v) { + Some(NullBuffer::from( + null_mask.iter().map(|v| !v).collect::>(), + )) + } else { + None + }; + + let result = ListArray::try_new( + Arc::new(Field::new_list_field( + struct_array.data_type().clone(), + true, + )), + OffsetBuffer::new(offsets.into()), + Arc::new(struct_array), + null_buffer, + )?; + + Ok(Arc::new(result)) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index f675763120718..c7365de2cacbb 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6866,6 +6866,152 @@ from array_distinct_table_2D_fixed; [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] [[5, 6], NULL] +## arrays_zip (aliases: `list_zip`) + +# Spark example: arrays_zip(array(1, 2, 3), array(2, 3, 4)) +query ? +select arrays_zip([1, 2, 3], [2, 3, 4]); +---- +[{c0: 1, c1: 2}, {c0: 2, c1: 3}, {c0: 3, c1: 4}] + +# Spark example: arrays_zip(array(1, 2), array(2, 3), array(3, 4)) +query ? +select arrays_zip([1, 2], [2, 3], [3, 4]); +---- +[{c0: 1, c1: 2, c2: 3}, {c0: 2, c1: 3, c2: 4}] + +# basic: two integer arrays of equal length +query ? +select arrays_zip([1, 2, 3], [10, 20, 30]); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# basic: two arrays with different element types (int + string) +query ? +select arrays_zip([1, 2, 3], ['a', 'b', 'c']); +---- +[{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] + +# three arrays of equal length +query ? +select arrays_zip([1, 2, 3], [10, 20, 30], [100, 200, 300]); +---- +[{c0: 1, c1: 10, c2: 100}, {c0: 2, c1: 20, c2: 200}, {c0: 3, c1: 30, c2: 300}] + +# four arrays of equal length +query ? +select arrays_zip([1], [2], [3], [4]); +---- +[{c0: 1, c1: 2, c2: 3, c3: 4}] + +# mixed element types: float + boolean +query ? +select arrays_zip([1.5, 2.5], [true, false]); +---- +[{c0: 1.5, c1: true}, {c0: 2.5, c1: false}] + +# different length arrays: shorter array padded with NULLs +query ? +select arrays_zip([1, 2], [3, 4, 5]); +---- +[{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: NULL, c1: 5}] + +# different length arrays: first longer +query ? +select arrays_zip([1, 2, 3], [10]); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: NULL}, {c0: 3, c1: NULL}] + +# different length: one single element, other three elements +query ? +select arrays_zip([1], ['a', 'b', 'c']); +---- +[{c0: 1, c1: a}, {c0: NULL, c1: b}, {c0: NULL, c1: c}] + +# empty arrays +query ? +select arrays_zip([], []); +---- +[] + +# one empty, one non-empty +query ? +select arrays_zip([], [1, 2, 3]); +---- +[{c0: NULL, c1: 1}, {c0: NULL, c1: 2}, {c0: NULL, c1: 3}] + +# NULL elements inside arrays +query ? +select arrays_zip([1, NULL, 3], ['a', 'b', 'c']); +---- +[{c0: 1, c1: a}, {c0: NULL, c1: b}, {c0: 3, c1: c}] + +# all NULL elements +query ? +select arrays_zip([NULL::int, NULL, NULL], [NULL::text, NULL, NULL]); +---- +[{c0: NULL, c1: NULL}, {c0: NULL, c1: NULL}, {c0: NULL, c1: NULL}] + +# both args are NULL (entire list null) +query ? +select arrays_zip(NULL::int[], NULL::int[]); +---- +NULL + +# one arg is NULL list, other is real array +query ? +select arrays_zip(NULL::int[], [1, 2, 3]); +---- +[{c0: NULL, c1: 1}, {c0: NULL, c1: 2}, {c0: NULL, c1: 3}] + +# real array + NULL list +query ? +select arrays_zip([1, 2], NULL::text[]); +---- +[{c0: 1, c1: NULL}, {c0: 2, c1: NULL}] + +# column-level test with multiple rows +query ? +select arrays_zip(a, b) from (values ([1, 2], [10, 20]), ([3, 4, 5], [30]), ([6], [60, 70])) as t(a, b); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}] +[{c0: 3, c1: 30}, {c0: 4, c1: NULL}, {c0: 5, c1: NULL}] +[{c0: 6, c1: 60}, {c0: NULL, c1: 70}] + +# column-level test with NULL rows +query ? +select arrays_zip(a, b) from (values ([1, 2], [10, 20]), (null, [30, 40]), ([5, 6], null)) as t(a, b); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}] +[{c0: NULL, c1: 30}, {c0: NULL, c1: 40}] +[{c0: 5, c1: NULL}, {c0: 6, c1: NULL}] + +# alias: list_zip +query ? +select list_zip([1, 2], [3, 4]); +---- +[{c0: 1, c1: 3}, {c0: 2, c1: 4}] + +# column test: total values equal (3 each) but per-row lengths differ +# a: [1] b: [10, 20] → row 0: a has 1, b has 2 +# a: [2, 3] b: [30] → row 1: a has 2, b has 1 +# total a values = 3, total b values = 3 (same!) but rows are misaligned +query ? +select arrays_zip(a, b) from (values ([1], [10, 20]), ([2, 3], [30])) as t(a, b); +---- +[{c0: 1, c1: 10}, {c0: NULL, c1: 20}] +[{c0: 2, c1: 30}, {c0: 3, c1: NULL}] + +# single element arrays +query ? +select arrays_zip([42], ['hello']); +---- +[{c0: 42, c1: hello}] + +# error: too few arguments +statement error +select arrays_zip([1, 2, 3]); + query ??? select array_intersect(column1, column2), array_intersect(column3, column4), From fb1b5f23ff54d2a8314d1ed40a4b19ff02707173 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 19 Feb 2026 15:11:09 -0800 Subject: [PATCH 2/3] support `arrays_zip` function --- .../source/user-guide/sql/scalar_functions.md | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 02b4b55fe68ee..ba6f7ef2dc9c9 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -3183,6 +3183,7 @@ _Alias of [current_date](#current_date)._ - [array_to_string](#array_to_string) - [array_union](#array_union) - [arrays_overlap](#arrays_overlap) +- [arrays_zip](#arrays_zip) - [cardinality](#cardinality) - [empty](#empty) - [flatten](#flatten) @@ -3228,6 +3229,7 @@ _Alias of [current_date](#current_date)._ - [list_sort](#list_sort) - [list_to_string](#list_to_string) - [list_union](#list_union) +- [list_zip](#list_zip) - [make_array](#make_array) - [make_list](#make_list) - [range](#range) @@ -4267,6 +4269,41 @@ array_union(array1, array2) _Alias of [array_has_any](#array_has_any)._ +### `arrays_zip` + +Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs. + +```sql +arrays_zip(array1, array2[, ..., array_n]) +``` + +#### Arguments + +- **array1**: First array expression. +- **array2**: Second array expression. +- **array_n**: Subsequent array expressions. + +#### Example + +```sql +> select arrays_zip([1, 2, 3], ['a', 'b', 'c']); ++---------------------------------------------------+ +| arrays_zip([1, 2, 3], ['a', 'b', 'c']) | ++---------------------------------------------------+ +| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] | ++---------------------------------------------------+ +> select arrays_zip([1, 2], [3, 4, 5]); ++---------------------------------------------------+ +| arrays_zip([1, 2], [3, 4, 5]) | ++---------------------------------------------------+ +| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] | ++---------------------------------------------------+ +``` + +#### Aliases + +- list_zip + ### `cardinality` Returns the total number of elements in the array. @@ -4536,6 +4573,10 @@ _Alias of [array_to_string](#array_to_string)._ _Alias of [array_union](#array_union)._ +### `list_zip` + +_Alias of [arrays_zip](#arrays_zip)._ + ### `make_array` Returns an array using the specified input expressions. From c16e54b436ce71bcae04c434d32a38bfbfeb23e5 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 20 Feb 2026 11:06:22 -0800 Subject: [PATCH 3/3] support `arrays_zip` function --- datafusion/functions-nested/src/arrays_zip.rs | 336 ++++++++++++++++++ datafusion/functions-nested/src/lib.rs | 5 +- datafusion/functions-nested/src/set_ops.rs | 262 +------------- datafusion/sqllogictest/test_files/array.slt | 54 +++ 4 files changed, 396 insertions(+), 261 deletions(-) create mode 100644 datafusion/functions-nested/src/arrays_zip.rs diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs new file mode 100644 index 0000000000000..2ac30d07046e2 --- /dev/null +++ b/datafusion/functions-nested/src/arrays_zip.rs @@ -0,0 +1,336 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ScalarUDFImpl`] definitions for arrays_zip function. + +use crate::utils::make_scalar_function; +use arrow::array::{ + Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, new_null_array, +}; +use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null}; +use arrow::datatypes::{DataType, Field, Fields}; +use datafusion_common::cast::{ + as_fixed_size_list_array, as_large_list_array, as_list_array, +}; +use datafusion_common::{Result, exec_err}; +use datafusion_expr::{ + ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_macros::user_doc; +use std::any::Any; +use std::sync::Arc; + +/// Type-erased view of a list column (works for both List and LargeList). +/// Stores the information needed to iterate rows without re-downcasting. +struct ListColumnView { + /// The flat values array backing this list column. + values: ArrayRef, + /// Pre-computed per-row start offsets (length = num_rows + 1). + offsets: Vec, + /// Pre-computed null bitmap: true means the row is null. + is_null: Vec, +} + +make_udf_expr_and_func!( + ArraysZip, + arrays_zip, + "combines multiple arrays into a single array of structs.", + arrays_zip_udf +); + +#[user_doc( + doc_section(label = "Array Functions"), + description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.", + syntax_example = "arrays_zip(array1, array2[, ..., array_n])", + sql_example = r#"```sql +> select arrays_zip([1, 2, 3], ['a', 'b', 'c']); ++---------------------------------------------------+ +| arrays_zip([1, 2, 3], ['a', 'b', 'c']) | ++---------------------------------------------------+ +| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] | ++---------------------------------------------------+ +> select arrays_zip([1, 2], [3, 4, 5]); ++---------------------------------------------------+ +| arrays_zip([1, 2], [3, 4, 5]) | ++---------------------------------------------------+ +| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] | ++---------------------------------------------------+ +```"#, + argument(name = "array1", description = "First array expression."), + argument(name = "array2", description = "Second array expression."), + argument(name = "array_n", description = "Subsequent array expressions.") +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ArraysZip { + signature: Signature, + aliases: Vec, +} + +impl Default for ArraysZip { + fn default() -> Self { + Self::new() + } +} + +impl ArraysZip { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![String::from("list_zip")], + } + } +} + +impl ScalarUDFImpl for ArraysZip { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "arrays_zip" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types.is_empty() { + return exec_err!("arrays_zip requires at least two arguments"); + } + + let mut fields = Vec::with_capacity(arg_types.len()); + for (i, arg_type) in arg_types.iter().enumerate() { + let element_type = match arg_type { + List(field) | LargeList(field) | FixedSizeList(field, _) => { + field.data_type().clone() + } + Null => Null, + dt => { + return exec_err!("arrays_zip expects array arguments, got {dt}"); + } + }; + fields.push(Field::new(format!("c{i}"), element_type, true)); + } + + Ok(List(Arc::new(Field::new_list_field( + DataType::Struct(Fields::from(fields)), + true, + )))) + } + + fn invoke_with_args( + &self, + args: datafusion_expr::ScalarFunctionArgs, + ) -> Result { + make_scalar_function(arrays_zip_inner)(&args.args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +/// Core implementation for arrays_zip. +/// +/// Takes N list arrays and produces a list of structs where each struct +/// has one field per input array. If arrays within a row have different +/// lengths, shorter arrays are padded with NULLs. +/// Supports List, LargeList, and Null input types. +fn arrays_zip_inner(args: &[ArrayRef]) -> Result { + if args.len() < 2 { + return exec_err!("arrays_zip requires at least two arguments"); + } + + let num_rows = args[0].len(); + + // Build a type-erased ListColumnView for each argument. + // None means the argument is Null-typed (all nulls, no backing data). + let mut views: Vec> = Vec::with_capacity(args.len()); + let mut element_types: Vec = Vec::with_capacity(args.len()); + + for (i, arg) in args.iter().enumerate() { + match arg.data_type() { + List(field) => { + let arr = as_list_array(arg)?; + let raw_offsets = arr.value_offsets(); + let offsets: Vec = + raw_offsets.iter().map(|&o| o as usize).collect(); + let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { + values: Arc::clone(arr.values()), + offsets, + is_null, + })); + } + LargeList(field) => { + let arr = as_large_list_array(arg)?; + let raw_offsets = arr.value_offsets(); + let offsets: Vec = + raw_offsets.iter().map(|&o| o as usize).collect(); + let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { + values: Arc::clone(arr.values()), + offsets, + is_null, + })); + } + FixedSizeList(field, size) => { + let arr = as_fixed_size_list_array(arg)?; + let size = *size as usize; + let offsets: Vec = (0..=num_rows).map(|row| row * size).collect(); + let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { + values: Arc::clone(arr.values()), + offsets, + is_null, + })); + } + Null => { + element_types.push(Null); + views.push(None); + } + dt => { + return exec_err!("arrays_zip argument {i} expected list type, got {dt}"); + } + } + } + + // Collect per-column values data for MutableArrayData builders. + let values_data: Vec<_> = views + .iter() + .map(|v| v.as_ref().map(|view| view.values.to_data())) + .collect(); + + let struct_fields: Fields = element_types + .iter() + .enumerate() + .map(|(i, dt)| Field::new(format!("c{i}"), dt.clone(), true)) + .collect::>() + .into(); + + // Create a MutableArrayData builder per column. For None (Null-typed) + // args we only need extend_nulls, so we track them separately. + let mut builders: Vec> = values_data + .iter() + .map(|vd| { + vd.as_ref().map(|data| { + MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0)) + }) + }) + .collect(); + + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + offsets.push(0); + let mut null_mask: Vec = Vec::with_capacity(num_rows); + let mut total_values: usize = 0; + + // Process each row: compute per-array lengths, then copy values + // and pad shorter arrays with NULLs. + for row_idx in 0..num_rows { + let mut max_len: usize = 0; + let mut all_null = true; + + for view in views.iter().flatten() { + if !view.is_null[row_idx] { + all_null = false; + let len = view.offsets[row_idx + 1] - view.offsets[row_idx]; + max_len = max_len.max(len); + } + } + + if all_null { + null_mask.push(true); + offsets.push(*offsets.last().unwrap()); + continue; + } + null_mask.push(false); + + // Extend each column builder for this row. + for (col_idx, view) in views.iter().enumerate() { + match view { + Some(v) if !v.is_null[row_idx] => { + let start = v.offsets[row_idx]; + let end = v.offsets[row_idx + 1]; + let len = end - start; + let builder = builders[col_idx].as_mut().unwrap(); + builder.extend(0, start, end); + if len < max_len { + builder.extend_nulls(max_len - len); + } + } + _ => { + // Null list entry or None (Null-typed) arg — all nulls. + if let Some(builder) = builders[col_idx].as_mut() { + builder.extend_nulls(max_len); + } + } + } + } + + total_values += max_len; + let last = *offsets.last().unwrap(); + offsets.push(last + max_len as i32); + } + + // Assemble struct columns from builders. + let struct_columns: Vec = builders + .into_iter() + .zip(element_types.iter()) + .map(|(builder, elem_type)| match builder { + Some(b) => arrow::array::make_array(b.freeze()), + None => new_null_array( + if elem_type.is_null() { + &Null + } else { + elem_type + }, + total_values, + ), + }) + .collect(); + + let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?; + + let null_buffer = if null_mask.iter().any(|&v| v) { + Some(NullBuffer::from( + null_mask.iter().map(|v| !v).collect::>(), + )) + } else { + None + }; + + let result = ListArray::try_new( + Arc::new(Field::new_list_field( + struct_array.data_type().clone(), + true, + )), + OffsetBuffer::new(offsets.into()), + Arc::new(struct_array), + null_buffer, + )?; + + Ok(Arc::new(result)) +} diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs index c92c99486e8bc..99b25ec96454b 100644 --- a/datafusion/functions-nested/src/lib.rs +++ b/datafusion/functions-nested/src/lib.rs @@ -38,6 +38,7 @@ pub mod macros; pub mod array_has; +pub mod arrays_zip; pub mod cardinality; pub mod concat; pub mod dimension; @@ -79,6 +80,7 @@ pub mod expr_fn { pub use super::array_has::array_has; pub use super::array_has::array_has_all; pub use super::array_has::array_has_any; + pub use super::arrays_zip::arrays_zip; pub use super::cardinality::cardinality; pub use super::concat::array_append; pub use super::concat::array_concat; @@ -118,7 +120,6 @@ pub mod expr_fn { pub use super::set_ops::array_distinct; pub use super::set_ops::array_intersect; pub use super::set_ops::array_union; - pub use super::set_ops::arrays_zip; pub use super::sort::array_sort; pub use super::string::array_to_string; pub use super::string::string_to_array; @@ -160,7 +161,7 @@ pub fn all_default_nested_functions() -> Vec> { set_ops::array_distinct_udf(), set_ops::array_intersect_udf(), set_ops::array_union_udf(), - set_ops::arrays_zip_udf(), + arrays_zip::arrays_zip_udf(), position::array_position_udf(), position::array_positions_udf(), remove::array_remove_udf(), diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 5fea722791a54..2348b3c530c53 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -15,16 +15,15 @@ // specific language governing permissions and limitations // under the License. -//! [`ScalarUDFImpl`] definitions for array_union, array_intersect, array_distinct and arrays_zip functions. +//! [`ScalarUDFImpl`] definitions for array_union, array_intersect and array_distinct functions. use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, Capacities, GenericListArray, ListArray, MutableArrayData, - OffsetSizeTrait, StructArray, new_empty_array, new_null_array, + Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::DataType::{LargeList, List, Null}; -use arrow::datatypes::{DataType, Field, FieldRef, Fields}; +use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; use datafusion_common::cast::{as_large_list_array, as_list_array}; use datafusion_common::utils::ListCoercion; @@ -65,13 +64,6 @@ make_udf_expr_and_func!( array_distinct_udf ); -make_udf_expr_and_func!( - ArraysZip, - arrays_zip, - "combines multiple arrays into a single array of structs.", - arrays_zip_udf -); - #[user_doc( doc_section(label = "Array Functions"), description = "Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.", @@ -590,254 +582,6 @@ fn general_array_distinct( )?)) } -#[user_doc( - doc_section(label = "Array Functions"), - description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.", - syntax_example = "arrays_zip(array1, array2[, ..., array_n])", - sql_example = r#"```sql -> select arrays_zip([1, 2, 3], ['a', 'b', 'c']); -+---------------------------------------------------+ -| arrays_zip([1, 2, 3], ['a', 'b', 'c']) | -+---------------------------------------------------+ -| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] | -+---------------------------------------------------+ -> select arrays_zip([1, 2], [3, 4, 5]); -+---------------------------------------------------+ -| arrays_zip([1, 2], [3, 4, 5]) | -+---------------------------------------------------+ -| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] | -+---------------------------------------------------+ -```"#, - argument(name = "array1", description = "First array expression."), - argument(name = "array2", description = "Second array expression."), - argument(name = "array_n", description = "Subsequent array expressions.") -)] -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct ArraysZip { - signature: Signature, - aliases: Vec, -} - -impl Default for ArraysZip { - fn default() -> Self { - Self::new() - } -} - -impl ArraysZip { - pub fn new() -> Self { - Self { - signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![String::from("list_zip")], - } - } -} - -impl ScalarUDFImpl for ArraysZip { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "arrays_zip" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.is_empty() { - return exec_err!("arrays_zip requires at least two arguments"); - } - - let mut fields = Vec::with_capacity(arg_types.len()); - for (i, arg_type) in arg_types.iter().enumerate() { - let element_type = match arg_type { - List(field) | LargeList(field) => field.data_type().clone(), - Null => Null, - dt => { - return exec_err!("arrays_zip expects array arguments, got {dt}"); - } - }; - fields.push(Field::new(format!("c{i}"), element_type, true)); - } - - Ok(List(Arc::new(Field::new_list_field( - DataType::Struct(Fields::from(fields)), - true, - )))) - } - - fn invoke_with_args( - &self, - args: datafusion_expr::ScalarFunctionArgs, - ) -> Result { - make_scalar_function(arrays_zip_inner)(&args.args) - } - - fn aliases(&self) -> &[String] { - &self.aliases - } - - fn documentation(&self) -> Option<&Documentation> { - self.doc() - } -} - -/// Core implementation for arrays_zip. -/// -/// Takes N list arrays and produces a list of structs where each struct -/// has one field per input array. If arrays within a row have different -/// lengths, shorter arrays are padded with NULLs. -fn arrays_zip_inner(args: &[ArrayRef]) -> Result { - if args.len() < 2 { - return exec_err!("arrays_zip requires at least two arguments"); - } - - let num_rows = args[0].len(); - - // Extract element type from each list argument - let mut list_arrays: Vec> = Vec::with_capacity(args.len()); - let mut element_types: Vec = Vec::with_capacity(args.len()); - - for (i, arg) in args.iter().enumerate() { - match arg.data_type() { - List(field) => { - let list_arr = as_list_array(arg)?; - element_types.push(field.data_type().clone()); - list_arrays.push(Some(list_arr)); - } - Null => { - element_types.push(Null); - list_arrays.push(None); - } - dt => { - return exec_err!("arrays_zip argument {i} expected list type, got {dt}"); - } - } - } - - // Collect per-column values data for MutableArrayData builders. - let values_data: Vec<_> = list_arrays - .iter() - .map(|la| la.map(|arr| arr.values().to_data())) - .collect(); - - let struct_fields: Fields = element_types - .iter() - .enumerate() - .map(|(i, dt)| Field::new(format!("c{i}"), dt.clone(), true)) - .collect::>() - .into(); - - // Create a MutableArrayData builder per column. For None (Null-typed) - // args we only need extend_nulls, so we track them separately. - let mut builders: Vec> = values_data - .iter() - .map(|vd| { - vd.as_ref().map(|data| { - MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0)) - }) - }) - .collect(); - - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - offsets.push(0); - let mut null_mask: Vec = Vec::with_capacity(num_rows); - let mut total_values: usize = 0; - - // Process each row: compute per-array lengths, detect if aligned, - // then either copy values directly or pad shorter arrays with NULLs. - for row_idx in 0..num_rows { - // Compute per-array length for this row. - let mut max_len: usize = 0; - let mut all_null = true; - - for la in list_arrays.iter().flatten() { - if !la.is_null(row_idx) { - all_null = false; - let len = (la.value_offsets()[row_idx + 1] - la.value_offsets()[row_idx]) - as usize; - max_len = max_len.max(len); - } - } - - if all_null { - null_mask.push(true); - offsets.push(*offsets.last().unwrap()); - continue; - } - null_mask.push(false); - - // Extend each column builder for this row. - for (col_idx, la) in list_arrays.iter().enumerate() { - match la { - Some(arr) if !arr.is_null(row_idx) => { - let start = arr.value_offsets()[row_idx] as usize; - let end = arr.value_offsets()[row_idx + 1] as usize; - let len = end - start; - let builder = builders[col_idx].as_mut().unwrap(); - builder.extend(0, start, end); - if len < max_len { - builder.extend_nulls(max_len - len); - } - } - _ => { - // Null list entry or None (Null-typed) arg — all nulls. - if let Some(builder) = builders[col_idx].as_mut() { - builder.extend_nulls(max_len); - } - // None columns are handled after the loop. - } - } - } - - total_values += max_len; - let last = *offsets.last().unwrap(); - offsets.push(last + max_len as i32); - } - - // Assemble struct columns from builders. - let struct_columns: Vec = builders - .into_iter() - .zip(element_types.iter()) - .map(|(builder, elem_type)| match builder { - Some(b) => arrow::array::make_array(b.freeze()), - None => new_null_array( - if elem_type.is_null() { - &Null - } else { - elem_type - }, - total_values, - ), - }) - .collect(); - - let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?; - - let null_buffer = if null_mask.iter().any(|&v| v) { - Some(NullBuffer::from( - null_mask.iter().map(|v| !v).collect::>(), - )) - } else { - None - }; - - let result = ListArray::try_new( - Arc::new(Field::new_list_field( - struct_array.data_type().clone(), - true, - )), - OffsetBuffer::new(offsets.into()), - Arc::new(struct_array), - null_buffer, - )?; - - Ok(Arc::new(result)) -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index c7365de2cacbb..66503c957c5ad 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -7012,6 +7012,60 @@ select arrays_zip([42], ['hello']); statement error select arrays_zip([1, 2, 3]); +# arrays_zip with LargeList inputs +query ? +select arrays_zip( + arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# arrays_zip with LargeList different lengths (padding) +query ? +select arrays_zip( + arrow_cast(make_array(1, 2), 'LargeList(Int64)'), + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: NULL, c1: 30}] + +# arrays_zip with FixedSizeList inputs +query ? +select arrays_zip( + arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)'), + arrow_cast(make_array(10, 20, 30), 'FixedSizeList(3, Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# arrays_zip mixing List and LargeList +query ? +select arrays_zip( + [1, 2, 3], + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# arrays_zip mixing List and FixedSizeList with different lengths (padding) +query ? +select arrays_zip( + [1, 2, 3], + arrow_cast(make_array(10, 20), 'FixedSizeList(2, Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: NULL}] + +# arrays_zip with LargeList and FixedSizeList mixed types +query ? +select arrays_zip( + arrow_cast(make_array(1, 2), 'LargeList(Int64)'), + arrow_cast(make_array('a', 'b'), 'FixedSizeList(2, Utf8)') +); +---- +[{c0: 1, c1: a}, {c0: 2, c1: b}] + query ??? select array_intersect(column1, column2), array_intersect(column3, column4),