Workflow Update and Signal handlers concurrency sample #123

Merged

Commits (30)
5d0307d  Atomic message handlers sample (drewhoskins, Jun 19, 2024)
bca534a  Remove resize jobs to reduce code size (drewhoskins, Jun 19, 2024)
8b0a6ed  Misc polish (drewhoskins, Jun 19, 2024)
fb7b32f  Add test (drewhoskins, Jun 19, 2024)
42d1f12  Format code (drewhoskins, Jun 19, 2024)
c96f06d  Continue as new (drewhoskins, Jun 20, 2024)
6944099  Formatting (drewhoskins, Jun 20, 2024)
ec1fb89  Feedback, readme, restructure files and directories (drewhoskins, Jun 22, 2024)
dd58c64  Format (drewhoskins, Jun 22, 2024)
37e56ed  More feedback. Add test-continue-as-new flag. (drewhoskins, Jun 24, 2024)
a1506b1  Feedback; throw ApplicationFailures from update handlers (drewhoskins, Jun 24, 2024)
2cad3dd  Formatting (drewhoskins, Jun 24, 2024)
d5db7d7  __init__.py (drewhoskins, Jun 24, 2024)
f39841c  Fix lint issues (drewhoskins, Jun 24, 2024)
344d694  Dan Feedback (drewhoskins, Jun 25, 2024)
fc74a69  More typehints (drewhoskins, Jun 25, 2024)
0b84c25  s/atomic/safe/ (drewhoskins, Jun 25, 2024)
c8e9075  Fix and demo idempotency (drewhoskins, Jun 26, 2024)
4fc6dac  Compatibility with 3.8 (drewhoskins, Jun 26, 2024)
3ba8882  More feedback (drewhoskins, Jun 27, 2024)
f47369e  Re-add tests (drewhoskins, Jun 27, 2024)
5dc6185  Fix flaky test (drewhoskins, Jun 27, 2024)
5b45b21  Improve update and tests (drewhoskins-temporal, Jul 8, 2024)
ce4d384  Ruff linting (drewhoskins-temporal, Jul 8, 2024)
52429bd  Use consistent verbs, improve health check (drewhoskins-temporal, Jul 8, 2024)
74867f1  poe format (drewhoskins-temporal, Jul 8, 2024)
c6bdd12  Minor sample improvements (drewhoskins-temporal, Jul 8, 2024)
62f24a2  Skip update tests under Java test server (dandavison, Jul 22, 2024)
d933042  Merge pull request #1 from dandavison/drewhoskins_concurrency_sample-dan (drewhoskins-temporal, Jul 24, 2024)
31e2d59  Merge branch 'main' into drewhoskins_concurrency_sample (drewhoskins-temporal, Jul 24, 2024)
26 changes: 26 additions & 0 deletions tests/update_and_signal_handlers/atomic_message_handlers_test.py
@@ -0,0 +1,26 @@
import uuid

from temporalio import workflow, common
from temporalio.client import Client
from temporalio.worker import Worker
from update_and_signal_handlers.atomic_message_handlers import ClusterManager, allocate_nodes_to_job, deallocate_nodes_for_job, do_cluster_lifecycle, find_bad_nodes


async def test_atomic_message_handlers(client: Client):
    async with Worker(
        client,
        task_queue="tq",
Review comment (Member): Would suggest unique task queues in tests
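A minimal sketch of that suggestion (the uuid-based queue name is illustrative, not part of the PR):

    task_queue = f"tq-{uuid.uuid4()}"  # unique per test run, so tests stay isolated
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManager],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        ...  # pass the same task_queue to client.start_workflow below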

        workflows=[ClusterManager],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManager.run,
            id=f"ClusterManager-{uuid.uuid4()}",
            task_queue="tq",
            id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
Review comment (Member): Should not be necessary in tests (tests should be isolated and shouldn't have to worry about other things that could be running)

            start_signal='start_cluster',

        )
        await do_cluster_lifecycle(cluster_manager_handle)
        max_assigned_nodes = await cluster_manager_handle.result()
        assert max_assigned_nodes == 12
179 changes: 179 additions & 0 deletions update_and_signal_handlers/atomic_message_handlers.py
Review comment (Member): README should be updated referencing this sample

Review comment (cretz, Member, Jun 20, 2024): We consider it bad practice to put non-workflow code with workflow code in the same file, so we don't do it in samples except for the hello ones (which we may change; see #49 and #67). Users have done bad things combining code, since entire workflow files run in a sandbox, including all non-workflow code/imports. Can we break this out into separate files like the other non-hello samples?

Reply (Contributor Author): Yes, changing the hello samples would be great, as that's what I pattern-matched off of.

Reply (cretz, Member, Jun 24, 2024): 👍 Agreed (though I think there is some resistance to doing so), but in the meantime I think matching the other whole-directory samples will work best here.
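For reference, a hypothetical whole-directory layout along these lines (file names are illustrative, not taken from this PR):

    update_and_signal_handlers/
        __init__.py
        activities.py   # activity functions only
        workflow.py     # the ClusterManager workflow, safe to load in the sandbox
        worker.py       # connects a Client and runs the Worker
        starter.py      # starts the workflow and sends the updates/signals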

Review comment (Member): Please add a README for this sample similar to other samples

@@ -0,0 +1,179 @@
import asyncio
from datetime import timedelta
import logging
from typing import Dict, List, Optional
import uuid

from temporalio import activity, common, workflow
from temporalio.client import Client, WorkflowHandle
from temporalio.worker import Worker

# This samples shows off the key concurrent programming primitives for Workflows, especially
Review comment (Member), suggested change:
-# This samples shows off the key concurrent programming primitives for Workflows, especially
+# This sample shows off the key concurrent programming primitives for Workflows, especially

# useful for workflows that handle signals and updates.

# - Makes signal and update handlers only operate when the workflow is within a certain state
# (here between cluster_started and cluster_shutdown) using workflow.wait_condition.
# - Signal and update handlers can block and their actions can be interleaved with one another and with the main workflow.
# Here, we use a lock to protect shared state from interleaved access.
# - Running start_workflow with an initializer signal that you want to run before anything else.
#
@activity.defn
async def allocate_nodes_to_job(nodes: List[int], task_name: str) -> List[int]:
Review comment (Member): Return doesn't match type hint (here and elsewhere)

Review comment (Member): We usually discourage multi-param activities/workflows in favor of single dataclass instances with multiple fields
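A sketch combining the two comments above: a single dataclass parameter and a return annotation that matches what the function actually returns (the dataclass name and fields are hypothetical):

    from dataclasses import dataclass

    @dataclass
    class AllocateNodesToJobInput:
        nodes: List[int]
        task_name: str

    @activity.defn
    async def allocate_nodes_to_job(input: AllocateNodesToJobInput) -> None:
        # Nothing is returned, so the hint is None rather than List[int].
        print(f"Assigning nodes {input.nodes} to job {input.task_name}")
        await asyncio.sleep(0.1)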

    print(f"Assigning nodes {nodes} to job {task_name}")
    await asyncio.sleep(0.1)

@activity.defn
async def deallocate_nodes_for_job(nodes: List[int], task_name: str) -> List[int]:
    print(f"Deallocating nodes {nodes} from job {task_name}")
    await asyncio.sleep(0.1)

@activity.defn
async def find_bad_nodes(nodes: List[int]) -> List[int]:
    await asyncio.sleep(0.1)
    bad_nodes = [n for n in nodes if n % 5 == 0]
    if bad_nodes:
        print(f"Found bad nodes: {bad_nodes}")
    return bad_nodes

# ClusterManager keeps track of the allocations of a cluster of nodes.
# Via signals, the cluster can be started and shutdown.
# Via updates, clients can also assign jobs to nodes and delete jobs.
# These updates must run atomically.
@workflow.defn
class ClusterManager:
    def __init__(self) -> None:
        self.cluster_started = False
        self.cluster_shutdown = False
        # Protects workflow state from interleaved access
        self.nodes_lock = asyncio.Lock()
        self.max_assigned_nodes = 0

    @workflow.signal
    async def start_cluster(self):
        self.cluster_started = True
        self.nodes : Dict[Optional[str]] = dict([(k, None) for k in range(25)])
Review comment (Member): Dicts require types for keys and values, and usually you'd use a dictionary comprehension (e.g. {k: None for k in range(25)})
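A sketch of the suggested form, with key and value types spelled out:

    self.nodes: Dict[int, Optional[str]] = {k: None for k in range(25)}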

        workflow.logger.info("Cluster started")

    @workflow.signal
    async def shutdown_cluster(self):
        await workflow.wait_condition(lambda: self.cluster_started)
        self.cluster_shutdown = True
        workflow.logger.info("Cluster shut down")

    @workflow.update
    async def allocate_n_nodes_to_job(self, task_name: str, num_nodes: int, ) -> List[int]:
Review comment (Member): Unnecessary trailing param comma (can use poe format to format code)

        await workflow.wait_condition(lambda: self.cluster_started)
        assert not self.cluster_shutdown
Review comment (Member): This would fail the workflow task, causing the workflow to hang. Is this intentional? If not, consider using an exception that would fail the update.
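One way to fail just the update, sketched with ApplicationError from temporalio.exceptions (not yet imported in this file):

    from temporalio.exceptions import ApplicationError

    if self.cluster_shutdown:
        # Fails this update only, instead of failing the workflow task and hanging the workflow.
        raise ApplicationError("Cannot allocate nodes: the cluster has been shut down")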


        await self.nodes_lock.acquire()
        try:
Review comment (Member): Consider using async with self.nodes_lock:, I think it's a bit clearer
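A sketch of that form; the lock is released automatically even if the body raises, so the explicit try/finally goes away:

    async with self.nodes_lock:
        unassigned_nodes = [k for k, v in self.nodes.items() if v is None]
        ...  # rest of the handler body unchanged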

            unassigned_nodes = [k for k, v in self.nodes.items() if v is None]
            if len(unassigned_nodes) < num_nodes:
                raise ValueError(f"Cannot allocate {num_nodes} nodes; have only {len(unassigned_nodes)} available")
            assigned_nodes = unassigned_nodes[:num_nodes]
            # This await would be dangerous without nodes_lock because it yields control and allows interleaving.
            await self._allocate_nodes_to_job(assigned_nodes, task_name)
            self.max_assigned_nodes = max(
                self.max_assigned_nodes,
                len([k for k, v in self.nodes.items() if v is not None]))
            return assigned_nodes
        finally:
            self.nodes_lock.release()


    async def _allocate_nodes_to_job(self, assigned_nodes: List[int], task_name: str) -> List[int]:
        await workflow.execute_activity(
            allocate_nodes_to_job, args=[assigned_nodes, task_name], start_to_close_timeout=timedelta(seconds=10)
        )
        for node in assigned_nodes:
            self.nodes[node] = task_name

    @workflow.update
    async def delete_job(self, task_name: str) -> str:
        await workflow.wait_condition(lambda: self.cluster_started)
        assert not self.cluster_shutdown
        await self.nodes_lock.acquire()
        try:
            nodes_to_free = [k for k, v in self.nodes.items() if v == task_name]
            # This await would be dangerous without nodes_lock because it yields control and allows interleaving.
            await self._deallocate_nodes_for_job(nodes_to_free, task_name)
            return "Done"
        finally:
            self.nodes_lock.release()

    async def _deallocate_nodes_for_job(self, nodes_to_free: List[int], task_name: str) -> List[int]:
        await workflow.execute_activity(
            deallocate_nodes_for_job, args=[nodes_to_free, task_name], start_to_close_timeout=timedelta(seconds=10)
        )
        for node in nodes_to_free:
            self.nodes[node] = None

    async def perform_health_checks(self):
        await self.nodes_lock.acquire()
        try:
            assigned_nodes = [k for k, v in self.nodes.items() if v is not None]
            # This await would be dangerous without nodes_lock because it yields control and allows interleaving.
            bad_nodes = await workflow.execute_activity(find_bad_nodes, assigned_nodes, start_to_close_timeout=timedelta(seconds=10))
Review comment (Member): May want to clarify in a comment here and on _allocate_nodes_to_job what will happen if these activities fail and retry forever (or never start for whatever reason) to confirm it is intentional (lock will never get unlocked)
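One illustrative way to make that trade-off explicit, using RetryPolicy from temporalio.common (already imported here as common); the attempt limit is an assumed value, not part of the PR:

    bad_nodes = await workflow.execute_activity(
        find_bad_nodes,
        assigned_nodes,
        start_to_close_timeout=timedelta(seconds=10),
        # Bound retries so a permanently failing activity cannot hold nodes_lock forever.
        retry_policy=common.RetryPolicy(maximum_attempts=3),
    )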

            for node in bad_nodes:
                self.nodes[node] = "BAD!"
            self.num_assigned_nodes = len(assigned_nodes)
Review comment (Member): What is this instance variable used for?

        finally:
            self.nodes_lock.release()

    @workflow.run
    async def run(self):
Review comment (Member): Would encourage explicit return type hints on workflow functions
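A sketch of the suggested hint, given that run returns self.max_assigned_nodes:

    @workflow.run
    async def run(self) -> int:
        ...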

        await workflow.wait_condition(lambda: self.cluster_started)

        # Perform health checks at intervals
        while True:
            try:
                await workflow.wait_condition(lambda: self.cluster_shutdown, timeout=timedelta(seconds=1))
Review comment (Member): We usually try to avoid such frequent timers in loops in workflows (this can blow up histories), so even in a sample it may be encouraging bad behavior

            except asyncio.TimeoutError:
                pass
            if self.cluster_shutdown:
                break
Review comment (Member): Could just put break after the wait_condition line inside the try
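A sketch that folds this together with the earlier comment about frequent timers (the one-minute interval is an illustrative value, not from the PR):

    while True:
        try:
            await workflow.wait_condition(
                lambda: self.cluster_shutdown, timeout=timedelta(minutes=1)
            )
            break  # shutdown was signaled; no separate flag check needed
        except asyncio.TimeoutError:
            pass
        await self.perform_health_checks()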

            await self.perform_health_checks()

        # Now we can start allocating jobs to nodes
        await workflow.wait_condition(lambda: self.cluster_shutdown)
        return self.max_assigned_nodes

async def do_cluster_lifecycle(wf: WorkflowHandle):
    allocation_updates = []
    for i in range(6):
        allocation_updates.append(wf.execute_update(ClusterManager.allocate_n_nodes_to_job, args=[f"task-{i}", 2]))
    await asyncio.gather(*allocation_updates)

    deletion_updates = []
    for i in range(6):
        deletion_updates.append(wf.execute_update(ClusterManager.delete_job, f"task-{i}"))
    await asyncio.gather(*deletion_updates)

    await wf.signal(ClusterManager.shutdown_cluster)

async def main():
    client = await Client.connect("localhost:7233")

    async with Worker(
        client,
        task_queue="tq",
Review comment (Member): To prevent clashing, in samples we try to name the task queue after the sample
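A sketch of that convention (the exact string is illustrative):

    # Name the queue after the sample and reuse it for both the Worker and
    # start_workflow, instead of the generic "tq".
    task_queue = "atomic-message-handlers-task-queue"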

        workflows=[ClusterManager],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManager.run,
            id=f"ClusterManager-{uuid.uuid4()}",
            task_queue="tq",
            id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
            start_signal='start_cluster',

        )
        await do_cluster_lifecycle(cluster_manager_handle)
        max_assigned_nodes = await cluster_manager_handle.result()
        print(f"Cluster shut down successfully. It peaked at {max_assigned_nodes} assigned nodes.")



if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())