Workflow Update and Signal handlers concurrency sample #123
Merged

drewhoskins-temporal merged 30 commits into temporalio:main from drewhoskins:drewhoskins_concurrency_sample on Jul 24, 2024.

Commits (30):
- 5d0307d: Atomic message handlers sample (drewhoskins)
- bca534a: Remove resize jobs to reduce code size (drewhoskins)
- 8b0a6ed: Misc polish (drewhoskins)
- fb7b32f: Add test (drewhoskins)
- 42d1f12: Format code (drewhoskins)
- c96f06d: Continue as new (drewhoskins)
- 6944099: Formatting (drewhoskins)
- ec1fb89: Feedback, readme, restructure files and directories (drewhoskins)
- dd58c64: Format (drewhoskins)
- 37e56ed: More feedback. Add test-continue-as-new flag. (drewhoskins)
- a1506b1: Feedback; throw ApplicationFailures from update handlers (drewhoskins)
- 2cad3dd: Formatting (drewhoskins)
- d5db7d7: __init__.py (drewhoskins)
- f39841c: Fix lint issues (drewhoskins)
- 344d694: Dan Feedback (drewhoskins)
- fc74a69: More typehints (drewhoskins)
- 0b84c25: s/atomic/safe/ (drewhoskins)
- c8e9075: Fix and demo idempotency (drewhoskins)
- 4fc6dac: Compatibility with 3.8 (drewhoskins)
- 3ba8882: More feedback (drewhoskins)
- f47369e: Re-add tests (drewhoskins)
- 5dc6185: Fix flaky test (drewhoskins)
- 5b45b21: Improve update and tests (drewhoskins-temporal)
- ce4d384: Ruff linting (drewhoskins-temporal)
- 52429bd: Use consistent verbs, improve health check (drewhoskins-temporal)
- 74867f1: poe format (drewhoskins-temporal)
- c6bdd12: Minor sample improvements (drewhoskins-temporal)
- 62f24a2: Skip update tests under Java test server (dandavison)
- d933042: Merge pull request #1 from dandavison/drewhoskins_concurrency_sample-dan (drewhoskins-temporal)
- 31e2d59: Merge branch 'main' into drewhoskins_concurrency_sample (drewhoskins-temporal)
Files changed (showing changes from all commits):

tests/updates_and_signals/safe_message_handlers/workflow_test.py (new file, 155 additions):
```python
import asyncio
import uuid

import pytest
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.exceptions import ApplicationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.activities import (
    assign_nodes_to_job,
    find_bad_nodes,
    unassign_nodes_for_job,
)
from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAssignNodesToJobInput,
    ClusterManagerDeleteJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):
    if env.supports_time_skipping:
        pytest.skip(
            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
        )
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        allocation_updates = []
        for i in range(6):
            allocation_updates.append(
                cluster_manager_handle.execute_update(
                    ClusterManagerWorkflow.assign_nodes_to_job,
                    ClusterManagerAssignNodesToJobInput(
                        total_num_nodes=2, job_name=f"task-{i}"
                    ),
                )
            )
        results = await asyncio.gather(*allocation_updates)
        for result in results:
            assert len(result.nodes_assigned) == 2

        await asyncio.sleep(1)

        deletion_updates = []
        for i in range(6):
            deletion_updates.append(
                cluster_manager_handle.execute_update(
                    ClusterManagerWorkflow.delete_job,
                    ClusterManagerDeleteJobInput(job_name=f"task-{i}"),
                )
            )
        await asyncio.gather(*deletion_updates)

        await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)

        result = await cluster_manager_handle.result()
        assert result.num_currently_assigned_nodes == 0


async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
    if env.supports_time_skipping:
        pytest.skip(
            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
        )
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )

        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        result_1 = await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=5, job_name="jobby-job"
            ),
        )
        # Call the update a second time to demonstrate that the operation is idempotent
        result_2 = await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=5, job_name="jobby-job"
            ),
        )
        # The second call should not assign more nodes (it may return fewer if the
        # health check finds bad nodes in between the two calls).
        assert result_1.nodes_assigned >= result_2.nodes_assigned


async def test_update_failure(client: Client, env: WorkflowEnvironment):
    if env.supports_time_skipping:
        pytest.skip(
            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
        )
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )

        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=24, job_name="big-task"
            ),
        )
        try:
            # Try to assign too many nodes
            await cluster_manager_handle.execute_update(
                ClusterManagerWorkflow.assign_nodes_to_job,
                ClusterManagerAssignNodesToJobInput(
                    total_num_nodes=3, job_name="little-task"
                ),
            )
        except WorkflowUpdateFailedError as e:
            assert isinstance(e.cause, ApplicationError)
            assert e.cause.message == "Cannot assign 3 nodes; have only 1 available"
        finally:
            await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
            result = await cluster_manager_handle.result()
            assert result.num_currently_assigned_nodes + result.num_bad_nodes == 24
```
(A new empty file, likely an `__init__.py` per commit d5db7d7, is also part of the diff and is omitted here.)

Sample README (new file, 22 additions):
# Atomic message handlers

This sample shows off important techniques for handling signals and updates, aka messages. In particular, it illustrates how message handlers can interleave with one another, or fail to finish before the workflow completes, and how you can manage that. A minimal sketch of these patterns follows the bullet list below.

* Here, using `workflow.wait_condition`, signal and update handlers only operate while the workflow is in a certain state, between `cluster_started` and `cluster_shutdown`.
* You can call `start_workflow` with an initializer signal that runs before anything else other than the workflow's constructor. This pattern is known as "signal-with-start" (see the sketch after this README).
* Message handlers can block, and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
* Message handlers should also finish before the workflow run completes. One option is to use a lock.
* An "entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow run. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
* Message handlers can be made idempotent. See the `ClusterManagerWorkflow.assign_nodes_to_job` update.
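
The sketch below pulls these techniques together in one toy workflow. It is added here for illustration and is not one of the files in this PR; it assumes the Temporal Python SDK's `workflow.update`, `workflow.wait_condition`, and `workflow.info().is_continue_as_new_suggested()` APIs, and the class and field names are made up and much simpler than the real `ClusterManagerWorkflow`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List

from temporalio import workflow


@dataclass
class SketchInput:
    # State carried across continue-as-new runs.
    jobs: Dict[str, List[str]] = field(default_factory=dict)


@workflow.defn
class HandlerPatternsSketchWorkflow:
    """Illustrative only; the real sample is ClusterManagerWorkflow."""

    def __init__(self) -> None:
        self.cluster_started = False
        self.cluster_shutdown = False
        # Protects self.jobs from interleaved handler access.
        self.nodes_lock = asyncio.Lock()
        self.jobs: Dict[str, List[str]] = {}

    @workflow.signal
    async def start_cluster(self) -> None:
        self.cluster_started = True

    @workflow.signal
    async def shutdown_cluster(self) -> None:
        self.cluster_shutdown = True

    @workflow.update
    async def assign_nodes_to_job(self, job_name: str) -> List[str]:
        # Gate the handler on workflow state: only run once the cluster is up.
        await workflow.wait_condition(lambda: self.cluster_started)
        # Idempotency: a repeated request for the same job returns the prior result.
        if job_name in self.jobs:
            return self.jobs[job_name]
        # Serialize mutation of shared state; handlers can otherwise interleave
        # at every await (for example around activity calls).
        async with self.nodes_lock:
            nodes = ["node-0", "node-1"]  # placeholder for real allocation logic
            self.jobs[job_name] = nodes
            return nodes

    @workflow.run
    async def run(self, input: SketchInput) -> int:
        self.jobs = dict(input.jobs)
        await workflow.wait_condition(lambda: self.cluster_shutdown)
        # Don't complete (or continue-as-new) while a handler still holds the lock.
        await workflow.wait_condition(lambda: not self.nodes_lock.locked())
        if workflow.info().is_continue_as_new_suggested():
            workflow.continue_as_new(SketchInput(jobs=self.jobs))
        return len(self.jobs)
```

The real workflow in this PR does the same things with activities, a health check, and a richer state object; see the `ClusterManagerWorkflow.assign_nodes_to_job` update for the idempotency check.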

To run, first see [README.md](../../README.md) for prerequisites.

Then, run the following from this directory to run the worker:

    poetry run python worker.py

Then, in another terminal, run the following to execute the workflow:

    poetry run python starter.py

This will start a worker to run your workflow and activities, then start a ClusterManagerWorkflow and put it through its paces.
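
For the signal-with-start pattern mentioned in the README, a starter could deliver the initializer signal atomically with the start request. The following is a sketch, not part of the PR's files; it assumes the Temporal Python client's `start_signal` argument and reuses the sample's workflow types.

```python
import asyncio
import uuid

from temporalio.client import Client

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def start_cluster_with_signal() -> None:
    client = await Client.connect("localhost:7233")
    # Signal-with-start: start_cluster is delivered before any other message,
    # right after the workflow's constructor runs.
    handle = await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(),
        id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
        task_queue="safe-message-handlers-task-queue",
        start_signal="start_cluster",
    )
    print(f"Started {handle.id} with an initial start_cluster signal")


if __name__ == "__main__":
    asyncio.run(start_cluster_with_signal())
```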
(Another new empty file is omitted here.)

Activities module for the sample (new file, 45 additions):
```python
import asyncio
from dataclasses import dataclass
from typing import List, Set

from temporalio import activity


@dataclass
class AssignNodesToJobInput:
    nodes: List[str]
    job_name: str


@activity.defn
async def assign_nodes_to_job(input: AssignNodesToJobInput) -> None:
    print(f"Assigning nodes {input.nodes} to job {input.job_name}")
    await asyncio.sleep(0.1)


@dataclass
class UnassignNodesForJobInput:
    nodes: List[str]
    job_name: str


@activity.defn
async def unassign_nodes_for_job(input: UnassignNodesForJobInput) -> None:
    print(f"Deallocating nodes {input.nodes} from job {input.job_name}")
    await asyncio.sleep(0.1)


@dataclass
class FindBadNodesInput:
    nodes_to_check: Set[str]


@activity.defn
async def find_bad_nodes(input: FindBadNodesInput) -> Set[str]:
    await asyncio.sleep(0.1)
    bad_nodes = set([n for n in input.nodes_to_check if int(n) % 5 == 0])
    if bad_nodes:
        print(f"Found bad nodes: {bad_nodes}")
    else:
        print("No new bad nodes found.")
    return bad_nodes
```
Starter script for the sample (new file, 84 additions):
```python
import argparse
import asyncio
import logging
import uuid
from typing import Optional

from temporalio import common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAssignNodesToJobInput,
    ClusterManagerDeleteJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def do_cluster_lifecycle(wf: WorkflowHandle, delay_seconds: Optional[int] = None):
    await wf.signal(ClusterManagerWorkflow.start_cluster)

    print("Assigning jobs to nodes...")
    allocation_updates = []
    for i in range(6):
        allocation_updates.append(
            wf.execute_update(
                ClusterManagerWorkflow.assign_nodes_to_job,
                ClusterManagerAssignNodesToJobInput(
                    total_num_nodes=2, job_name=f"task-{i}"
                ),
            )
        )
    await asyncio.gather(*allocation_updates)

    print(f"Sleeping for {delay_seconds} second(s)")
    if delay_seconds:
        await asyncio.sleep(delay_seconds)

    print("Deleting jobs...")
    deletion_updates = []
    for i in range(6):
        deletion_updates.append(
            wf.execute_update(
                ClusterManagerWorkflow.delete_job,
                ClusterManagerDeleteJobInput(job_name=f"task-{i}"),
            )
        )
    await asyncio.gather(*deletion_updates)

    await wf.signal(ClusterManagerWorkflow.shutdown_cluster)


async def main(should_test_continue_as_new: bool):
    # Connect to Temporal
    client = await Client.connect("localhost:7233")

    print("Starting cluster")
    cluster_manager_handle = await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(test_continue_as_new=should_test_continue_as_new),
        id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
        task_queue="safe-message-handlers-task-queue",
        id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
    )
    delay_seconds = 10 if should_test_continue_as_new else 1
    await do_cluster_lifecycle(cluster_manager_handle, delay_seconds=delay_seconds)
    result = await cluster_manager_handle.result()
    print(
        f"Cluster shut down successfully."
        f" It had {result.num_currently_assigned_nodes} nodes assigned at the end."
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(description="Atomic message handlers")
    parser.add_argument(
        "--test-continue-as-new",
        help="Make the ClusterManagerWorkflow continue as new before shutting down",
        action="store_true",
        default=False,
    )
    args = parser.parse_args()
    asyncio.run(main(args.test_continue_as_new))
```
Review comment (on the `id_reuse_policy` in starter.py): In other samples we have used fixed workflow IDs. We don't technically have to here, but generating a unique ID makes the `id_reuse_policy` have no effect, since the ID is always unique.
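
For context, a sketch of the alternative the comment alludes to (not part of the PR, and the fixed ID string is made up): with a fixed workflow ID, the reuse policy actually matters, because re-running the starter terminates the previous run instead of colliding with it. This would only replace the `start_workflow` call shown in starter.py's `main()` above.

```python
# Hypothetical variant of the start_workflow call in starter.py's main():
cluster_manager_handle = await client.start_workflow(
    ClusterManagerWorkflow.run,
    ClusterManagerInput(test_continue_as_new=should_test_continue_as_new),
    id="ClusterManagerWorkflow-safe-message-handlers",  # fixed, not uuid-based
    task_queue="safe-message-handlers-task-queue",
    id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
)
```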