Skip to content

Commit

Permalink
feat: default logstore implementation
Browse files Browse the repository at this point in the history
Introduce a `LogStore` abstraction to channel all log store reads and writes through a single place. This is supposed to allow implementations with more sophisticated locking mechanisms that do not rely on atomic rename semantics for the underlying object store.

This does not change any functionality - it reorganizes read operations and commits on the delta commit log to be funneled through the respective methods of `LogStore`.

The goal is to align the implementation of multi-cluster writes for Delta Lake on S3 with the one provided by the original `delta` library, enabling multi-cluster writes with some writers using Spark / Delta library and other writers using `delta-rs`. For an overview of how it's done in delta, please see:
1. Delta [blog post](https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/) (high-level concept)
2. Associated Databricks [design doc](https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h) (detailed read)
3. [S3DynamoDbLogStore.java](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java) (content warning: Java code behind this link)

This approach requires readers of a delta table to "recover" unfinished commits from writers - as a result, reading and writing are combined in a single interface, which in this PR is modeled after [LogStore.java](https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java). Currently in `delta-rs`, the read path for commits is implemented directly in `DeltaTable`, and there's no mechanism to implement storage-specific behavior like interacting with DynamoDB.
  • Loading branch information
dispanser committed Nov 6, 2023
1 parent dcc3a7b commit 394ae2b
Show file tree
Hide file tree
Showing 41 changed files with 521 additions and 323 deletions.
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,8 @@ mod tests {
.build(&table.state)
.unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let storage = table.object_store();
let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1983,7 +1984,8 @@ mod tests {

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let storage = table.object_store();
let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ compile_error!(
pub mod data_catalog;
pub mod errors;
pub mod kernel;
pub mod logstore;
pub mod operations;
pub mod protocol;
pub mod schema;
Expand Down
115 changes: 115 additions & 0 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation
use std::cmp::max;

use bytes::Bytes;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::debug;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;

use super::LogStore;
use crate::{
operations::transaction::TransactionError,
protocol::{get_last_checkpoint, ProtocolError},
storage::{commit_uri_from_version, ObjectStoreRef},
DeltaResult, DeltaTableError,
};

/// Default [`LogStore`] implementation
///
/// Performs all commit log reads, listings and commits directly against the
/// wrapped object store; committing relies on the store's
/// `rename_if_not_exists` to reject a version that already exists.
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
/// Object store handle that every log operation of this store goes through.
pub(crate) storage: ObjectStoreRef,
}

#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
    /// Reads the raw bytes of the commit entry for `version`.
    ///
    /// The entry is located via `commit_uri_from_version(version)`. Any error raised
    /// while fetching the object or reading its body is propagated to the caller.
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes> {
        // TODO: return the parsed actions instead of the raw commit bytes
        let commit_uri = commit_uri_from_version(version);
        let data = self.storage.get(&commit_uri).await?.bytes().await?;
        Ok(data)
    }

    /// Tries to commit a prepared commit file.
    ///
    /// Returns [`TransactionError::VersionAlreadyExists`] if the given `version` already
    /// exists; every other storage error is converted via `TransactionError::from`.
    /// The caller should handle the retry logic itself. This is a low-level transaction
    /// API: callers that do not want to maintain their own commit loop should prefer a
    /// higher-level commit API that retries on conflict.
    async fn write_commit_entry(
        &self,
        version: i64,
        tmp_commit: &Path,
    ) -> Result<(), TransactionError> {
        // Move the temporary commit file into the delta log directory, relying on the
        // storage to fail if a commit file for this version already exists.
        self.storage
            .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
            .await
            .map_err(|err| match err {
                ObjectStoreError::AlreadyExists { .. } => {
                    TransactionError::VersionAlreadyExists(version)
                }
                _ => TransactionError::from(err),
            })
    }

    /// Finds the highest version that currently has an entry in the delta log.
    ///
    /// The search starts from the larger of `current_version` and the version of the
    /// last checkpoint (or `-1` when no checkpoint exists), and the result is never
    /// smaller than that starting point.
    ///
    /// # Errors
    ///
    /// Returns `DeltaTableError::not_a_table` when no non-negative version could be
    /// determined, and propagates checkpoint lookup and listing errors.
    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
        let version_start = match get_last_checkpoint(&self.storage).await {
            Ok(last_check_point) => last_check_point.version,
            // no checkpoint
            Err(ProtocolError::CheckpointNotFound) => -1,
            Err(e) => return Err(DeltaTableError::from(e)),
        };

        debug!("latest checkpoint version: {version_start}");

        let version_start = max(current_version, version_start);

        lazy_static! {
            static ref DELTA_LOG_REGEX: Regex =
                Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
        }

        // list files to find max version
        let mut max_version: i64 = version_start;
        let prefix = Some(self.storage.log_path());
        let offset_path = commit_uri_from_version(max_version);
        let mut files = self.storage.list_with_offset(prefix, &offset_path).await?;

        while let Some(obj_meta) = files.next().await {
            let obj_meta = obj_meta?;
            let parsed = DELTA_LOG_REGEX
                .captures(obj_meta.location.as_ref())
                .and_then(|captures| captures.get(1))
                .map(|digits| digits.as_str().parse::<i64>());
            match parsed {
                // listing may not be ordered
                Some(Ok(log_version)) => max_version = max(max_version, log_version),
                // 20 digits can exceed `i64::MAX`; such a file cannot be a valid log
                // entry, so skip it instead of panicking on a stray file name.
                Some(Err(err)) => debug!(
                    "ignoring log file with unparsable version {}: {err}",
                    obj_meta.location
                ),
                None => {}
            }
            // TODO: also cache the timestamp for each version, for faster time-travel;
            // temporarily disabled because `version_timestamp` is not available in the
            // [`LogStore`].
        }

        if max_version < 0 {
            return Err(DeltaTableError::not_a_table(self.storage.root_uri()));
        }

        Ok(max_version)
    }

    /// Returns a clone of the reference to the underlying object store.
    fn object_store(&self) -> ObjectStoreRef {
        self.storage.clone()
    }
}
53 changes: 53 additions & 0 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! Delta log store.
use std::sync::Arc;

