diff --git a/docs/develop/python/message-passing.mdx b/docs/develop/python/message-passing.mdx index ddd424fb88..f19f5b9c33 100644 --- a/docs/develop/python/message-passing.mdx +++ b/docs/develop/python/message-passing.mdx @@ -2,7 +2,7 @@ id: message-passing title: Messages - Python SDK feature guide sidebar_label: Messages -description: Learn how to develop with Signals, Queries, and Updates using the Python SDK in Temporal. Master defining Signal handlers, customizing names, sending Signals, and handling Queries dynamically. +description: Develop with Queries, Signals, and Updates with the Temporal Python SDK. toc_max_heading_level: 2 keywords: - temporal python signals @@ -27,156 +27,205 @@ tags: - dynamic-handlers --- -This page shows how to do the following: +A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. +The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. +Temporal Clients use messages to read Workflow state and control its execution. +See [Workflow message passing](/encyclopedia/workflow-message-passing) for a general overview of this topic. +This page introduces these features for the Temporal Python SDK. -- [Develop with Signals](#signals) -- [Develop with Queries](#queries) -- [Develop with Updates](#updates) -- [Dynamic Handler](#dynamic-handler) +## Write message handlers {#writing-message-handlers} -## Signals {#signals} - -**How to develop with Signals using the Python SDK.** - -A [Signal](/encyclopedia/workflow-message-passing#sending-signals) is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution. -It can only deliver data to a Workflow Execution that has not already closed. - -Signals are defined in your code and handled in your Workflow Definition. -Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution. +:::info +The code that follows is part of a working message passing [sample](https://github.com/temporalio/samples-python/tree/message-passing/message_passing/introduction). +::: -There are two steps for adding support for a Signal to your Workflow code: +Follow these guidelines when writing your message handlers: -1. **[Defining the Signal](#define-signal)** - You specify the name and data structure used by Temporal Clients when sending the Signal. -2. **[Handling the Signal](#handle-signal)** - You write code that will be invoked when the Signal is received from a Temporal Client. +- Message handlers are defined as methods on the Workflow class, using one of the three decorators: [`@workflow.query`](https://python.temporal.io/temporalio.workflow.html#query), [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal), and [`@workflow.update`](https://python.temporal.io/temporalio.workflow.html#update). +- The parameters and return values of handlers and the main Workflow function must be [serializable](/dataconversion). +- Prefer [data classes](https://docs.python.org/3/library/dataclasses.html) to multiple input parameters. + Data class parameters allow you to add fields without changing the calling signature. -After defining and handling your Signal, you can send it from a [Temporal Client](#send-signal-from-client) or from another [Workflow Execution](#send-signal-from-workflow). +### Query handlers {#queries} -### Define Signal {#define-signal} +A [Query](/encyclopedia/workflow-message-passing#sending-queries) is a synchronous operation that retrieves state from a Workflow Execution: -**How to define a Signal using the Python SDK.** +```python +class Language(IntEnum): + Chinese = 1 + English = 2 + French = 3 -A Signal has a name and can have arguments. +@dataclass +class GetLanguagesInput: + include_unsupported: bool -- The name, also called a Signal type, is a string. -- The arguments must be serializable. - To define a Signal, set the Signal decorator [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal) on the Signal function inside your Workflow. +@workflow.defn +class GreetingWorkflow: + def __init__(self) -> None: + self.greetings = { + Language.CHINESE: "你好,世界", + Language.ENGLISH: "Hello, world", + } -Non-dynamic methods can only have positional arguments. -Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed. + @workflow.query + def get_languages(self, input: GetLanguagesInput) -> list[Language]: + # 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state. + if input.include_unsupported: + return list(Language) + else: + return list(self.greetings) +``` -Return values from Signal methods are ignored. +- The Query decorator can accept arguments. + Refer to the API docs: [`@workflow.query`](https://python.temporal.io/temporalio.workflow.html#query). -**How to customize names** +- A Query handler uses `def`, not `async def`. + You can't perform async operations like executing an Activity in a Query handler. -You can have a name parameter to customize the Signal's name, otherwise it defaults to the name of the Signal method. +### Signal handlers {#signals} -
- - View the source code - - in the context of the rest of the application code. -
+A [Signal](/encyclopedia/workflow-message-passing#sending-signals) is an asynchronous message sent to a running Workflow Execution to change its state and control its flow: ```python -from temporalio import workflow -# ... - @workflow.signal - async def submit_greeting(self, name: str) -> None: - await self._pending_greetings.put(name) +@dataclass +class ApproveInput: + name: str +@workflow.defn +class GreetingWorkflow: + ... @workflow.signal - def exit(self) -> None: -# ... - @workflow.signal(name="Custom Signal Name") - async def custom_signal(self, name: str) -> None: - await self._pending_greetings.put(name) + def approve(self, input: ApproveInput) -> None: + # 👉 A Signal handler mutates the Workflow state but cannot return a value. + self.approved_for_release = True + self.approver_name = input.name ``` -### Handle Signal {#handle-signal} - -**How to handle a Signal using the Python SDK.** +- The Signal decorator can accept arguments. + Refer to the API docs: [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal). -Workflows listen for Signals by the Signal's name. +- The handler should not return a value. + The response is sent immediately from the server, without waiting for the Workflow to process the Signal. -Signal handlers are functions defined in the Workflow that listen for incoming Signals of a given type. -These handlers define how a Workflow should react when it receives a specific type of Signal. +- Signal (and Update) handlers can be `async def`. + This allows you to use Activities, Child Workflows, durable [`asyncio.sleep`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) Timers, [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, and more. + See [Async handlers](#async-handlers) and [Workflow message passing](/encyclopedia/workflow-message-passing) for guidelines on safely using async Signal and Update handlers. -To send a Signal to the Workflow, use the [signal](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal) method from the [WorkflowHandle](https://python.temporal.io/temporalio.client.WorkflowHandle.html) class. -
- - View the source code in the context of the rest of the application code. - +### Update handlers and validators {#updates} -
+An [Update](/encyclopedia/workflow-message-passing#sending-updates) is a trackable synchronous request sent to a running Workflow Execution. +It can change the Workflow state, control its flow, and return a result. +The sender must wait until the Worker accepts or rejects the Update. +The sender may wait further to receive a returned value or an exception if something goes wrong: ```python -from temporalio.client import Client -# ... -# ... - await handle.signal(GreetingWorkflow.submit_greeting, "User 1") +class Language(IntEnum): + Chinese = 1 + English = 2 + French = 3 + +@workflow.defn +class GreetingWorkflow: + ... + @workflow.update + def set_language(self, language: Language) -> Language: + # 👉 An Update handler can mutate the Workflow state and return a value. + previous_language, self.language = self.language, language + return previous_language + + @set_language.validator + def validate_language(self, language: Language) -> None: + if language not in self.greetings: + # 👉 In an Update validator you raise any exception to reject the Update. + raise ValueError(f"{language.name} is not supported") ``` -### Send a Signal from a Temporal Client {#send-signal-from-client} +- The Update decorator can take arguments (like, `name`, `dynamic` and `unfinished_policy`) as described in the API reference docs for [`workflow.update`](https://python.temporal.io/temporalio.workflow.html#update). -**How to send a Signal from a Temporal Client using the Python SDK.** +- About validators: + - Use validators to reject an Update before it is written to History. + Validators are always optional. + If you don't need to reject Updates, you can skip them. + - The SDK automatically provides a validator decorator named `@.validator`. + The validator must accept the same argument types as the handler and return `None`. -When a Signal is sent successfully from the Temporal Client, the [WorkflowExecutionSignaled](/references/events#workflowexecutionsignaled) Event appears in the Event History of the Workflow that receives the Signal. +- Accepting and rejecting Updates with validators: + - To reject an Update, raise an exception of any type in the validator. + - Without a validator, Updates are always accepted. +- Validators and Event History: + - The `WorkflowExecutionUpdateAccepted` event is written into the History whether the acceptance was automatic or programmatic. + - When a Validator raises an error, the Update is rejected and `WorkflowExecutionUpdateAccepted` _won't_ be added to the Event History. + The caller receives an "Update failed" error. -To send a Signal from the Client, use the [signal()](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal) function on the Workflow handle. +- Use [`workflow.current_update_info`](https://python.temporal.io/temporalio.workflow.html#current_update_info) to obtain information about the current Update. + This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see [Ensuring your messages are processed exactly once](/encyclopedia/workflow-message-passing#exactly-once-message-processing). +- Update (and Signal) handlers can be `async def`, letting them use Activities, Child Workflows, durable [`asyncio.sleep`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) Timers, [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, and more. + See [Async handlers](#async-handlers) and [Workflow message passing](/encyclopedia/workflow-message-passing) for safe usage guidelines. -To get the Workflow handle, you can use any of the following options. +## Send messages {#send-messages} -- Use the [get_workflow_handle()](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle) method. -- Use the [get_workflow_handle_for()](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle_for) method to get a type-safe Workflow handle by its Workflow Id. -- Use the [start_workflow()](https://python.temporal.io/temporalio.client.Client.html#start_workflow) to start a Workflow and return its handle. +To send Queries, Signals, or Updates, you call methods on a [WorkflowHandle](https://python.temporal.io/temporalio.client.WorkflowHandle.html) object: -
- - View the source code - {' '} - in the context of the rest of the application code. -
+- Use [start_workflow](https://python.temporal.io/temporalio.client.Client.html#start_workflow) to start a Workflow and return its handle. + +- Use [get_workflow_handle_for](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle_for) to retrieve a typed Workflow handle by its Workflow Id. + +For example: ```python -from temporalio.client import Client -# ... -# ... - client = await Client.connect("localhost:7233") - handle = await client.start_workflow( - GreetingWorkflow.run, - id="your-greeting-workflow", - task_queue="signal-tq", - ) - await handle.signal(GreetingWorkflow.submit_greeting, "User 1") +client = await Client.connect("localhost:7233") +workflow_handle = await client.start_workflow( + GreetingWorkflow.run, id="greeting-workflow-1234", task_queue="my-task-queue" +) ``` -### Send a Signal from a Workflow {#send-signal-from-workflow} +To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition. -**How to send a Signal from a Workflow using the Python SDK.** +### Send a Query {#send-query} -A Workflow can send a Signal to another Workflow, in which case it's called an _External Signal_. +Use [`WorkflowHandle.query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query) to send a Query to a Workflow Execution: -When an External Signal is sent: -- A [SignalExternalWorkflowExecutionInitiated](/references/events#signalexternalworkflowexecutioninitiated) Event appears in the sender's Event History. -- A [WorkflowExecutionSignaled](/references/events#workflowexecutionsignaled) Event appears in the recipient's Event History. +```python +supported_languages = await workflow_handle.query( + GreetingWorkflow.get_languages, GetLanguagesInput(supported_only=True) +) +``` -Use [`get_external_workflow_handle_for`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle_for) to get a typed Workflow handle to an existing Workflow by its identifier. -Use [`get_external_workflow_handle`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle) when you don't know the type of the other Workflow. +- Sending a Query doesn’t add events to a Workflow's Event History. -:::note +- You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. + This includes Workflows that have completed, failed, or timed out. + Querying terminated Workflows is not safe and, therefore, not supported. -The Workflow Type passed is only for type annotations and not for validation. +- A Worker must be online and polling the Task Queue to process a Query. -::: +### Send a Signal {#send-signal} + +You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. +However, you can only send Signals to Workflow Executions that haven’t closed. -
- - View the source code in the context of the rest of the application code. - {' '} +#### Send a Signal from a Client {#send-signal-from-client} -
+Use [`WorkflowHandle.signal`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal) to send a Signal: + +```python +await workflow_handle.signal(GreetingWorkflow.approve, ApproveInput(name="me")) +``` + +- The call returns when the server accepts the Signal; it does _not_ wait for the Signal to be delivered to the Workflow Execution. + +- The [WorkflowExecutionSignaled](/references/events#workflowexecutionsignaled) Event appears in the Workflow's Event History. + +#### Send a Signal from a Workflow {#send-signal-from-workflow} + +A Workflow can send a Signal to another Workflow, known as an _External Signal_. +You'll need a Workflow handle for the external Workflow. +Use [`get_external_workflow_handle_for`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle_for): + +
See full sample
```python # ... @@ -188,28 +237,20 @@ class WorkflowB: await handle.signal(WorkflowA.your_signal, "signal argument") ``` -### Signal-With-Start {#signal-with-start} - -**How to send a Signal-With-Start using the Python SDK.** - -Signal-With-Start is used from the Client. -It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments. - -If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled. +When an External Signal is sent: +- A [SignalExternalWorkflowExecutionInitiated](/references/events#signalexternalworkflowexecutioninitiated) Event appears in the sender's Event History. +- A [WorkflowExecutionSignaled](/references/events#workflowexecutionsignaled) Event appears in the recipient's Event History. -To send a Signal-With-Start in Python, use the [`start_workflow()`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) method and pass the `start_signal` argument with the name of your Signal. +#### Signal-With-Start {#signal-with-start} -
- - View the source code in the context of the rest of the application code. - {' '} +Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running. +To use Signal-With-Start, call the [`start_workflow`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) method and pass the `start_signal` argument with the name of your Signal: -
+
See full sample
```python from temporalio.client import Client # ... -# ... async def main(): client = await Client.connect("localhost:7233") await client.start_workflow( @@ -221,214 +262,399 @@ async def main(): ) ``` -## Queries {#queries} +### Send an Update {#send-update-from-client} -A [Query](/encyclopedia/workflow-message-passing#sending-queries) is a synchronous operation that is used to get the state of a Workflow Execution. +An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result. -### How to define a Query {#define-query} +A client sending an Update must wait until the Server delivers the Update to a Worker. +Workers must be available and responsive. +If you need a response as soon as the Server receives the request, use a Signal instead. +Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start. -A Query has a name and can have arguments. +- `WorkflowExecutionUpdateAccepted` is added to the Event History when the Worker confirms that the Update passed validation. +- `WorkflowExecutionUpdateCompleted` is added to the Event History when the Worker confirms that the Update has finished. -- The name, also called a Query type, is a string. -- The arguments must be [serializable](/dataconversion). +To send an Update to a Workflow Execution, you can: -To define a Query, set the Query decorator [`@workflow.query`](https://python.temporal.io/temporalio.workflow.html#query) on the Query function inside your Workflow. +- Call [`execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update) and wait for the Update to complete. + This code fetches an Update result: -**Customize names** + ```python + previous_language = await workflow_handle.execute_update( + GreetingWorkflow.set_language, Language.Chinese + ) + ``` -You can have a name parameter to customize the Query's name, otherwise it defaults to the name of the Query method. +- Send [`start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update) to receive an [`UpdateHandle`](https://python.temporal.io/temporalio.client.WorkflowUpdateHandle.html) as soon as the Update is accepted or rejected. -:::note + - Use this `UpdateHandle` later to fetch your results. + - `async def` Update handlers normally perform long-running async activities. + - `start_update` only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete. -You can either set the `name` or the `dynamic` parameter in a Query's decorator, but not both. + For example: -::: + ```python + # Wait until the update is accepted + update_handle = await workflow_handle.start_update( + HelloWorldWorkflow.set_greeting, + HelloWorldInput("World"), + ) + # Wait until the update is completed + update_result = await update_handle.result() + ``` -
- - View the source code in the context of the rest of the application code. - {' '} + For more details, see the "Async handlers" section. -
+To obtain an Update handle, you can: -```python -# ... - @workflow.query - def greeting(self) -> str: - return self._greeting -``` +- Use [`start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update) to start an Update and return the handle, as shown in the preceding example. +- Use [`get_update_handle_for`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#get_update_handle_for) to fetch a handle for an in-progress Update using the Update ID and Workflow ID. -### Handle a Query {#handle-query} +:::info NON-TYPE SAFE API CALLS -**How to handle a Query** +In real-world development, sometimes you may be unable to import Workflow Definition method signatures. +When you don't have access to the Workflow Definition or it isn't written in Python, you can still use non-type safe APIs and dynamic method invocation. +Pass method names instead of method objects to: -Queries are handled by your Workflow. +- [`Client.start_workflow`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) +- [`WorkflowHandle.query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query) +- [`WorkflowHandle.signal`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal) +- [`WorkflowHandle.execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update) +- [`WorkflowHandle.start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update) -Don’t include any logic that causes [Command](/workflows#command) generation within a Query handler (such as executing Activities). -Including such logic causes unexpected behavior. +Use these non-type safe APIs: -To send a Query to the Workflow, use the [`query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query) method from the [`WorkflowHandle`](https://python.temporal.io/temporalio.client.WorkflowHandle.html) class. +- [`get_workflow_handle`](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle) +- [`get_external_workflow_handle`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle). -
- - View the source code in the context of the rest of the application code. - {' '} +::: -
+## Message handler patterns {#message-handler-patterns} -```python -# ... - result = await handle.query(GreetingWorkflow.greeting) -``` +This section covers common write operations, such as Signal and Update handlers. +It doesn't apply to pure read operations, like Queries or Update Validators. -### Send a Query {#send-query} +:::tip -**How to send a Query** +For additional information, see [Inject work into the main Workflow](/encyclopedia/workflow-message-passing#injecting-work-into-main-workflow), [Ensuring your messages are processed exactly once](/encyclopedia/workflow-message-passing#exactly-once-message-processing), and [this sample](https://github.com/temporalio/samples-python/blob/message-passing/message_passing/safe_message_handlers/README.md) demonstrating safe `async` message handling. -Queries are sent from a Temporal Client. +::: -To send a Query to a Workflow Execution from Client code, use the `query()` method on the Workflow handle. +### Add async handlers to use `await` {#async-handlers} -
- - View the source code in the context of the rest of the application code. - {' '} +Signal and Update handlers can be `async def` as well as `def`. +Using `async def` allows you to use `await` with Activities, Child Workflows, [`asyncio.sleep`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) Timers, [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, etc. +This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at `await` calls. +It's essential to understand the things that could go wrong in order to use `async def` handlers safely. +See [Workflow message passing](/encyclopedia/workflow-message-passing) for guidance on safe usage of async Signal and Update handlers, the [Safe message handlers](https://github.com/temporalio/samples-python/tree/main/updates_and_signals/safe_message_handlers) sample, and the [Controlling handler concurrency](#control-handler-concurrency) and [Waiting for message handlers to finish](#wait-for-message-handlers) sections below. -
+The following code executes an Activity that makes a network call to a remote service. +It modifies the Update handler from earlier on this page, turning it into an `async def`: ```python -# ... - result = await handle.query(GreetingWorkflow.greeting) +@activity.defn +async def call_greeting_service(to_language: Language) -> Optional[str]: + await asyncio.sleep(0.2) # Pretend that we are calling a remote service. + greetings = { + Language.Arabic: "مرحبا بالعالم", + Language.Chinese: "你好,世界", + Language.English: "Hello, world", + Language.French: "Bonjour, monde", + Language.Hindi: "नमस्ते दुनिया", + Language.Spanish: "Hola mundo", + } + return greetings.get(to_language) + + +@workflow.defn +class GreetingWorkflow: + def __init__(self) -> None: + self.lock = asyncio.Lock() + ... + ... + @workflow.update + async def set_language(self, language: Language) -> Language: + if language not in self.greetings: + # 👉 Use a lock here to ensure that multiple calls to set_language are processed in order. + async with self.lock: + greeting = await workflow.execute_activity( + call_greeting_service, + language, + start_to_close_timeout=timedelta(seconds=10), + ) + if greeting is None: + # 👉 An update validator cannot be async, so cannot be used to check that the remote + # call_greeting_service supports the requested language. Raising ApplicationError + # will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be + # added to history. + raise ApplicationError( + f"Greeting service does not support {language.name}" + ) + self.greetings[language] = greeting + previous_language, self.language = self.language, language + return previous_language ``` -## Develop with Updates {#updates} +After updating the code to use an `async def`, your Update handler can schedule an Activity and await the result. +Although an `async def` Signal handler can initiate similar network tasks, using an Update handler allows the client to receive a result or error once the Activity completes. +This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc. + +### Add wait conditions to block + +Sometimes, `async def` Signal or Update handlers need to meet certain conditions before they should continue. +You can use a wait condition ([`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition)) to set a function that prevents the code from proceeding until the condition returns `True`. +This is an important feature that helps you control your handler logic. -An [Update](/encyclopedia/workflow-message-passing#sending-updates) is an operation that can mutate the state of a Workflow Execution and return a response. +Here are two important use cases for `workflow.wait_condition`: -### How to define an Update {#define-update} +- Waiting in a handler until it is appropriate to continue. +- Waiting in the main Workflow until all active handlers have finished. -Workflow Updates handlers are methods in your Workflow Definition designed to handle updates. -These updates can be triggered during the lifecycle of a Workflow Execution. +The condition state you're waiting for can be updated by and reflect any part of the Workflow code. +This includes the main Workflow method, other handlers, or child coroutines spawned by the main Workflow method, and so forth. -**Define an Update Handler** +### Use wait conditions in handlers -To define an update handler, use the [@workflow.update](https://python.temporal.io/temporalio.workflow.html#update) decorator on a method within your Workflow. This decorator can be applied to both asynchronous and synchronous methods. +It's common to use a Workflow wait condition to wait until a handler should start. +You can also use wait conditions anywhere else in the handler to wait for a specific condition to become `True`. +This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become `True`. -- **Decorator Usage:** Apply `@workflow.update` to the method intended to handle updates. -- **Overriding:** If a method with this decorator is overridden, the overriding method should also be decorated with `@workflow.update`. -- **Validator Method:** Optionally, you can define a validator method for the update handler. This validator is specified using `@update_handler_method_name.validator` and is invoked before the update handler. -- **Method Parameters:** Update handlers should only use positional parameters. For non-dynamic methods, it's recommended to use a single parameter that is an object or data class, which allows for future expansion of fields. -- **Return Values:** The update handler can return a serializable value. This value is sent back to the caller of the update. +Consider a `ready_for_update_to_execute` method that runs before your Update handler executes. +The `workflow.wait_condition` method waits until your condition is met: ```python -# ... @workflow.update - async def update_workflow_status(self) -> str: - self.is_complete = True - return "Workflow status updated" + async def my_update(self, update_input: UpdateInput) -> str: + await workflow.wait_condition( + lambda: self.ready_for_update_to_execute(update_input) + ) ``` -### Send an Update from a Temporal Client {#send-update-from-client} +Remember: Handlers can execute before the main Workflow method starts. -**How to send an Update from a Temporal Client** +### Ensure your handlers finish before the Workflow completes {#wait-for-message-handlers} -To send a Workflow Update from a Temporal Client, call the [execute_update](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update) method on the [WorkflowHandle](https://python.temporal.io/temporalio.client.WorkflowHandle.html) class. +Workflow wait conditions can ensure your handler completes before a Workflow finishes. +When your Workflow uses `async def` Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result. +The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying retrieve Update results. +Use [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) and [`all_handlers_finished`](https://python.temporal.io/temporalio.workflow.html#all_handlers_finished) to address this problem and allow your Workflow to end smoothly: ```python -# ... - update_result = await handle.execute_update( - HelloWorldWorkflow.update_workflow_status - ) - print(f"Update Result: {update_result}") +@workflow.defn +class MyWorkflow: + @workflow.run + async def run(self) -> str: + ... + await workflow.wait_condition(workflow.all_handlers_finished) + return "workflow-result" ``` -## Dynamic Handler {#dynamic-handler} +By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions. +You can silence these warnings on a per-handler basis by passing the `unfinished_policy` argument to the [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal) / [`workflow.update`](https://python.temporal.io/temporalio.workflow.html#update) decorator: + +```python + @workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) + async def my_update(self) -> None: + ... +``` -**What is a Dynamic Handler?** +See [Finishing handlers before the Workflow completes](/encyclopedia/workflow-message-passing#finishing-message-handlers) for more information. -Temporal supports Dynamic Workflows, Activities, Signals, and Queries. -These are unnamed handlers that are invoked if no other statically defined handler with the given name exists. +### Use `asyncio.Lock` to prevent concurrent handler execution {#control-handler-concurrency} -Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known at run time. +Concurrent processes can interact in unpredictable ways. +Incorrectly written [concurrent message-passing](/encyclopedia/workflow-message-passing#message-handler-concurrency) code may not work correctly when multiple handler instances run simultaneously. +Here's an example of a pathological case: -:::caution +```python +@workflow.defn +class MyWorkflow: -Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. -Overusing them can lead to maintainability and debugging issues down the line. + @workflow.signal + async def bad_async_handler(self): + data = await workflow.execute_activity( + fetch_data, start_to_close_timeout=timedelta(seconds=10) + ) + self.x = data.x + # 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then + # there may be times when the Workflow has self.x from one Activity execution and self.y from another. + await asyncio.sleep(1) # or await anything else + self.y = data.y +``` -Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. -Use static definitions as the primary way of structuring your Workflows. +Coordinating access using `asyncio.Lock` corrects this code. +Locking makes sure that only one handler instance can execute a specific section of code at any given time: -Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. -They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic. +```python +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + ... + self.lock = asyncio.Lock() + ... -::: + @workflow.signal + async def safe_async_handler(self): + async with self.lock: + data = await workflow.execute_activity( + fetch_data, start_to_close_timeout=timedelta(seconds=10) + ) + self.x = data.x + # ✅ OK: the scheduler may switch now to a different handler execution, or to the main workflow + # method, but no other execution of this handler can run until this execution finishes. + await asyncio.sleep(1) # or await anything else + self.y = data.y +``` -### Set a Dynamic Signal {#set-a-dynamic-signal} +## Message handler troubleshooting {#message-handler-troubleshooting} -**How to set a Dynamic Signal** +When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors: -A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered. -A Signal can be made dynamic by adding `dynamic=True` to the `@signal.defn` decorator. +- **The client can't contact the server**: + You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) on which the `status` attribute is [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `UNAVAILABLE` (after some retries; see the `retry_config` argument to [`Client.connect`](https://python.temporal.io/temporalio.client.Client.html#connect)). -The Signal Handler should accept `self`, a string input, and a `Sequence[temporalio.common.RawValue]`. -The [payload_converter()](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type. +- **The workflow does not exist**: + You'll receive an [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception on which the `status` attribute is [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `NOT_FOUND`. -
- - View the source code in the context of the rest of the application code. - {' '} +See [Exceptions in message handlers](/encyclopedia/workflow-message-passing#exceptions) for a non–Python-specific discussion of this topic. -
+### Problems when sending a Signal {#signal-problems} -```python -# ... - @workflow.signal(dynamic=True) - async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None: - await self._pending_greetings.put(name) -``` +When using Signal, the only exception that will result from your requests during its execution is `RPCError`. +All handlers may experience additional exceptions during the initial (pre-Worker) part of a handler request lifecycle. + +For Queries and Updates, the client waits for a response from the Worker. +If an issue occurs during the handler Execution by the Worker, the client may receive an exception. + +### Problems when sending an Update {#update-problems} + +When working with Updates, you may encounter these errors: + +- **No Workflow Workers are polling the Task Queue**: + Your request will be retried by the SDK Client indefinitely. + You can use [`asyncio.timeout`](https://docs.python.org/3/library/asyncio-task.html#timeouts) to impose a timeout. + This raises a [`temporalio.client.WorkflowUpdateRPCTimeoutOrCancelledError`](https://python.temporal.io/temporalio.client.WorkflowUpdateRPCTimeoutOrCancelledError.html) exception. -### Set a Dynamic Query {#set-a-dynamic-query} +- **Update failed**: You'll receive a [`temporalio.client.WorkflowUpdateFailedError`](https://python.temporal.io/temporalio.client.WorkflowUpdateFailedError.html) exception. + There are two ways this can happen: -**How to set a Dynamic Query** + - The Update was rejected by an Update validator defined in the Workflow alongside the Update handler. -A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered. -A Query can be made dynamic by adding `dynamic=True` to the `@query.defn` decorator. + - The Update failed after having been accepted. -The Query Handler should accept `self`, a string name, and a `Sequence[temporalio.common.RawValue]`. -The [payload_converter()](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type. + Update failures are like [Workflow failures](/references/failures#errors-in-workflows). + Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. + These might include: -
- - View the source code in the context of the rest of the application code. - {' '} + - A failed Child Workflow + - A failed Activity (if the Activity retries have been set to a finite number) + - The Workflow author raising `ApplicationFailure` + - Any error listed in [workflow_failure_exception_types](https://python.temporal.io/temporalio.worker.Worker.html) (empty by default) -
+- **The handler caused the Workflow Task to fail**: + A [Workflow Task Failure](/references/failures#errors-in-workflows) causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage: + - If the request hasn't been accepted by the server, you receive a `FAILED_PRECONDITION` [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception. + - If the request has been accepted, it is durable. + Once the Workflow is healthy again after a code deploy, use an [`UpdateHandle`](https://python.temporal.io/temporalio.client.WorkflowUpdateHandle.html) to fetch the Update result. + +- **The Workflow finished while the Update handler execution was in progress**: + You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception with a `status` attribute of [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `NOT_FOUND`. + This happens if the Workflow finished while the Update handler execution was in progress, for example because + + - The Workflow was canceled or failed. + + - The Workflow completed normally or continued-as-new and the Workflow author did not [wait for handlers to be finished](/encyclopedia/workflow-message-passing#finishing-message-handlers). + +### Problems when sending a Query {#query-problems} + +When working with Queries, you may encounter these errors: + +- **There is no Workflow Worker polling the Task Queue**: + You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception on which the `status` attribute is [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `FAILED_PRECONDITION`. + +- **Query failed**: + You'll receive a [`temporalio.client.WorkflowQueryFailedError`](https://python.temporal.io/temporalio.client.WorkflowQueryFailedError.html) exception if something goes wrong during a Query. + Any exception in a Query handler will trigger this error. + This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead. + +- **The handler caused the Workflow Task to fail.** + This would happen, for example, if the Query handler blocks the thread for too long without yielding. + +## Dynamic components {#dynamic-handler} + +A dynamic Workflow, Activitity, Signal, Update, or Query is a kind of unnamed item. +Normally, these items are registered by name with the Worker and invoked at runtime. +When an unregistered or unrecognized Workflow, Activity, or message request arrives with a recognized method signature, the Worker can use a pre-registered dynamic stand-in. + +For example, you might send a request to start a Workflow named "MyUnknownWorkflow". +After receiving a Workflow Task, the Worker may find that there's no registered Workflow Definitions of that type. +It then checks to see if there's a registered dynamic Workflow. +If the dynamic Workflow signature matches the incoming Workflow signature, the Worker invokes that just as it would invoke a non-dynamic statically named version. + +By registering dynamic versions of your Temporal components, the Worker can fall back to these alternate implementations for name mismatches. + +:::caution + +Use dynamic elements judiciously and as a fallback mechanism, not a primary design. +They can introduce long-term maintainability and debugging issues. +Reserve dynamic invocation use for cases where a name is not or can't be known at compile time. + +::: + +### Set a dynamic Signal, Query, or Update handler {#set-a-dynamic-signal} + +A dynamic Signal, Query, or Update refers to a special stand-in handler. +It's used when an unregistered handler request arrives. + +Consider a Signal, where you might send something like `workflow.signal(MyWorkflow.my_signal_method, my_arg)`. +This is a type-safe compiler-checked approach that guarantees a method exists. +There's also a non-type-safe string-based form: `workflow.signal('some-name', my_arg)`. +When sent to the server, the name is checked only after arriving at the Worker. +This is where "dynamic handlers" come in. + +After failing to find a handler with a matching name and type, the Worker checks for a registered dynamic stand-in handler. +If found, the Worker uses that instead. + +You must opt handlers into dynamic access. +Add `dynamic=True` to the handler decorator (for example, `@workflow.signal(dynamic=True)`) to make a handler dynamic. +The handler's signature must accept `(self, name: str, args: Sequence[RawValue])`. +Use a [payload_converter](https://python.temporal.io/temporalio.workflow.html#payload_converter) function to convert `RawValue` objects to your required type. +For example: ```python -# ... - @workflow.query(dynamic=True) - def dynamic_query(self, input: str, args: Sequence[RawValue]) -> str: - return self._greeting +from typing import Sequence + +from temporalio.common import RawValue +... + + @workflow.signal(dynamic=True) + async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None: + ... ``` -### Set a Dynamic Workflow {#set-a-dynamic-workflow} +This sample creates a `dynamic_signal` Signal. +When an unregistered or unrecognized Signal arrives with a matching signature, dynamic assignment uses this handler to manage the Signal. +It is responsible for transforming the sequence contents into usable data in a form that the method's logic can process and act on. -**How to set a Dynamic Workflow** +### Set a dynamic Workflow {#set-a-dynamic-workflow} -A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered. -A Workflow can be made dynamic by adding `dynamic=True` to the `@workflow.defn` decorator. -You must register the Workflow with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked. +A dynamic Workflow refers to a special stand-in Workflow Definition. +It's used when an unknown Workflow Execution request arrives. + +Consider the "MyUnknownWorkflow" example described earlier. +The Worker may find there's no registered Workflow Definitions of that name or type. +After failing to find a Workflow Definition with a matching type, the Worker looks for a dynamic stand-in. +If found, it invokes that instead. -The Workflow Definition must then accept a single argument of type `Sequence[temporalio.common.RawValue]`. -The [payload_converter()](https://python.temporal.io/temporalio.workflow.html#payload_converter) function is used to convert a `RawValue` object to the desired type. +To participate, your Workflow must opt into dynamic access. +Adding `dynamic=True` to the `@workflow.defn` decorator makes the Workflow Definition eligible to participate in dynamic invocation. +You must register the Workflow with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked. -
- - View the source code in the context of the rest of the application code. - {' '} +The Workflow Definition's primary Workflow method must accept a single argument of type `Sequence[temporalio.common.RawValue]`. +Use a [payload_converter](https://python.temporal.io/temporalio.workflow.html#payload_converter) function to convert `RawValue` objects to your required type. +For example: -
+
See full sample
```python # ... @@ -444,23 +670,22 @@ class DynamicWorkflow: ) ``` -### Set a Dynamic Activity {#set-a-dynamic-activity} +This Workflow converts the first `Sequence` element to a string, and uses that to execute an Activity. + +### Set a dynamic Activity {#set-a-dynamic-activity} -**How to set a Dynamic Activity** +A dynamic Activity is a stand-in implementation. +It's used when an Activity Task with an unknown Activity type is received by the Worker. -A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered. -An Activity can be made dynamic by adding `dynamic=True` to the `@activity.defn` decorator. +To participate, your Activity must opt into dynamic access. +Adding `dynamic=True` to the `@activity.defn` decorator makes the Workflow Definition eligible to participate in dynamic invocation. You must register the Activity with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked. The Activity Definition must then accept a single argument of type `Sequence[temporalio.common.RawValue]`. -The [payload_converter()](https://python.temporal.io/temporalio.activity.html#payload_converter) function is used to convert a `RawValue` object to the desired type. +Use a [payload_converter](https://python.temporal.io/temporalio.activity.html#payload_converter) function to convert `RawValue` objects to your required types. +For example: -
- - View the source code in the context of the rest of the application code. - {' '} - -
+
See full sample
```python # ... @@ -481,3 +706,7 @@ class GreetingWorkflow: start_to_close_timeout=timedelta(seconds=10), ) ``` + +This example invokes an unregistered Activity by name. +The Worker resolves it using the registered dynamic Activity instead. +When possible, prefer to use compiler-checked type-safe arguments rather than Activity name strings. diff --git a/docs/encyclopedia/application-message-passing.mdx b/docs/encyclopedia/application-message-passing.mdx index 5807e7e084..d88e597bdf 100644 --- a/docs/encyclopedia/application-message-passing.mdx +++ b/docs/encyclopedia/application-message-passing.mdx @@ -31,9 +31,9 @@ keywords: import PrettyImage from '@site/src/components/pretty-image/PrettyImage'; import { RelatedReadContainer, RelatedReadItem } from '@site/src/components/related-read/RelatedRead'; -Workflows can be thought of as stateful web services that can receive messages. The Workflow can have powerful message handlers akin to endpoints that can react to the incoming messages in combination with the current state of the Workflow. -Temporal supports three types of messages: Signals, Queries, and Updates. -To define these, consider these messages from the perspective of the client making the request: +Workflows can be thought of as stateful web services that can receive messages. +The Workflow can have powerful message handlers akin to endpoints that react to the incoming messages in combination with the current state of the Workflow. +Temporal supports three types of messages: Signals, Queries, and Updates: - Queries are read requests. They can read the current state of the Workflow but cannot block in doing so. - Signals are asynchronous write requests. They cause changes in the running Workflow, but you cannot await any response or error. @@ -145,8 +145,8 @@ In most languages (except Go), you may call `executeUpdate` to complete an Updat Alternatively, to start an Update, you may call `startUpdate` and pass in the Workflow Update Stage as an argument. You have two choices on what to await: -- Accepted - wait til the Worker is contacted, which ensures that the Update is persisted. See [Update Validators](#update-validators) for more information. -- Completed - wait til the handler finishes and returns a result. (This is equivalent to `executeUpdate`.) +- Accepted - wait until the Worker is contacted, which ensures that the Update is persisted. See [Update Validators](#update-validators) for more information. +- Completed - wait until the handler finishes and returns a result. (This is equivalent to `executeUpdate`.) The start call will give you a handle you can use to track the Update, determine whether it was Accepted, and ultimately get its result or an error. @@ -231,7 +231,7 @@ A Signal or Update handler can block waiting for the Workflow to reach a certain Sometimes, you need your message handler to wait for long-running operations such as executing an Activity. When this happens, the handler will yield control back to [the loop](#message-handler-concurrency). This means that your handlers can have race conditions if you’re not careful. You can guard your handlers with concurrency primitives like mutexes or semaphores, but you should use versions of these primitives provided for Workflows in most languages. See the links below for examples of how to use them in your SDK. -#### Inject work into the main Workflow +#### Inject work into the main Workflow {#injecting-work-into-main-workflow} Sometimes you want to process work provided by messages in the main Workflow. Perhaps you’d like to accumulate several messages before acting on any of them. For example, message handlers might put work into a queue, which can then be picked up and processed in an event loop that you yourself write. This option is considered advanced but offers powerful flexibility. And if you serialize the handling of your messages inside your main Workflow, you can avoid using concurrency primitives like mutexes and semaphores. See the links above for how to do this in your SDK. @@ -244,7 +244,7 @@ If you don’t need to ensure that your handlers complete, you may specify your See the links below for how to ensure handlers are finished in your SDK. -#### Ensuring your messages are processed exactly once +#### Ensuring your messages are processed exactly once {#exactly-once-message-processing} Many developers want their message handlers to run exactly once--to be idempotent--in cases where the same Signal or Update is delivered twice or sent by two different call sites. Temporal deduplicates messages for you on the server, but there is one important case when you need to think about this yourself when authoring a Workflow, and one when sending Signals and Updates. @@ -300,7 +300,7 @@ Once the Update handler is finished and has returned a value, the operation is c --> -### Exceptions in message handlers +### Exceptions in message handlers {#exceptions} When throwing an exception in a message handler, you should decide whether to make it an [Application Failure](/references/failures#application-failure). The implications are different between Signals and Updates. @@ -324,7 +324,7 @@ If you throw any other exception, by default, it will cause a [Workflow Task Fai #### Errors and panics in message handlers in the Go SDK -In Go, returning an error behaves like an [Application Failure](/references/failures#application-failure) in the other SDKs. Panics behave like non-Application Failure exceptions in other languages, in that they fail the Signal or Update handler tasks. +In Go, returning an error behaves like an [Application Failure](/references/failures#application-failure) in the other SDKs. Panics behave like non-Application Failure exceptions in other languages, in that they cause a [Workflow Task Failure](/references/failures#workflow-task-failures). ### Writing Signal Handlers {#writing-signal-handlers} @@ -334,7 +334,7 @@ Use these links to see a simple Signal handler. - +