Skip to content

Commit

Permalink
Add support for taking a sequence of messages (ros2#148)
Browse files Browse the repository at this point in the history
* Add support for taking a sequence of messages

Signed-off-by: Michael Carroll <[email protected]>

* Reorder valid messages to front of sequence

Signed-off-by: Michael Carroll <[email protected]>

* Initialize taken value

Signed-off-by: Michael Carroll <[email protected]>
  • Loading branch information
mjcarroll authored Apr 24, 2020
1 parent 2638f4a commit 224d761
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,80 @@ static rmw_ret_t rmw_take_int(
return RMW_RET_OK;
}

static rmw_ret_t rmw_take_seq(
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken)
{
RET_NULL(taken);
RET_NULL(message_sequence);
RET_NULL(message_info_sequence);
RET_WRONG_IMPLID(subscription);

if (count > message_sequence->capacity) {
RMW_SET_ERROR_MSG("Insuffient capacity in message_sequence");
return RMW_RET_ERROR;
}

if (count > message_info_sequence->capacity) {
RMW_SET_ERROR_MSG("Insuffient capacity in message_info_sequence");
return RMW_RET_ERROR;
}

CddsSubscription * sub = static_cast<CddsSubscription *>(subscription->data);
RET_NULL(sub);

std::vector<dds_sample_info_t> infos(count);
auto ret = dds_take(sub->enth, message_sequence->data, infos.data(), count, count);

// Returning 0 should not be an error, as it just indicates that no messages were available.
if (ret < 0) {
return RMW_RET_ERROR;
}

// Keep track of taken/not taken to reorder sequence with valid messages at the front
std::vector<void *> taken_msg;
std::vector<void *> not_taken_msg;
*taken = 0u;

for (int ii = 0; ii < ret; ++ii) {
const dds_sample_info_t & info = infos[ii];

void * message = &message_sequence->data[ii];
rmw_message_info_t * message_info = &message_info_sequence->data[*taken];

if (info.valid_data) {
taken_msg.push_back(message);
(*taken)++;
if (message_info) {
message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier;
memset(message_info->publisher_gid.data, 0, sizeof(message_info->publisher_gid.data));
assert(sizeof(info.publication_handle) <= sizeof(message_info->publisher_gid.data));
memcpy(
message_info->publisher_gid.data, &info.publication_handle,
sizeof(info.publication_handle));
}
} else {
not_taken_msg.push_back(message);
}
}

for (size_t ii = 0; ii < taken_msg.size(); ++ii) {
message_sequence->data[ii] = taken_msg[ii];
}

for (size_t ii = 0; ii < not_taken_msg.size(); ++ii) {
message_sequence->data[ii + taken_msg.size()] = not_taken_msg[ii];
}

message_sequence->size = *taken;
message_info_sequence->size = *taken;

return RMW_RET_OK;
}

static rmw_ret_t rmw_take_ser_int(
const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message, bool * taken,
Expand Down Expand Up @@ -2221,6 +2295,16 @@ extern "C" rmw_ret_t rmw_take_with_info(
return rmw_take_int(subscription, ros_message, taken, message_info);
}

extern "C" rmw_ret_t rmw_take_sequence(
const rmw_subscription_t * subscription, size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken, rmw_subscription_allocation_t * allocation)
{
static_cast<void>(allocation);
return rmw_take_seq(subscription, count, message_sequence, message_info_sequence, taken);
}

extern "C" rmw_ret_t rmw_take_serialized_message(
const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message,
Expand Down

0 comments on commit 224d761

Please sign in to comment.