Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-45203: [C++][Acero] TeeNode metadata #45211

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ add_arrow_dataset_test(partition_test)
add_arrow_dataset_test(scanner_test)
add_arrow_dataset_test(subtree_test)
add_arrow_dataset_test(write_node_test)
add_arrow_dataset_test(tee_node_test)

if(ARROW_CSV)
add_arrow_dataset_test(file_csv_test)
Expand Down
104 changes: 59 additions & 45 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,53 @@ Status WriteBatch(
return Status::OK();
}

Status ValidateAndPrepareSchema(const WriteNodeOptions& write_node_options,
const std::shared_ptr<Schema>& input_schema,
std::shared_ptr<Schema>& custom_schema) {
custom_schema = write_node_options.custom_schema;
const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
write_node_options.custom_metadata;
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;

if (custom_schema != nullptr) {
if (custom_metadata) {
return Status::TypeError(
"Do not provide both custom_metadata and custom_schema. If custom_schema is "
"used then custom_schema->metadata should be used instead of custom_metadata");
}

if (custom_schema->num_fields() != input_schema->num_fields()) {
return Status::TypeError(
"The provided custom_schema did not have the same number of fields as the "
"data. The custom schema can only be used to add metadata / nullability to "
"fields and cannot change the type or number of fields.");
}
for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
if (!input_schema->field(field_idx)->type()->Equals(
custom_schema->field(field_idx)->type())) {
return Status::TypeError("The provided custom_schema specified type ",
custom_schema->field(field_idx)->type()->ToString(),
" for field ", field_idx, "and the input data has type ",
input_schema->field(field_idx),
"The custom schema can only be used to add metadata / "
"nullability to fields and "
"cannot change the type or number of fields.");
}
}
} else {
custom_schema = input_schema;
}

if (custom_metadata) {
custom_schema = input_schema->WithMetadata(custom_metadata);
}

if (!write_options.partitioning) {
return Status::Invalid("Must provide partitioning");
}

return Status::OK();
}
class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
public:
DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
Expand Down Expand Up @@ -492,51 +539,15 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,

const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
std::shared_ptr<Schema> custom_schema = write_node_options.custom_schema;
const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
write_node_options.custom_metadata;
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;

std::shared_ptr<Schema> custom_schema;
const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();

if (custom_schema != nullptr) {
if (custom_metadata) {
return Status::TypeError(
"Do not provide both custom_metadata and custom_schema. If custom_schema is "
"used then custom_schema->metadata should be used instead of custom_metadata");
}

if (custom_schema->num_fields() != input_schema->num_fields()) {
return Status::TypeError(
"The provided custom_schema did not have the same number of fields as the "
"data. The custom schema can only be used to add metadata / nullability to "
"fields and cannot change the type or number of fields.");
}
for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
if (!input_schema->field(field_idx)->type()->Equals(
custom_schema->field(field_idx)->type())) {
return Status::TypeError("The provided custom_schema specified type ",
custom_schema->field(field_idx)->type()->ToString(),
" for field ", field_idx, "and the input data has type ",
input_schema->field(field_idx),
"The custom schema can only be used to add metadata / "
"nullability to fields and "
"cannot change the type or number of fields.");
}
}
}

if (custom_metadata) {
custom_schema = input_schema->WithMetadata(custom_metadata);
}

if (!write_options.partitioning) {
return Status::Invalid("Must provide partitioning");
}
ARROW_RETURN_NOT_OK(
ValidateAndPrepareSchema(write_node_options, input_schema, custom_schema));

std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema, write_options);

std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema,
write_node_options.write_options);
ARROW_ASSIGN_OR_RAISE(
auto node,
acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
Expand Down Expand Up @@ -571,11 +582,14 @@ class TeeNode : public acero::MapNode {

const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
const std::shared_ptr<Schema> schema = inputs[0]->output_schema();
const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();
std::shared_ptr<Schema> custom_schema;

ARROW_RETURN_NOT_OK(
ValidateAndPrepareSchema(write_node_options, input_schema, custom_schema));

return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(schema),
std::move(write_options));
return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(custom_schema),
std::move(write_node_options.write_options));
}

