5
5
import com .netflix .conductor .client .worker .Worker ;
6
6
import com .netflix .conductor .common .metadata .tasks .Task ;
7
7
import com .netflix .conductor .common .metadata .tasks .TaskResult ;
8
- import com .netflix .conductor .common .metadata .workflow .StartWorkflowRequest ;
9
8
import com .netflix .conductor .common .metadata .workflow .WorkflowDef ;
10
9
import com .netflix .conductor .sdk .workflow .def .ConductorWorkflow ;
11
10
import com .netflix .conductor .sdk .workflow .def .tasks .*;
12
11
import com .netflix .conductor .sdk .workflow .executor .WorkflowExecutor ;
13
- import io .orkes .conductor .client .WorkflowClient ;
14
12
import io .orkes .samples .models .MediationRules ;
15
13
import lombok .AllArgsConstructor ;
16
14
import lombok .extern .slf4j .Slf4j ;
17
15
import org .springframework .stereotype .Component ;
18
16
19
17
20
- import java .util .HashMap ;
21
18
import java .util .List ;
22
19
import java .util .Map ;
23
20
26
23
@ AllArgsConstructor
27
24
public class DynamicSubworkflowWorker implements Worker {
28
25
29
- private final WorkflowClient workflowClient ;
30
26
private final ObjectMapper objectMapper = new ObjectMapper ();
31
27
private final WorkflowExecutor executor ;
32
28
33
29
@ Override
34
30
public String getTaskDefName () {
35
- return "quest_start_subworkflow " ;
31
+ return "create_dynamic_workflow_def " ;
36
32
}
37
33
38
34
/**
@@ -42,7 +38,7 @@ public String getTaskDefName() {
42
38
*/
43
39
@ Override
44
40
public TaskResult execute (Task task ) {
45
- System .out .println ("Starting quest_start_subworkflow task" );
41
+ System .out .println ("Starting create_dynamic_workflow_def task" );
46
42
TaskResult result = new TaskResult (task );
47
43
try {
48
44
MediationRules mediationRules = objectMapper .convertValue (task .getInputData ().get ("mediation_rules" ), MediationRules .class );
@@ -55,17 +51,8 @@ public TaskResult execute(Task task) {
55
51
}
56
52
57
53
public Map <String , Object > startExistingWorkflow (MediationRules mediationRules ) throws JsonProcessingException {
58
- StartWorkflowRequest request = new StartWorkflowRequest ();
59
- request .setName ("dynamic_workflow" );
60
- Map <String , Object > inputData = new HashMap <>();
61
- //inputData.put("enrichmentSubworkflowsDef", subworkflowDef());
62
54
Object dynamicSubworkflowDef = objectMapper .convertValue (createDynamicSubworkflow (mediationRules ), Object .class );
63
- inputData .put ("dynamicSubworkflowDef" , dynamicSubworkflowDef );
64
- request .setInput (inputData );
65
-
66
- String workflowId = workflowClient .startWorkflow (request );
67
- log .info ("Workflow id: {}" , workflowId );
68
- return Map .of ("workflowId" , workflowId );
55
+ return Map .of ("workflow_def" , dynamicSubworkflowDef );
69
56
}
70
57
71
58
private WorkflowDef createDynamicSubworkflow (MediationRules mediationRules ) throws JsonProcessingException {
@@ -83,20 +70,8 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro
83
70
// --------------- Enrichment level started ------------------
84
71
com .netflix .conductor .sdk .workflow .def .tasks .Task [][] enrichmentForkTasks = new com .netflix .conductor .sdk .workflow .def .tasks .Task [mediationRules .getEnrichments ().size ()][1 ];
85
72
for (int i = 0 ; i < mediationRules .getEnrichments ().size (); i ++) {
86
- ConductorWorkflow conductorWorkflow = new ConductorWorkflow (executor );
87
- conductorWorkflow .setName (mediationRules .getEnrichments ().get (i ).getEnrichmentType () + "_workflow" );
88
-
89
- SubWorkflow natsSubworkflow = new SubWorkflow ("nats_" + mediationRules .getEnrichments ().get (i ).getEnrichmentType () + "_subworkflow_ref" , mediationRules .getEnrichments ().get (i ).getNatsWorkflowName (), mediationRules .getEnrichments ().get (i ).getNatsWorkflowVersion ());
90
- natsSubworkflow .input (mediationRules .getEnrichments ().get (i ).getNatsWorkflowInput ());
91
- Switch sendToORBEnrichmentSwitch = new Switch ("send_to_" + mediationRules .getEnrichments ().get (i ).getEnrichmentType () + "_switch" , "${workflow.input.sendToORB}" ).switchCase ("Y" , natsSubworkflow ).defaultCase (List .of ());
92
- conductorWorkflow .add (sendToORBEnrichmentSwitch );
93
-
94
- Http httptask = new Http (mediationRules .getEnrichments ().get (i ).getEnrichmentType () + "_enrichment_workflow_task" );
95
- httptask .url ("https://orkes-api-tester.orkesconductor.com/api" );
96
- conductorWorkflow .add (httptask );
97
-
98
- SubWorkflow forkSubworkflow = new SubWorkflow (mediationRules .getEnrichments ().get (i ).getEnrichmentType () + "_subworkflow_ref" , conductorWorkflow );
99
- forkSubworkflow .input ("sendToORB" , mediationRules .getEnrichments ().get (i ).getSendToORB ());
73
+ SubWorkflow forkSubworkflow = new SubWorkflow (mediationRules .getEnrichments ().get (i ).getEnrichmentType () + "_subworkflow_ref" , "OP_" + mediationRules .getEnrichments ().get (i ).getEnrichmentType (), 1 );
74
+ forkSubworkflow .input ("sendToORB" , mediationRules .getEnrichments ().get (i ).getOrbpFlags ());
100
75
enrichmentForkTasks [i ][0 ] = forkSubworkflow ;
101
76
}
102
77
ForkJoin forkEnrichment = new ForkJoin ("fork_enrichment" , enrichmentForkTasks );
@@ -107,24 +82,11 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro
107
82
// -------------- Translation Level started ----------------
108
83
com .netflix .conductor .sdk .workflow .def .tasks .Task [][] translationForkTasks = new com .netflix .conductor .sdk .workflow .def .tasks .Task [mediationRules .getTranslations ().size ()][1 ];
109
84
for (int i = 0 ; i < mediationRules .getTranslations ().size (); i ++) {
110
- ConductorWorkflow conductorWorkflow = new ConductorWorkflow (executor );
111
- SubWorkflow forkSubworkflow = new SubWorkflow (mediationRules .getTranslations ().get (i ).getName () + "_subworkflow_ref" , conductorWorkflow );
112
- forkSubworkflow .input ("sendToORB" , mediationRules .getTranslations ().get (i ).getSendToORB ());
113
- conductorWorkflow .setName (mediationRules .getTranslations ().get (i ).getName () + "_workflow" );
114
- SubWorkflow natsSubworkflow = new SubWorkflow ("nats_" + mediationRules .getTranslations ().get (i ).getName () + "_subworkflow_ref" , mediationRules .getTranslations ().get (i ).getNatsWorkflowName (), mediationRules .getTranslations ().get (i ).getNatsWorkflowVersion ());
115
- natsSubworkflow .input (mediationRules .getTranslations ().get (i ).getNatsWorkflowInput ());
116
- Switch sendToORBTranslationSwitch = new Switch ("send_to_" + mediationRules .getTranslations ().get (i ).getName () + "_switch" , "${workflow.input.sendToORB}" ).switchCase ("Y" , natsSubworkflow ).defaultCase (List .of ());
117
- conductorWorkflow .add (sendToORBTranslationSwitch );
118
-
119
- for (int j = 0 ; j < mediationRules .getTranslations ().get (i ).getEnrichments ().size (); j ++) {
120
- Http httptask = new Http (mediationRules .getTranslations ().get (i ).getEnrichments ().get (j )+ "_translations_workflow_task" );
121
- httptask .url ("https://orkes-api-tester.orkesconductor.com/api" );
122
- String taskRef = mediationRules .getTranslations ().get (i ).getEnrichments ().get (j ) + "_subworkflow_ref" ;
123
- String outputExpression = "${" + taskRef + ".output.response}" ; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result
124
- forkSubworkflow .input (mediationRules .getTranslations ().get (i ).getEnrichments ().get (j ), outputExpression );
125
- conductorWorkflow .add (httptask );
85
+ SubWorkflow forkSubworkflow = new SubWorkflow (mediationRules .getTranslations ().get (i ).getName () + "_subworkflow_ref" , "OP_" + mediationRules .getTranslations ().get (i ).getName (), 1 );
86
+ forkSubworkflow .input ("sendToORB" , mediationRules .getTranslations ().get (i ).getOrbpFlags ());
87
+ for (String enrichmentInput : mediationRules .getTranslations ().get (i ).getEnrichments ()) {
88
+ forkSubworkflow .input (enrichmentInput , "${" + enrichmentInput + "_subworkflow_ref" + ".output.result}" );
126
89
}
127
-
128
90
translationForkTasks [i ][0 ] = forkSubworkflow ;
129
91
}
130
92
ForkJoin forkTranslation = new ForkJoin ("fork_translation" , translationForkTasks );
@@ -135,24 +97,9 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro
135
97
// -------------- Distribution level started --------------------
136
98
com .netflix .conductor .sdk .workflow .def .tasks .Task [][] distributionForkTasks = new com .netflix .conductor .sdk .workflow .def .tasks .Task [mediationRules .getDistributions ().size ()][1 ];
137
99
for (int i = 0 ; i < mediationRules .getDistributions ().size (); i ++) {
138
- ConductorWorkflow conductorWorkflow = new ConductorWorkflow (executor );
139
- conductorWorkflow .setName (mediationRules .getDistributions ().get (i ).getDistributeTo () + "_workflow" );
140
-
141
- SubWorkflow natsSubworkflow = new SubWorkflow ("nats_" + mediationRules .getDistributions ().get (i ).getDistributeTo () + "_subworkflow_ref" , mediationRules .getDistributions ().get (i ).getNatsWorkflowName (), mediationRules .getDistributions ().get (i ).getNatsWorkflowVersion ());
142
- natsSubworkflow .input (mediationRules .getDistributions ().get (i ).getNatsWorkflowInput ());
143
- Switch sendToORBDistributionSwitch = new Switch ("send_to_" + mediationRules .getDistributions ().get (i ).getDistributeTo () + "_switch" , "${workflow.input.sendToORB}" ).switchCase ("Y" , natsSubworkflow ).defaultCase (List .of ());
144
- conductorWorkflow .add (sendToORBDistributionSwitch );
145
-
146
- Http httptask = new Http (mediationRules .getDistributions ().get (i ).getDistributeTo () + "_distributions_workflow_task" );
147
- httptask .url ("https://orkes-api-tester.orkesconductor.com/api" );
148
- conductorWorkflow .add (httptask );
149
-
150
- SubWorkflow forkSubworkflow = new SubWorkflow (mediationRules .getDistributions ().get (i ).getDistributeTo () + "_subworkflow_ref" , conductorWorkflow );
151
- forkSubworkflow .input ("sendToORB" , mediationRules .getDistributions ().get (i ).getSendToORB ());
152
- String taskRef = mediationRules .getDistributions ().get (i ).getTranslation () + "_subworkflow_ref" ;
153
- String outputExpression = "${" + taskRef + ".output.response}" ; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result
154
- forkSubworkflow .input (mediationRules .getDistributions ().get (i ).getTranslation (), outputExpression );
155
- forkSubworkflow .input ("sink" , "nats:nats-integ:subject" );
100
+ SubWorkflow forkSubworkflow = new SubWorkflow (mediationRules .getDistributions ().get (i ).getDistributeTo () + "_subworkflow_ref" , "OP_" + mediationRules .getDistributions ().get (i ).getDistributeTo (), 1 );
101
+ forkSubworkflow .input ("sendToORB" , mediationRules .getDistributions ().get (i ).getOrbpFlags ());
102
+ forkSubworkflow .input (mediationRules .getDistributions ().get (i ).getTranslation (), "${" + mediationRules .getDistributions ().get (i ).getTranslation () + "_subworkflow_ref" + ".output.result}" );
156
103
distributionForkTasks [i ][0 ] = forkSubworkflow ;
157
104
}
158
105
ForkJoin forkDistribution = new ForkJoin ("fork_distribution" , distributionForkTasks );
0 commit comments