Skip to content

Commit

Permalink
L-849 Add maxQueueSize parameter (default 100k) to avoid memory overf…
Browse files Browse the repository at this point in the history
…low on network issues (#17)

* LogtailAppender: add maxQueueSize parameter (default 100k)

* Prevent potential dead-lock, when a blocking logger is configured
  • Loading branch information
PetrHeinz authored Jan 5, 2024
1 parent 2df85fd commit 6c4707a
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions src/main/java/com/logtail/logback/LogtailAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
protected List<String> mdcFields = new ArrayList<>();
protected List<String> mdcTypes = new ArrayList<>();

protected int maxQueueSize = 100000;
protected int batchSize = 1000;
protected int batchInterval = 3000;
protected int connectTimeout = 5000;
Expand All @@ -48,6 +49,7 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
protected Vector<ILoggingEvent> batch = new Vector<>();
protected AtomicBoolean isFlushing = new AtomicBoolean(false);
protected boolean mustReflush = false;
protected boolean warnAboutMaxQueueSize = true;

// Utils
protected ScheduledExecutorService scheduledExecutorService;
Expand Down Expand Up @@ -83,23 +85,40 @@ protected void append(ILoggingEvent event) {
return;

if (this.ingestUrl.isEmpty() || this.sourceToken == null || this.sourceToken.isEmpty()) {
errorLog.warn("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java ");
// Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append
startThread("logtail-warning-logger", () -> {
errorLog.warn("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java ");
});
this.disabled = true;
return;
}

batch.add(event);
if (batch.size() < maxQueueSize) {
batch.add(event);
}

if (warnAboutMaxQueueSize && batch.size() == maxQueueSize) {
this.warnAboutMaxQueueSize = false;
// Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append
startThread("logtail-error-logger", () -> {
errorLog.error("Maximum number of messages in queue reached ({}). New messages will be dropped.", maxQueueSize);
});
}

if (batch.size() >= batchSize) {
if (isFlushing.get())
return;

Thread thread = Executors.defaultThreadFactory().newThread(new LogtailSender());
thread.setName("logtail-appender-flush");
thread.start();
startThread("logtail-appender-flush", new LogtailSender());
}
}

protected void startThread(String threadName, Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setName(threadName);
thread.start();
}

protected void flush() {
if (batch.isEmpty())
return;
Expand All @@ -119,6 +138,7 @@ protected void flush() {
}

batch.subList(0, flushedSize).clear();
this.warnAboutMaxQueueSize = true;
} catch (JsonProcessingException e) {
errorLog.error("Error processing JSON data : {}", e.getMessage(), e);

Expand Down Expand Up @@ -341,6 +361,16 @@ public void setMdcTypes(String mdcTypes) {
this.mdcTypes = Arrays.asList(mdcTypes.split(","));
}

/**
* Sets the maximum number of messages in the queue. Messages over the limit will be dropped.
*
* @param maxQueueSize
* max size of the message queue
*/
public void setMaxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}

/**
* Sets the batch size for the number of messages to be sent via the API
*
Expand Down

0 comments on commit 6c4707a

Please sign in to comment.