Skip to content

Commit 253256a

Browse files
committed
Code review
Signed-off-by: Nicola Loi <[email protected]>
1 parent c8d667f commit 253256a

File tree

5 files changed

+104
-70
lines changed

5 files changed

+104
-70
lines changed

ros2bag/ros2bag/verb/play.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
169169
choices=['debug', 'info', 'warn', 'error', 'fatal'],
170170
help='Logging level.')
171171
parser.add_argument(
172-
'--progress-bar', action='store_true', default=False,
173-
help='Print a progress bar of the playback player.')
172+
'--progress-bar', type=float, default=10.0,
173+
help='Print a progress bar of the playback player at a specific frequency in Hz. '
174+
'Default is %(default)d. '
175+
'Negative values mark an update for every published message, while '
176+
' a zero value disables the progress bar.',
177+
metavar='PROGRESS_BAR_FREQUENCY')
174178

175179
def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int:
176180
nano_scale = 1000 * 1000 * 1000
@@ -246,7 +250,7 @@ def main(self, *, args): # noqa: D102
246250
play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION
247251
else:
248252
play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION
249-
play_options.disable_progress_bar = not args.progress_bar
253+
play_options.progress_bar_print_frequency = args.progress_bar
250254

251255
player = Player(args.log_level)
252256
try:

rosbag2_py/rosbag2_py/_transport.pyi

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ class PlayOptions:
1010
delay: float
1111
disable_keyboard_controls: bool
1212
disable_loan_message: bool
13-
disable_progress_bar: bool
1413
exclude_regex_to_filter: str
1514
exclude_service_events_to_filter: List[str]
1615
exclude_topics_to_filter: List[str]
1716
loop: bool
1817
node_prefix: str
1918
playback_duration: float
2019
playback_until_timestamp: int
20+
progress_bar_print_frequency: float
2121
publish_service_requests: bool
2222
rate: float
2323
read_ahead_queue_size: int

rosbag2_py/src/rosbag2_py/_transport.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -544,8 +544,7 @@ PYBIND11_MODULE(_transport, m) {
544544
"playback_until_timestamp",
545545
&PlayOptions::getPlaybackUntilTimestamp,
546546
&PlayOptions::setPlaybackUntilTimestamp)
547-
.def_readwrite(
548-
"disable_progress_bar", &PlayOptions::disable_progress_bar)
547+
.def_readwrite("progress_bar_print_frequency", &PlayOptions::progress_bar_print_frequency)
549548
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
550549
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
551550
.def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests)

rosbag2_transport/include/rosbag2_transport/play_options.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ struct PlayOptions
124124
// The source of the service request
125125
ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION;
126126

127-
// Disable to print a progress bar of the playback player
128-
bool disable_progress_bar = true;
127+
// Rate in Hz at which to print progress bar.
128+
double progress_bar_print_frequency = 10.0;
129129
};
130130

131131
} // namespace rosbag2_transport

rosbag2_transport/src/rosbag2_transport/player.cpp

