Skip to content

Commit

Permalink
Support trigger continueAsNew API (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored May 15, 2024
1 parent a303990 commit c6fb37c
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 19 deletions.
17 changes: 17 additions & 0 deletions service/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ func (h *handler) apiV1WorkflowConfigUpdate(c *gin.Context) {
return
}

func (h *handler) apiV1WorkflowTriggerContinueAsNew(c *gin.Context) {
var req iwfidl.TriggerContinueAsNewRequest
if err := c.ShouldBindJSON(&req); err != nil {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJson(req)))

errResp := h.svc.ApiV1WorkflowTriggerContinueAsNew(c.Request.Context(), req)
if errResp != nil {
h.processError(c, errResp)
return
}
c.JSON(http.StatusOK, struct{}{})
return
}

func (h *handler) apiV1WorkflowSearch(c *gin.Context) {
var req iwfidl.WorkflowSearchRequest
if err := c.ShouldBindJSON(&req); err != nil {
Expand Down
43 changes: 33 additions & 10 deletions service/api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,43 @@ import (
)

type ApiService interface {
ApiV1WorkflowStartPost(ctx context.Context, request iwfidl.WorkflowStartRequest) (*iwfidl.WorkflowStartResponse, *errors.ErrorAndStatus)
ApiV1WorkflowWaitForStateCompletion(ctx context.Context, request iwfidl.WorkflowWaitForStateCompletionRequest) (*iwfidl.WorkflowWaitForStateCompletionResponse, *errors.ErrorAndStatus)
ApiV1WorkflowStartPost(
ctx context.Context, request iwfidl.WorkflowStartRequest,
) (*iwfidl.WorkflowStartResponse, *errors.ErrorAndStatus)
ApiV1WorkflowWaitForStateCompletion(
ctx context.Context, request iwfidl.WorkflowWaitForStateCompletionRequest,
) (*iwfidl.WorkflowWaitForStateCompletionResponse, *errors.ErrorAndStatus)
ApiV1WorkflowSignalPost(ctx context.Context, request iwfidl.WorkflowSignalRequest) *errors.ErrorAndStatus
ApiV1WorkflowStopPost(ctx context.Context, request iwfidl.WorkflowStopRequest) *errors.ErrorAndStatus
ApiV1WorkflowConfigUpdate(ctx context.Context, request iwfidl.WorkflowConfigUpdateRequest) *errors.ErrorAndStatus
ApiV1WorkflowGetQueryAttributesPost(ctx context.Context, request iwfidl.WorkflowGetDataObjectsRequest) (*iwfidl.WorkflowGetDataObjectsResponse, *errors.ErrorAndStatus)
ApiV1WorkflowGetSearchAttributesPost(ctx context.Context, request iwfidl.WorkflowGetSearchAttributesRequest) (*iwfidl.WorkflowGetSearchAttributesResponse, *errors.ErrorAndStatus)
ApiV1WorkflowGetPost(ctx context.Context, request iwfidl.WorkflowGetRequest) (*iwfidl.WorkflowGetResponse, *errors.ErrorAndStatus)
ApiV1WorkflowGetWithWaitPost(ctx context.Context, request iwfidl.WorkflowGetRequest) (*iwfidl.WorkflowGetResponse, *errors.ErrorAndStatus)
ApiV1WorkflowSearchPost(ctx context.Context, request iwfidl.WorkflowSearchRequest) (*iwfidl.WorkflowSearchResponse, *errors.ErrorAndStatus)
ApiV1WorkflowRpcPost(ctx context.Context, request iwfidl.WorkflowRpcRequest) (*iwfidl.WorkflowRpcResponse, *errors.ErrorAndStatus)
ApiV1WorkflowResetPost(ctx context.Context, request iwfidl.WorkflowResetRequest) (*iwfidl.WorkflowResetResponse, *errors.ErrorAndStatus)
ApiV1WorkflowTriggerContinueAsNew(
ctx context.Context, req iwfidl.TriggerContinueAsNewRequest,
) (retError *errors.ErrorAndStatus)
ApiV1WorkflowGetQueryAttributesPost(
ctx context.Context, request iwfidl.WorkflowGetDataObjectsRequest,
) (*iwfidl.WorkflowGetDataObjectsResponse, *errors.ErrorAndStatus)
ApiV1WorkflowGetSearchAttributesPost(
ctx context.Context, request iwfidl.WorkflowGetSearchAttributesRequest,
) (*iwfidl.WorkflowGetSearchAttributesResponse, *errors.ErrorAndStatus)
ApiV1WorkflowGetPost(
ctx context.Context, request iwfidl.WorkflowGetRequest,
) (*iwfidl.WorkflowGetResponse, *errors.ErrorAndStatus)
ApiV1WorkflowGetWithWaitPost(
ctx context.Context, request iwfidl.WorkflowGetRequest,
) (*iwfidl.WorkflowGetResponse, *errors.ErrorAndStatus)
ApiV1WorkflowSearchPost(
ctx context.Context, request iwfidl.WorkflowSearchRequest,
) (*iwfidl.WorkflowSearchResponse, *errors.ErrorAndStatus)
ApiV1WorkflowRpcPost(
ctx context.Context, request iwfidl.WorkflowRpcRequest,
) (*iwfidl.WorkflowRpcResponse, *errors.ErrorAndStatus)
ApiV1WorkflowResetPost(
ctx context.Context, request iwfidl.WorkflowResetRequest,
) (*iwfidl.WorkflowResetResponse, *errors.ErrorAndStatus)
ApiV1WorkflowSkipTimerPost(ctx context.Context, request iwfidl.WorkflowSkipTimerRequest) *errors.ErrorAndStatus
ApiV1WorkflowDumpPost(ctx context.Context, request iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, *errors.ErrorAndStatus)
ApiV1WorkflowDumpPost(
ctx context.Context, request iwfidl.WorkflowDumpRequest,
) (*iwfidl.WorkflowDumpResponse, *errors.ErrorAndStatus)
ApiInfoHealth(ctx context.Context) *iwfidl.HealthInfo
Close()
}
2 changes: 2 additions & 0 deletions service/api/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const WorkflowSkipTimerApiPath = "/api/v1/workflow/timer/skip"
const WorkflowStopApiPath = "/api/v1/workflow/stop"
const WorkflowInternalDumpApiPath = "/api/v1/workflow/internal/dump"
const WorkflowConfigUpdateApiPath = "/api/v1/workflow/config/update"
const WorkflowTriggerContinueAsNewApiPath = "/api/v1/workflow/triggerContinueAsNew"
const WorkflowRpcApiPath = "/api/v1/workflow/rpc"
const InfoHealthCheck = "/info/healthcheck"

Expand All @@ -43,6 +44,7 @@ func NewService(config config.Config, client uclient.UnifiedClient, logger log.L
router.POST(WorkflowSkipTimerApiPath, handler.apiV1WorkflowSkipTimer)
router.POST(WorkflowInternalDumpApiPath, handler.apiV1WorkflowInternalDump)
router.POST(WorkflowConfigUpdateApiPath, handler.apiV1WorkflowConfigUpdate)
router.POST(WorkflowTriggerContinueAsNewApiPath, handler.apiV1WorkflowTriggerContinueAsNew)
router.POST(WorkflowRpcApiPath, handler.apiV1WorkflowRpc)
router.GET(InfoHealthCheck, handler.infoHealthCheck)

Expand Down
13 changes: 13 additions & 0 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,19 @@ func (s *serviceImpl) ApiV1WorkflowConfigUpdate(
return nil
}

func (s *serviceImpl) ApiV1WorkflowTriggerContinueAsNew(
ctx context.Context, req iwfidl.TriggerContinueAsNewRequest,
) (retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

err := s.client.SignalWorkflow(ctx,
req.GetWorkflowId(), req.GetWorkflowRunId(), service.TriggerContinueAsNewSignalChannelName, nil)
if err != nil {
return s.handleError(err, WorkflowTriggerContinueAsNewApiPath, req.GetWorkflowId())
}
return nil
}

func (s *serviceImpl) ApiV1WorkflowStopPost(
ctx context.Context, req iwfidl.WorkflowStopRequest,
) (retError *errors.ErrorAndStatus) {
Expand Down
11 changes: 6 additions & 5 deletions service/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ const (

IwfSystemConstPrefix = "__IwfSystem_"

SkipTimerSignalChannelName = IwfSystemConstPrefix + "SkipTimerChannel"
FailWorkflowSignalChannelName = IwfSystemConstPrefix + "FailWorkflowChannel"
UpdateConfigSignalChannelName = IwfSystemConstPrefix + "UpdateWorkflowConfig"
ExecuteRpcSignalChannelName = IwfSystemConstPrefix + "ExecuteRpc"
StateCompletionSignalChannelName = IwfSystemConstPrefix + "StateCompletion"
SkipTimerSignalChannelName = IwfSystemConstPrefix + "SkipTimerChannel"
FailWorkflowSignalChannelName = IwfSystemConstPrefix + "FailWorkflowChannel"
UpdateConfigSignalChannelName = IwfSystemConstPrefix + "UpdateWorkflowConfig"
ExecuteRpcSignalChannelName = IwfSystemConstPrefix + "ExecuteRpc"
StateCompletionSignalChannelName = IwfSystemConstPrefix + "StateCompletion"
TriggerContinueAsNewSignalChannelName = IwfSystemConstPrefix + "TriggerContinueAsNew"

WorkerUrlMemoKey = IwfSystemConstPrefix + "WorkerUrl"
UseMemoForDataAttributesKey = IwfSystemConstPrefix + "UseMemoForDataAttributes"
Expand Down
13 changes: 12 additions & 1 deletion service/interpreter/continueAsNewCounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ type ContinueAsNewCounter struct {
executedStateApis int32
signalsReceived int32
syncUpdateReceived int32
triggeredByAPI bool

configer *WorkflowConfiger
rootCtx UnifiedContext
provider WorkflowProvider
}

func NewContinueAsCounter(configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider) *ContinueAsNewCounter {
func NewContinueAsCounter(
configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider,
) *ContinueAsNewCounter {
return &ContinueAsNewCounter{
configer: configer,

Expand All @@ -35,6 +38,10 @@ func (c *ContinueAsNewCounter) IncSyncUpdateReceived() {
}

func (c *ContinueAsNewCounter) IsThresholdMet() bool {
if c.triggeredByAPI {
return true
}

// Note: when threshold == 0, it means unlimited

config := c.configer.Get()
Expand All @@ -45,3 +52,7 @@ func (c *ContinueAsNewCounter) IsThresholdMet() bool {

return totalOperations >= config.GetContinueAsNewThreshold()
}

func (c *ContinueAsNewCounter) TriggerByAPI() {
c.triggeredByAPI = true
}
30 changes: 27 additions & 3 deletions service/interpreter/signalReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ type SignalReceiver struct {
persistenceManager *PersistenceManager
}

func NewSignalReceiver(ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel, stateRequestQueue *StateRequestQueue,
persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, workflowConfiger *WorkflowConfiger,
initReceivedSignals map[string][]*iwfidl.EncodedObject) *SignalReceiver {
func NewSignalReceiver(
ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel,
stateRequestQueue *StateRequestQueue,
persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter,
workflowConfiger *WorkflowConfiger,
initReceivedSignals map[string][]*iwfidl.EncodedObject,
) *SignalReceiver {
if initReceivedSignals == nil {
initReceivedSignals = map[string][]*iwfidl.EncodedObject{}
}
Expand Down Expand Up @@ -110,6 +114,26 @@ func NewSignalReceiver(ctx UnifiedContext, provider WorkflowProvider, interState
}
})

provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx UnifiedContext) {
// NOTE: unlike other signal channels, this one doesn't need to drain during continueAsNew
// because if there is a continueAsNew, this signal is not needed anymore
ch := provider.GetSignalChannel(ctx, service.TriggerContinueAsNewSignalChannelName)

received := false
err := provider.Await(ctx, func() bool {
received = ch.ReceiveAsync(nil)
return received || continueAsNewCounter.IsThresholdMet()
})
if err != nil {
return
}
if received {
continueAsNewCounter.TriggerByAPI()
return
}
return
})

provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx UnifiedContext) {
for {
ch := provider.GetSignalChannel(ctx, service.ExecuteRpcSignalChannelName)
Expand Down

0 comments on commit c6fb37c

Please sign in to comment.