Skip to content

Commit

Permalink
Instrument client/service for end-to-end request/response tracking (#145
Browse files Browse the repository at this point in the history
)

Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard authored Dec 14, 2024
1 parent ae2fed0 commit 699f572
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 43 deletions.
46 changes: 29 additions & 17 deletions doc/design_ros_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ The following table summarizes the instrumentation and links to the correspondin
| | `rmw_subscription_init` | [*Subscription creation*](#subscription-creation) |
| | `rmw_publish` | [*Message publishing*](#message-publishing) |
| | `rmw_take` | [*Subscription callbacks*](#subscription-callbacks) |
| | `rmw_client_init` | [*Client creation*](#client-creation) |
| | `rmw_take_request` | [*Service callbacks*](#service-callbacks) |
| | `rmw_send_response` | [*Service callbacks*](#service-callbacks) |
| | `rmw_send_request` | [*Client request/response*](#client-requestresponse) |
| | `rmw_take_response` | [*Client request/response*](#client-requestresponse) |

### General guidelines

Expand Down Expand Up @@ -438,6 +443,8 @@ For simple messages without loaning, it simply gets deallocated.
* Link to handle(s) of subscription being executed
* Message being taken
* Source timestamp of message being taken
* To link message being taken to the same message being published
* Note: there could be collisions if multiple publishers publish a message on the same topic at the exact same time (depending on clock resolution), but it's unlikely
* Link to callback object being dispatched, with start/end timestamps
* Whether the callback dispatching is for intra-process or not

Expand Down Expand Up @@ -575,7 +582,7 @@ The node calls `rclcpp::create_service()` which ends up creating a `rclcpp::Serv
In its constructor, it allocates a `rcl_service_t` handle, and then calls `rcl_service_init()`.
This processes the handle and validates the service name.
It calls `rmw_create_service()` to get the corresponding `rmw_service_t` handle.
`rclcpp::Service` creates an `rclcpp::AnySubscriptionCallback` object and associates it with itself.
`rclcpp::Service` creates an `rclcpp::AnyServiceCallback` object and associates it with itself.

**Important information**:
* Link between `rcl_service_t` and `rmw_service_t` handles
Expand Down Expand Up @@ -616,16 +623,17 @@ It then calls `rclcpp::Service::take_type_erased_request()`, which calls `rcl_ta

If those are successful and a new request is taken, then the `rclcpp::Executor` calls `rclcpp::Service::handle_request()` with the request.
This casts the request to its actual type, allocates a response object, and calls `rclcpp::AnyServiceCallback::dispatch()`, which calls the actual `std::function` with the right signature.
Depending on the service callback signature, a response object could be provided to the callback to be populated, a reference to the service could be used to send a response from inside the callback, or a response could be sent later.

If there is a service response for the request, `rclcpp::Service::send_response()` is called, which calls `rcl_send_response()` & `rmw_send_response()`.

**Important information**:
* Link to handle(s) of service being executed
* Request being taken
* Link to callback object being dispatched, with start/end timestamps
* TODO
* Link to handle(s) of service being executed
* Source timestamp of request being taken
* Link between request and response
* Sequence number and client GID of request being taken
* Sequence number and client GID of request that a response is for
* Source timestamp of response being sent

```mermaid
sequenceDiagram
Expand All @@ -644,17 +652,19 @@ sequenceDiagram
Executor->>Service: take_type_erased_request(out request *, out request_id): bool
Service->>rcl: rcl_take_request(rcl_service *, out request_header *, out request *)
rcl->>rmw: rmw_take_request(rmw_service_t *, out request_header *, out request *, out taken)
rmw-->>tracetools: TP(rmw_take_request, rmw_service_t *, request *, client_gid, sequence_number, taken)
opt taken
Executor->>Service: handle_request(request_header *, request *)
Note over Service: casts request to its actual type
Note over Service: allocates a response object
Service->>AnyServiceCallback: dispatch(request_header, typed_request): response
Service->>AnyServiceCallback: dispatch(service, request_header, request): response
AnyServiceCallback-->>tracetools: TP(callback_start, rclcpp::AnyServiceCallback *)
Note over AnyServiceCallback: std::function(...)
AnyServiceCallback-->>tracetools: TP(callback_end, rclcpp::AnyServiceCallback *)
opt response
Service->>rcl: rcl_send_response(rcl_service_t *, request_header *, response *)
rcl->>rmw: rmw_send_response(rmw_service_t *, request_header *, response *)
rmw-->>tracetools: TP(rmw_send_response, rmw_service_t *, response *, client_gid, sequence_number, timestamp)
end
end
```
Expand All @@ -666,10 +676,12 @@ The node calls `rclcpp::create_client()` which ends up creating a `rclcpp::Clien
In its constructor, it allocates a `rcl_client_t` handle, and then calls `rcl_client_init()`.
This validates and processes the handle.
It also calls `rmw_create_client()` which creates the `rmw_client_t` handle.
`rmw` creates or associates the `rmw_client_t` handle with *a* GID, since client `rmw` implementations use multiple DDS objects that have GIDs (e.g., request publisher, response subscription).

**Important information**:
* Link between `rcl_client_t` and `rmw_client_t` handles
* Link to corresponding node handle
* Link to corresponding client GID
* Service name

```mermaid
Expand All @@ -689,6 +701,7 @@ sequenceDiagram
Note over rcl: validates and processes rcl_client_t handle
rcl->>rmw: rmw_create_client(rmw_node_t *, service_name, qos_options): rmw_client_t
Note over rmw: creates rmw_client_t handle
rmw-->>tracetools: TP(rmw_client_init, rmw_client_t *, gid)
rcl-->>tracetools: TP(rcl_client_init, rcl_client_t *, rcl_node_t *, rmw_client_t *, service_name)
```

Expand All @@ -708,13 +721,12 @@ This waits until the future object is ready, or until timeout, and returns.
If this last call was successful, then the node can get the result (i.e., response) and do something with it.

**Important information**:
* TODO
* Link to handle(s) of client
* Request being sent, with timestamp
* Link to callback object being used, with start/end timestamps
* Response being taken
* Source timestamp of response being taken
* Link to response
* Link to handle(s) of client
* Sequence number of request being sent
* Sequence number of request that a response being taken is for
* Source timestamp of response being taken
* To link response being taken to the same response being sent
* Note: there could be collisions if multiple services replied to the same request at the exact same time (depending on clock resolution), but it's unlikely

```mermaid
sequenceDiagram
Expand All @@ -730,11 +742,11 @@ sequenceDiagram
Note over node: creates request
node->>Client: async_send_request(request[, callback]): result_future
Client->>rcl: rcl_send_request(rcl_client_t, request, out sequence_number): int64_t
Client->>rcl: rcl_send_request(rcl_client_t, request *, out sequence_number): int64_t
Note over rcl: assigns sequence_number
%% rcl-->>tracetools: TP(rcl_send_request, rcl_client_t *, sequence_number)
rcl->>rmw: rmw_send_request(rmw_client_t, request, sequence_number)
%% rmw-->>tracetools: TP(rmw_send_request, sequence_number)
rcl->>rmw: rmw_send_request(rmw_client_t, request *, sequence_number)
rmw-->>tracetools: TP(rmw_send_request, rmw_client_t *, request *, sequence_number)
Note over Client: puts sequence_number in a 'pending requests' map with promise+callback+future
alt without callback
Expand All @@ -753,7 +765,7 @@ sequenceDiagram
Executor->>Client: take_type_erased_response(response *, header *): bool
Client->>rcl: rcl_take_response*(rcl_client_t *, out request_header *, out response *)
rcl->>rmw: rmw_take_response(rmw_client_t *, out request_header *, out response *, out taken)
%% rmw-->>tracetools: TP(rmw_take_response)
rmw-->>tracetools: TP(rmw_take_response, rmw_client_t *, response *, sequence_number, source_timestamp, taken)
%% rcl-->>tracetools: TP(rcl_take_response)
opt taken
Executor->>Client: handle_response(request_header *, response *)
Expand Down
5 changes: 3 additions & 2 deletions test_tracetools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,11 @@ if(BUILD_TESTING)
# Tests to run with the default rmw implementation, which should not matter
set(_test_tracetools_pytest_tests
test/test_buffer.py
test/test_client.py
test/test_executor.py
test/test_intra.py
test/test_intra_pub_sub.py
test/test_lifecycle_node.py
test/test_node.py
test/test_service.py
test/test_timer.py
)
foreach(_test_path ${_test_tracetools_pytest_tests})
Expand All @@ -223,10 +221,13 @@ if(BUILD_TESTING)

# Tests to run with all instrumented/supported rmw implementations
set(_test_tracetools_pytest_tests_multi_rmw
test/test_client.py
test/test_generic_pub_sub.py
test/test_generic_subscription.py
test/test_pub_sub.py
test/test_publisher.py
test/test_service.py
test/test_service_req_resp.py
test/test_subscription.py
)
set(_test_tracetools_rmw_implementations
Expand Down
55 changes: 48 additions & 7 deletions test_tracetools/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ def __init__(self, *args) -> None:
session_name_prefix='session-test-client',
events_ros=[
tp.rcl_node_init,
tp.rmw_client_init,
tp.rcl_client_init,
tp.rmw_send_request,
tp.rmw_take_response,
],
package='test_tracetools',
nodes=['test_service_ping', 'test_service_pong'],
Expand All @@ -39,11 +42,25 @@ def test_all(self):
self.assertEventsSet(self._events_ros)

# Check fields
client_init_events = self.get_events_with_name(tp.rcl_client_init)
rmw_client_init_events = self.get_events_with_name(tp.rmw_client_init)
rcl_client_init_events = self.get_events_with_name(tp.rcl_client_init)
rmw_send_request_events = self.get_events_with_name(tp.rmw_send_request)
rmw_take_response_events = self.get_events_with_name(tp.rmw_take_response)

for event in client_init_events:
for event in rmw_client_init_events:
self.assertValidHandle(event, 'rmw_client_handle')
self.assertValidStaticArray(event, 'gid', int, 16)
for event in rcl_client_init_events:
self.assertValidHandle(event, ['client_handle', 'node_handle', 'rmw_client_handle'])
self.assertStringFieldNotEmpty(event, 'service_name')
for event in rmw_send_request_events:
self.assertValidHandle(event, 'rmw_client_handle')
self.assertValidPointer(event, 'request')
self.assertFieldType(event, 'sequence_number', int)
for event in rmw_take_response_events:
self.assertValidHandle(event, 'rmw_client_handle')
self.assertValidPointer(event, 'response')
self.assertFieldType(event, ['sequence_number', 'source_timestamp', 'taken'], int)

# Find node event
node_init_events = self.get_events_with_name(tp.rcl_node_init)
Expand All @@ -56,19 +73,43 @@ def test_all(self):
)
node_handle = self.get_field(test_node_init_event, 'node_handle')

# Find client init event and check that the node handle matches
test_client_init_event = self.get_event_with_field_value_and_assert(
# Find client init events and check that the handles match
test_rcl_client_init_event = self.get_event_with_field_value_and_assert(
'service_name',
'/ping',
client_init_events,
rcl_client_init_events,
allow_multiple=False,
)
self.assertFieldEquals(test_rcl_client_init_event, 'node_handle', node_handle)
rmw_client_handle = self.get_field(test_rcl_client_init_event, 'rmw_client_handle')
test_rmw_client_init_event = self.get_event_with_field_value_and_assert(
'rmw_client_handle',
rmw_client_handle,
rmw_client_init_events,
allow_multiple=False,
)

# Find rmw_send_request and rmw_take_response events
test_rmw_send_request_event = self.get_event_with_field_value_and_assert(
'rmw_client_handle',
rmw_client_handle,
rmw_send_request_events,
allow_multiple=False,
)
test_rmw_take_response_event = self.get_event_with_field_value_and_assert(
'rmw_client_handle',
rmw_client_handle,
rmw_take_response_events,
allow_multiple=False,
)
self.assertFieldEquals(test_client_init_event, 'node_handle', node_handle)

# Check events order
self.assertEventOrder([
test_node_init_event,
test_client_init_event,
test_rmw_client_init_event,
test_rcl_client_init_event,
test_rmw_send_request_event,
test_rmw_take_response_event,
])


Expand Down
55 changes: 43 additions & 12 deletions test_tracetools/test/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def __init__(self, *args) -> None:
tp.rcl_service_init,
tp.rclcpp_service_callback_added,
tp.rclcpp_callback_register,
tp.rmw_take_request,
tp.callback_start,
tp.rmw_send_response,
tp.callback_end,
],
package='test_tracetools',
Expand All @@ -43,49 +45,62 @@ def test_all(self):
self.assertEventsSet(self._events_ros)

# Check fields
srv_init_events = self.get_events_with_name(tp.rcl_service_init)
rcl_srv_init_events = self.get_events_with_name(tp.rcl_service_init)
callback_added_events = self.get_events_with_name(tp.rclcpp_service_callback_added)
callback_register_events = self.get_events_with_name(tp.rclcpp_callback_register)
rmw_take_request_events = self.get_events_with_name(tp.rmw_take_request)
callback_start_events = self.get_events_with_name(tp.callback_start)
rmw_send_response_events = self.get_events_with_name(tp.rmw_send_response)
callback_end_events = self.get_events_with_name(tp.callback_end)

for event in srv_init_events:
for event in rcl_srv_init_events:
self.assertValidHandle(event, ['service_handle', 'node_handle', 'rmw_service_handle'])
self.assertStringFieldNotEmpty(event, 'service_name')
for event in callback_added_events:
self.assertValidHandle(event, ['service_handle', 'callback'])
for event in callback_register_events:
self.assertValidPointer(event, 'callback')
self.assertStringFieldNotEmpty(event, 'symbol')
for event in rmw_take_request_events:
self.assertValidHandle(event, 'rmw_service_handle')
self.assertValidPointer(event, 'request')
self.assertValidStaticArray(event, 'client_gid', int, 16)
self.assertFieldType(event, ['sequence_number', 'taken'], int)
for event in callback_start_events:
self.assertValidHandle(event, 'callback')
# Should not be 1 for services (yet)
self.assertFieldEquals(event, 'is_intra_process', 0)
for event in rmw_send_response_events:
self.assertValidHandle(event, 'rmw_service_handle')
self.assertValidPointer(event, 'response')
self.assertValidStaticArray(event, 'client_gid', int, 16)
self.assertFieldType(event, ['sequence_number', 'timestamp'], int)
for event in callback_end_events:
self.assertValidHandle(event, 'callback')

# Find node event
node_init_events = self.get_events_with_name(tp.rcl_node_init)
rcl_node_init_events = self.get_events_with_name(tp.rcl_node_init)
# The test_service_pong node is the one that has the service (server)
test_node_init_event = self.get_event_with_field_value_and_assert(
test_rcl_node_init_event = self.get_event_with_field_value_and_assert(
'node_name',
'test_service_pong',
node_init_events,
rcl_node_init_events,
allow_multiple=False,
)
node_handle = self.get_field(test_node_init_event, 'node_handle')
node_handle = self.get_field(test_rcl_node_init_event, 'node_handle')

# Find service init event and check that the node handle matches
test_srv_init_event = self.get_event_with_field_value_and_assert(
test_rcl_srv_init_event = self.get_event_with_field_value_and_assert(
'service_name',
'/ping',
srv_init_events,
rcl_srv_init_events,
allow_multiple=False,
)
self.assertFieldEquals(test_srv_init_event, 'node_handle', node_handle)
self.assertFieldEquals(test_rcl_srv_init_event, 'node_handle', node_handle)
rmw_service_handle = self.get_field(test_rcl_srv_init_event, 'rmw_service_handle')

# Check that there is a matching rclcpp_service_callback_added event
service_handle = self.get_field(test_srv_init_event, 'service_handle')
service_handle = self.get_field(test_rcl_srv_init_event, 'service_handle')
test_srv_callback_added_event = self.get_event_with_field_value_and_assert(
'service_handle',
service_handle,
Expand Down Expand Up @@ -116,13 +131,29 @@ def test_all(self):
allow_multiple=False,
)

# Find rmw_take_request and rmw_send_response events
test_rmw_take_request_event = self.get_event_with_field_value_and_assert(
'rmw_service_handle',
rmw_service_handle,
rmw_take_request_events,
allow_multiple=False,
)
test_rmw_send_response_event = self.get_event_with_field_value_and_assert(
'rmw_service_handle',
rmw_service_handle,
rmw_send_response_events,
allow_multiple=False,
)

# Check events order
self.assertEventOrder([
test_node_init_event,
test_srv_init_event,
test_rcl_node_init_event,
test_rcl_srv_init_event,
test_srv_callback_added_event,
test_callback_register_event,
test_rmw_take_request_event,
test_callback_start_event,
test_rmw_send_response_event,
test_callback_end_event,
])

Expand Down
Loading

0 comments on commit 699f572

Please sign in to comment.