+93-62
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ rclcpp::QoS publisher_qos_for_topic(
8080

8181
namespace rosbag2_transport
8282
{
83+
84+
enum class PlayerStatus : char
85+
{
86+
RUNNING = 'R',
87+
PAUSED = 'P',
88+
DELAYED = 'D',
89+
};
90+
8391
constexpr Player::callback_handle_t Player::invalid_callback_handle;
8492

8593
class PlayerImpl
@@ -123,12 +131,13 @@ class PlayerImpl
123131
virtual bool set_rate(double);
124132

125133
/// \brief Playing next message from queue when in pause.
134+
/// \param in_burst_mode If true, it will not call the progress bar update to avoid delays.
126135
/// \details This is blocking call and it will wait until next available message will be
127136
/// published or rclcpp context shut down.
128137
/// \note If internal player queue is starving and storage has not been completely loaded,
129138
/// this method will wait until new element will be pushed to the queue.
130139
/// \return true if player in pause mode and successfully played next message, otherwise false.
131-
virtual bool play_next();
140+
virtual bool play_next(bool in_burst_mode = false);
132141

133142
/// \brief Burst the next \p num_messages messages from the queue when paused.
134143
/// \param num_messages The number of messages to burst from the queue. Specifying zero means no
@@ -298,12 +307,8 @@ class PlayerImpl
298307
void configure_play_until_timestamp();
299308
bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const;
300309

301-
template<char status>
302-
void print_status() const;
303-
void check_and_print_status() const;
304-
void print_running_status() const;
305-
void print_paused_status() const;
306-
void print_delayed_status() const;
310+
void print_progress_bar_help_str(double progress_bar_print_frequency) const;
311+
void print_progress_bar_status(const PlayerStatus & status, bool force_update = false);
307312

308313
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
309314
static const std::chrono::milliseconds queue_read_wait_period_;
@@ -360,7 +365,11 @@ class PlayerImpl
360365
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;
361366

362367
// progress bar
363-
const bool disable_progress_bar_;
368+
const bool enable_progress_bar_;
369+
const bool progress_bar_update_always_;
370+
const rcutils_duration_value_t progress_bar_update_period_;
371+
std::chrono::steady_clock::time_point progress_bar_last_time_printed_{};
372+
rcutils_time_point_value_t recv_timestamp_last_published_;
364373
};
365374

366375
PlayerImpl::PlayerImpl(
@@ -374,7 +383,11 @@ PlayerImpl::PlayerImpl(
374383
play_options_(play_options),
375384
keyboard_handler_(std::move(keyboard_handler)),
376385
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>()),
377-
disable_progress_bar_(play_options.disable_progress_bar)
386+
enable_progress_bar_(play_options.progress_bar_print_frequency != 0),
387+
progress_bar_update_always_(play_options.progress_bar_print_frequency < 0),
388+
progress_bar_update_period_(play_options.progress_bar_print_frequency != 0 ?
389+
RCUTILS_S_TO_NS(1.0 / play_options.progress_bar_print_frequency) :
390+
std::numeric_limits<int64_t>::max())
378391
{
379392
for (auto & topic : play_options_.topics_to_filter) {
380393
topic = rclcpp::expand_topic_or_service_name(
@@ -423,6 +436,7 @@ PlayerImpl::PlayerImpl(
423436
}
424437
starting_time_secs_ = RCUTILS_NS_TO_S(static_cast<double>(starting_time_));
425438
duration_secs_ = RCUTILS_NS_TO_S(static_cast<double>(bag_duration));
439+
recv_timestamp_last_published_ = starting_time_;
426440
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(
427441
starting_time_, std::chrono::steady_clock::now,
428442
std::chrono::milliseconds{100}, play_options_.start_paused);
@@ -433,13 +447,7 @@ PlayerImpl::PlayerImpl(
433447
}
434448
create_control_services();
435449
add_keyboard_callbacks();
436-
std::string progress_bar_help_str;
437-
if (!disable_progress_bar_) {
438-
progress_bar_help_str = "Bag Time and Duration [?]: ? = R running, P paused, D delayed";
439-
} else {
440-
progress_bar_help_str = "Bag Time and Duration progress bar disabled.";
441-
}
442-
RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str);
450+
print_progress_bar_help_str(play_options.progress_bar_print_frequency);
443451
}
444452

445453
PlayerImpl::~PlayerImpl()
@@ -460,10 +468,9 @@ PlayerImpl::~PlayerImpl()
460468
if (reader_) {
461469
reader_->close();
462470
}
463-
// print new line to keep on screen the latest progress bar
464-
if (!disable_progress_bar_) {
465-
printf("\n");
466-
fflush(stdout);
471+
// print new line to keep on screen the latest progress bar (which had a carriage return)
472+
if (enable_progress_bar_) {
473+
std::cout << std::endl;
467474
}
468475
}
469476

@@ -515,9 +522,12 @@ bool PlayerImpl::play()
515522
do {
516523
if (delay > rclcpp::Duration(0, 0)) {
517524
RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns");
518-
print_delayed_status();
525+
print_progress_bar_status(PlayerStatus::DELAYED);
519526
std::chrono::nanoseconds delay_duration(delay.nanoseconds());
520527
std::this_thread::sleep_for(delay_duration);
528+
} else {
529+
print_progress_bar_status(clock_->is_paused() ?
530+
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
521531
}
522532
{
523533
std::lock_guard<std::mutex> lk(reader_mutex_);
@@ -582,6 +592,8 @@ bool PlayerImpl::play()
582592
is_in_playback_ = false;
583593
playback_finished_cv_.notify_all();
584594
}
595+
print_progress_bar_status(clock_->is_paused() ?
596+
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
585597
});
586598
return true;
587599
}
@@ -632,21 +644,22 @@ void PlayerImpl::stop()
632644
playback_thread_.join();
633645
}
634646
}
635-
check_and_print_status();
647+
print_progress_bar_status(clock_->is_paused() ?
648+
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
636649
}
637650

638651
void PlayerImpl::pause()
639652
{
640653
clock_->pause();
641654
RCLCPP_INFO_STREAM(owner_->get_logger(), "Pausing play.");
642-
print_paused_status();
655+
print_progress_bar_status(PlayerStatus::PAUSED);
643656
}
644657

645658
void PlayerImpl::resume()
646659
{
647660
clock_->resume();
648661
RCLCPP_INFO_STREAM(owner_->get_logger(), "Resuming play.");
649-
print_running_status();
662+
print_progress_bar_status(PlayerStatus::RUNNING, true);
650663
}
651664

652665
void PlayerImpl::toggle_paused()
@@ -673,7 +686,8 @@ bool PlayerImpl::set_rate(double rate)
673686
} else {
674687
RCLCPP_WARN_STREAM(owner_->get_logger(), "Failed to set rate to invalid value " << rate);
675688
}
676-
check_and_print_status();
689+
print_progress_bar_status(clock_->is_paused() ?
690+
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
677691
return ok;
678692
}
679693

