Add JSON logging to Fluentd TCP socket for Bedrock #2499
rushatgabhane wants to merge 10 commits into Expensify:main
Conversation
@MelvinBot review this PR please

1 similar comment

@MelvinBot review this PR please
Pull request overview
Adds optional Fluentd TCP JSON logging to Bedrock so logs can be forwarded without regex parsing, with support for -logDestination rsyslog|fluentd|both and basic reconnect behavior.
Changes:
- Introduces a Fluentd TCP logging backend (`SFluentdInitialize`, `SFluentdLog`) that emits JSON frames.
- Adds `SSyslogNoop` to disable rsyslog logging when Fluentd-only is selected.
- Updates Bedrock startup arg handling to configure Fluentd and optionally disable rsyslog.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| libstuff/libstuff.h | Adds Fluentd logging declarations; updates SSYSLOG macro to also emit to Fluentd; introduces SSyslogNoop. |
| libstuff/libstuff.cpp | Implements Fluentd TCP connection + JSON payload emission; adds no-op syslog implementation. |
| BedrockServer.cpp | Adds -logDestination and Fluentd config args, wiring initialization and rsyslog disabling. |
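A hedged sketch of how the startup wiring described in the table above could look. The `getArg` accessor, the `-fluentdPort` flag, and the default values are illustrative assumptions; only `-logDestination`, `-fluentdHost`, and `-fluentdTag` are confirmed by this PR.

```cpp
// Sketch only: select rsyslog, Fluentd, or both at Bedrock startup.
const string logDestination = getArg("-logDestination", "rsyslog"); // hypothetical accessor
if (logDestination == "fluentd" || logDestination == "both") {
    SFluentdInitialize(getArg("-fluentdHost", "127.0.0.1"),
                       stoi(getArg("-fluentdPort", "24224")),   // flag name and default assumed
                       getArg("-fluentdTag", "bedrock"));
}
if (logDestination == "fluentd") {
    // Fluentd-only mode: route rsyslog output to the no-op implementation.
    SSyslogFunc = &SSyslogNoop;
}
```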
```cpp
    SFluentdSocketFD = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (SFluentdSocketFD == -1) {
        return false;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SFluentdPort);
    inet_pton(AF_INET, SFluentdHost.c_str(), &addr.sin_addr);

    if (connect(SFluentdSocketFD, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        close(SFluentdSocketFD);
        SFluentdSocketFD = -1;
        return false;
    }
```
SFluentdConnect calls inet_pton(AF_INET, SFluentdHost.c_str(), …) without checking the return value, and it only supports numeric IPv4 literals. Passing a hostname like localhost (or IPv6) will silently produce an invalid address and connection attempts will fail. Use getaddrinfo (already available via <netdb.h>) or at least validate inet_pton and fail fast with SFluentdSocketFD = -1 when parsing fails.
Suggested change:
```diff
-SFluentdSocketFD = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
-if (SFluentdSocketFD == -1) {
-    return false;
-}
-struct sockaddr_in addr;
-memset(&addr, 0, sizeof(addr));
-addr.sin_family = AF_INET;
-addr.sin_port = htons(SFluentdPort);
-inet_pton(AF_INET, SFluentdHost.c_str(), &addr.sin_addr);
-if (connect(SFluentdSocketFD, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
-    close(SFluentdSocketFD);
-    SFluentdSocketFD = -1;
-    return false;
-}
+struct addrinfo hints;
+memset(&hints, 0, sizeof(hints));
+hints.ai_family = AF_INET; // Preserve existing IPv4-only behavior
+hints.ai_socktype = SOCK_STREAM;
+hints.ai_protocol = IPPROTO_TCP;
+struct addrinfo* res = nullptr;
+string portStr = to_string(SFluentdPort);
+int rc = getaddrinfo(SFluentdHost.c_str(), portStr.c_str(), &hints, &res);
+if (rc != 0 || !res) {
+    SFluentdSocketFD = -1;
+    if (res) {
+        freeaddrinfo(res);
+    }
+    return false;
+}
+SFluentdSocketFD = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+if (SFluentdSocketFD == -1) {
+    freeaddrinfo(res);
+    return false;
+}
+if (connect(SFluentdSocketFD, res->ai_addr, res->ai_addrlen) == -1) {
+    close(SFluentdSocketFD);
+    SFluentdSocketFD = -1;
+    freeaddrinfo(res);
+    return false;
+}
+freeaddrinfo(res);
```
```cpp
        record[key] = value;
    }

    string json = "[\"" + SFluentdTag + "\"," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n";
```
The Fluentd tag is injected into the JSON payload via string concatenation (`"[\"" + SFluentdTag + "\"," ...`) without JSON escaping. If -fluentdTag contains quotes, backslashes, or control characters, the emitted JSON becomes invalid and can be abused for log injection. Build the outer JSON using the existing JSON helpers (e.g., SToJSON(SFluentdTag, /*forceString=*/true)) so the tag is correctly escaped.
Suggested change:
```diff
-string json = "[\"" + SFluentdTag + "\"," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n";
+string json = "[" + SToJSON(SFluentdTag, /*forceString=*/true) + "," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n";
```
```cpp
    if (send(SFluentdSocketFD, json.c_str(), json.size(), MSG_NOSIGNAL) == -1) {
        close(SFluentdSocketFD);
        SFluentdSocketFD = -1;
```
send() on a TCP socket can return a short write; the current code treats any return value other than -1 as success, which can truncate the JSON frame and break Fluentd parsing. Track the number of bytes sent and loop until the full buffer is written, or treat partial writes as an error and reconnect.
Suggested change:
```diff
-if (send(SFluentdSocketFD, json.c_str(), json.size(), MSG_NOSIGNAL) == -1) {
-    close(SFluentdSocketFD);
-    SFluentdSocketFD = -1;
+size_t totalSent = 0;
+const size_t totalSize = json.size();
+while (totalSent < totalSize) {
+    ssize_t bytesSent = send(SFluentdSocketFD, json.c_str() + totalSent, totalSize - totalSent, MSG_NOSIGNAL);
+    if (bytesSent <= 0) {
+        // Error or connection closed; close the socket so it will reconnect on the next attempt
+        close(SFluentdSocketFD);
+        SFluentdSocketFD = -1;
+        break;
+    }
+    totalSent += static_cast<size_t>(bytesSent);
+}
```
```cpp
void SFluentdInitialize(const string& host, int port, const string& tag)
{
    lock_guard<mutex> lock(SFluentdSocketMutex);
    SFluentdHost = host;
    SFluentdPort = port;
    SFluentdTag = tag;
    SFluentdConnect(lock);
    SFluentdConfigured.store(true);
}
```
SFluentdInitialize / SFluentdConnect will overwrite SFluentdSocketFD with a new socket() result without closing any previously-open Fluentd socket. If initialization can happen more than once in a process (e.g., tests, reconfiguration), this will leak file descriptors. Close the existing FD (if != -1) before creating a new socket, or make initialization explicitly one-shot and enforce it.
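A minimal sketch of that fix, reusing the names visible in this diff (not the PR's actual code): close any already-open socket before reconfiguring so repeated initialization cannot leak file descriptors.

```cpp
// Sketch only: guard against FD leaks if SFluentdInitialize runs more than once.
void SFluentdInitialize(const string& host, int port, const string& tag)
{
    lock_guard<mutex> lock(SFluentdSocketMutex);
    if (SFluentdSocketFD != -1) {
        // Close the previous Fluentd socket before opening a new one.
        close(SFluentdSocketFD);
        SFluentdSocketFD = -1;
    }
    SFluentdHost = host;
    SFluentdPort = port;
    SFluentdTag = tag;
    SFluentdConnect(lock);
    SFluentdConfigured.store(true);
}
```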
| record["timestamp"] = to_string(time(nullptr)); | ||
| record["priority"] = to_string(priority); | ||
| record["thread_name"] = SThreadLogName; | ||
| record["thread_prefix"] = SThreadLogPrefix; | ||
| record["process"] = SProcessName; | ||
| record["message"] = message; | ||
|
|
||
| for (const auto& [key, value] : params) { | ||
| record[key] = value; | ||
| } | ||
|
|
||
| string json = "[\"" + SFluentdTag + "\"," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n"; |
SFluentdLog calls time(nullptr) twice (once for record["timestamp"] and again for the Fluentd event time). If a second boundary is crossed between calls, the two timestamps can disagree. Capture the timestamp once and reuse it for both fields to keep the record consistent.
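A minimal sketch of that suggestion, keeping the existing concatenation for everything else (tag escaping is addressed in a separate comment):

```cpp
// Sketch only: capture the timestamp once so the record field and the Fluentd
// event time can never disagree across a second boundary.
const time_t now = time(nullptr);
record["timestamp"] = to_string(now);
// ... other record fields as above ...
string json = "[\"" + SFluentdTag + "\"," + to_string(now) + "," + SComposeJSONObject(record) + "]\n";
```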
```cpp
// Fluentd JSON logging stuff
// --------------------------------------------------------------------------
// Initialize Fluentd TCP socket connection. Call once at startup.
void SFluentdInitialize(const string& host, int port, const string& tag);
```
The comment for SFluentdInitialize says it “Returns true on success”, but the function is declared/defined as void and there’s no success signal. Either change the signature to return bool (and actually reflect connection success), or update the comment to match the current behavior (e.g., best-effort configure + lazy reconnect).
Suggested change:
```diff
-void SFluentdInitialize(const string& host, int port, const string& tag);
+// Performs best-effort initialization. If initialization fails, SFluentdLog will be a no-op.
```
```cpp
        const string s = addLogParams(string(__rawMsg), ## __VA_ARGS__); \
        const string prefix = SWHEREAMI; \
        for (size_t i = 0; i < s.size(); i += 7168) { \
            (*SSyslogFunc)(_PRI_, "%s", (prefix + s.substr(i, 7168)).c_str()); \
        } \
        SFluentdLog(_PRI_, prefix + __rawMsg, ## __VA_ARGS__); \
    } \
} while (false)
```
When -logDestination is set to fluentd, SSyslogFunc becomes SSyslogNoop, but this macro still builds s, appends params, and loops over 7k chunks calling the no-op function. This adds avoidable per-log CPU overhead in fluentd-only mode; consider gating the rsyslog formatting/chunking path behind a separate enabled flag (or skip the chunk loop when rsyslog is disabled).
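A hedged sketch of that gating idea. The flag name `SSyslogEnabled` and the exact macro shape are assumptions; the real macro in libstuff.h has additional structure not shown in this diff.

```cpp
// Sketch only: skip the rsyslog formatting and 7 KiB chunk loop entirely when
// rsyslog output is disabled, instead of feeding a no-op function.
#define SSYSLOG(_PRI_, __rawMsg, ...)                                                \
    do {                                                                             \
        const string prefix = SWHEREAMI;                                             \
        if (SSyslogEnabled) { /* assumed flag, set false in fluentd-only mode */     \
            const string s = addLogParams(string(__rawMsg), ## __VA_ARGS__);         \
            for (size_t i = 0; i < s.size(); i += 7168) {                            \
                (*SSyslogFunc)(_PRI_, "%s", (prefix + s.substr(i, 7168)).c_str());   \
            }                                                                        \
        }                                                                            \
        SFluentdLog(_PRI_, prefix + __rawMsg, ## __VA_ARGS__);                       \
    } while (false)
```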
| record["message"] = message; | ||
|
|
||
| for (const auto& [key, value] : params) { | ||
| record[key] = value; |
When merging params into record, user-supplied keys can overwrite reserved metadata fields like timestamp, priority, process, or even message. To keep the structured log schema stable, consider preventing overwrites (only insert when absent) or namespacing user params under a dedicated sub-object key (e.g., params).
Suggested change:
```diff
-record[key] = value;
+// Prevent user-supplied parameters from overwriting reserved metadata fields.
+if (!record.count(key)) {
+    record[key] = value;
+}
```
```cpp
    // Build JSON before acquiring lock to avoid doing heavy stuff in the critical section
    STable record;
    record["timestamp"] = to_string(time(nullptr));
    record["priority"] = to_string(priority);
    record["thread_name"] = SThreadLogName;
    record["thread_prefix"] = SThreadLogPrefix;
    record["process"] = SProcessName;
    record["message"] = message;

    for (const auto& [key, value] : params) {
        record[key] = value;
    }

    string json = "[\"" + SFluentdTag + "\"," + to_string(time(nullptr)) + "," + SComposeJSONObject(record) + "]\n";
```
New behavior builds a specific Fluentd JSON frame ([tag, time, record]\n) and reconnection logic, but there are existing libstuff unit tests (e.g., JSON helpers) and nothing here validates the emitted frame or escaping. Consider adding a unit test that exercises the JSON payload composition (including escaping and reserved-field behavior) without requiring a real Fluentd instance (e.g., by extracting payload formatting into a helper function).
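A minimal sketch of that idea. The helper name `SComposeFluentdFrame` is hypothetical; `STable` and `SComposeJSONObject` are the existing libstuff types used in this diff.

```cpp
#include <cassert>
#include <ctime>
#include <string>

// Hypothetical helper: a pure function that builds the Fluentd frame so it can
// be unit-tested without a socket or a running Fluentd instance.
std::string SComposeFluentdFrame(const std::string& tag, time_t eventTime, const STable& record) {
    return "[\"" + tag + "\"," + std::to_string(eventTime) + "," + SComposeJSONObject(record) + "]\n";
}

// Framework-agnostic test body exercising the frame shape.
void testFluentdFrameComposition() {
    STable record;
    record["message"] = "hello";
    const std::string frame = SComposeFluentdFrame("bedrock", 1700000000, record);
    // Three-element JSON array: [tag, time, record], newline-terminated.
    assert(frame.rfind("[\"bedrock\",1700000000,", 0) == 0);
    assert(!frame.empty() && frame.back() == '\n');
}
```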
```cpp
    SFluentdSocketFD = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (SFluentdSocketFD == -1) {
        return false;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SFluentdPort);
    inet_pton(AF_INET, SFluentdHost.c_str(), &addr.sin_addr);

    if (connect(SFluentdSocketFD, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        close(SFluentdSocketFD);
```
The Fluentd socket is created in blocking mode and connect()/send() are called while holding SFluentdSocketMutex. If Fluentd is slow/unreachable, these operations can block and stall logging (and potentially threads that are trying to log). Consider setting reasonable connect/send timeouts (e.g., SO_SNDTIMEO / non-blocking connect with poll) and/or avoiding holding the mutex across potentially-blocking syscalls.
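A minimal sketch of the timeout option, assuming Linux behavior where SO_SNDTIMEO also bounds a blocking connect(); a fully portable variant would use a non-blocking connect() plus poll(). The helper name is hypothetical.

```cpp
#include <sys/socket.h>
#include <sys/time.h>

// Sketch only: bound how long connect()/send() can block so a slow or
// unreachable Fluentd cannot stall threads that are trying to log.
static bool SSetFluentdTimeouts(int fd, int seconds) { // hypothetical helper
    struct timeval tv;
    tv.tv_sec = seconds;
    tv.tv_usec = 0;
    // SO_SNDTIMEO limits blocking send(); on Linux it also limits a blocking connect().
    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
        return false;
    }
    // SO_RCVTIMEO is set for symmetry in case the socket is ever read from.
    return setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != -1;
}
```

Calling something like this right after socket() and before connect() would keep the mutex hold time bounded even when Fluentd is down.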
@MelvinBot review this PR please

@MelvinBot please review this PR
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 15d6e4f078
```cpp
    for (const auto& [key, value] : params) {
        record[key] = value;
```
Redact params before writing Fluentd records
SFluentdLog currently copies every entry from params directly into the JSON record, which bypasses the existing redaction path used by addLogParams (libstuff/SLog.cpp lines 81-98). When -logDestination is fluentd or both, any sensitive fields passed in logging params (that are intentionally redacted in rsyslog output) will be emitted in cleartext to Fluentd, creating a data-leak regression.
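A hedged sketch of one way to close that gap. `SRedactParams` is a hypothetical helper that would wrap the same rules addLogParams applies for rsyslog output; it is not an existing libstuff API.

```cpp
// Sketch only: run user-supplied params through the shared redaction rules
// before they are copied into the Fluentd record.
STable redacted = SRedactParams(params); // hypothetical helper reusing addLogParams' redaction list
for (const auto& [key, value] : redacted) {
    // Also refuse to overwrite reserved metadata fields like "message" or "priority".
    if (!record.count(key)) {
        record[key] = value;
    }
}
```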
```cpp
    }

    // Try to send the log over TCP. Close the socket on failure. It'll try to reconnect on next attempt
    if (send(SFluentdSocketFD, json.c_str(), json.size(), MSG_NOSIGNAL) == -1) {
```
Handle partial TCP writes when sending Fluentd JSON
The send path treats any return value other than -1 as success, but send() on a TCP socket may return a short byte count under backpressure; in that case the remaining bytes are dropped and the emitted Fluentd frame is truncated. This can corrupt JSON log records intermittently for larger messages or busy sockets, even though no reconnect/error path is triggered.
```cpp
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SFluentdPort);
    inet_pton(AF_INET, SFluentdHost.c_str(), &addr.sin_addr);
```
Resolve non-literal fluentdHost values before connect
Connection setup uses inet_pton(AF_INET, SFluentdHost.c_str(), ...) and ignores its return value, so hostnames like fluentd.service are parsed as invalid and leave the destination as 0.0.0.0; every connect attempt then fails silently. Since -fluentdHost is exposed as a host option, this breaks Fluentd logging whenever operators provide a DNS/service name instead of a raw IPv4 literal.
Details
This PR adds the ability to log directly to Fluentd's TCP socket in native JSON format. Since logs are pre-formatted as JSON, Fluentd can forward them without regex parsing : )
Reconnects on failure.
Supports three log destinations via the `-logDestination` option (rsyslog, fluentd, or both); the Fluentd host, port, and tag are configured via the new startup args.
Fixed Issues
Fixes https://github.com/Expensify/Expensify/issues/594121
Tests
Internal Testing Reminder: when changing bedrock, please compile auth against your new changes