Skip to content

Commit

Permalink
Text writer
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Dec 9, 2024
1 parent 21f1e21 commit b0db7b7
Show file tree
Hide file tree
Showing 15 changed files with 593 additions and 2 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,11 @@ if(NOT TARGET gflags::gflags)
# target even when velox is built as a subproject which uses
# `find_package(gflags)` which does not create a globally imported target that
# we can ALIAS.
add_library(gflags_gflags INTERFACE)
add_library(gflags_gflags INTERFACE
velox/dwio/text/writer/BufferedWriter.cpp
velox/dwio/text/writer/BufferedWriter.h
velox/dwio/text/tests/writer/TextWriterTest.cpp
)
target_link_libraries(gflags_gflags INTERFACE gflags)
add_library(gflags::gflags ALIAS gflags_gflags)
endif()
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ velox_link_libraries(
velox_dwio_orc_reader
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_dwio_text_writer_register
velox_file
velox_hive_partition_function
velox_type_tz
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,11 @@ void updateWriterOptionsFromHiveConfig(
case dwio::common::FileFormat::NIMBLE:
// No-op for now.
break;
case dwio::common::FileFormat::TEXT:
// No-op for now.
break;
default:
VELOX_UNSUPPORTED("{}", fileFormat);
VELOX_UNSUPPORTED("Unsupported file format: {}", fileFormat);
}
}

Expand Down
1 change: 1 addition & 0 deletions velox/dwio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ add_subdirectory(catalog)
add_subdirectory(dwrf)
add_subdirectory(orc)
add_subdirectory(parquet)
add_subdirectory(text)
24 changes: 24 additions & 0 deletions velox/dwio/text/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if(${VELOX_BUILD_TESTING})
endif()

add_subdirectory(tests)
add_subdirectory(writer)

velox_add_library(velox_dwio_text_writer_register RegisterTextWriter.cpp)

velox_link_libraries(velox_dwio_text_writer_register
velox_dwio_text_writer)
29 changes: 29 additions & 0 deletions velox/dwio/text/RegisterTextWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/velox/dwio/text/writer/TextWriter.h"

namespace facebook::velox::text {

void registerTextWriterFactory() {
dwio::common::registerWriterFactory(std::make_shared<TextWriterFactory>());
}

void unregisterTextWriterFactory() {
dwio::common::unregisterWriterFactory(dwio::common::FileFormat::TEXT);
}

} // namespace facebook::velox::text
25 changes: 25 additions & 0 deletions velox/dwio/text/RegisterTextWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace facebook::velox::text {

void registerTextWriterFactory();

void unregisterTextWriterFactory();

} // namespace facebook::velox::text
26 changes: 26 additions & 0 deletions velox/dwio/text/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set(TEST_LINK_LIBS
velox_dwio_common_test_utils
velox_vector_test_lib
velox_exec_test_lib
velox_temp_path
GTest::gtest
GTest::gtest_main
GTest::gmock
gflags::gflags
glog::glog)

add_subdirectory(writer)
31 changes: 31 additions & 0 deletions velox/dwio/text/tests/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_text_writer_test TextWriterTest.cpp)

