-
Notifications
You must be signed in to change notification settings - Fork 105
Add JSON logging to Fluentd TCP socket for Bedrock #2499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f2c6b8f
f6a29a9
173dad8
ee6cb1f
31b65ce
be02f40
1c7f6cc
3f74803
0a31da8
15d6e4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -280,6 +280,87 @@ void SSyslogSocketDirect(int priority, const char* format, ...) | |||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| void SSyslogNoop(int priority, const char* format, ...) | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| static int SFluentdSocketFD = -1; | ||||||||||||||||||||||||||||||
| static mutex SFluentdSocketMutex; | ||||||||||||||||||||||||||||||
| static string SFluentdHost; | ||||||||||||||||||||||||||||||
| static int SFluentdPort = 0; | ||||||||||||||||||||||||||||||
| static string SFluentdTag; | ||||||||||||||||||||||||||||||
| static atomic<bool> SFluentdConfigured{false}; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Lock parameter enforces mutex is held before calling this function. | ||||||||||||||||||||||||||||||
| static bool SFluentdConnect(const lock_guard<mutex>&) | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| SFluentdSocketFD = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); | ||||||||||||||||||||||||||||||
| if (SFluentdSocketFD == -1) { | ||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| struct sockaddr_in addr; | ||||||||||||||||||||||||||||||
| memset(&addr, 0, sizeof(addr)); | ||||||||||||||||||||||||||||||
| addr.sin_family = AF_INET; | ||||||||||||||||||||||||||||||
| addr.sin_port = htons(SFluentdPort); | ||||||||||||||||||||||||||||||
| inet_pton(AF_INET, SFluentdHost.c_str(), &addr.sin_addr); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (connect(SFluentdSocketFD, (struct sockaddr*) &addr, sizeof(addr)) == -1) { | ||||||||||||||||||||||||||||||
| close(SFluentdSocketFD); | ||||||||||||||||||||||||||||||
| SFluentdSocketFD = -1; | ||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| void SFluentdInitialize(const string& host, int port, const string& tag) | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| lock_guard<mutex> lock(SFluentdSocketMutex); | ||||||||||||||||||||||||||||||
| SFluentdHost = host; | ||||||||||||||||||||||||||||||
| SFluentdPort = port; | ||||||||||||||||||||||||||||||
| SFluentdTag = tag; | ||||||||||||||||||||||||||||||
| SFluentdConnect(lock); | ||||||||||||||||||||||||||||||
| SFluentdConfigured.store(true); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+317
to
+325
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| void SFluentdLog(int priority, const string& message, const STable& params) | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| if (!SFluentdConfigured.load()) { | ||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Build JSON before acquiring lock to avoid doing heavy stuff in the critical section | ||||||||||||||||||||||||||||||
| STable record; | ||||||||||||||||||||||||||||||
| record["timestamp"] = to_string(time(nullptr)); | ||||||||||||||||||||||||||||||
| record["priority"] = to_string(priority); | ||||||||||||||||||||||||||||||
| record["thread_name"] = SThreadLogName; | ||||||||||||||||||||||||||||||
| record["thread_prefix"] = SThreadLogPrefix; | ||||||||||||||||||||||||||||||
| record["process"] = SProcessName; | ||||||||||||||||||||||||||||||
| record["message"] = message; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| for (const auto& [key, value] : params) { | ||||||||||||||||||||||||||||||
| record[key] = value; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| record[key] = value; | |
| // Prevent user-supplied parameters from overwriting reserved metadata fields. | |
| if (!record.count(key)) { | |
| record[key] = value; | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redact params before writing Fluentd records
SFluentdLog currently copies every entry from params directly into the JSON record, which bypasses the existing redaction path used by addLogParams (libstuff/SLog.cpp lines 81-98). When -logDestination is fluentd or both, any sensitive fields passed in logging params (that are intentionally redacted in rsyslog output) will be emitted in cleartext to Fluentd, creating a data-leak regression.
Useful? React with 👍 / 👎.
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Fluentd tag is injected into the JSON payload via string concatenation (`"[\"" + SFluentdTag + "\"," ...`) without JSON escaping. If `-fluentdTag` contains quotes, backslashes, or control characters, the emitted JSON becomes invalid and can be abused for log injection. Build the outer JSON using the existing JSON helpers (e.g., `SToJSON(SFluentdTag, /*forceString=*/true)`) so the tag is correctly escaped.
| string json = "[\"" + SFluentdTag + "\"," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n"; | |
| string json = "[" + SToJSON(SFluentdTag, /*forceString=*/true) + "," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n"; |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SFluentdLog calls time(nullptr) twice (once for record["timestamp"] and again for the Fluentd event time). If a second boundary is crossed between calls, the two timestamps can disagree. Capture the timestamp once and reuse it for both fields to keep the record consistent.
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New behavior builds a specific Fluentd JSON frame ([tag, time, record]\n) and reconnection logic, but there are existing libstuff unit tests (e.g., JSON helpers) and nothing here validates the emitted frame or escaping. Consider adding a unit test that exercises the JSON payload composition (including escaping and reserved-field behavior) without requiring a real Fluentd instance (e.g., by extracting payload formatting into a helper function).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle partial TCP writes when sending Fluentd JSON
The send path treats any return value other than -1 as success, but send() on a TCP socket may return a short byte count under backpressure; in that case the remaining bytes are dropped and the emitted Fluentd frame is truncated. This can corrupt JSON log records intermittently for larger messages or busy sockets, even though no reconnect/error path is triggered.
Useful? React with 👍 / 👎.
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`send()` on a TCP socket can return a short write; the current code treats any return value other than `-1` as success, which can truncate the JSON frame and break Fluentd parsing. Track the number of bytes sent and loop until the full buffer is written, or treat partial writes as an error and reconnect.
| if (send(SFluentdSocketFD, json.c_str(), json.size(), MSG_NOSIGNAL) == -1) { | |
| close(SFluentdSocketFD); | |
| SFluentdSocketFD = -1; | |
| size_t totalSent = 0; | |
| const size_t totalSize = json.size(); | |
| while (totalSent < totalSize) { | |
| ssize_t bytesSent = send(SFluentdSocketFD, json.c_str() + totalSent, totalSize - totalSent, MSG_NOSIGNAL); | |
| if (bytesSent <= 0) { | |
| // Error or connection closed; close the socket so it will reconnect on the next attempt | |
| close(SFluentdSocketFD); | |
| SFluentdSocketFD = -1; | |
| break; | |
| } | |
| totalSent += static_cast<size_t>(bytesSent); |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -255,9 +255,23 @@ void SWhitelistLogParams(const set<string>& params); | |||||
| // This is a drop-in replacement for syslog that directly logs to `/run/systemd/journal/syslog` bypassing journald. | ||||||
| void SSyslogSocketDirect(int priority, const char* format, ...); | ||||||
|
|
||||||
| // Atomic pointer to the syslog function that we'll actually use. Easy to change to `syslog` or `SSyslogSocketDirect`. | ||||||
| // No-op function to disable rsyslog logging. | ||||||
| void SSyslogNoop(int priority, const char* format, ...); | ||||||
|
|
||||||
| // Atomic pointer to the syslog function that we'll actually use. | ||||||
| // Can be set to `syslog`, `SSyslogSocketDirect`, or `SSyslogNoop`. | ||||||
| extern atomic<void (*)(int priority, const char* format, ...)> SSyslogFunc; | ||||||
|
|
||||||
| // -------------------------------------------------------------------------- | ||||||
| // Fluentd JSON logging stuff | ||||||
| // -------------------------------------------------------------------------- | ||||||
| // Initialize Fluentd TCP socket connection. Call once at startup. | ||||||
| void SFluentdInitialize(const string& host, int port, const string& tag); | ||||||
|
||||||
| void SFluentdInitialize(const string& host, int port, const string& tag); | |
| // Performs best-effort initialization. If initialization fails, SFluentdLog will be a no-op. |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When `-logDestination` is set to `fluentd`, `SSyslogFunc` becomes `SSyslogNoop`, but this macro still builds `s`, appends params, and loops over 7k chunks calling the no-op function. This adds avoidable per-log CPU overhead in fluentd-only mode; consider gating the rsyslog formatting/chunking path behind a separate enabled flag (or skip the chunk loop when rsyslog is disabled).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Connection setup uses `inet_pton(AF_INET, SFluentdHost.c_str(), ...)` and ignores its return value, so hostnames like `fluentd.service` are parsed as invalid and leave the destination as `0.0.0.0`; every connect attempt then fails silently. Since `-fluentdHost` is exposed as a host option, this breaks Fluentd logging whenever operators provide a DNS/service name instead of a raw IPv4 literal.
Useful? React with 👍 / 👎.