Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] The time bounded snapshot feature #1844

Draft
wants to merge 6 commits into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import add_writer_storage_plugin_extensions
from ros2bag.api import check_not_negative_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
Expand Down Expand Up @@ -176,6 +177,15 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
help='Enable snapshot mode. Messages will not be written to the bagfile until '
'the "/rosbag2_recorder/snapshot" service is called. e.g. \n '
'ros2 service call /rosbag2_recorder/snapshot rosbag2_interfaces/Snapshot')
parser.add_argument(
'--snapshot-duration', type=check_not_negative_float, default=0.0,
help='Maximum snapshot duration in a fraction of seconds.\n'
'Default: %(default)d, indicates that the snapshot will be limited by the'
' --max-cache-size parameter only. If the value is more than 0, the cyclic buffer'
' for the snapshot will be limited by both the series of messages duration and the'
' maximum cache size parameter.\n'
'To override the upper bound by total messages size, the '
'--maximum-cache-size parameter can be settled to 0.')
parser.add_argument(
'--log-level', type=str, default='info',
choices=['debug', 'info', 'warn', 'error', 'fatal'],
Expand Down Expand Up @@ -281,6 +291,10 @@ def validate_parsed_arguments(args, uri) -> str:
if args.compression_queue_size < 0:
return print_error('Compression queue size must be at least 0.')

if args.snapshot_mode and args.snapshot_duration == 0.0 and args.max_cache_size == 0:
return print_error('In snapshot mode, either the snapshot_duration or max_bytes_size shall'
' not be set to zero.')


class RecordVerb(VerbExtension):
"""Record ROS data to a bag."""
Expand Down Expand Up @@ -325,6 +339,7 @@ def main(self, *, args): # noqa: D102
storage_preset_profile=args.storage_preset_profile,
storage_config_uri=storage_config_file,
snapshot_mode=args.snapshot_mode,
snapshot_duration=args.snapshot_duration,
custom_data=custom_data
)
record_options = RecordOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
: public MessageCacheInterface
{
public:
explicit CircularMessageCache(size_t max_buffer_size);
explicit CircularMessageCache(size_t max_buffer_size, int64_t max_buffer_duration_ns = 0);

~CircularMessageCache() override;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2021 Amazonhe t.com, Inc. or its affiliates. All Rights Reserved.
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accidental typo?

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,27 +36,35 @@ namespace rosbag2_cpp
namespace cache
{

/**
* This class implements a circular buffer message cache. Since the buffer
* size is limited by total byte size of the storage messages rather than
* a fix number of messages, a deque is used instead of a vector since
* older messages can always be dropped from the front and new messages added
* to the end. The buffer will never consume more than max_cache_size bytes,
* and will log a warning message if an individual message exceeds the buffer
* size.
*/
/// This class implements a circular buffer message cache. Since the buffer
/// size is limited by the total byte size of the storage messages or a total messages duration
/// rather than a fix number of messages, a deque is used instead of a vector since
/// older messages can always be dropped from the front and new messages added
/// to the end. The buffer will never consume more than max_cache_size bytes, if max_cache_size > 0.
/// And will log a warning message if an individual message exceeds the buffer size.
class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
: public CacheBufferInterface
{
public:
// Delete default constructor since max_cache_size is required
MessageCacheCircularBuffer() = delete;
explicit MessageCacheCircularBuffer(size_t max_cache_size);

/**
* If buffer size has some space left, we push the message regardless of its size,
* but if this results in exceeding buffer size, we begin dropping old messages.
*/
/// \brief Parametrized constructor
/// \param max_cache_size Maximum amount of memory which could be occupied by the messages stored
/// in the circular buffer. Note. If max_cache_size is zero, the circular buffer will be only
/// bounded by the max_cache_duration.
/// \param max_cache_duration_ns Maximum duration in nanoseconds of message sequence allowed to be
/// stored in the circular buffer. Note. If max_cache_duration is zero, the circular buffer will
/// be only bounded by the max_cache_size.
/// \throws std::invalid_argument if both max_cache_size and max_cache_duration are zero.
explicit MessageCacheCircularBuffer(size_t max_cache_size, int64_t max_cache_duration_ns = 0);

/// \brief Push new message to the circular buffer
/// \details If buffer size has some space left, we push the message regardless of its size,
/// but if this results in exceeding buffer size, we begin dropping old messages.
/// \param msg Shared pointer to the rosbag2_storage::SerializedBagMessage
/// \return False if the buffer_bytes_size > 0 and
/// msg->serialized_data->buffer_length > max_bytes_size, otherwise true.
bool push(CacheBufferInterface::buffer_element_t msg) override;

/// Clear buffer
Expand All @@ -73,6 +81,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
std::vector<CacheBufferInterface::buffer_element_t> msg_vector_;
size_t buffer_bytes_size_ {0u};
const size_t max_bytes_size_;
const int64_t max_cache_duration_;
};

} // namespace cache
Expand Down
8 changes: 5 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ namespace rosbag2_cpp
namespace cache
{

CircularMessageCache::CircularMessageCache(size_t max_buffer_size)
CircularMessageCache::CircularMessageCache(size_t max_buffer_size, int64_t max_buffer_duration_ns)
{
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
producer_buffer_ =
std::make_shared<MessageCacheCircularBuffer>(max_buffer_size, max_buffer_duration_ns);
consumer_buffer_ =
std::make_shared<MessageCacheCircularBuffer>(max_buffer_size, max_buffer_duration_ns);
}

CircularMessageCache::~CircularMessageCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,44 @@ namespace rosbag2_cpp
namespace cache
{

MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size)
: max_bytes_size_(max_cache_size)
MessageCacheCircularBuffer::MessageCacheCircularBuffer(
size_t max_cache_size,
int64_t max_cache_duration_ns)
: max_bytes_size_(max_cache_size), max_cache_duration_(max_cache_duration_ns)
{
if (max_bytes_size_ == 0 && max_cache_duration_ == 0) {
ROSBAG2_CPP_LOG_ERROR_STREAM("Invalid arguments for the MessageCacheCircularBuffer. "
"Both max_bytes_size and max_cache_duration are zero.");
Comment on lines +34 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is debug message, it should include some extra debug information that the following exception cannot throw as in the message? otherwise, it is the same information with the following exception, so maybe debugging message here is not really useful since user can see the exact same message via exception?

throw std::invalid_argument("Invalid arguments for the MessageCacheCircularBuffer. "
"Both max_bytes_size and max_cache_duration are zero.");
}
}

bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg)
{
// Drop message if it exceeds the buffer size
if (msg->serialized_data->buffer_length > max_bytes_size_) {
if (buffer_bytes_size_ > 0 && msg->serialized_data->buffer_length > max_bytes_size_) {
ROSBAG2_CPP_LOG_WARN_STREAM("Last message exceeds snapshot buffer size. Dropping message!");
return false;
}

// Remove any old items until there is room for new message
while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) {
// Remove any old items until there is room for a new message
while (max_bytes_size_ > 0 &&
buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length))
{
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length;
buffer_.pop_front();
}
// Add new message to end of buffer
// Remove any old items until the difference between last and newest message timestamp
// will be less than or equal to the max_cache_duration_.
if (buffer_.size() > 1) {
auto current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp;
while (max_cache_duration_ > 0 && current_buffer_duration > max_cache_duration_) {
buffer_.pop_front();
current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp;
}
}
// Add a new message to the end of the buffer
buffer_bytes_size_ += msg->serialized_data->buffer_length;
buffer_.push_back(msg);

Expand Down
8 changes: 5 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,18 @@ void SequentialWriter::open(
throw std::runtime_error{error.str()};
}

use_cache_ = storage_options.max_cache_size > 0u;
use_cache_ = storage_options.max_cache_size > 0u ||
(storage_options.snapshot_mode && storage_options.snapshot_duration.nanoseconds() > 0);
Comment on lines +148 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking storage_options.snapshot_mode is redundant?

Suggested change
use_cache_ = storage_options.max_cache_size > 0u ||
(storage_options.snapshot_mode && storage_options.snapshot_duration.nanoseconds() > 0);
use_cache_ = storage_options.max_cache_size > 0u || storage_options.snapshot_duration.nanoseconds() > 0;

if (storage_options.snapshot_mode && !use_cache_) {
throw std::runtime_error(
"Max cache size must be greater than 0 when snapshot mode is enabled");
"Either the max cache size or the maximum snapshot duration must be greater than 0"
" when snapshot mode is enabled");
}

if (use_cache_) {
if (storage_options.snapshot_mode) {
message_cache_ = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
storage_options.max_cache_size);
storage_options.max_cache_size, storage_options.snapshot_duration.nanoseconds());
} else {
message_cache_ = std::make_shared<rosbag2_cpp::cache::MessageCache>(
storage_options.max_cache_size);
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_py/rosbag2_py/_storage.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,14 @@ class StorageOptions:
max_bagfile_duration: int
max_bagfile_size: int
max_cache_size: int
snapshot_duration: object
snapshot_mode: bool
start_time_ns: int
storage_config_uri: str
storage_id: str
storage_preset_profile: str
uri: str
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ...
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration: object = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ...

class TopicInformation:
message_count: int
Expand Down
53 changes: 50 additions & 3 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <string>
#include <utility>
#include <vector>

#include "rosbag2_cpp/converter_options.hpp"
Expand Down Expand Up @@ -44,6 +45,16 @@ std::chrono::nanoseconds from_rclpy_duration(const pybind11::object & duration)
return std::chrono::nanoseconds(nanos);
}

