Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple latched tcp topics #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nimbro_topic_transport/launch/topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ topics:
compress: true # enable bz2 compression
rate: 1.0 # rate limit at 1Hz
- name: "/my_second_topic"
latch: True
3 changes: 2 additions & 1 deletion nimbro_topic_transport/src/tcp/tcp_packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace nimbro_topic_transport

enum TCPFlag
{
TCP_FLAG_COMPRESSED = (1 << 0)
TCP_FLAG_COMPRESSED = (1 << 0),
TCP_FLAG_LATCHED = (1 << 1)
};

struct TCPHeader
Expand Down
4 changes: 4 additions & 0 deletions nimbro_topic_transport/src/tcp/tcp_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ void TCPReceiver::ClientHandler::run()
topic_info::getMsgDef(type)
);

if (header.flags() & TCP_FLAG_LATCHED) {
options.latch = true;
}

// It will take subscribers some time to connect to our publisher.
// Therefore, latch messages so they will not be lost.
// No, this is often unexpected. Instead, wait before publishing.
Expand Down
31 changes: 27 additions & 4 deletions nimbro_topic_transport/src/tcp/tcp_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ TCPSender::TCPSender()
if(entry.hasMember("compress") && ((bool)entry["compress"]) == true)
flags |= TCP_FLAG_COMPRESSED;

if(entry.hasMember("latch") && ((bool)entry["latch"]) == true)
flags |= TCP_FLAG_LATCHED;

boost::function<void(const topic_tools::ShapeShifter::ConstPtr&)> func;
func = boost::bind(&TCPSender::send, this, topic, flags, _1);
func = boost::bind(&TCPSender::send, this, topic, flags, _1, true);

ros::SubscribeOptions options;
options.initByFullCallbackType<topic_tools::ShapeShifter::ConstPtr>(topic, 20, func);
Expand Down Expand Up @@ -138,7 +141,7 @@ TCPSender::TCPSender()
boost::bind(&TCPSender::updateStats, this)
);
m_statsTimer.start();
}
}

TCPSender::~TCPSender()
{
Expand Down Expand Up @@ -218,6 +221,8 @@ bool TCPSender::connect()
ROS_WARN("Not setting TCP_USER_TIMEOUT");
#endif

this->sendLatched();

return true;
}

Expand All @@ -234,7 +239,8 @@ class PtrStream
uint8_t* m_ptr;
};

void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter)
void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter,
const bool reconnect)
{
#if WITH_CONFIG_SERVER
if (! (*m_enableTopic[topic])() )
Expand All @@ -250,6 +256,9 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha
if(flags & TCP_FLAG_COMPRESSED)
maxDataSize = size + size / 100 + 1200; // taken from bzip2 docs

if (flags & TCP_FLAG_LATCHED)
this->m_latchedMessages[topic] = std::make_pair(shifter, flags);

m_packet.resize(
sizeof(TCPHeader) + topic.length() + type.length() + maxDataSize
);
Expand Down Expand Up @@ -312,10 +321,12 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha
{
if(m_fd == -1)
{
if(!connect())
if(reconnect && !connect())
{
ROS_WARN("Connection failed, trying again");
continue;
} else if (!reconnect) {
break;
}
}

Expand Down Expand Up @@ -363,6 +374,18 @@ void TCPSender::updateStats()
m_sentBytesInStatsInterval = 0;
}

void TCPSender::sendLatched() {

std::map<std::string, std::pair<topic_tools::ShapeShifter::ConstPtr, int>>::iterator it =
this->m_latchedMessages.begin();

// send all latched messages
while (it != this->m_latchedMessages.end()) {
this->send(it->first, it->second.second, it->second.first, false);
it++;
}
}

}


Expand Down
8 changes: 7 additions & 1 deletion nimbro_topic_transport/src/tcp/tcp_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <config_server/parameter.h>
#endif

#include <map>

#include <nimbro_topic_transport/SenderStats.h>

namespace nimbro_topic_transport
Expand All @@ -27,7 +29,9 @@ class TCPSender

bool connect();

void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter);
void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter,
const bool reconnect = true);
void sendLatched();
private:
void updateStats();

Expand All @@ -43,6 +47,8 @@ class TCPSender
std::vector<uint8_t> m_packet;
std::vector<uint8_t> m_compressionBuf;

std::map<std::string, std::pair<topic_tools::ShapeShifter::ConstPtr, int> > m_latchedMessages;

#if WITH_CONFIG_SERVER
std::map<std::string, boost::shared_ptr<config_server::Parameter<bool>>> m_enableTopic;
#endif
Expand Down
4 changes: 4 additions & 0 deletions nimbro_topic_transport/src/udp/udp_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ UDPSender::UDPSender()
if(list[i].hasMember("type"))
type = (std::string)(list[i]["type"]);

if(list[i].hasMember("latch"))
ROS_WARN_STREAM("Ignoring 'latch' flag at UDP topic " << ((std::string)list[i]["name"]).c_str() <<
" (UDP topics can't be latched).");

TopicSender* sender = new TopicSender(this, &nh, list[i]["name"], rate, resend, flags, enabled, type);

if(m_relayMode)
Expand Down