Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle transient local messages in snapshot mode when the buffer is full #1887

Draft
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "rcpputils/thread_safety_annotations.hpp"

Expand All @@ -28,6 +29,7 @@
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand All @@ -52,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
: public MessageCacheInterface
{
public:
explicit CircularMessageCache(size_t max_buffer_size);
explicit CircularMessageCache(
size_t max_buffer_size, const std::unordered_map<std::string,
rosbag2_storage::TopicInformation> & topics_names_to_info);

~CircularMessageCache() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#include <deque>
#include <memory>
#include <vector>
#include <unordered_map>
#include <string>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand Down Expand Up @@ -51,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
public:
// Delete default constructor since max_cache_size is required
MessageCacheCircularBuffer() = delete;
explicit MessageCacheCircularBuffer(size_t max_cache_size);
explicit MessageCacheCircularBuffer(
size_t max_cache_size, const std::unordered_map<std::string,
rosbag2_storage::TopicInformation> & topics_names_to_info);

/**
* If buffer size has some space left, we push the message regardless of its size,
Expand All @@ -73,6 +78,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
std::vector<CacheBufferInterface::buffer_element_t> msg_vector_;
size_t buffer_bytes_size_ {0u};
const size_t max_bytes_size_;
const std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_names_to_info_;
};

} // namespace cache
Expand Down
12 changes: 9 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ namespace rosbag2_cpp
namespace cache
{

CircularMessageCache::CircularMessageCache(size_t max_buffer_size)
CircularMessageCache::CircularMessageCache(
size_t max_buffer_size,
const std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_names_to_info)
{
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(
max_buffer_size,
topics_names_to_info);
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(
max_buffer_size,
topics_names_to_info);
}

CircularMessageCache::~CircularMessageCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <memory>
#include <vector>
Expand All @@ -25,8 +26,11 @@ namespace rosbag2_cpp
namespace cache
{

MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size)
: max_bytes_size_(max_cache_size)
MessageCacheCircularBuffer::MessageCacheCircularBuffer(
size_t max_cache_size,
const std::unordered_map<std::string,
rosbag2_storage::TopicInformation> & topics_names_to_info)
: max_bytes_size_(max_cache_size), topics_names_to_info_(topics_names_to_info)
{
}

Expand All @@ -38,10 +42,37 @@ bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg
return false;
}

// Remove any old items until there is room for new message
// Remove any old items that is no transient local until there is room for new message
while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) {
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
buffer_.pop_front();
auto is_not_transient_local = [this](buffer_element_t buffer_element)
{
auto it_matching_topic_name = topics_names_to_info_.find(buffer_element->topic_name);
if (it_matching_topic_name != topics_names_to_info_.end()) {
return it_matching_topic_name->second.topic_metadata.offered_qos_profiles.find(
"durability: 1") == std::string::npos;
}
return true;
};

// Find the first element which is non transient local
auto it_first_not_transient = std::find_if(
buffer_.begin(),
buffer_.end(), is_not_transient_local);

size_t position_first_not_transient = std::distance(buffer_.begin(), it_first_not_transient);

// Remove the first non transient msg if found and if older transient messages account for less
// than 10% of the total number of messages in the buffer
// else pop_front
if (it_first_not_transient != buffer_.end() &&
(position_first_not_transient + 1) < buffer_.size() / 10)
{
buffer_bytes_size_ -= it_first_not_transient->get()->serialized_data->buffer_length;
buffer_.erase(it_first_not_transient);
} else {
buffer_.pop_front();
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
}
}
// Add new message to end of buffer
buffer_bytes_size_ += msg->serialized_data->buffer_length;
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void SequentialWriter::open(
if (use_cache_) {
if (storage_options.snapshot_mode) {
message_cache_ = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
storage_options.max_cache_size);
storage_options.max_cache_size, topics_names_to_info_);
} else {
message_cache_ = std::make_shared<rosbag2_cpp::cache::MessageCache>(
storage_options.max_cache_size);
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class CircularMessageCacheTest : public Test
TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) {
const unsigned message_count = 100;

std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info;
auto circular_message_cache = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
cache_size_);
cache_size_, topics_names_to_info);

for (unsigned i = 0; i < message_count; ++i) {
auto msg = make_test_msg();
Expand Down Expand Up @@ -109,8 +110,9 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) {
TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) {
const unsigned message_count = 100;

std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info;
auto circular_message_cache = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
cache_size_);
cache_size_, topics_names_to_info);

for (unsigned i = 0; i < message_count; ++i) {
auto msg = make_test_msg();
Expand Down