Skip to content

Commit

Permalink
IWF-136: Support initial data attributes when starting workflow (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolczynski authored Sep 30, 2024
1 parent d9321af commit e0c4caa
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 11 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Or maybe both just for testing to ensure the code works for both Cadence and Tem

### Option 1: Run with our docker-compose file (Recommended)

Simply run `docker compose -f docker-compose/integ-dependencies.yml up -` will:
Simply run `docker compose -f docker-compose/integ-dependencies.yml up` will:

* Start both Cadence & Temporal as dependencies
* Set up required system search attributes
Expand Down
24 changes: 23 additions & 1 deletion gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,15 @@ components:
workflowIDReusePolicy: null
useMemoForDataAttributes: true
cronSchedule: cronSchedule
dataAttributes:
- value:
data: data
encoding: encoding
key: key
- value:
data: data
encoding: encoding
key: key
properties:
workflowIDReusePolicy:
$ref: '#/components/schemas/WorkflowIDReusePolicy'
Expand All @@ -686,6 +695,10 @@ components:
items:
$ref: '#/components/schemas/SearchAttribute'
type: array
dataAttributes:
items:
$ref: '#/components/schemas/KeyValue'
type: array
workflowConfigOverride:
$ref: '#/components/schemas/WorkflowConfig'
idReusePolicy:
Expand All @@ -702,7 +715,7 @@ components:
type: string
IDReusePolicy:
enum:
- ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY # Keeping typo enum for backwards compatibility
- ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY
- ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY
- ALLOW_IF_NO_RUNNING
- DISALLOW_REUSE
Expand Down Expand Up @@ -891,6 +904,15 @@ components:
workflowIDReusePolicy: null
useMemoForDataAttributes: true
cronSchedule: cronSchedule
dataAttributes:
- value:
data: data
encoding: encoding
key: key
- value:
data: data
encoding: encoding
key: key
iwfWorkerUrl: iwfWorkerUrl
workflowId: workflowId
stateInput:
Expand Down
2 changes: 1 addition & 1 deletion gen/iwfidl/docs/IDReusePolicy.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Enum

# Keeping typo enum for backwards compatibility

* `ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY` (value: `"ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY"`)

* `ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY` (value: `"ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY"`)
Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/WorkflowStartOptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Name | Type | Description | Notes
**WorkflowStartDelaySeconds** | Pointer to **int32** | | [optional]
**RetryPolicy** | Pointer to [**WorkflowRetryPolicy**](WorkflowRetryPolicy.md) | | [optional]
**SearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional]
**DataAttributes** | Pointer to [**[]KeyValue**](KeyValue.md) | | [optional]
**WorkflowConfigOverride** | Pointer to [**WorkflowConfig**](WorkflowConfig.md) | | [optional]
**IdReusePolicy** | Pointer to [**IDReusePolicy**](IDReusePolicy.md) | | [optional]
**UseMemoForDataAttributes** | Pointer to **bool** | | [optional]
Expand Down Expand Up @@ -157,6 +158,31 @@ SetSearchAttributes sets SearchAttributes field to given value.

HasSearchAttributes returns a boolean if a field has been set.

### GetDataAttributes

`func (o *WorkflowStartOptions) GetDataAttributes() []KeyValue`

GetDataAttributes returns the DataAttributes field if non-nil, zero value otherwise.

### GetDataAttributesOk

`func (o *WorkflowStartOptions) GetDataAttributesOk() (*[]KeyValue, bool)`

GetDataAttributesOk returns a tuple with the DataAttributes field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetDataAttributes

`func (o *WorkflowStartOptions) SetDataAttributes(v []KeyValue)`

SetDataAttributes sets DataAttributes field to given value.

### HasDataAttributes

`func (o *WorkflowStartOptions) HasDataAttributes() bool`

HasDataAttributes returns a boolean if a field has been set.

### GetWorkflowConfigOverride

