From 42abcdadd38d5a93ea5a23556a371efe74099df8 Mon Sep 17 00:00:00 2001 From: jun-he Date: Fri, 9 Aug 2024 09:56:13 -0700 Subject: [PATCH] Add two SEL functions to support getting a specific param from a subworkflow and getting a step's end time. (#71) --- .../com/netflix/maestro/models/Constants.java | 3 + .../sample-step-runtime-summary-1.json | 15 +-- .../sample-step-runtime-summary-3.json | 104 ++++++++++++++++++ .../engine/eval/MaestroParamExtension.java | 60 ++++++++++ .../eval/MaestroParamExtensionTest.java | 51 +++++++++ .../execution/StepRuntimeSummaryTest.java | 6 +- 6 files changed, 224 insertions(+), 15 deletions(-) create mode 100644 maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-3.json diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java b/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java index f3acb28..965fef9 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java @@ -262,6 +262,9 @@ public static WorkflowVersion of(String version) { /** Step instance status param name key used in SEL to retrieve the step status. */ public static final String STEP_STATUS_PARAM = "MAESTRO_STEP_STATUS"; + /** Step instance end time param name key used in SEL to retrieve the step end time. */ + public static final String STEP_END_TIME_PARAM = "MAESTRO_STEP_END_TIME"; + /** * Match all instances. Special value to denote a breakpoint which is set to match all wf * instances. diff --git a/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-1.json b/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-1.json index a7ce54e..5c5ac48 100644 --- a/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-1.json +++ b/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-1.json @@ -4,6 +4,7 @@ "step_instance_id": 123, "step_attempt_id": 2, "step_instance_uuid": "bar", + "type": "subworkflow", "step_run_params": { "foo": { "value": "bar", @@ -58,20 +59,6 @@ "total_step_count": 1 }, "type": "SUBWORKFLOW" - }, - "maestro_foreach": { - "foreach_workflow_id": "inline-wf", - "foreach_identity": "foo", - "total_loop_count": 10, - "next_loop_index": 0, - "foreach_overview": { - "checkpoint": 6, - "stats": { - "CREATED": 5, - "SUCCEEDED": 1 - } - }, - "type": "FOREACH" } }, "dependencies": { diff --git a/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-3.json b/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-3.json new file mode 100644 index 0000000..b1cf12c --- /dev/null +++ b/maestro-common/src/testFixtures/resources/fixtures/execution/sample-step-runtime-summary-3.json @@ -0,0 +1,104 @@ +{ + "step_id": "foo", + "step_name": "step1", + "step_instance_id": 123, + "step_attempt_id": 2, + "step_instance_uuid": "bar", + "type": "foreach", + "step_run_params": { + "foo": { + "value": "bar", + "type": "STRING", + "mode": "mutable" + } + }, + "params": { + "param1": { + "value": "foo", + "type": "STRING", + "evaluated_result": "foo", + "evaluated_time": 1608171805392 + } + }, + "synced": true, + "runtime_state": { + "status": "SUCCEEDED", + "end_time": 1608171805401, + "modify_time": 1608171805401 + }, + "timeline": [ + { + "timestamp": 1609272999666, + "type": "LOG", + "level": "INFO", + "message": "hello" + } + ], + "pending_records": [ + { + "event_time": 1608171805401, + "new_status": "SUCCEEDED", + "old_status": "NOT_CREATED" + } + ], + "artifacts": { + "artifact1": { + "value": 1, + "foo": "bar" + }, + "maestro_foreach": { + "foreach_workflow_id": "inline-wf", + "foreach_identity": "foo", + "total_loop_count": 10, + "next_loop_index": 0, + "foreach_overview": { + "checkpoint": 6, + "stats": { + "CREATED": 5, + "SUCCEEDED": 1 + } + }, + "type": "FOREACH" + } + }, + "dependencies": { + "SIGNAL": { + "type": "SIGNAL", + "statuses": [ + { + "params": { + "value": { + "name": { + "value": "signal/a/1", + "type": "STRING" + }, + "_step_dependency_sub_type": { + "value": "input_signal", + "type": "STRING" + }, + "foo": { + "parameter": { + "value": "bar", + "type": "STRING" + }, + "operator": "=", + "type": "SIGNAL" + } + }, + "type": "MAP", + "evaluated_result": { + "name": "signal/a/1", + "foo": "bar" + }, + "evaluated_time": 1617728847801 + }, + "signal_reference": { + "signal_instance_id": "fake_signal_id", + "timestamp": 1617728849086 + }, + "status": "MATCHED" + } + ] + } + } +} diff --git a/maestro-engine/src/main/java/com/netflix/maestro/engine/eval/MaestroParamExtension.java b/maestro-engine/src/main/java/com/netflix/maestro/engine/eval/MaestroParamExtension.java index d693765..7c4da26 100644 --- a/maestro-engine/src/main/java/com/netflix/maestro/engine/eval/MaestroParamExtension.java +++ b/maestro-engine/src/main/java/com/netflix/maestro/engine/eval/MaestroParamExtension.java @@ -18,14 +18,17 @@ import com.netflix.maestro.engine.utils.StepHelper; import com.netflix.maestro.engine.validations.DryRunValidator; import com.netflix.maestro.exceptions.MaestroInternalError; +import com.netflix.maestro.exceptions.MaestroInvalidExpressionException; import com.netflix.maestro.exceptions.MaestroValidationException; import com.netflix.maestro.models.Constants; import com.netflix.maestro.models.artifact.Artifact; import com.netflix.maestro.models.artifact.ForeachArtifact; +import com.netflix.maestro.models.artifact.SubworkflowArtifact; import com.netflix.maestro.models.definition.StepType; import com.netflix.maestro.models.initiator.Initiator; import com.netflix.maestro.models.initiator.ManualInitiator; import com.netflix.maestro.models.initiator.SignalInitiator; +import com.netflix.maestro.models.instance.StepInstance; import com.netflix.maestro.models.parameter.ParamType; import com.netflix.maestro.models.parameter.Parameter; import com.netflix.maestro.utils.Checks; @@ -72,6 +75,9 @@ public class MaestroParamExtension extends AbstractParamExtension { /** Function to get the signal metadata for return default value if not found. */ static final String GET_FROM_SIGNAL_OR_DEFAULT = "getFromSignalOrDefault"; + /** Function to get the data from the subworkflow instance. */ + static final String GET_FROM_SUBWORKFLOW = "getFromSubworkflow"; + /** Function to get the foreach metadata. */ static final String GET_FROM_FOREACH = "getFromForeach"; @@ -132,6 +138,8 @@ protected Object callWithThreeArgs(String methodName, String arg1, String arg2, throw new IllegalStateException( methodName + " must return an array instead of " + res.getClass()); } + } else if (GET_FROM_SUBWORKFLOW.equals(methodName)) { + return getFromSubworkflow(arg1, arg2, arg3); } else if (GET_FROM_SIGNAL_OR_DEFAULT.equals(methodName)) { return getFromSignalOrDefault(arg1, arg2, arg3); } @@ -167,6 +175,12 @@ private Object fromStep(String stepId, String paramName) { StepRuntimeSummary runtimeSummary = validateAndGet(stepId); if (Constants.STEP_STATUS_PARAM.equals(paramName)) { return runtimeSummary.getRuntimeState().getStatus().name(); + } else if (Constants.STEP_END_TIME_PARAM.equals(paramName)) { + return Checks.notNull( + runtimeSummary.getRuntimeState().getEndTime(), + "ERROR: step [%s]'s [%s] is not set yet.", + stepId, + paramName); } Parameter stepParam = Checks.notNull( @@ -365,6 +379,52 @@ private Object fromForeach(String foreachStepId, String stepId, String paramName } } + Object getFromSubworkflow(String subworkflowStepId, String stepId, String paramName) { + try { + return executor + .submit(() -> fromSubworkflow(subworkflowStepId, stepId, paramName)) + .get(TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new MaestroInternalError( + e, + "getFromSubworkflow throws an exception for subworkflowStepId=[%s], stepId=[%s], paramName=[%s]", + subworkflowStepId, + stepId, + paramName); + } + } + + private Object fromSubworkflow(String subworkflowStepId, String stepId, String paramName) { + StepRuntimeSummary runtimeSummary = validateAndGet(subworkflowStepId); + Checks.checkTrue( + runtimeSummary.getType() == StepType.SUBWORKFLOW, + "step [%s] is a type of [%s] instead of subworkflow step, cannot call getFromSubworkflow", + subworkflowStepId, + runtimeSummary.getType()); + + SubworkflowArtifact artifact = + Checks.notNull( + runtimeSummary.getArtifacts().get(Artifact.Type.SUBWORKFLOW.key()), + "Cannot load param [%s] of step [%s] from subworkflow [%s] as it is not initialized", + paramName, + stepId, + subworkflowStepId) + .asSubworkflow(); + + StepInstance stepInstance = + stepInstanceDao.getStepInstance( + artifact.getSubworkflowId(), + artifact.getSubworkflowInstanceId(), + artifact.getSubworkflowRunId(), + stepId, + Constants.LATEST_INSTANCE_RUN); + + if (stepInstance.getParams() == null || !stepInstance.getParams().containsKey(paramName)) { + throw new MaestroInvalidExpressionException("Cannot find the param name: [%s]", paramName); + } + return stepInstance.getParams().get(paramName).getEvaluatedResult(); + } + Long nextUniqueId() { try { Thread.sleep(ThreadLocalRandom.current().nextInt(RANDOM_JITTER_DELAY)); diff --git a/maestro-engine/src/test/java/com/netflix/maestro/engine/eval/MaestroParamExtensionTest.java b/maestro-engine/src/test/java/com/netflix/maestro/engine/eval/MaestroParamExtensionTest.java index 60d2ddc..dc39dfd 100644 --- a/maestro-engine/src/test/java/com/netflix/maestro/engine/eval/MaestroParamExtensionTest.java +++ b/maestro-engine/src/test/java/com/netflix/maestro/engine/eval/MaestroParamExtensionTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.when; import com.netflix.maestro.AssertHelper; @@ -32,6 +33,7 @@ import com.netflix.maestro.models.initiator.ManualInitiator; import com.netflix.maestro.models.initiator.SignalInitiator; import com.netflix.maestro.models.initiator.SubworkflowInitiator; +import com.netflix.maestro.models.instance.StepInstance; import com.netflix.maestro.models.parameter.MapParameter; import com.netflix.maestro.models.parameter.ParamType; import com.netflix.maestro.models.parameter.Parameter; @@ -51,6 +53,10 @@ public class MaestroParamExtensionTest extends MaestroEngineBaseTest { private static final String TEST_STEP_RUNTIME_SUMMARY = "fixtures/execution/sample-step-runtime-summary-2.json"; + private static final String TEST_SUBWORKFLOW_STEP_RUNTIME_SUMMARY = + "fixtures/execution/sample-step-runtime-summary-1.json"; + private static final String TEST_STEP_INSTANCE = + "fixtures/instances/sample-step-instance-succeeded.json"; @Mock MaestroStepInstanceDao stepInstanceDao; @Mock InstanceWrapper instanceWrapper; @@ -78,6 +84,8 @@ public void testGetFromStep() throws Exception { .thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary)); assertEquals("foo", paramExtension.getFromStep("step1", "param1")); assertEquals("SUCCEEDED", paramExtension.getFromStep("step1", Constants.STEP_STATUS_PARAM)); + assertEquals( + 1608171805401L, paramExtension.getFromStep("step1", Constants.STEP_END_TIME_PARAM)); } @Test @@ -134,6 +142,19 @@ public void testGetFromForeach() throws Exception { assertArrayEquals(new long[] {12, 0, 0, 0, 0, 0}, res); } + @Test + public void testGetFromSubworkflow() throws Exception { + StepRuntimeSummary summary = + loadObject(TEST_SUBWORKFLOW_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class); + when(allStepOutputData.get("foo")) + .thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary)); + StepInstance stepInSubworkflow = loadObject(TEST_STEP_INSTANCE, StepInstance.class); + when(stepInstanceDao.getStepInstance(any(), anyLong(), anyLong(), any(), any())) + .thenReturn(stepInSubworkflow); + long res = (Long) paramExtension.getFromSubworkflow("foo", "job1", "sleep_seconds"); + assertEquals(15, res); + } + @Test public void testInvalidGetFromStep() throws Exception { StepRuntimeSummary summary = loadObject(TEST_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class); @@ -264,6 +285,36 @@ public void testInvalidGetFromForeach() throws Exception { () -> paramExtension.getFromForeach("foreach-job", "job1", "sleep_seconds")); } + @Test + public void testInvalidGetFromSubworkflow() throws Exception { + AssertHelper.assertThrows( + "Cannot find the referenced step id", + MaestroInternalError.class, + "getFromSubworkflow throws an exception", + () -> paramExtension.getFromSubworkflow("non-existing-job", "job1", "sleep_seconds")); + + StepRuntimeSummary summary = loadObject(TEST_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class); + when(allStepOutputData.get("foo")) + .thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary)); + AssertHelper.assertThrows( + "step type is not subworkflow", + MaestroInternalError.class, + "getFromSubworkflow throws an exception", + () -> paramExtension.getFromSubworkflow("foo", "job1", "sleep_seconds")); + + summary = loadObject(TEST_SUBWORKFLOW_STEP_RUNTIME_SUMMARY, StepRuntimeSummary.class); + when(allStepOutputData.get("foo")) + .thenReturn(Collections.singletonMap("maestro_step_runtime_summary", summary)); + StepInstance stepInSubworkflow = loadObject(TEST_STEP_INSTANCE, StepInstance.class); + when(stepInstanceDao.getStepInstance(any(), anyLong(), anyLong(), any(), any())) + .thenReturn(stepInSubworkflow); + AssertHelper.assertThrows( + "param name does not exist", + MaestroInternalError.class, + "getFromSubworkflow throws an exception", + () -> paramExtension.getFromSubworkflow("foo", "job1", "not-existing")); + } + @Test public void testNextUniqueId() { Long expected = 750762533885116445L; diff --git a/maestro-engine/src/test/java/com/netflix/maestro/engine/execution/StepRuntimeSummaryTest.java b/maestro-engine/src/test/java/com/netflix/maestro/engine/execution/StepRuntimeSummaryTest.java index f20d9eb..a8e1ce8 100644 --- a/maestro-engine/src/test/java/com/netflix/maestro/engine/execution/StepRuntimeSummaryTest.java +++ b/maestro-engine/src/test/java/com/netflix/maestro/engine/execution/StepRuntimeSummaryTest.java @@ -211,7 +211,7 @@ public void testMergeSubworkflowArtifact() throws Exception { public void testMergeForeachArtifact() throws Exception { StepRuntimeSummary summary = loadObject( - "fixtures/execution/sample-step-runtime-summary-1.json", StepRuntimeSummary.class); + "fixtures/execution/sample-step-runtime-summary-3.json", StepRuntimeSummary.class); ForeachArtifact artifact = summary.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach(); assertEquals("inline-wf", artifact.getForeachWorkflowId()); assertEquals("foo", artifact.getForeachIdentity()); @@ -427,6 +427,10 @@ public void testNoChangeMerge() throws Exception { summary.mergeRuntimeUpdate(null, artifacts); assertTrue(summary.isSynced()); + summary = + loadObject( + "fixtures/execution/sample-step-runtime-summary-3.json", StepRuntimeSummary.class); + artifacts.clear(); ForeachArtifact artifact3 = new ForeachArtifact(); artifact3.setForeachWorkflowId("inline-wf"); artifact3.setForeachIdentity("foo");