Skip to content

Commit

Permalink
Add two SEL functions to support getting a specific param from a subw…
Browse files Browse the repository at this point in the history
…orkflow and getting a step's end time. (#71)
  • Loading branch information
jun-he authored Aug 9, 2024
1 parent b8c79f7 commit 42abcda
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ public static WorkflowVersion of(String version) {
/** Step instance status param name key used in SEL to retrieve the step status. */
public static final String STEP_STATUS_PARAM = "MAESTRO_STEP_STATUS";

/** Step instance end time param name key used in SEL to retrieve the step end time. */
public static final String STEP_END_TIME_PARAM = "MAESTRO_STEP_END_TIME";

/**
* Match all instances. Special value to denote a breakpoint which is set to match all wf
* instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"step_instance_id": 123,
"step_attempt_id": 2,
"step_instance_uuid": "bar",
"type": "subworkflow",
"step_run_params": {
"foo": {
"value": "bar",
Expand Down Expand Up @@ -58,20 +59,6 @@
"total_step_count": 1
},
"type": "SUBWORKFLOW"
},
"maestro_foreach": {
"foreach_workflow_id": "inline-wf",
"foreach_identity": "foo",
"total_loop_count": 10,
"next_loop_index": 0,
"foreach_overview": {
"checkpoint": 6,
"stats": {
"CREATED": 5,
"SUCCEEDED": 1
}
},
"type": "FOREACH"
}
},
"dependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
{
"step_id": "foo",
"step_name": "step1",
"step_instance_id": 123,
"step_attempt_id": 2,
"step_instance_uuid": "bar",
"type": "foreach",
"step_run_params": {
"foo": {
"value": "bar",
"type": "STRING",
"mode": "mutable"
}
},
"params": {
"param1": {
"value": "foo",
"type": "STRING",
"evaluated_result": "foo",
"evaluated_time": 1608171805392
}
},
"synced": true,
"runtime_state": {
"status": "SUCCEEDED",
"end_time": 1608171805401,
"modify_time": 1608171805401
},
"timeline": [
{
"timestamp": 1609272999666,
"type": "LOG",
"level": "INFO",
"message": "hello"
}
],
"pending_records": [
{
"event_time": 1608171805401,
"new_status": "SUCCEEDED",
"old_status": "NOT_CREATED"
}
],
"artifacts": {
"artifact1": {
"value": 1,
"foo": "bar"
},
"maestro_foreach": {
"foreach_workflow_id": "inline-wf",
"foreach_identity": "foo",
"total_loop_count": 10,
"next_loop_index": 0,
"foreach_overview": {
"checkpoint": 6,
"stats": {
"CREATED": 5,
"SUCCEEDED": 1
}
},
"type": "FOREACH"
}
},
"dependencies": {
"SIGNAL": {
"type": "SIGNAL",
"statuses": [
{
"params": {
"value": {
"name": {
"value": "signal/a/1",
"type": "STRING"
},
"_step_dependency_sub_type": {
"value": "input_signal",
"type": "STRING"
},
"foo": {
"parameter": {
"value": "bar",
"type": "STRING"
},
"operator": "=",
"type": "SIGNAL"
}
},
"type": "MAP",
"evaluated_result": {
"name": "signal/a/1",
"foo": "bar"
},
"evaluated_time": 1617728847801
},
"signal_reference": {
"signal_instance_id": "fake_signal_id",
"timestamp": 1617728849086
},
"status": "MATCHED"
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import com.netflix.maestro.engine.utils.StepHelper;
import com.netflix.maestro.engine.validations.DryRunValidator;
import com.netflix.maestro.exceptions.MaestroInternalError;
import com.netflix.maestro.exceptions.MaestroInvalidExpressionException;
import com.netflix.maestro.exceptions.MaestroValidationException;
import com.netflix.maestro.models.Constants;
import com.netflix.maestro.models.artifact.Artifact;
import com.netflix.maestro.models.artifact.ForeachArtifact;
import com.netflix.maestro.models.artifact.SubworkflowArtifact;
import com.netflix.maestro.models.definition.StepType;
import com.netflix.maestro.models.initiator.Initiator;
import com.netflix.maestro.models.initiator.ManualInitiator;
import com.netflix.maestro.models.initiator.SignalInitiator;
import com.netflix.maestro.models.instance.StepInstance;
import com.netflix.maestro.models.parameter.ParamType;
import com.netflix.maestro.models.parameter.Parameter;
import com.netflix.maestro.utils.Checks;
Expand Down Expand Up @@ -72,6 +75,9 @@ public class MaestroParamExtension extends AbstractParamExtension {
/** Function to get the signal metadata for return default value if not found. */
static final String GET_FROM_SIGNAL_OR_DEFAULT = "getFromSignalOrDefault";

/** Function to get the data from the subworkflow instance. */
static final String GET_FROM_SUBWORKFLOW = "getFromSubworkflow";

/** Function to get the foreach metadata. */
static final String GET_FROM_FOREACH = "getFromForeach";

Expand Down Expand Up @@ -132,6 +138,8 @@ protected Object callWithThreeArgs(String methodName, String arg1, String arg2,
throw new IllegalStateException(
methodName + " must return an array instead of " + res.getClass());
}
} else if (GET_FROM_SUBWORKFLOW.equals(methodName)) {
return getFromSubworkflow(arg1, arg2, arg3);
} else if (GET_FROM_SIGNAL_OR_DEFAULT.equals(methodName)) {
return getFromSignalOrDefault(arg1, arg2, arg3);
}
Expand Down Expand Up @@ -167,6 +175,12 @@ private Object fromStep(String stepId, String paramName) {
StepRuntimeSummary runtimeSummary = validateAndGet(stepId);
if (Constants.STEP_STATUS_PARAM.equals(paramName)) {
return runtimeSummary.getRuntimeState().getStatus().name();
} else if (Constants.STEP_END_TIME_PARAM.equals(paramName)) {
return Checks.notNull(
runtimeSummary.getRuntimeState().getEndTime(),
"ERROR: step [%s]'s [%s] is not set yet.",
stepId,
paramName);
}
Parameter stepParam =
Checks.notNull(
Expand Down Expand Up @@ -365,6 +379,52 @@ private Object fromForeach(String foreachStepId, String stepId, String paramName
}
}

Object getFromSubworkflow(String subworkflowStepId, String stepId, String paramName) {
try {
return executor
.submit(() -> fromSubworkflow(subworkflowStepId, stepId, paramName))
.get(TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new MaestroInternalError(
e,
"getFromSubworkflow throws an exception for subworkflowStepId=[%s], stepId=[%s], paramName=[%s]",
subworkflowStepId,
stepId,
paramName);
}
}

private Object fromSubworkflow(String subworkflowStepId, String stepId, String paramName) {
StepRuntimeSummary runtimeSummary = validateAndGet(subworkflowStepId);
Checks.checkTrue(
runtimeSummary.getType() == StepType.SUBWORKFLOW,
"step [%s] is a type of [%s] instead of subworkflow step, cannot call getFromSubworkflow",
subworkflowStepId,
runtimeSummary.getType());

SubworkflowArtifact artifact =
Checks.notNull(
runtimeSummary.getArtifacts().get(Artifact.Type.SUBWORKFLOW.key()),
"Cannot load param [%s] of step [%s] from subworkflow [%s] as it is not initialized",
paramName,
stepId,
subworkflowStepId)
.asSubworkflow();

StepInstance stepInstance =
stepInstanceDao.getStepInstance(
artifact.getSubworkflowId(),
artifact.getSubworkflowInstanceId(),
artifact.getSubworkflowRunId(),
stepId,
Constants.LATEST_INSTANCE_RUN);

if (stepInstance.getParams() == null || !stepInstance.getParams().containsKey(paramName)) {
throw new MaestroInvalidExpressionException("Cannot find the param name: [%s]", paramName);
}
return stepInstance.getParams().get(paramName).getEvaluatedResult();
}

Long nextUniqueId() {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(RANDOM_JITTER_DELAY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

import com.netflix.maestro.AssertHelper;
Expand All @@ -32,6 +33,7 @@
import com.netflix.maestro.models.initiator.ManualInitiator;
import com.netflix.maestro.models.initiator.SignalInitiator;
import com.netflix.maestro.models.initiator.SubworkflowInitiator;
import com.netflix.maestro.models.instance.StepInstance;
import com.netflix.maestro.models.parameter.MapParameter;
import com.netflix.maestro.models.parameter.ParamType;
import com.netflix.maestro.models.parameter.Parameter;
Expand All @@ -51,6 +53,10 @@
public class MaestroParamExtensionTest extends MaestroEngineBaseTest {
private static final String TEST_STEP_RUNTIME_SUMMARY =
"fixtures/execution/sample-step-runtime-summary-2.json";
private static final String TEST_SUBWORKFLOW_STEP_RUNTIME_SUMMARY =
"fixtures/execution/sample-step-runtime-summary-1.json";
private static final String TEST_STEP_INSTANCE =
"fixtures/instances/sample-step-instance-succeeded.json";

@Mock MaestroStepInstanceDao stepInstanceDao;
@Mock InstanceWrapper instanceWrapper;
Expand Down Expand Up @@ -78,6 +84,8 @@ public void testGetFromStep() throws Exception {
.thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary));
assertEquals("foo", paramExtension.getFromStep("step1", "param1"));
assertEquals("SUCCEEDED", paramExtension.getFromStep("step1", Constants.STEP_STATUS_PARAM));
assertEquals(
1608171805401L, paramExtension.getFromStep("step1", Constants.STEP_END_TIME_PARAM));
}

@Test
Expand Down Expand Up @@ -134,6 +142,19 @@ public void testGetFromForeach() throws Exception {
assertArrayEquals(new long[] {12, 0, 0, 0, 0, 0}, res);
}

@Test
public void testGetFromSubworkflow() throws Exception {
StepRuntimeSummary summary =
loadObject(TEST_SUBWORKFLOW_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class);
when(allStepOutputData.get("foo"))
.thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary));
StepInstance stepInSubworkflow = loadObject(TEST_STEP_INSTANCE, StepInstance.class);
when(stepInstanceDao.getStepInstance(any(), anyLong(), anyLong(), any(), any()))
.thenReturn(stepInSubworkflow);
long res = (Long) paramExtension.getFromSubworkflow("foo", "job1", "sleep_seconds");
assertEquals(15, res);
}

@Test
public void testInvalidGetFromStep() throws Exception {
StepRuntimeSummary summary = loadObject(TEST_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class);
Expand Down Expand Up @@ -264,6 +285,36 @@ public void testInvalidGetFromForeach() throws Exception {
() -> paramExtension.getFromForeach("foreach-job", "job1", "sleep_seconds"));
}

@Test
public void testInvalidGetFromSubworkflow() throws Exception {
AssertHelper.assertThrows(
"Cannot find the referenced step id",
MaestroInternalError.class,
"getFromSubworkflow throws an exception",
() -> paramExtension.getFromSubworkflow("non-existing-job", "job1", "sleep_seconds"));

StepRuntimeSummary summary = loadObject(TEST_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class);
when(allStepOutputData.get("foo"))
.thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary));
AssertHelper.assertThrows(
"step type is not subworkflow",
MaestroInternalError.class,
"getFromSubworkflow throws an exception",
() -> paramExtension.getFromSubworkflow("foo", "job1", "sleep_seconds"));

