refactor(source): enhance parquet file source #19221

Open · wants to merge 7 commits into main
45 changes: 24 additions & 21 deletions e2e_test/s3/fs_sink.py
@@ -18,17 +18,10 @@ def gen_data(file_num, item_num_per_file):
[{
'id': file_id * item_num_per_file + item_id,
'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}',
'sex': item_id % 2,
'mark': (-1) ** (item_id % 2),
'test_int': pa.scalar(1, type=pa.int32()),
'test_real': pa.scalar(4.0, type=pa.float32()),
'test_double_precision': pa.scalar(5.0, type=pa.float64()),
'test_varchar': pa.scalar('7', type=pa.string()),
'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')),
'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')),
'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]
@@ -60,8 +53,10 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp
) WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -128,8 +123,10 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -158,8 +155,10 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp
) WITH (
connector = 's3',
match_pattern = 'test_parquet_sink/*.parquet',
@@ -196,8 +195,10 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -226,8 +227,10 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
66 changes: 65 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
@@ -572,7 +572,6 @@ pub trait FromArrow {
if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
return self.from_extension_array(type_name, array);
}

match array.data_type() {
Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
@@ -584,12 +583,30 @@
Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
Timestamp(Second, None) => {
self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Second, Some(_)) => {
self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, None) => {
self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, Some(_)) => {
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, None) => {
self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, Some(_)) => {
self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, None) => {
self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, Some(_)) => {
self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
}
Interval(MonthDayNano) => {
self.from_interval_array(array.as_any().downcast_ref().unwrap())
}
@@ -692,6 +709,33 @@
Ok(ArrayImpl::Time(array.into()))
}

fn from_timestampsecond_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}
fn from_timestampsecond_some_array(
    &self,
    array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
    // A timezone-annotated Arrow timestamp maps to Timestamptz, matching
    // the millisecond/microsecond/nanosecond `_some_` variants below.
    Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampms_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampms_some_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampus_array(
&self,
array: &arrow_array::TimestampMicrosecondArray,
@@ -706,6 +750,20 @@
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampns_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampns_some_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_interval_array(
&self,
array: &arrow_array::IntervalMonthDayNanoArray,
@@ -854,8 +912,14 @@ converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampNanosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);
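
Each Arrow timestamp unit now converts into both `TimestampArray` and `TimestamptzArray`. A minimal sketch of the rescaling these `@map` conversions are assumed to perform — assuming RisingWave's timestamp scalars are microsecond-based, which is an inference, not something the diff states:

use arrow_array::{Array, TimestampSecondArray};

fn main() {
    // Arrow stores timestamps as i64 values in the array's declared unit.
    let secs = TimestampSecondArray::from(vec![1_700_000_000i64]);
    // Assumed @map behavior: rescale every unit to microseconds, so a
    // seconds-based value is multiplied by 1_000_000 ...
    let micros: i64 = secs.value(0) * 1_000_000;
    assert_eq!(micros, 1_700_000_000_000_000);
    // ... while a nanosecond-based value would instead be divided by
    // 1_000, dropping sub-microsecond precision.
    assert_eq!(secs.len(), 1);
}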

29 changes: 8 additions & 21 deletions src/connector/src/parser/parquet_parser.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
use deltalake::parquet::arrow::async_reader::AsyncFileReader;
use futures_async_stream::try_stream;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::bail;
use risingwave_common::types::{Datum, ScalarImpl};
@@ -104,32 +104,19 @@ impl ParquetParser {
crate::source::SourceColumnType::Normal => {
match source_column.is_hidden_addition_col {
false => {
let rw_data_type = &source_column.data_type;
let rw_data_type: &risingwave_common::types::DataType =
&source_column.data_type;
let rw_column_name = &source_column.name;

if let Some(parquet_column) =
record_batch.column_by_name(rw_column_name)
{
let arrow_field = IcebergArrowConvert
.to_arrow_field(rw_column_name, rw_data_type)?;
let converted_arrow_data_type: &arrow_schema_iceberg::DataType =
arrow_field.data_type();
if converted_arrow_data_type == parquet_column.data_type() {
let array_impl = IcebergArrowConvert
.array_from_arrow_array(&arrow_field, parquet_column)?;
let column = Arc::new(array_impl);
chunk_columns.push(column);
} else {
// data type mismatch, this column is set to null.
let mut array_builder = ArrayBuilderImpl::with_type(
column_size,
rw_data_type.clone(),
);

array_builder.append_n_null(record_batch.num_rows());
let res = array_builder.finish();
let column = Arc::new(res);
chunk_columns.push(column);
}
let array_impl = IcebergArrowConvert
.array_from_arrow_array(&arrow_field, parquet_column)?;
let column = Arc::new(array_impl);
chunk_columns.push(column);
} else {
// For columns defined in the source schema but not present in the Parquet file, null values are filled in.
let mut array_builder =
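
The tail of this else-branch is truncated above. Judging from the mismatch branch this commit removes (which uses the same calls), the null-filling presumably continues along these lines — a sketch, not the verbatim diff:

let mut array_builder =
    ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
array_builder.append_n_null(record_batch.num_rows());
chunk_columns.push(Arc::new(array_builder.finish()));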
@@ -24,7 +24,7 @@ use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::FileMetaData;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg;
use risingwave_common::array::StreamChunk;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio::io::{AsyncRead, BufReader};
@@ -269,10 +269,10 @@ pub fn extract_valid_column_indices(
.iter()
.position(|&name| name == column.name)
.and_then(|pos| {
let arrow_field = IcebergArrowConvert
.to_arrow_field(&column.name, &column.data_type)
.ok()?;
if &arrow_field == converted_arrow_schema.field(pos) {
if is_data_type_matching(
&column.data_type,
converted_arrow_schema.field(pos).data_type(),
) {
Some(pos)
} else {
None
@@ -285,3 +285,85 @@
None => Ok(vec![]),
}
}

/// Checks whether a RisingWave data type matches the data type in a Parquet (Arrow) file.
///
/// This function compares the `DataType` from RisingWave with the `DataType` from the
/// Parquet file, returning `true` if they are compatible. In particular, for `Timestamp`
/// types it accepts all four Parquet `TimeUnit` variants (`Second`, `Millisecond`,
/// `Microsecond`, and `Nanosecond`) as matches for the corresponding RisingWave
/// `Timestamp` type.
pub fn is_data_type_matching(
rw_data_type: &risingwave_common::types::DataType,
arrow_data_type: &arrow_schema_iceberg::DataType,
) -> bool {
match rw_data_type {
risingwave_common::types::DataType::Boolean => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Boolean)
}
risingwave_common::types::DataType::Int16 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int16)
}
risingwave_common::types::DataType::Int32 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int32)
}
risingwave_common::types::DataType::Int64 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int64)
}
risingwave_common::types::DataType::Float32 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float32)
}
risingwave_common::types::DataType::Float64 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float64)
}
risingwave_common::types::DataType::Decimal => {
matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Decimal128(_, _)
) || matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Decimal256(_, _)
)
}
risingwave_common::types::DataType::Date => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date32)
|| matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date64)
}
risingwave_common::types::DataType::Varchar => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Utf8)
}
risingwave_common::types::DataType::Time => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time32(_))
|| matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time64(_))
}
risingwave_common::types::DataType::Timestamp => {
matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Timestamp(_, _)
)
}
risingwave_common::types::DataType::Timestamptz => {
matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Timestamp(_, _)
)
}
risingwave_common::types::DataType::Interval => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Interval(_))
}
risingwave_common::types::DataType::List(inner_type) => {
if let arrow_schema_iceberg::DataType::List(field_ref) = arrow_data_type {
let inner_rw_type = inner_type.clone();
let inner_arrow_type = field_ref.data_type();
is_data_type_matching(&inner_rw_type, inner_arrow_type)
} else {
false
}
}
risingwave_common::types::DataType::Map(_) => {
// Directly return false for Map types
false
}
        _ => false, // All remaining types are treated as non-matching.
}
}
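
A usage sketch for `is_data_type_matching`; the `TimeUnit` import path and the literal values are illustrative assumptions, not part of the diff:

use arrow_schema_iceberg::{DataType as ArrowType, TimeUnit};
use risingwave_common::types::DataType as RwType;

// Any of the four Parquet timestamp units satisfies a timestamp column ...
assert!(is_data_type_matching(
    &RwType::Timestamp,
    &ArrowType::Timestamp(TimeUnit::Millisecond, None),
));
// ... and a timezone-annotated column satisfies timestamptz as well.
assert!(is_data_type_matching(
    &RwType::Timestamptz,
    &ArrowType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
));
// Unrelated types still fail the check.
assert!(!is_data_type_matching(&RwType::Varchar, &ArrowType::Int64));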