Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
WorkflowSystemTask.name -> WorkflowSystemTask.taskType
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed Apr 12, 2021
1 parent 5108a4b commit 38379b8
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.conductor.core.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class HttpTask extends WorkflowSystemTask {
protected RestTemplateProvider restTemplateProvider;
private final String requestParameter;

@Autowired
public HttpTask(RestTemplateProvider restTemplateProvider,
ObjectMapper objectMapper) {
this(TASK_TYPE_HTTP, restTemplateProvider, objectMapper);
Expand All @@ -74,7 +76,7 @@ public HttpTask(String name,
this.restTemplateProvider = restTemplateProvider;
this.objectMapper = objectMapper;
this.requestParameter = REQUEST_PARAMETER_NAME;
LOGGER.info("{} initialized...", getName());
LOGGER.info("{} initialized...", getTaskType());
}

@Override
Expand Down Expand Up @@ -125,10 +127,10 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
}

} catch (Exception e) {
LOGGER.error("Failed to invoke {} task: {} - uri: {}, vipAddress: {} in workflow: {}", getName(), task.getTaskId(),
LOGGER.error("Failed to invoke {} task: {} - uri: {}, vipAddress: {} in workflow: {}", getTaskType(), task.getTaskId(),
input.getUri(), input.getVipAddress(), task.getWorkflowInstanceId(), e);
task.setStatus(Status.FAILED);
task.setReasonForIncompletion("Failed to invoke " + getName() + " task due to: " + e.toString());
task.setReasonForIncompletion("Failed to invoke " + getTaskType() + " task due to: " + e);
task.getOutputData().put("response", e.toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ List<String> cancelNonTerminalTasks(Workflow workflow) {
} catch (Exception e) {
erroredTasks.add(task.getReferenceTaskName());
LOGGER.error("Error canceling system task:{}/{} in workflow: {}",
workflowSystemTask.getName(), task.getTaskId(), workflow.getWorkflowId(), e);
workflowSystemTask.getTaskType(), task.getTaskId(), workflow.getWorkflowId(), e);
}
}
executionDAOFacade.updateTask(task);
Expand Down Expand Up @@ -1394,7 +1394,7 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, long
}

if (workflow.getStatus().isTerminal()) {
LOGGER.info("Workflow {} has been completed for {}/{}", workflow.getWorkflowId(), systemTask.getName(),
LOGGER.info("Workflow {} has been completed for {}/{}", workflow.getWorkflowId(), systemTask.getTaskType(),
task.getTaskId());
if (!task.getStatus().isTerminal()) {
task.setStatus(CANCELED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SystemTaskRegistry {
private final Map<String, WorkflowSystemTask> registry;

public SystemTaskRegistry(Set<WorkflowSystemTask> tasks) {
this.registry = tasks.stream().collect(Collectors.toMap(WorkflowSystemTask::getName, Function.identity()));
this.registry = tasks.stream().collect(Collectors.toMap(WorkflowSystemTask::getTaskType, Function.identity()));
}

public WorkflowSystemTask get(String taskType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

Expand All @@ -20,6 +20,14 @@
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.ExecutionService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -30,13 +38,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
Expand Down Expand Up @@ -89,9 +90,9 @@ public void initSystemTaskExecutor() {
}

private void add(WorkflowSystemTask systemTask) {
LOGGER.info("Adding the queue for system task: {}", systemTask.getName());
taskNameWorkflowTaskMapping.put(systemTask.getName(), systemTask);
queue.add(systemTask.getName());
LOGGER.info("Adding the queue for system task: {}", systemTask.getTaskType());
taskNameWorkflowTaskMapping.put(systemTask.getTaskType(), systemTask);
queue.add(systemTask.getTaskType());
}

private void listen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

public abstract class WorkflowSystemTask {

private final String name;
private final String taskType;

public WorkflowSystemTask(String name) {
this.name = name;
public WorkflowSystemTask(String taskType) {
this.taskType = taskType;
}

/**
Expand Down Expand Up @@ -55,8 +55,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor workflowEx
* @param task Instance of the Task
* @param workflowExecutor Workflow Executor
*/
public void cancel(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) {
}
public void cancel(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) {}

/**
* @return True if the task is supposed to be started asynchronously using internal queues.
Expand All @@ -83,12 +82,12 @@ public boolean isAsyncComplete(Task task) {
/**
* @return name of the system task
*/
public String getName() {
return name;
public String getTaskType() {
return taskType;
}

@Override
public String toString() {
return name;
return taskType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ public void init() {
.stream()
.filter(WorkflowSystemTask::isAsync)
.forEach(workflowSystemTask -> {
long size = queueDAO.getSize(workflowSystemTask.getName());
long inProgressCount = executionDAOFacade.getInProgressTaskCount(workflowSystemTask.getName());
Monitors.recordQueueDepth(workflowSystemTask.getName(), size, "system");
Monitors.recordTaskInProgress(workflowSystemTask.getName(), inProgressCount, "system");
long size = queueDAO.getSize(workflowSystemTask.getTaskType());
long inProgressCount = executionDAOFacade.getInProgressTaskCount(workflowSystemTask.getTaskType());
Monitors.recordQueueDepth(workflowSystemTask.getTaskType(), size, "system");
Monitors.recordTaskInProgress(workflowSystemTask.getTaskType(), inProgressCount, "system");
});

refreshCounter--;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,44 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.service.ExecutionService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("UnstableApiUsage")
public class TestSystemTaskExecutor {
Expand Down Expand Up @@ -248,7 +249,7 @@ private void shutdownExecutorService(ExecutorService executorService) {

private void createTaskMapping() {
WorkflowSystemTask mockWorkflowTask = mock(WorkflowSystemTask.class);
when(mockWorkflowTask.getName()).thenReturn(TEST_TASK);
when(mockWorkflowTask.getTaskType()).thenReturn(TEST_TASK);
when(mockWorkflowTask.isAsync()).thenReturn(true);
SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.put(TEST_TASK, mockWorkflowTask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testIsFromCoordinatorExecutionNameSpace() {

private void createTaskMapping() {
WorkflowSystemTask mockWorkflowTask = mock(WorkflowSystemTask.class);
when(mockWorkflowTask.getName()).thenReturn(TEST_QUEUE);
when(mockWorkflowTask.getTaskType()).thenReturn(TEST_QUEUE);
when(mockWorkflowTask.isAsync()).thenReturn(true);
SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.put(TEST_QUEUE, mockWorkflowTask);
}
Expand Down

0 comments on commit 38379b8

Please sign in to comment.