summary = loadObject(TEST_SUBWORKFLOW_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class);
when(allStepOutputData.get("foo"))
.thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary));
StepInstance stepInSubworkflow = loadObject(TEST_STEP_INSTANCE, StepInstance.class);
when(stepInstanceDao.getStepInstance(any(), anyLong(), anyLong(), any(), any()))
.thenReturn(stepInSubworkflow);
AssertHelper.assertThrows(
"param name does not exist",
MaestroInternalError.class,
"getFromSubworkflow throws an exception",
() -> paramExtension.getFromSubworkflow("foo", "job1", "not-existing"));
}

@Test
public void testNextUniqueId() {
Long expected = 750762533885116445L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testMergeSubworkflowArtifact() throws Exception {
public void testMergeForeachArtifact() throws Exception {
StepRuntimeSummary summary =
loadObject(
"fixtures/execution/sample-step-runtime-summary-1.json", StepRuntimeSummary.class);
"fixtures/execution/sample-step-runtime-summary-3.json", StepRuntimeSummary.class);
ForeachArtifact artifact = summary.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach();
assertEquals("inline-wf", artifact.getForeachWorkflowId());
assertEquals("foo", artifact.getForeachIdentity());
Expand Down Expand Up @@ -427,6 +427,10 @@ public void testNoChangeMerge() throws Exception {
summary.mergeRuntimeUpdate(null, artifacts);
assertTrue(summary.isSynced());

summary =
loadObject(
"fixtures/execution/sample-step-runtime-summary-3.json", StepRuntimeSummary.class);
artifacts.clear();
ForeachArtifact artifact3 = new ForeachArtifact();
artifact3.setForeachWorkflowId("inline-wf");
artifact3.setForeachIdentity("foo");
Expand Down

0 comments on commit 42abcda

Please sign in to comment.