From 38379b84d05f8134544dd8a3a0c91f7fd6ad73ef Mon Sep 17 00:00:00 2001 From: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com> Date: Fri, 9 Apr 2021 14:53:24 -0700 Subject: [PATCH] WorkflowSystemTask.name -> WorkflowSystemTask.taskType --- .../contribs/tasks/http/HttpTask.java | 8 +-- .../core/execution/WorkflowExecutor.java | 4 +- .../execution/tasks/SystemTaskRegistry.java | 2 +- .../tasks/SystemTaskWorkerCoordinator.java | 41 +++++++-------- .../execution/tasks/WorkflowSystemTask.java | 15 +++--- .../conductor/metrics/WorkflowMonitor.java | 8 +-- .../tasks/TestSystemTaskExecutor.java | 51 ++++++++++--------- .../TestSystemTaskWorkerCoordinator.java | 2 +- 8 files changed, 67 insertions(+), 64 deletions(-) diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java index db4c967cd0..5c0709be7c 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java @@ -23,6 +23,7 @@ import com.netflix.conductor.core.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -62,6 +63,7 @@ public class HttpTask extends WorkflowSystemTask { protected RestTemplateProvider restTemplateProvider; private final String requestParameter; + @Autowired public HttpTask(RestTemplateProvider restTemplateProvider, ObjectMapper objectMapper) { this(TASK_TYPE_HTTP, restTemplateProvider, objectMapper); @@ -74,7 +76,7 @@ public HttpTask(String name, this.restTemplateProvider = restTemplateProvider; this.objectMapper = objectMapper; this.requestParameter = REQUEST_PARAMETER_NAME; - LOGGER.info("{} initialized...", getName()); + LOGGER.info("{} initialized...", getTaskType()); } @Override @@ -125,10 +127,10 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) { } } catch (Exception e) { - LOGGER.error("Failed to invoke {} task: {} - uri: {}, vipAddress: {} in workflow: {}", getName(), task.getTaskId(), + LOGGER.error("Failed to invoke {} task: {} - uri: {}, vipAddress: {} in workflow: {}", getTaskType(), task.getTaskId(), input.getUri(), input.getVipAddress(), task.getWorkflowInstanceId(), e); task.setStatus(Status.FAILED); - task.setReasonForIncompletion("Failed to invoke " + getName() + " task due to: " + e.toString()); + task.setReasonForIncompletion("Failed to invoke " + getTaskType() + " task due to: " + e); task.getOutputData().put("response", e.toString()); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 48399ce86d..bf4c0db7f8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1228,7 +1228,7 @@ List cancelNonTerminalTasks(Workflow workflow) { } catch (Exception e) { erroredTasks.add(task.getReferenceTaskName()); LOGGER.error("Error canceling system task:{}/{} in workflow: {}", - workflowSystemTask.getName(), task.getTaskId(), workflow.getWorkflowId(), e); + workflowSystemTask.getTaskType(), task.getTaskId(), workflow.getWorkflowId(), e); } } executionDAOFacade.updateTask(task); @@ -1394,7 +1394,7 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, long } if (workflow.getStatus().isTerminal()) { - LOGGER.info("Workflow {} has been completed for {}/{}", workflow.getWorkflowId(), systemTask.getName(), + LOGGER.info("Workflow {} has been completed for {}/{}", workflow.getWorkflowId(), systemTask.getTaskType(), task.getTaskId()); if (!task.getStatus().isTerminal()) { task.setStatus(CANCELED); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskRegistry.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskRegistry.java index 37d72733a0..34c24dc5d4 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskRegistry.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskRegistry.java @@ -33,7 +33,7 @@ public class SystemTaskRegistry { private final Map registry; public SystemTaskRegistry(Set tasks) { - this.registry = tasks.stream().collect(Collectors.toMap(WorkflowSystemTask::getName, Function.identity())); + this.registry = tasks.stream().collect(Collectors.toMap(WorkflowSystemTask::getTaskType, Function.identity())); } public WorkflowSystemTask get(String taskType) { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index efa5c75a26..dd5349e57a 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -1,14 +1,14 @@ /* - * Copyright 2020 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.core.execution.tasks; @@ -20,6 +20,14 @@ import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.service.ExecutionService; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + import java.util.HashSet; import java.util.List; import java.util.Map; @@ -30,13 +38,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component @@ -89,9 +90,9 @@ public void initSystemTaskExecutor() { } private void add(WorkflowSystemTask systemTask) { - LOGGER.info("Adding the queue for system task: {}", systemTask.getName()); - taskNameWorkflowTaskMapping.put(systemTask.getName(), systemTask); - queue.add(systemTask.getName()); + LOGGER.info("Adding the queue for system task: {}", systemTask.getTaskType()); + taskNameWorkflowTaskMapping.put(systemTask.getTaskType(), systemTask); + queue.add(systemTask.getTaskType()); } private void listen() { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java index ee7531dc00..45ce129fe2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java @@ -21,10 +21,10 @@ public abstract class WorkflowSystemTask { - private final String name; + private final String taskType; - public WorkflowSystemTask(String name) { - this.name = name; + public WorkflowSystemTask(String taskType) { + this.taskType = taskType; } /** @@ -55,8 +55,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor workflowEx * @param task Instance of the Task * @param workflowExecutor Workflow Executor */ - public void cancel(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) { - } + public void cancel(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) {} /** * @return True if the task is supposed to be started asynchronously using internal queues. @@ -83,12 +82,12 @@ public boolean isAsyncComplete(Task task) { /** * @return name of the system task */ - public String getName() { - return name; + public String getTaskType() { + return taskType; } @Override public String toString() { - return name; + return taskType; } } diff --git a/core/src/main/java/com/netflix/conductor/metrics/WorkflowMonitor.java b/core/src/main/java/com/netflix/conductor/metrics/WorkflowMonitor.java index 338d9def36..9328c2325b 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/WorkflowMonitor.java +++ b/core/src/main/java/com/netflix/conductor/metrics/WorkflowMonitor.java @@ -93,10 +93,10 @@ public void init() { .stream() .filter(WorkflowSystemTask::isAsync) .forEach(workflowSystemTask -> { - long size = queueDAO.getSize(workflowSystemTask.getName()); - long inProgressCount = executionDAOFacade.getInProgressTaskCount(workflowSystemTask.getName()); - Monitors.recordQueueDepth(workflowSystemTask.getName(), size, "system"); - Monitors.recordTaskInProgress(workflowSystemTask.getName(), inProgressCount, "system"); + long size = queueDAO.getSize(workflowSystemTask.getTaskType()); + long inProgressCount = executionDAOFacade.getInProgressTaskCount(workflowSystemTask.getTaskType()); + Monitors.recordQueueDepth(workflowSystemTask.getTaskType(), size, "system"); + Monitors.recordTaskInProgress(workflowSystemTask.getTaskType(), inProgressCount, "system"); }); refreshCounter--; diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java index 669dc8731a..28be6c8f82 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java @@ -1,32 +1,27 @@ /* - * Copyright 2020 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.core.execution.tasks; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.service.ExecutionService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + import java.time.Duration; import java.util.Collections; import java.util.concurrent.CountDownLatch; @@ -34,10 +29,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @SuppressWarnings("UnstableApiUsage") public class TestSystemTaskExecutor { @@ -248,7 +249,7 @@ private void shutdownExecutorService(ExecutorService executorService) { private void createTaskMapping() { WorkflowSystemTask mockWorkflowTask = mock(WorkflowSystemTask.class); - when(mockWorkflowTask.getName()).thenReturn(TEST_TASK); + when(mockWorkflowTask.getTaskType()).thenReturn(TEST_TASK); when(mockWorkflowTask.isAsync()).thenReturn(true); SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.put(TEST_TASK, mockWorkflowTask); } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java index 7fc6c0d90f..4a78f1a66b 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java @@ -76,7 +76,7 @@ public void testIsFromCoordinatorExecutionNameSpace() { private void createTaskMapping() { WorkflowSystemTask mockWorkflowTask = mock(WorkflowSystemTask.class); - when(mockWorkflowTask.getName()).thenReturn(TEST_QUEUE); + when(mockWorkflowTask.getTaskType()).thenReturn(TEST_QUEUE); when(mockWorkflowTask.isAsync()).thenReturn(true); SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.put(TEST_QUEUE, mockWorkflowTask); }