Skip to content

Commit

Permalink
Changes in response to @drewhoskins-temporal's code review
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Aug 8, 2024
1 parent 2b77b3f commit f0c5ea5
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 38 deletions.
83 changes: 47 additions & 36 deletions docs/develop/python/message-passing.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ tags:
- dynamic-handlers
---

See [Workflow message-passing](/encyclopedia/workflow-message-passing#sending-queries) for an introduction to using messages with Temporal Workflows.

Here is an example Workflow Definition featuring Query, Signal, and Update handlers, and an Update validator.
You can view this as a runnable sample at [hello/hello_message_passing.py](https://github.com/temporalio/samples-python/blob/dan/dan/docs_message_passing.py).

Expand All @@ -49,7 +51,7 @@ class Language(IntEnum):

@dataclass
class GetLanguagesInput:
supported_only: bool
include_unsupported: bool


@dataclass
Expand All @@ -75,11 +77,11 @@ class GreetingWorkflow:

@workflow.query
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
# A Query handler returns a value but must not mutate the Workflow state.
if input.supported_only:
return [lang for lang in Language if lang in self.greetings]
else:
# A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.include_unsupported:
return list(Language)
else:
return [lang for lang in Language if lang in self.greetings]

@workflow.signal
def approve(self, input: ApproveInput) -> None:
Expand All @@ -98,22 +100,16 @@ class GreetingWorkflow:
if language not in self.greetings:
# In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")

@workflow.query
def get_language(self) -> Language:
return self.language
```

- Arguments and return values of handlers (and the main Workflow function) must be [serializable](/dataconversion): a [dataclass](https://docs.python.org/3/library/dataclasses.html) will often be the right choice.
While multiple arguments are supported, it's recommended to use a single dataclass argument to which fields can be added as needed.

- The argument and return types of the handler methods define the argument and return types that client code will use when sending the message (but Signals have no return type).

- It's possible to write handler methods that take multiple arguments, but this is not recommended: instead use a single dataclass argument to which fields can be added/removed as needed.

- The decorators can take arguments, for example to set the name to something other than the method name. See the API reference docs: [`@workflow.query`](https://python.temporal.io/temporalio.workflow.html#query), [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal), [`workflow.update`](https://python.temporal.io/temporalio.workflow.html#update).

- See [Workflow message-passing](/encyclopedia/workflow-message-passing#sending-queries) for an introduction to using messages with Temporal Workflows and guidance on safe usage of async Signal and Update handlers.
- This example shows synchronous handlers only, but Update and Signal handlers can also be `async def` and thus use Activities, Child Workflows, durable [`asyncio.sleep(...)`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) timers, and [`workflow.wait_condition(...)`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions. See [Async handlers](#async-handlers) below and read [Workflow message-passing](/encyclopedia/workflow-message-passing#sending-queries) for an introduction to safe usage of async Signal and Update handlers.

### Query handlers {#handle-query}

Expand Down Expand Up @@ -149,11 +145,9 @@ While multiple arguments are supported, it's recommended to use a single datacla
## Sending messages

To send Queries, Signals, or Updates you call methods on a [WorkflowHandle](https://python.temporal.io/temporalio.client.WorkflowHandle.html) object.

To obtain the Workflow handle, you can:
- Use the [start_workflow()](https://python.temporal.io/temporalio.client.Client.html#start_workflow) to start a Workflow and return its handle.
- Use the [get_workflow_handle_for()](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle_for) method to get a type-safe Workflow handle.
- Use the [get_workflow_handle()](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle) method to get a non-type-safe Workflow handle.
- Use the [start_workflow](https://python.temporal.io/temporalio.client.Client.html#start_workflow) to start a Workflow and return its handle.
- Use the [get_workflow_handle_for](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle_for) method to get a Workflow handle.

For example:
```python
Expand All @@ -163,8 +157,15 @@ workflow_handle = await client.start_workflow(
)
```

To find out what argument type to provide (and what return type to expect for Queries and Updates), look at the corresponding handler method on the Workflow Definition.



### Sending a Query {#send-query}

Use [`WorkflowHandle.query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query).


```python
supported_languages = await workflow_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(supported_only=True)
Expand All @@ -181,6 +182,8 @@ supported_languages = await workflow_handle.query(

#### Sending a Signal from a Client {#send-signal-from-client}

Use [`WorkflowHandle.signal`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal).

```python
await workflow_handle.signal(GreetingWorkflow.approve, ApproveInput(name="me"))
```
Expand All @@ -194,7 +197,6 @@ A Workflow can send a Signal to another Workflow, in which case it's called an _

In this case you need to obtain a Workflow handle for the external Workflow:
- Use [`get_external_workflow_handle_for`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle_for) to get a typed Workflow handle to an existing Workflow.
- Use [`get_external_workflow_handle`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle) when you don't know the type of the other Workflow.

<div class="copycode-notice-container">
<a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/signal_your_workflow/signal_external_wf_dacx.py">
Expand All @@ -220,7 +222,7 @@ When an External Signal is sent:
#### Signal-With-Start {#signal-with-start}

Signal-With-Start is sent by a Client.
To send a Signal-With-Start, use the [`start_workflow()`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) method and pass the `start_signal` argument with the name of your Signal.
To send a Signal-With-Start, use the [`start_workflow`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) method and pass the `start_signal` argument with the name of your Signal.

If there's a Workflow running with the given Workflow Id, the Signal will be sent to it. If there isn't, a new Workflow will be started and the Signal will be sent immediately on start.

Expand Down Expand Up @@ -290,9 +292,16 @@ update_handle = await workflow_handle.start_update(
update_result = await update_handle.result()
```

#### Non-type safe APIs

Note that all the sample code in this document assumes that you can import the Workflow Definition.
If you do not have access to the Workflow Definition (or it is not written in Python) then you can still do everything documented here by using non type-safe APIs.
This involves passing strings instead of method objects to [`Client.start_workflow`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) / [`WorkflowHandle.query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query) / [`WorkflowHandle.signal`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal) / [`WorkflowHandle.execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update) / [`WorkflowHandle.start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update), and using the non type-safe APIs [`get_workflow_handle`](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle) and [`get_external_workflow_handle`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle).


### Exceptions

The following exceptions might be raised by [`execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update), or when calling [`update_handle.result()`](https://python.temporal.io/temporalio.client.WorkflowUpdateHandle.html#result) on a handle obtained from [`start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update):
The following exceptions might be raised by [`execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update), or when calling [`update_handle.result`](https://python.temporal.io/temporalio.client.WorkflowUpdateHandle.html#result) on a handle obtained from [`start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update):

- [`temporalio.client.WorkflowUpdateFailedError`](https://python.temporal.io/temporalio.client.WorkflowUpdateFailedError.html)

Expand All @@ -312,21 +321,16 @@ If the Workflow handle references a Workflow that doesn't exist then [`execute_u

## Message handler patterns {#message-handler-patterns}

### Inject work into the main Workflow
TODO (delicate subject; probably make this non-blocking Public Preview )

### Ensuring your messages are processed exactly once
TODO
In addition to the topics below, see [Inject work into the main Workflow](/encyclopedia/workflow-message-passing#injecting-work-into-main-workflow) and [Ensuring your messages are processed exactly once](/encyclopedia/workflow-message-passing#exactly-once-message-processing).

### Async handlers
### Async handlers {#async-handlers}
In the example at the top of the page, all Signal and Update handlers were plain `def` methods.
But handlers can also be `async def`.
This allows them to use `await` to wait for Activities, Child Workflows, [`asyncio.sleep(...)`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) timers, or [`workflow.wait_condition(...)`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, thus opening up many powerful possibilities.
However, this means that handler executions, and your main Workflow method, are all running concurrently, with switching occurring between them at `await` calls.
(I.e. they interleave, but there is no parallelism.)
It's essential to understand the things that could go wrong in order to use `async def` handlers safely.
See [Controlling handler concurrency](#control-handler-concurrency) and [Waiting for message handlers to finish](#wait-for-message-handlers) below.
See also [Workflow message-passing](/encyclopedia/workflow-message-passing#sending-queries) for guidance on safe usage of async Signal and Update handlers, and the [Safe message handlers](https://github.com/temporalio/samples-python/tree/main/updates_and_signals/safe_message_handlers) sample.
See [Workflow message passing](/encyclopedia/workflow-message-passing) for guidance on safe usage of async Signal and Update handlers, the [Safe message handlers](https://github.com/temporalio/samples-python/tree/main/updates_and_signals/safe_message_handlers) sample, and the [Controlling handler concurrency](#control-handler-concurrency) and [Waiting for message handlers to finish](#wait-for-message-handlers) sections below.


As an example of an async handler, in the following code sample the Update handler from above has been changed to be an `async def` so that it can execute an Activity to make a network call to a remote service:
Expand Down Expand Up @@ -370,10 +374,16 @@ class GreetingWorkflow:
return previous_language
```

The Update handler is now able to schedule an Activity and wait for the result -- but this could be done with a Signal handler also.
In contrast to a Signal, the client sending the Update will not get an Update result until the Activity has completed and the Workflow has received a response or error from the remote service.



#### Waiting
[`await workflow.wait_condition(...)`](https://python.temporal.io/temporalio.workflow.html#wait_condition) is often useful in Temporal Workflow Definitions.
You can use it in a handler as long as it's an `async def`:
[`workflow.wait_condition(...)`](https://python.temporal.io/temporalio.workflow.html#wait_condition) is very useful in Temporal Workflow Definitions.
It allows Workflow code to wait until

For example, in an `async def` handler it could be used to wait until the handler execution can proceed, according to Workflow implementation logic:
```python
@workflow.update
async def my_update(self, update_input: UpdateInput) -> str:
Expand All @@ -384,6 +394,8 @@ You can use it in a handler as long as it's an `async def`:


#### Use `asyncio.Lock` to prevent concurrent handler execution {#control-handler-concurrency}
See [Message handler concurrency](/encyclopedia/workflow-message-passing#message-handler-concurrency).

Sometimes you may write code that is incorrect if multiple instances of a handler are in progress concurrently.
Here's an example:
```python
Expand Down Expand Up @@ -424,14 +436,13 @@ class MyWorkflow:
```

#### Finishing handlers before the Workflow completes {#wait-for-message-handlers}
See [Finishing handlers before the Workflow completes](/encyclopedia/workflow-message-passing#finishing-message-handlers).

If your Workflow has `async def` Signal or Update handlers, then there is nothing to stop you allowing your main Workflow method to return or continue-as-new while a handler execution is waiting on an async task such as an Activity result.
However, this means that the handler may have been interrupted before it finished important work.
And if it's an Update handler, then the client will get an error when they try to retrieve their Update result.
Is this really what you want?
Or would it be more correct for your Workflow to wait for in-progress handlers to finish before allowing your main Workflow method to return?


You can do this using [`all_handlers_finished()`](https://python.temporal.io/temporalio.workflow.html#all_handlers_finished):
You can avoid this by using [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) to wait for [`all_handlers_finished`](https://python.temporal.io/temporalio.workflow.html#all_handlers_finished) to return `True`:
```python
@workflow.defn
class MyWorkflow:
Expand Down Expand Up @@ -476,7 +487,7 @@ A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime
A Signal can be made dynamic by adding `dynamic=True` to the `@signal.defn` decorator.

The handler must accept `self`, a string input, and a `Sequence[temporalio.common.RawValue]`.
The [payload_converter()](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type.
The [payload_converter](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type.
For example:

```python
Expand All @@ -499,7 +510,7 @@ A Workflow can be made dynamic by adding `dynamic=True` to the `@workflow.defn`
You must register the Workflow with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked.

The Workflow Definition must then accept a single argument of type `Sequence[temporalio.common.RawValue]`.
The [payload_converter()](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type.
The [payload_converter](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type.

<div class="copycode-notice-container">
<a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/dynamic_handlers/your_dynamic_workflow_dacx.py">
Expand Down Expand Up @@ -531,7 +542,7 @@ An Activity can be made dynamic by adding `dynamic=True` to the `@activity.defn`
You must register the Activity with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked.

The Activity Definition must then accept a single argument of type `Sequence[temporalio.common.RawValue]`.
The [payload_converter()](https://python.temporal.io/temporalio.activity.html#payload_converter) function is used to convert a `RawValue` object to the desired type.
The [payload_converter](https://python.temporal.io/temporalio.activity.html#payload_converter) function is used to convert a `RawValue` object to the desired type.

<div class="copycode-notice-container">
<a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/dynamic_handlers/your_dynamic_activity_dacx.py">
Expand Down
4 changes: 2 additions & 2 deletions docs/encyclopedia/application-message-passing.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ A Signal or Update handler can block waiting for the Workflow to reach a certain
Sometimes, you need your message handler to wait for long-running operations such as executing an Activity. When this happens, the handler will yield control back to [the loop](#message-handler-concurrency). This means that your handlers can have race conditions if you’re not careful.
You can guard your handlers with concurrency primitives like mutexes or semaphores, but you should use versions of these primitives provided for Workflows in most languages. See the links below for examples of how to use them in your SDK.

#### Inject work into the main Workflow
#### Inject work into the main Workflow {#injecting-work-into-main-workflow}

Sometimes you want to process work provided by messages in the main Workflow. Perhaps you’d like to accumulate several messages before acting on any of them. For example, message handlers might put work into a queue, which can then be picked up and processed in an event loop that you yourself write.
This option is considered advanced but offers powerful flexibility. And if you serialize the handling of your messages inside your main Workflow, you can avoid using concurrency primitives like mutexes and semaphores. See the links above for how to do this in your SDK.
Expand All @@ -259,7 +259,7 @@ If you don’t need to ensure that your handlers complete, you may specify your

See the links below for how to ensure handlers are finished in your SDK.

#### Ensuring your messages are processed exactly once
#### Ensuring your messages are processed exactly once {#exactly-once-message-processing}

Many developers want their message handlers to run exactly once--to be idempotent--in cases where the same Signal or Update is delivered twice or sent by two different call sites. Temporal deduplicates messages for you on the server, but there is one important case when you need to think about this yourself when authoring a Workflow, and one when sending Signals and Updates.

Expand Down

0 comments on commit f0c5ea5

Please sign in to comment.