From 0763309a16d4c59377e74b99b805aaf5705d1587 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Nov 2024 21:08:18 -0500 Subject: [PATCH] Fix polling sample 2 (#152) * Fix infrequent polling sample --- .github/workflows/ci.yml | 2 +- polling/frequent/activities.py | 15 ++------- polling/infrequent/activities.py | 13 ++------ polling/periodic_sequence/activities.py | 10 ++---- polling/periodic_sequence/workflows.py | 6 ++-- polling/test_service.py | 37 ++++++++++++++--------- tests/polling/infrequent/__init__.py | 0 tests/polling/infrequent/workflow_test.py | 31 +++++++++++++++++++ 8 files changed, 64 insertions(+), 50 deletions(-) create mode 100644 tests/polling/infrequent/__init__.py create mode 100644 tests/polling/infrequent/workflow_test.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c32f9da1..8bcb6c9e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: os: [ubuntu-latest, macos-intel, macos-arm, windows-latest] include: - os: macos-intel - runsOn: macos-12 + runsOn: macos-13 - os: macos-arm runsOn: macos-14 # macOS ARM 3.8 does not have an available Python build at diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 2b49e379..a112b417 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -1,28 +1,19 @@ import asyncio -import time -from dataclasses import dataclass from temporalio import activity -from polling.test_service import TestService - - -@dataclass -class ComposeGreetingInput: - greeting: str - name: str +from polling.test_service import ComposeGreetingInput, get_service_result @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() while True: try: try: - result = await test_service.get_service_result(input) + result = await get_service_result(input) activity.logger.info(f"Exiting activity ${result}") return result - except Exception as e: + except Exception: # swallow exception since service is down activity.logger.debug("Failed, trying again shortly", exc_info=True) diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index 2bd71587..b3db1aed 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -1,19 +1,10 @@ -from dataclasses import dataclass - from temporalio import activity -from polling.test_service import TestService - - -@dataclass -class ComposeGreetingInput: - greeting: str - name: str +from polling.test_service import ComposeGreetingInput, get_service_result @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() # If this raises an exception because it's not done yet, the activity will # continually be scheduled for retry - return await test_service.get_service_result(input) + return await get_service_result(input) diff --git a/polling/periodic_sequence/activities.py b/polling/periodic_sequence/activities.py index 1a1196c6..87b69890 100644 --- a/polling/periodic_sequence/activities.py +++ b/polling/periodic_sequence/activities.py @@ -1,14 +1,8 @@ -from dataclasses import dataclass +from typing import Any, NoReturn from temporalio import activity -@dataclass -class ComposeGreetingInput: - greeting: str - name: str - - @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +async def compose_greeting(input: Any) -> NoReturn: raise RuntimeError("Service is down") diff --git a/polling/periodic_sequence/workflows.py b/polling/periodic_sequence/workflows.py index d38d41ce..917170c1 100644 --- a/polling/periodic_sequence/workflows.py +++ b/polling/periodic_sequence/workflows.py @@ -6,10 +6,8 @@ from temporalio.exceptions import ActivityError with workflow.unsafe.imports_passed_through(): - from polling.periodic_sequence.activities import ( - ComposeGreetingInput, - compose_greeting, - ) + from polling.periodic_sequence.activities import compose_greeting + from polling.test_service import ComposeGreetingInput @workflow.defn diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..490f8476 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,14 +1,23 @@ -class TestService: - def __init__(self): - self.try_attempts = 0 - self.error_attempts = 5 - - async def get_service_result(self, input): - print( - f"Attempt {self.try_attempts}" - f" of {self.error_attempts} to invoke service" - ) - self.try_attempts += 1 - if self.try_attempts % self.error_attempts == 0: - return f"{input.greeting}, {input.name}!" - raise Exception("service is down") +from dataclasses import dataclass +from typing import Counter + +from temporalio import activity + +attempts = Counter[str]() +ERROR_ATTEMPTS = 5 + + +@dataclass +class ComposeGreetingInput: + greeting: str + name: str + + +async def get_service_result(input): + workflow_id = activity.info().workflow_id + attempts[workflow_id] += 1 + + print(f"Attempt {attempts[workflow_id]} of {ERROR_ATTEMPTS} to invoke service") + if attempts[workflow_id] == ERROR_ATTEMPTS: + return f"{input.greeting}, {input.name}!" + raise Exception("service is down") diff --git a/tests/polling/infrequent/__init__.py b/tests/polling/infrequent/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/polling/infrequent/workflow_test.py b/tests/polling/infrequent/workflow_test.py new file mode 100644 index 00000000..31f3f987 --- /dev/null +++ b/tests/polling/infrequent/workflow_test.py @@ -0,0 +1,31 @@ +import uuid + +import pytest +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from polling.infrequent.activities import compose_greeting +from polling.infrequent.workflows import GreetingWorkflow + + +async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment): + if not env.supports_time_skipping: + pytest.skip("Too slow to test with time-skipping disabled") + + # Start a worker that hosts the workflow and activity implementations. + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[GreetingWorkflow], + activities=[compose_greeting], + ): + handle = await client.start_workflow( + GreetingWorkflow.run, + "Temporal", + id=f"infrequent-polling-{uuid.uuid4()}", + task_queue=task_queue, + ) + result = await handle.result() + assert result == "Hello, Temporal!"