
Commit

Merge branch 'default-logstore-implementation' of github.com:dispanser/delta-rs into default-logstore-implementation
dispanser committed Nov 1, 2023
2 parents 71b94fb + 5e2c3e3 commit ccf67b3
Showing 7 changed files with 29 additions and 28 deletions.
File renamed without changes.
File renamed without changes.
10 changes: 6 additions & 4 deletions crates/deltalake-core/src/operations/vacuum.rs
@@ -36,9 +36,8 @@ use serde_json::{Map, Value};
 use super::transaction::commit;
 use crate::crate_version;
 use crate::errors::{DeltaResult, DeltaTableError};
+use crate::logstore::{LogStore, LogStoreRef};
 use crate::protocol::{Action, DeltaOperation}; // Txn CommitInfo
-use crate::logstore::LogStoreRef;
-use crate::storage::DeltaObjectStore;
 use crate::table::state::DeltaTableState;
 use crate::DeltaTable;

@@ -238,7 +237,9 @@ impl std::future::IntoFuture for VacuumBuilder {
                 ));
             }

-            let metrics = plan.execute(&this.log_store.object_store()).await?;
+            let metrics = plan
+                .execute(this.log_store.as_ref(), &this.snapshot)
+                .await?;
             Ok((
                 DeltaTable::new_with_state(this.log_store, this.snapshot),
                 metrics,
@@ -265,7 +266,7 @@ impl VacuumPlan {
     /// Execute the vacuum plan and delete files from underlying storage
     pub async fn execute(
         self,
-        store: &DeltaObjectStore,
+        store: &dyn LogStore,
         snapshot: &DeltaTableState,
     ) -> Result<VacuumMetrics, DeltaTableError> {
         if self.files_to_delete.is_empty() {
@@ -314,6 +315,7 @@ impl VacuumPlan {
             .boxed();

         let files_deleted = store
+            .object_store()
             .delete_stream(locations)
             .map(|res| match res {
                 Ok(path) => Ok(path.to_string()),
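Taken together, these hunks thread a `LogStore` through vacuum instead of a raw `DeltaObjectStore`: `VacuumPlan::execute` now takes `&dyn LogStore` and only reaches the underlying `ObjectStore` via `object_store()` at the deletion site. A minimal sketch of exercising this path through the public builder, assuming this crate's `DeltaOps`/`VacuumBuilder` API (`vacuum()`, `with_retention_period`, `with_enforce_retention_duration`, `with_dry_run`) and a hypothetical local table path:

use chrono::Duration;
use deltalake_core::{open_table, DeltaOps, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Hypothetical table path, for illustration only.
    let table = open_table("./tests/data/delta-0.8.0").await?;

    // The builder resolves the table's LogStore and hands it to
    // VacuumPlan::execute as &dyn LogStore, matching the diff above.
    let (_table, metrics) = DeltaOps(table)
        .vacuum()
        .with_retention_period(Duration::hours(0))
        .with_enforce_retention_duration(false)
        .with_dry_run(true) // report candidates; delete nothing
        .await?;

    println!("would delete: {:?}", metrics.files_deleted);
    Ok(())
}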
3 changes: 1 addition & 2 deletions crates/deltalake-core/tests/common/mod.rs
@@ -1,12 +1,11 @@
 #![allow(dead_code, unused_variables)]

 use bytes::Bytes;
+use deltalake_core::logstore::LogStore;
 use deltalake_core::operations::create::CreateBuilder;
 use deltalake_core::operations::transaction::commit;
 use deltalake_core::protocol::{self, Add, DeltaOperation, Remove, SaveMode};
-use deltalake_core::storage::DeltaObjectStore;
 use deltalake_core::DeltaTableBuilder;
-use deltalake::logstore::LogStore;
 use deltalake_core::{DeltaTable, Schema};
 use object_store::{path::Path, ObjectStore};
 use std::any::Any;
29 changes: 15 additions & 14 deletions crates/deltalake-core/tests/integration_datafusion.rs
@@ -45,7 +45,7 @@ use std::error::Error;
 mod common;

 mod local {
-    use deltalake::{writer::JsonWriter, SchemaTypeMap};
+    use deltalake_core::{writer::JsonWriter, SchemaTypeMap};

     use super::*;
     #[tokio::test]
@@ -153,7 +153,7 @@ mod local {
     #[tokio::test]
     async fn test_datafusion_simple_query_partitioned() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/delta-0.8.0-partitioned")
+        let table = deltalake_core::open_table("./tests/data/delta-0.8.0-partitioned")
             .await
             .unwrap();
         ctx.register_table("demo", Arc::new(table))?;
@@ -182,7 +182,7 @@ mod local {
         let source_scan_bytes = {
             let ctx = SessionContext::new();
             let state = ctx.state();
-            let source_table = deltalake::open_table("./tests/data/delta-0.8.0-date").await?;
+            let source_table = deltalake_core::open_table("./tests/data/delta-0.8.0-date").await?;
             let source_scan = source_table.scan(&state, None, &[], None).await?;
             physical_plan_to_bytes_with_extension_codec(source_scan, &DeltaPhysicalCodec {})?
         };
@@ -261,7 +261,7 @@ mod local {
     #[tokio::test]
     async fn test_datafusion_date_column() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/delta-0.8.0-date")
+        let table = deltalake_core::open_table("./tests/data/delta-0.8.0-date")
             .await
             .unwrap();
         ctx.register_table("dates", Arc::new(table))?;
@@ -282,7 +282,7 @@

     #[tokio::test]
     async fn test_datafusion_stats() -> Result<()> {
-        let table = deltalake::open_table("./tests/data/delta-0.8.0")
+        let table = deltalake_core::open_table("./tests/data/delta-0.8.0")
             .await
             .unwrap();
         let statistics = table.state.datafusion_table_statistics();
@@ -734,7 +734,7 @@ mod local {
         assert_eq!(metrics.num_scanned_files(), 1);

         // Ensure that tables without stats and partition columns can be pruned for just partitions
-        // let table = deltalake::open_table("./tests/data/delta-0.8.0-null-partition").await?;
+        // let table = deltalake_core::open_table("./tests/data/delta-0.8.0-null-partition").await?;

         /*
         // Logically this should prune. See above
@@ -764,7 +764,7 @@ mod local {
     #[tokio::test]
     async fn test_datafusion_partitioned_types() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types")
+        let table = deltalake_core::open_table("./tests/data/delta-2.2.0-partitioned-types")
             .await
             .unwrap();
         ctx.register_table("demo", Arc::new(table))?;
@@ -813,7 +813,7 @@ mod local {
     #[tokio::test]
     async fn test_datafusion_scan_timestamps() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/table_with_edge_timestamps")
+        let table = deltalake_core::open_table("./tests/data/table_with_edge_timestamps")
             .await
             .unwrap();
         ctx.register_table("demo", Arc::new(table))?;
@@ -837,7 +837,7 @@
     #[tokio::test]
     async fn test_issue_1292_datafusion_sql_projection() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/http_requests")
+        let table = deltalake_core::open_table("./tests/data/http_requests")
             .await
             .unwrap();
         ctx.register_table("http_requests", Arc::new(table))?;
@@ -868,7 +868,7 @@ mod local {
     #[tokio::test]
     async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/http_requests")
+        let table = deltalake_core::open_table("./tests/data/http_requests")
             .await
             .unwrap();
         ctx.register_table("http_requests", Arc::new(table))?;
@@ -901,7 +901,7 @@ mod local {
     #[tokio::test]
     async fn test_issue_1374() -> Result<()> {
         let ctx = SessionContext::new();
-        let table = deltalake::open_table("./tests/data/issue_1374")
+        let table = deltalake_core::open_table("./tests/data/issue_1374")
             .await
             .unwrap();
         ctx.register_table("t", Arc::new(table))?;
@@ -948,14 +948,15 @@ mod local {
             true,
             HashMap::new(),
         )];
-        let schema = deltalake::Schema::new(fields);
-        let table = deltalake::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?;
+        let schema = deltalake_core::Schema::new(fields);
+        let table =
+            deltalake_core::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?;
         let _ = DeltaOps::from(table)
             .create()
             .with_columns(schema.get_fields().to_owned())
             .await?;

-        let mut table = deltalake::open_table("./tests/data/issue-1619").await?;
+        let mut table = deltalake_core::open_table("./tests/data/issue-1619").await?;

         let mut writer = JsonWriter::for_table(&table).unwrap();
         writer
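Every hunk in this file is the same mechanical rename: the tests now call the `deltalake_core` crate directly instead of the old `deltalake` facade. A condensed sketch of the pattern these tests exercise, assuming `deltalake_core` is built with its `datafusion` feature:

use std::sync::Arc;

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Renamed call site: deltalake::open_table -> deltalake_core::open_table.
    let table = deltalake_core::open_table("./tests/data/delta-0.8.0-partitioned")
        .await
        .unwrap();
    ctx.register_table("demo", Arc::new(table))?;

    let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}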
10 changes: 5 additions & 5 deletions crates/deltalake-core/tests/integration_read.rs
@@ -60,7 +60,7 @@ mod local {
         assert_eq!(table.get_files(), vec![Path::from(a.path.clone())]);

         // Remove added file.
-        let r = deltalake::protocol::Remove {
+        let r = deltalake_core::protocol::Remove {
             path: a.path.clone(),
             deletion_timestamp: Some(chrono::Utc::now().timestamp_millis()),
             data_change: false,
@@ -211,7 +211,7 @@ async fn read_simple_table(integration: &IntegrationContext) -> TestResult {
     );
     let tombstones = table.get_state().all_tombstones();
     assert_eq!(tombstones.len(), 31);
-    assert!(tombstones.contains(&deltalake::protocol::Remove {
+    assert!(tombstones.contains(&deltalake_core::protocol::Remove {
         path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
         deletion_timestamp: Some(1587968596250),
         data_change: true,
@@ -247,7 +247,7 @@ async fn read_simple_table_with_version(integration: &IntegrationContext) -> TestResult {
     );
     let tombstones = table.get_state().all_tombstones();
     assert_eq!(tombstones.len(), 29);
-    assert!(tombstones.contains(&deltalake::protocol::Remove {
+    assert!(tombstones.contains(&deltalake_core::protocol::Remove {
         path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
         deletion_timestamp: Some(1587968596250),
         data_change: true,
@@ -292,7 +292,7 @@ mod gcs {
     #[tokio::test]
     async fn test_gcs_simple() {
         let bucket = std::env::var("GCS_DELTA_BUCKET").unwrap();
-        let table = deltalake::open_table(format!("gs://{}/simple_table", bucket).as_str())
+        let table = deltalake_core::open_table(format!("gs://{}/simple_table", bucket).as_str())
            .await
            .unwrap();
        assert_eq!(table.version(), 4);
@@ -310,7 +310,7 @@
     );
     let tombstones = table.get_state().all_tombstones();
     assert_eq!(tombstones.len(), 31);
-    assert!(tombstones.contains(&deltalake::protocol::Remove {
+    assert!(tombstones.contains(&deltalake_core::protocol::Remove {
         path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
         deletion_timestamp: Some(1587968596250),
         data_change: true,
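The read-path tests get the same rename. For context, the tombstone assertions above walk the `Remove` actions retained in the table snapshot; a small sketch, assuming the pre-1.0 API in which `DeltaTable::get_state` and `DeltaTableState::all_tombstones` are public:

use deltalake_core::{open_table, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let table = open_table("./tests/data/simple_table").await?;

    // Tombstones are Remove actions still tracked in the snapshot; vacuum
    // uses them (plus the retention window) to pick deletable files.
    let tombstones = table.get_state().all_tombstones();
    println!("{} tombstones", tombstones.len());
    for remove in tombstones.iter().take(3) {
        println!("{} (data_change: {})", remove.path, remove.data_change);
    }
    Ok(())
}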
5 changes: 2 additions & 3 deletions python/src/lib.rs
@@ -859,9 +859,8 @@ impl RawDeltaTable {
     /// have been deleted or are malformed
     #[pyo3(signature = (dry_run = true))]
     pub fn repair(&mut self, dry_run: bool) -> PyResult<String> {
-        let cmd =
-            FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone())
-                .with_dry_run(dry_run);
+        let cmd = FileSystemCheckBuilder::new(self._table.log_store(), self._table.state.clone())
+            .with_dry_run(dry_run);

         let (table, metrics) = rt()?
             .block_on(cmd.into_future())
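This is the Python-binding side of the same migration: `repair` (the filesystem check) now seeds `FileSystemCheckBuilder` with the table's `LogStore` rather than its raw object store. A sketch of the equivalent call from Rust, assuming the `(log_store, state)` constructor and `with_dry_run` shown in the hunk above:

use deltalake_core::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake_core::{open_table, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let table = open_table("./tests/data/simple_table").await?;

    // Pass the LogStore, not the object store, mirroring the Python binding.
    let (table, metrics) = FileSystemCheckBuilder::new(table.log_store(), table.state.clone())
        .with_dry_run(true)
        .await?;

    println!("version {}, removed: {:?}", table.version(), metrics.files_removed);
    Ok(())
}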
