Skip to content

Commit 26dc2c5

Browse files
author
Guillaume Doisy
committed
poc
Signed-off-by: Guillaume Doisy <[email protected]>
1 parent 0823be2 commit 26dc2c5

File tree

6 files changed

+62
-13
lines changed

6 files changed

+62
-13
lines changed

rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <memory>
2121
#include <mutex>
2222
#include <string>
23+
#include <unordered_map>
2324

2425
#include "rcpputils/thread_safety_annotations.hpp"
2526

@@ -28,6 +29,7 @@
2829
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
2930
#include "rosbag2_cpp/visibility_control.hpp"
3031

32+
#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp"
3133
#include "rosbag2_storage/serialized_bag_message.hpp"
3234

3335
// This is necessary because of using stl types here. It is completely safe, because
@@ -52,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
5254
: public MessageCacheInterface
5355
{
5456
public:
55-
explicit CircularMessageCache(size_t max_buffer_size);
57+
explicit CircularMessageCache(
58+
size_t max_buffer_size, const std::unordered_map<std::string,
59+
rosbag2_storage::TopicInformation> & topics_names_to_info);
5660

5761
~CircularMessageCache() override;
5862

rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
#include <deque>
1919
#include <memory>
2020
#include <vector>
21+
#include <unordered_map>
22+
#include <string>
2123

2224
#include "rosbag2_cpp/visibility_control.hpp"
2325
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
26+
#include "rosbag2_storage/rosbag2_storage/bag_metadata.hpp"
2427
#include "rosbag2_storage/serialized_bag_message.hpp"
2528

2629
// This is necessary because of using stl types here. It is completely safe, because
@@ -51,7 +54,9 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
5154
public:
5255
// Delete default constructor since max_cache_size is required
5356
MessageCacheCircularBuffer() = delete;
54-
explicit MessageCacheCircularBuffer(size_t max_cache_size);
57+
explicit MessageCacheCircularBuffer(
58+
size_t max_cache_size, const std::unordered_map<std::string,
59+
rosbag2_storage::TopicInformation> & topics_names_to_info);
5560

5661
/**
5762
* If buffer size has some space left, we push the message regardless of its size,
@@ -73,6 +78,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
7378
std::vector<CacheBufferInterface::buffer_element_t> msg_vector_;
7479
size_t buffer_bytes_size_ {0u};
7580
const size_t max_bytes_size_;
81+
const std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_names_to_info_;
7682
};
7783

7884
} // namespace cache

rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,16 @@ namespace rosbag2_cpp
2727
namespace cache
2828
{
2929

30-
CircularMessageCache::CircularMessageCache(size_t max_buffer_size)
30+
CircularMessageCache::CircularMessageCache(
31+
size_t max_buffer_size,
32+
const std::unordered_map<std::string, rosbag2_storage::TopicInformation> & topics_names_to_info)
3133
{
32-
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
33-
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
34+
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(
35+
max_buffer_size,
36+
topics_names_to_info);
37+
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(
38+
max_buffer_size,
39+
topics_names_to_info);
3440
}
3541

3642
CircularMessageCache::~CircularMessageCache()

rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp

+36-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <algorithm>
1516
#include <deque>
1617
#include <memory>
1718
#include <vector>
@@ -25,8 +26,11 @@ namespace rosbag2_cpp
2526
namespace cache
2627
{
2728

28-
MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size)
29-
: max_bytes_size_(max_cache_size)
29+
MessageCacheCircularBuffer::MessageCacheCircularBuffer(
30+
size_t max_cache_size,
31+
const std::unordered_map<std::string,
32+
rosbag2_storage::TopicInformation> & topics_names_to_info)
33+
: max_bytes_size_(max_cache_size), topics_names_to_info_(topics_names_to_info)
3034
{
3135
}
3236

@@ -38,10 +42,37 @@ bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg
3842
return false;
3943
}
4044

41-
// Remove any old items until there is room for new message
45+
// Remove any old items that is no transient local until there is room for new message
4246
while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) {
43-
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
44-
buffer_.pop_front();
47+
auto is_not_transient_local = [this](buffer_element_t buffer_element)
48+
{
49+
auto it_matching_topic_name = topics_names_to_info_.find(buffer_element->topic_name);
50+
if (it_matching_topic_name != topics_names_to_info_.end()) {
51+
return it_matching_topic_name->second.topic_metadata.offered_qos_profiles.find(
52+
"durability: 1") == std::string::npos;
53+
}
54+
return true;
55+
};
56+
57+
// Find the first element which is non transient local
58+
auto it_first_not_transient = std::find_if(
59+
buffer_.begin(),
60+
buffer_.end(), is_not_transient_local);
61+
62+
size_t position_first_not_transient = std::distance(buffer_.begin(), it_first_not_transient);
63+
64+
// Remove the first non transient msg if found and if older transient messages account for less
65+
// than 10% of the total number of messages in the buffer
66+
// else pop_front
67+
if (it_first_not_transient != buffer_.end() &&
68+
(position_first_not_transient + 1) < buffer_.size() / 10)
69+
{
70+
buffer_bytes_size_ -= it_first_not_transient->get()->serialized_data->buffer_length;
71+
buffer_.erase(it_first_not_transient);
72+
} else {
73+
buffer_.pop_front();
74+
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
75+
}
4576
}
4677
// Add new message to end of buffer
4778
buffer_bytes_size_ += msg->serialized_data->buffer_length;

rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void SequentialWriter::open(
160160
if (use_cache_) {
161161
if (storage_options.snapshot_mode) {
162162
message_cache_ = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
163-
storage_options.max_cache_size);
163+
storage_options.max_cache_size, topics_names_to_info_);
164164
} else {
165165
message_cache_ = std::make_shared<rosbag2_cpp::cache::MessageCache>(
166166
storage_options.max_cache_size);

rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ class CircularMessageCacheTest : public Test
7272
TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) {
7373
const unsigned message_count = 100;
7474

75+
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info;
7576
auto circular_message_cache = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
76-
cache_size_);
77+
cache_size_, topics_names_to_info);
7778

7879
for (unsigned i = 0; i < message_count; ++i) {
7980
auto msg = make_test_msg();
@@ -109,8 +110,9 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) {
109110
TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) {
110111
const unsigned message_count = 100;
111112

113+
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info;
112114
auto circular_message_cache = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
113-
cache_size_);
115+
cache_size_, topics_names_to_info);
114116

115117
for (unsigned i = 0; i < message_count; ++i) {
116118
auto msg = make_test_msg();

0 commit comments

Comments
 (0)