Skip to content

Commit

Permalink
Shopping cart (UWS)
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Dec 12, 2024
1 parent 9ba977b commit 1c1e69c
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 0 deletions.
16 changes: 16 additions & 0 deletions message_passing/update_shopping_cart/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Update With Start: Shopping Cart

This sample illustrates the use of update-with-start to send Updates to a Workflow, starting the Workflow if
it is not running yet. The Workflow represents a Shopping Cart in an e-commerce application, and
update-with-start is used to add items to the cart, receiving back the updated cart subtotal.

Run the following from this directory:

poetry run python worker.py

Then, in another terminal:

poetry run python starter.py

This will start a worker to run your workflow and activities, then simulate a backend application receiving
requests to add items to a shopping cart, before finalizing the order.
1 change: 1 addition & 0 deletions message_passing/update_shopping_cart/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TASK_QUEUE = "uws"
21 changes: 21 additions & 0 deletions message_passing/update_shopping_cart/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional

from temporalio import activity


@dataclass
class ShoppingCartItem:
sku: str
quantity: int


@activity.defn
async def get_price(item: ShoppingCartItem) -> Optional[str]:
await asyncio.sleep(0.1)
price = None if item.sku == "sku-456" else Decimal("5.99")
if price is None:
return None
return str(price * item.quantity)
69 changes: 69 additions & 0 deletions message_passing/update_shopping_cart/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio
from decimal import Decimal
from typing import Optional, Tuple

from temporalio import common
from temporalio.client import (
Client,
WithStartWorkflowOperation,
WorkflowHandle,
WorkflowUpdateFailedError,
)

from message_passing.update_shopping_cart.workflows import (
ShoppingCartItem,
ShoppingCartWorkflow,
)


async def handle_add_item_request(
session_id: str, item_id: str, quantity: int, temporal_client: Client
) -> Tuple[Optional[Decimal], WorkflowHandle]:
"""
Handle a client request to add an item to the shopping cart. The user is not logged in, but a session ID is
available from a cookie, and we use this as the cart ID. The Temporal client was created at service-start
time and is shared by all request handlers.
A Workflow Type exists that can be used to represent a shopping cart. The method uses update-with-start to
add an item to the shopping cart, creating the cart if it doesn't already exist.
Note that the workflow handle is available, even if the Update fails.
"""
cart_id = f"cart-{session_id}"
start_op = WithStartWorkflowOperation(
ShoppingCartWorkflow.run,
id=cart_id,
id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING,
task_queue="uws",
)
try:
price = Decimal(
await temporal_client.execute_update_with_start(
ShoppingCartWorkflow.add_item,
ShoppingCartItem(sku=item_id, quantity=quantity),
start_workflow_operation=start_op,
)
)
except WorkflowUpdateFailedError:
price = None

return price, await start_op.workflow_handle()


async def main():
print("🛒")
temporal_client = await Client.connect("localhost:7233")
subtotal_1, _ = await handle_add_item_request(
"session-777", "sku-123", 1, temporal_client
)
subtotal_2, wf_handle = await handle_add_item_request(
"session-777", "sku-456", 1, temporal_client
)
print(f"subtotals were, {[subtotal_1, subtotal_2]}")
await wf_handle.signal(ShoppingCartWorkflow.checkout)
final_order = await wf_handle.result()
print(f"final order: {final_order}")


if __name__ == "__main__":
asyncio.run(main())
33 changes: 33 additions & 0 deletions message_passing/update_shopping_cart/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.update_shopping_cart import TASK_QUEUE, workflows

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[workflows.ShoppingCartWorkflow],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
52 changes: 52 additions & 0 deletions message_passing/update_shopping_cart/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from dataclasses import dataclass
from decimal import Decimal
from typing import Tuple

from temporalio import workflow
from temporalio.exceptions import ApplicationError

from message_passing.update_shopping_cart.activities import ShoppingCartItem, get_price


@dataclass
class FinalizedOrder:
id: str
items: list[Tuple[ShoppingCartItem, str]]
total: str


@workflow.defn
class ShoppingCartWorkflow:
def __init__(self):
self.items: list[Tuple[ShoppingCartItem, Decimal]] = []
self.order_submitted = False

@workflow.run
async def run(self) -> FinalizedOrder:
await workflow.wait_condition(
lambda: workflow.all_handlers_finished() and self.order_submitted
)
return FinalizedOrder(
id=workflow.info().workflow_id,
items=[(item, str(price)) for item, price in self.items],
total=str(
sum(item.quantity * price for item, price in self.items)
or Decimal("0.00")
),
)

@workflow.update
async def add_item(self, item: ShoppingCartItem) -> str:
price = await get_price(item)
if price is None:
raise ApplicationError(
f"Item unavailable: {item}",
)
self.items.append((item, Decimal(price)))
return str(
sum(item.quantity * price for item, price in self.items) or Decimal("0.00")
)

@workflow.signal
def checkout(self):
self.order_submitted = True

0 comments on commit 1c1e69c

Please sign in to comment.