Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,23 +507,15 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *

lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Outputs: store.StepOutput{
Value: event,
},
Status: store.StatusCompleted,
ExecutionID: executionID,
Ref: workflows.KeywordTrigger,
workflowExecution, err := e.executionsStore.Add(ctx, map[string]*store.WorkflowExecutionStep{
workflows.KeywordTrigger: {
Outputs: store.StepOutput{
Value: event,
},
},
WorkflowID: e.workflow.id,
ExecutionID: executionID,
Status: store.StatusStarted,
}

dbWex, err := e.executionsStore.Add(ctx, ec)
Status: store.StatusCompleted,
ExecutionID: executionID,
Ref: workflows.KeywordTrigger,
}}, executionID, e.workflow.id, store.StatusStarted)
if err != nil {
return err
}
Expand All @@ -547,10 +539,10 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
return nil
}
e.wg.Add(1)
go e.stepUpdateLoop(ctx, executionID, ch, dbWex.CreatedAt)
go e.stepUpdateLoop(ctx, executionID, ch, workflowExecution.CreatedAt)

for _, td := range triggerDependents {
e.queueIfReady(*ec, td)
e.queueIfReady(workflowExecution, td)
}

return nil
Expand Down Expand Up @@ -641,10 +633,10 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

// If all dependencies are completed, enqueue the step.
if !waitingOnDependencies {
e.logger.With(platform.KeyStepRef, step.Ref, platform.KeyWorkflowExecutionID, state.ExecutionID, "state", copyState(state)).
e.logger.With(platform.KeyStepRef, step.Ref, platform.KeyWorkflowExecutionID, state.ExecutionID, "state", state.DeepCopy()).
Debug("step request enqueued")
e.pendingStepRequests <- stepRequest{
state: copyState(state),
state: state.DeepCopy(),
stepRef: step.Ref,
}
}
Expand Down
41 changes: 0 additions & 41 deletions core/services/workflows/state.go

This file was deleted.

37 changes: 37 additions & 0 deletions core/services/workflows/store/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,41 @@ func (w WorkflowExecution) ResultForStep(s string) (*exec.Result, bool) {
}, true
}

func (w WorkflowExecution) DeepCopy() WorkflowExecution {
steps := map[string]*WorkflowExecutionStep{}
for ref, step := range w.Steps {
var mval *values.Map
if step.Inputs != nil {
mval = step.Inputs.CopyMap()
}

copiedov := values.Copy(step.Outputs.Value)

newState := &WorkflowExecutionStep{
ExecutionID: step.ExecutionID,
Ref: step.Ref,
Status: step.Status,

Outputs: StepOutput{
Err: step.Outputs.Err,
Value: copiedov,
},

Inputs: mval,
UpdatedAt: step.UpdatedAt,
}

steps[ref] = newState
}
return WorkflowExecution{
ExecutionID: w.ExecutionID,
WorkflowID: w.WorkflowID,
Status: w.Status,
CreatedAt: w.CreatedAt,
UpdatedAt: w.UpdatedAt,
FinishedAt: w.FinishedAt,
Steps: steps,
}
}

var _ exec.Results = WorkflowExecution{}
63 changes: 63 additions & 0 deletions core/services/workflows/store/models_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package store

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/values"
)

func TestWorkflowExecution_DeepCopy(t *testing.T) {
now := time.Now()
inputs, err := values.NewMap(map[string]int{})
require.NoError(t, err)
step := &WorkflowExecutionStep{
ExecutionID: "exec1",
Ref: "step1",
Status: StatusStarted,
Inputs: inputs,
Outputs: StepOutput{Value: values.NewString("output")},
UpdatedAt: &now,
}

updatedNow := now.Add(1 * time.Minute)
finishedNow := now.Add(2 * time.Minute)
original := WorkflowExecution{
ExecutionID: "exec1",
WorkflowID: "workflow1",
Status: StatusStarted,
CreatedAt: &now,
UpdatedAt: &updatedNow,
FinishedAt: &finishedNow,
Steps: map[string]*WorkflowExecutionStep{"step1": step},
}

deepCopy := original.DeepCopy()

// Check that the copied execution is equal to the original
assert.Equal(t, original, deepCopy)

// Check that the copied execution is a different instance
assert.NotSame(t, &original, &deepCopy)

// Check that the steps map are the same before modification of the original
assert.Equal(t, original.Steps, deepCopy.Steps)

// Add a new step to the original steps and check that the deep copy is not affected
original.Steps["step2"] = &WorkflowExecutionStep{}

// Check that the steps map is a different instance
assert.NotEqual(t, original.Steps, deepCopy.Steps)

// Check that the step inside the steps map is a different instance
assert.NotSame(t, original.Steps["step1"], deepCopy.Steps["step1"])

// Check that the inputs map is a different instance
assert.NotSame(t, original.Steps["step1"].Inputs, deepCopy.Steps["step1"].Inputs)

// Check that the outputs value is a different instance
assert.NotSame(t, original.Steps["step1"].Outputs.Value, deepCopy.Steps["step1"].Outputs.Value)
}
3 changes: 2 additions & 1 deletion core/services/workflows/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
)

type Store interface {
Add(ctx context.Context, state *WorkflowExecution) (WorkflowExecution, error)
Add(ctx context.Context, steps map[string]*WorkflowExecutionStep,
executionID string, workflowID string, status string) (WorkflowExecution, error)
UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error)
FinishExecution(ctx context.Context, executionID string, status string) (WorkflowExecution, error)
Get(ctx context.Context, executionID string) (WorkflowExecution, error)
Expand Down
52 changes: 32 additions & 20 deletions core/services/workflows/store/store_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ const (
maximumExecutionAge = 1 * time.Hour
)

