From b0db7b751e2041619dfc620ad9ff9900fbb5509a Mon Sep 17 00:00:00 2001
From: Ke
Date: Sat, 16 Nov 2024 00:06:52 -0800
Subject: [PATCH] Text writer

---
 velox/connectors/hive/CMakeLists.txt          |   1 +
 velox/connectors/hive/HiveConnectorUtil.cpp   |   5 +-
 velox/dwio/CMakeLists.txt                     |   1 +
 velox/dwio/text/CMakeLists.txt                |  24 ++++
 velox/dwio/text/RegisterTextWriter.cpp        |  29 ++++
 velox/dwio/text/RegisterTextWriter.h          |  25 ++++
 velox/dwio/text/tests/CMakeLists.txt          |  26 ++++
 velox/dwio/text/tests/writer/CMakeLists.txt   |  31 ++++
 .../dwio/text/tests/writer/TextWriterTest.cpp |  94 +++++++++++++
 velox/dwio/text/writer/BufferedWriter.cpp     |  61 ++++++++
 velox/dwio/text/writer/BufferedWriter.h       |  44 ++++++
 velox/dwio/text/writer/CMakeLists.txt         |  24 ++++
 velox/dwio/text/writer/TextWriter.cpp         | 132 ++++++++++++++++++
 velox/dwio/text/writer/TextWriter.h           |  92 ++++++++++++
 14 files changed, 588 insertions(+), 1 deletion(-)
 create mode 100644 velox/dwio/text/CMakeLists.txt
 create mode 100644 velox/dwio/text/RegisterTextWriter.cpp
 create mode 100644 velox/dwio/text/RegisterTextWriter.h
 create mode 100644 velox/dwio/text/tests/CMakeLists.txt
 create mode 100644 velox/dwio/text/tests/writer/CMakeLists.txt
 create mode 100644 velox/dwio/text/tests/writer/TextWriterTest.cpp
 create mode 100644 velox/dwio/text/writer/BufferedWriter.cpp
 create mode 100644 velox/dwio/text/writer/BufferedWriter.h
 create mode 100644 velox/dwio/text/writer/CMakeLists.txt
 create mode 100644 velox/dwio/text/writer/TextWriter.cpp
 create mode 100644 velox/dwio/text/writer/TextWriter.h

diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt
index 2e137dbd6c293..87234e4732597 100644
--- a/velox/connectors/hive/CMakeLists.txt
+++ b/velox/connectors/hive/CMakeLists.txt
@@ -44,6 +44,7 @@ velox_link_libraries(
   velox_dwio_orc_reader
   velox_dwio_parquet_reader
   velox_dwio_parquet_writer
+  velox_dwio_text_writer_register
   velox_file
   velox_hive_partition_function
   velox_type_tz
diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp
index fb64001db0d0d..9491e00490fc2 100644
--- a/velox/connectors/hive/HiveConnectorUtil.cpp
+++ b/velox/connectors/hive/HiveConnectorUtil.cpp
@@ -1038,8 +1038,11 @@ void updateWriterOptionsFromHiveConfig(
     case dwio::common::FileFormat::NIMBLE:
       // No-op for now.
       break;
+    case dwio::common::FileFormat::TEXT:
+      // No-op for now.
+      break;
     default:
-      VELOX_UNSUPPORTED("{}", fileFormat);
+      VELOX_UNSUPPORTED("Unsupported file format: {}", fileFormat);
   }
 }
 
diff --git a/velox/dwio/CMakeLists.txt b/velox/dwio/CMakeLists.txt
index efcb3c06bebe3..d4a879e8f6803 100644
--- a/velox/dwio/CMakeLists.txt
+++ b/velox/dwio/CMakeLists.txt
@@ -35,3 +35,4 @@ add_subdirectory(catalog)
 add_subdirectory(dwrf)
 add_subdirectory(orc)
 add_subdirectory(parquet)
