Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix update with large payload task result (#2982)
Browse files Browse the repository at this point in the history
* fix update with large payload task result

* retryOperation method refactored

Co-authored-by: Aravindan Ramkumar <[email protected]>
  • Loading branch information
apanicker-nflx and aravindanr authored May 10, 2022
1 parent f5bc54f commit 8b1c50f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,27 @@ private void finalizeTask(Task task, Throwable throwable) {

private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) {
try {
TaskResult finalResult =
// upload if necessary
Optional<String> optionalExternalStorageLocation =
retryOperation(
(TaskResult taskResult) -> {
taskClient.evaluateAndUploadLargePayload(
taskResult, task.getTaskType());
return null;
},
(TaskResult taskResult) -> upload(taskResult, task.getTaskType()),
count,
result);
result,
"evaluateAndUploadLargePayload");

if (optionalExternalStorageLocation.isPresent()) {
result.setExternalOutputPayloadStoragePath(optionalExternalStorageLocation.get());
result.setOutputData(null);
}

retryOperation(
(TaskResult taskResult) -> {
taskClient.updateTask(taskResult);
return null;
},
count,
finalResult);
result,
"updateTask");
} catch (Exception e) {
worker.onErrorUpdate(task);
MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
Expand All @@ -305,14 +309,22 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
}
}

private TaskResult retryOperation(
Function<TaskResult, Void> operation, int count, TaskResult result) {
private Optional<String> upload(TaskResult result, String taskType) {
try {
return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);
} catch (IllegalArgumentException iae) {
result.setReasonForIncompletion(iae.getMessage());
result.setOutputData(null);
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
return Optional.empty();
}
}

private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
int index = 0;
while (index < count) {
try {
TaskResult taskResult = result.copy();
operation.apply(taskResult);
return taskResult;
return operation.apply(input);
} catch (Exception e) {
index++;
try {
Expand All @@ -322,8 +334,7 @@ private TaskResult retryOperation(
}
}
}
throw new RuntimeException(
String.format("Exhausted retries for updating task: %s", result.getTaskId()));
throw new RuntimeException("Exhausted retries performing " + opName);
}

private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,10 @@ public void updateTask(TaskResult taskResult) {
postForEntityWithRequestOnly("tasks", taskResult);
}

