Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: default logstore implementation #1742

Merged
merged 10 commits into from
Nov 9, 2023
49 changes: 24 additions & 25 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{self};
use crate::storage::ObjectStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable};
Expand Down Expand Up @@ -357,10 +357,10 @@ impl PruningStatistics for DeltaTable {

// each delta table must register a specific object store, since paths are internally
// handled relative to the table root.
pub(crate) fn register_store(store: ObjectStoreRef, env: Arc<RuntimeEnv>) {
pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
let object_store_url = store.object_store_url();
let url: &Url = object_store_url.as_ref();
env.register_object_store(url, store);
env.register_object_store(url, store.object_store());
}

pub(crate) fn logical_schema(
Expand Down Expand Up @@ -467,7 +467,7 @@ pub struct DeltaScanConfig {
#[derive(Debug)]
pub(crate) struct DeltaScanBuilder<'a> {
snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
filter: Option<Expr>,
state: &'a SessionState,
projection: Option<&'a Vec<usize>>,
Expand All @@ -480,12 +480,12 @@ pub(crate) struct DeltaScanBuilder<'a> {
impl<'a> DeltaScanBuilder<'a> {
pub fn new(
snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &'a SessionState,
) -> Self {
DeltaScanBuilder {
snapshot,
object_store,
log_store,
filter: None,
state,
files: None,
Expand Down Expand Up @@ -532,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> {
Some(schema) => schema,
None => {
self.snapshot
.physical_arrow_schema(self.object_store.clone())
.physical_arrow_schema(self.log_store.object_store())
.await?
}
};
Expand Down Expand Up @@ -632,7 +632,7 @@ impl<'a> DeltaScanBuilder<'a> {
.create_physical_plan(
self.state,
FileScanConfig {
object_store_url: self.object_store.object_store_url(),
object_store_url: self.log_store.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.snapshot.datafusion_table_statistics(),
Expand All @@ -647,9 +647,7 @@ impl<'a> DeltaScanBuilder<'a> {
.await?;

Ok(DeltaScan {
table_uri: ensure_table_uri(self.object_store.root_uri())?
.as_str()
.into(),
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
parquet_scan: scan,
config,
logical_schema,
Expand Down Expand Up @@ -686,10 +684,10 @@ impl TableProvider for DeltaTable {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.object_store(), session.runtime_env().clone());
register_store(self.log_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session)
let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand All @@ -714,7 +712,7 @@ impl TableProvider for DeltaTable {
/// A Delta table provider that enables additonal metadata columns to be included during the scan
pub struct DeltaTableProvider {
snapshot: DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
config: DeltaScanConfig,
schema: Arc<ArrowSchema>,
}
Expand All @@ -723,13 +721,13 @@ impl DeltaTableProvider {
/// Build a DeltaTableProvider
pub fn try_new(
snapshot: DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
config: DeltaScanConfig,
) -> DeltaResult<Self> {
Ok(DeltaTableProvider {
schema: logical_schema(&snapshot, &config)?,
snapshot,
store,
log_store,
config,
})
}
Expand Down Expand Up @@ -764,10 +762,10 @@ impl TableProvider for DeltaTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
register_store(self.store.clone(), session.runtime_env().clone());
register_store(self.log_store.clone(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session)
let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -1462,7 +1460,7 @@ fn join_batches_with_add_actions(
/// Determine which files contain a record that statisfies the predicate
pub(crate) async fn find_files_scan<'a>(
snapshot: &DeltaTableState,
store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
expression: Expr,
) -> DeltaResult<Vec<Add>> {
Expand All @@ -1489,7 +1487,7 @@ pub(crate) async fn find_files_scan<'a>(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let scan = DeltaScanBuilder::new(snapshot, store.clone(), state)
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
Expand Down Expand Up @@ -1580,7 +1578,7 @@ pub(crate) async fn scan_memory_table(
/// Finds files in a snapshot that match the provided predicate.
pub async fn find_files<'a>(
snapshot: &DeltaTableState,
object_store: ObjectStoreRef,
log_store: LogStoreRef,
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
Expand Down Expand Up @@ -1608,8 +1606,7 @@ pub async fn find_files<'a>(
})
} else {
let candidates =
find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned())
.await?;
find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;

Ok(FindFiles {
candidates,
Expand Down Expand Up @@ -1924,7 +1921,8 @@ mod tests {
.build(&table.state)
.unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1983,7 +1981,8 @@ mod tests {

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ compile_error!(
pub mod data_catalog;
pub mod errors;
pub mod kernel;
pub mod logstore;
pub mod operations;
pub mod protocol;
pub mod schema;
Expand Down
89 changes: 89 additions & 0 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation

use std::sync::Arc;

use bytes::Bytes;
#[cfg(feature = "datafusion")]
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::{path::Path, ObjectStore};
use url::Url;

use super::{LogStore, LogStoreConfig};
use crate::{
operations::transaction::TransactionError,
storage::{
config::{self, StorageOptions},
ObjectStoreRef,
},
DeltaResult,
};

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
pub(crate) storage: Arc<dyn ObjectStore>,
config: LogStoreConfig,
}

impl DefaultLogStore {
/// Create a new instance of [`DefaultLogStore`]
///
/// # Arguments
///
/// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self {
DefaultLogStore { storage, config }
dispanser marked this conversation as resolved.
Show resolved Hide resolved
}

/// Create log store
pub fn try_new(location: Url, options: impl Into<StorageOptions> + Clone) -> DeltaResult<Self> {
let mut options = options.into();
let storage = config::configure_store(&location, &mut options)?;
Ok(Self {
storage: Arc::new(storage),
config: LogStoreConfig { location, options },
})
}
}

#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Bytes> {
super::read_commit_entry(self.storage.as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [`TransactionError`]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is low-level transaction API. If user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
/// with retry logic.
async fn write_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError> {
super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_latest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}

fn to_uri(&self, location: &Path) -> String {
super::to_uri(&self.config.location, location)
}

#[cfg(feature = "datafusion")]
fn object_store_url(&self) -> ObjectStoreUrl {
super::object_store_url(&self.config.location)
}

fn config(&self) -> &LogStoreConfig {
&self.config
}
}
Loading
Loading