diff --git a/crates/core/src/kernel/mod.rs b/crates/core/src/kernel/mod.rs index bcd9abbd15..afce2a1f4a 100644 --- a/crates/core/src/kernel/mod.rs +++ b/crates/core/src/kernel/mod.rs @@ -8,6 +8,7 @@ pub mod error; pub mod models; pub mod scalars; mod snapshot; +pub mod snapshot_next; pub use error::*; pub use models::*; diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 38d1dc570d..2938b3d3db 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -45,7 +45,6 @@ pub use self::log_data::*; mod log_data; pub(crate) mod log_segment; -mod next; pub(crate) mod parse; mod replay; mod serde; diff --git a/crates/core/src/kernel/snapshot/next.rs b/crates/core/src/kernel/snapshot/next.rs deleted file mode 100644 index 56127d21c0..0000000000 --- a/crates/core/src/kernel/snapshot/next.rs +++ /dev/null @@ -1,624 +0,0 @@ -//! Snapshot of a Delta Table at a specific version. -//! -use std::collections::HashSet; -use std::sync::{Arc, LazyLock}; - -use arrow::compute::{concat_batches, filter_record_batch}; -use arrow_arith::boolean::{and, is_null, not}; -use arrow_array::cast::AsArray; -use arrow_array::types::Int64Type; -use arrow_array::{Array, BooleanArray, RecordBatch}; -use arrow_cast::pretty::print_batches; -use chrono::{DateTime, Utc}; -use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionScanner}; -use delta_kernel::actions::visitors::{ - AddVisitor, CdcVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor, -}; -use delta_kernel::actions::{ - get_log_add_schema, get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, - REMOVE_NAME, SET_TRANSACTION_NAME, -}; -use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; -use delta_kernel::engine::arrow_data::ArrowEngineData; -use delta_kernel::engine::default::executor::tokio::{ - TokioBackgroundExecutor, TokioMultiThreadExecutor, -}; -use delta_kernel::engine::default::DefaultEngine; -use delta_kernel::engine_data::{GetData, RowVisitor, TypedGetData as _}; -use delta_kernel::expressions::{Scalar, StructData}; -use delta_kernel::log_segment::LogSegment; -use delta_kernel::scan::log_replay::scan_action_iter; -use delta_kernel::scan::scan_row_schema; -use delta_kernel::schema::{DataType, Schema, StructField, StructType}; -use delta_kernel::snapshot::Snapshot as SnapshotInner; -use delta_kernel::table_properties::TableProperties; -use delta_kernel::{ - DeltaResult as KernelResult, Engine, EngineData, Expression, ExpressionHandler, ExpressionRef, - Table, Version, -}; -use itertools::Itertools; -use object_store::path::Path; -use object_store::ObjectStore; -use tracing::warn; -use url::Url; - -use crate::kernel::scalars::ScalarExt; -use crate::kernel::{ActionType, ARROW_HANDLER}; -use crate::storage::cache::CommitCacheObjectStore; -use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; - -type ReplayIter = Box, Vec)>>>; - -type LocalFileSystem = CommitCacheObjectStore; - -#[derive(thiserror::Error, Debug)] -enum SnapshotError { - #[error("Snapshot not initialized for action type: {0}")] - MissingData(String), -} - -impl SnapshotError { - fn missing_data(action: ActionType) -> Self { - Self::MissingData(action.field_name_unckecked().to_string()) - } -} - -impl From for DeltaTableError { - fn from(e: SnapshotError) -> Self { - match &e { - SnapshotError::MissingData(_) => DeltaTableError::generic(e), - } - } -} - -impl ActionType { - pub(self) fn field_name_unckecked(&self) -> &'static str { - match self { - Self::Metadata => METADATA_NAME, - Self::Protocol => PROTOCOL_NAME, - Self::Remove => REMOVE_NAME, - Self::Add => ADD_NAME, - Self::Txn => SET_TRANSACTION_NAME, - Self::Cdc => CDC_NAME, - _ => panic!(), - } - } - - pub(self) fn field_name(&self) -> DeltaResult<&'static str> { - let name = match self { - Self::Metadata => METADATA_NAME, - Self::Protocol => PROTOCOL_NAME, - Self::Remove => REMOVE_NAME, - Self::Add => ADD_NAME, - Self::Txn => SET_TRANSACTION_NAME, - Self::Cdc => CDC_NAME, - _ => { - return Err(DeltaTableError::generic(format!( - "unsupported action type: {self:?}" - ))) - } - }; - Ok(name) - } -} - -#[derive(Clone)] -pub struct Snapshot { - inner: Arc, - engine: Arc, -} - -impl Snapshot { - /// Create a new [`Snapshot`] instance. - pub fn new(inner: Arc, engine: Arc) -> Self { - Self { inner, engine } - } - - /// Create a new [`Snapshot`] instance for a table. - pub async fn try_new( - table: Table, - store: Arc, - version: impl Into>, - ) -> DeltaResult { - // TODO: how to deal with the dedicated IO runtime? Would this already be covered by the - // object store implementation pass to this? - let table_root = Path::from_url_path(table.location().path())?; - let store_str = format!("{}", store); - let is_local = store_str.starts_with("LocalFileSystem"); - let store = Arc::new(CommitCacheObjectStore::new(store)); - let handle = tokio::runtime::Handle::current(); - let engine: Arc = match handle.runtime_flavor() { - tokio::runtime::RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new_with_opts( - store, - table_root, - Arc::new(TokioMultiThreadExecutor::new(handle)), - !is_local, - )), - tokio::runtime::RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new_with_opts( - store, - table_root, - Arc::new(TokioBackgroundExecutor::new()), - !is_local, - )), - _ => return Err(DeltaTableError::generic("unsupported runtime flavor")), - }; - - let snapshot = table.snapshot(engine.as_ref(), version.into())?; - Ok(Self::new(Arc::new(snapshot), engine)) - } - - pub(crate) fn engine_ref(&self) -> &Arc { - &self.engine - } - - pub fn table_root(&self) -> &Url { - &self.inner.table_root() - } - - pub fn version(&self) -> Version { - self.inner.version() - } - - pub fn schema(&self) -> &Schema { - self.inner.schema() - } - - pub fn protocol(&self) -> &Protocol { - self.inner.protocol() - } - - pub fn metadata(&self) -> &Metadata { - self.inner.metadata() - } - - pub fn table_properties(&self) -> &TableProperties { - &self.inner.table_properties() - } - - /// Get the timestamp of the given version in miliscends since epoch. - /// - /// Extracts the timestamp from the commit file of the given version - /// from the current log segment. If the commit file is not part of the - /// current log segment, `None` is returned. - pub fn version_timestamp(&self, version: Version) -> Option { - self.inner - ._log_segment() - .ascending_commit_files - .iter() - .find(|f| f.version == version) - .map(|f| f.location.last_modified) - } - - /// read all active files from the log - pub(crate) fn files( - &self, - predicate: Option>, - ) -> DeltaResult>> { - let scan = self - .inner - .clone() - .scan_builder() - .with_predicate(predicate) - .build()?; - Ok(scan.scan_data(self.engine.as_ref())?.map(|res| { - res.and_then(|(data, mut predicate)| { - let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into(); - if predicate.len() < batch.num_rows() { - predicate - .extend(std::iter::repeat(true).take(batch.num_rows() - predicate.len())); - } - Ok(filter_record_batch(&batch, &BooleanArray::from(predicate))?) - }) - })) - } - - pub(crate) fn tombstones(&self) -> DeltaResult>> { - static META_PREDICATE: LazyLock> = LazyLock::new(|| { - Some(Arc::new( - Expression::column([REMOVE_NAME, "path"]).is_not_null(), - )) - }); - let read_schema = get_log_schema().project(&[REMOVE_NAME])?; - Ok(self - .inner - ._log_segment() - .replay( - self.engine.as_ref(), - read_schema.clone(), - read_schema, - META_PREDICATE.clone(), - )? - .map_ok(|(d, _)| Ok(RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?))) - .flatten()) - } - - /// Scan the Delta Log to obtain the latest transaction for all applications - /// - /// This method requires a full scan of the log to find all transactions. - /// When a specific application id is requested, it is much more efficient to use - /// [`application_transaction`](Self::application_transaction) instead. - pub fn application_transactions(&self) -> DeltaResult { - let scanner = SetTransactionScanner::new(self.inner.clone()); - Ok(scanner.application_transactions(self.engine.as_ref())?) - } - - /// Scan the Delta Log for the latest transaction entry for a specific application. - /// - /// Initiates a log scan, but terminates as soon as the transaction - /// for the given application is found. - pub fn application_transaction( - &self, - app_id: impl AsRef, - ) -> DeltaResult> { - let scanner = SetTransactionScanner::new(self.inner.clone()); - Ok(scanner.application_transaction(self.engine.as_ref(), app_id.as_ref())?) - } -} - -#[derive(Clone)] -pub struct EagerSnapshot { - snapshot: Snapshot, - files: Option, - predicate: Option>, -} - -impl EagerSnapshot { - /// Create a new [`EagerSnapshot`] instance tracking actions of the given types. - /// - /// Only actions supplied by `tracked_actions` will be loaded into memory. - /// This is useful when only a subset of actions are needed. `Add` and `Remove` actions - /// are treated specially. I.e. `Add` and `Remove` will be loaded as well. - pub async fn try_new_with_actions( - table_root: impl AsRef, - store: Arc, - config: DeltaTableConfig, - version: impl Into>, - tracked_actions: HashSet, - predicate: Option>, - ) -> DeltaResult { - let snapshot = Snapshot::try_new(Table::try_from_uri(table_root)?, store, version).await?; - let files = config - .require_files - .then(|| -> DeltaResult<_> { Ok(replay_file_actions(&snapshot)?) }) - .transpose()?; - Ok(Self { - snapshot, - files, - predicate, - }) - } - - pub fn version(&self) -> Version { - self.snapshot.version() - } - - pub fn schema(&self) -> &Schema { - self.snapshot.schema() - } - - pub fn protocol(&self) -> &Protocol { - self.snapshot.protocol() - } - - pub fn metadata(&self) -> &Metadata { - self.snapshot.metadata() - } - - pub fn table_properties(&self) -> &TableProperties { - &self.snapshot.table_properties() - } - - pub fn files(&self) -> DeltaResult> { - Ok(LogicalFileView { - files: self - .files - .clone() - .ok_or_else(|| SnapshotError::missing_data(ActionType::Add))?, - index: 0, - }) - } - - /// Get the number of files in the current snapshot - pub fn files_count(&self) -> DeltaResult { - Ok(self - .files - .as_ref() - .map(|f| f.num_rows()) - .ok_or_else(|| SnapshotError::missing_data(ActionType::Add))?) - } - - pub fn tombstones(&self) -> DeltaResult>> { - self.snapshot.tombstones() - } - - /// Scan the Delta Log to obtain the latest transaction for all applications - /// - /// This method requires a full scan of the log to find all transactions. - /// When a specific application id is requested, it is much more efficient to use - /// [`application_transaction`](Self::application_transaction) instead. - pub fn application_transactions(&self) -> DeltaResult { - self.snapshot.application_transactions() - } - - /// Scan the Delta Log for the latest transaction entry for a specific application. - /// - /// Initiates a log scan, but terminates as soon as the transaction - /// for the given application is found. - pub fn application_transaction( - &self, - app_id: impl AsRef, - ) -> DeltaResult> { - self.snapshot.application_transaction(app_id) - } - - pub(crate) fn update(&mut self) -> DeltaResult<()> { - let state = self - .files - .as_ref() - .ok_or(SnapshotError::missing_data(ActionType::Add))? - .clone(); - - let log_root = self.snapshot.table_root().join("_delta_log/").unwrap(); - let fs_client = self.snapshot.engine.get_file_system_client(); - let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?; - let checkpoint_read_schema = get_log_add_schema().clone(); - - let segment = - LogSegment::for_table_changes(fs_client.as_ref(), log_root, self.version() + 1, None)?; - let slice_iter = segment - .replay( - self.snapshot.engine.as_ref(), - commit_read_schema, - checkpoint_read_schema, - None, - )? - .chain(std::iter::once(Ok(( - Box::new(ArrowEngineData::from(state)) as Box, - false, - )))); - - let res = scan_action_iter(self.snapshot.engine.as_ref(), slice_iter, None) - .map(|res| { - res.and_then(|(d, sel)| { - let batch = RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?); - Ok(filter_record_batch(&batch, &BooleanArray::from(sel))?) - }) - }) - .collect::, _>>()?; - - self.files = Some(concat_batches(res[0].schema_ref(), &res)?); - - Ok(()) - } -} - -fn replay_file_actions(snapshot: &Snapshot) -> DeltaResult { - let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?; - let checkpoint_read_schema = get_log_add_schema().clone(); - - let curr_data = snapshot - .inner - ._log_segment() - .replay( - snapshot.engine.as_ref(), - commit_read_schema.clone(), - checkpoint_read_schema.clone(), - None, - )? - .map_ok( - |(data, flag)| -> Result<(RecordBatch, bool), delta_kernel::Error> { - Ok((ArrowEngineData::try_from_engine_data(data)?.into(), flag)) - }, - ) - .flatten() - .collect::, _>>()?; - - let scan_iter = curr_data.clone().into_iter().map(|(data, flag)| { - Ok(( - Box::new(ArrowEngineData::new(data.clone())) as Box, - flag, - )) - }); - - let res = scan_action_iter(snapshot.engine.as_ref(), scan_iter, None) - .map(|res| { - res.and_then(|(d, selection)| { - Ok(( - RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?), - selection, - )) - }) - }) - .zip(curr_data.into_iter()) - .map(|(scan_res, (data_raw, _))| match scan_res { - Ok((_, selection)) => { - let data = filter_record_batch(&data_raw, &BooleanArray::from(selection))?; - Ok(data.project(&[0])?) - } - Err(e) => Err(e), - }) - .collect::, _>>()?; - - Ok(concat_batches(res[0].schema_ref(), &res)?) -} - -/// Helper trait to extract individual values from a `StructData`. -pub trait StructDataExt { - fn get(&self, key: &str) -> Option<&Scalar>; -} - -impl StructDataExt for StructData { - fn get(&self, key: &str) -> Option<&Scalar> { - self.fields() - .iter() - .zip(self.values().iter()) - .find(|(k, _)| k.name() == key) - .map(|(_, v)| v) - } -} - -#[derive(Clone)] -pub struct LogicalFileView { - files: RecordBatch, - index: usize, -} - -impl LogicalFileView { - /// Path of the file. - pub fn path(&self) -> &str { - self.files.column(0).as_string::().value(self.index) - } - - /// Size of the file in bytes. - pub fn size(&self) -> i64 { - self.files - .column(1) - .as_primitive::() - .value(self.index) - } - - /// Modification time of the file in milliseconds since epoch. - pub fn modification_time(&self) -> i64 { - self.files - .column(2) - .as_primitive::() - .value(self.index) - } - - /// Datetime of the last modification time of the file. - pub fn modification_datetime(&self) -> DeltaResult> { - DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from( - crate::protocol::ProtocolError::InvalidField(format!( - "invalid modification_time: {:?}", - self.modification_time() - )), - )) - } - - pub fn stats(&self) -> Option<&str> { - let col = self.files.column(3).as_string::(); - col.is_valid(self.index).then(|| col.value(self.index)) - } - - pub fn partition_values(&self) -> Option { - self.files - .column_by_name("fileConstantValues") - .and_then(|col| col.as_struct_opt()) - .and_then(|s| s.column_by_name("partitionValues")) - .and_then(|arr| { - arr.is_valid(self.index) - .then(|| match Scalar::from_array(arr, self.index) { - Some(Scalar::Struct(s)) => Some(s), - _ => None, - }) - .flatten() - }) - } -} - -impl Iterator for LogicalFileView { - type Item = LogicalFileView; - - fn next(&mut self) -> Option { - if self.index < self.files.num_rows() { - let file = LogicalFileView { - files: self.files.clone(), - index: self.index, - }; - self.index += 1; - Some(file) - } else { - None - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use deltalake_test::acceptance::{read_dat_case, TestCaseInfo}; - use deltalake_test::TestResult; - use std::path::PathBuf; - - fn get_dat_dir() -> PathBuf { - let d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let mut rep_root = d - .parent() - .and_then(|p| p.parent()) - .expect("valid directory") - .to_path_buf(); - rep_root.push("dat/out/reader_tests/generated"); - rep_root - } - - async fn load_snapshot() -> TestResult<()> { - // some comment - let mut dat_dir = get_dat_dir(); - dat_dir.push("multi_partitioned"); - - let dat_info: TestCaseInfo = read_dat_case(dat_dir)?; - let table_info = dat_info.table_summary()?; - - let table = Table::try_from_uri(dat_info.table_root()?)?; - - let snapshot = Snapshot::try_new( - table, - Arc::new(object_store::local::LocalFileSystem::default()), - None, - ) - .await?; - - assert_eq!(snapshot.version(), table_info.version); - assert_eq!( - ( - snapshot.protocol().min_reader_version(), - snapshot.protocol().min_writer_version() - ), - (table_info.min_reader_version, table_info.min_writer_version) - ); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn load_snapshot_multi() -> TestResult<()> { - load_snapshot().await - } - - #[tokio::test(flavor = "current_thread")] - async fn load_snapshot_current() -> TestResult<()> { - load_snapshot().await - } - - #[tokio::test] - async fn load_eager_snapshot() -> TestResult<()> { - let mut dat_dir = get_dat_dir(); - dat_dir.push("multi_partitioned"); - - let dat_info: TestCaseInfo = read_dat_case(dat_dir)?; - let table_info = dat_info.table_summary()?; - - let table = Table::try_from_uri(dat_info.table_root()?)?; - - let mut snapshot = EagerSnapshot::try_new_with_actions( - table.location(), - Arc::new(object_store::local::LocalFileSystem::default()), - Default::default(), - Some(1), - Default::default(), - None, - ) - .await?; - - // assert_eq!(snapshot.version(), table_info.version); - // assert_eq!( - // snapshot.protocol().min_reader_version(), - // table_info.min_reader_version - // ); - - snapshot.update()?; - - Ok(()) - } -} diff --git a/crates/core/src/storage/cache.rs b/crates/core/src/kernel/snapshot_next/cache.rs similarity index 95% rename from crates/core/src/storage/cache.rs rename to crates/core/src/kernel/snapshot_next/cache.rs index eb6b5bd785..594d599942 100644 --- a/crates/core/src/storage/cache.rs +++ b/crates/core/src/kernel/snapshot_next/cache.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::stream::BoxStream; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use object_store::path::Path; use object_store::{ Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, @@ -46,11 +46,10 @@ impl Entry { /// the object store are immutable and no attempt is made to invalidate the cache /// when files are updated in the remote object store. #[derive(Clone)] -pub(crate) struct CommitCacheObjectStore { +pub(super) struct CommitCacheObjectStore { inner: Arc, check: Arc bool + Send + Sync>, cache: Arc>, - has_ordered_listing: bool, } impl std::fmt::Debug for CommitCacheObjectStore { @@ -75,13 +74,10 @@ fn cache_json(path: &Path) -> bool { impl CommitCacheObjectStore { /// Create a new conditionally cached object store. pub fn new(inner: Arc) -> Self { - let store_str = format!("{}", inner); - let is_local = store_str.starts_with("LocalFileSystem"); Self { inner, check: Arc::new(cache_json), cache: Arc::new(Cache::new(100)), - has_ordered_listing: !is_local, } } diff --git a/crates/core/src/kernel/snapshot_next/eager.rs b/crates/core/src/kernel/snapshot_next/eager.rs new file mode 100644 index 0000000000..88306b8e49 --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/eager.rs @@ -0,0 +1,263 @@ +use std::sync::Arc; + +use arrow::compute::{concat_batches, filter_record_batch}; +use arrow_array::{BooleanArray, RecordBatch}; +use chrono::format::Item; +use delta_kernel::actions::set_transaction::SetTransactionMap; +use delta_kernel::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; +use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction}; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::log_segment::LogSegment; +use delta_kernel::scan::log_replay::scan_action_iter; +use delta_kernel::schema::Schema; +use delta_kernel::table_properties::TableProperties; +use delta_kernel::{EngineData, Expression, Table, Version}; +use itertools::Itertools; +use object_store::ObjectStore; +use url::Url; + +use super::iterators::{AddIterator, AddView, AddViewItem}; +use super::lazy::LazySnapshot; +use super::{Snapshot, SnapshotError}; +use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; + +/// An eager snapshot of a Delta Table at a specific version. +/// +/// This snapshot loads some log data eagerly and keeps it in memory. +#[derive(Clone)] +pub struct EagerSnapshot { + snapshot: LazySnapshot, + files: Option, + predicate: Option>, +} + +impl Snapshot for EagerSnapshot { + fn table_root(&self) -> &Url { + self.snapshot.table_root() + } + + fn version(&self) -> Version { + self.snapshot.version() + } + + fn schema(&self) -> &Schema { + self.snapshot.schema() + } + + fn protocol(&self) -> &Protocol { + self.snapshot.protocol() + } + + fn metadata(&self) -> &Metadata { + self.snapshot.metadata() + } + + fn table_properties(&self) -> &TableProperties { + self.snapshot.table_properties() + } + + fn files(&self) -> DeltaResult>> { + Ok(std::iter::once(Ok(self + .files + .clone() + .ok_or(SnapshotError::FilesNotInitialized)?))) + } + + fn tombstones(&self) -> DeltaResult>> { + self.snapshot.tombstones() + } + + fn application_transactions(&self) -> DeltaResult { + self.snapshot.application_transactions() + } + + fn application_transaction( + &self, + app_id: impl AsRef, + ) -> DeltaResult> { + self.snapshot.application_transaction(app_id) + } +} + +impl EagerSnapshot { + /// Create a new [`EagerSnapshot`] instance + pub async fn try_new( + table_root: impl AsRef, + store: Arc, + config: DeltaTableConfig, + version: impl Into>, + predicate: impl Into>>, + ) -> DeltaResult { + let snapshot = + LazySnapshot::try_new(Table::try_from_uri(table_root)?, store, version).await?; + let files = config + .require_files + .then(|| -> DeltaResult<_> { Ok(replay_file_actions(&snapshot)?) }) + .transpose()?; + Ok(Self { + snapshot, + files, + predicate: predicate.into(), + }) + } + + pub fn file_data(&self) -> DeltaResult<&RecordBatch> { + Ok(self + .files + .as_ref() + .ok_or(SnapshotError::FilesNotInitialized)?) + } + + pub fn files(&self) -> DeltaResult> { + AddView::try_new(self.file_data()?.clone()) + } + + pub fn file_actions(&self) -> DeltaResult> + '_> { + AddIterator::try_new(self.file_data()?) + } + + /// Get the number of files in the current snapshot + pub fn files_count(&self) -> DeltaResult { + Ok(self + .files + .as_ref() + .map(|f| f.num_rows()) + .ok_or_else(|| SnapshotError::FilesNotInitialized)?) + } + + pub(crate) fn update(&mut self) -> DeltaResult<()> { + let log_root = self.snapshot.table_root().join("_delta_log/").unwrap(); + let fs_client = self.snapshot.engine_ref().get_file_system_client(); + let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?; + let checkpoint_read_schema = get_log_add_schema().clone(); + + let segment = + LogSegment::for_table_changes(fs_client.as_ref(), log_root, self.version() + 1, None)?; + let mut slice_iter = segment + .replay( + self.snapshot.engine_ref().as_ref(), + commit_read_schema, + checkpoint_read_schema, + None, + )? + .map_ok( + |(data, flag)| -> Result<(RecordBatch, bool), delta_kernel::Error> { + Ok((ArrowEngineData::try_from_engine_data(data)?.into(), flag)) + }, + ) + .flatten() + .collect::, _>>()?; + + slice_iter.push(( + self.files + .as_ref() + .ok_or(SnapshotError::FilesNotInitialized)? + .clone(), + false, + )); + + self.files = Some(scan_as_log_data(&self.snapshot, slice_iter)?); + + Ok(()) + } +} + +fn replay_file_actions(snapshot: &LazySnapshot) -> DeltaResult { + let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?; + let checkpoint_read_schema = get_log_add_schema().clone(); + + let curr_data = snapshot + .inner + ._log_segment() + .replay( + snapshot.engine_ref().as_ref(), + commit_read_schema.clone(), + checkpoint_read_schema.clone(), + None, + )? + .map_ok( + |(data, flag)| -> Result<(RecordBatch, bool), delta_kernel::Error> { + Ok((ArrowEngineData::try_from_engine_data(data)?.into(), flag)) + }, + ) + .flatten() + .collect::, _>>()?; + + scan_as_log_data(snapshot, curr_data) +} + +fn scan_as_log_data( + snapshot: &LazySnapshot, + curr_data: Vec<(RecordBatch, bool)>, +) -> Result { + let scan_iter = curr_data.clone().into_iter().map(|(data, flag)| { + Ok(( + Box::new(ArrowEngineData::new(data.clone())) as Box, + flag, + )) + }); + + let res = scan_action_iter(snapshot.engine_ref().as_ref(), scan_iter, None) + .map(|res| { + res.and_then(|(d, selection)| { + Ok(( + RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?), + selection, + )) + }) + }) + .zip(curr_data.into_iter()) + .map(|(scan_res, (data_raw, _))| match scan_res { + Ok((_, selection)) => { + let data = filter_record_batch(&data_raw, &BooleanArray::from(selection))?; + Ok(data.project(&[0])?) + } + Err(e) => Err(e), + }) + .collect::, _>>()?; + + Ok(concat_batches(res[0].schema_ref(), &res)?) +} + +#[cfg(test)] +mod tests { + use deltalake_test::acceptance::{read_dat_case, TestCaseInfo}; + use deltalake_test::TestResult; + + use super::super::tests::get_dat_dir; + use super::*; + + #[tokio::test] + async fn load_eager_snapshot() -> TestResult<()> { + let mut dat_dir = get_dat_dir(); + dat_dir.push("multi_partitioned"); + + let dat_info: TestCaseInfo = read_dat_case(dat_dir)?; + let table_info = dat_info.table_summary()?; + + let table = Table::try_from_uri(dat_info.table_root()?)?; + + let mut snapshot = EagerSnapshot::try_new( + table.location(), + Arc::new(object_store::local::LocalFileSystem::default()), + Default::default(), + Some(1), + None, + ) + .await?; + + // assert_eq!(snapshot.version(), table_info.version); + // assert_eq!( + // snapshot.protocol().min_reader_version(), + // table_info.min_reader_version + // ); + + snapshot.update()?; + + for file in snapshot.file_actions()? { + println!("file: {:#?}", file.unwrap()); + } + + Ok(()) + } +} diff --git a/crates/core/src/kernel/snapshot_next/iterators.rs b/crates/core/src/kernel/snapshot_next/iterators.rs new file mode 100644 index 0000000000..4700cb9da3 --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/iterators.rs @@ -0,0 +1,310 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray, StructArray, +}; +use chrono::{DateTime, Utc}; +use delta_kernel::actions::visitors::AddVisitor; +use delta_kernel::actions::Add; +use delta_kernel::actions::ADD_NAME; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::arrow_expression::ProvidesColumnByName; +use delta_kernel::engine_data::{GetData, RowVisitor}; +use delta_kernel::expressions::{Scalar, StructData}; + +use crate::kernel::scalars::ScalarExt; +use crate::{DeltaResult, DeltaTableError}; + +pub struct AddIterator<'a> { + paths: &'a StringArray, + getters: Arc>>, + index: usize, +} + +impl AddIterator<'_> { + pub fn try_new<'a>(actions: &'a RecordBatch) -> DeltaResult> { + validate_column::(actions, &[ADD_NAME, "path"])?; + validate_column::(actions, &[ADD_NAME, "size"])?; + validate_column::(actions, &[ADD_NAME, "modificationTime"])?; + validate_column::(actions, &[ADD_NAME, "dataChange"])?; + + let visitor = AddVisitor::new(); + let fields = visitor.selected_column_names_and_types(); + + let mut mask = HashSet::new(); + for column in fields.0 { + for i in 0..column.len() { + mask.insert(&column[..i + 1]); + } + } + + let mut getters = vec![]; + ArrowEngineData::extract_columns(&mut vec![], &mut getters, fields.1, &mask, actions)?; + + let paths = extract_column(actions, &[ADD_NAME, "path"])?.as_string::(); + + Ok(AddIterator { + paths, + getters: Arc::new(getters), + index: 0, + }) + } +} + +impl Iterator for AddIterator<'_> { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + if self.index < self.paths.len() { + let path = self.paths.value(self.index).to_string(); + let add = AddVisitor::visit_add(self.index, path, self.getters.as_slice()) + .map_err(DeltaTableError::from); + self.index += 1; + Some(add) + } else { + None + } + } +} + +pub struct AddView { + actions: RecordBatch, + index: usize, +} + +impl AddView { + pub fn try_new(actions: RecordBatch) -> DeltaResult { + validate_column::(&actions, &[ADD_NAME, "path"])?; + validate_column::(&actions, &[ADD_NAME, "size"])?; + validate_column::(&actions, &[ADD_NAME, "modificationTime"])?; + validate_column::(&actions, &[ADD_NAME, "dataChange"])?; + Ok(Self { actions, index: 0 }) + } +} + +impl Iterator for AddView { + type Item = AddViewItem; + + fn next(&mut self) -> Option { + if self.index < self.actions.num_rows() { + let add = AddViewItem { + actions: self.actions.clone(), + index: self.index, + }; + self.index += 1; + Some(add) + } else { + None + } + } +} + +pub struct AddViewItem { + actions: RecordBatch, + index: usize, +} + +impl AddViewItem { + pub fn path(&self) -> &str { + extract_column(&self.actions, &[ADD_NAME, "path"]) + .unwrap() + .as_string::() + .value(self.index) + } + + pub fn size(&self) -> i64 { + extract_column(&self.actions, &[ADD_NAME, "size"]) + .unwrap() + .as_primitive::() + .value(self.index) + } + + pub fn modification_time(&self) -> i64 { + extract_column(&self.actions, &[ADD_NAME, "modificationTime"]) + .unwrap() + .as_primitive::() + .value(self.index) + } + + /// Datetime of the last modification time of the file. + pub fn modification_datetime(&self) -> DeltaResult> { + DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!( + "invalid modification_time: {:?}", + self.modification_time() + )), + )) + } + + pub fn data_change(&self) -> bool { + extract_column(&self.actions, &[ADD_NAME, "dataChange"]) + .unwrap() + .as_boolean() + .value(self.index) + } + + pub fn stats(&self) -> Option<&str> { + extract_column(&self.actions, &[ADD_NAME, "stats"]) + .ok() + .and_then(|c| c.as_string_opt::().map(|v| v.value(self.index))) + } + + pub fn base_row_id(&self) -> Option { + extract_column(&self.actions, &[ADD_NAME, "baseRowId"]) + .ok() + .and_then(|c| { + c.as_primitive_opt::() + .map(|v| v.value(self.index)) + }) + } + + pub fn default_row_commit_version(&self) -> Option { + extract_column(&self.actions, &[ADD_NAME, "defaultRowCommitVersion"]) + .ok() + .and_then(|c| { + c.as_primitive_opt::() + .map(|v| v.value(self.index)) + }) + } + + pub fn clustering_provider(&self) -> Option<&str> { + extract_column(&self.actions, &[ADD_NAME, "clusteringProvider"]) + .ok() + .and_then(|c| c.as_string_opt::().map(|v| v.value(self.index))) + } +} + +#[derive(Clone)] +pub struct LogicalFileView { + files: RecordBatch, + index: usize, +} + +impl LogicalFileView { + /// Path of the file. + pub fn path(&self) -> &str { + self.files.column(0).as_string::().value(self.index) + } + + /// Size of the file in bytes. + pub fn size(&self) -> i64 { + self.files + .column(1) + .as_primitive::() + .value(self.index) + } + + /// Modification time of the file in milliseconds since epoch. + pub fn modification_time(&self) -> i64 { + self.files + .column(2) + .as_primitive::() + .value(self.index) + } + + /// Datetime of the last modification time of the file. + pub fn modification_datetime(&self) -> DeltaResult> { + DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!( + "invalid modification_time: {:?}", + self.modification_time() + )), + )) + } + + pub fn stats(&self) -> Option<&str> { + let col = self.files.column(3).as_string::(); + col.is_valid(self.index).then(|| col.value(self.index)) + } + + pub fn partition_values(&self) -> Option { + self.files + .column_by_name("fileConstantValues") + .and_then(|col| col.as_struct_opt()) + .and_then(|s| s.column_by_name("partitionValues")) + .and_then(|arr| { + arr.is_valid(self.index) + .then(|| match Scalar::from_array(arr, self.index) { + Some(Scalar::Struct(s)) => Some(s), + _ => None, + }) + .flatten() + }) + } +} + +impl Iterator for LogicalFileView { + type Item = LogicalFileView; + + fn next(&mut self) -> Option { + if self.index < self.files.num_rows() { + let file = LogicalFileView { + files: self.files.clone(), + index: self.index, + }; + self.index += 1; + Some(file) + } else { + None + } + } +} + +fn validate_column<'a, T: Array + 'static>( + actions: &'a RecordBatch, + col: &'a [impl AsRef], +) -> DeltaResult<()> { + if let Ok(arr) = extract_column(actions, col) { + if arr.as_any().downcast_ref::().is_none() { + return Err(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!("Invalid column: {:?}", arr)), + )); + } + if arr.null_count() > 0 { + return Err(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!( + "Column has null values: {:?}", + arr + )), + )); + } + } else { + return Err(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!("Column not found",)), + )); + } + Ok(()) +} + +fn extract_column<'a>( + mut parent: &'a dyn ProvidesColumnByName, + col: &[impl AsRef], +) -> DeltaResult<&'a ArrayRef> { + let mut field_names = col.iter(); + let Some(mut field_name) = field_names.next() else { + return Err(arrow_schema::ArrowError::SchemaError( + "Empty column path".to_string(), + ))?; + }; + loop { + let child = parent.column_by_name(field_name.as_ref()).ok_or_else(|| { + arrow_schema::ArrowError::SchemaError(format!("No such field: {}", field_name.as_ref())) + })?; + field_name = match field_names.next() { + Some(name) => name, + None => return Ok(child), + }; + parent = child + .as_any() + .downcast_ref::() + .ok_or_else(|| { + arrow_schema::ArrowError::SchemaError(format!( + "Not a struct: {}", + field_name.as_ref() + )) + })?; + } +} diff --git a/crates/core/src/kernel/snapshot_next/lazy.rs b/crates/core/src/kernel/snapshot_next/lazy.rs new file mode 100644 index 0000000000..386d0f63d6 --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/lazy.rs @@ -0,0 +1,229 @@ +//! Snapshot of a Delta Table at a specific version. +//! +use std::sync::{Arc, LazyLock}; + +use arrow::compute::filter_record_batch; +use arrow_array::{BooleanArray, RecordBatch}; +use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionScanner}; +use delta_kernel::actions::{get_log_schema, REMOVE_NAME}; +use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::default::executor::tokio::{ + TokioBackgroundExecutor, TokioMultiThreadExecutor, +}; +use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::schema::Schema; +use delta_kernel::snapshot::Snapshot as SnapshotInner; +use delta_kernel::table_properties::TableProperties; +use delta_kernel::{Engine, Expression, ExpressionRef, Table, Version}; +use itertools::Itertools; +use object_store::path::Path; +use object_store::ObjectStore; +use url::Url; + +use super::cache::CommitCacheObjectStore; +use super::Snapshot; +use crate::{DeltaResult, DeltaTableError}; + +// TODO: avoid repetitive parsing of json stats + +#[derive(Clone)] +pub struct LazySnapshot { + pub(super) inner: Arc, + engine: Arc, +} + +impl Snapshot for LazySnapshot { + fn table_root(&self) -> &Url { + &self.inner.table_root() + } + + fn version(&self) -> Version { + self.inner.version() + } + + fn schema(&self) -> &Schema { + self.inner.schema() + } + + fn protocol(&self) -> &Protocol { + self.inner.protocol() + } + + fn metadata(&self) -> &Metadata { + self.inner.metadata() + } + + fn table_properties(&self) -> &TableProperties { + &self.inner.table_properties() + } + + fn files(&self) -> DeltaResult>> { + Ok(self + .files_impl(None)? + .map(|batch| batch.map_err(|e| e.into()))) + } + + fn tombstones(&self) -> DeltaResult>> { + static META_PREDICATE: LazyLock> = LazyLock::new(|| { + Some(Arc::new( + Expression::column([REMOVE_NAME, "path"]).is_not_null(), + )) + }); + let read_schema = get_log_schema().project(&[REMOVE_NAME])?; + Ok(self + .inner + ._log_segment() + .replay( + self.engine.as_ref(), + read_schema.clone(), + read_schema, + META_PREDICATE.clone(), + )? + .map_ok(|(d, _)| Ok(RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?))) + .flatten()) + } + + fn application_transactions(&self) -> DeltaResult { + let scanner = SetTransactionScanner::new(self.inner.clone()); + Ok(scanner.application_transactions(self.engine.as_ref())?) + } + + fn application_transaction( + &self, + app_id: impl AsRef, + ) -> DeltaResult> { + let scanner = SetTransactionScanner::new(self.inner.clone()); + Ok(scanner.application_transaction(self.engine.as_ref(), app_id.as_ref())?) + } +} + +impl LazySnapshot { + /// Create a new [`Snapshot`] instance. + pub fn new(inner: Arc, engine: Arc) -> Self { + Self { inner, engine } + } + + /// Create a new [`Snapshot`] instance for a table. + pub async fn try_new( + table: Table, + store: Arc, + version: impl Into>, + ) -> DeltaResult { + // TODO: how to deal with the dedicated IO runtime? Would this already be covered by the + // object store implementation pass to this? + let table_root = Path::from_url_path(table.location().path())?; + let store_str = format!("{}", store); + let is_local = store_str.starts_with("LocalFileSystem"); + let store = Arc::new(CommitCacheObjectStore::new(store)); + let handle = tokio::runtime::Handle::current(); + let engine: Arc = match handle.runtime_flavor() { + tokio::runtime::RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new_with_opts( + store, + table_root, + Arc::new(TokioMultiThreadExecutor::new(handle)), + !is_local, + )), + tokio::runtime::RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new_with_opts( + store, + table_root, + Arc::new(TokioBackgroundExecutor::new()), + !is_local, + )), + _ => return Err(DeltaTableError::generic("unsupported runtime flavor")), + }; + + let snapshot = table.snapshot(engine.as_ref(), version.into())?; + Ok(Self::new(Arc::new(snapshot), engine)) + } + + /// A shared reference to the engine used for interacting with the Delta Table. + pub(super) fn engine_ref(&self) -> &Arc { + &self.engine + } + + /// Get the timestamp of the given version in miliscends since epoch. + /// + /// Extracts the timestamp from the commit file of the given version + /// from the current log segment. If the commit file is not part of the + /// current log segment, `None` is returned. + pub fn version_timestamp(&self, version: Version) -> Option { + self.inner + ._log_segment() + .ascending_commit_files + .iter() + .find(|f| f.version == version) + .map(|f| f.location.last_modified) + } + + /// read all active files from the log + fn files_impl( + &self, + predicate: impl Into>>, + ) -> DeltaResult>> { + let scan = self + .inner + .clone() + .scan_builder() + .with_predicate(predicate) + .build()?; + Ok(scan.scan_data(self.engine.as_ref())?.map(|res| { + res.and_then(|(data, mut predicate)| { + let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into(); + if predicate.len() < batch.num_rows() { + predicate + .extend(std::iter::repeat(true).take(batch.num_rows() - predicate.len())); + } + Ok(filter_record_batch(&batch, &BooleanArray::from(predicate))?) + }) + })) + } +} + +#[cfg(test)] +mod tests { + use deltalake_test::acceptance::{read_dat_case, TestCaseInfo}; + use deltalake_test::TestResult; + + use super::super::tests::get_dat_dir; + use super::*; + + async fn load_snapshot() -> TestResult<()> { + // some comment + let mut dat_dir = get_dat_dir(); + dat_dir.push("multi_partitioned"); + + let dat_info: TestCaseInfo = read_dat_case(dat_dir)?; + let table_info = dat_info.table_summary()?; + + let table = Table::try_from_uri(dat_info.table_root()?)?; + + let snapshot = LazySnapshot::try_new( + table, + Arc::new(object_store::local::LocalFileSystem::default()), + None, + ) + .await?; + + assert_eq!(snapshot.version(), table_info.version); + assert_eq!( + ( + snapshot.protocol().min_reader_version(), + snapshot.protocol().min_writer_version() + ), + (table_info.min_reader_version, table_info.min_writer_version) + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn load_snapshot_multi() -> TestResult<()> { + load_snapshot().await + } + + #[tokio::test(flavor = "current_thread")] + async fn load_snapshot_current() -> TestResult<()> { + load_snapshot().await + } +} diff --git a/crates/core/src/kernel/snapshot_next/mod.rs b/crates/core/src/kernel/snapshot_next/mod.rs new file mode 100644 index 0000000000..879ef2824a --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/mod.rs @@ -0,0 +1,161 @@ +//! Snapshot of a Delta Table at a specific version. +//! +use std::sync::Arc; + +use arrow_array::RecordBatch; +use delta_kernel::actions::visitors::SetTransactionMap; +use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction}; +use delta_kernel::expressions::{Scalar, StructData}; +use delta_kernel::schema::Schema; +use delta_kernel::table_properties::TableProperties; +use delta_kernel::Version; +use iterators::{AddIterator, AddView, AddViewItem}; +use url::Url; + +use crate::{DeltaResult, DeltaTableError}; + +pub use eager::EagerSnapshot; +pub use lazy::LazySnapshot; + +mod cache; +mod eager; +mod iterators; +mod lazy; + +// TODO: avoid repetitive parsing of json stats + +#[derive(thiserror::Error, Debug)] +enum SnapshotError { + #[error("Tried accessing file data at snapshot initialized with no files.")] + FilesNotInitialized, +} + +impl From for DeltaTableError { + fn from(e: SnapshotError) -> Self { + match &e { + SnapshotError::FilesNotInitialized => DeltaTableError::generic(e), + } + } +} + +/// Helper trait to extract individual values from a `StructData`. +pub trait StructDataExt { + fn get(&self, key: &str) -> Option<&Scalar>; +} + +impl StructDataExt for StructData { + fn get(&self, key: &str) -> Option<&Scalar> { + self.fields() + .iter() + .zip(self.values().iter()) + .find(|(k, _)| k.name() == key) + .map(|(_, v)| v) + } +} + +pub trait Snapshot { + /// Location where the Delta Table (metadata) is stored. + fn table_root(&self) -> &Url; + + /// Version of this `Snapshot` in the table. + fn version(&self) -> Version; + + /// Table [`Schema`] at this `Snapshot`s version. + fn schema(&self) -> &Schema; + + /// Table [`Metadata`] at this `Snapshot`s version. + fn metadata(&self) -> &Metadata; + + /// Table [`Protocol`] at this `Snapshot`s version. + fn protocol(&self) -> &Protocol; + + /// Get the [`TableProperties`] for this [`Snapshot`]. + fn table_properties(&self) -> &TableProperties; + + fn files(&self) -> DeltaResult>>; + + fn files_view( + &self, + ) -> DeltaResult>>> { + Ok(self.files()?.map(|r| r.and_then(|b| AddView::try_new(b)))) + } + + fn tombstones(&self) -> DeltaResult>>; + + /// Scan the Delta Log to obtain the latest transaction for all applications + /// + /// This method requires a full scan of the log to find all transactions. + /// When a specific application id is requested, it is much more efficient to use + /// [`application_transaction`](Self::application_transaction) instead. + fn application_transactions(&self) -> DeltaResult; + + /// Scan the Delta Log for the latest transaction entry for a specific application. + /// + /// Initiates a log scan, but terminates as soon as the transaction + /// for the given application is found. + fn application_transaction( + &self, + app_id: impl AsRef, + ) -> DeltaResult>; +} + +impl Snapshot for Arc { + fn table_root(&self) -> &Url { + self.as_ref().table_root() + } + + fn version(&self) -> Version { + self.as_ref().version() + } + + fn schema(&self) -> &Schema { + self.as_ref().schema() + } + + fn metadata(&self) -> &Metadata { + self.as_ref().metadata() + } + + fn protocol(&self) -> &Protocol { + self.as_ref().protocol() + } + + fn table_properties(&self) -> &TableProperties { + self.as_ref().table_properties() + } + + fn files(&self) -> DeltaResult>> { + self.as_ref().files() + } + + fn tombstones(&self) -> DeltaResult>> { + self.as_ref().tombstones() + } + + fn application_transactions(&self) -> DeltaResult { + self.as_ref().application_transactions() + } + + fn application_transaction( + &self, + app_id: impl AsRef, + ) -> DeltaResult> { + self.as_ref().application_transaction(app_id) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + pub(super) fn get_dat_dir() -> PathBuf { + let d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let mut rep_root = d + .parent() + .and_then(|p| p.parent()) + .expect("valid directory") + .to_path_buf(); + rep_root.push("dat/out/reader_tests/generated"); + rep_root + } +} diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 87a4f26afd..a51d72b068 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -31,7 +31,6 @@ pub use retry_ext::ObjectStoreRetryExt; use std::ops::Range; pub use utils::*; -pub(crate) mod cache; pub mod file; pub mod retry_ext; pub mod utils;