diff --git a/main.cpp b/main.cpp index 1ba33b2..a83c633 100644 --- a/main.cpp +++ b/main.cpp @@ -46,9 +46,9 @@ void appIntubator(char* const argv[]) { // {{{ len = strlen(argv[0]); if (len > 0 && len != sizeof(exePath)) { strcpy(exePath, argv[0]); - } else { - len = 0; - } + } else { + len = 0; + } } exePath[len] = '\0'; @@ -74,7 +74,7 @@ void uplinkPacketSenderWorker(LoRaPacketTrafficStats_t *loraPacketStats) { // {{ bool iterateOnceMore = false; do { - PackagedDataToSend_t packet{DequeuePacket()}; + PackagedDataToSend_t packet{DequeuePacket(UP)}; iterateOnceMore = (packet.data_len > 0); if (iterateOnceMore) { @@ -87,7 +87,7 @@ void uplinkPacketSenderWorker(LoRaPacketTrafficStats_t *loraPacketStats) { // {{ char asciiTime[25]; std::strftime(asciiTime, sizeof(asciiTime), "%c", std::localtime(&currTime)); printf("(%s) No uplink ACK received from %s\n", asciiTime, packet.destination.address.c_str()); - if (RequeuePacket(std::move(packet), 4)) + if (RequeuePacket(std::move(packet), 4, UP)) { printf("(%s) Requeued the uplink packet.\n", asciiTime); } fflush(stdout); } diff --git a/smtUdpPacketForwarder/UdpUtils.cpp b/smtUdpPacketForwarder/UdpUtils.cpp index 1e5f647..3fed29f 100644 --- a/smtUdpPacketForwarder/UdpUtils.cpp +++ b/smtUdpPacketForwarder/UdpUtils.cpp @@ -1,7 +1,7 @@ #include "UdpUtils.h" -static std::queue uplink_data_queue; -static std::timed_mutex g_uplink_data_queue_mutex; +static std::queue uplink_data_queue, downlink_data_queue; +static std::timed_mutex g_uplink_data_queue_mutex, g_downlink_data_queue_mutex; static Server_t NO_SERVER; PackagedDataToSend_t NO_PACKAGED_DATA{0UL, 0UL, {}, NO_SERVER}; @@ -118,13 +118,23 @@ NetworkConf_t PrepareNetworking(const char* networkInterfaceName, suseconds_t da return result; } // }}} -bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts) +bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, QueueDirection direction) { if (packet.curr_attempt >= maxAttempts) { return false; } - std::unique_lock lock(g_uplink_data_queue_mutex, - std::chrono::system_clock::now() + std::chrono::seconds(2)); + std::unique_lock lock; + if (direction == UP) + { + lock = std::unique_lock(g_uplink_data_queue_mutex, + std::chrono::system_clock::now() + std::chrono::seconds(2)); + } + else if (direction == DOWN) + { + lock = std::unique_lock(g_downlink_data_queue_mutex, + std::chrono::system_clock::now() + std::chrono::seconds(2)); + } + if (!lock.owns_lock()) { printf("Failed to obtain uplink queue lock! Giving up on requeuing the packet!"); @@ -132,17 +142,31 @@ bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts) } packet.curr_attempt++; - uplink_data_queue.push(std::move(packet)); + + if (direction == UP) + { uplink_data_queue.push(std::move(packet)); } + else if (direction == DOWN) + { downlink_data_queue.push(std::move(packet)); } return true; } -void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest) // {{{ +void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, QueueDirection direction) // {{{ { if (data == nullptr) return; - std::unique_lock lock(g_uplink_data_queue_mutex, - std::chrono::system_clock::now() + std::chrono::seconds(2)); + std::unique_lock lock; + if (direction == UP) + { + lock = std::unique_lock(g_uplink_data_queue_mutex, + std::chrono::system_clock::now() + std::chrono::seconds(2)); + } + else if (direction == DOWN) + { + lock = std::unique_lock(g_downlink_data_queue_mutex, + std::chrono::system_clock::now() + std::chrono::seconds(2)); + } + if (!lock.owns_lock()) { printf("Failed to obtain uplink queue lock! Giving up on that packet!"); @@ -151,18 +175,44 @@ void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest) // {{{ PackagedDataToSend_t packaged_data{ 0UL, data_length, data, dest }; - uplink_data_queue.push(std::move(packaged_data)); + if (direction == UP) + { uplink_data_queue.push(std::move(packaged_data)); } + else if (direction == DOWN) + { downlink_data_queue.push(std::move(packaged_data)); } } // }}} -PackagedDataToSend_t DequeuePacket() // {{{ +PackagedDataToSend_t DequeuePacket(QueueDirection direction) // {{{ { - std::unique_lock lock(g_uplink_data_queue_mutex, - std::chrono::system_clock::now() + std::chrono::seconds(1)); - - if (!lock.owns_lock() || uplink_data_queue.empty()) return std::move(NO_PACKAGED_DATA); + std::unique_lock lock; + if (direction == UP) + { + lock = std::unique_lock(g_uplink_data_queue_mutex, + std::chrono::system_clock::now() + std::chrono::seconds(1)); + } + else if (direction == DOWN) + { + lock = std::unique_lock(g_downlink_data_queue_mutex, + std::chrono::system_clock::now() + std::chrono::seconds(1)); + } - PackagedDataToSend_t result = std::move(uplink_data_queue.front()); - uplink_data_queue.pop(); + if (!lock.owns_lock() || (direction == UP && uplink_data_queue.empty()) || + (direction == DOWN && downlink_data_queue.empty())) + { return std::move(NO_PACKAGED_DATA); } + + PackagedDataToSend_t result = [&direction]() -> PackagedDataToSend_t { + if (direction == UP) + { + auto res = std::move(uplink_data_queue.front()); + uplink_data_queue.pop(); + return res; + } + else if (direction == DOWN) + { + auto res = std::move(downlink_data_queue.front()); + downlink_data_queue.pop(); + return res; + } + }(); lock.unlock(); @@ -250,7 +300,7 @@ void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pk size_t packet_sz = stat_index + json.size(); uint8_t *packet = new uint8_t[packet_sz]; memcpy(packet, status_report, packet_sz); - EnqueuePacket(packet, packet_sz, serv); + EnqueuePacket(packet, packet_sz, serv, UP); } } // }}} @@ -366,7 +416,7 @@ void PublishLoRaProtocolPacket(PlatformInfo_t &cfg, LoRaDataPkt_t &loraPacket) / size_t packet_sz = buff_index + json.size(); uint8_t *packet = new uint8_t[packet_sz]; memcpy(packet, buff_up, packet_sz); - EnqueuePacket(packet, packet_sz, serv); + EnqueuePacket(packet, packet_sz, serv, UP); } } // }}} diff --git a/smtUdpPacketForwarder/UdpUtils.h b/smtUdpPacketForwarder/UdpUtils.h index 2606377..41e4bae 100644 --- a/smtUdpPacketForwarder/UdpUtils.h +++ b/smtUdpPacketForwarder/UdpUtils.h @@ -72,15 +72,17 @@ typedef struct PackagedDataToSend } PackagedDataToSend_t; +enum QueueDirection { UP, DOWN }; + void Die(const char *s); bool SolveHostname(const char* p_hostname, uint16_t port, struct sockaddr_in* p_sin); bool SendUdp(Server_t &server, char *msg, int length, std::function &validator); NetworkConf_t PrepareNetworking(const char* networkInterfaceName, suseconds_t dataRecvTimeout, char gatewayId[25]); -void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest); -bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts); -PackagedDataToSend_t DequeuePacket(); +void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, QueueDirection direction); +bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, QueueDirection direction); +PackagedDataToSend_t DequeuePacket(QueueDirection direction); void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pktStats);