use crate::{
errors::DeltaResult, operations::transaction::TransactionError, storage::ObjectStoreRef,
};
use bytes::Bytes;
use object_store::path::Path;

pub mod default_logstore;

/// Sharable reference to [`LogStore`]
pub type LogStoreRef = Arc<dyn LogStore>;

/// Trait for critical operations required to read and write commit entries in Delta logs.
///
/// The correctness is predicated on the atomicity and durability guarantees of
/// the implementation of this interface. Specifically,
///
/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically.
/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version.
/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to
/// `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must
/// become visible immediately.
#[async_trait::async_trait]
pub trait LogStore: Sync + Send {
/// Read data for commit entry with the given version.
///
/// The data is returned as raw bytes; parsing it is left to the caller.
/// TODO: return the actual commit data, i.e. Vec<Action>, instead?
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes>;

/// Write the prepared commit referenced by `tmp_commit` as delta commit entry for given version.
///
/// `tmp_commit` is the path of a temporary commit file (the default implementation
/// moves that file into the delta log directory).
///
/// This operation can be retried with a higher version in case the write
/// fails with `TransactionError::VersionAlreadyExists`.
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError>;

/// Find latest version currently stored in the delta log.
///
/// `start_version` is the version from which to start looking; the default
/// implementation uses it as a lower bound for the returned version.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> ObjectStoreRef;
}

// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
//
// Lets any `dyn LogStore` (and therefore `LogStoreRef`) be used as a field of a
// struct that derives `Debug`. The output is produced entirely by the `fmt`
// implementation of the value returned from `object_store()`; nothing specific
// to the log store itself is printed.
impl std::fmt::Debug for dyn LogStore + '_ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Delegate formatting to the underlying object store reference.
self.object_store().fmt(f)
}
}
28 changes: 14 additions & 14 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType};
use crate::logstore::{LogStore, LogStoreRef};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::DeltaObjectStore;
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
use crate::table::DeltaTableMetaData;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct CreateBuilder {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
actions: Vec<Action>,
object_store: Option<Arc<DeltaObjectStore>>,
log_store: Option<LogStoreRef>,
configuration: HashMap<String, Option<String>>,
metadata: Option<Map<String, Value>>,
}
Expand All @@ -78,7 +78,7 @@ impl CreateBuilder {
partition_columns: None,
storage_options: None,
actions: Default::default(),
object_store: None,
log_store: None,
configuration: Default::default(),
metadata: Default::default(),
}
Expand Down Expand Up @@ -198,9 +198,9 @@ impl CreateBuilder {
self
}

/// Provide a [`DeltaObjectStore`] instance, that points at table location
pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
self.object_store = Some(object_store);
/// Provide a [`LogStore`] instance, that points at table location
pub fn with_log_store(mut self, log_store: Arc<dyn LogStore>) -> Self {
self.log_store = Some(log_store);
self
}

Expand All @@ -219,12 +219,12 @@ impl CreateBuilder {
return Err(CreateError::MissingSchema.into());
}

let (storage_url, table) = if let Some(object_store) = self.object_store {
let (storage_url, table) = if let Some(log_store) = self.log_store {
(
ensure_table_uri(object_store.root_uri())?
ensure_table_uri(log_store.object_store().root_uri())?
.as_str()
.to_string(),
DeltaTable::new(object_store, Default::default()),
DeltaTable::new(log_store, Default::default()),
)
} else {
let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?;
Expand Down Expand Up @@ -311,7 +311,7 @@ impl std::future::IntoFuture for CreateBuilder {
};

let version = commit(
table.object_store().as_ref(),
table.log_store.as_ref(),
&actions,
operation,
table_state,
Expand Down Expand Up @@ -443,19 +443,19 @@ mod tests {
assert_eq!(table.version(), 0);
let first_id = table.get_metadata().unwrap().id.clone();

let object_store = table.object_store();
let log_store = table.log_store;

// Check an error is raised when a table exists at location
let table = CreateBuilder::new()
.with_object_store(object_store.clone())
.with_log_store(log_store.clone())
.with_columns(schema.fields().clone())
.with_save_mode(SaveMode::ErrorIfExists)
.await;
assert!(table.is_err());

// Check current table is returned when ignore option is chosen.
let table = CreateBuilder::new()
.with_object_store(object_store.clone())
.with_log_store(log_store.clone())
.with_columns(schema.fields().clone())
.with_save_mode(SaveMode::Ignore)
.await
Expand All @@ -464,7 +464,7 @@ mod tests {

// Check table is overwritten
let table = CreateBuilder::new()
.with_object_store(object_store.clone())
.with_log_store(log_store.clone())
.with_columns(schema.fields().iter().cloned())
.with_save_mode(SaveMode::Overwrite)
.await
Expand Down
Loading

0 comments on commit 394ae2b

Please sign in to comment.