`func (o *WorkflowStartOptions) GetWorkflowConfigOverride() WorkflowConfig`
Expand Down
4 changes: 2 additions & 2 deletions gen/iwfidl/model_id_reuse_policy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions gen/iwfidl/model_workflow_start_options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 47 additions & 1 deletion integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/common/timeparser"
"log"
"net/http"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -123,6 +124,13 @@ func doTestPersistenceWorkflow(
nowTime := time.Now()
notTimeNanoStr := fmt.Sprintf("%v", nowTime.UnixNano())
nowTimeStr := nowTime.Format(timeparser.DateTimeFormat)
expectedDataAttribute := iwfidl.KeyValue{
Key: ptr.Any("TestKey"),
Value: &iwfidl.EncodedObject{
Encoding: ptr.Any("TestEncoding"),
Data: ptr.Any("TestValue"),
},
}
expectedDatetimeSearchAttribute := iwfidl.SearchAttribute{
Key: iwfidl.PtrString("CustomDatetimeField"),
ValueType: ptr.Any(iwfidl.DATETIME),
Expand All @@ -148,13 +156,39 @@ func doTestPersistenceWorkflow(
SearchAttributes: []iwfidl.SearchAttribute{
expectedDatetimeSearchAttribute,
},
DataAttributes: []iwfidl.KeyValue{
expectedDataAttribute,
},
WorkflowConfigOverride: config,
UseMemoForDataAttributes: ptr.Any(useMemo),
},
}
_, httpResp, err := reqStart.WorkflowStartRequest(wfReq).Execute()
panicAtHttpError(err, httpResp)

initReqQry := apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(context.Background())

queryResult, httpResp, err := getDataAttributes(initReqQry, wfId, expectedDataAttribute, useMemo)

retryCount := 0

// Config is only present for continueAsNew tests
if config != nil {
for {
if err == nil || retryCount >= 5 {
break
}
// Loading data to a continuedAsNew workflow might take a few seconds thus retry mechanism is needed
time.Sleep(time.Millisecond * 1000)
retryCount += 1
queryResult, httpResp, err = getDataAttributes(initReqQry, wfId, expectedDataAttribute, useMemo)
}
}

panicAtHttpError(err, httpResp)

assert.Contains(t, queryResult.GetObjects(), expectedDataAttribute)

reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
wfResponse, httpResp, err := reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
Expand All @@ -165,7 +199,7 @@ func doTestPersistenceWorkflow(
queryResult1, httpResp, err := reqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
persistence.TestDataObjectKey,
persistence.TestDataObjectKey, expectedDataAttribute.GetKey(),
},
UseMemoForDataAttributes: ptr.Any(useMemo),
}).Execute()
Expand Down Expand Up @@ -232,6 +266,7 @@ func doTestPersistenceWorkflow(
Key: iwfidl.PtrString(persistence.TestDataObjectKey),
Value: &persistence.TestDataObjectVal2,
},
expectedDataAttribute,
}
expected2 := []iwfidl.KeyValue{
{
Expand All @@ -242,6 +277,7 @@ func doTestPersistenceWorkflow(
Key: iwfidl.PtrString(persistence.TestDataObjectKey2),
Value: &persistence.TestDataObjectVal1,
},
expectedDataAttribute,
}
assertions.ElementsMatch(expected1, queryResult1.GetObjects())
assertions.ElementsMatch(expected2, queryResult2.GetObjects())
Expand Down Expand Up @@ -366,6 +402,16 @@ func doTestPersistenceWorkflow(
}
}

func getDataAttributes(initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue, useMemo bool) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) {
return initReqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
persistence.TestDataObjectKey, expectedDataAttribute.GetKey(),
},
UseMemoForDataAttributes: ptr.Any(useMemo),
}).Execute()
}

