-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updated output handling of asynchronously run workflows to be the same as when run synchronously #6228
base: main
Are you sure you want to change the base?
Conversation
…e as when run synchronously
{ | ||
elsa.UseWorkflowRuntime(workflowRuntime => { | ||
workflowRuntime.ActivityExecutionLogStore = sp => activityExecutionStore; | ||
workflowRuntime.WorkflowRuntime = sp => sp.GetRequiredService<DistributedWorkflowRuntime>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this is a separate bug or not... but to be able to run multiple asynchronous activities in parallel (the only way it makes sense to me) the new DistributedWorkflowRuntime
workflow runtime must be used. I don't believe that this was not the case with 3.2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was indeed intentional to require the DistributedWorkflowRuntime
or ProtoActorWorkflowRuntime
to have support for parallel activity execution. To be honest, I'd love it to be able to still have support for parallel activity execution using the LocalWorkflowRuntime
. Originally, the DistributedWorkflowRuntime
was created for hosting Elsa on multiple nodes, which requires distributed locking. But it's fair to expect that when running on a single node, you should still be able to run activities in parallel. Perhaps this could be added easily by updating the LocalWorkflowRuntime
with a semaphore to synchronise access to the workflow instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that it should be possible to run activities in parallel when running on a single node. I'll likely look at this as a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My solution for this (running activities in parallel running on a single node) was to use a memory based implementation of distributed lock, and then to just use DistributedWorkflowRuntime. The main snag I ran into when I tried a difference approach is that DistributedWorkflowClient internally uses LocalWorkflowClient. If you are interested in the memory based implementation of distributed lock let me know, otherwise I'll just keep it local.
src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs
Show resolved
Hide resolved
|
||
return outputValues; | ||
{ | ||
var activityOutputRegister = activityExecutionContext.WorkflowExecutionContext.GetActivityOutputRegister(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fear that this "solution" may be too simplistic since the old code clearly went out of it's way to only return values that are backed by WorkflowInstanceStorageDriver
. I'm not sure that this is a concern now considering that the parent workflow seems to be now resumed in the same thread. Anyway - I'd welcome discussion in this particular change (as well as any other change!).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern is that when an activity produces a large object, e.g. a file or JSON string of 5 MB, the user might use a different variable storage driver, such as "Blob Storage Driver" that stores the data in Azure Storage.
For this reason, we want to avoid those large values to become part of the output register, given that it will be serialized as well.
Obviously, the current implementation is rather crude and intended to be a temporary hack.
The ultimate solution is to allow for better control of how output is recorded.
For example, for small values, it might be OK to be recorded. The assumption here is that if the storage driver is "Workflow Instance", which stores the value as part of the workflow instance, it's safe to also include it as part of the output register.
For large values, the assumption is that the user will have used the Memory storage driver or a custom one such as "Blob Storage Driver". In this case, we definitely do not want the value to be recorded in the output register.
Now, we actually do want to record something in the output register. In the case of large blobs, a download link could be appropriate.
So perhaps a cleaner approach would be to leverage the associated storage driver to have it produce a value suitable for storing in the output register. This would clean up this code because now we no longer need to test against the storage driver type.
Pending that solution, perhaps for now it might be best to keep the code to only include variables into the output if those variables are associated with a workflow instance driver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm. Having inclusion in the output register dependent on the associated storage driver implies that unmapped (unbound) outputs would continue to not be registered in the output register when activities are run asynchronously. And having synchronously executed activities behave differently than asynchronously executed activities also feels odd (for synchronously executed activities I think all outputs are registered regardless of being bound or not, and regardless of storage driver). And, in fact, for bound outputs I think it is less important to be included in the output register since down stream activities can just map to the variable.
Perhaps the best solution would be for me to introduce a IBackgroundActivityOutputExtractor
service where the default implementation would do exactly what the existing (prior to my PR) BackgroundActivityInvoker.ExtractActivityOutput
method does and would allow me to use an implementation that instead returns all outputs (basically what I have in this PR). What do you think of this @sfmskywalker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, considering the bug is actually more about the logging of outputs in the ActivityExecutionRecord
and not about the activity output register (although there is certainly overlap)... how about we use the activities specified persistence mode to determine whether to return the output from the background activity (also taking into account any outputs which are bound to WorkflowInstanceStorageDriver variables)? So, if an activity executed asynchronously, we would return outputs that are either configured to LogPersistenceMode.Include or bound to a WorkflowInstanceStorageDriver variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for a wonderful PR, and even with tests!
I think that the only thing left to be able to merge is the BackgroundActivityInvoker
(see comments)
|
||
return outputValues; | ||
{ | ||
var activityOutputRegister = activityExecutionContext.WorkflowExecutionContext.GetActivityOutputRegister(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern is that when an activity produces a large object, e.g. a file or JSON string of 5 MB, the user might use a different variable storage driver, such as "Blob Storage Driver" that stores the data in Azure Storage.
For this reason, we want to avoid those large values to become part of the output register, given that it will be serialized as well.
Obviously, the current implementation is rather crude and intended to be a temporary hack.
The ultimate solution is to allow for better control of how output is recorded.
For example, for small values, it might be OK to be recorded. The assumption here is that if the storage driver is "Workflow Instance", which stores the value as part of the workflow instance, it's safe to also include it as part of the output register.
For large values, the assumption is that the user will have used the Memory storage driver or a custom one such as "Blob Storage Driver". In this case, we definitely do not want the value to be recorded in the output register.
Now, we actually do want to record something in the output register. In the case of large blobs, a download link could be appropriate.
So perhaps a cleaner approach would be to leverage the associated storage driver to have it produce a value suitable for storing in the output register. This would clean up this code because now we no longer need to test against the storage driver type.
Pending that solution, perhaps for now it might be best to keep the code to only include variables into the output if those variables are associated with a workflow instance driver.
{ | ||
elsa.UseWorkflowRuntime(workflowRuntime => { | ||
workflowRuntime.ActivityExecutionLogStore = sp => activityExecutionStore; | ||
workflowRuntime.WorkflowRuntime = sp => sp.GetRequiredService<DistributedWorkflowRuntime>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was indeed intentional to require the DistributedWorkflowRuntime
or ProtoActorWorkflowRuntime
to have support for parallel activity execution. To be honest, I'd love it to be able to still have support for parallel activity execution using the LocalWorkflowRuntime
. Originally, the DistributedWorkflowRuntime
was created for hosting Elsa on multiple nodes, which requires distributed locking. But it's fair to expect that when running on a single node, you should still be able to run activities in parallel. Perhaps this could be added easily by updating the LocalWorkflowRuntime
with a semaphore to synchronise access to the workflow instance.
These changes result in outputs of activities run asynchronously to be treated the same as if the activity were run synchronously with regards to ActivityExecutionRecord and the activity output register.
See #6227 for more details.
This change is