Skip to content

Commit

Permalink
Add tests for split event callbacks when using file and msg compression
Browse files Browse the repository at this point in the history
 - Added "split_event_calls_callback_with_msg_compression" and
"split_event_calls_callback_with_file_compression" uit tests

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Jun 7, 2024
1 parent 620439f commit eecb607
Showing 1 changed file with 151 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
tmp_dir_storage_options_{},
serialization_format_{"rmw_format"}
{
tmp_dir_storage_options_.uri = tmp_dir_.string();
tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string();
fs::remove_all(tmp_dir_);
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));
Expand Down Expand Up @@ -92,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
DoAll(
Invoke(
[this](const rosbag2_storage::StorageOptions & storage_options) {
fake_storage_size_ = 0;
fake_storage_size_.store(0);
fake_storage_uri_ = storage_options.uri;
// Touch the file
std::ofstream output(storage_options.uri);
Expand All @@ -106,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_.fetch_add(1);
});
ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});
ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
Expand All @@ -135,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
}

const std::string bag_name_ = "SequentialCompressionWriterTest";
std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -148,8 +147,10 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
std::unique_ptr<rosbag2_cpp::Writer> writer_;

std::string serialization_format_;
uint64_t fake_storage_size_{};
std::atomic<uint32_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
std::string fake_storage_uri_;
const std::string bag_base_dir_ = "test_bag";

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
Expand Down Expand Up @@ -299,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_write_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
Expand Down Expand Up @@ -426,7 +426,7 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
Expand Down Expand Up @@ -480,7 +480,148 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_write_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i));
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i;
}
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_write_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) +
"." + DefaultTestCompressor);
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i;
}
}

INSTANTIATE_TEST_SUITE_P(
Expand Down

0 comments on commit eecb607

Please sign in to comment.