Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime-v2: fix exit from parallel loop #830

Merged
merged 4 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void testDecryptInvalidString() throws Exception {
// ---

pir = waitForCompletion(processApi, spr.getInstanceId());
assertEquals(ProcessEntry.StatusEnum.FAILED, pir.getStatus());
assertEquals(ProcessEntry.StatusEnum.FAILED, pir.getStatus(), "Process logs: " + new String(getLog(pir.getLogFileName())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class DefaultSynchronizationService implements SynchronizationService {

private final List<Runnable> callbacks = new ArrayList<>();

private boolean stop;

@Override
public boolean hasPoint() {
synchronized (this) {
Expand All @@ -43,6 +45,7 @@ public void maintain() {
}

callbacks.clear();
stop = false;
}

@Override
Expand All @@ -51,4 +54,14 @@ public void point(Runnable callback) {
callbacks.add(callback);
}
}

@Override
public void stop() {
this.stop = true;
}

@Override
public boolean hasStop() {
return stop;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ public interface SynchronizationService {
void maintain();

void point(Runnable callback);

void stop();

boolean hasStop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.*;

import java.util.Map;

public class SynchronizationServiceListener implements ExecutionListener {

private final SynchronizationService delegate;
Expand All @@ -33,7 +35,7 @@ public SynchronizationServiceListener(SynchronizationService delegate) {

@Override
public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadId, Command cmd) {
if (delegate.hasPoint()) {
if (delegate.hasPoint() || delegate.hasStop()) {
state.setStatus(threadId, ThreadStatus.SUSPENDED);
return Result.BREAK;
}
Expand All @@ -43,6 +45,13 @@ public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadI

@Override
public Result afterEval(Runtime runtime, VM vm, State state) {
if (delegate.hasStop()) {
for (Map.Entry<ThreadId, ThreadStatus> e : state.threadStatus().entrySet()) {
state.setStatus(e.getKey(), ThreadStatus.DONE);
}
return Result.BREAK;
}

if (!delegate.hasPoint()) {
return Result.BREAK;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

import com.walmartlabs.concord.runtime.v2.model.ExitStep;
import com.walmartlabs.concord.runtime.v2.runner.SynchronizationService;
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.State;
import com.walmartlabs.concord.svm.ThreadId;
Expand All @@ -35,6 +36,6 @@ public ExitCommand(ExitStep step) {

@Override
protected void execute(Runtime runtime, State state, ThreadId threadId) {
state.dropAllFrames();
runtime.getService(SynchronizationService.class).stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,43 @@ public void testUuidFunc() throws Exception {
assertLog(log, ".*uuid: [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.*");
}

@Test
public void testExitFromParallelLoop() throws Exception {
deploy("parallelLoopExit");

save(ProcessConfiguration.builder()
.build());

byte[] log = run();

assertNoLog(log, ".*should not reach here.*");

// thread in loop should execute at least one step
assertLog(log, ".*inner start: one.*");
assertLog(log, ".*inner start: two.*");
assertLog(log, ".*inner start: three.*");
}

@Test
public void testExitFromSerialLoop() throws Exception {
deploy("serialLoopExit");

save(ProcessConfiguration.builder()
.build());

byte[] log = run();

assertNoLog(log, ".*should not reach here.*");

assertLog(log, ".*inner start: one.*");
assertLog(log, ".*inner end: one.*");
assertLog(log, ".*inner start: two.*");

assertNoLog(log, ".*inner end: two.*");
assertNoLog(log, ".*inner start: three.*");
assertNoLog(log, ".*inner start: four.*");
}

private void deploy(String resource) throws URISyntaxException, IOException {
Path src = Paths.get(MainTest.class.getResource(resource).toURI());
IOUtils.copy(src, workDir);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
configuration:
debug: true
runtime: concord-v2

flows:
default:
- call: "inner"
loop:
items:
- "one"
- "two"
- "three"
- "four"
mode: parallel

- log: "should not reach here"

inner:
- log: "inner start: ${item}"

- if: ${item == "four"}
then:
- checkpoint: "${item}"

- if: ${item == "two"}
then:
- exit

- if: ${item == "three"}
then:
- expr: ${sleep.ms(1000)}

- log: "inner end: ${item}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
configuration:
debug: true
runtime: concord-v2

flows:
default:
- call: "inner"
loop:
items:
- "one"
- "two"
- "three"
- "four"

- log: "should not reach here"

inner:
- log: "inner start: ${item}"

- if: ${item == "four"}
then:
- checkpoint: "${item}"

- if: ${item == "two"}
then:
- exit

- if: ${item == "three"}
then:
- expr: ${sleep.ms(1000)}

- log: "inner end: ${item}"
Loading