Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #846 from Netflix/feature/add_workflow_completion_…
Browse files Browse the repository at this point in the history
…listener-AL

Feature to add workflow completion listener
  • Loading branch information
Alex authored Nov 2, 2018
2 parents 591e4d3 + 7ef5806 commit ecc8936
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
package com.netflix.conductor.common.metadata.workflow;

import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.google.common.base.MoreObjects;
import com.netflix.conductor.common.metadata.Auditable;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -26,11 +31,6 @@
import java.util.Objects;
import java.util.Optional;

import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.google.common.base.MoreObjects;
import com.netflix.conductor.common.metadata.Auditable;

/**
* @author Viren
*
Expand Down Expand Up @@ -66,6 +66,9 @@ public class WorkflowDef extends Auditable {
@ProtoField(id = 9)
private boolean restartable = true;

@ProtoField(id = 10)
private boolean workflowStatusListenerEnabled = false;

/**
* @return the name
*/
Expand Down Expand Up @@ -201,6 +204,22 @@ public void setSchemaVersion(int schemaVersion) {
this.schemaVersion = schemaVersion;
}

/**
*
* @return true is workflow listener will be invoked when workflow gets into a terminal state
*/
public boolean isWorkflowStatusListenerEnabled() {
return workflowStatusListenerEnabled;
}

/**
* Specify if workflow listener is enabled to invoke a callback for completed or terminated workflows
* @param workflowStatusListenerEnabled
*/
public void setWorkflowStatusListenerEnabled(boolean workflowStatusListenerEnabled) {
this.workflowStatusListenerEnabled = workflowStatusListenerEnabled;
}

public String key(){
return getKey(name, version);
}
Expand Down Expand Up @@ -287,6 +306,7 @@ public String toString() {
.add("failureWorkflow", failureWorkflow)
.add("schemaVersion", schemaVersion)
.add("restartable", restartable)
.add("workflowStatusListenerEnabled", workflowStatusListenerEnabled)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class WorkflowExecutor {
private final MetadataMapperService metadataMapperService;
private final ParametersUtils parametersUtils;

private WorkflowStatusListener workflowStatusListener;

private int activeWorkerLastPollInSecs;

public static final String DECIDER_QUEUE = "_deciderQueue";
Expand All @@ -103,6 +105,7 @@ public WorkflowExecutor(
QueueDAO queueDAO,
MetadataMapperService metadataMapperService,
ParametersUtils parametersUtils,
WorkflowStatusListener workflowStatusListener,
Configuration config
) {
this.deciderService = deciderService;
Expand All @@ -113,6 +116,7 @@ public WorkflowExecutor(
this.metadataMapperService = metadataMapperService;
this.activeWorkerLastPollInSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10);
this.parametersUtils = parametersUtils;
this.workflowStatusListener = workflowStatusListener;
}

/**
Expand Down Expand Up @@ -535,6 +539,10 @@ void completeWorkflow(Workflow wf) {
Monitors.recordWorkflowCompletion(workflow.getWorkflowName(), workflow.getEndTime() - workflow.getStartTime(), wf.getOwnerApp());
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue
logger.debug("Removed workflow {} from decider queue", wf.getWorkflowId());

if(wf.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowCompleted(wf);
}
}

public void terminateWorkflow(String workflowId, String reason) {
Expand Down Expand Up @@ -629,6 +637,10 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo

// Send to atlas
Monitors.recordWorkflowTermination(workflow.getWorkflowName(), workflow.getStatus(), workflow.getOwnerApp());

if(workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowTerminated(workflow);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.netflix.conductor.core.execution;

import com.google.inject.AbstractModule;

/**
* Default implementation for the workflow status listener
*
*/
public class WorkflowExecutorModule extends AbstractModule {
@Override
protected void configure() {
bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class);//default implementation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.run.Workflow;

/**
* Listener for the completed and terminated workflows
*
*/
public interface WorkflowStatusListener {
void onWorkflowCompleted(Workflow workflow);
void onWorkflowTerminated(Workflow workflow);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.run.Workflow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Stub listener default implementation
*/
public class WorkflowStatusListenerStub implements WorkflowStatusListener {

private static final Logger LOG = LoggerFactory.getLogger(WorkflowStatusListenerStub.class);

@Override
public void onWorkflowCompleted(Workflow workflow) {
LOG.debug("Workflow {} is completed", workflow.getWorkflowId());
}

@Override
public void onWorkflowTerminated(Workflow workflow) {
LOG.debug("Workflow {} is terminated", workflow.getWorkflowId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand All @@ -79,13 +81,15 @@ public class TestWorkflowExecutor {
private ExecutionDAO executionDAO;
private MetadataDAO metadataDAO;
private QueueDAO queueDAO;
private WorkflowStatusListener workflowStatusListener;

@Before
public void init() {
TestConfiguration config = new TestConfiguration();
executionDAO = mock(ExecutionDAO.class);
metadataDAO = mock(MetadataDAO.class);
queueDAO = mock(QueueDAO.class);
workflowStatusListener = mock(WorkflowStatusListener.class);
ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
ObjectMapper objectMapper = new ObjectMapper();
ParametersUtils parametersUtils = new ParametersUtils();
Expand All @@ -103,7 +107,8 @@ public void init() {

DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, metadataMapperService, parametersUtils, config);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, metadataMapperService,
parametersUtils, workflowStatusListener, config);
}

@Test
Expand Down Expand Up @@ -266,8 +271,63 @@ public void testCompleteWorkflow() {
assertEquals(1, updateWorkflowCalledCounter.get());
assertEquals(1, updateTasksCalledCounter.get());
assertEquals(1, removeQueueEntryCalledCounter.get());

verify(workflowStatusListener, times(0)).onWorkflowCompleted(any(Workflow.class));

def.setWorkflowStatusListenerEnabled(true);
workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
workflowExecutor.completeWorkflow(workflow);
verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class));
}

@Test
@SuppressWarnings("unchecked")
public void testTerminatedWorkflow() {
WorkflowDef def = new WorkflowDef();
def.setName("test");

Workflow workflow = new Workflow();
workflow.setWorkflowDefinition(def);
workflow.setWorkflowId("1");
workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
workflow.setOwnerApp("junit_test");
workflow.setStartTime(10L);
workflow.setEndTime(100L);
workflow.setOutput(Collections.EMPTY_MAP);

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);

AtomicInteger updateWorkflowCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
updateWorkflowCalledCounter.incrementAndGet();
return null;
}).when(executionDAO).updateWorkflow(any());

AtomicInteger updateTasksCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
updateTasksCalledCounter.incrementAndGet();
return null;
}).when(executionDAO).updateTasks(any());

AtomicInteger removeQueueEntryCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
removeQueueEntryCalledCounter.incrementAndGet();
return null;
}).when(queueDAO).remove(anyString(), anyString());

