Skip to content

Commit

Permalink
Prepare queues for bidirectional data exchange (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhgzhg committed Apr 5, 2021
1 parent 689f09f commit 83de37e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 27 deletions.
10 changes: 5 additions & 5 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ void appIntubator(char* const argv[]) { // {{{
len = strlen(argv[0]);
if (len > 0 && len != sizeof(exePath)) {
strcpy(exePath, argv[0]);
} else {
len = 0;
}
} else {
len = 0;
}
}
exePath[len] = '\0';

Expand All @@ -74,7 +74,7 @@ void uplinkPacketSenderWorker(LoRaPacketTrafficStats_t *loraPacketStats) { // {{
bool iterateOnceMore = false;
do
{
PackagedDataToSend_t packet{DequeuePacket()};
PackagedDataToSend_t packet{DequeuePacket(UP)};
iterateOnceMore = (packet.data_len > 0);
if (iterateOnceMore)
{
Expand All @@ -87,7 +87,7 @@ void uplinkPacketSenderWorker(LoRaPacketTrafficStats_t *loraPacketStats) { // {{
char asciiTime[25];
std::strftime(asciiTime, sizeof(asciiTime), "%c", std::localtime(&currTime));
printf("(%s) No uplink ACK received from %s\n", asciiTime, packet.destination.address.c_str());
if (RequeuePacket(std::move(packet), 4))
if (RequeuePacket(std::move(packet), 4, UP))
{ printf("(%s) Requeued the uplink packet.\n", asciiTime); }
fflush(stdout);
}
Expand Down
88 changes: 69 additions & 19 deletions smtUdpPacketForwarder/UdpUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "UdpUtils.h"

static std::queue<PackagedDataToSend> uplink_data_queue;
static std::timed_mutex g_uplink_data_queue_mutex;
static std::queue<PackagedDataToSend> uplink_data_queue, downlink_data_queue;
static std::timed_mutex g_uplink_data_queue_mutex, g_downlink_data_queue_mutex;
static Server_t NO_SERVER;
PackagedDataToSend_t NO_PACKAGED_DATA{0UL, 0UL, {}, NO_SERVER};

Expand Down Expand Up @@ -118,31 +118,55 @@ NetworkConf_t PrepareNetworking(const char* networkInterfaceName, suseconds_t da
return result;
} // }}}

bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts)
bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, QueueDirection direction)
{
if (packet.curr_attempt >= maxAttempts)
{ return false; }

std::unique_lock<std::timed_mutex> lock(g_uplink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(2));
std::unique_lock<std::timed_mutex> lock;
if (direction == UP)
{
lock = std::unique_lock<std::timed_mutex>(g_uplink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(2));
}
else if (direction == DOWN)
{
lock = std::unique_lock<std::timed_mutex>(g_downlink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(2));
}

if (!lock.owns_lock())
{
printf("Failed to obtain uplink queue lock! Giving up on requeuing the packet!");
return false;
}

packet.curr_attempt++;
uplink_data_queue.push(std::move(packet));

if (direction == UP)
{ uplink_data_queue.push(std::move(packet)); }
else if (direction == DOWN)
{ downlink_data_queue.push(std::move(packet)); }

return true;
}

void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest) // {{{
void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, QueueDirection direction) // {{{
{
if (data == nullptr) return;

std::unique_lock<std::timed_mutex> lock(g_uplink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(2));
std::unique_lock<std::timed_mutex> lock;
if (direction == UP)
{
lock = std::unique_lock<std::timed_mutex>(g_uplink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(2));
}
else if (direction == DOWN)
{
lock = std::unique_lock<std::timed_mutex>(g_downlink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(2));
}

if (!lock.owns_lock())
{
printf("Failed to obtain uplink queue lock! Giving up on that packet!");
Expand All @@ -151,18 +175,44 @@ void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest) // {{{

PackagedDataToSend_t packaged_data{ 0UL, data_length, data, dest };

uplink_data_queue.push(std::move(packaged_data));
if (direction == UP)
{ uplink_data_queue.push(std::move(packaged_data)); }
else if (direction == DOWN)
{ downlink_data_queue.push(std::move(packaged_data)); }
} // }}}

PackagedDataToSend_t DequeuePacket() // {{{
PackagedDataToSend_t DequeuePacket(QueueDirection direction) // {{{
{
std::unique_lock<std::timed_mutex> lock(g_uplink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(1));

if (!lock.owns_lock() || uplink_data_queue.empty()) return std::move(NO_PACKAGED_DATA);
std::unique_lock<std::timed_mutex> lock;
if (direction == UP)
{
lock = std::unique_lock<std::timed_mutex>(g_uplink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(1));
}
else if (direction == DOWN)
{
lock = std::unique_lock<std::timed_mutex>(g_downlink_data_queue_mutex,
std::chrono::system_clock::now() + std::chrono::seconds(1));
}

PackagedDataToSend_t result = std::move(uplink_data_queue.front());
uplink_data_queue.pop();
if (!lock.owns_lock() || (direction == UP && uplink_data_queue.empty()) ||
(direction == DOWN && downlink_data_queue.empty()))
{ return std::move(NO_PACKAGED_DATA); }

PackagedDataToSend_t result = [&direction]() -> PackagedDataToSend_t {
if (direction == UP)
{
auto res = std::move(uplink_data_queue.front());
uplink_data_queue.pop();
return res;
}
else if (direction == DOWN)
{
auto res = std::move(downlink_data_queue.front());
downlink_data_queue.pop();
return res;
}
}();

lock.unlock();

Expand Down Expand Up @@ -250,7 +300,7 @@ void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pk
size_t packet_sz = stat_index + json.size();
uint8_t *packet = new uint8_t[packet_sz];
memcpy(packet, status_report, packet_sz);
EnqueuePacket(packet, packet_sz, serv);
EnqueuePacket(packet, packet_sz, serv, UP);
}

} // }}}
Expand Down Expand Up @@ -366,7 +416,7 @@ void PublishLoRaProtocolPacket(PlatformInfo_t &cfg, LoRaDataPkt_t &loraPacket) /
size_t packet_sz = buff_index + json.size();
uint8_t *packet = new uint8_t[packet_sz];
memcpy(packet, buff_up, packet_sz);
EnqueuePacket(packet, packet_sz, serv);
EnqueuePacket(packet, packet_sz, serv, UP);
}

} // }}}
8 changes: 5 additions & 3 deletions smtUdpPacketForwarder/UdpUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,17 @@ typedef struct PackagedDataToSend

} PackagedDataToSend_t;

enum QueueDirection { UP, DOWN };

void Die(const char *s);
bool SolveHostname(const char* p_hostname, uint16_t port, struct sockaddr_in* p_sin);
bool SendUdp(Server_t &server, char *msg, int length,
std::function<bool(char*, int, char*, int)> &validator);
NetworkConf_t PrepareNetworking(const char* networkInterfaceName, suseconds_t dataRecvTimeout, char gatewayId[25]);

void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest);
bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts);
PackagedDataToSend_t DequeuePacket();
void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, QueueDirection direction);
bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, QueueDirection direction);
PackagedDataToSend_t DequeuePacket(QueueDirection direction);


void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pktStats);
Expand Down

0 comments on commit 83de37e

Please sign in to comment.