+add_subdirectory(text)
diff --git a/velox/dwio/text/CMakeLists.txt b/velox/dwio/text/CMakeLists.txt
new file mode 100644
index 0000000000000..725012f7e5847
--- /dev/null
+++ b/velox/dwio/text/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if(${VELOX_BUILD_TESTING})
+  add_subdirectory(tests)
+endif()
+
+add_subdirectory(writer)
+
+velox_add_library(velox_dwio_text_writer_register RegisterTextWriter.cpp)
+
+velox_link_libraries(velox_dwio_text_writer_register
+                     velox_dwio_text_writer)
diff --git a/velox/dwio/text/RegisterTextWriter.cpp b/velox/dwio/text/RegisterTextWriter.cpp
new file mode 100644
index 0000000000000..59a81228e7212
--- /dev/null
+++ b/velox/dwio/text/RegisterTextWriter.cpp
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/dwio/text/writer/TextWriter.h"
+
+namespace facebook::velox::text {
+
+void registerTextWriterFactory() {
+  dwio::common::registerWriterFactory(std::make_shared<TextWriterFactory>());
+}
+
+void unregisterTextWriterFactory() {
+  dwio::common::unregisterWriterFactory(dwio::common::FileFormat::TEXT);
+}
+
+} // namespace facebook::velox::text
diff --git a/velox/dwio/text/RegisterTextWriter.h b/velox/dwio/text/RegisterTextWriter.h
new file mode 100644
index 0000000000000..08ac312d74704
--- /dev/null
+++ b/velox/dwio/text/RegisterTextWriter.h
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace facebook::velox::text {
+
+void registerTextWriterFactory();
+
+void unregisterTextWriterFactory();
+
+} // namespace facebook::velox::text
diff --git a/velox/dwio/text/tests/CMakeLists.txt b/velox/dwio/text/tests/CMakeLists.txt
new file mode 100644
index 0000000000000..34a05424d3662
--- /dev/null
+++ b/velox/dwio/text/tests/CMakeLists.txt
@@ -0,0 +1,26 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set(TEST_LINK_LIBS
+    velox_dwio_common_test_utils
+    velox_vector_test_lib
+    velox_exec_test_lib
+    velox_temp_path
+    GTest::gtest
+    GTest::gtest_main
+    GTest::gmock
+    gflags::gflags
+    glog::glog)
+
+add_subdirectory(writer)
diff --git a/velox/dwio/text/tests/writer/CMakeLists.txt b/velox/dwio/text/tests/writer/CMakeLists.txt
new file mode 100644
index 0000000000000..7a38526dfd630
--- /dev/null
+++ b/velox/dwio/text/tests/writer/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_executable(velox_text_writer_test TextWriterTest.cpp)
+
+target_link_libraries(
+  velox_text_writer_test
+  velox_dwio_text_writer
+  velox_dwio_common_test_utils
+  velox_link_libs
+  Boost::regex
+  Folly::folly
+  ${TEST_LINK_LIBS}
+  GTest::gtest
+  fmt::fmt)
+
+add_test(
+  NAME velox_text_writer_test
+  COMMAND velox_text_writer_test
+  WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
diff --git a/velox/dwio/text/tests/writer/TextWriterTest.cpp b/velox/dwio/text/tests/writer/TextWriterTest.cpp
new file mode 100644
index 0000000000000..a79ae5a4eb60b
--- /dev/null
+++ b/velox/dwio/text/tests/writer/TextWriterTest.cpp
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/dwio/text/writer/TextWriter.h"
+#include <gtest/gtest.h>
+#include "velox/common/base/Fs.h"
+#include "velox/common/file/FileSystems.h"
+#include "velox/exec/tests/utils/TempDirectoryPath.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+namespace facebook::velox::text {
+std::vector<std::vector<std::string>> readFile(const std::string& name) {
+  std::ifstream file(name);
+  std::string line;
+  std::vector<std::vector<std::string>> table;
+
+  while (std::getline(file, line)) {
+    std::stringstream ss(line);
+    std::string cell;
+    std::vector<std::string> row;
+
+    while (std::getline(ss, cell, SOH)) {
+      row.push_back(cell);
+    }
+
+    table.push_back(row);
+  }
+  return table;
+}
+
+class TextWriterTest : public testing::Test,
+                       public velox::test::VectorTestBase {
+ public:
+  void SetUp() override {
+    velox::filesystems::registerLocalFileSystem();
+    dwio::common::LocalFileSink::registerFactory();
+    rootPool_ = memory::memoryManager()->addRootPool("TextWriterTests");
+    leafPool_ = rootPool_->addLeafChild("TextWriterTests");
+    tempPath_ = exec::test::TempDirectoryPath::create();
+  }
+
+ protected:
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  std::shared_ptr<memory::MemoryPool> rootPool_;
+  std::shared_ptr<memory::MemoryPool> leafPool_;
+  std::shared_ptr<exec::test::TempDirectoryPath> tempPath_;
+};
+
+TEST_F(TextWriterTest, write) {
+  auto schema = ROW({"c0", "c1"}, {BIGINT(), BOOLEAN()});
+  auto data = makeRowVector(
+      {"c0", "c1"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+          makeConstant(true, 3),
+      });
+
+  WriterOptions writerOptions;
+  auto filePath =
+      fs::path(fmt::format("{}/test_text_writer.txt", tempPath_->getPath()));
+  auto sink = std::make_unique<dwio::common::LocalFileSink>(
+      filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()});
+  auto writer = std::make_unique<TextWriter>(
+      std::move(sink), writerOptions, leafPool_, schema);
+  writer->write(data);
+  writer->close();
+
+  std::vector<std::vector<std::string>> result = readFile(filePath);
+  ASSERT_EQ(result.size(), 3); // Fatal: the rows are indexed below.
+  ASSERT_EQ(result[0].size(), 2);
+  EXPECT_EQ(result[0][0], "1");
+  EXPECT_EQ(result[0][1], "1");
+  EXPECT_EQ(result[1][0], "2");
+  EXPECT_EQ(result[1][1], "1");
+  EXPECT_EQ(result[2][0], "3");
+  EXPECT_EQ(result[2][1], "1");
+}
+} // namespace facebook::velox::text
diff --git a/velox/dwio/text/writer/BufferedWriter.cpp b/velox/dwio/text/writer/BufferedWriter.cpp
new file mode 100644
index 0000000000000..abff11c4987b4
--- /dev/null
+++ b/velox/dwio/text/writer/BufferedWriter.cpp
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/dwio/text/writer/BufferedWriter.h"
+
+namespace facebook::velox::text {
+
+BufferedWriter::BufferedWriter(
+    std::unique_ptr<dwio::common::FileSink> sink,
+    std::shared_ptr<memory::MemoryPool> pool)
+    : pool_(pool),
+      sink_(std::move(sink)),
+      buf_(std::make_unique<dwio::common::DataBuffer<char>>(*pool)) {}
+
+void BufferedWriter::write(char value) {
+  if (buf_->size() + 1 > DEFAULT_FLUSH_SIZE) {
+    flush();
+  }
+  buf_->append(value);
+  offset_++;
+}
+
+void BufferedWriter::write(const char* data, uint64_t dataLength) {
+  // Values longer than DEFAULT_FLUSH_SIZE are accepted; the buffer grows.
+  if (buf_->size() + dataLength > DEFAULT_FLUSH_SIZE) {
+    flush();
+  }
+  buf_->append(offset_, data, dataLength);
+  offset_ += dataLength;
+}
+
+void BufferedWriter::close() {
+  flush();
+  sink_->close();
+}
+
+void BufferedWriter::flush() {
+  if (buf_->size() == 0) {
+    return;
+  }
+
+  sink_->write(std::move(*buf_));
+  buf_ =
+      std::make_unique<dwio::common::DataBuffer<char>>(*pool_);
+  offset_ = 0;
+}
+
+} // namespace facebook::velox::text
diff --git a/velox/dwio/text/writer/BufferedWriter.h
b/velox/dwio/text/writer/BufferedWriter.h
new file mode 100644
index 0000000000000..0ca6049a32d46
--- /dev/null
+++ b/velox/dwio/text/writer/BufferedWriter.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/dwio/common/FileSink.h"
+
+namespace facebook::velox::text {
+// write() flushes before an append that would take the buffer past this size.
+constexpr int DEFAULT_FLUSH_SIZE = 1024;
+// Collects bytes in a memory buffer and hands them to a file sink in batches.
+class BufferedWriter {
+ public:
+  BufferedWriter(
+      std::unique_ptr<dwio::common::FileSink> sink,
+      std::shared_ptr<memory::MemoryPool> pool);
+  // Append to the buffer. close() flushes what is left and closes the sink.
+  void write(char value);
+  void write(const char* data, uint64_t dataLength);
+  void close();
+
+ private:
+  void flush();
+  // Number of bytes currently held in 'buf_'; reset to 0 by flush().
+  uint64_t offset_ = 0;
+  std::shared_ptr<memory::MemoryPool> pool_;
+  std::unique_ptr<dwio::common::FileSink> sink_;
+  std::unique_ptr<dwio::common::DataBuffer<char>> buf_;
+};
+
+} // namespace facebook::velox::text
diff --git a/velox/dwio/text/writer/CMakeLists.txt b/velox/dwio/text/writer/CMakeLists.txt
new file mode 100644
index 0000000000000..e33aa77bd2b4e
--- /dev/null
+++ b/velox/dwio/text/writer/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+velox_add_library(velox_dwio_text_writer TextWriter.cpp BufferedWriter.cpp)
+
+# Only the common dwio layer and fmt are linked. Parquet and Arrow are left
+# out on purpose: TextWriter.cpp and BufferedWriter.cpp include none of their
+# headers. NOTE(review): assumes velox_dwio_common transitively provides the
+# vector and type libraries the writer uses - confirm with a clean build.
+velox_link_libraries(
+  velox_dwio_text_writer
+  velox_dwio_common
+  fmt::fmt)
diff --git a/velox/dwio/text/writer/TextWriter.cpp b/velox/dwio/text/writer/TextWriter.cpp
new file mode 100644
index 0000000000000..2d73642f24922
--- /dev/null
+++ b/velox/dwio/text/writer/TextWriter.cpp
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/dwio/text/writer/TextWriter.h"
+#include "velox/common/base/Pointers.h"
+#include "velox/exec/MemoryReclaimer.h"
+
+namespace facebook::velox::text {
+// Returns the text form of the value at 'row'; nulls become NULLDATA.
+static std::string getColumnData(
+    const std::shared_ptr<DecodedVector>& columnDecodedData,
+    const TypePtr& type,
+    vector_size_t row) {
+  if (columnDecodedData->isNullAt(row)) {
+    return NULLDATA;
+  }
+
+  switch (type->kind()) {
+    case TypeKind::BOOLEAN:
+      return std::to_string(columnDecodedData->valueAt<bool>(row));
+    case TypeKind::TINYINT:
+      return std::to_string(columnDecodedData->valueAt<int8_t>(row));
+    case TypeKind::SMALLINT:
+      return std::to_string(columnDecodedData->valueAt<int16_t>(row));
+    case TypeKind::INTEGER:
+      return std::to_string(columnDecodedData->valueAt<int32_t>(row));
+    case TypeKind::BIGINT:
+      return std::to_string(columnDecodedData->valueAt<int64_t>(row));
+    case TypeKind::REAL: // "{}" keeps full precision, unlike std::to_string.
+      return fmt::format("{}", columnDecodedData->valueAt<float>(row));
+    case TypeKind::DOUBLE:
+      return fmt::format("{}", columnDecodedData->valueAt<double>(row));
+    case TypeKind::VARCHAR:
+      return columnDecodedData->valueAt<StringView>(row).getString();
+    case TypeKind::TIMESTAMP:
+      return columnDecodedData->valueAt<Timestamp>(row).toString();
+    case TypeKind::VARBINARY:
+      return columnDecodedData->valueAt<StringView>(row).getString();
+    // TODO: Complex and unknown types are not supported yet.
+    case TypeKind::ARRAY:
+    case TypeKind::MAP:
+    case TypeKind::ROW:
+    case TypeKind::UNKNOWN:
+    default:
+      VELOX_NYI("{} is not supported yet in TextWriter", type->kind());
+  }
+}
+
+TextWriter::TextWriter(
+    std::unique_ptr<dwio::common::FileSink> sink,
+    const WriterOptions& options,
+    std::shared_ptr<memory::MemoryPool> pool,
+    const RowTypePtr schema)
+    : pool_(pool), schema_(schema) {
+  bufferedWriter_ = std::make_unique<BufferedWriter>(std::move(sink), pool);
+}
+
+/// Writes each row of 'data' as one line: column values are separated by
+/// SOH, nulls are written as NULLDATA, and the line ends with NEWLINE.
+/// 'data' must be a ROW vector whose type is equivalent to the schema.
+void TextWriter::write(const VectorPtr& data) {
+  VELOX_CHECK(
+      data->encoding() == VectorEncoding::Simple::ROW,
+      "Text writer expects row vector input");
+  VELOX_USER_CHECK(
+      data->type()->equivalent(*schema_),
+      "The file schema type should be equal with the input row vector type.");
+  const RowVector* dataRowVector = data->as<RowVector>();
+
+  const auto columnSize = dataRowVector->childrenSize();
+
+  std::vector<std::shared_ptr<DecodedVector>> decodedData;
+  for (size_t column = 0; column < columnSize; ++column) {
+    auto decodedColumn = std::make_shared<DecodedVector>(
+        *dataRowVector->childAt(column),
+        SelectivityVector(dataRowVector->size()));
+    decodedData.push_back(decodedColumn);
+  }
+
+  for (vector_size_t row = 0; row < data->size(); ++row) {
+    for (size_t column = 0; column < columnSize; ++column) {
+      if (column != 0) {
+        bufferedWriter_->write(SOH);
+      }
+      auto columnData = getColumnData(
+          decodedData.at(column), schema_->childAt(column), row);
+      bufferedWriter_->write(columnData.c_str(), columnData.length());
+    }
+    bufferedWriter_->write(NEWLINE);
+  }
+}
+
+void TextWriter::flush() {
+  // No-op: buffered bytes reach the sink when the buffer fills or on close().
+}
+
+void TextWriter::close() {
+  bufferedWriter_->close();
+}
+
+void TextWriter::abort() {}
+
+std::unique_ptr<dwio::common::Writer> TextWriterFactory::createWriter(
+    std::unique_ptr<dwio::common::FileSink> sink,
+    const std::shared_ptr<dwio::common::WriterOptions>& options) {
+  auto textOptions = std::dynamic_pointer_cast<text::WriterOptions>(options);
+  VELOX_CHECK_NOT_NULL(
+      textOptions, "Text writer factory expected a Text WriterOptions object.");
+  auto leafPool = options->memoryPool->addLeafChild("TextWriter");
+  return std::make_unique<TextWriter>(
+      std::move(sink), *textOptions, leafPool, asRowType(options->schema));
+}
+
+std::unique_ptr<dwio::common::WriterOptions>
+TextWriterFactory::createWriterOptions() {
+  return std::make_unique<text::WriterOptions>();
+}
+
+} // namespace facebook::velox::text
diff --git a/velox/dwio/text/writer/TextWriter.h b/velox/dwio/text/writer/TextWriter.h
new file mode 100644
index 0000000000000..597d9fc400825
--- /dev/null
+++ b/velox/dwio/text/writer/TextWriter.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/compression/Compression.h"
+#include "velox/common/config/Config.h"
+#include "velox/dwio/common/DataBuffer.h"
+#include "velox/dwio/common/FileSink.h"
+#include "velox/dwio/common/FlushPolicy.h"
+#include "velox/dwio/common/Options.h"
+#include "velox/dwio/common/Writer.h"
+#include "velox/dwio/common/WriterFactory.h"
+#include "velox/vector/ComplexVector.h"
+#include "velox/dwio/text/writer/BufferedWriter.h"
+
+namespace facebook::velox::text {
+
+// Column separator, row terminator and null marker used in the text output.
+constexpr char SOH = '\x01';
+constexpr char NEWLINE = '\n';
+constexpr const char* NULLDATA = "\\N";
+
+struct WriterOptions : public dwio::common::WriterOptions {
+  bool enableDictionary = true;
+  int64_t dataPageSize = 1'024 * 1'024;
+  int64_t dictionaryPageSizeLimit = 1'024 * 1'024;
+};
+
+// Writes Velox row vectors into a file sink as delimited text.
+class TextWriter : public dwio::common::Writer {
+ public:
+  // Constructs a writer that sends its output to 'sink'. 'options' is
+  // accepted but not read yet. 'pool' is passed on to the internal buffered
+  // writer. 'schema' is the row type that every vector given to write() must
+  // be equivalent to.
+  TextWriter(
+      std::unique_ptr<dwio::common::FileSink> sink,
+      const WriterOptions& options,
+      std::shared_ptr<memory::MemoryPool> pool,
+      const RowTypePtr schema);
+
+  ~TextWriter() override = default;
+
+  // Appends 'data' into the writer.
+  void write(const VectorPtr& data) override;
+
+  void flush() override;
+
+  bool finish() override {
+    return true;
+  }
+
+  // Closes 'this'. After close, data can no longer be added: the remaining
+  // buffered bytes are flushed into the 'sink' provided at construction and
+  // the sink is closed.
+  void close() override;
+
+  void abort() override;
+
+ private:
+  std::shared_ptr<memory::MemoryPool> pool_;
+  std::unique_ptr<BufferedWriter> bufferedWriter_;
+
+  const RowTypePtr schema_;
+};
+
+class TextWriterFactory : public dwio::common::WriterFactory {
+ public:
+  TextWriterFactory() : WriterFactory(dwio::common::FileFormat::TEXT) {}
+
+  std::unique_ptr<dwio::common::Writer> createWriter(
+      std::unique_ptr<dwio::common::FileSink> sink,
+      const std::shared_ptr<dwio::common::WriterOptions>& options) override;
+
+  std::unique_ptr<dwio::common::WriterOptions> createWriterOptions() override;
+};
+
+} // namespace facebook::velox::text