
Draft: Create workflow_v2 processor #16

Closed
wants to merge 29 commits

Conversation


@jem-davies jem-davies commented Jun 2, 2024

This Draft Pull Request is to request interim feedback on the proposed solution to issue #17

A new Benthos processor, `workflow_v2`, has been created here.

Some example Benthos config files have been included in `./benthos/cmd/benthos`:

  • `config_old.yaml` — runs the example DAG from the workflow processor docs using the `workflow` processor
  • `config.yaml` — runs the example DAG from the workflow processor docs using the new `workflow_v2` processor
The `config_old.yaml` will output:

```
INFO STAGE A FINISHED
INFO STAGE C FINISHED
INFO STAGE B FINISHED
INFO STAGE E FINISHED
INFO STAGE D FINISHED
INFO STAGE F FINISHED
```

The `config.yaml` will output:

```
INFO STAGE A FINISHED
INFO STAGE C FINISHED
INFO STAGE E FINISHED
INFO STAGE F FINISHED
INFO STAGE B FINISHED
INFO STAGE D FINISHED
```

Notice that E will start and finish before B is started.
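The scheduling idea behind this increased parallelism can be sketched in plain Go: each stage waits only on its direct parents, so stages on independent paths (like E and B) can overlap. This is an illustrative stand-alone sketch, not the processor code itself; only the stage names and edges follow the example DAG from the workflow docs.

```go
package main

import (
	"fmt"
	"sync"
)

// runDAG launches one goroutine per stage. Each goroutine blocks until all of
// the stage's direct parents have closed their "done" channel, then records
// its own completion and signals its children. Completion order between
// independent stages is nondeterministic, but dependency order always holds.
func runDAG(deps map[string][]string) []string {
	done := make(map[string]chan struct{}, len(deps))
	for name := range deps {
		done[name] = make(chan struct{})
	}

	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	for name, parents := range deps {
		wg.Add(1)
		go func(name string, parents []string) {
			defer wg.Done()
			for _, p := range parents {
				<-done[p] // wait only for direct dependencies
			}
			mu.Lock()
			order = append(order, name)
			mu.Unlock()
			close(done[name]) // signal children
		}(name, parents)
	}
	wg.Wait()
	return order
}

func main() {
	// The example DAG: A feeds B and C, B feeds D, C feeds E, D and E feed F.
	deps := map[string][]string{
		"A": nil, "B": {"A"}, "C": {"A"},
		"D": {"B"}, "E": {"C"}, "F": {"D", "E"},
	}
	for _, s := range runDAG(deps) {
		fmt.Printf("STAGE %s FINISHED\n", s)
	}
}
```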

I would greatly appreciate some feedback regarding the proposed solution before I spend more time on the things that I know I still need to do:

  • unit tests
  • writing docs
  • implementing the ability to restart DAG execution at particular places, described in the workflow documentation under structured metadata
  • logic to process batches with more than one message (currently missing)

Also, I am quite new to Go, so I could well have made some newbie mistakes.

Specific questions that I have are:

  • Should this be a new processor (i.e. a v2), or should the existing `workflow` be altered?
  • The ability to infer the DAG from the `request_map` fields of the branch: is this something that the v2 needs to have?

Signed-off-by: Jem Davies <[email protected]>
@gregfurman gregfurman left a comment


Looking great, Jem! Very happy with the changes + documentation for this. Couple of things:

  • Remove all those test files in the cmd/bento directory
  • Some grammar/phrasing nits
  • Small concern re: the channel + goroutine batching functionality -- am worried this could introduce a memory leak
  • One or two concerns about race conditions when operating on branches concurrently

Comment on lines +25 to +32
```go
func (w *workflowBranchMapV2) Close(ctx context.Context) error {
	for _, c := range w.Branches {
		if err := c.Close(ctx); err != nil {
			return err
		}
	}
	return nil
}
```

Should we maybe be using a mutex here?

Also, I am thinking we should try to close all branches as opposed to returning an error early if a single Close() fails -- concerned about memory leaks.


```go
// Locks all branches contained in the branch map and returns the latest DAG, a
// map of resources, and a func to unlock the resources that were locked. If
// any error occurs in locked each branch (the resource is missing, or the DAG
```

Suggested change
```diff
-// any error occurs in locked each branch (the resource is missing, or the DAG
+// an error occurs in any locked branch (the resource is missing, or the DAG
```


```go
func validateDAG(graph map[string][]string) bool {
	// Status maps to track the state of each node:
	// 0 = unvisited, 1 = visiting, 2 = visited
```

nit: consider making these values enums. IMO it'll make the function easier to follow

```go
const (
	Unvisited NodeState = iota
	Visiting
	Visited
)
```
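For illustration, a validateDAG along these lines could use the enum in a three-colour depth-first search, rejecting the graph when a back edge (cycle) is found. This is a hedged sketch, not the PR's implementation; the graph maps each node to the nodes it depends on.

```go
package main

import "fmt"

type NodeState int

const (
	Unvisited NodeState = iota
	Visiting
	Visited
)

// validateDAG returns true when the dependency graph has no cycles.
// A node seen in the Visiting state during its own descendants' traversal
// means we followed a back edge, i.e. a cycle.
func validateDAG(graph map[string][]string) bool {
	state := make(map[string]NodeState, len(graph))
	var visit func(string) bool
	visit = func(n string) bool {
		switch state[n] {
		case Visiting:
			return false // back edge: cycle detected
		case Visited:
			return true // already fully explored
		}
		state[n] = Visiting
		for _, dep := range graph[n] {
			if !visit(dep) {
				return false
			}
		}
		state[n] = Visited
		return true
	}
	for n := range graph {
		if !visit(n) {
			return false
		}
	}
	return true
}

func main() {
	acyclic := map[string][]string{"A": nil, "B": {"A"}, "C": {"B"}}
	fmt.Println(validateDAG(acyclic)) // true
	cyclic := map[string][]string{"A": {"B"}, "B": {"A"}}
	fmt.Println(validateDAG(cyclic)) // false
}
```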

```go
Description(`
## workflow vs workflow_v2

The workflow_v2 processor is an evolution of the original `+"[`workflow` processor][processors.workflow]"+`. The two key differences are: a change to the way the topology of branch processors are defined & an enhancement that increases the parallelism of the DAG execution. Also, the original workflow processor has some features such as: implicitly creating the DAG based upon the request_map & result_map field of the branch processors, which have been dropped in workflow_v2.
```

nit: believe "and" makes more sense here than "&" but feel free to ignore this lol

Suggested change
```diff
-The workflow_v2 processor is an evolution of the original `+"[`workflow` processor][processors.workflow]"+`. The two key differences are: a change to the way the topology of branch processors are defined & an enhancement that increases the parallelism of the DAG execution. Also, the original workflow processor has some features such as: implicitly creating the DAG based upon the request_map & result_map field of the branch processors, which have been dropped in workflow_v2.
+The workflow_v2 processor is an evolution of the original `+"[`workflow` processor][processors.workflow]"+`. The two key differences are: a change to the way the topology of branch processors are defined and an enhancement that increases the parallelism of the DAG execution. Also, the original workflow processor has some features such as: implicitly creating the DAG based upon the request_map and result_map field of the branch processors, which have been dropped in workflow_v2.
```

```go
Description("A [dot path](/docs/configuration/field_paths) indicating where to store and reference structured metadata about the workflow_v2 execution.").
	Default("meta.workflow_v2"),
service.NewObjectMapField(wflowProcFieldBranchesV2, workflowv2BranchSpecFields()...).
	Description("An object of named [`branch` processors](/docs/components/processors/branch) that make up the workflow_v2."))
```

Suggested change
```diff
-Description("An object of named [`branch` processors](/docs/components/processors/branch) that make up the workflow_v2."))
+Description("An object named [`branch` processors](/docs/components/processors/branch) that make up the workflow_v2."))
```

Comment on lines 469 to 471
```go
resultsBodge := make([]*message.Part, msg.Len())
xxx := mssge.results[0]
resultsBodge[mssge.mssgPartID] = xxx[0]
```

nit: I assume these variables are going to be renamed 🫠


```go
batchResultChan := make(chan collector)

go func() {
```

Can this functionality be placed in a separate function?


```go
branchMsg, branchSpans := tracing.WithChildSpans(w.tracer, eid, propMsg.ShallowCopy())

go func(i int, id string) {
```

Same comment as above: maybe place the goroutine code in a separate function


```go
go func() {
	for {
		mssge := <-batchResultChan
```

Think this may cause a memory leak since this channel is never closed. The <-batchResultChan will always block and cause the loop to never finish, and the goroutine to never clear.

I think you should create another channel called doneCh and run a for-select loop, where the function returns if/when doneCh <- true is called; otherwise the rest of the function runs as normal in the <-batchResultChan case.
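Something like the following sketch (with a stand-in collector type) shows the pattern: a done channel ends the loop, with one final non-blocking sweep so in-flight results aren't dropped when shutdown and delivery race:

```go
package main

import "fmt"

// collector is a stand-in for the PR's batch-result type.
type collector struct {
	mssgPartID int
}

// collect drains batchResultChan until doneCh is closed. Without the done
// case, <-batchResultChan would block forever once senders stop, leaking the
// goroutine. The nested select with a default clause sweeps any results that
// were still queued when doneCh fired.
func collect(batchResultChan <-chan collector, doneCh <-chan struct{}) []collector {
	var results []collector
	for {
		select {
		case m := <-batchResultChan:
			results = append(results, m)
		case <-doneCh:
			for {
				select {
				case m := <-batchResultChan:
					results = append(results, m)
				default:
					return results
				}
			}
		}
	}
}

func main() {
	ch := make(chan collector)
	done := make(chan struct{})
	go func() {
		for i := 0; i < 3; i++ {
			ch <- collector{mssgPartID: i}
		}
		close(done) // all sends delivered (unbuffered channel), safe to stop
	}()
	fmt.Println(len(collect(ch, done))) // 3
}
```

Closing doneCh (rather than sending `doneCh <- true`) lets the signal reach the loop even if no one is actively receiving at that instant.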


I imagine all these files should be discarded/removed from your branch

@jem-davies jem-davies closed this Sep 1, 2024