add_test(
NAME velox_text_writer_test
COMMAND velox_text_writer_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

target_link_libraries(
velox_text_writer_test
velox_dwio_text_writer
velox_dwio_common_test_utils
velox_link_libs
Boost::regex
Folly::folly
${TEST_LINK_LIBS}
GTest::gtest
fmt::fmt)
94 changes: 94 additions & 0 deletions velox/dwio/text/tests/writer/TextWriterTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/text/writer/TextWriter.h"
#include <gtest/gtest.h>
#include "velox/common/base/Fs.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

namespace facebook::velox::text {
std::vector<std::vector<std::string>> readFile(const std::string& name) {
std::ifstream file(name);
std::string line;
std::vector<std::vector<std::string>> table;

while (std::getline(file, line)) {
std::stringstream ss(line);
std::string cell;
std::vector<std::string> row;

while (std::getline(ss, cell, SOH)) {
row.push_back(cell);
}

table.push_back(row);
}
return table;
}

class TextWriterTest : public testing::Test,
public velox::test::VectorTestBase {
public:
void SetUp() override {
velox::filesystems::registerLocalFileSystem();
dwio::common::LocalFileSink::registerFactory();
rootPool_ = memory::memoryManager()->addRootPool("TextWriterTests");
leafPool_ = rootPool_->addLeafChild("TextWriterTests");
tempPath_ = exec::test::TempDirectoryPath::create();
}

protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
}

std::shared_ptr<memory::MemoryPool> rootPool_;
std::shared_ptr<memory::MemoryPool> leafPool_;
std::shared_ptr<exec::test::TempDirectoryPath> tempPath_;
};

TEST_F(TextWriterTest, write) {
auto schema = ROW({"c0", "c1"}, {BIGINT(), BOOLEAN()});
auto data = makeRowVector(
{"c0", "c1"},
{
makeFlatVector<int64_t>({1, 2, 3}),
makeConstant(true, 3),
});

WriterOptions writerOptions;
auto filePath =
fs::path(fmt::format("{}/test_abort.txt", tempPath_->getPath()));
auto sink = std::make_unique<dwio::common::LocalFileSink>(
filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto writer = std::make_unique<TextWriter>(
std::move(sink), writerOptions, leafPool_, schema);
writer->write(data);
writer->close();

std::vector<std::vector<std::string>> result = readFile(filePath);
EXPECT_EQ(result.size(), 3);
EXPECT_EQ(result[0].size(), 2);
EXPECT_EQ(result[0][0], "1");
EXPECT_EQ(result[0][1], "1");
EXPECT_EQ(result[1][0], "2");
EXPECT_EQ(result[1][1], "1");
EXPECT_EQ(result[2][0], "3");
EXPECT_EQ(result[2][1], "1");
}
} // namespace facebook::velox::text
61 changes: 61 additions & 0 deletions velox/dwio/text/writer/BufferedWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/text/writer/BufferedWriter.h"

namespace facebook::velox::text {

BufferedWriter::BufferedWriter(
std::unique_ptr<dwio::common::FileSink> sink,
std::shared_ptr<memory::MemoryPool> pool)
: pool_(pool),
sink_(std::move(sink)),
buf_(make_unique<dwio::common::DataBuffer<char>>(*pool)) {}

void BufferedWriter::write(char value) {
if (buf_->size() + 1 > DEFAULT_FLUSH_SIZE) {
flush();
}
buf_->append(value);
offset_++;
}

void BufferedWriter::write(const char* data, uint64_t dataLength) {
VELOX_CHECK_GE(DEFAULT_FLUSH_SIZE, dataLength, "");
if (buf_->size() + dataLength > DEFAULT_FLUSH_SIZE) {
flush();
}
buf_->append(offset_, data, dataLength);
offset_ += dataLength;
}

void BufferedWriter::close() {
flush();
sink_->close();
}

void BufferedWriter::flush() {
if (buf_->size() == 0) {
return;
}

sink_->write(std::move(*buf_));
buf_ =
make_unique<dwio::common::DataBuffer<char>>(*pool_);
offset_ = 0;
}

} // namespace facebook::velox::text
44 changes: 44 additions & 0 deletions velox/dwio/text/writer/BufferedWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/common/FileSink.h"

namespace facebook::velox::text {

constexpr int DEFAULT_FLUSH_SIZE = 1024;

class BufferedWriter {
public:
BufferedWriter(
std::unique_ptr<dwio::common::FileSink> sink,
std::shared_ptr<memory::MemoryPool> pool);

void write(char value);
void write(const char* data, uint64_t dataLength);
void close();

private:
void flush();

uint64_t offset_ = 0;
std::shared_ptr<memory::MemoryPool> pool_;
std::unique_ptr<dwio::common::FileSink> sink_;
std::unique_ptr<dwio::common::DataBuffer<char>> buf_;
};

} // namespace facebook::velox::text
Loading

0 comments on commit b0db7b7

Please sign in to comment.