@@ -706,10 +720,11 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro
706720
return nullptr;
707721
}
708722

709-
bool PlayerImpl::play_next()
723+
bool PlayerImpl::play_next(bool is_burst_mode)
710724
{
711725
if (!clock_->is_paused()) {
712726
RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state.");
727+
print_progress_bar_status(PlayerStatus::RUNNING, true);
713728
return false;
714729
}
715730

@@ -722,6 +737,7 @@ bool PlayerImpl::play_next()
722737
// resume() or stop() from another thread while we were waiting on mutex.
723738
if (!clock_->is_paused()) {
724739
RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state.");
740+
print_progress_bar_status(PlayerStatus::RUNNING, true);
725741
return false;
726742
}
727743
skip_message_in_main_play_loop_ = true;
@@ -741,7 +757,11 @@ bool PlayerImpl::play_next()
741757
next_message_published = publish_message(message_ptr);
742758
clock_->jump(message_ptr->recv_timestamp);
743759

744-
print_paused_status();
760+
// Update the progress bar if we were not called in burst mode to avoid delays.
761+
if (!is_burst_mode) {
762+
recv_timestamp_last_published_ = message_ptr->recv_timestamp;
763+
print_progress_bar_status(PlayerStatus::PAUSED);
764+
}
745765
message_queue_.pop();
746766
message_ptr = peek_next_message_from_queue();
747767
}
@@ -752,22 +772,23 @@ size_t PlayerImpl::burst(const size_t num_messages)
752772
{
753773
if (!clock_->is_paused()) {
754774
RCLCPP_WARN_STREAM(owner_->get_logger(), "Burst can only be used when in the paused state.");
755-
print_running_status();
775+
print_progress_bar_status(PlayerStatus::RUNNING, true);
756776
return 0;
757777
}
758778

759779
uint64_t messages_played = 0;
760780

761781
for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) {
762-
if (play_next()) {
782+
if (play_next(true)) {
763783
++messages_played;
764784
} else {
765785
break;
766786
}
767787
}
768788

769789
RCLCPP_INFO_STREAM(owner_->get_logger(), "Burst " << messages_played << " messages.");
770-
print_running_status();
790+
print_progress_bar_status(clock_->is_paused() ?
791+
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
771792
return messages_played;
772793
}
773794

@@ -993,7 +1014,8 @@ void PlayerImpl::play_messages_from_queue()
9931014
continue;
9941015
}
9951016
publish_message(message_ptr);
996-
print_running_status();
1017+
recv_timestamp_last_published_ = message_ptr->recv_timestamp;
1018+
print_progress_bar_status(PlayerStatus::RUNNING);
9971019
}
9981020
message_queue_.pop();
9991021
message_ptr = peek_next_message_from_queue();
@@ -1565,48 +1587,57 @@ const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options()
15651587
return play_options_;
15661588
}
15671589

