Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cyclone DDS v0.10.x and master + ROS 2 type definitions in DDS #501

Draft
wants to merge 4 commits into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions rmw_cyclonedds_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ find_package(tracetools REQUIRED)

find_package(CycloneDDS QUIET CONFIG)
if(CycloneDDS_FOUND)
# Support for shared memory in RMW depends on support for shared memory being compiled in
# Cyclone DDS and iceoryx_binding_c being available
get_target_property(_cyclonedds_has_shm CycloneDDS::ddsc SHM_SUPPORT_IS_AVAILABLE)
if(_cyclonedds_has_shm)
find_package(iceoryx_binding_c REQUIRED)
else()
message(STATUS "Cyclone DDS is NOT compiled with support for shared memory")
if(CycloneDDS_VERSION VERSION_LESS 0.11)
# Support for shared memory in RMW depends on support for shared memory being compiled
# in Cyclone DDS for Cyclone version 0.10.x (and older) and iceoryx_binding_c being
# available
get_target_property(_cyclonedds_has_shm CycloneDDS::ddsc SHM_SUPPORT_IS_AVAILABLE)
if(_cyclonedds_has_shm)
find_package(iceoryx_binding_c REQUIRED)
else()
message(STATUS "Cyclone DDS is NOT compiled with support for shared memory")
endif()
endif()
else()
message(WARNING "Could not find Eclipse Cyclone DDS - skipping '${PROJECT_NAME}'")
Expand Down Expand Up @@ -80,6 +83,9 @@ add_library(rmw_cyclonedds_cpp
src/TypeSupport2.cpp
src/TypeSupport.cpp)

configure_file("src/cdds_version.hpp.in" "src/cdds_version.hpp")
target_include_directories(rmw_cyclonedds_cpp PRIVATE ${CMAKE_BINARY_DIR}/src)

target_link_libraries(rmw_cyclonedds_cpp PUBLIC
rmw::rmw)

Expand Down
1 change: 1 addition & 0 deletions rmw_cyclonedds_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<buildtool_depend>ament_cmake_ros</buildtool_depend>

<depend>cyclonedds</depend>
<!-- Iceoryx not a build dependency for Cyclone versions greater than 0.10 -->
<depend>iceoryx_binding_c</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>
Expand Down
22 changes: 22 additions & 0 deletions rmw_cyclonedds_cpp/src/cdds_version.hpp.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 ZettaScale Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CDDS_VERSION_HPP_
#define CDDS_VERSION_HPP_

#define CDDS_VERSION_0_10 10
#define CDDS_VERSION_0_11 11

#define CDDS_VERSION (@CycloneDDS_VERSION_MAJOR@ * 100 + @CycloneDDS_VERSION_MINOR@)

#endif // CDDS_VERSION_HPP_
103 changes: 82 additions & 21 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@
#include "namespace_prefix.hpp"

#include "dds/dds.h"
#include "cdds_version.hpp"
#if CDDS_VERSION == CDDS_VERSION_0_10 && defined DDS_HAS_SHM
#include "dds/ddsc/dds_data_allocator.h"
#include "dds/ddsc/dds_loan_api.h"
#endif
#include "serdes.hpp"
#include "serdata.hpp"
#include "demangle.hpp"


using namespace std::literals::chrono_literals;

/* Security must be enabled when compiling and requires cyclone to support QOS property lists */
Expand All @@ -102,10 +106,6 @@ using namespace std::literals::chrono_literals;
#define RMW_SUPPORT_SECURITY 0
#endif

#if !DDS_HAS_DDSI_SERTYPE
#define ddsi_sertype_unref(x) ddsi_sertopic_unref(x)
#endif

/* Set to > 0 for printing warnings to stderr for each messages that was taken more than this many
ms after writing */
#define REPORT_LATE_MESSAGES 0
Expand Down Expand Up @@ -343,7 +343,9 @@ struct CddsPublisher : CddsEntity
rmw_gid_t gid;
struct ddsi_sertype * sertype;
rosidl_message_type_support_t type_supports;
#if CDDS_VERSION == CDDS_VERSION_0_10
dds_data_allocator_t data_allocator;
#endif
uint32_t sample_size;
bool is_loaning_available;
user_callback_data_t user_callback_data;
Expand All @@ -354,7 +356,9 @@ struct CddsSubscription : CddsEntity
rmw_gid_t gid;
dds_entity_t rdcondh;
rosidl_message_type_support_t type_supports;
#if CDDS_VERSION == CDDS_VERSION_0_10
dds_data_allocator_t data_allocator;
#endif
bool is_loaning_available;
user_callback_data_t user_callback_data;
};
Expand Down Expand Up @@ -1216,7 +1220,6 @@ static bool check_create_domain(dds_domainid_t did, rmw_discovery_options_t * di
}
config += "</Peers>";
}

