Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix for bag_split event callbacks called to early with file compression #1643

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
// Closes the current backed storage and opens the next bagfile.
void split_bagfile() override;

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);

// Prepares the metadata by setting initial values.
void init_metadata() override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void SequentialCompressionWriter::compression_thread_fn()

while (true) {
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message;
std::string file;
std::string closed_file_relative_to_bag;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
// *INDENT-OFF*
Expand All @@ -134,7 +134,7 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
compressor_condition_.notify_all();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
closed_file_relative_to_bag = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
Expand All @@ -151,8 +151,35 @@ void SequentialCompressionWriter::compression_thread_fn()
std::lock_guard<std::recursive_mutex> storage_lock(storage_mutex_);
SequentialWriter::write(compressed_message);
}
} else if (!file.empty()) {
compress_file(*compressor, file);
} else if (!closed_file_relative_to_bag.empty()) {
compress_file(*compressor, closed_file_relative_to_bag);

// Execute callbacks from the base class
static const std::string compressor_ext = "." + compressor->get_compression_identifier();
auto closed_file =
(fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).generic_string();
std::string new_file;
// To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(),
// because other compressor threads may have already pushed a new item above.
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(),
closed_file_relative_to_bag + compressor_ext);
if (iter != metadata_.relative_file_paths.end()) {
++iter;
if (iter != metadata_.relative_file_paths.end()) {
new_file = (fs::path(base_folder_) / *iter).generic_string();
}
}
}
if (!new_file.empty()) {
// The new_file is empty when we compressed the last file after calling close().
// Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the
// last compressed file because it will be called inside base class
// SequentialWriter::close().
SequentialWriter::execute_bag_split_callbacks(closed_file, new_file);
}
}
}
}
Expand Down Expand Up @@ -349,14 +376,17 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();
const auto last_file_relative_to_bag = metadata_.relative_file_paths.back();
const auto new_file = SequentialWriter::split_bagfile_local(false);

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compressor_file_queue_.push(last_file);
compressor_file_queue_.push(last_file_relative_to_bag);
compressor_condition_.notify_one();
} else {
auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).generic_string();
SequentialWriter::execute_bag_split_callbacks(last_file, new_file);
}

if (!storage_) {
Expand Down Expand Up @@ -387,6 +417,7 @@ void SequentialCompressionWriter::write(
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::write(message);
} else {
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
Expand Down Expand Up @@ -416,17 +447,4 @@ void SequentialCompressionWriter::write(
}
}

bool SequentialCompressionWriter::should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time)
{
if (storage_options_.max_bagfile_size ==
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
return false;
} else {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
return SequentialWriter::should_split_bagfile(current_time);
}
}

} // namespace rosbag2_compression
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
tmp_dir_storage_options_{},
serialization_format_{"rmw_format"}
{
tmp_dir_storage_options_.uri = tmp_dir_.string();
tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string();
fs::remove_all(tmp_dir_);
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));
Expand Down Expand Up @@ -92,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
DoAll(
Invoke(
[this](const rosbag2_storage::StorageOptions & storage_options) {
fake_storage_size_ = 0;
fake_storage_size_.store(0);
fake_storage_uri_ = storage_options.uri;
// Touch the file
std::ofstream output(storage_options.uri);
Expand All @@ -106,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_.fetch_add(1);
});
ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});
ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
Expand All @@ -135,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
}

const std::string bag_name_ = "SequentialCompressionWriterTest";
std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -148,8 +147,10 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
std::unique_ptr<rosbag2_cpp::Writer> writer_;

std::string serialization_format_;
uint64_t fake_storage_size_{};
std::atomic<uint32_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
// uses in callback from cache_consumer thread
std::string fake_storage_uri_;
const std::string bag_base_dir_ = "test_bag";

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
Expand Down Expand Up @@ -299,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_write_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
Expand Down Expand Up @@ -426,7 +426,7 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
Expand Down Expand Up @@ -480,7 +480,148 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
EXPECT_EQ(fake_storage_size_.load(), kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_write_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i));
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i;
}
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_write_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_write_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) +
"." + DefaultTestCompressor);
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.generic_string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.generic_string()) << "i = " << i;
}
}

INSTANTIATE_TEST_SUITE_P(
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;

std::string split_bagfile_local(bool execute_callbacks = true);

void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void switch_to_next_storage();

std::string format_storage_uri(
Expand Down
Loading
Loading