const char* kind_name() const override { return "TeeNode"; }
Expand Down
174 changes: 174 additions & 0 deletions cpp/src/arrow/dataset/tee_node_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include <memory>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/plan.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"

#include "arrow/table.h"
#include "arrow/util/key_value_metadata.h"

namespace arrow {

namespace dataset {

class SimpleTeeNodeTest : public ::testing::Test {
protected:
void SetUp() override {
internal::Initialize();
mock_fs_ = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
auto ipc_format = std::make_shared<dataset::IpcFileFormat>();

fs_write_options_.filesystem = mock_fs_;
fs_write_options_.base_dir = "/my_dataset";
fs_write_options_.basename_template = "{i}.arrow";
fs_write_options_.file_write_options = ipc_format->DefaultWriteOptions();
fs_write_options_.partitioning = dataset::Partitioning::Default();
}

std::shared_ptr<fs::internal::MockFileSystem> mock_fs_;
dataset::FileSystemDatasetWriteOptions fs_write_options_;
};

TEST_F(SimpleTeeNodeTest, CustomNullability) {
// Create an input table with a nullable and a non-nullable type
ExecBatch batch = gen::Gen({gen::Step()})->FailOnError()->ExecBatch(/*num_rows=*/1);
std::shared_ptr<Schema> test_schema =
schema({field("nullable_i32", uint32(), /*nullable=*/true),
field("non_nullable_i32", uint32(), /*nullable=*/false)});
std::shared_ptr<RecordBatch> record_batch =
RecordBatch::Make(test_schema, /*num_rows=*/1,
{batch.values[0].make_array(), batch.values[0].make_array()});
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
Table::FromRecordBatches({std::move(record_batch)}));

ASSERT_TRUE(table->field(0)->nullable());
ASSERT_FALSE(table->field(1)->nullable());

dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.custom_schema = test_schema;

// Write the data to disk (these plans use a project because it destroys whatever
// metadata happened to be in the table source node's output schema). This more
// accurately simulates reading from a dataset.
acero::Declaration plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"tee", write_options}});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the nullability
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
ipc::RecordBatchFileReader::Open(file));
std::shared_ptr<Schema> file_schema = file_reader->schema();

ASSERT_TRUE(file_schema->field(0)->nullable());
ASSERT_FALSE(file_schema->field(1)->nullable());

// Invalid custom schema

// Incorrect # of fields
write_options.custom_schema = schema({});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"tee", write_options}});

ASSERT_THAT(
DeclarationToStatus(plan),
Raises(StatusCode::TypeError,
::testing::HasSubstr("did not have the same number of fields as the data")));

// Incorrect types
write_options.custom_schema =
schema({field("nullable_i32", int32()), field("non_nullable_i32", int32())});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"tee", write_options}});
ASSERT_THAT(
DeclarationToStatus(plan),
Raises(StatusCode::TypeError, ::testing::HasSubstr("and the input data has type")));

// Cannot have both custom_schema and custom_metadata
write_options.custom_schema = test_schema;
write_options.custom_metadata = key_value_metadata({{"foo", "bar"}});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(std::move(table))},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"tee", write_options}});
ASSERT_THAT(DeclarationToStatus(plan),
Raises(StatusCode::TypeError,
::testing::HasSubstr(
"Do not provide both custom_metadata and custom_schema")));
}

TEST_F(SimpleTeeNodeTest, CustomMetadata) {
constexpr int64_t kRowsPerChunk = 1;
constexpr int64_t kNumChunks = 1;
// Create an input table with no schema metadata
std::shared_ptr<Table> table =
gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerChunk, kNumChunks);

std::shared_ptr<KeyValueMetadata> custom_metadata =
key_value_metadata({{"foo", "bar"}});

dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.custom_metadata = custom_metadata;

// Write the data to disk
acero::Declaration plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project", acero::ProjectNodeOptions({compute::field_ref(0)})},
{"tee", write_options}});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the schema metadata
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
ipc::RecordBatchFileReader::Open(file));
std::shared_ptr<Schema> file_schema = file_reader->schema();

ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}

} // namespace dataset
} // namespace arrow
Loading