diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs
new file mode 100644
index 0000000000000..2ac30d07046e2
--- /dev/null
+++ b/datafusion/functions-nested/src/arrays_zip.rs
@@ -0,0 +1,374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ScalarUDFImpl`] definitions for arrays_zip function.
+
+use crate::utils::make_scalar_function;
+use arrow::array::{
+    Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, new_null_array,
+};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
+use arrow::datatypes::{DataType, Field, Fields};
+use datafusion_common::cast::{
+    as_fixed_size_list_array, as_large_list_array, as_list_array,
+};
+use datafusion_common::{Result, exec_err};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_macros::user_doc;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Type-erased view of a list column (works for List, LargeList, and
+/// FixedSizeList).
+/// Stores the information needed to iterate rows without re-downcasting.
+struct ListColumnView {
+    /// The flat values array backing this list column.
+    values: ArrayRef,
+    /// Pre-computed per-row start offsets (length = num_rows + 1).
+    offsets: Vec<usize>,
+    /// Pre-computed null bitmap: true means the row is null.
+    is_null: Vec<bool>,
+}
+
+impl ListColumnView {
+    /// Captures the child `values`, the row boundaries in `offsets` (already
+    /// widened to `usize`), and the nullness of the first `num_rows` rows of
+    /// `list`.
+    fn new(
+        list: &dyn Array,
+        values: &ArrayRef,
+        offsets: Vec<usize>,
+        num_rows: usize,
+    ) -> Self {
+        Self {
+            values: Arc::clone(values),
+            offsets,
+            is_null: (0..num_rows).map(|row| list.is_null(row)).collect(),
+        }
+    }
+}
+
+make_udf_expr_and_func!(
+    ArraysZip,
+    arrays_zip,
+    "combines multiple arrays into a single array of structs.",
+    arrays_zip_udf
+);
+
+#[user_doc(
+    doc_section(label = "Array Functions"),
+    description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.",
+    syntax_example = "arrays_zip(array1, array2[, ..., array_n])",
+    sql_example = r#"```sql
+> select arrays_zip([1, 2, 3], ['a', 'b', 'c']);
++---------------------------------------------------+
+| arrays_zip([1, 2, 3], ['a', 'b', 'c']) |
++---------------------------------------------------+
+| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] |
++---------------------------------------------------+
+> select arrays_zip([1, 2], [3, 4, 5]);
++---------------------------------------------------+
+| arrays_zip([1, 2], [3, 4, 5]) |
++---------------------------------------------------+
+| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] |
++---------------------------------------------------+
+```"#,
+    argument(name = "array1", description = "First array expression."),
+    argument(name = "array2", description = "Second array expression."),
+    argument(name = "array_n", description = "Subsequent array expressions.")
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ArraysZip {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for ArraysZip {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ArraysZip {
+    /// Creates the UDF: any number of arguments of any type is accepted by
+    /// the signature, and `list_zip` is registered as an alias.
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+            aliases: vec![String::from("list_zip")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for ArraysZip {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "arrays_zip"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Returns `List<Struct<c0, c1, ...>>` with one nullable field per
+    /// argument, named `c{i}` and typed as that argument's element type.
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        // Keep this check in sync with `arrays_zip_inner`, which rejects
+        // fewer than two arguments at execution time.
+        if arg_types.len() < 2 {
+            return exec_err!("arrays_zip requires at least two arguments");
+        }
+
+        let mut fields = Vec::with_capacity(arg_types.len());
+        for (i, arg_type) in arg_types.iter().enumerate() {
+            let element_type = match arg_type {
+                List(field) | LargeList(field) | FixedSizeList(field, _) => {
+                    field.data_type().clone()
+                }
+                Null => Null,
+                dt => {
+                    return exec_err!("arrays_zip expects array arguments, got {dt}");
+                }
+            };
+            fields.push(Field::new(format!("c{i}"), element_type, true));
+        }
+
+        Ok(List(Arc::new(Field::new_list_field(
+            DataType::Struct(Fields::from(fields)),
+            true,
+        ))))
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        make_scalar_function(arrays_zip_inner)(&args.args)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        self.doc()
+    }
+}
+
+/// Core implementation for arrays_zip.
+///
+/// Takes N list arrays and produces a list of structs where each struct
+/// has one field per input array. If arrays within a row have different
+/// lengths, shorter arrays are padded with NULLs. An output row is NULL
+/// only when no argument holds a non-NULL list for that row.
+/// Supports List, LargeList, FixedSizeList, and Null input types.
+///
+/// # Errors
+///
+/// Returns an execution error when fewer than two arguments are given,
+/// when an argument is neither a list type nor Null, or when the zipped
+/// elements no longer fit in the `i32` offsets of the resulting List.
+fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() < 2 {
+        return exec_err!("arrays_zip requires at least two arguments");
+    }
+
+    let num_rows = args[0].len();
+
+    // Build a type-erased ListColumnView for each argument.
+    // None means the argument is Null-typed (all nulls, no backing data).
+    let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
+    let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());
+
+    for (i, arg) in args.iter().enumerate() {
+        match arg.data_type() {
+            List(field) => {
+                let arr = as_list_array(arg)?;
+                let offsets: Vec<usize> =
+                    arr.value_offsets().iter().map(|&o| o as usize).collect();
+                element_types.push(field.data_type().clone());
+                views.push(Some(ListColumnView::new(
+                    arr,
+                    arr.values(),
+                    offsets,
+                    num_rows,
+                )));
+            }
+            LargeList(field) => {
+                let arr = as_large_list_array(arg)?;
+                let offsets: Vec<usize> =
+                    arr.value_offsets().iter().map(|&o| o as usize).collect();
+                element_types.push(field.data_type().clone());
+                views.push(Some(ListColumnView::new(
+                    arr,
+                    arr.values(),
+                    offsets,
+                    num_rows,
+                )));
+            }
+            FixedSizeList(field, size) => {
+                let arr = as_fixed_size_list_array(arg)?;
+                let size = *size as usize;
+                // Synthesize offsets for the fixed-size layout: row `r` spans
+                // `r * size..(r + 1) * size` of the values array.
+                let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
+                element_types.push(field.data_type().clone());
+                views.push(Some(ListColumnView::new(
+                    arr,
+                    arr.values(),
+                    offsets,
+                    num_rows,
+                )));
+            }
+            Null => {
+                element_types.push(Null);
+                views.push(None);
+            }
+            dt => {
+                return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
+            }
+        }
+    }
+
+    // Collect per-column values data for MutableArrayData builders.
+    let values_data: Vec<_> = views
+        .iter()
+        .map(|v| v.as_ref().map(|view| view.values.to_data()))
+        .collect();
+
+    let struct_fields: Fields = element_types
+        .iter()
+        .enumerate()
+        .map(|(i, dt)| Field::new(format!("c{i}"), dt.clone(), true))
+        .collect::<Vec<_>>()
+        .into();
+
+    // Create a MutableArrayData builder per column. For None (Null-typed)
+    // args we only need extend_nulls, so we track them separately.
+    let mut builders: Vec<Option<MutableArrayData>> = values_data
+        .iter()
+        .map(|vd| {
+            vd.as_ref().map(|data| {
+                MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0))
+            })
+        })
+        .collect();
+
+    let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
+    offsets.push(0);
+    let mut null_mask: Vec<bool> = Vec::with_capacity(num_rows);
+    let mut total_values: usize = 0;
+    // `total_values` as an i32 list offset, i.e. the end of the last row.
+    let mut last_offset: i32 = 0;
+
+    // Process each row: compute per-array lengths, then copy values
+    // and pad shorter arrays with NULLs.
+    for row_idx in 0..num_rows {
+        let mut max_len: usize = 0;
+        let mut all_null = true;
+
+        for view in views.iter().flatten() {
+            if !view.is_null[row_idx] {
+                all_null = false;
+                let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
+                max_len = max_len.max(len);
+            }
+        }
+
+        if all_null {
+            null_mask.push(true);
+            offsets.push(last_offset);
+            continue;
+        }
+        null_mask.push(false);
+
+        // Extend each column builder for this row.
+        for (col_idx, view) in views.iter().enumerate() {
+            match view {
+                Some(v) if !v.is_null[row_idx] => {
+                    let start = v.offsets[row_idx];
+                    let end = v.offsets[row_idx + 1];
+                    let len = end - start;
+                    let builder = builders[col_idx].as_mut().unwrap();
+                    builder.extend(0, start, end);
+                    if len < max_len {
+                        builder.extend_nulls(max_len - len);
+                    }
+                }
+                _ => {
+                    // Null list entry or None (Null-typed) arg — all nulls.
+                    if let Some(builder) = builders[col_idx].as_mut() {
+                        builder.extend_nulls(max_len);
+                    }
+                }
+            }
+        }
+
+        total_values += max_len;
+        // List offsets are i32; fail cleanly instead of letting an `as`
+        // cast wrap around and corrupt the offsets.
+        last_offset = match i32::try_from(total_values) {
+            Ok(offset) => offset,
+            Err(_) => {
+                return exec_err!(
+                    "arrays_zip result has {total_values} elements, exceeding the List offset limit"
+                );
+            }
+        };
+        offsets.push(last_offset);
+    }
+
+    // Assemble struct columns from builders.
+    let struct_columns: Vec<ArrayRef> = builders
+        .into_iter()
+        .zip(element_types.iter())
+        .map(|(builder, elem_type)| match builder {
+            Some(b) => arrow::array::make_array(b.freeze()),
+            // Only Null-typed arguments have no builder; they contribute an
+            // all-null column spanning every zipped element.
+            None => new_null_array(elem_type, total_values),
+        })
+        .collect();
+
+    let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
+
+    let null_buffer = if null_mask.iter().any(|&v| v) {
+        Some(NullBuffer::from(
+            null_mask.iter().map(|v| !v).collect::<Vec<bool>>(),
+        ))
+    } else {
+        None
+    };
+
+    let result = ListArray::try_new(
+        Arc::new(Field::new_list_field(
+            struct_array.data_type().clone(),
+            true,
+        )),
+        OffsetBuffer::new(offsets.into()),
+        Arc::new(struct_array),
+        null_buffer,
+    )?;
+
+    Ok(Arc::new(result))
+}
diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs index 9ac6911236e40..99b25ec96454b 100644 --- a/datafusion/functions-nested/src/lib.rs +++ b/datafusion/functions-nested/src/lib.rs @@ -38,6 +38,7 @@ pub mod macros; pub mod array_has; +pub mod arrays_zip; pub mod cardinality; pub mod concat; pub mod dimension; @@ -79,6 +80,7 @@ pub mod expr_fn { pub use super::array_has::array_has; pub use super::array_has::array_has_all; pub use super::array_has::array_has_any; + pub use super::arrays_zip::arrays_zip; pub use super::cardinality::cardinality; pub use super::concat::array_append; pub use super::concat::array_concat; @@ -159,6 +161,7 @@ pub fn all_default_nested_functions() -> Vec> { set_ops::array_distinct_udf(), set_ops::array_intersect_udf(), set_ops::array_union_udf(), + arrays_zip::arrays_zip_udf(), position::array_position_udf(), position::array_positions_udf(), remove::array_remove_udf(), diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index f675763120718..66503c957c5ad 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6866,6 +6866,206 @@ 
from array_distinct_table_2D_fixed; [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] [[5, 6], NULL] +## arrays_zip (aliases: `list_zip`) + +# Spark example: arrays_zip(array(1, 2, 3), array(2, 3, 4)) +query ? +select arrays_zip([1, 2, 3], [2, 3, 4]); +---- +[{c0: 1, c1: 2}, {c0: 2, c1: 3}, {c0: 3, c1: 4}] + +# Spark example: arrays_zip(array(1, 2), array(2, 3), array(3, 4)) +query ? +select arrays_zip([1, 2], [2, 3], [3, 4]); +---- +[{c0: 1, c1: 2, c2: 3}, {c0: 2, c1: 3, c2: 4}] + +# basic: two integer arrays of equal length +query ? +select arrays_zip([1, 2, 3], [10, 20, 30]); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# basic: two arrays with different element types (int + string) +query ? +select arrays_zip([1, 2, 3], ['a', 'b', 'c']); +---- +[{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] + +# three arrays of equal length +query ? +select arrays_zip([1, 2, 3], [10, 20, 30], [100, 200, 300]); +---- +[{c0: 1, c1: 10, c2: 100}, {c0: 2, c1: 20, c2: 200}, {c0: 3, c1: 30, c2: 300}] + +# four arrays of equal length +query ? +select arrays_zip([1], [2], [3], [4]); +---- +[{c0: 1, c1: 2, c2: 3, c3: 4}] + +# mixed element types: float + boolean +query ? +select arrays_zip([1.5, 2.5], [true, false]); +---- +[{c0: 1.5, c1: true}, {c0: 2.5, c1: false}] + +# different length arrays: shorter array padded with NULLs +query ? +select arrays_zip([1, 2], [3, 4, 5]); +---- +[{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: NULL, c1: 5}] + +# different length arrays: first longer +query ? +select arrays_zip([1, 2, 3], [10]); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: NULL}, {c0: 3, c1: NULL}] + +# different length: one single element, other three elements +query ? +select arrays_zip([1], ['a', 'b', 'c']); +---- +[{c0: 1, c1: a}, {c0: NULL, c1: b}, {c0: NULL, c1: c}] + +# empty arrays +query ? +select arrays_zip([], []); +---- +[] + +# one empty, one non-empty +query ? 
+select arrays_zip([], [1, 2, 3]); +---- +[{c0: NULL, c1: 1}, {c0: NULL, c1: 2}, {c0: NULL, c1: 3}] + +# NULL elements inside arrays +query ? +select arrays_zip([1, NULL, 3], ['a', 'b', 'c']); +---- +[{c0: 1, c1: a}, {c0: NULL, c1: b}, {c0: 3, c1: c}] + +# all NULL elements +query ? +select arrays_zip([NULL::int, NULL, NULL], [NULL::text, NULL, NULL]); +---- +[{c0: NULL, c1: NULL}, {c0: NULL, c1: NULL}, {c0: NULL, c1: NULL}] + +# both args are NULL (entire list null) +query ? +select arrays_zip(NULL::int[], NULL::int[]); +---- +NULL + +# one arg is NULL list, other is real array +query ? +select arrays_zip(NULL::int[], [1, 2, 3]); +---- +[{c0: NULL, c1: 1}, {c0: NULL, c1: 2}, {c0: NULL, c1: 3}] + +# real array + NULL list +query ? +select arrays_zip([1, 2], NULL::text[]); +---- +[{c0: 1, c1: NULL}, {c0: 2, c1: NULL}] + +# column-level test with multiple rows +query ? +select arrays_zip(a, b) from (values ([1, 2], [10, 20]), ([3, 4, 5], [30]), ([6], [60, 70])) as t(a, b); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}] +[{c0: 3, c1: 30}, {c0: 4, c1: NULL}, {c0: 5, c1: NULL}] +[{c0: 6, c1: 60}, {c0: NULL, c1: 70}] + +# column-level test with NULL rows +query ? +select arrays_zip(a, b) from (values ([1, 2], [10, 20]), (null, [30, 40]), ([5, 6], null)) as t(a, b); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}] +[{c0: NULL, c1: 30}, {c0: NULL, c1: 40}] +[{c0: 5, c1: NULL}, {c0: 6, c1: NULL}] + +# alias: list_zip +query ? +select list_zip([1, 2], [3, 4]); +---- +[{c0: 1, c1: 3}, {c0: 2, c1: 4}] + +# column test: total values equal (3 each) but per-row lengths differ +# a: [1] b: [10, 20] → row 0: a has 1, b has 2 +# a: [2, 3] b: [30] → row 1: a has 2, b has 1 +# total a values = 3, total b values = 3 (same!) but rows are misaligned +query ? +select arrays_zip(a, b) from (values ([1], [10, 20]), ([2, 3], [30])) as t(a, b); +---- +[{c0: 1, c1: 10}, {c0: NULL, c1: 20}] +[{c0: 2, c1: 30}, {c0: 3, c1: NULL}] + +# single element arrays +query ? 
+select arrays_zip([42], ['hello']); +---- +[{c0: 42, c1: hello}] + +# error: too few arguments +statement error +select arrays_zip([1, 2, 3]); + +# arrays_zip with LargeList inputs +query ? +select arrays_zip( + arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# arrays_zip with LargeList different lengths (padding) +query ? +select arrays_zip( + arrow_cast(make_array(1, 2), 'LargeList(Int64)'), + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: NULL, c1: 30}] + +# arrays_zip with FixedSizeList inputs +query ? +select arrays_zip( + arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)'), + arrow_cast(make_array(10, 20, 30), 'FixedSizeList(3, Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# arrays_zip mixing List and LargeList +query ? +select arrays_zip( + [1, 2, 3], + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: 30}] + +# arrays_zip mixing List and FixedSizeList with different lengths (padding) +query ? +select arrays_zip( + [1, 2, 3], + arrow_cast(make_array(10, 20), 'FixedSizeList(2, Int64)') +); +---- +[{c0: 1, c1: 10}, {c0: 2, c1: 20}, {c0: 3, c1: NULL}] + +# arrays_zip with LargeList and FixedSizeList mixed types +query ? +select arrays_zip( + arrow_cast(make_array(1, 2), 'LargeList(Int64)'), + arrow_cast(make_array('a', 'b'), 'FixedSizeList(2, Utf8)') +); +---- +[{c0: 1, c1: a}, {c0: 2, c1: b}] + query ??? 
select array_intersect(column1, column2), array_intersect(column3, column4), diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 02b4b55fe68ee..ba6f7ef2dc9c9 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -3183,6 +3183,7 @@ _Alias of [current_date](#current_date)._ - [array_to_string](#array_to_string) - [array_union](#array_union) - [arrays_overlap](#arrays_overlap) +- [arrays_zip](#arrays_zip) - [cardinality](#cardinality) - [empty](#empty) - [flatten](#flatten) @@ -3228,6 +3229,7 @@ _Alias of [current_date](#current_date)._ - [list_sort](#list_sort) - [list_to_string](#list_to_string) - [list_union](#list_union) +- [list_zip](#list_zip) - [make_array](#make_array) - [make_list](#make_list) - [range](#range) @@ -4267,6 +4269,41 @@ array_union(array1, array2) _Alias of [array_has_any](#array_has_any)._ +### `arrays_zip` + +Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs. + +```sql +arrays_zip(array1, array2[, ..., array_n]) +``` + +#### Arguments + +- **array1**: First array expression. +- **array2**: Second array expression. +- **array_n**: Subsequent array expressions. 
+ +#### Example + +```sql +> select arrays_zip([1, 2, 3], ['a', 'b', 'c']); ++---------------------------------------------------+ +| arrays_zip([1, 2, 3], ['a', 'b', 'c']) | ++---------------------------------------------------+ +| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] | ++---------------------------------------------------+ +> select arrays_zip([1, 2], [3, 4, 5]); ++---------------------------------------------------+ +| arrays_zip([1, 2], [3, 4, 5]) | ++---------------------------------------------------+ +| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] | ++---------------------------------------------------+ +``` + +#### Aliases + +- list_zip + ### `cardinality` Returns the total number of elements in the array. @@ -4536,6 +4573,10 @@ _Alias of [array_to_string](#array_to_string)._ _Alias of [array_union](#array_union)._ +### `list_zip` + +_Alias of [arrays_zip](#arrays_zip)._ + ### `make_array` Returns an array using the specified input expressions.