Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Doisy committed Dec 23, 2024
1 parent 0823be2 commit e77ac12
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "rcpputils/thread_safety_annotations.hpp"

Expand All @@ -28,6 +29,7 @@
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand All @@ -52,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
: public MessageCacheInterface
{
public:
explicit CircularMessageCache(size_t max_buffer_size);
explicit CircularMessageCache(
size_t max_buffer_size, const std::unordered_map<std::string,
rosbag2_storage::TopicInformation> & topics_names_to_info);

~CircularMessageCache() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#include <deque>
#include <memory>
#include <vector>
#include <unordered_map>
#include <string>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand Down Expand Up @@ -51,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
public:
// Delete default constructor since max_cache_size is required
MessageCacheCircularBuffer() = delete;
explicit MessageCacheCircularBuffer(size_t max_cache_size);
explicit MessageCacheCircularBuffer(
size_t max_cache_size, const std::unordered_map<std::string,
rosbag2_storage::TopicInformation> & topics_names_to_info);

/**
* If buffer size has some space left, we push the message regardless of its size,
Expand All @@ -73,6 +78,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
std::vector<CacheBufferInterface::buffer_element_t> msg_vector_;
size_t buffer_bytes_size_ {0u};
const size_t max_bytes_size_;
const std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_names_to_info_;
};

} // namespace cache
Expand Down
12 changes: 9 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ namespace rosbag2_cpp
namespace cache
{

CircularMessageCache::CircularMessageCache(size_t max_buffer_size)
CircularMessageCache::CircularMessageCache(
size_t max_buffer_size,
const std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_names_to_info)
{
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(
max_buffer_size,
topics_names_to_info);
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(
max_buffer_size,
topics_names_to_info);
}

CircularMessageCache::~CircularMessageCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <memory>
#include <vector>
Expand All @@ -25,8 +26,11 @@ namespace rosbag2_cpp
namespace cache
{

MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size)
: max_bytes_size_(max_cache_size)
MessageCacheCircularBuffer::MessageCacheCircularBuffer(
size_t max_cache_size,
const std::unordered_map<std::string,
rosbag2_storage::TopicInformation> & topics_names_to_info)
: max_bytes_size_(max_cache_size), topics_names_to_info_(topics_names_to_info)
{
}

Expand All @@ -38,10 +42,37 @@ bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg
return false;
}

// Remove any old items until there is room for new message
// Remove any old items that is no transient local until there is room for new message
while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) {
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
buffer_.pop_front();
auto is_not_transient_local = [this](buffer_element_t buffer_element)
{
auto it_matching_topic_name = topics_names_to_info_.find(buffer_element->topic_name);
if (it_matching_topic_name != topics_names_to_info_.end()) {
return it_matching_topic_name->second.topic_metadata.offered_qos_profiles.find(
"durability: 1") == std::string::npos;
}
return true;
};

// Find the first element which is non transient local
auto it_first_not_transient = std::find_if(
buffer_.begin(),
buffer_.end(), is_not_transient_local);

size_t position_first_not_transient = std::distance(buffer_.begin(), it_first_not_transient);

// Remove the first non transient msg if found and if older transient messages account for less
// than 10% of the total number of messages in the buffer
// else pop_front
if (it_first_not_transient != buffer_.end() &&
(position_first_not_transient + 1) < buffer_.size() / 10)
{
buffer_bytes_size_ -= it_first_not_transient->get()->serialized_data->buffer_length;
buffer_.erase(it_first_not_transient);
} else {
buffer_.pop_front();
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
}
}
// Add new message to end of buffer
buffer_bytes_size_ += msg->serialized_data->buffer_length;
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void SequentialWriter::open(
if (use_cache_) {
if (storage_options.snapshot_mode) {
message_cache_ = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
storage_options.max_cache_size);
storage_options.max_cache_size, topics_names_to_info_);
} else {
message_cache_ = std::make_shared<rosbag2_cpp::cache::MessageCache>(
storage_options.max_cache_size);
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class CircularMessageCacheTest : public Test
TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) {
const unsigned message_count = 100;

std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info;
auto circular_message_cache = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
cache_size_);
cache_size_, topics_names_to_info);

for (unsigned i = 0; i < message_count; ++i) {
auto msg = make_test_msg();
Expand Down Expand Up @@ -109,8 +110,9 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) {
TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) {
const unsigned message_count = 100;

std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info;
auto circular_message_cache = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
cache_size_);
cache_size_, topics_names_to_info);

for (unsigned i = 0; i < message_count; ++i) {
auto msg = make_test_msg();
Expand Down

0 comments on commit e77ac12

Please sign in to comment.