1568-
template<char status>
1569-
void PlayerImpl::print_status() const
1570-
{
1571-
static_assert(status == 'R' || status == 'P' || status == 'D',
1572-
"Invalid status character");
1573-
double current_time_secs = RCUTILS_NS_TO_S(static_cast<double>(clock_->now()));
1574-
double progress_secs = current_time_secs - starting_time_secs_;
1575-
printf(" Bag Time %13.6f Duration %.6f/%.6f [%c] \r",
1576-
current_time_secs, progress_secs, duration_secs_, status);
1577-
fflush(stdout);
1578-
}
1579-
1580-
void PlayerImpl::check_and_print_status() const
1590+
void PlayerImpl::print_progress_bar_help_str(double progress_bar_print_frequency) const
15811591
{
1582-
if (!disable_progress_bar_) {
1583-
if (clock_->is_paused()) {
1584-
print_status<'P'>();
1592+
std::string progress_bar_help_str;
1593+
if (enable_progress_bar_) {
1594+
if (progress_bar_update_period_ < 0) {
1595+
progress_bar_help_str = "Progress bar enabled with updates for every message.";
15851596
} else {
1586-
print_status<'R'>();
1597+
std::ostringstream oss;
1598+
oss << "Progress bar enabled with updates at "
1599+
<< std::fixed << std::setprecision(1) << progress_bar_print_frequency << " Hz.";
1600+
progress_bar_help_str = oss.str();
15871601
}
1602+
progress_bar_help_str += " [?]: [R]unning, [P]aused, [D]elayed.";
1603+
} else {
1604+
progress_bar_help_str = "Progress bar disabled.";
15881605
}
1606+
RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str);
15891607
}
15901608

1591-
void PlayerImpl::print_running_status() const
1609+
void PlayerImpl::print_progress_bar_status(const PlayerStatus & status, bool force_update)
15921610
{
1593-
if (!disable_progress_bar_) {
1594-
print_status<'R'>();
1611+
if (!enable_progress_bar_) {
1612+
return;
15951613
}
1596-
}
15971614

1598-
void PlayerImpl::print_paused_status() const
1599-
{
1600-
if (!disable_progress_bar_) {
1601-
print_status<'P'>();
1602-
}
1603-
}
1615+
const std::chrono::steady_clock::time_point steady_time_now = std::chrono::steady_clock::now();
16041616

1605-
void PlayerImpl::print_delayed_status() const
1606-
{
1607-
if (!disable_progress_bar_) {
1608-
print_status<'D'>();
1617+
// Check if we should force an update of the progress bar, otherwise check time since last update.
1618+
// Force an update if:
1619+
// - the user-defined update period is negative;
1620+
// - or this function was called with a status other than RUNNING (since we want to update the
1621+
// progress bar when we pause or delay the playback, otherwise it will be stuck
1622+
// to the last update if not enough time has passed since it);
1623+
// - or this function was called with force_update set to true.
1624+
if (!(progress_bar_update_always_ || status != PlayerStatus::RUNNING || force_update)) {
1625+
if (std::chrono::duration_cast<std::chrono::nanoseconds>(
1626+
steady_time_now - progress_bar_last_time_printed_).count() < progress_bar_update_period_)
1627+
{
1628+
return;
1629+
}
16091630
}
1631+
1632+
const double current_time_secs = RCUTILS_NS_TO_S(
1633+
static_cast<double>(recv_timestamp_last_published_));
1634+
const double progress_secs = current_time_secs - starting_time_secs_;
1635+
// Print progress bar ending witt carriage return '/r' so it will be overwritten by next print
1636+
std::cout << " Bag Time " << std::setw(13) << std::fixed << std::setprecision(6) <<
1637+
current_time_secs << " Duration " << std::fixed << std::setprecision(6) <<
1638+
progress_secs << "/" << duration_secs_ << " [" << static_cast<char>(status) <<
1639+
"]\r" << std::flush;
1640+
progress_bar_last_time_printed_ = steady_time_now;
16101641
}
16111642

16121643
///////////////////////////////

0 commit comments

Comments
 (0)