-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Workflow Update and Signal handlers concurrency sample #123
Workflow Update and Signal handlers concurrency sample #123
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
README should be updated referencing this sample
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We consider it bad practice to put non-workflow code with workflow code in the same file and so we don't do it in samples except for the hello
ones (which we may change see #49 and #67). Users have done bad things combining code since entire workflow files run in a sandbox including all non-workflow code/imports. Can we break this out to separate files like the other non-hello samples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes changing the hello samples would be great as that's what I pattern-matched off of.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Agreed (though I think there is some resistance to doing so), but yeah in the meantime I think matching the other whole-directory samples will work best here.
from temporalio.client import Client, WorkflowHandle | ||
from temporalio.worker import Worker | ||
|
||
# This samples shows off the key concurrent programming primitives for Workflows, especially |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# This samples shows off the key concurrent programming primitives for Workflows, especially | |
# This sample shows off the key concurrent programming primitives for Workflows, especially |
# - Running start_workflow with an initializer signal that you want to run before anything else. | ||
# | ||
@activity.defn | ||
async def allocate_nodes_to_job(nodes: List[int], task_name: str) -> List[int]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return doesn't match type hint (here and elsewhere)
# - Running start_workflow with an initializer signal that you want to run before anything else. | ||
# | ||
@activity.defn | ||
async def allocate_nodes_to_job(nodes: List[int], task_name: str) -> List[int]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually discourage multi-param activities/workflows in favor of single dataclass instances with multiple fields
self.nodes_lock.release() | ||
|
||
@workflow.run | ||
async def run(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would encourage explicit return type hints on workflow functions
if self.cluster_shutdown: | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could just put break
after the wait_condition
line inside the try
|
||
async with Worker( | ||
client, | ||
task_queue="tq", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To prevent clashing, in samples we try to name the task queue after the sample
async def test_atomic_message_handlers(client: Client): | ||
async with Worker( | ||
client, | ||
task_queue="tq", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would suggest unique task queues in tests
ClusterManager.run, | ||
id=f"ClusterManager-{uuid.uuid4()}", | ||
task_queue="tq", | ||
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not be necessary in tests (tests should be isolated where they shouldn't have to worry about other things that could be running)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly LGTM, only minor things
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can just be at a top-level directory of atomic_message_handlers
, no need to nest an extra directory deep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 I wanted people to see updates and signals for discoverability, and we're planning at least one more updates sample.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We haven't usually grouped by those top-level features before but more by what the sample does. So we don't have interceptors/context_propagation
and interceptors/sentry
, just two top-level separate samples that use the same Temporal features. We just need to determine whether we want this type of grouping now and maybe apply it generally. I know our other samples repositories have also tried to avoid nesting most samples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The primary README at the root of this repo should be updated to reference this sample
from temporalio import activity | ||
|
||
|
||
@dataclass(kw_only=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dataclass(kw_only=True) | |
@dataclass |
Probably not needed, but no big deal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer named arguments in general for 2+ parameters. Cuts down on callsite bugs and makes them clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can still use named arguments. We use them in lots of places, but since we're the only users of them we don't need to set this setting to force us to use them. Also, we have a CI check for our samples in 3.8 and I don't think this came about until 3.10 (we can look into relaxing our CI version constraints though).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, too bad. I'd rather people who pattern-match off of this sample be directed toward best practices. Will remove for now. I wonder if we have stats on python versions people actually use in the wild?
|
||
|
||
@activity.defn | ||
async def allocate_nodes_to_job(input: AllocateNodesToJobInput) -> List[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This return type hint seems invalid (same with some other functions)
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, | ||
start_signal="start_cluster", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I understand this is demonstrating handlers, arguably for users there is not much value of combining these two options together. If you know you always want to do something at the start of the workflow you could call it at the start of the workflow (e.g. when there is no state). No problem with it being here though, may just be a bit confusing.
for i in range(6): | ||
allocation_updates.append( | ||
wf.execute_update( | ||
ClusterManagerWorkflow.allocate_n_nodes_to_job, args=[f"task-{i}", 2] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to discourage multiple arguments to things (workflows, activities, signals, queries, updates, etc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I missed the updates, sorry.
) -> List[str]: | ||
await workflow.wait_condition(lambda: self.state.cluster_started) | ||
if self.state.cluster_shutdown: | ||
raise RuntimeError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This (and the ValueError
below) are task failures. You may want to use ApplicationError
.
cluster_shutdown: bool = False | ||
nodes: Optional[Dict[str, Optional[str]]] = None | ||
max_assigned_nodes: int = 0 | ||
num_assigned_nodes: int = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Porting this to .NET and not sure there is value storing this num-node field on "state" (and it's built out of lock, so it's a bit confusing)
nodes_to_free = [k for k, v in self.state.nodes.items() if v == task_name] | ||
# This await would be dangerous without nodes_lock because it yields control and allows interleaving. | ||
await self._deallocate_nodes_for_job(nodes_to_free, task_name) | ||
return "Done" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably don't need to return a value from this update
@workflow.update | ||
async def delete_job(self, task_name: str) -> str: | ||
await workflow.wait_condition(lambda: self.state.cluster_started) | ||
assert not self.state.cluster_shutdown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably match the error from allocate (see comment there, this will fail task by default, may prefer ApplicationError
)
) | ||
await asyncio.gather(*deletion_updates) | ||
|
||
await wf.signal(ClusterManagerWorkflow.shutdown_cluster) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arguably shutdown could be an update that returns what the workflow returns instead of making it a two-step process (but this is fine too)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool idea, and would show off the power of update. Ran out of time this A.M, though.
@workflow.update | ||
async def allocate_n_nodes_to_job( | ||
self, input: ClusterManagerAllocateNNodesToJobInput | ||
) -> List[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be nice to have docstrings on the signals and updates, e.g. explaining what the update returns. I'm thinking that this would help users understand why it's an update and how updates are useful.
self.state.nodes[node] = task_name | ||
|
||
@workflow.update | ||
async def delete_job(self, input: ClusterManagerDeleteJobInput): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't return anything, so I think readers will be wondering why it's an update rather than a signal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updates that don't return anything are totally reasonable (they can still raise errors for instance and just waiting on their completion means you know it completed, both of which are improvements over signals). However, I would strongly recommend a -> None
type hint here.
self.max_history_length | ||
and workflow.info().get_current_history_length() > self.max_history_length | ||
): | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes it more confusing from a pedagogical point of view. Might be nice to switch to e.g. using mock.patch
in the test to control CAN limit. (Non-blocking comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that it's not just a pytest affordance, it's also for the sample. (there's a --test-continue-as-new argument)
return True | ||
return False | ||
|
||
# max_history_size - to more conveniently test continue-as-new, not to be used in production. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this comment be here?
f"Cannot allocate {input.num_nodes} nodes; have only {len(unassigned_nodes)} available" | ||
) | ||
assigned_nodes = unassigned_nodes[: input.num_nodes] | ||
# This await would be dangerous without nodes_lock because it yields control and allows interleaving. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would help users understand the locking even more if this comment said what it is that shouldn't be interleaved.
@dataclass(kw_only=True) | ||
class ClusterManagerAllocateNNodesToJobInput: | ||
num_nodes: int | ||
task_name: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to use "job" xor "task" in names.
README.md
Outdated
@@ -52,6 +52,7 @@ Some examples require extra dependencies. See each sample's directory for specif | |||
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow. | |||
<!-- Keep this list in alphabetical order --> | |||
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language. | |||
* [atomic_message_handlers](updates_and_signals/atomic_message_handlers/) - Safely handling updates and signals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the name of the sample should be changed to something like safe_message_handling
. It's not about atomicity -- the sample doesn't demonstrate rolling back of incomplete side effects. Rather it's about maintaining strict isolation between handler executions, via serialization of handler executions. In any case, we don't want users to think this is showing a specialized form of message handling that they can ignore; we want them to consider whether they need this for any workflow with message handlers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. "Safe" feels much more like something I'm supposed to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not the biggest fan of "safe" vs "atomic" since the latter is more discoverable/descriptive when looking at the list of samples, but I don't have a strong opinion here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cretz we could choose a word other than "safe", but I argued above that "atomic" isn't the right word.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think "atomic" relates to rollback at all. Atomic just means one at a time or uninterruptible, as opposed to "transactional". But many can also see it as meaning "quick" or "all or none", but I don't see it that way when I see it used. I think atomic is an ok word, but again I don't have a strong opinion. Also "safe" has a lot of meanings for Temporal workflow code. Many users will be ok w/ their handlers running concurrently and will still be "safe". Maybe "serial" or something, unsure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, to be honest I'm not in love with "safe" and implying that any other usage style is not safe.
Hm, an atomic operation is one that either completes in its entirety or behaves as if it never started, and can't be seen in an intermediate state. So, if the operation has multiple stages with side effects, that would require some notion of rollback. It's usually synonymous with "transactional". I agree it's closely related to the idea of serializing executions so that they occur one at a time, since that's one way of ensuring that one execution can't see in-progress state of another, but using "atomic" would imply that message handling that does multiple writes can rollback incomplete changes. I think here we're talking about "serialized message processing" or "preventing corruption of shared state by message handlers".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's more here than just concurrency, such as dangling handlers. Sticking with safe.
I'm think I'm going to touch on idempotency as well in my next push, though we probably should also add a more focused idempotency sample.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, to be honest I'm not in love with "safe" and implying that any other usage style is not safe.
I don't think it implies that. "Robust" is an alternate word.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: added idempotency. I didn't use the built-in update ID, since it wasn't necessary here. Maybe that can be our separate idempotency sample.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor things. Also there seems to be a test failure in CI.
README.md
Outdated
@@ -52,6 +52,7 @@ Some examples require extra dependencies. See each sample's directory for specif | |||
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow. | |||
<!-- Keep this list in alphabetical order --> | |||
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language. | |||
* [safe_message_handlers](updates_and_signals/safe_message_handlers/) - Safely handling updates and signals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should keep this list in alphabetical order (see comment a couple of lines above). Also not a fan of nesting these non-hello samples beneath a directory unnecessarily (you'll note we don't do this much in other samples here or in many samples repos). If you must inconsistently nest this sample, you may want nested bullets here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We prefer tests to be in the same directory under tests
that they are in the top level. So /custom_converter/
tests are in /tests/custom_converter/
and therefore /updates_and_signals/safe_message_handlers/
tests should be under /tests/updates_and_signals/safe_message_handlers/
(granted as mentioned in comments before, I don't think we should nest sample dirs).
|
||
To run, first see [README.md](../../README.md) for prerequisites. | ||
|
||
Then, run the following from this directory to run the sample: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes people get confused that they can't just run these two commands in the same terminal because the first blocks. In our sample READMEs we usually make clear that the starter needs to be in a separate terminal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copy-pasted this. Looks like I got unlucky in which one I chose!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Yeah we are admittedly not consistent, this is not a blocker or anything.
|
||
|
||
@activity.defn | ||
async def allocate_nodes_to_job(input: AllocateNodesToJobInput): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should provide type hints for every activity return, even if -> None
, it helps callers
cluster_manager_handle = await client.start_workflow( | ||
ClusterManagerWorkflow.run, | ||
ClusterManagerInput(test_continue_as_new=should_test_continue_as_new), | ||
id=f"ClusterManagerWorkflow-{uuid.uuid4()}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other samples we have used fixed workflow IDs, but don't technically have to here, but it makes the id_reuse_policy
have no value since this is always unique
ClusterManagerWorkflow.run, | ||
ClusterManagerInput(test_continue_as_new=should_test_continue_as_new), | ||
id=f"ClusterManagerWorkflow-{uuid.uuid4()}", | ||
task_queue="atomic-message-handlers-task-queue", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Task queue was not changed to match the sample name
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes], | ||
): | ||
# Wait until interrupted | ||
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent use of logging vs print
from temporalio.client import Client, WorkflowHandle | ||
from temporalio.common import RetryPolicy | ||
from temporalio.exceptions import ApplicationError | ||
from temporalio.worker import Worker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Several unused imports above. (We're switching to ruff for Python repos -- using the ruff VSCode extension will make sense and highlights these.)
self.sleep_interval_seconds: int = 600 | ||
|
||
@workflow.signal | ||
async def start_cluster(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't need to be `async, and I think there's a pedagogical argument for making it not async.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, let me take that back. Somewhere out in the real world, there's a compute cluster that this workflow represents. For this example to be realistic, start_cluster
would need to use an activity to make a network call in order to start that cluster (thus ensuring that workflow state is in sync with real cluster state).
return self.get_assigned_nodes(job_name=input.job_name) | ||
|
||
async def _allocate_nodes_to_job( | ||
self, assigned_nodes: List[str], job_name: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can choose either "allocate" or "assign" as the term used throughout this sample. "assign to" probably better than "allocate to". Argument here could be named nodes_to_assign
.
cluster_started: bool = False | ||
cluster_shutdown: bool = False | ||
nodes: Dict[str, Optional[str]] = dataclasses.field(default_factory=dict) | ||
jobs_added: Set[str] = dataclasses.field(default_factory=set) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jobs_added
isn't a great name :) Blindly naming according to semantics/function would yield something like jobs_with_nodes_assigned_already
-- so, some more streamlined name that captures those semantics reasonably well?
I think there also needs to be a note somewhere explaining that our idempotency rules are that you cannot assign nodes twice to the same job; instead that will return a response indicating the already-assigned nodes.
"Cannot allocate nodes to a job: Cluster is already shut down" | ||
) | ||
|
||
async with self.nodes_lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's demonstrate timeout here (use asyncio.wait_for
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to punt on this for now to try to get this PR closed down.
# will cause the workflow to keep retrying and get it stuck. | ||
raise ApplicationError( | ||
f"Cannot allocate {input.num_nodes} nodes; have only {len(unassigned_nodes)} available" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's create a validator for this update. Minimally, it should reject if input.num_nodes
is negative. If it also rejects if input.num_nodes == 0
, then I believe we could get rid of the jobsWithNodesAssignedAlready
data structure and replace the logic with a dynamic computation: input.job_name in nodes.values()
(i.e. is the requested job in the set of job names that have at least one assigned node).
Regarding the dynamic/run-time dependent logic len(unassigned_nodes) >= input.num_nodes
check, I think it should not go in the validator, since (a) that ensures that we do the work if-and-only-if it's possible at run-time, and (b) probably there's an argument that statically valid requests that happened not to have available resources at run-time should leave a record in history.
If any/all of this makes sense, perhaps it's worth adding the reasoning to comments in the the code to really get people understanding the nuances.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like valid (ha!) feedback, but for a separate PR.
# before sending work to those nodes. | ||
# Returns the list of node names that were allocated to the job. | ||
@workflow.update | ||
async def allocate_n_nodes_to_job( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you definitely want the name to be assign_n_nodes
rather than just assign_nodes
? I vote the latter.
|
||
async with self.nodes_lock: | ||
# Idempotency guard. | ||
if input.job_name in self.state.jobs_added: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See below, this could be if input.job_name in self.state.nodes.values()
rather than maintaining a separate materialized data structure.
) | ||
nodes_to_assign = unassigned_nodes[: input.num_nodes] | ||
# This await would be dangerous without nodes_lock because it yields control and allows interleaving | ||
# with delete_job and perform_health_checks, which both touch self.state.nodes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The checks above would also be dangerous without the node lock held, so let's either move this comment up to where we acquire the lock, or add additional comments up there. Probably the former:
We need to acquire the lock here in order to perform some checks that depend on the contents of
self.state.nodes
and then to...
(I can have a stab at drafting that comment; it's a little bit involved to correctly document all the reasons why the lock must be held (atomicity of checks with mutation and prevention of interleaving with other coroutines are related but distinct), but it might be pedagogically valuable.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the part that makes the lock necessary as there are no blocking calls above.
raise ApplicationError("Cannot delete a job: Cluster is already shut down") | ||
|
||
async with self.nodes_lock: | ||
nodes_to_free = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"free" is a third synonym (we already have unassign, deallocate). As I mentioned above, my suggestion is to pick a single verb and use it everywhere. (So suggest nodes_to_unassign
here.)
find_bad_nodes, | ||
FindBadNodesInput(nodes_to_check=assigned_nodes), | ||
start_to_close_timeout=timedelta(seconds=10), | ||
# This health check is optional, and our lock would block the whole workflow if we let it retry forever. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seeing as this is going in front of the public we probably want to gesture to what good practices would be here. Currently, if the health check fails, this throws an unhandled exception in the main wf loop, right?
self.sleep_interval_seconds: int = 600 | ||
|
||
@workflow.signal | ||
async def start_cluster(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, let me take that back. Somewhere out in the real world, there's a compute cluster that this workflow represents. For this example to be realistic, start_cluster
would need to use an activity to make a network call in order to start that cluster (thus ensuring that workflow state is in sync with real cluster state).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great. Feel free to leave my comments for follow-on PRs (and flag those you disagree with on any timescale).
CI needs to pass then we can merge |
* list -> set to fix a test * Return a struct rather than a raw value from the list for better hygiene * Remove test dependency on race conditions between health check and adding nodes.
|
This is blocked on the Java Test Service being fixed for updates (or switching those tests away from using that time-skipping service) |
Skip update tests under Java test server
What was changed
Added a ClusterManager sample that shows off
workflow.wait_condition
in handlers as well as the use of a mutex to guarantee atomicity.Why?
As part of our effort to teach users about interleaving of blocking signal and update handlers, as well as about a workflow's reentrancy model in general, we are producing samples.
Checklist
Closes
How was this tested:
poetry run pytest tests/updates_and_signals/atomic_message_handlers_test.py