diff --git a/rmw_connextdds_common/include/rmw_connextdds/context.hpp b/rmw_connextdds_common/include/rmw_connextdds/context.hpp index 0cd100fc..343da5be 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/context.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/context.hpp @@ -47,6 +47,7 @@ struct rmw_context_impl_s { public: rmw_dds_common::Context common; + std::mutex common_mutex; rmw_context_t * base; DDS_DomainId_t domain_id; diff --git a/rmw_connextdds_common/include/rmw_connextdds/graph_cache.hpp b/rmw_connextdds_common/include/rmw_connextdds/graph_cache.hpp index 90b7a2a3..4a2b01be 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/graph_cache.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/graph_cache.hpp @@ -29,8 +29,8 @@ rmw_connextdds_graph_finalize(rmw_context_impl_t * const ctx); rmw_ret_t rmw_connextdds_graph_publish_update( - rmw_context_impl_t * const ctx, - void * const msg); + const rmw_publisher_t * const ctx, + const void * const msg); rmw_ret_t rmw_connextdds_graph_on_node_created( diff --git a/rmw_connextdds_common/src/common/rmw_graph.cpp b/rmw_connextdds_common/src/common/rmw_graph.cpp index 98333f1f..ad461000 100644 --- a/rmw_connextdds_common/src/common/rmw_graph.cpp +++ b/rmw_connextdds_common/src/common/rmw_graph.cpp @@ -105,6 +105,10 @@ rmw_connextdds_graph_initialize(rmw_context_impl_t * const ctx) return RMW_RET_ERROR; } + ctx->common.publish_callback = [](const rmw_publisher_t * pub, const void * msg) { + return rmw_connextdds_graph_publish_update(pub, msg); + }; + pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL; RMW_CONNEXT_LOG_DEBUG("creating discovery subscriber") @@ -234,6 +238,10 @@ rmw_connextdds_graph_finalize(rmw_context_impl_t * const ctx) ctx->common.sub = nullptr; } + if (nullptr != ctx->common.publish_callback) { + ctx->common.publish_callback = nullptr; + } + if (nullptr != ctx->common.pub) { if (RMW_RET_OK != rmw_connextdds_destroy_publisher(ctx, ctx->common.pub)) @@ -247,19 +255,18 @@ rmw_connextdds_graph_finalize(rmw_context_impl_t * const ctx) return RMW_RET_OK; } - rmw_ret_t rmw_connextdds_graph_publish_update( - rmw_context_impl_t * const ctx, - void * const msg) + const rmw_publisher_t * const pub, + const void * const msg) { - if (nullptr == ctx->common.pub) { + if (nullptr == pub) { RMW_CONNEXT_LOG_WARNING( "context already finalized, message not published") return RMW_RET_OK; } - if (RMW_RET_OK != rmw_publish(ctx->common.pub, msg, nullptr)) { + if (RMW_RET_OK != rmw_publish(pub, msg, nullptr)) { RMW_CONNEXT_LOG_ERROR("failed to publish discovery sample") return RMW_RET_ERROR; } @@ -272,7 +279,7 @@ rmw_connextdds_graph_on_node_created( rmw_context_impl_t * const ctx, const rmw_node_t * const node) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); RMW_CONNEXT_LOG_DEBUG_A( "[graph] local node created: " @@ -284,17 +291,11 @@ rmw_connextdds_graph_on_node_created( reinterpret_cast(ctx->common.gid.data)[1], reinterpret_cast(ctx->common.gid.data)[2], reinterpret_cast(ctx->common.gid.data)[3]) - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.add_node( - ctx->common.gid, node->name, node->namespace_); - if (RMW_RET_OK != - rmw_connextdds_graph_publish_update( - ctx, reinterpret_cast(&msg))) - { - static_cast( - ctx->common.graph_cache.remove_node( - ctx->common.gid, node->name, node->namespace_)); + rmw_ret_t rmw_ret = ctx->common.add_node_graph( + node->name, node->namespace_); + if (RMW_RET_OK != rmw_ret) { + RMW_CONNEXT_LOG_ERROR("failed to publish discovery sample") return RMW_RET_ERROR; } @@ -306,16 +307,12 @@ rmw_connextdds_graph_on_node_deleted( rmw_context_impl_t * const ctx, const rmw_node_t * const node) { - std::lock_guard guard(ctx->common.node_update_mutex); - - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.remove_node( - ctx->common.gid, node->name, node->namespace_); + std::lock_guard guard(ctx->common_mutex); - if (RMW_RET_OK != - rmw_connextdds_graph_publish_update( - ctx, reinterpret_cast(&msg))) + if (RMW_RET_OK != ctx->common.remove_node_graph( + node->name, node->namespace_)) { + RMW_CONNEXT_LOG_ERROR("failed to publish discovery sample") return RMW_RET_ERROR; } @@ -329,7 +326,7 @@ rmw_connextdds_graph_on_publisher_created( const rmw_node_t * const node, RMW_Connext_Publisher * const pub) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); rmw_ret_t rc = rmw_connextdds_graph_add_local_publisherEA(ctx, node, pub); if (RMW_RET_OK != rc) { @@ -337,21 +334,13 @@ rmw_connextdds_graph_on_publisher_created( } const rmw_gid_t gid = *pub->gid(); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.associate_writer( + rc = ctx->common.add_publisher_graph( gid, - ctx->common.gid, - node->name, - node->namespace_); - rc = rmw_connextdds_graph_publish_update(ctx, reinterpret_cast(&msg)); + node->name, node->namespace_); + if (RMW_RET_OK != rc) { DDS_InstanceHandle_t ih = pub->instance_handle(); rmw_connextdds_graph_remove_entityEA(ctx, &ih, false); - static_cast(ctx->common.graph_cache.dissociate_writer( - gid, - ctx->common.gid, - node->name, - node->namespace_)); } return rc; } @@ -364,7 +353,7 @@ rmw_connextdds_graph_on_publisher_deleted( { rmw_ret_t rc = RMW_RET_ERROR; bool failed = false; - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); DDS_InstanceHandle_t ih = pub->instance_handle(); rc = rmw_connextdds_graph_remove_entityEA(ctx, &ih, false /* is_reader */); @@ -373,13 +362,10 @@ rmw_connextdds_graph_on_publisher_deleted( failed = true; } - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.dissociate_writer( + rc = ctx->common.remove_publisher_graph( *pub->gid(), - ctx->common.gid, - node->name, - node->namespace_); - rc = rmw_connextdds_graph_publish_update(ctx, reinterpret_cast(&msg)); + node->name, node->namespace_); + if (RMW_RET_OK != rc) { return rc; } @@ -392,20 +378,16 @@ rmw_connextdds_graph_on_subscriber_created( const rmw_node_t * const node, RMW_Connext_Subscriber * const sub) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); rmw_ret_t rc = rmw_connextdds_graph_add_local_subscriberEA(ctx, node, sub); if (RMW_RET_OK != rc) { return rc; } const rmw_gid_t gid = *sub->gid(); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.associate_reader( + rc = ctx->common.add_subscriber_graph( gid, - ctx->common.gid, - node->name, - node->namespace_); - rc = rmw_connextdds_graph_publish_update(ctx, reinterpret_cast(&msg)); + node->name, node->namespace_); if (RMW_RET_OK != rc) { DDS_InstanceHandle_t ih = sub->instance_handle(); rmw_connextdds_graph_remove_entityEA(ctx, &ih, true); @@ -426,7 +408,7 @@ rmw_connextdds_graph_on_subscriber_deleted( { rmw_ret_t rc = RMW_RET_ERROR; bool failed = false; - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); DDS_InstanceHandle_t ih = sub->instance_handle(); rc = rmw_connextdds_graph_remove_entityEA(ctx, &ih, true /* is_reader */); @@ -435,13 +417,9 @@ rmw_connextdds_graph_on_subscriber_deleted( failed = true; } - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.dissociate_reader( + rc = ctx->common.remove_subscriber_graph( *sub->gid(), - ctx->common.gid, - node->name, - node->namespace_); - rc = rmw_connextdds_graph_publish_update(ctx, reinterpret_cast(&msg)); + node->name, node->namespace_); if (RMW_RET_OK != rc) { return rc; } @@ -455,7 +433,7 @@ rmw_connextdds_graph_on_service_created( const rmw_node_t * const node, RMW_Connext_Service * const svc) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); const rmw_gid_t pub_gid = *svc->publisher()->gid(), sub_gid = *svc->subscriber()->gid(); @@ -480,38 +458,20 @@ rmw_connextdds_graph_on_service_created( if (RMW_RET_OK != rc) { return rc; } + // set it so that it can be removed in the `scope_exit_entities_reset` + added_sub = true; rc = rmw_connextdds_graph_add_local_publisherEA(ctx, node, svc->publisher()); if (RMW_RET_OK != rc) { return rc; } + added_pub = true; - (void)ctx->common.graph_cache.associate_writer( - pub_gid, - ctx->common.gid, - node->name, - node->namespace_); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.associate_reader( - sub_gid, - ctx->common.gid, - node->name, - node->namespace_); - - if (RMW_RET_OK != - rmw_connextdds_graph_publish_update( - ctx, reinterpret_cast(&msg))) - { - (void)ctx->common.graph_cache.dissociate_writer( - pub_gid, - ctx->common.gid, - node->name, - node->namespace_); - (void)ctx->common.graph_cache.dissociate_reader( + if (RMW_RET_OK != ctx->common.add_service_graph( sub_gid, - ctx->common.gid, - node->name, - node->namespace_); + pub_gid, + node->name, node->namespace_)) + { return RMW_RET_ERROR; } @@ -525,7 +485,7 @@ rmw_connextdds_graph_on_service_deleted( const rmw_node_t * const node, RMW_Connext_Service * const svc) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); bool failed = false; DDS_InstanceHandle_t ih = svc->subscriber()->instance_handle(); rmw_ret_t rc = rmw_connextdds_graph_remove_entityEA(ctx, &ih, true); @@ -535,19 +495,11 @@ rmw_connextdds_graph_on_service_deleted( rc = rmw_connextdds_graph_remove_entityEA(ctx, &ih, false); failed = failed && (RMW_RET_OK == rc); - (void)ctx->common.graph_cache.dissociate_writer( - *svc->publisher()->gid(), - ctx->common.gid, - node->name, - node->namespace_); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.dissociate_reader( + rc = ctx->common.remove_service_graph( *svc->subscriber()->gid(), - ctx->common.gid, - node->name, - node->namespace_); + *svc->publisher()->gid(), + node->name, node->namespace_); - rc = rmw_connextdds_graph_publish_update(ctx, reinterpret_cast(&msg)); failed = failed && (RMW_RET_OK == rc); return failed ? RMW_RET_ERROR : RMW_RET_OK; @@ -559,7 +511,7 @@ rmw_connextdds_graph_on_client_created( const rmw_node_t * const node, RMW_Connext_Client * const client) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); const rmw_gid_t pub_gid = *client->publisher()->gid(), sub_gid = *client->subscriber()->gid(); @@ -584,38 +536,19 @@ rmw_connextdds_graph_on_client_created( if (RMW_RET_OK != rc) { return rc; } + added_sub = true; rc = rmw_connextdds_graph_add_local_publisherEA(ctx, node, client->publisher()); if (RMW_RET_OK != rc) { return rc; } + added_pub = true; - (void)ctx->common.graph_cache.associate_writer( - pub_gid, - ctx->common.gid, - node->name, - node->namespace_); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.associate_reader( - sub_gid, - ctx->common.gid, - node->name, - node->namespace_); - - if (RMW_RET_OK != - rmw_connextdds_graph_publish_update( - ctx, reinterpret_cast(&msg))) - { - (void)ctx->common.graph_cache.dissociate_writer( + if (RMW_RET_OK != ctx->common.add_client_graph( pub_gid, - ctx->common.gid, - node->name, - node->namespace_); - (void)ctx->common.graph_cache.dissociate_reader( sub_gid, - ctx->common.gid, - node->name, - node->namespace_); + node->name, node->namespace_)) + { return RMW_RET_ERROR; } @@ -629,7 +562,7 @@ rmw_connextdds_graph_on_client_deleted( const rmw_node_t * const node, RMW_Connext_Client * const client) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); bool failed = false; DDS_InstanceHandle_t ih = client->subscriber()->instance_handle(); @@ -640,19 +573,10 @@ rmw_connextdds_graph_on_client_deleted( rc = rmw_connextdds_graph_remove_entityEA(ctx, &ih, false); failed = failed && (RMW_RET_OK == rc); - (void)ctx->common.graph_cache.dissociate_writer( + rc = ctx->common.remove_client_graph( *client->publisher()->gid(), - ctx->common.gid, - node->name, - node->namespace_); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - ctx->common.graph_cache.dissociate_reader( *client->subscriber()->gid(), - ctx->common.gid, - node->name, - node->namespace_); - - rc = rmw_connextdds_graph_publish_update(ctx, reinterpret_cast(&msg)); + node->name, node->namespace_); failed = failed && (RMW_RET_OK == rc); return failed ? RMW_RET_ERROR : RMW_RET_OK; @@ -678,7 +602,7 @@ rmw_connextdds_graph_on_participant_info(rmw_context_impl_t * ctx) reinterpret_cast(&msg.gid.data)[1], reinterpret_cast(&msg.gid.data)[2], reinterpret_cast(&msg.gid.data)[3]) - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); ctx->common.graph_cache.update_participant_entities(msg); } } while (taken); @@ -720,7 +644,7 @@ rmw_connextdds_graph_add_participant( reinterpret_cast(dp_guid.value)[3], enclave_str.c_str()) - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); ctx->common.graph_cache.add_participant(gid, enclave_str); return RMW_RET_OK; @@ -982,7 +906,7 @@ rmw_connextdds_graph_add_remote_entity( const DDS_LifespanQosPolicy * const lifespan, const bool is_reader) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); rmw_gid_t gid; rmw_gid_t dp_gid; @@ -1039,7 +963,7 @@ rmw_connextdds_graph_remove_participant( { rmw_gid_t dp_gid; rmw_connextdds_ih_to_gid(*instance, dp_gid); - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); ctx->common.graph_cache.remove_participant(dp_gid); return RMW_RET_OK; } @@ -1065,6 +989,6 @@ rmw_connextdds_graph_remove_entity( const DDS_InstanceHandle_t * const instance, const bool is_reader) { - std::lock_guard guard(ctx->common.node_update_mutex); + std::lock_guard guard(ctx->common_mutex); return rmw_connextdds_graph_remove_entityEA(ctx, instance, is_reader); }