From 1c1e69cf6ef601127853aa8530ca8ff276c9fc45 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 6 Dec 2024 11:46:52 -0500 Subject: [PATCH] Shopping cart (UWS) --- .../update_shopping_cart/README.md | 16 +++++ .../update_shopping_cart/__init__.py | 1 + .../update_shopping_cart/activities.py | 21 ++++++ .../update_shopping_cart/starter.py | 69 +++++++++++++++++++ .../update_shopping_cart/worker.py | 33 +++++++++ .../update_shopping_cart/workflows.py | 52 ++++++++++++++ 6 files changed, 192 insertions(+) create mode 100644 message_passing/update_shopping_cart/README.md create mode 100644 message_passing/update_shopping_cart/__init__.py create mode 100644 message_passing/update_shopping_cart/activities.py create mode 100644 message_passing/update_shopping_cart/starter.py create mode 100644 message_passing/update_shopping_cart/worker.py create mode 100644 message_passing/update_shopping_cart/workflows.py diff --git a/message_passing/update_shopping_cart/README.md b/message_passing/update_shopping_cart/README.md new file mode 100644 index 00000000..df7d9ebc --- /dev/null +++ b/message_passing/update_shopping_cart/README.md @@ -0,0 +1,16 @@ +# Update With Start: Shopping Cart + +This sample illustrates the use of update-with-start to send Updates to a Workflow, starting the Workflow if +it is not running yet. The Workflow represents a Shopping Cart in an e-commerce application, and +update-with-start is used to add items to the cart, receiving back the updated cart subtotal. + +Run the following from this directory: + + poetry run python worker.py + +Then, in another terminal: + + poetry run python starter.py + +This will start a worker to run your workflow and activities, then simulate a backend application receiving +requests to add items to a shopping cart, before finalizing the order. diff --git a/message_passing/update_shopping_cart/__init__.py b/message_passing/update_shopping_cart/__init__.py new file mode 100644 index 00000000..b723460a --- /dev/null +++ b/message_passing/update_shopping_cart/__init__.py @@ -0,0 +1 @@ +TASK_QUEUE = "uws" diff --git a/message_passing/update_shopping_cart/activities.py b/message_passing/update_shopping_cart/activities.py new file mode 100644 index 00000000..b26308f6 --- /dev/null +++ b/message_passing/update_shopping_cart/activities.py @@ -0,0 +1,21 @@ +import asyncio +from dataclasses import dataclass +from decimal import Decimal +from typing import Optional + +from temporalio import activity + + +@dataclass +class ShoppingCartItem: + sku: str + quantity: int + + +@activity.defn +async def get_price(item: ShoppingCartItem) -> Optional[str]: + await asyncio.sleep(0.1) + price = None if item.sku == "sku-456" else Decimal("5.99") + if price is None: + return None + return str(price * item.quantity) diff --git a/message_passing/update_shopping_cart/starter.py b/message_passing/update_shopping_cart/starter.py new file mode 100644 index 00000000..0546c8aa --- /dev/null +++ b/message_passing/update_shopping_cart/starter.py @@ -0,0 +1,69 @@ +import asyncio +from decimal import Decimal +from typing import Optional, Tuple + +from temporalio import common +from temporalio.client import ( + Client, + WithStartWorkflowOperation, + WorkflowHandle, + WorkflowUpdateFailedError, +) + +from message_passing.update_shopping_cart.workflows import ( + ShoppingCartItem, + ShoppingCartWorkflow, +) + + +async def handle_add_item_request( + session_id: str, item_id: str, quantity: int, temporal_client: Client +) -> Tuple[Optional[Decimal], WorkflowHandle]: + """ + Handle a client request to add an item to the shopping cart. The user is not logged in, but a session ID is + available from a cookie, and we use this as the cart ID. The Temporal client was created at service-start + time and is shared by all request handlers. + + A Workflow Type exists that can be used to represent a shopping cart. The method uses update-with-start to + add an item to the shopping cart, creating the cart if it doesn't already exist. + + Note that the workflow handle is available, even if the Update fails. + """ + cart_id = f"cart-{session_id}" + start_op = WithStartWorkflowOperation( + ShoppingCartWorkflow.run, + id=cart_id, + id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING, + task_queue="uws", + ) + try: + price = Decimal( + await temporal_client.execute_update_with_start( + ShoppingCartWorkflow.add_item, + ShoppingCartItem(sku=item_id, quantity=quantity), + start_workflow_operation=start_op, + ) + ) + except WorkflowUpdateFailedError: + price = None + + return price, await start_op.workflow_handle() + + +async def main(): + print("🛒") + temporal_client = await Client.connect("localhost:7233") + subtotal_1, _ = await handle_add_item_request( + "session-777", "sku-123", 1, temporal_client + ) + subtotal_2, wf_handle = await handle_add_item_request( + "session-777", "sku-456", 1, temporal_client + ) + print(f"subtotals were, {[subtotal_1, subtotal_2]}") + await wf_handle.signal(ShoppingCartWorkflow.checkout) + final_order = await wf_handle.result() + print(f"final order: {final_order}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/message_passing/update_shopping_cart/worker.py b/message_passing/update_shopping_cart/worker.py new file mode 100644 index 00000000..4eee40a0 --- /dev/null +++ b/message_passing/update_shopping_cart/worker.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.update_shopping_cart import TASK_QUEUE, workflows + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[workflows.ShoppingCartWorkflow], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/message_passing/update_shopping_cart/workflows.py b/message_passing/update_shopping_cart/workflows.py new file mode 100644 index 00000000..d3a77281 --- /dev/null +++ b/message_passing/update_shopping_cart/workflows.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from decimal import Decimal +from typing import Tuple + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from message_passing.update_shopping_cart.activities import ShoppingCartItem, get_price + + +@dataclass +class FinalizedOrder: + id: str + items: list[Tuple[ShoppingCartItem, str]] + total: str + + +@workflow.defn +class ShoppingCartWorkflow: + def __init__(self): + self.items: list[Tuple[ShoppingCartItem, Decimal]] = [] + self.order_submitted = False + + @workflow.run + async def run(self) -> FinalizedOrder: + await workflow.wait_condition( + lambda: workflow.all_handlers_finished() and self.order_submitted + ) + return FinalizedOrder( + id=workflow.info().workflow_id, + items=[(item, str(price)) for item, price in self.items], + total=str( + sum(item.quantity * price for item, price in self.items) + or Decimal("0.00") + ), + ) + + @workflow.update + async def add_item(self, item: ShoppingCartItem) -> str: + price = await get_price(item) + if price is None: + raise ApplicationError( + f"Item unavailable: {item}", + ) + self.items.append((item, Decimal(price))) + return str( + sum(item.quantity * price for item, price in self.items) or Decimal("0.00") + ) + + @workflow.signal + def checkout(self): + self.order_submitted = True