workflowExecutor.terminateWorkflow("workflowId", "reason");
assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus());
assertEquals(1, updateWorkflowCalledCounter.get());
assertEquals(1, removeQueueEntryCalledCounter.get());

verify(workflowStatusListener, times(0)).onWorkflowTerminated(any(Workflow.class));

def.setWorkflowStatusListenerEnabled(true);
workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
workflowExecutor.completeWorkflow(workflow);
verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class));
}

@Test
public void testGetFailedTasksToRetry() {
//setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) {
}
to.setSchemaVersion( from.getSchemaVersion() );
to.setRestartable( from.isRestartable() );
to.setWorkflowStatusListenerEnabled( from.isWorkflowStatusListenerEnabled() );
return to.build();
}

Expand All @@ -1031,6 +1032,7 @@ public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) {
to.setFailureWorkflow( from.getFailureWorkflow() );
to.setSchemaVersion( from.getSchemaVersion() );
to.setRestartable( from.getRestartable() );
to.setWorkflowStatusListenerEnabled( from.getWorkflowStatusListenerEnabled() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/workflowdef.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ message WorkflowDef {
string failure_workflow = 7;
int32 schema_version = 8;
bool restartable = 9;
bool workflow_status_listener_enabled = 10;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.netflix.conductor.contribs.http.RestClientManager;
import com.netflix.conductor.contribs.json.JsonJqTransform;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.WorkflowExecutorModule;
import com.netflix.conductor.core.utils.DummyPayloadStorage;
import com.netflix.conductor.core.utils.S3PayloadStorage;
import com.netflix.conductor.dao.RedisWorkflowModule;
Expand Down Expand Up @@ -91,6 +92,8 @@ private List<AbstractModule> selectModulesToLoad() {

modules.add(new ElasticSearchV5Module());

modules.add(new WorkflowExecutorModule());

if (configuration.getJerseyEnabled()) {
modules.add(new JerseyModule());
modules.add(new SwaggerModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.netflix.conductor.common.utils.JsonMapperProvider;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.config.CoreModule;
import com.netflix.conductor.core.execution.WorkflowStatusListenerStub;
import com.netflix.conductor.core.execution.WorkflowStatusListener;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.MetadataDAO;
Expand Down Expand Up @@ -60,6 +62,8 @@ protected void configure() {
bind(QueueDAO.class).to(DynoQueueDAO.class);
bind(IndexDAO.class).to(MockIndexDAO.class);

bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class);

install(new CoreModule());
bind(UserTask.class).asEagerSingleton();
bind(ObjectMapper.class).toProvider(JsonMapperProvider.class);
Expand Down

0 comments on commit ecc8936

Please sign in to comment.