# Workflow Update and Signal handlers concurrency sample #123
*Changes from 19 commits*
**New file — tests for `safe_message_handlers` (+107 lines)**

> **Review comment:** We prefer tests to be in the same directory under …
```python
import uuid

from temporalio import common, workflow
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.exceptions import ApplicationError
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.activities import (
    allocate_nodes_to_job,
    deallocate_nodes_for_job,
    find_bad_nodes,
)
from updates_and_signals.safe_message_handlers.starter import do_cluster_lifecycle
from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAllocateNNodesToJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def test_safe_message_handlers(client: Client):
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        await do_cluster_lifecycle(cluster_manager_handle, delay_seconds=1)
        result = await cluster_manager_handle.result()
        assert result.max_assigned_nodes == 12
        assert result.num_currently_assigned_nodes == 0


async def test_update_idempotency(client: Client):
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )

        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        nodes_1 = await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.allocate_n_nodes_to_job,
            ClusterManagerAllocateNNodesToJobInput(num_nodes=5, job_name="jobby-job"),
        )
        # Call the update a second time to demonstrate that the operation
        # is idempotent
        nodes_2 = await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.allocate_n_nodes_to_job,
            ClusterManagerAllocateNNodesToJobInput(num_nodes=5, job_name="jobby-job"),
        )
        # The second call should not allocate more nodes (it may return fewer
        # if the health check finds bad nodes between the two updates).
        assert nodes_1 >= nodes_2


async def test_update_failure(client: Client):
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )

        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.allocate_n_nodes_to_job,
            ClusterManagerAllocateNNodesToJobInput(num_nodes=24, job_name="big-task"),
        )
        try:
            # Try to allocate too many nodes
            await cluster_manager_handle.execute_update(
                ClusterManagerWorkflow.allocate_n_nodes_to_job,
                ClusterManagerAllocateNNodesToJobInput(
                    num_nodes=3, job_name="little-task"
                ),
            )
        except WorkflowUpdateFailedError as e:
            assert isinstance(e.cause, ApplicationError)
            assert e.cause.message == "Cannot allocate 3 nodes; have only 1 available"
        finally:
            await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
            result = await cluster_manager_handle.result()
            assert result.num_currently_assigned_nodes + result.num_bad_nodes == 24
```
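These tests rely on a pytest fixture supplying the `client` argument (defined elsewhere in the repo's test setup, not shown in this diff). Assuming the repo's usual Poetry-based workflow, an invocation along these lines should run them:

```bash
# Hypothetical invocation; the exact selector may differ, especially once
# the tests are moved per the review comment above
poetry run pytest -k safe_message_handlers
```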
**New file — `README.md` (+21 lines)**
# Atomic message handlers

This sample demonstrates important techniques for handling signals and updates, collectively known as messages. In particular, it illustrates how message handlers can interleave, or fail to complete before the workflow completes, and how you can manage that.

* Here, using `workflow.wait_condition`, signal and update handlers only operate while the workflow is within a certain state: between `cluster_started` and `cluster_shutdown`.
* You can run `start_workflow` with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start."
* Message handlers can block, and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
* Message handlers should also finish before the workflow run completes. One option is to use a lock.
* An "entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow run. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
* Message handlers can be made idempotent. See the `ClusterManager.allocate_n_nodes_to_job` update, and the sketch after this list.
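Since the workflow itself (`workflow.py`) is not part of the diff shown on this page, here is a minimal sketch of how these techniques could fit together. All names and handler bodies below are illustrative assumptions (the class is deliberately named `ClusterManagerWorkflowSketch` to avoid implying this is the sample's actual code); only the techniques, not the implementation, come from the sample:

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Optional

from temporalio import workflow
from temporalio.exceptions import ApplicationError


@dataclass
class ClusterManagerState:
    cluster_started: bool = False
    cluster_shutdown: bool = False
    # Map of node id -> assigned job name (None when free)
    nodes: Dict[str, Optional[str]] = field(default_factory=dict)


@dataclass
class AllocateInput:
    num_nodes: int
    job_name: str


@workflow.defn
class ClusterManagerWorkflowSketch:
    def __init__(self) -> None:
        self.state = ClusterManagerState(nodes={str(i): None for i in range(25)})
        # asyncio primitives are deterministic inside workflows, so a Lock
        # can safely serialize handlers that touch shared state
        self.nodes_lock = asyncio.Lock()

    @workflow.signal
    def start_cluster(self) -> None:
        self.state.cluster_started = True

    @workflow.signal
    def shutdown_cluster(self) -> None:
        self.state.cluster_shutdown = True

    @workflow.update
    async def allocate_n_nodes_to_job(self, input: AllocateInput) -> List[str]:
        # Gate the handler on workflow state (first bullet above)
        await workflow.wait_condition(lambda: self.state.cluster_started)
        async with self.nodes_lock:
            # Idempotency: if this job already has nodes, return them
            # instead of allocating again
            already = [k for k, v in self.state.nodes.items() if v == input.job_name]
            if already:
                return already
            free = [k for k, v in self.state.nodes.items() if v is None]
            if len(free) < input.num_nodes:
                # Failing the update is what the test above asserts on
                raise ApplicationError(
                    f"Cannot allocate {input.num_nodes} nodes; "
                    f"have only {len(free)} available"
                )
            assigned = free[: input.num_nodes]
            for node in assigned:
                self.state.nodes[node] = input.job_name
            return assigned

    @workflow.run
    async def run(
        self, state: Optional[ClusterManagerState] = None
    ) -> ClusterManagerState:
        if state is not None:
            # Continued-as-new runs receive the previous run's state
            self.state = state
        await workflow.wait_condition(
            lambda: self.state.cluster_shutdown
            or workflow.info().is_continue_as_new_suggested()
        )
        if not self.state.cluster_shutdown:
            # Let in-flight handlers finish before rolling over the history
            await workflow.wait_condition(lambda: not self.nodes_lock.locked())
            workflow.continue_as_new(self.state)
        return self.state
```

For the "signal-with-start" bullet, the client side would pass `start_signal` to `client.start_workflow` (e.g. `start_signal="start_cluster"`); this sample's starter instead starts the workflow and then signals it separately.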
To run, first see [README.md](../../README.md) for prerequisites.

Then, from this directory, start the worker:
> **Review comment:** Sometimes people get confused that they can't just run these two commands in the same terminal because the first blocks. In our sample READMEs we usually make clear that the starter needs to be run in a separate terminal.
>
> **Author:** I copy-pasted this. Looks like I got unlucky in which one I chose!
>
> **Reviewer:** 👍 Yeah, we are admittedly not consistent; this is not a blocker or anything.
```bash
poetry run python worker.py
```

Then, in a separate terminal, run the starter:

```bash
poetry run python starter.py
```
This will start a worker to run your workflow and activities, then start a ClusterManagerWorkflow and put it through its paces.
**New file — `activities.py` (+45 lines)**
```python
import asyncio
from dataclasses import dataclass
from typing import List

from temporalio import activity


@dataclass
class AllocateNodesToJobInput:
    nodes: List[str]
    job_name: str


@activity.defn
async def allocate_nodes_to_job(input: AllocateNodesToJobInput):
    print(f"Assigning nodes {input.nodes} to job {input.job_name}")
    await asyncio.sleep(0.1)


@dataclass
class DeallocateNodesForJobInput:
    nodes: List[str]
    job_name: str


@activity.defn
async def deallocate_nodes_for_job(input: DeallocateNodesForJobInput):
    print(f"Deallocating nodes {input.nodes} from job {input.job_name}")
    await asyncio.sleep(0.1)


@dataclass
class FindBadNodesInput:
    nodes_to_check: List[str]


@activity.defn
async def find_bad_nodes(input: FindBadNodesInput) -> List[str]:
    await asyncio.sleep(0.1)
    bad_nodes = [n for n in input.nodes_to_check if int(n) % 5 == 0]
    if bad_nodes:
        print(f"Found bad nodes: {bad_nodes}")
    else:
        print("No new bad nodes found.")
    return bad_nodes
```

> **Review comment** (on `allocate_nodes_to_job`): Should provide type hints for every activity return, even if …
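A sketch of what the reviewer appears to be asking for: an explicit return annotation even on activities that return nothing. The `-> None` is the only addition; the body and the `AllocateNodesToJobInput` dataclass are unchanged from the file above:

```python
@activity.defn
async def allocate_nodes_to_job(input: AllocateNodesToJobInput) -> None:
    # Explicit None return annotation, per the review comment above
    print(f"Assigning nodes {input.nodes} to job {input.job_name}")
    await asyncio.sleep(0.1)
```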
**New file — `starter.py` (+80 lines)**
```python
import argparse
import asyncio
import logging
import uuid
from typing import Optional

from temporalio import client, common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAllocateNNodesToJobInput,
    ClusterManagerDeleteJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def do_cluster_lifecycle(wf: WorkflowHandle, delay_seconds: Optional[int] = None):
    await wf.signal(ClusterManagerWorkflow.start_cluster)

    allocation_updates = []
    for i in range(6):
        allocation_updates.append(
            wf.execute_update(
                ClusterManagerWorkflow.allocate_n_nodes_to_job,
                ClusterManagerAllocateNNodesToJobInput(
                    num_nodes=2, job_name=f"task-{i}"
                ),
            )
        )
    await asyncio.gather(*allocation_updates)

    if delay_seconds:
        await asyncio.sleep(delay_seconds)

    deletion_updates = []
    for i in range(6):
        deletion_updates.append(
            wf.execute_update(
                ClusterManagerWorkflow.delete_job,
                ClusterManagerDeleteJobInput(job_name=f"task-{i}"),
            )
        )
    await asyncio.gather(*deletion_updates)

    await wf.signal(ClusterManagerWorkflow.shutdown_cluster)


async def main(should_test_continue_as_new: bool):
    # Connect to Temporal
    client = await Client.connect("localhost:7233")

    cluster_manager_handle = await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(test_continue_as_new=should_test_continue_as_new),
        id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
        task_queue="atomic-message-handlers-task-queue",
        id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
    )
    delay_seconds = 10 if should_test_continue_as_new else 1
    await do_cluster_lifecycle(cluster_manager_handle, delay_seconds=delay_seconds)
    result = await cluster_manager_handle.result()
    print(
        f"Cluster shut down successfully. It peaked at {result.max_assigned_nodes} assigned nodes."
        f" It had {result.num_currently_assigned_nodes} nodes assigned at the end."
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(description="Atomic message handlers")
    parser.add_argument(
        "--test-continue-as-new",
        help="Make the ClusterManagerWorkflow continue as new before shutting down",
        action="store_true",
        default=False,
    )
    args = parser.parse_args()
    asyncio.run(main(args.test_continue_as_new))
```

> **Review comment** (on the workflow `id`): In other samples we have used fixed workflow IDs, but don't technically have to here, but it makes the …

> **Review comment** (on `task_queue`): Task queue was not changed to match the sample name.
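The starter's `--test-continue-as-new` flag maps to the README's continue-as-new bullet. Assuming the worker from `worker.py` is already running in another terminal, the two ways to invoke it would be:

```bash
# Normal lifecycle: allocate, delete, shut down
poetry run python starter.py

# Hold the cluster open long enough (delay_seconds=10) for the workflow
# to hit its continue-as-new path before shutting down
poetry run python starter.py --test-continue-as-new
```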
**New file — `worker.py` (+41 lines)**
```python
import asyncio
import logging

from temporalio import activity, common, workflow
from temporalio.client import Client, WorkflowHandle
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerWorkflow,
    allocate_nodes_to_job,
    deallocate_nodes_for_job,
    find_bad_nodes,
)

interrupt_event = asyncio.Event()


async def main():
    # Connect client
    client = await Client.connect("localhost:7233")

    async with Worker(
        client,
        task_queue="atomic-message-handlers-task-queue",
        workflows=[ClusterManagerWorkflow],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        # Wait until interrupted
        logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
        await interrupt_event.wait()
        logging.info("Shutting down")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        interrupt_event.set()
        loop.run_until_complete(loop.shutdown_asyncgens())
```

> **Review comment** (on the `logging.info` call): Inconsistent use of logging vs print.
> **Review comment:** Should keep this list in alphabetical order (see comment a couple of lines above). Also not a fan of nesting these non-hello samples beneath a directory unnecessarily (you'll note we don't do this much in other samples here or in many samples repos). If you must inconsistently nest this sample, you may want nested bullets here.