pybind11::object rclcpp_duration_to_py_float(const rclcpp::Duration & duration)
{
return pybind11::cast(duration.seconds());
}

rclcpp::Duration py_float_to_rclcpp_duration(const pybind11::object & obj)
{
return rclcpp::Duration::from_seconds(obj.cast<double>());
}

template<typename T>
pybind11::object to_rclpy_time(T time)
{
Expand Down Expand Up @@ -81,9 +92,36 @@ PYBIND11_MODULE(_storage, m) {
using KEY_VALUE_MAP = std::unordered_map<std::string, std::string>;
pybind11::class_<rosbag2_storage::StorageOptions>(m, "StorageOptions")
.def(
pybind11::init<
std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool,
int64_t, int64_t, KEY_VALUE_MAP>(),
pybind11::init(
[](
std::string uri,
std::string storage_id,
uint64_t max_bagfile_size,
uint64_t max_bagfile_duration,
uint64_t max_cache_size,
std::string storage_preset_profile,
std::string storage_config_uri,
bool snapshot_mode,
const pybind11::object & snapshot_duration,
int64_t start_time_ns,
int64_t end_time_ns,
KEY_VALUE_MAP custom_data)
{
return rosbag2_storage::StorageOptions{
std::move(uri),
std::move(storage_id),
max_bagfile_size,
max_bagfile_duration,
max_cache_size,
std::move(storage_preset_profile),
std::move(storage_config_uri),
snapshot_mode,
py_float_to_rclcpp_duration(snapshot_duration),
start_time_ns,
end_time_ns,
std::move(custom_data),
};
}),
pybind11::arg("uri"),
pybind11::arg("storage_id") = "",
pybind11::arg("max_bagfile_size") = 0,
Expand All @@ -92,6 +130,7 @@ PYBIND11_MODULE(_storage, m) {
pybind11::arg("storage_preset_profile") = "",
pybind11::arg("storage_config_uri") = "",
pybind11::arg("snapshot_mode") = false,
pybind11::arg("snapshot_duration") = rclcpp_duration_to_py_float(rclcpp::Duration(0, 0)),
pybind11::arg("start_time_ns") = -1,
pybind11::arg("end_time_ns") = -1,
pybind11::arg("custom_data") = KEY_VALUE_MAP{})
Expand All @@ -115,6 +154,14 @@ PYBIND11_MODULE(_storage, m) {
.def_readwrite(
"snapshot_mode",
&rosbag2_storage::StorageOptions::snapshot_mode)
.def_property(
"snapshot_duration",
[](const rosbag2_storage::StorageOptions & self) {
return rclcpp_duration_to_py_float(self.snapshot_duration);
},
[](rosbag2_storage::StorageOptions & self, const pybind11::object & obj) {
self.snapshot_duration = py_float_to_rclcpp_duration(obj);
})
.def_readwrite(
"start_time_ns",
&rosbag2_storage::StorageOptions::start_time_ns)
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_storage/include/rosbag2_storage/storage_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>
#include <unordered_map>

#include "rclcpp/duration.hpp"
#include "rosbag2_storage/visibility_control.hpp"
#include "rosbag2_storage/yaml.hpp"

Expand Down Expand Up @@ -56,6 +57,10 @@ struct StorageOptions
// Defaults to disabled.
bool snapshot_mode = false;

// The maximum snapshot duration.
// A value of 0.0 indicates that snapshot will be limited by the max_cache_size only.
rclcpp::Duration snapshot_duration{0, 0};

// Start and end time for cutting
int64_t start_time_ns = -1;
int64_t end_time_ns = -1;
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_storage/src/rosbag2_storage/storage_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <string>
#include <unordered_map>

#include "rclcpp/duration.hpp"
#include "rosbag2_storage/storage_options.hpp"

namespace YAML
Expand All @@ -32,6 +33,7 @@ Node convert<rosbag2_storage::StorageOptions>::encode(
node["storage_preset_profile"] = storage_options.storage_preset_profile;
node["storage_config_uri"] = storage_options.storage_config_uri;
node["snapshot_mode"] = storage_options.snapshot_mode;
node["snapshot_duration"] = storage_options.snapshot_duration;
node["start_time_ns"] = storage_options.start_time_ns;
node["end_time_ns"] = storage_options.end_time_ns;
node["custom_data"] = storage_options.custom_data;
Expand All @@ -50,6 +52,7 @@ bool convert<rosbag2_storage::StorageOptions>::decode(
node, "storage_preset_profile", storage_options.storage_preset_profile);
optional_assign<std::string>(node, "storage_config_uri", storage_options.storage_config_uri);
optional_assign<bool>(node, "snapshot_mode", storage_options.snapshot_mode);
optional_assign<rclcpp::Duration>(node, "snapshot_duration", storage_options.snapshot_duration);
optional_assign<int64_t>(node, "start_time_ns", storage_options.start_time_ns);
optional_assign<int64_t>(node, "end_time_ns", storage_options.end_time_ns);
using KEY_VALUE_MAP = std::unordered_map<std::string, std::string>;
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ TEST(storage_options, test_yaml_serialization)
original.storage_preset_profile = "profile";
original.storage_config_uri = "config_uri";
original.snapshot_mode = true;
original.snapshot_duration = rclcpp::Duration::from_seconds(1.5);
original.start_time_ns = 12345000;
original.end_time_ns = 23456000;
original.custom_data["key1"] = "value1";
Expand All @@ -50,6 +51,7 @@ TEST(storage_options, test_yaml_serialization)
ASSERT_EQ(original.storage_preset_profile, reconstructed.storage_preset_profile);
ASSERT_EQ(original.storage_config_uri, reconstructed.storage_config_uri);
ASSERT_EQ(original.snapshot_mode, reconstructed.snapshot_mode);
ASSERT_EQ(original.snapshot_duration, reconstructed.snapshot_duration);
ASSERT_EQ(original.start_time_ns, reconstructed.start_time_ns);
ASSERT_EQ(original.end_time_ns, reconstructed.end_time_ns);
ASSERT_EQ(original.custom_data, reconstructed.custom_data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ get_storage_options_from_node_params(rclcpp::Node & node)

storage_options.snapshot_mode = node.declare_parameter<bool>("storage.snapshot_mode", false);

storage_options.snapshot_duration =
param_utils::get_duration_from_node_param(node, "storage.snapshot_duration", 0, 0);

auto list_of_key_value_strings = node.declare_parameter<std::vector<std::string>>(
"storage.custom_data",
std::vector<std::string>());
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_transport/test/resources/player_node_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ player_params_node:
max_cache_size: 9898
storage_preset_profile: "resilient"
snapshot_mode: false
snapshot_duration:
sec: 1
nsec: 500000000
custom_data: ["key1=value1", "key2=value2"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requires a newline at the end of the file?

3 changes: 3 additions & 0 deletions rosbag2_transport/test/resources/recorder_node_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ recorder_params_node:
max_cache_size: 989888
storage_preset_profile: "none"
snapshot_mode: false
snapshot_duration:
sec: 1
nsec: 500000000
custom_data: ["key1=value1", "key2=value2"]
start_time_ns: 0
end_time_ns: 100000
Loading
Loading