Skip to content

Commit

Permalink
Merge branch 'main' into datafusion-44
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Dec 22, 2024
2 parents 1a4a94e + 72311ed commit 62ecf57
Showing 1 changed file with 251 additions and 2 deletions.
253 changes: 251 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1819,10 +1819,15 @@ impl From<Column> for DeltaColumn {

#[cfg(test)]
mod tests {
use crate::kernel::log_segment::PathExt;
use crate::logstore::default_logstore::DefaultLogStore;
use crate::operations::write::SchemaMode;
use crate::storage::ObjectStoreRef;
use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{Field, Schema};
use arrow_array::cast::AsArray;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
Expand All @@ -1831,9 +1836,15 @@ mod tests {
use datafusion_expr::lit;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use object_store::path::Path;
use futures::{stream::BoxStream, StreamExt};
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
use serde_json::json;
use std::ops::Deref;
use std::fmt::{Debug, Display, Formatter};
use std::ops::{Deref, Range};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

use super::*;

Expand Down Expand Up @@ -2678,4 +2689,242 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_delta_scan_uses_parquet_column_pruning() {
let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["b"
.repeat(1024)
.as_str()]));
let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.unwrap();

let (object_store, mut operations) =
RecordingObjectStore::new(table.log_store().object_store());
let log_store =
DefaultLogStore::new(Arc::new(object_store), table.log_store().config().clone());
let provider = DeltaTableProvider::try_new(
table.snapshot().unwrap().clone(),
Arc::new(log_store),
config,
)
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();
let state = ctx.state();

let df = ctx.sql("select small from test").await.unwrap();
let plan = df.create_physical_plan().await.unwrap();

let mut stream = plan.execute(0, state.task_ctx()).unwrap();
let Some(Ok(batch)) = stream.next().await else {
panic!()
};
assert!(stream.next().await.is_none());
assert_eq!(1, batch.num_columns());
assert_eq!(1, batch.num_rows());
let small = batch.column_by_name("small").unwrap().as_string::<i32>();
assert_eq!("a", small.iter().next().unwrap().unwrap());

let expected = vec![
ObjectStoreOperation::GetRange(LocationType::Data, 4920..4928),
ObjectStoreOperation::GetRange(LocationType::Data, 2399..4920),
ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..58]),
];
let mut actual = Vec::new();
operations.recv_many(&mut actual, 3).await;
assert_eq!(expected, actual);
}

/// Records operations made by the inner object store on a channel obtained at construction
struct RecordingObjectStore {
inner: ObjectStoreRef,
operations: UnboundedSender<ObjectStoreOperation>,
}

impl RecordingObjectStore {
/// Returns an object store and a channel recording all operations made by the inner object store
fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
let (operations, operations_receiver) = unbounded_channel();
(Self { inner, operations }, operations_receiver)
}
}

impl Display for RecordingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}

impl Debug for RecordingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Debug::fmt(&self.inner, f)
}
}

#[derive(Debug, PartialEq)]
enum ObjectStoreOperation {
GetRanges(LocationType, Vec<Range<usize>>),
GetRange(LocationType, Range<usize>),
GetOpts(LocationType),
Get(LocationType),
}

#[derive(Debug, PartialEq)]
enum LocationType {
Data,
Commit,
}

impl From<&Path> for LocationType {
fn from(value: &Path) -> Self {
if value.is_commit_file() {
LocationType::Commit
} else if value.to_string().starts_with("part-") {
LocationType::Data
} else {
panic!("Unknown location type: {:?}", value)
}
}
}

// Currently only read operations are recorded. Extend as necessary.
#[async_trait]
impl ObjectStore for RecordingObjectStore {
async fn put(
&self,
location: &Path,
payload: PutPayload,
) -> object_store::Result<PutResult> {
self.inner.put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, payload, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}

async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
self.operations
.send(ObjectStoreOperation::Get(location.into()))
.unwrap();
self.inner.get(location).await
}

async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
self.operations
.send(ObjectStoreOperation::GetOpts(location.into()))
.unwrap();
self.inner.get_opts(location, options).await
}

async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
self.operations
.send(ObjectStoreOperation::GetRange(
location.into(),
range.clone(),
))
.unwrap();
self.inner.get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
self.operations
.send(ObjectStoreOperation::GetRanges(
location.into(),
ranges.to_vec(),
))
.unwrap();
self.inner.get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
self.inner.head(location).await
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner.delete(location).await
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, object_store::Result<Path>>,
) -> BoxStream<'a, object_store::Result<Path>> {
self.inner.delete_stream(locations)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}

async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy(from, to).await
}

async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy_if_not_exists(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename_if_not_exists(from, to).await
}
}
}

0 comments on commit 62ecf57

Please sign in to comment.