public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType) {
Preconditions.checkNotNull(taskResult, "Task result cannot be null");
Preconditions.checkArgument(
StringUtils.isBlank(taskResult.getExternalOutputPayloadStoragePath()),
"External Storage Path must not be set");

public Optional<String> evaluateAndUploadLargePayload(
Map<String, Object> taskOutputData, String taskType) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
objectMapper.writeValue(byteArrayOutputStream, taskResult.getOutputData());
objectMapper.writeValue(byteArrayOutputStream, taskOutputData);
byte[] taskOutputBytes = byteArrayOutputStream.toByteArray();
long taskResultSize = taskOutputBytes.length;
MetricsContainer.recordTaskResultPayloadSize(taskType, taskResultSize);
Expand All @@ -257,30 +253,22 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType
|| taskResultSize
> conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB()
* 1024L) {
taskResult.setReasonForIncompletion(
throw new IllegalArgumentException(
String.format(
"The TaskResult payload size: %d is greater than the permissible %d bytes",
taskResultSize, payloadSizeThreshold));
taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
taskResult.setOutputData(null);
} else {
MetricsContainer.incrementExternalPayloadUsedCount(
taskType,
ExternalPayloadStorage.Operation.WRITE.name(),
ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name());
String externalStoragePath =
uploadToExternalPayloadStorage(
ExternalPayloadStorage.PayloadType.TASK_OUTPUT,
taskOutputBytes,
taskResultSize);
taskResult.setExternalOutputPayloadStoragePath(externalStoragePath);
taskResult.setOutputData(null);
}
MetricsContainer.incrementExternalPayloadUsedCount(
taskType,
ExternalPayloadStorage.Operation.WRITE.name(),
ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name());
return Optional.of(
uploadToExternalPayloadStorage(
PayloadType.TASK_OUTPUT, taskOutputBytes, taskResultSize));
}
return Optional.empty();
} catch (IOException e) {
String errorMsg =
String.format(
"Unable to update task: %s with task result", taskResult.getTaskId());
String errorMsg = String.format("Unable to update task: %s with task result", taskType);
LOGGER.error(errorMsg, e);
throw new ConductorClientException(errorMsg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;

Expand Down Expand Up @@ -136,6 +137,7 @@ public TaskResult answer(InvocationOnMock invocation) {
verify(taskClient, times(3)).updateTask(any());
}

@SuppressWarnings("unchecked")
@Test
public void testLargePayloadCanFailUpdateWithRetry() throws InterruptedException {
Task task = testTask();
Expand All @@ -158,7 +160,7 @@ public void testLargePayloadCanFailUpdateWithRetry() throws InterruptedException
throw new ConductorClientException();
})
.when(taskClient)
.evaluateAndUploadLargePayload(any(TaskResult.class), any());
.evaluateAndUploadLargePayload(any(Map.class), any());

TaskPollExecutor taskPollExecutor =
new TaskPollExecutor(
Expand All @@ -181,6 +183,50 @@ public void testLargePayloadCanFailUpdateWithRetry() throws InterruptedException
verify(taskClient, times(0)).updateTask(any());
}

@Test
public void testLargePayloadLocationUpdate() throws InterruptedException {
Task task = testTask();
String largePayloadLocation = "large_payload_location";

Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(3000);
when(worker.getTaskDefName()).thenReturn(TEST_TASK_DEF_NAME);
when(worker.execute(any())).thenReturn(new TaskResult(task));

TaskClient taskClient = Mockito.mock(TaskClient.class);
when(taskClient.pollTask(any(), any(), any())).thenReturn(task);
when(taskClient.ack(any(), any())).thenReturn(true);
//noinspection unchecked
when(taskClient.evaluateAndUploadLargePayload(any(Map.class), any()))
.thenReturn(Optional.of(largePayloadLocation));

TaskPollExecutor taskPollExecutor =
new TaskPollExecutor(
null, taskClient, 1, 3, new HashMap<>(), "test-worker-", new HashMap<>());
CountDownLatch latch = new CountDownLatch(1);

doAnswer(
invocation -> {
Object[] args = invocation.getArguments();
TaskResult result = (TaskResult) args[0];
assertNull(result.getOutputData());
assertEquals(
largePayloadLocation,
result.getExternalOutputPayloadStoragePath());
latch.countDown();
return null;
})
.when(taskClient)
.updateTask(any());

Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
() -> taskPollExecutor.pollAndExecute(worker), 0, 1, TimeUnit.SECONDS);
latch.await();

verify(taskClient, times(1)).updateTask(any());
}

@Test
public void testTaskPollException() throws InterruptedException {
Task task = testTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,25 +311,4 @@ public static TaskResult newTaskResult(Status status) {
result.setStatus(status);
return result;
}

/**
* Copy the given task result object
*
* @return a deep copy of the task result object except the externalOutputPayloadStoragePath
* field
*/
public TaskResult copy() {
TaskResult taskResult = new TaskResult();
taskResult.setWorkflowInstanceId(workflowInstanceId);
taskResult.setTaskId(taskId);
taskResult.setReasonForIncompletion(reasonForIncompletion);
taskResult.setCallbackAfterSeconds(callbackAfterSeconds);
taskResult.setWorkerId(workerId);
taskResult.setStatus(status);
taskResult.setOutputData(outputData);
taskResult.setOutputMessage(outputMessage);
taskResult.setLogs(logs);
taskResult.setSubWorkflowId(subWorkflowId);
return taskResult;
}
}

0 comments on commit 8b1c50f

Please sign in to comment.