diff --git a/BedrockServer.cpp b/BedrockServer.cpp index b56cc80bd..c1f6559c3 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -1299,6 +1299,19 @@ BedrockServer::BedrockServer(const SData& args_) SSyslogFunc = &SSyslogSocketDirect; } + // Log destination: rsyslog, fluentd, or both. Default is rsyslog. + // Fluentd defaults: 127.0.0.1:24224, tag=bedrock. Override with -fluentdHost, -fluentdPort, -fluentdTag + string logDestination = args.isSet("-logDestination") ? args["-logDestination"] : "rsyslog"; + if (logDestination == "fluentd" || logDestination == "both") { + string host = args.isSet("-fluentdHost") ? args["-fluentdHost"] : "127.0.0.1"; + int port = args.isSet("-fluentdPort") ? SToInt(args["-fluentdPort"]) : 24224; + string tag = args.isSet("-fluentdTag") ? args["-fluentdTag"] : "bedrock"; + SFluentdInitialize(host, port, tag); + } + if (logDestination == "fluentd") { + SSyslogFunc = &SSyslogNoop; + } + // Check for commands that will be forced to use QUORUM write consistency. if (args.isSet("-synchronousCommands")) { list syncCommands; diff --git a/libstuff/SFluentdLogger.cpp b/libstuff/SFluentdLogger.cpp new file mode 100644 index 000000000..0b921995b --- /dev/null +++ b/libstuff/SFluentdLogger.cpp @@ -0,0 +1,91 @@ +#include +#include + +#include +#include +#include +#include +#include + +SFluentdLogger::SFluentdLogger(const string& host, int port) : host(host), port(port), running(true) +{ + auto [thread, future] = SThread(&SFluentdLogger::senderLoop, this); + senderThread = move(thread); +} + +SFluentdLogger::~SFluentdLogger() +{ + running.store(false); + if (senderThread.joinable()) { + senderThread.join(); + } +} + +int SFluentdLogger::openSocket() +{ + int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd == -1) { + return -1; + } + + struct sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + inet_pton(AF_INET, host.data(), &addr.sin_addr); + + if (connect(fd, (struct sockaddr*) &addr, sizeof(addr)) == -1) { + close(fd); + return -1; + } + + return fd; +} + +bool SFluentdLogger::sendAll(int fd, const string& data) +{ + size_t sent = 0; + while (sent < data.size()) { + ssize_t n = send(fd, data.data() + sent, data.size() - sent, MSG_NOSIGNAL); + if (n <= 0) { + return false; + } + sent += n; + } + return true; +} + +void SFluentdLogger::senderLoop() +{ + int fd = -1; + + while (true) { + auto entry = buffer.pop(); + + if (!entry.has_value()) { + if (!running.load()) { + break; + } + this_thread::sleep_for(chrono::milliseconds(1)); + continue; + } + + if (fd == -1) { + fd = openSocket(); + } + + if (!sendAll(fd, entry.value())) { + close(fd); + fd = -1; + syslog(LOG_WARNING, "%s", entry.value().data()); + } + } + + if (fd != -1) { + close(fd); + } +} + +bool SFluentdLogger::log(string&& json) +{ + return buffer.push(move(json)); +} diff --git a/libstuff/SFluentdLogger.h b/libstuff/SFluentdLogger.h new file mode 100644 index 000000000..0915e39b8 --- /dev/null +++ b/libstuff/SFluentdLogger.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include +#include + +#include + +using namespace std; + +class SFluentdLogger { +public: + SFluentdLogger(const string& host, int port); + ~SFluentdLogger(); + + bool log(string&& json); + +private: + int openSocket(); + bool sendAll(int fd, const string& data); + void senderLoop(); + + string host; + int port; + atomic running{false}; + SRingBuffer buffer; + thread senderThread; +}; diff --git a/libstuff/SLog.cpp b/libstuff/SLog.cpp index 450992b54..9eb260a44 100644 --- a/libstuff/SLog.cpp +++ b/libstuff/SLog.cpp @@ -104,3 +104,8 @@ void SWhitelistLogParams(const set& params) { PARAMS_WHITELIST.insert(params.begin(), params.end()); } + +bool SIsLogParamWhitelisted(const string& key) +{ + return SContains(PARAMS_WHITELIST, key); +} diff --git a/libstuff/SRingBuffer.h b/libstuff/SRingBuffer.h new file mode 100644 index 000000000..c60245572 --- /dev/null +++ b/libstuff/SRingBuffer.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include +#include + +using namespace std; + +// 10M items, ~1GB at 100 bytes/item, ~200 seconds buffer at 50K items/sec +constexpr size_t SRINGBUFFER_DEFAULT_CAPACITY = 10'000'000; + +/* + * Lock free multi producer, single consumer ring buffer. Used for Fluentd logging + */ +template class SRingBuffer { +public: + struct BufferElement + { + T data; + + // Prevents consumer from reading partially-written data. Producer reserves slot with CAS, then writes data, then signals ready. + atomic isReady{false}; + }; + + bool push(T&& data) + { + size_t currentTail = tail.load(memory_order_relaxed); + + while (true) { + if (currentTail - head.load(memory_order_acquire) >= C) { + return false; + } + if (tail.compare_exchange_weak(currentTail, currentTail + 1, memory_order_acq_rel, memory_order_relaxed)) { + break; + } + } + + size_t index = currentTail % C; + buffer[index].data = move(data); + buffer[index].isReady.store(true, memory_order_release); + + return true; + } + + optional pop() + { + size_t currentHead = head.load(); + size_t index = currentHead % C; + + if (!buffer[index].isReady.load(memory_order_acquire)) { + return nullopt; + } + + T bufferData = move(buffer[index].data); + buffer[index].isReady.store(false, memory_order_release); + + head.store(currentHead + 1, memory_order_release); + + return bufferData; + } + +private: + array buffer; + + // Single consumer reads from here + atomic head{0}; + + // Multiple producers write here + atomic tail{0}; +}; diff --git a/libstuff/libstuff.cpp b/libstuff/libstuff.cpp index 0af27cb04..54df47d72 100644 --- a/libstuff/libstuff.cpp +++ b/libstuff/libstuff.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include // Additional headers @@ -280,6 +281,44 @@ void SSyslogSocketDirect(int priority, const char* format, ...) } } +void SSyslogNoop(int priority, const char* format, ...) +{ +} + +static unique_ptr fluentdLogger; +static string fluentdTag; + +void SFluentdInitialize(const string& host, int port, const string& tag) +{ + fluentdTag = tag; + fluentdLogger = make_unique(host, port); +} + +void SFluentdLog(int priority, const string& message, const STable& params) +{ + if (!fluentdLogger) { + return; + } + + STable record; + record["_timestamp"] = to_string(time(nullptr)); + record["_priority"] = to_string(priority); + record["_thread_name"] = SThreadLogName; + record["_thread_prefix"] = SThreadLogPrefix; + record["_process"] = SProcessName; + record["_message"] = message; + record["_tag"] = fluentdTag; + + for (const auto& [key, value] : params) { + record[key] = SIsLogParamWhitelisted(key) ? value : ""; + } + string json = SComposeJSONObject(record) + "\n"; + + if (!fluentdLogger->log(move(json))) { + syslog(priority, "%s", message.data()); + } +} + ///////////////////////////////////////////////////////////////////////////// // Math stuff ///////////////////////////////////////////////////////////////////////////// diff --git a/libstuff/libstuff.h b/libstuff/libstuff.h index dc39b00e1..e1e767241 100644 --- a/libstuff/libstuff.h +++ b/libstuff/libstuff.h @@ -252,12 +252,27 @@ void SLogStackTrace(int level = LOG_WARNING); // This method will allow plugins to whitelist log params they need to log. void SWhitelistLogParams(const set& params); +// Check if a log param is in the whitelist. +bool SIsLogParamWhitelisted(const string& key); + // This is a drop-in replacement for syslog that directly logs to `/run/systemd/journal/syslog` bypassing journald. void SSyslogSocketDirect(int priority, const char* format, ...); -// Atomic pointer to the syslog function that we'll actually use. Easy to change to `syslog` or `SSyslogSocketDirect`. +// No-op function to disable rsyslog logging. +void SSyslogNoop(int priority, const char* format, ...); + +// Atomic pointer to the syslog function that we'll actually use. +// Can be set to `syslog`, `SSyslogSocketDirect`, or `SSyslogNoop`. extern atomic SSyslogFunc; +// -------------------------------------------------------------------------- +// Fluentd logging with lock-free ring buffer. +// Producers push to buffer. Sender thread writes to Fluentd. +// Falls back to syslog if buffer full or Fluentd unavailable. +// -------------------------------------------------------------------------- +void SFluentdInitialize(const string& host, int port, const string& tag); +void SFluentdLog(int priority, const string& message, const STable& params = {}); + string addLogParams(string&& message, const STable& params = {}); // **NOTE: rsyslog default max line size is 8k bytes. We split on 7k byte boundaries in order to fit the syslog line prefix and the expanded \r\n to #015#012 @@ -267,11 +282,13 @@ string addLogParams(string&& message, const STable& params = {}); if (_g_SLogMask & (1 << (_PRI_))) { \ ostringstream __out; \ __out << _MSG_; \ - const string s = addLogParams(__out.str(), ## __VA_ARGS__); \ + const string __rawMsg = __out.str(); \ + const string s = addLogParams(string(__rawMsg), ## __VA_ARGS__); \ const string prefix = SWHEREAMI; \ for (size_t i = 0; i < s.size(); i += 7168) { \ (*SSyslogFunc)(_PRI_, "%s", (prefix + s.substr(i, 7168)).c_str()); \ } \ + SFluentdLog(_PRI_, prefix + __rawMsg, ## __VA_ARGS__); \ } \ } while (false) diff --git a/test/tests/SFluentdLoggerTest.cpp b/test/tests/SFluentdLoggerTest.cpp new file mode 100644 index 000000000..9ac6c768c --- /dev/null +++ b/test/tests/SFluentdLoggerTest.cpp @@ -0,0 +1,135 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +// Mock TCP server that counts newline-delimited messages +class MockServer { +public: + int port = 0; + atomic messageCount{0}; + + MockServer() + { + // Create socket + serverFd = socket(AF_INET, SOCK_STREAM, 0); + int opt = 1; + setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + // Bind to random port + struct sockaddr_in addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(0); + bind(serverFd, (struct sockaddr*) &addr, sizeof(addr)); + listen(serverFd, 1); + + // Get assigned port + socklen_t len = sizeof(addr); + getsockname(serverFd, (struct sockaddr*) &addr, &len); + port = ntohs(addr.sin_port); + + // Accept and count messages in background + acceptThread = thread([this]() { + int clientFd = accept(serverFd, nullptr, nullptr); + if (clientFd >= 0) { + char buf[4096]; + ssize_t n; + while ((n = recv(clientFd, buf, sizeof(buf), 0)) > 0) { + for (ssize_t i = 0; i < n; i++) { + if (buf[i] == '\n') { + messageCount++; + } + } + } + close(clientFd); + } + }); + } + + ~MockServer() + { + close(serverFd); + acceptThread.join(); + } + +private: + int serverFd; + thread acceptThread; +}; + +struct SFluentdLoggerTest : tpunit::TestFixture +{ + SFluentdLoggerTest() : tpunit::TestFixture( + "SFluentdLogger", + TEST(SFluentdLoggerTest::testBuffersWithoutServer), + TEST(SFluentdLoggerTest::testMultipleMessages), + TEST(SFluentdLoggerTest::testDrainOnShutdown) + ) + { + } + + // Test log() buffers messages even without a server + void testBuffersWithoutServer() + { + // Create logger pointing to non-existent server + SFluentdLogger logger("127.0.0.1", 59999); + + // All logs return true (buffered) + for (int i = 0; i < 100; i++) { + string json = "{\"count\":" + to_string(i) + "}\n"; + ASSERT_TRUE(logger.log(move(json))); + } + } + + // Test multiple messages are sent to server + void testMultipleMessages() + { + // Start mock server + MockServer server; + + { + // Create logger and send 50 messages + SFluentdLogger logger("127.0.0.1", server.port); + for (int i = 0; i < 50; i++) { + string json = "{\"msg\":" + to_string(i) + "}\n"; + logger.log(move(json)); + } + + // Wait for sender thread to transmit + this_thread::sleep_for(chrono::milliseconds(100)); + } + + // All messages received + ASSERT_EQUAL(server.messageCount.load(), 50); + } + + // Test destructor drains all buffered messages + void testDrainOnShutdown() + { + // Start mock server + MockServer server; + + { + // Create logger and send 100 messages + SFluentdLogger logger("127.0.0.1", server.port); + for (int i = 0; i < 100; i++) { + string json = "{\"drain\":" + to_string(i) + "}\n"; + logger.log(move(json)); + } + + // Destructor drains buffer + } + + // Wait for server to finish receiving + this_thread::sleep_for(chrono::milliseconds(50)); + + // All messages drained before shutdown + ASSERT_EQUAL(server.messageCount.load(), 100); + } +} __SFluentdLoggerTest; diff --git a/test/tests/SRingBufferTest.cpp b/test/tests/SRingBufferTest.cpp new file mode 100644 index 000000000..798b719c0 --- /dev/null +++ b/test/tests/SRingBufferTest.cpp @@ -0,0 +1,167 @@ +#include +#include +#include + +#include +#include + +struct SRingBufferTest : tpunit::TestFixture +{ + SRingBufferTest() : tpunit::TestFixture( + "SRingBuffer", + TEST(SRingBufferTest::testPushPop), + TEST(SRingBufferTest::testEmptyPop), + TEST(SRingBufferTest::testFullBuffer), + TEST(SRingBufferTest::testFIFOOrder), + TEST(SRingBufferTest::testMultiProducer), + TEST(SRingBufferTest::testProducerConsumer) + ) + { + } + + // Test basic push and pop + void testPushPop() + { + SRingBuffer buffer; + + // Push returns true + int val = 42; + ASSERT_TRUE(buffer.push(move(val))); + + // Pop returns the value + auto result = buffer.pop(); + ASSERT_TRUE(result.has_value()); + ASSERT_EQUAL(result.value(), 42); + } + + // Test pop on empty buffer + void testEmptyPop() + { + SRingBuffer buffer; + + // Pop on empty returns nullopt + auto val = buffer.pop(); + ASSERT_FALSE(val.has_value()); + } + + // Test buffer rejects push when full + void testFullBuffer() + { + SRingBuffer buffer; + + // Fill buffer to capacity + for (int i = 0; i < 5; i++) { + int val = i; + ASSERT_TRUE(buffer.push(move(val))); + } + + // Push fails when full + int val = 100; + ASSERT_FALSE(buffer.push(move(val))); + + // Pop one item + buffer.pop(); + + // Push succeeds again + val = 100; + ASSERT_TRUE(buffer.push(move(val))); + } + + // Test FIFO ordering + void testFIFOOrder() + { + SRingBuffer buffer; + + // Push 1, 2, 3 + int a = 1, b = 2, c = 3; + buffer.push(move(a)); + buffer.push(move(b)); + buffer.push(move(c)); + + // Pop in same order + ASSERT_EQUAL(buffer.pop().value(), 1); + ASSERT_EQUAL(buffer.pop().value(), 2); + ASSERT_EQUAL(buffer.pop().value(), 3); + } + + // Test multiple threads pushing concurrently + void testMultiProducer() + { + SRingBuffer buffer; + atomic pushCount{0}; + const int numThreads = 4; + const int pushesPerThread = 100; + + // Spawn producer threads + vector producers; + for (int t = 0; t < numThreads; t++) { + producers.emplace_back([&buffer, &pushCount]() { + for (int i = 0; i < 100; i++) { + int val = i; + if (buffer.push(move(val))) { + pushCount++; + } + } + }); + } + + // Wait for all producers + for (auto& t : producers) { + t.join(); + } + + // All pushes succeeded + ASSERT_EQUAL(pushCount.load(), numThreads * pushesPerThread); + + // Pop all items + int popCount = 0; + while (buffer.pop().has_value()) { + popCount++; + } + + // All items accounted for + ASSERT_EQUAL(popCount, numThreads * pushesPerThread); + } + + // Test producer-consumer pattern with wrap-around + void testProducerConsumer() + { + SRingBuffer buffer; + atomic done{false}; + atomic produced{0}; + atomic consumed{0}; + const int totalItems = 1000; + + // Producer pushes 1000 items through size-100 buffer + thread producer([&]() { + for (int i = 0; i < totalItems; i++) { + int val = i; + while (!buffer.push(move(val))) { + val = i; + this_thread::yield(); + } + produced++; + } + done = true; + }); + + // Consumer pops until done + thread consumer([&]() { + while (!done || consumed < totalItems) { + auto val = buffer.pop(); + if (val.has_value()) { + consumed++; + } else { + this_thread::yield(); + } + } + }); + + producer.join(); + consumer.join(); + + // All items produced and consumed + ASSERT_EQUAL(produced.load(), totalItems); + ASSERT_EQUAL(consumed.load(), totalItems); + } +} __SRingBufferTest;