func assertSearch(query string, expectedCount int, apiClient *iwfidl.APIClient, assertions *assert.Assertions) {
// search through all wfs using search API with pagination
search := apiClient.DefaultApi.ApiV1WorkflowSearchPost(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 2 files
+5 −1 iwf-sdk.yaml
+6 −1 iwf.yaml
7 changes: 6 additions & 1 deletion service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
} else {
workflowConfig = *s.config.Interpreter.DefaultWorkflowConfig
}

var initCustomSAs []iwfidl.SearchAttribute
var initCustomDAs []iwfidl.KeyValue
// workerUrl is always needed, for optimizing None as persistence loading type
workflowOptions.Memo = map[string]interface{}{
service.WorkerUrlMemoKey: iwfidl.EncodedObject{
Expand All @@ -104,6 +104,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes)

initCustomSAs = startOptions.SearchAttributes
initCustomDAs = startOptions.DataAttributes
if startOptions.HasWorkflowConfigOverride() {
workflowConfig = startOptions.GetWorkflowConfigOverride()
}
Expand All @@ -113,6 +114,9 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
// Note: the value is actually not too important, we will check the presence of the key only as today
Data: iwfidl.PtrString("true"),
}
for _, da := range initCustomDAs {
workflowOptions.Memo[da.GetKey()] = da.GetValue()
}
}
if startOptions.WorkflowStartDelaySeconds != nil {
workflowOptions.WorkflowStartDelay =
Expand All @@ -127,6 +131,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
StateInput: req.StateInput,
StateOptions: req.StateOptions,
InitSearchAttributes: initCustomSAs,
InitDataAttributes: initCustomDAs,
Config: workflowConfig,
UseMemoForDataAttributes: useMemo,
WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(),
Expand Down
1 change: 1 addition & 0 deletions service/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type StartWorkflowOptions struct {
WorkflowIDReusePolicy *iwfidl.WorkflowIDReusePolicy
CronSchedule *string
RetryPolicy *iwfidl.WorkflowRetryPolicy
DataAttributes map[string]interface{}
SearchAttributes map[string]interface{}
Memo map[string]interface{}
WorkflowStartDelay *time.Duration
Expand Down
2 changes: 2 additions & 0 deletions service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type (

InitSearchAttributes []iwfidl.SearchAttribute `json:"initSearchAttributes,omitempty"`

InitDataAttributes []iwfidl.KeyValue `json:"initDataAttributes,omitempty"`

UseMemoForDataAttributes bool `json:"useMemoForDataAttributes,omitempty"`

Config iwfidl.WorkflowConfig `json:"config,omitempty"`
Expand Down
8 changes: 6 additions & 2 deletions service/interpreter/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ type PersistenceManager struct {
}

func NewPersistenceManager(
provider WorkflowProvider, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool,
provider WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool,
) *PersistenceManager {
searchAttributes := make(map[string]iwfidl.SearchAttribute)
for _, sa := range initSearchAttributes {
searchAttributes[sa.GetKey()] = sa
}
dataAttributes := make(map[string]iwfidl.KeyValue)
for _, da := range initDataAttributes {
dataAttributes[da.GetKey()] = da
}
return &PersistenceManager{
dataObjects: make(map[string]iwfidl.KeyValue),
dataObjects: dataAttributes,
searchAttributes: searchAttributes,
provider: provider,

Expand Down
8 changes: 7 additions & 1 deletion service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func InterpreterImpl(
} else {
interStateChannel = NewInterStateChannel()
stateRequestQueue = NewStateRequestQueue()
persistenceManager = NewPersistenceManager(provider, input.InitSearchAttributes, input.UseMemoForDataAttributes)
persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes)
timerProcessor = NewTimerProcessor(ctx, provider, nil)
continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider)
signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil)
Expand All @@ -95,6 +95,10 @@ func InterpreterImpl(
if err != nil {
return nil, err
}
// We intentionally set the query handler after the continueAsNew/dumpInternal activity.
// This is to ensure the correctness. If we set the query handler before that,
// the query handler could return empty data (since the loading hasn't completed), which will be incorrect response.
// We would rather return server errors and let the client retry later.
err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo)
if err != nil {
return nil, err
Expand Down Expand Up @@ -315,6 +319,8 @@ func InterpreterImpl(
input.StateInput = nil
input.StateOptions = nil
input.StartStateId = nil
input.InitDataAttributes = nil
input.InitSearchAttributes = nil
return nil, provider.NewInterpreterContinueAsNewError(ctx, input)
}
} // end main loop
Expand Down

0 comments on commit e0c4caa

Please sign in to comment.