Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Set size limit for task exec log and add metrics (#3096)
Browse files Browse the repository at this point in the history
* Set size limit for task exec log and add metrics

* refactor
  • Loading branch information
jxu-nflx authored Jul 8, 2022
1 parent 7882c61 commit 036c0b2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ public class ConductorProperties {
@DataSizeUnit(DataUnit.KILOBYTES)
private DataSize maxWorkflowVariablesPayloadSizeThreshold = DataSize.ofKilobytes(256L);

/** Used to limit the size of task execution logs. */
private int taskExecLogSizeLimit = 10;

public String getStack() {
return stack;
}
Expand Down Expand Up @@ -510,6 +513,14 @@ public void setMaxWorkflowVariablesPayloadSizeThreshold(
this.maxWorkflowVariablesPayloadSizeThreshold = maxWorkflowVariablesPayloadSizeThreshold;
}

public int getTaskExecLogSizeLimit() {
return taskExecLogSizeLimit;
}

public void setTaskExecLogSizeLimit(int taskExecLogSizeLimit) {
this.taskExecLogSizeLimit = taskExecLogSizeLimit;
}

/**
* @return Returns all the configurations in a map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,17 @@ public boolean exceedsRateLimitPerFrequency(TaskModel task, TaskDef taskDef) {
}

public void addTaskExecLog(List<TaskExecLog> logs) {
if (properties.isTaskExecLogIndexingEnabled()) {
if (properties.isTaskExecLogIndexingEnabled() && !logs.isEmpty()) {
Monitors.recordTaskExecLogSize(logs.size());
int taskExecLogSizeLimit = properties.getTaskExecLogSizeLimit();
if (logs.size() > taskExecLogSizeLimit) {
LOGGER.warn(
"Task Execution log size: {} for taskId: {} exceeds the limit: {}",
logs.size(),
logs.get(0).getTaskId(),
taskExecLogSizeLimit);
logs = logs.stream().limit(taskExecLogSizeLimit).collect(Collectors.toList());
}
if (properties.isAsyncIndexingEnabled()) {
indexDAO.asyncAddTaskExecutionLogs(logs);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,4 +569,8 @@ public static void recordEventQueuePollSize(String queueType, int val) {
public static void recordQueueMessageRepushFromRepairService(String queueName) {
counter(classQualifier, "queue_message_repushed", "queueName", queueName);
}

public static void recordTaskExecLogSize(int val) {
gauge(classQualifier, "task_exec_log_size", val);
}
}

0 comments on commit 036c0b2

Please sign in to comment.