/* NOTE: Empty configuration fragments are ignored, so it is safe to
unconditionally append a comma. */
config += "</Discovery></Domain></CycloneDDS>,";
Expand Down Expand Up @@ -1530,6 +1533,7 @@ static void * init_and_alloc_sample(
entityT & entity, const uint32_t sample_size, const bool alloc_on_heap = false)
{
// initialise the data allocator
#if CDDS_VERSION == CDDS_VERSION_0_10
if (alloc_on_heap) {
if (dds_data_allocator_init_heap(&entity->data_allocator) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("Reader data allocator initialization failed for heap");
Expand All @@ -1544,6 +1548,9 @@ static void * init_and_alloc_sample(
// allocate memory for message + header
// the header will be initialized and the chunk pointer will be returned
auto chunk_ptr = dds_data_allocator_alloc(&entity->data_allocator, sample_size);
#else
auto chunk_ptr = dds_request_loan_of_size(entity->enth, sample_size);
#endif
RMW_CHECK_FOR_NULL_WITH_MSG(
chunk_ptr,
"Failed to get loan",
Expand All @@ -1559,6 +1566,7 @@ static rmw_ret_t fini_and_free_sample(entityT & entity, void * loaned_message)
{
// fini the message
rmw_cyclonedds_cpp::fini_message(&entity->type_supports, loaned_message);
#if CDDS_VERSION == CDDS_VERSION_0_10
// free the message memory
if (dds_data_allocator_free(&entity->data_allocator, loaned_message) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("Failed to free the loaned message");
Expand All @@ -1569,6 +1577,12 @@ static rmw_ret_t fini_and_free_sample(entityT & entity, void * loaned_message)
RMW_SET_ERROR_MSG("Failed to fini data allocator");
return RMW_RET_ERROR;
}
#else
if (dds_return_loan(entity->enth, loaned_message) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("Failed to free the loaned message");
return RMW_RET_ERROR;
}
#endif
return RMW_RET_OK;
}

Expand Down Expand Up @@ -1899,12 +1913,7 @@ static dds_entity_t create_topic(
struct ddsi_sertype ** stact)
{
dds_entity_t tp;
#if DDS_HAS_DDSI_SERTYPE
tp = dds_create_topic_sertype(pp, name, &sertype, nullptr, nullptr, nullptr);
#else
static_cast<void>(name);
tp = dds_create_topic_generic(pp, &sertype, nullptr, nullptr, nullptr);
#endif
if (tp < 0) {
ddsi_sertype_unref(sertype);
} else {
Expand Down Expand Up @@ -2278,6 +2287,7 @@ static rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(dds_qos_policy_id_
case DDS_LIFESPAN_QOS_POLICY_ID:
return RMW_QOS_POLICY_LIFESPAN;
default:
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "%d", policy_id);
return RMW_QOS_POLICY_INVALID;
}
}
Expand Down Expand Up @@ -2444,10 +2454,15 @@ static CddsPublisher * create_cdds_publisher(
std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", qos_policies);
bool is_fixed_type = is_type_self_contained(type_support);
uint32_t sample_size = static_cast<uint32_t>(rmw_cyclonedds_cpp::get_message_size(type_support));

auto sertype = create_sertype(
type_support->typesupport_identifier,
create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports), sample_size, is_fixed_type);
create_message_type_support(type_support->data, type_support->typesupport_identifier),
false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports),
sample_size, is_fixed_type);
create_msg_dds_dynamic_type(type_support->typesupport_identifier, type_support->data, dds_ppant,
sertype);
struct ddsi_sertype * stact = nullptr;
topic = create_topic(dds_ppant, fqtopic_name.c_str(), sertype, &stact);

Expand All @@ -2472,14 +2487,24 @@ static CddsPublisher * create_cdds_publisher(
RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle;
}

get_entity_gid(pub->enth, pub->gid);
pub->sertype = stact;
dds_delete_listener(listener);
pub->type_supports = *type_supports;
#if CDDS_VERSION == CDDS_VERSION_0_10
pub->is_loaning_available = is_fixed_type && dds_is_loan_available(pub->enth);
#else
// Some form of loaning is always possible, but the exact behaviour depends on type and
// whether or not Iceoryx can be used. I'm not sure to what the expectations are
// exactly, this should keep it essentially unchanged from the behaviour in Humble and
// Iron.
pub->is_loaning_available = is_fixed_type && dds_is_shared_memory_available(pub->enth);
#endif
pub->sample_size = sample_size;
dds_delete_qos(qos);
dds_delete(topic);

return pub;

fail_instance_handle:
Expand Down Expand Up @@ -2948,10 +2973,15 @@ static CddsSubscription * create_cdds_subscription(
std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", qos_policies);
bool is_fixed_type = is_type_self_contained(type_support);
uint32_t sample_size = static_cast<uint32_t>(rmw_cyclonedds_cpp::get_message_size(type_support));

auto sertype = create_sertype(
type_support->typesupport_identifier,
create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports), sample_size, is_fixed_type);
create_message_type_support(type_support->data, type_support->typesupport_identifier),
false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports),
sample_size, is_fixed_type);
create_msg_dds_dynamic_type(type_support->typesupport_identifier, type_support->data, dds_ppant,
sertype);
topic = create_topic(dds_ppant, fqtopic_name.c_str(), sertype);

dds_listener_t * listener = dds_create_listener(&sub->user_callback_data);
Expand Down Expand Up @@ -2981,9 +3011,18 @@ static CddsSubscription * create_cdds_subscription(
}
dds_delete_listener(listener);
sub->type_supports = *type_support;
#if CDDS_VERSION == CDDS_VERSION_0_10
sub->is_loaning_available = is_fixed_type && dds_is_loan_available(sub->enth);
#else
// Some form of loaning is always possible, but the exact behaviour depends on type and
// whether or not Iceoryx can be used. I'm not sure to what the expectations are
// exactly, this should keep it essentially unchanged from the behaviour in Humble and
// Iron.
sub->is_loaning_available = is_fixed_type && dds_is_shared_memory_available(sub->enth);
#endif
dds_delete_qos(qos);
dds_delete(topic);

return sub;
fail_readcond:
if (dds_delete(sub->enth) < 0) {
Expand Down Expand Up @@ -3586,10 +3625,12 @@ static rmw_ret_t rmw_take_loan_int(
return RMW_RET_ERROR;
}
*taken = true;
#if CDDS_VERSION == CDDS_VERSION_0_10
// doesn't allocate, but initialise the allocator to free the chunk later when the loan
// is returned
dds_data_allocator_init(
cdds_subscription->enth, &cdds_subscription->data_allocator);
#endif
// set the loaned chunk to null, so that the loaned chunk is not release in
// rmw_serdata_free(), but will be released when
// `rmw_return_loaned_message_from_subscription()` is called
Expand Down Expand Up @@ -4954,6 +4995,7 @@ static rmw_ret_t rmw_init_cs(
std::string user_data;

std::unique_ptr<rmw_cyclonedds_cpp::StructValueType> pub_msg_ts, sub_msg_ts;
struct sertype_rmw * pub_st, * sub_st;

dds_listener_t * listener = dds_create_listener(cb_data);
dds_lset_data_available_arg(listener, dds_listener_callback, cb_data, false);
Expand All @@ -4973,6 +5015,19 @@ static rmw_ret_t rmw_init_cs(
subtopic_name =
make_fqtopic(ROS_SERVICE_REQUESTER_PREFIX, service_name, "Request", qos_policies);
pubtopic_name = make_fqtopic(ROS_SERVICE_RESPONSE_PREFIX, service_name, "Reply", qos_policies);

pub_st = create_sertype(
type_support->typesupport_identifier, pub_type_support, true,
std::move(pub_msg_ts), 0U, false
);
create_res_dds_dynamic_type(type_support->typesupport_identifier, type_support->data,
node->context->impl->ppant, pub_st);
sub_st = create_sertype(
type_support->typesupport_identifier, sub_type_support, true,
std::move(sub_msg_ts), 0U, false
);
create_req_dds_dynamic_type(type_support->typesupport_identifier, type_support->data,
node->context->impl->ppant, sub_st);
} else {
std::tie(pub_msg_ts, sub_msg_ts) =
rmw_cyclonedds_cpp::make_request_response_value_types(type_supports);
Expand All @@ -4988,6 +5043,19 @@ static rmw_ret_t rmw_init_cs(
pubtopic_name =
make_fqtopic(ROS_SERVICE_REQUESTER_PREFIX, service_name, "Request", qos_policies);
subtopic_name = make_fqtopic(ROS_SERVICE_RESPONSE_PREFIX, service_name, "Reply", qos_policies);

pub_st = create_sertype(
type_support->typesupport_identifier, pub_type_support, true,
std::move(pub_msg_ts), 0U, false
);
create_req_dds_dynamic_type(type_support->typesupport_identifier, type_support->data,
node->context->impl->ppant, pub_st);
sub_st = create_sertype(
type_support->typesupport_identifier, sub_type_support, true,
std::move(sub_msg_ts), 0U, false
);
create_res_dds_dynamic_type(type_support->typesupport_identifier, type_support->data,
node->context->impl->ppant, sub_st);
}

RCUTILS_LOG_DEBUG_NAMED(
Expand All @@ -4998,21 +5066,14 @@ static rmw_ret_t rmw_init_cs(
RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********");

dds_entity_t pubtopic, subtopic;
struct sertype_rmw * pub_st, * sub_st;

pub_st = create_sertype(
type_support->typesupport_identifier, pub_type_support, true,
std::move(pub_msg_ts));
struct ddsi_sertype * pub_stact;
pubtopic = create_topic(node->context->impl->ppant, pubtopic_name.c_str(), pub_st, &pub_stact);
if (pubtopic < 0) {
set_error_message_from_create_topic(pubtopic, pubtopic_name);
goto fail_pubtopic;
}

sub_st = create_sertype(
type_support->typesupport_identifier, sub_type_support, true,
std::move(sub_msg_ts));
subtopic = create_topic(node->context->impl->ppant, subtopic_name.c_str(), sub_st);
if (subtopic < 0) {
set_error_message_from_create_topic(subtopic, subtopic_name);
Expand Down
Loading