From 354fda319f989e7532a79617e77c16182910599d Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 25 Jan 2024 12:20:38 -0600 Subject: [PATCH 01/11] fix: schema issue within writebuilder (#2106) # Description The schema when using `with_input_execution_plan` wasn't being applied. # Related Issue(s) closes https://github.com/delta-io/delta-rs/issues/2105 # Documentation --- crates/deltalake-core/src/operations/write.rs | 4 +- .../tests/integration_datafusion.rs | 59 ++++++++++++++++++- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index bf0ca86d86..bf36b32459 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -403,14 +403,13 @@ impl std::future::IntoFuture for WriteBuilder { Ok(this.partition_columns.unwrap_or_default()) }?; - let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into(); let plan = if let Some(plan) = this.input { Ok(plan) } else if let Some(batches) = this.batches { if batches.is_empty() { Err(WriteError::MissingData) } else { - schema = batches[0].schema(); + let schema = batches[0].schema(); if let Some(snapshot) = &this.snapshot { let table_schema = snapshot @@ -460,6 +459,7 @@ impl std::future::IntoFuture for WriteBuilder { } else { Err(WriteError::MissingData) }?; + let schema = plan.schema(); let state = match this.state { Some(state) => state, diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 4cc7c5a37c..f0138f85b3 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -45,7 +45,8 @@ use std::error::Error; mod local { use datafusion::common::stats::Precision; - use deltalake_core::writer::JsonWriter; + use deltalake_core::{logstore::default_logstore, writer::JsonWriter}; + use object_store::local::LocalFileSystem; use super::*; #[tokio::test] @@ -1071,6 +1072,62 @@ mod local { Ok(()) } + + #[tokio::test] + async fn test_issue_2105() -> Result<()> { + use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + let path = tempfile::tempdir().unwrap(); + let path = path.into_path(); + + let file_store = LocalFileSystem::new_with_prefix(path.clone()).unwrap(); + let log_store = default_logstore( + Arc::new(file_store), + &Url::from_file_path(path.clone()).unwrap(), + &Default::default(), + ); + + let tbl = CreateBuilder::new() + .with_log_store(log_store.clone()) + .with_save_mode(SaveMode::Overwrite) + .with_table_name("test") + .with_column( + "id", + DataType::Primitive(PrimitiveType::Integer), + true, + None, + ); + let tbl = tbl.await.unwrap(); + let ctx = SessionContext::new(); + let plan = ctx + .sql("SELECT 1 as id") + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let write_builder = WriteBuilder::new(log_store, tbl.state); + let _ = write_builder + .with_input_execution_plan(plan) + .with_save_mode(SaveMode::Overwrite) + .await + .unwrap(); + + let table = open_table(path.to_str().unwrap()).await.unwrap(); + let prov: Arc = Arc::new(table); + ctx.register_table("test", prov).unwrap(); + let mut batches = ctx + .sql("SELECT * FROM test") + .await + .unwrap() + .collect() + .await + .unwrap(); + let batch = batches.pop().unwrap(); + + let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]); + assert_eq!(batch.schema().as_ref(), &expected_schema); + Ok(()) + } } async fn test_datafusion(context: &IntegrationContext) -> TestResult { From e928f8babe6ab0c9bed7de0e404f7f01c10cdc72 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Fri, 26 Jan 2024 22:19:50 +0100 Subject: [PATCH 02/11] fix: temporarily skip s3 roundtrip test (#2124) # Description Right now we have some [issue](https://github.com/delta-io/delta-rs/pull/2120#issuecomment-1912367573)in how we identify if a location is a delta-table. This disables an affected test so we can merge PRs again without having to ignore requires CI runs. --- python/tests/test_fs.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index adbacb29cc..8ece647d4c 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -91,6 +91,9 @@ def test_read_simple_table_from_remote(s3_localstack): @pytest.mark.s3 @pytest.mark.integration @pytest.mark.timeout(timeout=15, method="thread") +@pytest.mark.skip( + reason="Temporarily disabled until we can resolve https://github.com/delta-io/delta-rs/pull/2120#issuecomment-1912367573" +) def test_roundtrip_s3_env(s3_localstack, sample_data: pa.Table, monkeypatch): table_path = "s3://deltars/roundtrip" From 83f77b1a8a55acbae1f999bae8e5bd080945a1f5 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Fri, 26 Jan 2024 17:33:33 -0600 Subject: [PATCH 03/11] fix: set partition values for added files when building compaction plan (#2119) # Description After the table state refactor optimize, compaction, was not writing files to the partition directory properly nor setting the partitionValues field in the Add action. --- crates/deltalake-core/src/operations/optimize.rs | 7 ++++++- crates/deltalake-core/tests/command_optimize.rs | 12 ++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index c2c95f65e9..1fc0286754 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -859,10 +859,15 @@ fn build_compaction_plan( metrics.total_files_skipped += 1; continue; } + let partition_values = add + .partition_values()? + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect::>(); partition_files .entry(add.partition_values()?.hive_partition_path()) - .or_default() + .or_insert_with(|| (partition_values, vec![])) .1 .push(object_meta); } diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index 75788c6792..5c3875eb92 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -236,6 +236,18 @@ async fn test_optimize_with_partitions() -> Result<(), Box> { assert_eq!(metrics.num_files_removed, 2); assert_eq!(dt.get_files_count(), 3); + let partition_adds = dt + .get_active_add_actions_by_partitions(&filter)? + .collect::, _>>()?; + assert_eq!(partition_adds.len(), 1); + let partition_values = partition_adds[0].partition_values()?; + assert_eq!( + partition_values.get("date"), + Some(&deltalake_core::kernel::Scalar::String( + "2022-05-22".to_string() + )) + ); + Ok(()) } From 2a284755f7d7359297456d8cf269631193f73fb9 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Sat, 27 Jan 2024 00:55:21 +0100 Subject: [PATCH 04/11] fix: clean-up paths created during tests (#2126) # Description This PR make sure we clean up after ourselves when running tests by removing some folder artifacts we create in tests. --- crates/deltalake-core/src/table/builder.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index c5e9e8e0a6..c8aed3f791 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -450,8 +450,6 @@ mod tests { // parse an existing relative directory let uri = ensure_table_uri("."); assert!(uri.is_ok()); - let uri = ensure_table_uri("./nonexistent"); - assert!(uri.is_ok()); let uri = ensure_table_uri("s3://container/path"); assert!(uri.is_ok()); #[cfg(not(windows))] @@ -459,6 +457,10 @@ mod tests { let uri = ensure_table_uri("file:///tmp/nonexistent/some/path"); assert!(uri.is_ok()); } + let uri = ensure_table_uri("./nonexistent"); + assert!(uri.is_ok()); + let file_path = std::path::Path::new("./nonexistent"); + std::fs::remove_dir(file_path).unwrap(); // These cases should all roundtrip to themselves cfg_if::cfg_if! { @@ -535,7 +537,7 @@ mod tests { assert!(!relative_path.exists()); ensure_table_uri(relative_path.as_os_str().to_str().unwrap()).unwrap(); assert!(relative_path.exists()); - std::fs::remove_dir_all(relative_path).unwrap(); + std::fs::remove_dir_all(std::path::Path::new("_tmp")).unwrap(); } #[test] From a0fee61eddd89d6f51539e20769afc111fc67391 Mon Sep 17 00:00:00 2001 From: Tim-Haarman Date: Sat, 27 Jan 2024 01:18:40 +0100 Subject: [PATCH 05/11] fix: add missing pandas import (#2116) # Description Pandas was only imported for static type checking, but was actually used in the code of the module leading to import errors. Added the import similarly to `writer.py`, and updated the pandas alias in the type hints from 'pandas' to 'pd'. # Related Issue(s) - closes #2112 # Documentation x Co-authored-by: Tim Haarman Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- python/deltalake/table.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 6f335feba0..c7c5532eaa 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -37,8 +37,6 @@ if TYPE_CHECKING: import os - import pandas - from deltalake._internal import DeltaDataChecker as _DeltaDataChecker from deltalake._internal import RawDeltaTable from deltalake._internal import create_deltalake as _create_deltalake @@ -48,6 +46,13 @@ from deltalake.fs import DeltaStorageHandler from deltalake.schema import Schema as DeltaSchema +try: + import pandas as pd # noqa: F811 +except ModuleNotFoundError: + _has_pandas = False +else: + _has_pandas = True + MAX_SUPPORTED_READER_VERSION = 1 MAX_SUPPORTED_WRITER_VERSION = 2 @@ -892,7 +897,7 @@ def merge( pyarrow.RecordBatch, pyarrow.RecordBatchReader, ds.Dataset, - "pandas.DataFrame", + "pd.DataFrame", ], predicate: str, source_alias: Optional[str] = None, @@ -937,7 +942,7 @@ def merge( source = convert_pyarrow_table(source, large_dtypes) elif isinstance(source, ds.Dataset): source = convert_pyarrow_dataset(source, large_dtypes) - elif isinstance(source, pandas.DataFrame): + elif _has_pandas and isinstance(source, pd.DataFrame): source = convert_pyarrow_table( pyarrow.Table.from_pandas(source), large_dtypes ) @@ -1094,7 +1099,7 @@ def to_pandas( columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[FilterType] = None, - ) -> "pandas.DataFrame": + ) -> "pd.DataFrame": """ Build a pandas dataframe using data from the DeltaTable. From 603a81e89af15f9f9f6d4a5036323817a8f3f6dc Mon Sep 17 00:00:00 2001 From: Richard Pelgrim <68642378+rrpelgrim@users.noreply.github.com> Date: Sat, 27 Jan 2024 11:09:35 +0000 Subject: [PATCH 06/11] docs: add dask page to integration docs (#2122) Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- docs/integrations/delta-lake-dask.md | 181 +++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 docs/integrations/delta-lake-dask.md diff --git a/docs/integrations/delta-lake-dask.md b/docs/integrations/delta-lake-dask.md new file mode 100644 index 0000000000..d3f0ebaf18 --- /dev/null +++ b/docs/integrations/delta-lake-dask.md @@ -0,0 +1,181 @@ +# Using Delta Lake with Dask + +Delta Lake is a great storage format for Dask analyses. This page explains why and how to use Delta Lake with Dask. + +You will learn how to read Delta Lakes into Dask DataFrames, how to query Delta tables with Dask, and the unique advantages Delta Lake offers the Dask community. + +Here are some of the benefits that Delta Lake provides Dask users: +- better performance with file skipping +- enhanced file skipping via Z Ordering +- ACID transactions for reliable writes +- easy time-travel functionality + +> ❗️ dask-deltatable currently works with deltalake<=13.0. See https://github.com/dask-contrib/dask-deltatable/issues/65 + +## Install Dask-Deltatable + +To use Delta Lake with Dask, first install the library using + +``` +pip install dask-deltatable +``` + + +## Reading Delta Tables into a Dask DataFrame + +You can read data stored in a Delta Lake into a Dask DataFrame using `dask-deltatable.read_deltalake`. + +Let's read in a toy dataset to see what we can do with Delta Lake and Dask. You can access the data stored as a Delta Lake [on Github](https://github.com/rrpelgrim/delta-examples/tree/master/data) + +``` +import dask_deltatable as ddt + +# read delta table into Dask DataFrame +delta_path = "path/to/data/people_countries_delta_dask" +ddf = ddt.read_deltalake(delta_path) + +``` + +Dask is a library for efficient distributed computing and works with lazy evaluation. Function calls to `dask.dataframe` build a task graph in the background. To trigger computation, call `.compute()`: + +``` +> ddf.compute() + +| | first_name | last_name | country | continent | +|---:|:-------------|:------------|:----------|:------------| +| 0 | Ernesto | Guevara | Argentina | NaN | +| 0 | Bruce | Lee | China | Asia | +| 1 | Jack | Ma | China | Asia | +| 0 | Wolfgang | Manche | Germany | NaN | +| 1 | Soraya | Jala | Germany | NaN | +``` + + +You can read in specific versions of Delta tables by specifying a `version` number or a timestamp: + +``` +# with specific version +ddf = ddt.read_deltalake(delta_path, version=3) + +# with specific datetime +ddt.read_deltalake(delta_path, datetime="2018-12-19T16:39:57-08:00")``` + +`dask-deltatable` also supports reading from remote sources like S3 with: + +``` +ddt.read_deltalake("s3://bucket_name/delta_path", version=3) +``` + +> To read data from remote sources you'll need to make sure the credentials are properly configured in environment variables or config files. Refer to your cloud provider documentation to configure these. + +## What can I do with a Dask Deltatable? + +Reading a Delta Lake in with `dask-deltatable` returns a regular Dask DataFrame. You can perform [all the regular Dask operations](https://docs.dask.org/en/stable/dataframe.html) on this DataFrame. + +Let's take a look at the first few rows: + +``` +> ddf.head(n=3) + +| | first_name | last_name | country | continent | +|---:|:-------------|:------------|:----------|------------:| +| 0 | Ernesto | Guevara | Argentina | nan | +``` + +`dask.dataframe.head()` shows you the first rows of the first partition in the dataframe. In this case, the first partition only has 1 row. + +This is because the Delta Lake has been partitioned by country: + +``` +> !ls ../../data/people_countries_delta_dask +_delta_log country=Argentina country=China country=Germany +``` + +`dask-deltatable` neatly reads in the partitioned Delta Lake into corresponding Dask DataFrame partitions: + +``` +> # see number of partitions +> ddf.npartitions +3 +``` + +You can inspect a single partition using `dask.dataframe.get_partition()`: + +``` +> ddf.get_partition(n=1).compute() + +| | first_name | last_name | country | continent | +|---:|:-------------|:------------|:----------|:------------| +| 0 | Bruce | Lee | China | Asia | +| 1 | Jack | Ma | China | Asia | +``` + +## Perform Dask Operations + +Let's perform some basic computations over the Delta Lake data that's now stored in our Dask DataFrame. + +Suppose you want to group the dataset by the `country` column: + +``` +> ddf.groupby(['country']).count().compute() + +| country | first_name | last_name | continent | +|:----------|-------------:|------------:|------------:| +| Argentina | 1 | 1 | 1 | +| China | 2 | 2 | 2 | +| Germany | 2 | 2 | 2 | +``` + +Dask executes this `groupby` operation in parallel across all available cores. + +## Map Functions over Partitions + +You can also use Dask's `map_partitions` method to map a custom Python function over all the partitions. + +Let's write a function that will replace the missing `continent` values with the right continent names. + +``` +# define custom python function + +# get na_string +df = ddf.get_partition(0).compute() +na_string = df.iloc[0].continent +na_string + +# define function +def replace_proper(partition, na_string): + if [partition.country == "Argentina"]: + partition.loc[partition.country=="Argentina"] = partition.loc[partition.country=="Argentina"].replace(na_string, "South America") + if [partition.country == "Germany"]: + partition.loc[partition.country=="Germany"] = partition.loc[partition.country=="Germany"].replace(na_string, "Europe") + else: + pass + return partition +``` + +Now map this over all partitions in the Dask DataFrame: + +``` +# define metadata and map function over partitions +> meta = dict(ddf.dtypes) +> ddf3 = ddf.map_partitions(replace_proper, na_string, meta=meta) +> ddf3.compute() + +| | first_name | last_name | country | continent | +|---:|:-------------|:------------|:----------|:--------------| +| 0 | Ernesto | Guevara | Argentina | South America | +| 0 | Bruce | Lee | China | Asia | +| 1 | Jack | Ma | China | Asia | +| 0 | Wolfgang | Manche | Germany | Europe | +| 1 | Soraya | Jala | Germany | Europe | +``` + +## Write to Delta Lake +After doing your data processing in Dask, you can write the data back out to Delta Lake using `to_deltalake`: + +``` +ddt.to_deltalake(ddf, "tmp/test_write") +``` + +## Contribute to `dask-deltalake` +To contribute, go to the [`dask-deltalake` Github repository](https://github.com/rrpelgrim/dask-deltatable). \ No newline at end of file From 919930101fe35aa753b3f32bda0de86ec3ed8d49 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Sat, 27 Jan 2024 16:02:20 +0100 Subject: [PATCH 07/11] feat: update table config to contain new config keys (#2127) # Description This PR updates the table config to contain newly defined table configuration keys. Mainly trying to keep things bite-sized for once in updating our check-point creation. --- crates/deltalake-core/src/table/config.rs | 48 +++++++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/crates/deltalake-core/src/table/config.rs b/crates/deltalake-core/src/table/config.rs index 00255d0c1f..24b11a01a4 100644 --- a/crates/deltalake-core/src/table/config.rs +++ b/crates/deltalake-core/src/table/config.rs @@ -47,6 +47,11 @@ pub enum DeltaConfigKey { /// statistics beyond this number, even when such statistics exist). DataSkippingNumIndexedCols, + /// A comma-separated list of column names on which Delta Lake collects statistics to enhance + /// data skipping functionality. This property takes precedence over + /// [DataSkippingNumIndexedCols](Self::DataSkippingNumIndexedCols). + DataSkippingStatsColumns, + /// The shortest duration for Delta Lake to keep logically deleted data files before deleting /// them physically. This is to prevent failures in stale readers after compactions or partition overwrites. /// @@ -61,6 +66,9 @@ pub enum DeltaConfigKey { /// true to enable change data feed. EnableChangeDataFeed, + /// true to enable deletion vectors and predictive I/O for updates. + EnableDeletionVectors, + /// The degree to which a transaction must be isolated from modifications made by concurrent transactions. /// /// Valid values are `Serializable` and `WriteSerializable`. @@ -120,8 +128,10 @@ impl AsRef for DeltaConfigKey { Self::CheckpointPolicy => "delta.checkpointPolicy", Self::ColumnMappingMode => "delta.columnMapping.mode", Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols", + Self::DataSkippingStatsColumns => "delta.dataSkippingStatsColumns", Self::DeletedFileRetentionDuration => "delta.deletedFileRetentionDuration", Self::EnableChangeDataFeed => "delta.enableChangeDataFeed", + Self::EnableDeletionVectors => "delta.enableDeletionVectors", Self::IsolationLevel => "delta.isolationLevel", Self::LogRetentionDuration => "delta.logRetentionDuration", Self::EnableExpiredLogCleanup => "delta.enableExpiredLogCleanup", @@ -150,10 +160,12 @@ impl FromStr for DeltaConfigKey { "delta.checkpointPolicy" => Ok(Self::CheckpointPolicy), "delta.columnMapping.mode" => Ok(Self::ColumnMappingMode), "delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols), + "delta.dataSkippingStatsColumns" => Ok(Self::DataSkippingStatsColumns), "delta.deletedFileRetentionDuration" | "deletedFileRetentionDuration" => { Ok(Self::DeletedFileRetentionDuration) } "delta.enableChangeDataFeed" => Ok(Self::EnableChangeDataFeed), + "delta.enableDeletionVectors" => Ok(Self::EnableDeletionVectors), "delta.isolationLevel" => Ok(Self::IsolationLevel), "delta.logRetentionDuration" | "logRetentionDuration" => Ok(Self::LogRetentionDuration), "delta.enableExpiredLogCleanup" | "enableExpiredLogCleanup" => { @@ -180,9 +192,9 @@ pub enum DeltaConfigError { } macro_rules! table_config { - ($(($key:expr, $name:ident, $ret:ty, $default:literal),)*) => { + ($(($docs:literal, $key:expr, $name:ident, $ret:ty, $default:literal),)*) => { $( - /// read property $key + #[doc = $docs] pub fn $name(&self) -> $ret { self.0 .get($key.as_ref()) @@ -198,20 +210,29 @@ pub struct TableConfig<'a>(pub(crate) &'a HashMap>); impl<'a> TableConfig<'a> { table_config!( - (DeltaConfigKey::AppendOnly, append_only, bool, false), ( + "true for this Delta table to be append-only", + DeltaConfigKey::AppendOnly, + append_only, + bool, + false + ), + ( + "true for Delta Lake to write file statistics in checkpoints in JSON format for the stats column.", DeltaConfigKey::CheckpointWriteStatsAsJson, write_stats_as_json, bool, true ), ( + "true for Delta Lake to write file statistics to checkpoints in struct format", DeltaConfigKey::CheckpointWriteStatsAsStruct, write_stats_as_struct, bool, false ), ( + "The target file size in bytes or higher units for file tuning", DeltaConfigKey::TargetFileSize, target_file_size, i64, @@ -219,24 +240,37 @@ impl<'a> TableConfig<'a> { 104857600 ), ( + "true to enable change data feed.", DeltaConfigKey::EnableChangeDataFeed, enable_change_data_feed, bool, false ), ( + "true to enable deletion vectors and predictive I/O for updates.", + DeltaConfigKey::EnableDeletionVectors, + enable_deletio0n_vectors, + bool, + // in databricks the default is dependent on the workspace settings and runtime version + // https://learn.microsoft.com/en-us/azure/databricks/administration-guide/workspace-settings/deletion-vectors + false + ), + ( + "The number of columns for Delta Lake to collect statistics about for data skipping.", DeltaConfigKey::DataSkippingNumIndexedCols, num_indexed_cols, i32, 32 ), ( + "whether to cleanup expired logs", DeltaConfigKey::EnableExpiredLogCleanup, enable_expired_log_cleanup, bool, true ), ( + "Interval (number of commits) after which a new checkpoint should be created", DeltaConfigKey::CheckpointInterval, checkpoint_interval, i32, @@ -318,6 +352,14 @@ impl<'a> TableConfig<'a> { }) .collect() } + + /// Column names on which Delta Lake collects statistics to enhance data skipping functionality. + /// This property takes precedence over [num_indexed_cols](Self::num_indexed_cols). + pub fn stats_columns(&self) -> Option> { + self.0 + .get(DeltaConfigKey::DataSkippingStatsColumns.as_ref()) + .and_then(|o| o.as_ref().map(|v| v.split(',').collect())) + } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] From 64b2547477177ea1e4f107c1f41652680d846516 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Sat, 27 Jan 2024 18:15:00 +0100 Subject: [PATCH 08/11] feat: expose stats schema on Snapshot (#2128) # Description The new table config `delta.dataSkippingStatsColumns` changes the logic which columns should be considered when computing file statistics. Here we expose (and use) a new method on `Snapshot` which reflects the updated logic. This PR does not contain any tests for the new code paths, but I'll test this against the reference implementation once the new checkpoint writer lands. --- .../deltalake-core/src/kernel/snapshot/mod.rs | 101 ++++++++++++++---- .../src/kernel/snapshot/replay.rs | 71 +++--------- 2 files changed, 92 insertions(+), 80 deletions(-) diff --git a/crates/deltalake-core/src/kernel/snapshot/mod.rs b/crates/deltalake-core/src/kernel/snapshot/mod.rs index 005406ee89..715fb2feec 100644 --- a/crates/deltalake-core/src/kernel/snapshot/mod.rs +++ b/crates/deltalake-core/src/kernel/snapshot/mod.rs @@ -28,7 +28,7 @@ use object_store::ObjectStore; use self::log_segment::{CommitData, LogSegment, PathExt}; use self::parse::{read_adds, read_removes}; use self::replay::{LogMapper, LogReplayScanner, ReplayStream}; -use super::{Action, Add, CommitInfo, Metadata, Protocol, Remove}; +use super::{Action, Add, CommitInfo, DataType, Metadata, Protocol, Remove, StructField}; use crate::kernel::StructType; use crate::table::config::TableConfig; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -69,8 +69,7 @@ impl Snapshot { "Cannot read metadata from log segment".into(), )); }; - let metadata = metadata.unwrap(); - let protocol = protocol.unwrap(); + let (metadata, protocol) = (metadata.unwrap(), protocol.unwrap()); let schema = serde_json::from_str(&metadata.schema_string)?; Ok(Self { log_segment, @@ -212,12 +211,7 @@ impl Snapshot { &log_segment::CHECKPOINT_SCHEMA, &self.config, ); - ReplayStream::try_new( - log_stream, - checkpoint_stream, - &self.schema, - self.config.clone(), - ) + ReplayStream::try_new(log_stream, checkpoint_stream, &self) } /// Get the commit infos in the snapshot @@ -288,6 +282,59 @@ impl Snapshot { }) .boxed()) } + + /// Get the statistics schema of the snapshot + pub fn stats_schema(&self) -> DeltaResult { + let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() { + stats_cols + .iter() + .map(|col| match self.schema().field_with_name(col) { + Ok(field) => match field.data_type() { + DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => { + Err(DeltaTableError::Generic(format!( + "Stats column {} has unsupported type {}", + col, + field.data_type() + ))) + } + _ => Ok(StructField::new( + field.name(), + field.data_type().clone(), + true, + )), + }, + _ => Err(DeltaTableError::Generic(format!( + "Stats column {} not found in schema", + col + ))), + }) + .collect::, _>>()? + } else { + let num_indexed_cols = self.table_config().num_indexed_cols(); + self.schema() + .fields + .iter() + .enumerate() + .filter_map(|(idx, f)| match f.data_type() { + DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None, + _ if num_indexed_cols < 0 || (idx as i32) < num_indexed_cols => { + Some(StructField::new(f.name(), f.data_type().clone(), true)) + } + _ => None, + }) + .collect() + }; + Ok(StructType::new(vec![ + StructField::new("numRecords", DataType::LONG, true), + StructField::new("minValues", StructType::new(stats_fields.clone()), true), + StructField::new("maxValues", StructType::new(stats_fields.clone()), true), + StructField::new( + "nullCount", + StructType::new(stats_fields.iter().filter_map(to_count_field).collect()), + true, + ), + ])) + } } /// A snapshot of a Delta table that has been eagerly loaded into memory. @@ -318,7 +365,7 @@ impl EagerSnapshot { let mut files = Vec::new(); let mut scanner = LogReplayScanner::new(); files.push(scanner.process_files_batch(&batch, true)?); - let mapper = LogMapper::try_new(snapshot.schema(), snapshot.config.clone())?; + let mapper = LogMapper::try_new(&snapshot)?; files = files .into_iter() .map(|b| mapper.map_batch(b)) @@ -357,16 +404,11 @@ impl EagerSnapshot { ) .boxed() }; - let mapper = LogMapper::try_new(self.snapshot.schema(), self.snapshot.config.clone())?; - let files = ReplayStream::try_new( - log_stream, - checkpoint_stream, - self.schema(), - self.snapshot.config.clone(), - )? - .map(|batch| batch.and_then(|b| mapper.map_batch(b))) - .try_collect() - .await?; + let mapper = LogMapper::try_new(&self.snapshot)?; + let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)? + .map(|batch| batch.and_then(|b| mapper.map_batch(b))) + .try_collect() + .await?; self.files = files; } @@ -473,7 +515,7 @@ impl EagerSnapshot { files.push(scanner.process_files_batch(&batch?, true)?); } - let mapper = LogMapper::try_new(self.snapshot.schema(), self.snapshot.config.clone())?; + let mapper = LogMapper::try_new(&self.snapshot)?; self.files = files .into_iter() .chain( @@ -496,6 +538,23 @@ impl EagerSnapshot { } } +fn to_count_field(field: &StructField) -> Option { + match field.data_type() { + DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None, + DataType::Struct(s) => Some(StructField::new( + field.name(), + StructType::new( + s.fields() + .iter() + .filter_map(to_count_field) + .collect::>(), + ), + true, + )), + _ => Some(StructField::new(field.name(), DataType::LONG, true)), + } +} + #[cfg(feature = "datafusion")] mod datafusion { use datafusion_common::stats::Statistics; diff --git a/crates/deltalake-core/src/kernel/snapshot/replay.rs b/crates/deltalake-core/src/kernel/snapshot/replay.rs index 75c7967874..71408b27d5 100644 --- a/crates/deltalake-core/src/kernel/snapshot/replay.rs +++ b/crates/deltalake-core/src/kernel/snapshot/replay.rs @@ -21,9 +21,10 @@ use tracing::debug; use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; use crate::kernel::arrow::json; -use crate::kernel::{DataType, Schema, StructField, StructType}; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; +use super::Snapshot; + pin_project! { pub struct ReplayStream { scanner: LogReplayScanner, @@ -38,60 +39,12 @@ pin_project! { } } -fn to_count_field(field: &StructField) -> Option { - match field.data_type() { - DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None, - DataType::Struct(s) => Some(StructField::new( - field.name(), - StructType::new( - s.fields() - .iter() - .filter_map(to_count_field) - .collect::>(), - ), - true, - )), - _ => Some(StructField::new(field.name(), DataType::LONG, true)), - } -} - -pub(super) fn get_stats_schema(table_schema: &StructType) -> DeltaResult { - let data_fields: Vec<_> = table_schema - .fields - .iter() - .enumerate() - .filter_map(|(idx, f)| match f.data_type() { - DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None, - // TODO: the number of stats fields shopuld be configurable? - // or rather we should likely read all of we parse JSON? - _ if idx < 32 => Some(StructField::new(f.name(), f.data_type().clone(), true)), - _ => None, - }) - .collect(); - let stats_schema = StructType::new(vec![ - StructField::new("numRecords", DataType::LONG, true), - StructField::new("minValues", StructType::new(data_fields.clone()), true), - StructField::new("maxValues", StructType::new(data_fields.clone()), true), - StructField::new( - "nullCount", - StructType::new(data_fields.iter().filter_map(to_count_field).collect()), - true, - ), - ]); - Ok(std::sync::Arc::new((&stats_schema).try_into()?)) -} - impl ReplayStream { - pub(super) fn try_new( - commits: S, - checkpoint: S, - table_schema: &Schema, - config: DeltaTableConfig, - ) -> DeltaResult { - let stats_schema = get_stats_schema(table_schema)?; + pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult { + let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?); let mapper = Arc::new(LogMapper { stats_schema, - config, + config: snapshot.config.clone(), }); Ok(Self { commits, @@ -108,10 +61,10 @@ pub(super) struct LogMapper { } impl LogMapper { - pub(super) fn try_new(table_schema: &Schema, config: DeltaTableConfig) -> DeltaResult { + pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult { Ok(Self { - stats_schema: get_stats_schema(table_schema)?, - config, + stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?), + config: snapshot.config.clone(), }) } @@ -120,7 +73,7 @@ impl LogMapper { } } -pub(super) fn map_batch( +fn map_batch( batch: RecordBatch, stats_schema: ArrowSchemaRef, config: &DeltaTableConfig, @@ -135,7 +88,7 @@ pub(super) fn map_batch( Arc::new(json::parse_json(stats, stats_schema.clone(), config)?.into()); let schema = batch.schema(); let add_col = ex::extract_and_cast::(&batch, "add")?; - let add_idx = schema.column_with_name("add").unwrap(); + let (add_idx, _) = schema.column_with_name("add").unwrap(); let add_type = add_col .fields() .iter() @@ -162,9 +115,9 @@ pub(super) fn map_batch( true, )); let mut fields = schema.fields().to_vec(); - let _ = std::mem::replace(&mut fields[add_idx.0], new_add_field); + let _ = std::mem::replace(&mut fields[add_idx], new_add_field); let mut columns = batch.columns().to_vec(); - let _ = std::mem::replace(&mut columns[add_idx.0], new_add); + let _ = std::mem::replace(&mut columns[add_idx], new_add); return Ok(RecordBatch::try_new( Arc::new(ArrowSchema::new(fields)), columns, From c954d61b77d1069b10963d0a298f6e4c1fdc8051 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 27 Jan 2024 16:05:47 -0500 Subject: [PATCH 09/11] fix: order logical schema to match physical schema (#2129) # Description When using logical plans with DF, the order & location of partitioned columns did not match with physical schema. This would cause errors when logical relations were converted to physical relations. # Related Issue(s) - closes #2104 --- .../src/delta_datafusion/mod.rs | 38 +++++++++++++++---- crates/deltalake-core/src/table/mod.rs | 1 - 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 443d5ef4b5..7f13f6f27a 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -141,14 +141,29 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc) { env.register_object_store(url, store.object_store()); } -pub(crate) fn logical_schema( +/// The logical schema for a Deltatable is different then protocol level schema since partiton columns must appear at the end of the schema. +/// This is to align with how partition are handled at the physical level +pub(crate) fn df_logical_schema( snapshot: &DeltaTableState, scan_config: &DeltaScanConfig, ) -> DeltaResult { let input_schema = snapshot.arrow_schema()?; - let mut fields = Vec::new(); - for field in input_schema.fields.iter() { - fields.push(field.to_owned()); + let table_partition_cols = &snapshot.metadata().partition_columns; + + let mut fields: Vec> = input_schema + .fields() + .iter() + .filter(|f| !table_partition_cols.contains(f.name())) + .cloned() + .collect(); + + for partition_col in table_partition_cols.iter() { + fields.push(Arc::new( + input_schema + .field_with_name(partition_col) + .unwrap() + .to_owned(), + )); } if let Some(file_column_name) = &scan_config.file_column_name { @@ -309,7 +324,7 @@ impl<'a> DeltaScanBuilder<'a> { .await? } }; - let logical_schema = logical_schema(self.snapshot, &config)?; + let logical_schema = df_logical_schema(self.snapshot, &config)?; let logical_schema = if let Some(used_columns) = self.projection { let mut fields = vec![]; @@ -501,7 +516,7 @@ impl DeltaTableProvider { config: DeltaScanConfig, ) -> DeltaResult { Ok(DeltaTableProvider { - schema: logical_schema(&snapshot, &config)?, + schema: df_logical_schema(&snapshot, &config)?, snapshot, log_store, config, @@ -1197,7 +1212,7 @@ pub(crate) async fn find_files_scan<'a>( } .build(snapshot)?; - let logical_schema = logical_schema(snapshot, &scan_config)?; + let logical_schema = df_logical_schema(snapshot, &scan_config)?; // Identify which columns we need to project let mut used_columns = expression @@ -1750,9 +1765,17 @@ mod tests { let provider = DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config) .unwrap(); + let logical_schema = provider.schema(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); + let expected_logical_order = vec!["value", "modified", "id"]; + let actual_order: Vec = logical_schema + .fields() + .iter() + .map(|f| f.name().to_owned()) + .collect(); + let df = ctx.sql("select * from test").await.unwrap(); let actual = df.collect().await.unwrap(); let expected = vec![ @@ -1766,6 +1789,7 @@ mod tests { "+-------+------------+----+", ]; assert_batches_sorted_eq!(&expected, &actual); + assert_eq!(expected_logical_order, actual_order); } #[tokio::test] diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index ad3133a112..ad260295ba 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -425,7 +425,6 @@ impl DeltaTable { &self, filters: &[PartitionFilter], ) -> Result, DeltaTableError> { - println!("get_files_by_partitions ----------->"); Ok(self .get_active_add_actions_by_partitions(filters)? .collect::, _>>()? From 7fbc02b3a739615d3e4d31b5056e512a69bb3c11 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Sat, 27 Jan 2024 17:51:30 -0600 Subject: [PATCH 10/11] =?UTF-8?q?fix:=20do=20not=20write=20empty=20parquet?= =?UTF-8?q?=20file/add=20on=20writer=20close;=20accurately=20=E2=80=A6=20(?= =?UTF-8?q?#2123)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …track unflushed row group size # Description When writing a batch with the writer, if that batch would result in a flush and no more batches follow, then on close an empty parquet file (no rows) is written. This includes an Add in the log that looks like ``` Add { path: "part-00002-3da49db6-e5e9-4426-8839-0092a56cc155-c000.parquet", partition_values: {}, size: 346, modification_time: 1706297596165, data_change: true, stats: Some( "{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}", ), tags: None, deletion_vector: None, base_row_id: None, default_row_commit_version: None, clustering_provider: None, stats_parsed: None, }, ``` The empty stats structs causes issues with scans in datafusion. Also changed it so the writer tracking internal buffers includes the parquet writer buffering within its row group writer. --- .../deltalake-core/src/operations/writer.rs | 72 +++++++++++++++++-- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs index d1249f1766..5d8808fa3c 100644 --- a/crates/deltalake-core/src/operations/writer.rs +++ b/crates/deltalake-core/src/operations/writer.rs @@ -322,6 +322,11 @@ impl PartitionWriter { // replace counter / buffers and close the current writer let (writer, buffer) = self.reset_writer()?; let metadata = writer.close()?; + // don't write empty file + if metadata.num_rows == 0 { + return Ok(()); + } + let buffer = match buffer.into_inner() { Some(buffer) => Bytes::from(buffer), None => return Ok(()), // Nothing to write @@ -367,8 +372,12 @@ impl PartitionWriter { let length = usize::min(self.config.write_batch_size, max_offset - offset); self.write_batch(&batch.slice(offset, length))?; // flush currently buffered data to disk once we meet or exceed the target file size. - if self.buffer.len() >= self.config.target_file_size { - debug!("Writing file with size {:?} to disk.", self.buffer.len()); + let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size(); + if estimated_size >= self.config.target_file_size { + debug!( + "Writing file with estimated size {:?} to disk.", + estimated_size + ); self.flush_arrow_writer().await?; } } @@ -401,7 +410,7 @@ mod tests { let batch = get_record_batch(None, false); // write single un-partitioned batch - let mut writer = get_writer(object_store.clone(), &batch, None, None); + let mut writer = get_writer(object_store.clone(), &batch, None, None, None); writer.write(&batch).await.unwrap(); let files = list(object_store.as_ref(), None).await.unwrap(); assert_eq!(files.len(), 0); @@ -433,8 +442,8 @@ mod tests { let properties = WriterProperties::builder() .set_max_row_group_size(1024) .build(); - // configure small target file size and and row group size so we can observe multiple files written - let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000)); + // configure small target file size and row group size so we can observe multiple files written + let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000), None); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size @@ -446,18 +455,69 @@ mod tests { assert!(target_file_count >= adds.len() as i32 - 1) } + #[tokio::test] + async fn test_unflushed_row_group_size() { + let base_int = Arc::new(Int32Array::from((0..10000).collect::>())); + let base_str = Arc::new(StringArray::from(vec!["A"; 10000])); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); + + let object_store = DeltaTableBuilder::from_uri("memory://") + .build_storage() + .unwrap() + .object_store(); + // configure small target file size so we can observe multiple files written + let mut writer = get_writer(object_store, &batch, None, Some(10_000), None); + writer.write(&batch).await.unwrap(); + + // check that we have written more then once file, and no more then 1 is below target size + let adds = writer.close().await.unwrap(); + assert!(adds.len() > 1); + let target_file_count = adds + .iter() + .fold(0, |acc, add| acc + (add.size > 10_000) as i32); + assert!(target_file_count >= adds.len() as i32 - 1) + } + + #[tokio::test] + async fn test_do_not_write_empty_file_on_close() { + let base_int = Arc::new(Int32Array::from((0..10000 as i32).collect::>())); + let base_str = Arc::new(StringArray::from(vec!["A"; 10000])); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); + + let object_store = DeltaTableBuilder::from_uri("memory://") + .build_storage() + .unwrap() + .object_store(); + // configure high batch size and low file size to observe one file written and flushed immediately + // upon writing batch, then ensures the buffer is empty upon closing writer + let mut writer = get_writer(object_store, &batch, None, Some(9000), Some(10000)); + writer.write(&batch).await.unwrap(); + + let adds = writer.close().await.unwrap(); + assert!(adds.len() == 1); + } + fn get_writer( object_store: ObjectStoreRef, batch: &RecordBatch, writer_properties: Option, target_file_size: Option, + write_batch_size: Option, ) -> PartitionWriter { let config = PartitionWriterConfig::try_new( batch.schema(), BTreeMap::new(), writer_properties, target_file_size, - None, + write_batch_size, ) .unwrap(); PartitionWriter::try_with_config(object_store, config).unwrap() From 0f6790fd90be869990f55379514339eb09280ba3 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Sat, 27 Jan 2024 20:13:09 -0600 Subject: [PATCH 11/11] fix: prevent empty stats struct during parquet write (#2125) # Description When building the arrow schema for delta checkpoints, List, Map, and Binary max/min stats are not collected. If you have a Struct column with only a List Map, or Binary field, then the arrow schema gets an empty Struct. Parquet writer fails with this: ``` ParquetParseError { source: ArrowError("Parquet does not support writing empty structs") } ``` --- crates/deltalake-core/src/kernel/arrow/mod.rs | 12 +++--- .../src/protocol/checkpoints.rs | 40 ++++++++++++++++++- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/crates/deltalake-core/src/kernel/arrow/mod.rs b/crates/deltalake-core/src/kernel/arrow/mod.rs index bfbfb76b7b..ab121ee8a6 100644 --- a/crates/deltalake-core/src/kernel/arrow/mod.rs +++ b/crates/deltalake-core/src/kernel/arrow/mod.rs @@ -564,11 +564,13 @@ fn max_min_schema_for_fields(dest: &mut Vec, f: &ArrowField) { max_min_schema_for_fields(&mut child_dest, f); } - dest.push(ArrowField::new( - f.name(), - ArrowDataType::Struct(child_dest.into()), - true, - )); + if !child_dest.is_empty() { + dest.push(ArrowField::new( + f.name(), + ArrowDataType::Struct(child_dest.into()), + true, + )); + } } // don't compute min or max for list, map or binary types ArrowDataType::List(_) | ArrowDataType::Map(_, _) | ArrowDataType::Binary => { /* noop */ } diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index e4a155e477..b6787b9b31 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -522,7 +522,8 @@ fn apply_stats_conversion( mod tests { use std::sync::Arc; - use arrow_array::{ArrayRef, RecordBatch}; + use arrow_array::builder::{Int32Builder, ListBuilder, StructBuilder}; + use arrow_array::{ArrayRef, Int32Array, RecordBatch}; use arrow_schema::Schema as ArrowSchema; use chrono::Duration; use lazy_static::lazy_static; @@ -903,6 +904,43 @@ mod tests { ); } + #[tokio::test] + async fn test_struct_with_single_list_field() { + // you need another column otherwise the entire stats struct is empty + // which also fails parquet write during checkpoint + let other_column_array: ArrayRef = Arc::new(Int32Array::from(vec![1])); + + let mut list_item_builder = Int32Builder::new(); + list_item_builder.append_value(1); + + let mut list_in_struct_builder = ListBuilder::new(list_item_builder); + list_in_struct_builder.append(true); + + let mut struct_builder = StructBuilder::new( + vec![arrow_schema::Field::new( + "list_in_struct", + arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new( + "item", + arrow_schema::DataType::Int32, + true, + ))), + true, + )], + vec![Box::new(list_in_struct_builder)], + ); + struct_builder.append(true); + + let struct_with_list_array: ArrayRef = Arc::new(struct_builder.finish()); + let batch = RecordBatch::try_from_iter(vec![ + ("other_column", other_column_array), + ("struct_with_list", struct_with_list_array), + ]) + .unwrap(); + let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap(); + + create_checkpoint(&table).await.unwrap(); + } + lazy_static! { static ref SCHEMA: Value = json!({ "type": "struct",