Skip to content

Commit

Permalink
[core] add/improved IsPublished/IsSubscribed logic (5.13.x) (#1632)
Browse files Browse the repository at this point in the history
additional check for CSubscriber::GetPublisherCount() using the connection state of the matching subscriber to ensure that GetPublisherCount is increased only if the matching publisher is able to send data
  • Loading branch information
rex-schilasky authored Jun 20, 2024
1 parent 7174d7d commit 3923a1d
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
18 changes: 13 additions & 5 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -295,8 +295,12 @@ namespace eCAL

iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par);
}
// inform for local publisher connection
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one local subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_loc() > 0)
{
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
}
}
}

Expand Down Expand Up @@ -345,8 +349,12 @@ namespace eCAL
const std::string writer_par = tlayer.par_layer().SerializeAsString();
iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par);
}
// inform for external publisher connection
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one external subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_ext() > 0)
{
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -250,6 +250,12 @@ namespace eCAL
return(m_datareader->RemEventCallback(type_));
}

//bool CSubscriber::IsPublished() const
//{
// if (m_datareader == nullptr) return(false);
// return(m_datareader->IsPublished());
//}

size_t CSubscriber::GetPublisherCount() const
{
if(m_datareader == nullptr) return(0);
Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,7 @@ namespace eCAL

bool IsCreated() const {return(m_created);}

bool IsPublished() const { return(m_loc_published || m_ext_published); }
size_t GetPublisherCount() const
{
const std::lock_guard<std::mutex> lock(m_pub_map_sync);
Expand Down
98 changes: 95 additions & 3 deletions testing/ecal/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -161,8 +161,6 @@ TEST(PubSub, TimingSubscriberReceive)
EXPECT_EQ(0, eCAL::Finalize());
}



// This tests test for sporadically received empty messages which were a problem.
TEST(PubSub, SporadicEmptyReceives)
{
Expand Down Expand Up @@ -219,3 +217,97 @@ TEST(PubSub, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::CPublisher pub("blob");
pub.ShmSetAcknowledgeTimeout(500);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines != 0)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
//if (sub.IsPublished()) do_start_publication = true;
if (sub.GetPublisherCount() > 0) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}

0 comments on commit 3923a1d

Please sign in to comment.