Merge pull request #1620 from jiacai2050/feat-reserved-columns
zealchen authored Dec 27, 2024
2 parents 4d3bb1c + cf2ff8a commit a8428d2
Showing 8 changed files with 211 additions and 105 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion src/metric_engine/Cargo.toml
@@ -28,7 +28,6 @@ description.workspace = true
[dependencies]
anyhow = { workspace = true }
arrow = { workspace = true }
-arrow-schema = { workspace = true }
async-scoped = { workspace = true }
async-trait = { workspace = true }
byteorder = { workspace = true }
2 changes: 1 addition & 1 deletion src/metric_engine/src/compaction/executor.rs
@@ -165,7 +165,7 @@ impl Executor {
task.inputs.clone(),
None, // projection
Vec::new(), // predicate
-true, // keep_sequence
+true, // keep_builtin
)?;
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;
2 changes: 1 addition & 1 deletion src/metric_engine/src/operator.rs
@@ -21,8 +21,8 @@ use anyhow::Context;
use arrow::{
array::{Array, BinaryArray, RecordBatch},
buffer::OffsetBuffer,
+datatypes::DataType,
};
-use arrow_schema::DataType;
use tracing::debug;

use crate::{ensure, Result};
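This import move works because the arrow crate re-exports the arrow-schema types through arrow::datatypes, so the direct arrow-schema dependency (dropped from Cargo.toml above) loses nothing. A minimal sketch of the consolidated form (the exact item list is illustrative):

    // One import path for both array and schema types; no direct arrow-schema dep:
    use arrow::datatypes::{DataType, Schema, SchemaRef};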
108 changes: 54 additions & 54 deletions src/metric_engine/src/read.rs
@@ -22,11 +22,10 @@ use arrow::{
array::{AsArray, RecordBatch},
compute::concat_batches,
datatypes::{
-GenericBinaryType, Int32Type, Int64Type, Int8Type, Schema, UInt32Type, UInt64Type,
-UInt8Type,
+GenericBinaryType, Int32Type, Int64Type, Int8Type, Schema, SchemaRef, UInt32Type,
+UInt64Type, UInt8Type,
},
};
-use arrow_schema::SchemaRef;
use datafusion::{
common::{internal_err, DFSchema},
datasource::{
@@ -58,7 +57,9 @@ use crate::{
config::UpdateMode,
operator::{BytesMergeOperator, LastValueOperator, MergeOperator, MergeOperatorRef},
sst::{SstFile, SstPathGenerator},
-types::{ObjectStoreRef, StorageSchema, SEQ_COLUMN_NAME},
+types::{
+ObjectStoreRef, StorageSchema, BUILTIN_COLUMN_NUM, RESERVED_COLUMN_NAME, SEQ_COLUMN_NAME,
+},
Result,
};

@@ -101,28 +102,24 @@ pub(crate) struct MergeExec {
input: Arc<dyn ExecutionPlan>,
/// (0..num_primary_keys) are primary key columns
num_primary_keys: usize,
-/// Sequence column index
-seq_idx: usize,
/// Operator to merge values when primary keys are the same
value_operator: Arc<dyn MergeOperator>,
-/// Whether to keep the sequence column in the output
-keep_sequence: bool,
+/// Whether to keep the builtin columns in the output
+keep_builtin: bool,
}

impl MergeExec {
pub fn new(
input: Arc<dyn ExecutionPlan>,
num_primary_keys: usize,
-seq_idx: usize,
value_operator: Arc<dyn MergeOperator>,
-keep_sequence: bool,
+keep_builtin: bool,
) -> Self {
Self {
input,
num_primary_keys,
-seq_idx,
value_operator,
-keep_sequence,
+keep_builtin,
}
}
}
@@ -134,8 +131,8 @@ impl DisplayAs for MergeExec {
) -> std::fmt::Result {
write!(
f,
"MergeExec: [primary_keys: {}, seq_idx: {}, keep_sequence: {}]",
self.num_primary_keys, self.seq_idx, self.keep_sequence
"MergeExec: [primary_keys: {}, keep_builtin: {}]",
self.num_primary_keys, self.keep_builtin
)?;
Ok(())
}
@@ -173,9 +170,8 @@ impl ExecutionPlan for MergeExec {
Ok(Arc::new(MergeExec::new(
Arc::clone(&children[0]),
self.num_primary_keys,
-self.seq_idx,
self.value_operator.clone(),
-self.keep_sequence,
+self.keep_builtin,
)))
}

@@ -191,19 +187,17 @@ impl ExecutionPlan for MergeExec {
Ok(Box::pin(MergeStream::new(
self.input.execute(partition, context)?,
self.num_primary_keys,
-self.seq_idx,
self.value_operator.clone(),
-self.keep_sequence,
+self.keep_builtin,
)))
}
}

struct MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
-seq_idx: usize,
value_operator: MergeOperatorRef,
-keep_sequence: bool,
+keep_builtin: bool,

pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
@@ -213,22 +207,26 @@ impl MergeStream {
fn new(
stream: SendableRecordBatchStream,
num_primary_keys: usize,
-seq_idx: usize,
value_operator: MergeOperatorRef,
-keep_sequence: bool,
+keep_builtin: bool,
) -> Self {
-let arrow_schema = if keep_sequence {
+let arrow_schema = if keep_builtin {
let schema = stream.schema();
let found_seq = schema.fields().iter().any(|f| f.name() == SEQ_COLUMN_NAME);
assert!(found_seq, "Sequence column not found");
+let found_reserved = schema
+.fields()
+.iter()
+.any(|f| f.name() == RESERVED_COLUMN_NAME);
+assert!(found_reserved, "Reserved column not found");
schema
} else {
let fields = stream
.schema()
.fields()
.into_iter()
.filter_map(|f| {
-if f.name() == SEQ_COLUMN_NAME {
+if StorageSchema::is_builtin_field(f) {
None
} else {
Some(f.clone())
@@ -243,14 +241,24 @@
Self {
stream,
num_primary_keys,
-seq_idx,
value_operator,
-keep_sequence,
+keep_builtin,
pending_batch: None,
arrow_schema,
}
}

+fn maybe_remove_builtin_columns(&self, batch: &mut RecordBatch) {
+if self.keep_builtin {
+return;
+}
+
+// builtin columns are always at the end.
+for _ in 0..BUILTIN_COLUMN_NUM {
+batch.remove_column(batch.num_columns() - 1);
+}
+}

fn primary_key_eq(
&self,
lhs: &RecordBatch,
@@ -329,12 +337,8 @@ impl MergeStream {

let mut output_batches =
concat_batches(&self.stream.schema(), output_batches.iter()).context("concat batch")?;
-if self.keep_sequence {
-return Ok(Some(output_batches));
-}
-
-// Remove seq column
-output_batches.remove_column(self.seq_idx);
+self.maybe_remove_builtin_columns(&mut output_batches);
Ok(Some(output_batches))
}
}
@@ -351,9 +355,7 @@ impl Stream for MergeStream {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
let value = if let Some(mut pending) = self.pending_batch.take() {
-if !self.keep_sequence {
-pending.remove_column(self.seq_idx);
-}
+self.maybe_remove_builtin_columns(&mut pending);
let res = self
.value_operator
.merge(pending)
@@ -427,9 +429,9 @@ impl ParquetReader {
pub fn build_df_plan(
&self,
ssts: Vec<SstFile>,
-projections: Option<Vec<usize>>,
+projection: Option<Vec<usize>>,
predicates: Vec<Expr>,
-keep_sequence: bool,
+keep_builtin: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// we won't use url for selecting object_store.
let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
@@ -449,7 +451,7 @@
let scan_config = FileScanConfig::new(dummy_url, self.schema.arrow_schema.clone())
.with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
.with_file_groups(file_groups)
-.with_projection(projections);
+.with_projection(projection);

let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
@@ -480,14 +482,13 @@ impl ParquetReader {
let merge_exec = MergeExec::new(
Arc::new(sort_exec),
self.schema.num_primary_keys,
-self.schema.seq_idx,
match self.schema.update_mode {
UpdateMode::Overwrite => Arc::new(LastValueOperator),
UpdateMode::Append => {
Arc::new(BytesMergeOperator::new(self.schema.value_idxes.clone()))
}
},
-keep_sequence,
+keep_builtin,
);
Ok(Arc::new(merge_exec))
}
@@ -543,42 +544,41 @@ mod tests {
record_batch!(
("pk1", UInt8, vec![11, 11, 12, 12, 13]),
("value", Binary, vec![b"1", b"2", b"3", b"4", b"5"]),
("seq", UInt8, vec![1, 2, 3, 4, 5])
(SEQ_COLUMN_NAME, UInt8, vec![1, 2, 3, 4, 5]),
(RESERVED_COLUMN_NAME, UInt8, vec![None; 5])
)
.unwrap(),
record_batch!(
("pk1", UInt8, vec![13, 13]),
("value", Binary, vec![b"6", b"7"]),
("seq", UInt8, vec![6, 7])
(SEQ_COLUMN_NAME, UInt8, vec![6, 7]),
(RESERVED_COLUMN_NAME, UInt8, vec![None; 2])
)
.unwrap(),
record_batch!(
("pk1", UInt8, vec![13, 14]),
("value", Binary, vec![b"8", b"9"]),
("seq", UInt8, vec![8, 9])
(SEQ_COLUMN_NAME, UInt8, vec![8, 9]),
(RESERVED_COLUMN_NAME, UInt8, vec![None; 2])
)
.unwrap(),
]);

let stream = MergeStream::new(
-stream, 1, 2, merge_op, false, // keep_sequence
+stream, 1, // num_primary_keys
+merge_op, // merge_operator
+false, // keep_builtin
);
check_stream(Box::pin(stream), expected).await;
}

#[tokio::test]
async fn test_build_scan_plan() {
let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8), (SEQ_COLUMN_NAME, UInt64));
let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8));
let store = Arc::new(LocalFileSystem::new());
let reader = ParquetReader::new(
store,
-StorageSchema {
-arrow_schema: schema.clone(),
-num_primary_keys: 1,
-seq_idx: 2,
-value_idxes: vec![1],
-update_mode: UpdateMode::Overwrite,
-},
+StorageSchema::try_new(schema, 1, UpdateMode::Overwrite).unwrap(),
Arc::new(SstPathGenerator::new("mock".to_string())),
);

@@ -600,17 +600,17 @@
.collect(),
None,
vec![expr],
-false, // keep_sequence
+false, // keep_builtin
)
.unwrap();
let display_plan =
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
.indent(true);
assert_eq!(
r#"MergeExec: [primary_keys: 1, seq_idx: 2, keep_sequence: false]
r#"MergeExec: [primary_keys: 1, keep_builtin: false]
SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
FilterExec: pk1@0 = 0
-ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
+ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__, __reserved__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
"#,
format!("{display_plan}")
);
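Taken together, the read.rs changes swap the seq-specific plumbing (seq_idx, keep_sequence) for generic builtin-column handling: builtin columns always sit at the tail of the schema, so stripping them needs no stored index. A self-contained sketch of that tail-stripping idea, assuming arrow's RecordBatch::remove_column and that BUILTIN_COLUMN_NUM is 2 (__seq__ plus the new __reserved__; the constant's value is inferred from the expected plan above, since types.rs is not rendered in this view):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, RecordBatch, UInt8Array};
    use arrow::datatypes::{DataType, Field, Schema};

    // Assumed to mirror types.rs (not shown in this view).
    const BUILTIN_COLUMN_NUM: usize = 2; // __seq__ and __reserved__

    fn main() {
        // User columns first, builtin columns last -- the invariant the PR relies on.
        let schema = Arc::new(Schema::new(vec![
            Field::new("pk1", DataType::UInt8, false),
            Field::new("value", DataType::UInt8, false),
            Field::new("__seq__", DataType::UInt8, false),
            Field::new("__reserved__", DataType::UInt8, true),
        ]));
        let columns: Vec<ArrayRef> = vec![
            Arc::new(UInt8Array::from(vec![11u8, 12])),
            Arc::new(UInt8Array::from(vec![1u8, 2])),
            Arc::new(UInt8Array::from(vec![1u8, 2])),
            Arc::new(UInt8Array::from(vec![None::<u8>; 2])),
        ];
        let mut batch = RecordBatch::try_new(schema, columns).unwrap();

        // Because builtins are always at the end, removal is positional:
        // pop from the back BUILTIN_COLUMN_NUM times, no seq_idx required.
        for _ in 0..BUILTIN_COLUMN_NUM {
            batch.remove_column(batch.num_columns() - 1);
        }
        assert_eq!(batch.num_columns(), 2); // only pk1 and value remain
    }

This tail invariant is also why MergeExec and MergeStream each lose a constructor argument: it replaces the per-schema seq_idx bookkeeping entirely.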