// InMemoryStore is an in-memory implementation of the Store interface used to store workflow execution states
// InMemoryStore is an in-memory implementation of the Store interface used to store workflow execution states.
// The store always returns a copy of the current workflow execution state in the store such that it is effectively an
// immutable object as state modification only occurs within the store.
// TODO make the WorkflowExecution type immutable to reflect the latter fact and prevent unexpected side effects from
// TODO code being added that modifies WorkflowExecution objects outside of the store. (https://smartcontract-it.atlassian.net/browse/CAPPL-682)
type InMemoryStore struct {
lggr logger.Logger
commonservices.StateMachine
idToState map[string]*WorkflowExecution
idToExecution map[string]*WorkflowExecution
mu sync.RWMutex
shutdownWaitGroup sync.WaitGroup
chStop commonservices.StopChan
Expand All @@ -46,32 +50,40 @@ func NewInMemoryStore(lggr logger.Logger, clock clockwork.Clock) *InMemoryStore

func NewInMemoryStoreWithPruneConfiguration(lggr logger.Logger, clock clockwork.Clock, pruneFrequency time.Duration,
maximumExecutionAge time.Duration) *InMemoryStore {
return &InMemoryStore{lggr: lggr, idToState: map[string]*WorkflowExecution{}, clock: clock, chStop: make(chan struct{}),
return &InMemoryStore{lggr: lggr, idToExecution: map[string]*WorkflowExecution{}, clock: clock, chStop: make(chan struct{}),
pruneInterval: pruneFrequency, maximumExecutionAge: maximumExecutionAge}
}

// Add adds a new execution state under the given executionID
func (s *InMemoryStore) Add(ctx context.Context, execution *WorkflowExecution) (WorkflowExecution, error) {
func (s *InMemoryStore) Add(ctx context.Context, steps map[string]*WorkflowExecutionStep,
executionID string, workflowID string, status string) (WorkflowExecution, error) {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.idToState[execution.ExecutionID]
_, ok := s.idToExecution[executionID]
if ok {
return WorkflowExecution{}, fmt.Errorf("execution ID %s already exists in store", execution.ExecutionID)
return WorkflowExecution{}, fmt.Errorf("execution ID %s already exists in store", executionID)
}

now := s.clock.Now()
execution.CreatedAt = &now
execution.UpdatedAt = &now
execution := &WorkflowExecution{
Steps: steps,
WorkflowID: workflowID,
ExecutionID: executionID,
Status: status,
CreatedAt: &now,
UpdatedAt: &now,
}

s.idToExecution[execution.ExecutionID] = execution

s.idToState[execution.ExecutionID] = execution
return *execution, nil
return execution.DeepCopy(), nil
}

// UpsertStep updates a step for the given executionID
func (s *InMemoryStore) UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error) {
s.mu.Lock()
defer s.mu.Unlock()
execution, ok := s.idToState[step.ExecutionID]
execution, ok := s.idToExecution[step.ExecutionID]
if !ok {
return WorkflowExecution{}, fmt.Errorf("could not find execution %s", step.ExecutionID)
}
Expand All @@ -80,14 +92,14 @@ func (s *InMemoryStore) UpsertStep(ctx context.Context, step *WorkflowExecutionS
execution.UpdatedAt = &now

execution.Steps[step.Ref] = step
return *execution, nil
return execution.DeepCopy(), nil
}

// FinishExecution marks the execution as finished with the given status
func (s *InMemoryStore) FinishExecution(ctx context.Context, executionID string, status string) (WorkflowExecution, error) {
s.mu.Lock()
defer s.mu.Unlock()
execution, ok := s.idToState[executionID]
execution, ok := s.idToExecution[executionID]
if !ok {
return WorkflowExecution{}, fmt.Errorf("could not find execution %s", executionID)
}
Expand All @@ -101,7 +113,7 @@ func (s *InMemoryStore) FinishExecution(ctx context.Context, executionID string,
execution.Status = status
execution.FinishedAt = &now

return *execution, nil
return execution.DeepCopy(), nil
}

func isCompletedStatus(status string) bool {
Expand All @@ -116,12 +128,12 @@ func isCompletedStatus(status string) bool {
func (s *InMemoryStore) Get(ctx context.Context, executionID string) (WorkflowExecution, error) {
s.mu.RLock()
defer s.mu.RUnlock()
state, ok := s.idToState[executionID]
execution, ok := s.idToExecution[executionID]
if !ok {
return WorkflowExecution{}, fmt.Errorf("could not find execution %s", executionID)
}

return *state, nil
return execution.DeepCopy(), nil
}

func (s *InMemoryStore) Start(context.Context) error {
Expand Down Expand Up @@ -163,19 +175,19 @@ func (s *InMemoryStore) pruneExpiredExecutionEntries() {
case <-ticker.Chan():
expirationTime := s.clock.Now().Add(-s.maximumExecutionAge)
s.mu.Lock()
for id, state := range s.idToState {
for id, state := range s.idToExecution {
if isCompletedStatus(state.Status) {
delete(s.idToState, id)
delete(s.idToExecution, id)
}
}

// Prune non-terminated executions that are older than the maximum expiration time
// This shouldn't be necessary - erring on the side of caution for now as this pruning logic
// existed in the old store.
var prunedNonTerminatedExecutionIDs []string
for id, state := range s.idToState {
for id, state := range s.idToExecution {
if state.UpdatedAt.Before(expirationTime) {
delete(s.idToState, id)
delete(s.idToExecution, id)
prunedNonTerminatedExecutionIDs = append(prunedNonTerminatedExecutionIDs, id)
}
}
Expand Down
Loading
Loading