Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
SystemTaskRegistry builds the registry using task name instead of bea…
Browse files Browse the repository at this point in the history
…n name.
  • Loading branch information
aravindanr committed Apr 12, 2021
1 parent cb8cac3 commit 5108a4b
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* A container class that holds a mapping of system task types {@link com.netflix.conductor.common.metadata.tasks.TaskType} to
Expand All @@ -28,14 +32,8 @@ public class SystemTaskRegistry {

private final Map<String, WorkflowSystemTask> registry;

/**
* Spring creates a map of bean names to {@link WorkflowSystemTask} instances and injects it.
* <p>
* NOTE: It is important the {@link WorkflowSystemTask} instances are "qualified" with their respective
* {@link com.netflix.conductor.common.metadata.tasks.TaskType}.
*/
public SystemTaskRegistry(Map<String, WorkflowSystemTask> registry) {
this.registry = registry;
public SystemTaskRegistry(Set<WorkflowSystemTask> tasks) {
this.registry = tasks.stream().collect(Collectors.toMap(WorkflowSystemTask::getName, Function.identity()));
}

public WorkflowSystemTask get(String taskType) {
Expand All @@ -48,6 +46,6 @@ public boolean isSystemTask(String taskType) {
}

public Collection<WorkflowSystemTask> all() {
return registry.values();
return Collections.unmodifiableCollection(registry.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.netflix.conductor.common.metadata.tasks.TaskType.DECISION;
import static com.netflix.conductor.common.metadata.tasks.TaskType.DYNAMIC;
Expand Down Expand Up @@ -111,8 +112,8 @@ public Join join() {
}

@Bean
public SystemTaskRegistry systemTaskRegistry(Map<String, WorkflowSystemTask> beanFactory) {
return new SystemTaskRegistry(beanFactory);
public SystemTaskRegistry systemTaskRegistry(Set<WorkflowSystemTask> tasks) {
return new SystemTaskRegistry(tasks);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -165,8 +166,8 @@ public WorkflowSystemTask http2() {
}

@Bean
public SystemTaskRegistry systemTaskRegistry(Map<String, WorkflowSystemTask> registry) {
return new SystemTaskRegistry(registry);
public SystemTaskRegistry systemTaskRegistry(Set<WorkflowSystemTask> tasks) {
return new SystemTaskRegistry(tasks);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -52,23 +53,23 @@ public void setUp() {
public void isSystemTask() {
createTaskMapping();
SystemTaskWorkerCoordinator systemTaskWorkerCoordinator = new SystemTaskWorkerCoordinator(queueDAO,
workflowExecutor, properties, executionService, null);
workflowExecutor, properties, executionService, Collections.emptyList());
assertTrue(systemTaskWorkerCoordinator.isAsyncSystemTask(TEST_QUEUE + ISOLATION_CONSTANT));
}

@Test
public void isSystemTaskNotPresent() {
createTaskMapping();
SystemTaskWorkerCoordinator systemTaskWorkerCoordinator = new SystemTaskWorkerCoordinator(queueDAO,
workflowExecutor, properties, executionService, null);
workflowExecutor, properties, executionService, Collections.emptyList());
assertFalse(systemTaskWorkerCoordinator.isAsyncSystemTask(null));
}

@Test
public void testIsFromCoordinatorExecutionNameSpace() {
doReturn("exeNS").when(properties).getSystemTaskWorkerExecutionNamespace();
SystemTaskWorkerCoordinator systemTaskWorkerCoordinator = new SystemTaskWorkerCoordinator(queueDAO,
workflowExecutor, properties, executionService, null);
workflowExecutor, properties, executionService, Collections.emptyList());
assertTrue(
systemTaskWorkerCoordinator.isFromCoordinatorExecutionNameSpace(TEST_QUEUE + EXECUTION_NAMESPACE_CONSTANT));
}
Expand All @@ -77,6 +78,6 @@ private void createTaskMapping() {
WorkflowSystemTask mockWorkflowTask = mock(WorkflowSystemTask.class);
when(mockWorkflowTask.getName()).thenReturn(TEST_QUEUE);
when(mockWorkflowTask.isAsync()).thenReturn(true);
// SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.put(TEST_QUEUE, mockWorkflowTask);
SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.put(TEST_QUEUE, mockWorkflowTask);
}
}

0 comments on commit 5108a4b

Please sign in to comment.