Skip to content

Commit 863d110

Browse files
MartinCornelis2Timple
authored andcommitted
Add add_analyzer functionality
1 parent b169c12 commit 863d110

File tree

4 files changed

+129
-20
lines changed

4 files changed

+129
-20
lines changed

diagnostic_aggregator/CMakeLists.txt

+10
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ find_package(ament_cmake REQUIRED)
1414
find_package(diagnostic_msgs REQUIRED)
1515
find_package(pluginlib REQUIRED)
1616
find_package(rclcpp REQUIRED)
17+
find_package(rcl_interfaces REQUIRED)
1718
find_package(std_msgs REQUIRED)
1819

1920
add_library(${PROJECT_NAME} SHARED
@@ -67,6 +68,10 @@ add_executable(aggregator_node src/aggregator_node.cpp)
6768
target_link_libraries(aggregator_node
6869
${PROJECT_NAME})
6970

71+
# Add analyzer
72+
add_executable(add_analyzer src/add_analyzer.cpp)
73+
ament_target_dependencies(add_analyzer rclcpp rcl_interfaces)
74+
7075
# Testing macro
7176
if(BUILD_TESTING)
7277
find_package(ament_lint_auto REQUIRED)
@@ -135,6 +140,11 @@ install(
135140
DESTINATION lib/${PROJECT_NAME}
136141
)
137142

143+
install(
144+
TARGETS add_analyzer
145+
DESTINATION lib/${PROJECT_NAME}
146+
)
147+
138148
install(
139149
TARGETS ${PROJECT_NAME} ${ANALYZERS}
140150
EXPORT ${PROJECT_NAME}Targets

diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp

+12
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ class Aggregator
133133
rclcpp::Service<diagnostic_msgs::srv::AddDiagnostics>::SharedPtr add_srv_;
134134
/// DiagnosticArray, /diagnostics
135135
rclcpp::Subscription<diagnostic_msgs::msg::DiagnosticArray>::SharedPtr diag_sub_;
136+
/// ParameterEvent, /parameter_events
137+
rclcpp::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr param_sub_;
136138
/// DiagnosticArray, /diagnostics_agg
137139
rclcpp::Publisher<diagnostic_msgs::msg::DiagnosticArray>::SharedPtr agg_pub_;
138140
/// DiagnosticStatus, /diagnostics_toplevel_state
@@ -165,6 +167,16 @@ class Aggregator
165167
/// Records all ROS warnings. No warnings are repeated.
166168
std::set<std::string> ros_warnings_;
167169

170+
/*
171+
*!\brief Checks for new parameters to trigger reinitialization of the AnalyzerGroup and OtherAnalyzer
172+
*/
173+
void parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr param_msg);
174+
175+
/*
176+
*!\brief (re)initializes the AnalyzerGroup and OtherAnalyzer
177+
*/
178+
void initAnalyzers();
179+
168180
/*
169181
*!\brief Checks timestamp of message, and warns if timestamp is 0 (not set)
170182
*/
+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#include "rclcpp/rclcpp.hpp"
2+
#include "rcl_interfaces/srv/set_parameters_atomically.hpp"
3+
#include "rcl_interfaces/msg/parameter.hpp"
4+
5+
#include <chrono>
6+
#include <cstdlib>
7+
#include <memory>
8+
9+
using namespace std::chrono_literals;
10+
11+
class AddAnalyzer : public rclcpp::Node
12+
{
13+
public:
14+
AddAnalyzer()
15+
: Node("add_analyzer_node", "", rclcpp::NodeOptions().allow_undeclared_parameters(
16+
true).automatically_declare_parameters_from_overrides(true))
17+
{
18+
client_ = this->create_client<rcl_interfaces::srv::SetParametersAtomically>(
19+
"/diagnostics_agg/set_parameters_atomically");
20+
}
21+
22+
void send_request()
23+
{
24+
while (!client_->wait_for_service(1s)) {
25+
if (!rclcpp::ok()) {
26+
RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting for the service. Exiting.");
27+
return;
28+
}
29+
RCLCPP_INFO_ONCE(this->get_logger(), "service not available, waiting ...");
30+
}
31+
auto request = std::make_shared<rcl_interfaces::srv::SetParametersAtomically::Request>();
32+
std::map<std::string, rclcpp::Parameter> parameters;
33+
if (this->get_parameters("", parameters)) {
34+
for (const auto & param : parameters) {
35+
if (param.first.substr(0, prefix_.length()).compare(prefix_) == 0) {
36+
auto parameter_msg = param.second.to_parameter_msg();
37+
request->parameters.push_back(parameter_msg);
38+
}
39+
}
40+
}
41+
auto result = client_->async_send_request(request);
42+
// Wait for the result.
43+
if (rclcpp::spin_until_future_complete(this->get_node_base_interface(), result) ==
44+
rclcpp::FutureReturnCode::SUCCESS)
45+
{
46+
RCLCPP_INFO(this->get_logger(), "Parameters succesfully set");
47+
} else {
48+
RCLCPP_ERROR(this->get_logger(), "Failed to set parameters");
49+
}
50+
}
51+
52+
private:
53+
rclcpp::Client<rcl_interfaces::srv::SetParametersAtomically>::SharedPtr client_;
54+
std::string prefix_ = "analyzers.";
55+
};
56+
57+
int main(int argc, char ** argv)
58+
{
59+
rclcpp::init(argc, argv);
60+
61+
auto add_analyzer = std::make_shared<AddAnalyzer>();
62+
add_analyzer->send_request();
63+
rclcpp::shutdown();
64+
65+
return 0;
66+
}

diagnostic_aggregator/src/aggregator.cpp

+41-20
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ using diagnostic_msgs::msg::DiagnosticStatus;
5959
Aggregator::Aggregator()
6060
: n_(std::make_shared<rclcpp::Node>(
6161
"analyzers", "",
62-
rclcpp::NodeOptions().automatically_declare_parameters_from_overrides(true))),
62+
rclcpp::NodeOptions().allow_undeclared_parameters(true).
63+
automatically_declare_parameters_from_overrides(true))),
6364
logger_(rclcpp::get_logger("Aggregator")),
6465
pub_rate_(1.0),
6566
history_depth_(1000),
@@ -69,6 +70,35 @@ Aggregator::Aggregator()
6970
last_top_level_state_(DiagnosticStatus::STALE)
7071
{
7172
RCLCPP_DEBUG(logger_, "constructor");
73+
initAnalyzers();
74+
75+
diag_sub_ = n_->create_subscription<DiagnosticArray>(
76+
"/diagnostics", rclcpp::SystemDefaultsQoS().keep_last(history_depth_),
77+
std::bind(&Aggregator::diagCallback, this, _1));
78+
agg_pub_ = n_->create_publisher<DiagnosticArray>("/diagnostics_agg", 1);
79+
toplevel_state_pub_ =
80+
n_->create_publisher<DiagnosticStatus>("/diagnostics_toplevel_state", 1);
81+
82+
int publish_rate_ms = 1000 / pub_rate_;
83+
publish_timer_ = n_->create_wall_timer(
84+
std::chrono::milliseconds(publish_rate_ms),
85+
std::bind(&Aggregator::publishData, this));
86+
87+
param_sub_ = n_->create_subscription<rcl_interfaces::msg::ParameterEvent>(
88+
"/parameter_events", 1, std::bind(&Aggregator::parameterCallback, this, _1));
89+
}
90+
91+
void Aggregator::parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr msg)
92+
{
93+
if (msg->node == "/" + std::string(n_->get_name())) {
94+
if (msg->new_parameters.size() != 0) {
95+
initAnalyzers();
96+
}
97+
}
98+
}
99+
100+
void Aggregator::initAnalyzers()
101+
{
72102
bool other_as_errors = false;
73103

74104
std::map<std::string, rclcpp::Parameter> parameters;
@@ -101,26 +131,17 @@ Aggregator::Aggregator()
101131
RCLCPP_DEBUG(
102132
logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false"));
103133

104-
analyzer_group_ = std::make_unique<AnalyzerGroup>();
105-
if (!analyzer_group_->init(base_path_, "", n_)) {
106-
RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!");
107-
}
108-
109-
// Last analyzer handles remaining data
110-
other_analyzer_ = std::make_unique<OtherAnalyzer>(other_as_errors);
111-
other_analyzer_->init(base_path_); // This always returns true
112-
113-
diag_sub_ = n_->create_subscription<DiagnosticArray>(
114-
"/diagnostics", rclcpp::SystemDefaultsQoS().keep_last(history_depth_),
115-
std::bind(&Aggregator::diagCallback, this, _1));
116-
agg_pub_ = n_->create_publisher<DiagnosticArray>("/diagnostics_agg", 1);
117-
toplevel_state_pub_ =
118-
n_->create_publisher<DiagnosticStatus>("/diagnostics_toplevel_state", 1);
134+
{ // lock the mutex while analyzer_group_ and other_analyzer_ are being updated
135+
std::lock_guard<std::mutex> lock(mutex_);
136+
analyzer_group_ = std::make_unique<AnalyzerGroup>();
137+
if (!analyzer_group_->init(base_path_, "", n_)) {
138+
RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!");
139+
}
119140

120-
int publish_rate_ms = 1000 / pub_rate_;
121-
publish_timer_ = n_->create_wall_timer(
122-
std::chrono::milliseconds(publish_rate_ms),
123-
std::bind(&Aggregator::publishData, this));
141+
// Last analyzer handles remaining data
142+
other_analyzer_ = std::make_unique<OtherAnalyzer>(other_as_errors);
143+
other_analyzer_->init(base_path_); // This always returns true
144+
}
124145
}
125146

126147
void Aggregator::checkTimestamp(const DiagnosticArray::SharedPtr diag_msg)

0 commit comments

Comments
 (0)