From 0fc35a9dcc20fe913a4449a8423aec15fd8eceb8 Mon Sep 17 00:00:00 2001 From: Alex Lich Date: Wed, 31 Oct 2018 16:22:44 -0700 Subject: [PATCH 1/5] add default listeners for the workflow when it gets to a terminal state --- .../execution/WorkflowStatusListenerStub.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java new file mode 100644 index 0000000000..dd85001981 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java @@ -0,0 +1,24 @@ +package com.netflix.conductor.core.execution; + +import com.netflix.conductor.common.run.Workflow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Stub listener default implementation + */ +public class WorkflowStatusListenerStub implements WorkflowStatusListener { + + private static final Logger LOG = LoggerFactory.getLogger(WorkflowStatusListenerStub.class); + + @Override + public void onWorkflowCompleted(Workflow workflow) { + LOG.debug("Workflow {} is completed", workflow.toString()); + } + + @Override + public void onWorkflowTerminated(Workflow workflow) { + LOG.debug("Workflow {} is terminated", workflow.toString()); + } + +} From d950dd10b88882823b35e6d4db40c60c4bd63b94 Mon Sep 17 00:00:00 2001 From: Alex Lich Date: Wed, 31 Oct 2018 16:48:40 -0700 Subject: [PATCH 2/5] add default listeners for the workflow when it gets to a terminal state --- .../common/metadata/workflow/WorkflowDef.java | 30 +++++++-- .../core/execution/WorkflowExecutor.java | 12 ++++ .../execution/WorkflowExecutorModule.java | 14 +++++ .../execution/WorkflowStatusListener.java | 12 ++++ .../core/execution/TestWorkflowExecutor.java | 62 ++++++++++++++++++- .../conductor/grpc/AbstractProtoMapper.java | 2 + grpc/src/main/proto/model/workflowdef.proto | 1 + .../conductor/bootstrap/ModulesProvider.java | 4 +- .../conductor/tests/utils/TestModule.java | 4 ++ 9 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java create mode 100644 core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index f68765fa17..d091e070ad 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -18,6 +18,11 @@ */ package com.netflix.conductor.common.metadata.workflow; +import com.github.vmg.protogen.annotations.ProtoField; +import com.github.vmg.protogen.annotations.ProtoMessage; +import com.google.common.base.MoreObjects; +import com.netflix.conductor.common.metadata.Auditable; + import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -26,11 +31,6 @@ import java.util.Objects; import java.util.Optional; -import com.github.vmg.protogen.annotations.ProtoField; -import com.github.vmg.protogen.annotations.ProtoMessage; -import com.google.common.base.MoreObjects; -import com.netflix.conductor.common.metadata.Auditable; - /** * @author Viren * @@ -66,6 +66,9 @@ public class WorkflowDef extends Auditable { @ProtoField(id = 9) private boolean restartable = true; + @ProtoField(id = 10) + private boolean terminalWorkflowListenerEnabled = false; + /** * @return the name */ @@ -201,6 +204,22 @@ public void setSchemaVersion(int schemaVersion) { this.schemaVersion = schemaVersion; } + /** + * + * @return true is workflow listener will be invoked when workflow gets into a terminal state + */ + public boolean isTerminalWorkflowListenerEnabled() { + return terminalWorkflowListenerEnabled; + } + + /** + * Specify if workflow listener is enabled to invoke a callback for completed or terminated workflows + * @param terminalWorkflowListenerEnabled + */ + public void setTerminalWorkflowListenerEnabled(boolean terminalWorkflowListenerEnabled) { + this.terminalWorkflowListenerEnabled = terminalWorkflowListenerEnabled; + } + public String key(){ return getKey(name, version); } @@ -287,6 +306,7 @@ public String toString() { .add("failureWorkflow", failureWorkflow) .add("schemaVersion", schemaVersion) .add("restartable", restartable) + .add("terminalWorkflowListenerEnabled", terminalWorkflowListenerEnabled) .toString(); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index d468f0d6f9..c348b721ec 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -90,6 +90,8 @@ public class WorkflowExecutor { private final MetadataMapperService metadataMapperService; private final ParametersUtils parametersUtils; + private WorkflowStatusListener workflowStatusListener; + private int activeWorkerLastPollInSecs; public static final String DECIDER_QUEUE = "_deciderQueue"; @@ -103,6 +105,7 @@ public WorkflowExecutor( QueueDAO queueDAO, MetadataMapperService metadataMapperService, ParametersUtils parametersUtils, + WorkflowStatusListener workflowStatusListener, Configuration config ) { this.deciderService = deciderService; @@ -113,6 +116,7 @@ public WorkflowExecutor( this.metadataMapperService = metadataMapperService; this.activeWorkerLastPollInSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10); this.parametersUtils = parametersUtils; + this.workflowStatusListener = workflowStatusListener; } /** @@ -535,6 +539,10 @@ void completeWorkflow(Workflow wf) { Monitors.recordWorkflowCompletion(workflow.getWorkflowName(), workflow.getEndTime() - workflow.getStartTime(), wf.getOwnerApp()); queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue logger.debug("Removed workflow {} from decider queue", wf.getWorkflowId()); + + if(wf.getWorkflowDefinition().isTerminalWorkflowListenerEnabled()) { + workflowStatusListener.onWorkflowCompleted(wf); + } } public void terminateWorkflow(String workflowId, String reason) { @@ -629,6 +637,10 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo // Send to atlas Monitors.recordWorkflowTermination(workflow.getWorkflowName(), workflow.getStatus(), workflow.getOwnerApp()); + + if(workflow.getWorkflowDefinition().isTerminalWorkflowListenerEnabled()) { + workflowStatusListener.onWorkflowTerminated(workflow); + } } /** diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java new file mode 100644 index 0000000000..d9e6aa09d8 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java @@ -0,0 +1,14 @@ +package com.netflix.conductor.core.execution; + +import com.google.inject.AbstractModule; + +/** + * Default implementation for the workflow status listener + * + */ +public class WorkflowExecutorModule extends AbstractModule { + @Override + protected void configure() { + bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class);//default implementation + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java new file mode 100644 index 0000000000..a70a4ea27e --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java @@ -0,0 +1,12 @@ +package com.netflix.conductor.core.execution; + +import com.netflix.conductor.common.run.Workflow; + +/** + * Listener for the completed and terminated workflows + * + */ +public interface WorkflowStatusListener { + void onWorkflowCompleted(Workflow workflow); + void onWorkflowTerminated(Workflow workflow); +} diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 04cc50f5ca..6d27dc9b05 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -68,6 +68,8 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -79,6 +81,7 @@ public class TestWorkflowExecutor { private ExecutionDAO executionDAO; private MetadataDAO metadataDAO; private QueueDAO queueDAO; + private WorkflowStatusListener workflowStatusListener; @Before public void init() { @@ -86,6 +89,7 @@ public void init() { executionDAO = mock(ExecutionDAO.class); metadataDAO = mock(MetadataDAO.class); queueDAO = mock(QueueDAO.class); + workflowStatusListener = mock(WorkflowStatusListener.class); ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class); ObjectMapper objectMapper = new ObjectMapper(); ParametersUtils parametersUtils = new ParametersUtils(); @@ -103,7 +107,8 @@ public void init() { DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO); - workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, metadataMapperService, parametersUtils, config); + workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, metadataMapperService, + parametersUtils, workflowStatusListener, config); } @Test @@ -266,8 +271,63 @@ public void testCompleteWorkflow() { assertEquals(1, updateWorkflowCalledCounter.get()); assertEquals(1, updateTasksCalledCounter.get()); assertEquals(1, removeQueueEntryCalledCounter.get()); + + verify(workflowStatusListener, times(0)).onWorkflowCompleted(any(Workflow.class)); + + def.setTerminalWorkflowListenerEnabled(true); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflowExecutor.completeWorkflow(workflow); + verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); } + @Test + @SuppressWarnings("unchecked") + public void testTerminatedWorkflow() { + WorkflowDef def = new WorkflowDef(); + def.setName("test"); + + Workflow workflow = new Workflow(); + workflow.setWorkflowDefinition(def); + workflow.setWorkflowId("1"); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflow.setOwnerApp("junit_test"); + workflow.setStartTime(10L); + workflow.setEndTime(100L); + workflow.setOutput(Collections.EMPTY_MAP); + + when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); + + AtomicInteger updateWorkflowCalledCounter = new AtomicInteger(0); + doAnswer(invocation -> { + updateWorkflowCalledCounter.incrementAndGet(); + return null; + }).when(executionDAO).updateWorkflow(any()); + + AtomicInteger updateTasksCalledCounter = new AtomicInteger(0); + doAnswer(invocation -> { + updateTasksCalledCounter.incrementAndGet(); + return null; + }).when(executionDAO).updateTasks(any()); + + AtomicInteger removeQueueEntryCalledCounter = new AtomicInteger(0); + doAnswer(invocation -> { + removeQueueEntryCalledCounter.incrementAndGet(); + return null; + }).when(queueDAO).remove(anyString(), anyString()); + + workflowExecutor.terminateWorkflow("workflowId", "reason"); + assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus()); + assertEquals(1, updateWorkflowCalledCounter.get()); + assertEquals(1, removeQueueEntryCalledCounter.get()); + + verify(workflowStatusListener, times(0)).onWorkflowTerminated(any(Workflow.class)); + + def.setTerminalWorkflowListenerEnabled(true); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflowExecutor.completeWorkflow(workflow); + verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); + } + @Test public void testGetFailedTasksToRetry() { //setup diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 978401accf..b71ae9bddd 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -1013,6 +1013,7 @@ public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { } to.setSchemaVersion( from.getSchemaVersion() ); to.setRestartable( from.isRestartable() ); + to.setTerminalWorkflowListenerEnabled( from.isTerminalWorkflowListenerEnabled() ); return to.build(); } @@ -1031,6 +1032,7 @@ public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { to.setFailureWorkflow( from.getFailureWorkflow() ); to.setSchemaVersion( from.getSchemaVersion() ); to.setRestartable( from.getRestartable() ); + to.setTerminalWorkflowListenerEnabled( from.getTerminalWorkflowListenerEnabled() ); return to; } diff --git a/grpc/src/main/proto/model/workflowdef.proto b/grpc/src/main/proto/model/workflowdef.proto index 9e5be4f627..e724546398 100644 --- a/grpc/src/main/proto/model/workflowdef.proto +++ b/grpc/src/main/proto/model/workflowdef.proto @@ -18,4 +18,5 @@ message WorkflowDef { string failure_workflow = 7; int32 schema_version = 8; bool restartable = 9; + bool terminal_workflow_listener_enabled = 10; } diff --git a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java index fa8797f1e1..cff86c9ae3 100644 --- a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java +++ b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java @@ -7,7 +7,7 @@ import com.netflix.conductor.contribs.http.RestClientManager; import com.netflix.conductor.contribs.json.JsonJqTransform; import com.netflix.conductor.core.config.Configuration; -import com.netflix.conductor.core.config.SystemPropertiesConfiguration; +import com.netflix.conductor.core.execution.WorkflowExecutorModule; import com.netflix.conductor.core.utils.DummyPayloadStorage; import com.netflix.conductor.core.utils.S3PayloadStorage; import com.netflix.conductor.dao.RedisWorkflowModule; @@ -92,6 +92,8 @@ private List selectModulesToLoad() { modules.add(new ElasticSearchV5Module()); + modules.add(new WorkflowExecutorModule()); + if (configuration.getJerseyEnabled()) { modules.add(new JerseyModule()); modules.add(new SwaggerModule()); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java index 695a989087..d2ca90ce66 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java @@ -19,6 +19,8 @@ import com.netflix.conductor.common.utils.JsonMapperProvider; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.execution.WorkflowStatusListenerStub; +import com.netflix.conductor.core.execution.WorkflowStatusListener; import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.MetadataDAO; @@ -60,6 +62,8 @@ protected void configure() { bind(QueueDAO.class).to(DynoQueueDAO.class); bind(IndexDAO.class).to(MockIndexDAO.class); + bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class); + install(new CoreModule()); bind(UserTask.class).asEagerSingleton(); bind(ObjectMapper.class).toProvider(JsonMapperProvider.class); From febac6c913934deeef10ab4847d7dcb901271602 Mon Sep 17 00:00:00 2001 From: Alex Lich Date: Thu, 1 Nov 2018 14:18:54 -0700 Subject: [PATCH 3/5] PR cleanup, fields renamed --- .../common/metadata/workflow/WorkflowDef.java | 14 +++++++------- .../conductor/core/execution/WorkflowExecutor.java | 4 ++-- .../core/execution/WorkflowStatusListenerStub.java | 4 ++-- .../core/execution/TestWorkflowExecutor.java | 4 ++-- .../conductor/grpc/AbstractProtoMapper.java | 4 ++-- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index d091e070ad..e7bd12584e 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -67,7 +67,7 @@ public class WorkflowDef extends Auditable { private boolean restartable = true; @ProtoField(id = 10) - private boolean terminalWorkflowListenerEnabled = false; + private boolean workflowStatusListenerEnabled = false; /** * @return the name @@ -208,16 +208,16 @@ public void setSchemaVersion(int schemaVersion) { * * @return true is workflow listener will be invoked when workflow gets into a terminal state */ - public boolean isTerminalWorkflowListenerEnabled() { - return terminalWorkflowListenerEnabled; + public boolean isWorkflowStatusListenerEnabled() { + return workflowStatusListenerEnabled; } /** * Specify if workflow listener is enabled to invoke a callback for completed or terminated workflows - * @param terminalWorkflowListenerEnabled + * @param workflowStatusListenerEnabled */ - public void setTerminalWorkflowListenerEnabled(boolean terminalWorkflowListenerEnabled) { - this.terminalWorkflowListenerEnabled = terminalWorkflowListenerEnabled; + public void setWorkflowStatusListenerEnabled(boolean workflowStatusListenerEnabled) { + this.workflowStatusListenerEnabled = workflowStatusListenerEnabled; } public String key(){ @@ -306,7 +306,7 @@ public String toString() { .add("failureWorkflow", failureWorkflow) .add("schemaVersion", schemaVersion) .add("restartable", restartable) - .add("terminalWorkflowListenerEnabled", terminalWorkflowListenerEnabled) + .add("workflowStatusListenerEnabled", workflowStatusListenerEnabled) .toString(); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index c348b721ec..d7722acc11 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -540,7 +540,7 @@ void completeWorkflow(Workflow wf) { queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue logger.debug("Removed workflow {} from decider queue", wf.getWorkflowId()); - if(wf.getWorkflowDefinition().isTerminalWorkflowListenerEnabled()) { + if(wf.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) { workflowStatusListener.onWorkflowCompleted(wf); } } @@ -638,7 +638,7 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo // Send to atlas Monitors.recordWorkflowTermination(workflow.getWorkflowName(), workflow.getStatus(), workflow.getOwnerApp()); - if(workflow.getWorkflowDefinition().isTerminalWorkflowListenerEnabled()) { + if(workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) { workflowStatusListener.onWorkflowTerminated(workflow); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java index dd85001981..25091cd54d 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java @@ -13,12 +13,12 @@ public class WorkflowStatusListenerStub implements WorkflowStatusListener { @Override public void onWorkflowCompleted(Workflow workflow) { - LOG.debug("Workflow {} is completed", workflow.toString()); + LOG.debug("Workflow {} is completed", workflow.getWorkflowId()); } @Override public void onWorkflowTerminated(Workflow workflow) { - LOG.debug("Workflow {} is terminated", workflow.toString()); + LOG.debug("Workflow {} is terminated", workflow.getWorkflowId()); } } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 6d27dc9b05..4f48a1d5ed 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -274,7 +274,7 @@ public void testCompleteWorkflow() { verify(workflowStatusListener, times(0)).onWorkflowCompleted(any(Workflow.class)); - def.setTerminalWorkflowListenerEnabled(true); + def.setWorkflowStatusListenerEnabled(true); workflow.setStatus(Workflow.WorkflowStatus.RUNNING); workflowExecutor.completeWorkflow(workflow); verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); @@ -322,7 +322,7 @@ public void testTerminatedWorkflow() { verify(workflowStatusListener, times(0)).onWorkflowTerminated(any(Workflow.class)); - def.setTerminalWorkflowListenerEnabled(true); + def.setWorkflowStatusListenerEnabled(true); workflow.setStatus(Workflow.WorkflowStatus.RUNNING); workflowExecutor.completeWorkflow(workflow); verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index b71ae9bddd..c7675f2e6f 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -1013,7 +1013,7 @@ public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { } to.setSchemaVersion( from.getSchemaVersion() ); to.setRestartable( from.isRestartable() ); - to.setTerminalWorkflowListenerEnabled( from.isTerminalWorkflowListenerEnabled() ); + to.setWorkflowStatusListenerEnabled( from.isWorkflowStatusListenerEnabled() ); return to.build(); } @@ -1032,7 +1032,7 @@ public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { to.setFailureWorkflow( from.getFailureWorkflow() ); to.setSchemaVersion( from.getSchemaVersion() ); to.setRestartable( from.getRestartable() ); - to.setTerminalWorkflowListenerEnabled( from.getTerminalWorkflowListenerEnabled() ); + to.setWorkflowStatusListenerEnabled( from.getWorkflowStatusListenerEnabled() ); return to; } From 8c544c666c4a7f982be7310702507f0cc3c96291 Mon Sep 17 00:00:00 2001 From: Alex Lich Date: Thu, 1 Nov 2018 14:22:42 -0700 Subject: [PATCH 4/5] PR cleanup, fields renamed --- grpc/src/main/proto/model/workflowdef.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/src/main/proto/model/workflowdef.proto b/grpc/src/main/proto/model/workflowdef.proto index e724546398..1224b6267d 100644 --- a/grpc/src/main/proto/model/workflowdef.proto +++ b/grpc/src/main/proto/model/workflowdef.proto @@ -18,5 +18,5 @@ message WorkflowDef { string failure_workflow = 7; int32 schema_version = 8; bool restartable = 9; - bool terminal_workflow_listener_enabled = 10; + bool workflow_status_listener_enabled = 10; } From 169485b9223d5a518e516f2e973a8c8bc2aae772 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 1 Nov 2018 18:03:40 -0700 Subject: [PATCH 5/5] do not return task if ack failed/throws exception --- .../client/task/WorkflowTaskCoordinator.java | 5 +-- .../task/WorkflowTaskCoordinatorTests.java | 39 ++++--------------- .../conductor/service/TaskService.java | 11 +++++- .../conductor/bootstrap/ModulesProvider.java | 3 +- 4 files changed, 19 insertions(+), 39 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java index c72f5feb2e..c84c08d498 100644 --- a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java +++ b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java @@ -355,7 +355,6 @@ private void pollForTask(Worker worker) { private void execute(Worker worker, Task task) { String taskType = task.getTaskDefName(); try { - if(!worker.preAck(task)) { logger.debug("Worker decided not to ack the task {}, taskId = {}", taskType, task.getTaskId()); return; @@ -363,15 +362,13 @@ private void execute(Worker worker, Task task) { if (!taskClient.ack(task.getTaskId(), worker.getIdentity())) { WorkflowTaskMetrics.incrementTaskAckFailedCount(worker.getTaskDefName()); - logger.error("Ack failed for {}, taskId = {}", taskType, task.getTaskId()); - returnTask(worker, task); return; } + logger.debug("Ack successful for {}, taskId = {}", taskType, task.getTaskId()); } catch (Exception e) { logger.error(String.format("ack exception for task %s, taskId = %s in worker - %s", task.getTaskDefName(), task.getTaskId(), worker.getIdentity()), e); WorkflowTaskMetrics.incrementTaskAckErrorCount(worker.getTaskDefName(), e); - returnTask(worker, task); return; } diff --git a/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java b/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java index a9277cf522..7d6a9f8f91 100644 --- a/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java +++ b/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,9 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * - */ package com.netflix.conductor.client.task; import com.google.common.collect.ImmutableList; @@ -24,6 +21,7 @@ import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; @@ -54,7 +52,7 @@ public void testNoWorkersException() { @Test public void testThreadPool() { - Worker worker = Worker.create("test", (Task task)-> new TaskResult(task)); + Worker worker = Worker.create("test", TaskResult::new); WorkflowTaskCoordinator coordinator = new WorkflowTaskCoordinator.Builder().withWorkers(worker, worker, worker).withTaskClient(new TaskClient()).build(); assertEquals(-1, coordinator.getThreadCount()); //Not initialized yet coordinator.init(); @@ -114,7 +112,7 @@ public void testTaskException() { } @Test - public void testReturnTaskWhenAckFailed() { + public void testNoOpWhenAckFailed() { Worker worker = mock(Worker.class); when(worker.getPollingInterval()).thenReturn(1000); when(worker.getPollCount()).thenReturn(1); @@ -135,27 +133,16 @@ public void testReturnTaskWhenAckFailed() { testTask.setStatus(Task.Status.IN_PROGRESS); when(client.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask)); when(client.ack(anyString(), anyString())).thenReturn(false); - CountDownLatch latch = new CountDownLatch(1); - - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - TaskResult result = (TaskResult) args[0]; - assertEquals(TaskResult.Status.IN_PROGRESS, result.getStatus()); - latch.countDown(); - return null; - } - ).when(client).updateTask(any(), anyString()); coordinator.init(); - Uninterruptibles.awaitUninterruptibly(latch); // then worker.execute must not be called and task must be updated with IN_PROGRESS status verify(worker, never()).execute(any()); - Mockito.verify(client).updateTask(any(), anyString()); + verify(client, never()).updateTask(any(), any()); } @Test - public void testReturnTaskWhenAckThrowsException() { + public void testNoOpWhenAckThrowsException() { Worker worker = mock(Worker.class); when(worker.getPollingInterval()).thenReturn(1000); when(worker.getPollCount()).thenReturn(1); @@ -176,24 +163,12 @@ public void testReturnTaskWhenAckThrowsException() { testTask.setStatus(Task.Status.IN_PROGRESS); when(client.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask)); when(client.ack(anyString(), anyString())).thenThrow(new RuntimeException("Ack failed")); - CountDownLatch latch = new CountDownLatch(1); - - doAnswer(invocation -> { - assertEquals("test-worker-0", Thread.currentThread().getName()); - Object[] args = invocation.getArguments(); - TaskResult result = (TaskResult) args[0]; - assertEquals(TaskResult.Status.IN_PROGRESS, result.getStatus()); - latch.countDown(); - return null; - } - ).when(client).updateTask(any(), anyString()); coordinator.init(); - Uninterruptibles.awaitUninterruptibly(latch); // then worker.execute must not be called and task must be updated with IN_PROGRESS status verify(worker, never()).execute(any()); - Mockito.verify(client).updateTask(any(), anyString()); + verify(client, never()).updateTask(any(), any()); } @Test diff --git a/core/src/main/java/com/netflix/conductor/service/TaskService.java b/core/src/main/java/com/netflix/conductor/service/TaskService.java index 17c14d52c3..976f1561af 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskService.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskService.java @@ -152,7 +152,16 @@ public String updateTask(TaskResult taskResult) { public String ackTaskReceived(String taskId, String workerId) { ServiceUtils.checkNotNullOrEmpty(taskId, "TaskId cannot be null or empty."); LOGGER.debug("Ack received for task: {} from worker: {}", taskId, workerId); - return String.valueOf(executionService.ackTaskReceived(taskId)); + boolean ackResult; + try { + ackResult = executionService.ackTaskReceived(taskId); + } catch (Exception e) { + // safe to ignore exception here, since the task will not be processed by the worker due to ack failure + // The task will eventually be available to be polled again after the unack timeout + LOGGER.error("Exception when trying to ack task {} from worker {}", taskId, workerId, e); + ackResult = false; + } + return String.valueOf(ackResult); } /** diff --git a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java index fa8797f1e1..7cba287e1b 100644 --- a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java +++ b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java @@ -7,7 +7,6 @@ import com.netflix.conductor.contribs.http.RestClientManager; import com.netflix.conductor.contribs.json.JsonJqTransform; import com.netflix.conductor.core.config.Configuration; -import com.netflix.conductor.core.config.SystemPropertiesConfiguration; import com.netflix.conductor.core.utils.DummyPayloadStorage; import com.netflix.conductor.core.utils.S3PayloadStorage; import com.netflix.conductor.dao.RedisWorkflowModule; @@ -54,7 +53,7 @@ public List get() { } private List selectModulesToLoad() { - Configuration.DB database = null; + Configuration.DB database; List modules = new ArrayList<>(); try {