Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Nov 2, 2018
2 parents 5ef6975 + ecc8936 commit d38bdd5
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,23 +355,20 @@ private void pollForTask(Worker worker) {
private void execute(Worker worker, Task task) {
String taskType = task.getTaskDefName();
try {

if(!worker.preAck(task)) {
logger.debug("Worker decided not to ack the task {}, taskId = {}", taskType, task.getTaskId());
return;
}

if (!taskClient.ack(task.getTaskId(), worker.getIdentity())) {
WorkflowTaskMetrics.incrementTaskAckFailedCount(worker.getTaskDefName());
logger.error("Ack failed for {}, taskId = {}", taskType, task.getTaskId());
returnTask(worker, task);
return;
}
logger.debug("Ack successful for {}, taskId = {}", taskType, task.getTaskId());

} catch (Exception e) {
logger.error(String.format("ack exception for task %s, taskId = %s in worker - %s", task.getTaskDefName(), task.getTaskId(), worker.getIdentity()), e);
WorkflowTaskMetrics.incrementTaskAckErrorCount(worker.getTaskDefName(), e);
returnTask(worker, task);
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,9 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.client.task;

import com.google.common.collect.ImmutableList;
Expand All @@ -24,6 +21,7 @@
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -54,7 +52,7 @@ public void testNoWorkersException() {

@Test
public void testThreadPool() {
Worker worker = Worker.create("test", (Task task)-> new TaskResult(task));
Worker worker = Worker.create("test", TaskResult::new);
WorkflowTaskCoordinator coordinator = new WorkflowTaskCoordinator.Builder().withWorkers(worker, worker, worker).withTaskClient(new TaskClient()).build();
assertEquals(-1, coordinator.getThreadCount()); //Not initialized yet
coordinator.init();
Expand Down Expand Up @@ -114,7 +112,7 @@ public void testTaskException() {
}

@Test
public void testReturnTaskWhenAckFailed() {
public void testNoOpWhenAckFailed() {
Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(1000);
when(worker.getPollCount()).thenReturn(1);
Expand All @@ -135,27 +133,16 @@ public void testReturnTaskWhenAckFailed() {
testTask.setStatus(Task.Status.IN_PROGRESS);
when(client.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask));
when(client.ack(anyString(), anyString())).thenReturn(false);
CountDownLatch latch = new CountDownLatch(1);

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
TaskResult result = (TaskResult) args[0];
assertEquals(TaskResult.Status.IN_PROGRESS, result.getStatus());
latch.countDown();
return null;
}
).when(client).updateTask(any(), anyString());

coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);

// then worker.execute must not be called and task must be updated with IN_PROGRESS status
verify(worker, never()).execute(any());
Mockito.verify(client).updateTask(any(), anyString());
verify(client, never()).updateTask(any(), any());
}

@Test
public void testReturnTaskWhenAckThrowsException() {
public void testNoOpWhenAckThrowsException() {
Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(1000);
when(worker.getPollCount()).thenReturn(1);
Expand All @@ -176,24 +163,12 @@ public void testReturnTaskWhenAckThrowsException() {
testTask.setStatus(Task.Status.IN_PROGRESS);
when(client.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask));
when(client.ack(anyString(), anyString())).thenThrow(new RuntimeException("Ack failed"));
CountDownLatch latch = new CountDownLatch(1);

doAnswer(invocation -> {
assertEquals("test-worker-0", Thread.currentThread().getName());
Object[] args = invocation.getArguments();
TaskResult result = (TaskResult) args[0];
assertEquals(TaskResult.Status.IN_PROGRESS, result.getStatus());
latch.countDown();
return null;
}
).when(client).updateTask(any(), anyString());

coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);

// then worker.execute must not be called and task must be updated with IN_PROGRESS status
verify(worker, never()).execute(any());
Mockito.verify(client).updateTask(any(), anyString());
verify(client, never()).updateTask(any(), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
package com.netflix.conductor.common.metadata.workflow;

import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.google.common.base.MoreObjects;
import com.netflix.conductor.common.metadata.Auditable;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -26,11 +31,6 @@
import java.util.Objects;
import java.util.Optional;

import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.google.common.base.MoreObjects;
import com.netflix.conductor.common.metadata.Auditable;

/**
* @author Viren
*
Expand Down Expand Up @@ -66,6 +66,9 @@ public class WorkflowDef extends Auditable {
@ProtoField(id = 9)
private boolean restartable = true;

@ProtoField(id = 10)
private boolean workflowStatusListenerEnabled = false;

/**
* @return the name
*/
Expand Down Expand Up @@ -201,6 +204,22 @@ public void setSchemaVersion(int schemaVersion) {
this.schemaVersion = schemaVersion;
}

/**
*
* @return true is workflow listener will be invoked when workflow gets into a terminal state
*/
public boolean isWorkflowStatusListenerEnabled() {
return workflowStatusListenerEnabled;
}

/**
* Specify if workflow listener is enabled to invoke a callback for completed or terminated workflows
* @param workflowStatusListenerEnabled
*/
public void setWorkflowStatusListenerEnabled(boolean workflowStatusListenerEnabled) {
this.workflowStatusListenerEnabled = workflowStatusListenerEnabled;
}

public String key(){
return getKey(name, version);
}
Expand Down Expand Up @@ -287,6 +306,7 @@ public String toString() {
.add("failureWorkflow", failureWorkflow)
.add("schemaVersion", schemaVersion)
.add("restartable", restartable)
.add("workflowStatusListenerEnabled", workflowStatusListenerEnabled)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class WorkflowExecutor {
private final MetadataMapperService metadataMapperService;
private final ParametersUtils parametersUtils;

private WorkflowStatusListener workflowStatusListener;

private int activeWorkerLastPollInSecs;

public static final String DECIDER_QUEUE = "_deciderQueue";
Expand All @@ -103,6 +105,7 @@ public WorkflowExecutor(
QueueDAO queueDAO,
MetadataMapperService metadataMapperService,
ParametersUtils parametersUtils,
WorkflowStatusListener workflowStatusListener,
Configuration config
) {
this.deciderService = deciderService;
Expand All @@ -113,6 +116,7 @@ public WorkflowExecutor(
this.metadataMapperService = metadataMapperService;
this.activeWorkerLastPollInSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10);
this.parametersUtils = parametersUtils;
this.workflowStatusListener = workflowStatusListener;
}

/**
Expand Down Expand Up @@ -535,6 +539,10 @@ void completeWorkflow(Workflow wf) {
Monitors.recordWorkflowCompletion(workflow.getWorkflowName(), workflow.getEndTime() - workflow.getStartTime(), wf.getOwnerApp());
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue
logger.debug("Removed workflow {} from decider queue", wf.getWorkflowId());

if(wf.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowCompleted(wf);
}
}

public void terminateWorkflow(String workflowId, String reason) {
Expand Down Expand Up @@ -629,6 +637,10 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo

// Send to atlas
Monitors.recordWorkflowTermination(workflow.getWorkflowName(), workflow.getStatus(), workflow.getOwnerApp());

if(workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowTerminated(workflow);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.netflix.conductor.core.execution;

import com.google.inject.AbstractModule;

/**
* Default implementation for the workflow status listener
*
*/
public class WorkflowExecutorModule extends AbstractModule {
@Override
protected void configure() {
bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class);//default implementation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.run.Workflow;

/**
* Listener for the completed and terminated workflows
*
*/
public interface WorkflowStatusListener {
void onWorkflowCompleted(Workflow workflow);
void onWorkflowTerminated(Workflow workflow);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.run.Workflow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Stub listener default implementation
*/
public class WorkflowStatusListenerStub implements WorkflowStatusListener {

private static final Logger LOG = LoggerFactory.getLogger(WorkflowStatusListenerStub.class);

@Override
public void onWorkflowCompleted(Workflow workflow) {
LOG.debug("Workflow {} is completed", workflow.getWorkflowId());
}

@Override
public void onWorkflowTerminated(Workflow workflow) {
LOG.debug("Workflow {} is terminated", workflow.getWorkflowId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,16 @@ public String updateTask(TaskResult taskResult) {
public String ackTaskReceived(String taskId, String workerId) {
ServiceUtils.checkNotNullOrEmpty(taskId, "TaskId cannot be null or empty.");
LOGGER.debug("Ack received for task: {} from worker: {}", taskId, workerId);
return String.valueOf(executionService.ackTaskReceived(taskId));
boolean ackResult;
try {
ackResult = executionService.ackTaskReceived(taskId);
} catch (Exception e) {
// safe to ignore exception here, since the task will not be processed by the worker due to ack failure
// The task will eventually be available to be polled again after the unack timeout
LOGGER.error("Exception when trying to ack task {} from worker {}", taskId, workerId, e);
ackResult = false;
}
return String.valueOf(ackResult);
}

/**
Expand Down
Loading

0 comments on commit d38bdd5

Please sign in to comment.