Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit 5084409

Browse files
authored
Changes in sub workflow trigger decide on the parent asynchronously (#2125)
Sub workflow updates the parent's task and status, but triggers the decide async by pushing the parent's id to the decider queue.
1 parent 6b57f20 commit 5084409

File tree

53 files changed

+4633
-1078
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4633
-1078
lines changed

common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java

+17
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ public boolean isRetriable() {
199199
@ProtoField(id = 41)
200200
private String subWorkflowId;
201201

202+
/**
203+
* Use to note that a sub workflow associated with SUB_WORKFLOW task
204+
* has an action performed on it directly.
205+
*/
206+
@ProtoField(id = 42)
207+
private boolean subworkflowChanged;
208+
202209
public Task() {
203210
}
204211

@@ -735,6 +742,14 @@ public void setWorkflowPriority(int workflowPriority) {
735742
this.workflowPriority = workflowPriority;
736743
}
737744

745+
public boolean isSubworkflowChanged() {
746+
return subworkflowChanged;
747+
}
748+
749+
public void setSubworkflowChanged(boolean subworkflowChanged) {
750+
this.subworkflowChanged = subworkflowChanged;
751+
}
752+
738753
public String getSubWorkflowId() {
739754
// For backwards compatibility
740755
if (StringUtils.isNotBlank(subWorkflowId)) {
@@ -787,6 +802,7 @@ public Task copy() {
787802
copy.setExecutionNameSpace(executionNameSpace);
788803
copy.setIsolationGroupId(isolationGroupId);
789804
copy.setSubWorkflowId(getSubWorkflowId());
805+
copy.setSubworkflowChanged(subworkflowChanged);
790806

791807
return copy;
792808
}
@@ -855,6 +871,7 @@ public String toString() {
855871
", externalOutputPayloadStoragePath='" + externalOutputPayloadStoragePath + '\'' +
856872
", isolationGroupId='" + isolationGroupId + '\'' +
857873
", executionNameSpace='" + executionNameSpace + '\'' +
874+
", subworkflowChanged='" + subworkflowChanged + '\'' +
858875
'}';
859876
}
860877

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
import com.netflix.conductor.common.constraints.TaskReferenceNameUniqueConstraint;
2121
import com.netflix.conductor.common.metadata.Auditable;
2222
import com.netflix.conductor.common.metadata.tasks.TaskType;
23-
import java.util.HashMap;
24-
import java.util.Iterator;
25-
import java.util.LinkedList;
26-
import java.util.List;
27-
import java.util.Map;
28-
import java.util.Objects;
29-
import java.util.Optional;
23+
3024
import javax.validation.Valid;
3125
import javax.validation.constraints.Email;
3226
import javax.validation.constraints.Max;
3327
import javax.validation.constraints.Min;
3428
import javax.validation.constraints.NotEmpty;
3529
import javax.validation.constraints.NotNull;
30+
import java.util.HashMap;
31+
import java.util.Iterator;
32+
import java.util.LinkedList;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Objects;
3636

3737
@ProtoMessage
3838
@TaskReferenceNameUniqueConstraint
@@ -303,6 +303,10 @@ public static String getKey(String name, int version) {
303303
return name + "." + version;
304304
}
305305

306+
public boolean containsType(String taskType) {
307+
return collectTasks().stream().anyMatch(t -> t.getType().equals(taskType));
308+
}
309+
306310
public WorkflowTask getNextTask(String taskReferenceName) {
307311
WorkflowTask workflowTask = getTaskByRefName(taskReferenceName);
308312
if (workflowTask != null && TaskType.TERMINATE.name().equals(workflowTask.getType())) {

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class WorkflowTask {
4444
@Deprecated
4545
public enum Type {
4646
SIMPLE, DYNAMIC, FORK_JOIN, FORK_JOIN_DYNAMIC, DECISION, JOIN, SUB_WORKFLOW, EVENT, WAIT, USER_DEFINED;
47-
private static Set<String> systemTasks = new HashSet<>();
47+
private static final Set<String> systemTasks = new HashSet<>();
4848

4949
static {
5050
systemTasks.add(Type.SIMPLE.name());

common/src/main/java/com/netflix/conductor/common/run/Workflow.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/*
2-
* Copyright 2020 Netflix, Inc.
3-
* <p>
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5-
* the License. You may obtain a copy of the License at
6-
* <p>
7-
* http://www.apache.org/licenses/LICENSE-2.0
8-
* <p>
9-
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10-
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11-
* specific language governing permissions and limitations under the License.
2+
* Copyright 2021 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
1212
*/
1313
package com.netflix.conductor.common.run;
1414

@@ -18,6 +18,7 @@
1818
import com.netflix.conductor.common.metadata.Auditable;
1919
import com.netflix.conductor.common.metadata.tasks.Task;
2020
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
21+
import org.apache.commons.lang3.StringUtils;
2122

2223
import javax.validation.constraints.Max;
2324
import javax.validation.constraints.Min;
@@ -485,6 +486,10 @@ public void setLastRetriedTime(long lastRetriedTime) {
485486
this.lastRetriedTime = lastRetriedTime;
486487
}
487488

489+
public boolean hasParent() {
490+
return StringUtils.isNotEmpty(parentWorkflowId);
491+
}
492+
488493
public Task getTaskByRefName(String refName) {
489494
if (refName == null) {
490495
throw new RuntimeException(
@@ -541,6 +546,14 @@ public String toString() {
541546
return getWorkflowName() + "." + getWorkflowVersion() + "/" + workflowId + "." + status;
542547
}
543548

549+
/**
550+
* A string representation of all relevant fields that identify this workflow.
551+
* Intended for use in log and other system generated messages.
552+
*/
553+
public String identifierString() {
554+
return String.format("%s.%s/%s", getWorkflowName(), getWorkflowVersion(), workflowId);
555+
}
556+
544557
@Override
545558
public boolean equals(Object o) {
546559
if (this == o) {

common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void testTaskQueueWaitTime() {
9595
public void testDeepCopyTask() {
9696
final Task task = new Task();
9797
// In order to avoid forgetting putting inside the copy method the newly added fields check the number of declared fields.
98-
final int expectedTaskFieldsNumber = 39;
98+
final int expectedTaskFieldsNumber = 40;
9999
final int declaredFieldsNumber = task.getClass().getDeclaredFields().length;
100100

101101
assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber);
@@ -135,6 +135,7 @@ public void testDeepCopyTask() {
135135
task.setReasonForIncompletion("");
136136
task.setWorkerId("");
137137
task.setSubWorkflowId("");
138+
task.setSubworkflowChanged(false);
138139

139140
final Task copy = task.deepCopy();
140141
assertEquals(task, copy);

contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
import com.rabbitmq.client.DefaultConsumer;
3030
import com.rabbitmq.client.Envelope;
3131
import com.rabbitmq.client.GetResponse;
32+
import org.apache.commons.lang3.StringUtils;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
import rx.Observable;
36+
3237
import java.io.IOException;
3338
import java.util.ArrayList;
3439
import java.util.Arrays;
@@ -43,10 +48,6 @@
4348
import java.util.concurrent.atomic.AtomicBoolean;
4449
import java.util.concurrent.atomic.AtomicInteger;
4550
import java.util.stream.Collectors;
46-
import org.apache.commons.lang3.StringUtils;
47-
import org.slf4j.Logger;
48-
import org.slf4j.LoggerFactory;
49-
import rx.Observable;
5051

5152
/**
5253
* @author Ritu Parathody

contribs/src/test/java/com/netflix/conductor/contribs/metrics/PrometheusMetricsConfigurationTest.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,8 @@
1212
*/
1313
package com.netflix.conductor.contribs.metrics;
1414

15-
import java.lang.reflect.Field;
16-
import java.util.Arrays;
17-
import java.util.List;
18-
import java.util.Optional;
19-
15+
import com.netflix.spectator.api.Meter;
16+
import com.netflix.spectator.api.Spectator;
2017
import org.junit.Assert;
2118
import org.junit.Ignore;
2219
import org.junit.Test;
@@ -25,8 +22,10 @@
2522
import org.springframework.test.context.TestPropertySource;
2623
import org.springframework.test.context.junit4.SpringRunner;
2724

28-
import com.netflix.spectator.api.Meter;
29-
import com.netflix.spectator.api.Spectator;
25+
import java.lang.reflect.Field;
26+
import java.util.Arrays;
27+
import java.util.List;
28+
import java.util.Optional;
3029

3130
@RunWith(SpringRunner.class)
3231
@TestPropertySource(properties = {"conductor.metrics-prometheus.enabled=true"})

contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import com.netflix.conductor.common.metadata.tasks.Task;
1919
import com.netflix.conductor.common.metadata.tasks.Task.Status;
2020
import com.netflix.conductor.common.run.Workflow;
21-
import com.netflix.conductor.core.events.queue.QueueManager;
2221
import com.netflix.conductor.core.events.queue.Message;
2322
import com.netflix.conductor.core.events.queue.ObservableQueue;
23+
import com.netflix.conductor.core.events.queue.QueueManager;
2424
import com.netflix.conductor.core.execution.tasks.Wait;
2525
import com.netflix.conductor.service.ExecutionService;
2626
import org.junit.Before;

core/src/main/java/com/netflix/conductor/core/Lifecycle.java

-45
This file was deleted.

core/src/main/java/com/netflix/conductor/core/LifecycleAwareComponent.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
11
/*
2-
* Copyright 2021 Netflix, Inc.
3-
* <p>
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5-
* the License. You may obtain a copy of the License at
6-
* <p>
7-
* http://www.apache.org/licenses/LICENSE-2.0
8-
* <p>
9-
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10-
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11-
* specific language governing permissions and limitations under the License.
2+
*
3+
* * Copyright 2021 Netflix, Inc.
4+
* * <p>
5+
* * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
6+
* * the License. You may obtain a copy of the License at
7+
* * <p>
8+
* * http://www.apache.org/licenses/LICENSE-2.0
9+
* * <p>
10+
* * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11+
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* * specific language governing permissions and limitations under the License.
13+
*
1214
*/
1315
package com.netflix.conductor.core;
1416

17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
1519
import org.springframework.context.SmartLifecycle;
1620

17-
public class LifecycleAwareComponent implements SmartLifecycle {
21+
public abstract class LifecycleAwareComponent implements SmartLifecycle {
1822
private volatile boolean running = false;
1923

24+
private static final Logger LOGGER = LoggerFactory.getLogger(LifecycleAwareComponent.class);
25+
2026
@Override
2127
public void start() {
2228
running = true;
29+
LOGGER.info("{} started.", getClass().getSimpleName());
2330
}
2431

2532
@Override
2633
public void stop() {
2734
running = false;
35+
LOGGER.info("{} stopped.", getClass().getSimpleName());
2836
}
2937

3038
@Override

0 commit comments

Comments
 (0)