Skip to content

Commit

Permalink
feat: default logstore implementation (delta-io#1742)
Browse files Browse the repository at this point in the history
Introduce a `LogStore` abstraction to channel all log store reads and
writes through a single place. This is supposed to allow implementations
with more sophisticated locking mechanisms that do not rely on atomic
rename semantics for the underlying object store.

This does not change any functionality - it reorganizes read operations
and commits on the delta commit log to be funneled through the
respective methods of `LogStore`.

The goal is to align the implementation of multi-cluster writes for
Delta Lake on S3 with the one provided by the original `delta` library,
enabling multi-cluster writes with some writers using Spark / Delta
library and other writers using `delta-rs`. For an overview of how it's
done in delta, please see:
1. Delta [blog
post](https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/)
(high-level concept)
2. Associated Databricks [design
doc](https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h)
(detailed read)
3.
[S3DynamoDbLogStore.java](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java)(content
warning: Java code behind this link)

This approach requires readers of a delta table to "recover" unfinished
commits from writers - as a result, reading and writing are combined in a
single interface, which in this PR is modeled after
[LogStore.java](https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java).
Currently in `delta-rs`, the read path for commits is implemented directly
in `DeltaTable`, and there's no mechanism to implement storage-specific
behavior like interacting with DynamoDb.

---------

Co-authored-by: Robert Pack <[email protected]>
  • Loading branch information
2 people authored and Ryan Aston committed Nov 9, 2023
1 parent 680c2d9 commit 39019fd
Show file tree
Hide file tree
Showing 44 changed files with 818 additions and 746 deletions.
49 changes: 24 additions & 25 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{self};
use crate::storage::ObjectStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable};
Expand Down Expand Up @@ -357,10 +357,10 @@ impl PruningStatistics for DeltaTable {

// each delta table must register a specific object store, since paths are internally
// handled relative to the table root.
pub(crate) fn register_store(store: ObjectStoreRef, env: Arc<RuntimeEnv>) {
pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
let object_store_url = store.object_store_url();
let url: &Url = object_store_url.as_ref();
env.register_object_store(url, store);
env.register_object_store(url, store.object_store());
}

pub(crate) fn logical_schema(
Expand Down Expand Up @@ -467,7 +467,7 @@ pub struct DeltaScanConfig {
#[derive(Debug)]
pub(crate) struct DeltaScanBuilder<'a> {
snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
filter: Option<Expr>,
state: &'a SessionState,
projection: Option<&'a Vec<usize>>,
Expand All @@ -480,12 +480,12 @@ pub(crate) struct DeltaScanBuilder<'a> {
impl<'a> DeltaScanBuilder<'a> {
pub fn new(
snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &'a SessionState,
) -> Self {
DeltaScanBuilder {
snapshot,
object_store,
log_store,
filter: None,
state,
files: None,
Expand Down Expand Up @@ -532,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> {
Some(schema) => schema,
None => {
self.snapshot
.physical_arrow_schema(self.object_store.clone())
.physical_arrow_schema(self.log_store.object_store())
.await?
}
};
Expand Down Expand Up @@ -632,7 +632,7 @@ impl<'a> DeltaScanBuilder<'a> {
.create_physical_plan(
self.state,
FileScanConfig {
object_store_url: self.object_store.object_store_url(),
object_store_url: self.log_store.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.snapshot.datafusion_table_statistics(),
Expand All @@ -647,9 +647,7 @@ impl<'a> DeltaScanBuilder<'a> {
.await?;

Ok(DeltaScan {
table_uri: ensure_table_uri(self.object_store.root_uri())?
.as_str()
.into(),
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
parquet_scan: scan,
config,
logical_schema,
Expand Down Expand Up @@ -686,10 +684,10 @@ impl TableProvider for DeltaTable {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.object_store(), session.runtime_env().clone());
register_store(self.log_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session)
let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand All @@ -714,7 +712,7 @@ impl TableProvider for DeltaTable {
/// A Delta table provider that enables additional metadata columns to be included during the scan
pub struct DeltaTableProvider {
snapshot: DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
config: DeltaScanConfig,
schema: Arc<ArrowSchema>,
}
Expand All @@ -723,13 +721,13 @@ impl DeltaTableProvider {
/// Build a DeltaTableProvider
pub fn try_new(
snapshot: DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
config: DeltaScanConfig,
) -> DeltaResult<Self> {
Ok(DeltaTableProvider {
schema: logical_schema(&snapshot, &config)?,
snapshot,
store,
log_store,
config,
})
}
Expand Down Expand Up @@ -764,10 +762,10 @@ impl TableProvider for DeltaTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.store.clone(), session.runtime_env().clone());
register_store(self.log_store.clone(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session)
let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -1462,7 +1460,7 @@ fn join_batches_with_add_actions(
/// Determine which files contain a record that satisfies the predicate
pub(crate) async fn find_files_scan<'a>(
snapshot: &DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
expression: Expr,
) -> DeltaResult<Vec<Add>> {
Expand All @@ -1489,7 +1487,7 @@ pub(crate) async fn find_files_scan<'a>(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let scan = DeltaScanBuilder::new(snapshot, store.clone(), state)
let scan = DeltaScanBuilder::new(snapshot, log_store, state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
Expand Down Expand Up @@ -1580,7 +1578,7 @@ pub(crate) async fn scan_memory_table(
/// Finds files in a snapshot that match the provided predicate.
pub async fn find_files<'a>(
snapshot: &DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
Expand Down Expand Up @@ -1608,8 +1606,7 @@ pub async fn find_files<'a>(
})
} else {
let candidates =
find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned())
.await?;
find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;

Ok(FindFiles {
candidates,
Expand Down Expand Up @@ -1924,7 +1921,8 @@ mod tests {
.build(&table.state)
.unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1983,7 +1981,8 @@ mod tests {

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ compile_error!(
pub mod data_catalog;
pub mod errors;
pub mod kernel;
pub mod logstore;
pub mod operations;
pub mod protocol;
pub mod schema;
Expand Down
89 changes: 89 additions & 0 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation
use std::sync::Arc;

use bytes::Bytes;
#[cfg(feature = "datafusion")]
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::{path::Path, ObjectStore};
use url::Url;

use super::{LogStore, LogStoreConfig};
use crate::{
operations::transaction::TransactionError,
storage::{
config::{self, StorageOptions},
ObjectStoreRef,
},
DeltaResult,
};

/// Default [`LogStore`] implementation.
///
/// Wraps an object store together with the [`LogStoreConfig`] it was created with.
/// Commit reads and writes are forwarded to the helper functions of the parent module,
/// operating on the wrapped object store.
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
/// Object store that commit entries are read from and written to.
pub(crate) storage: Arc<dyn ObjectStore>,
/// Configuration of this log store: the table location and the storage options.
config: LogStoreConfig,
}

impl DefaultLogStore {
/// Create a new instance of [`DefaultLogStore`]
///
/// # Arguments
///
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self {
Self { storage, config }
}

/// Create log store
pub fn try_new(location: Url, options: impl Into<StorageOptions> + Clone) -> DeltaResult<Self> {
let mut options = options.into();
let storage = config::configure_store(&location, &mut options)?;
Ok(Self {
storage: Arc::new(storage),
config: LogStoreConfig { location, options },
})
}
}

#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
    /// Read the commit entry for `version` through the wrapped object store.
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes> {
        let store = &*self.storage;
        super::read_commit_entry(store, version).await
    }

    /// Attempts to commit the prepared commit file `tmp_commit` as `version`.
    ///
    /// Returns a [`TransactionError`] if the given `version` already exists. This is a
    /// low-level transaction API: the caller should handle the retry logic itself.
    async fn write_commit_entry(
        &self,
        version: i64,
        tmp_commit: &Path,
    ) -> Result<(), TransactionError> {
        let store = &*self.storage;
        super::write_commit_entry(store, version, tmp_commit).await
    }

    /// Resolve the latest version of the table, passing `current_version` on to the
    /// shared `get_latest_version` helper of the parent module.
    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
        super::get_latest_version(self, current_version).await
    }

    /// Hand out another shared handle to the wrapped object store.
    fn object_store(&self) -> Arc<dyn ObjectStore> {
        Arc::clone(&self.storage)
    }

    /// Build the uri for `location`, based on the location configured for this log store.
    fn to_uri(&self, location: &Path) -> String {
        let root = &self.config.location;
        super::to_uri(root, location)
    }

    /// Object store url derived from the location configured for this log store.
    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        let root = &self.config.location;
        super::object_store_url(root)
    }

    /// Borrow the configuration this log store was created with.
    fn config(&self) -> &LogStoreConfig {
        &self.config
    }
}
Loading

0 comments on commit 39019fd

Please sign in to comment.