Skip to content

Commit

Permalink
Fix polling sample 2 (#152)
Browse files Browse the repository at this point in the history
* Fix infrequent polling sample
  • Loading branch information
dandavison authored Nov 26, 2024
1 parent 58305c0 commit 0763309
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
os: [ubuntu-latest, macos-intel, macos-arm, windows-latest]
include:
- os: macos-intel
runsOn: macos-12
runsOn: macos-13
- os: macos-arm
runsOn: macos-14
# macOS ARM 3.8 does not have an available Python build at
Expand Down
15 changes: 3 additions & 12 deletions polling/frequent/activities.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
import asyncio
import time
from dataclasses import dataclass

from temporalio import activity

from polling.test_service import TestService


@dataclass
class ComposeGreetingInput:
greeting: str
name: str
from polling.test_service import ComposeGreetingInput, get_service_result


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
test_service = TestService()
while True:
try:
try:
result = await test_service.get_service_result(input)
result = await get_service_result(input)
activity.logger.info(f"Exiting activity ${result}")
return result
except Exception as e:
except Exception:
# swallow exception since service is down
activity.logger.debug("Failed, trying again shortly", exc_info=True)

Expand Down
13 changes: 2 additions & 11 deletions polling/infrequent/activities.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
from dataclasses import dataclass

from temporalio import activity

from polling.test_service import TestService


@dataclass
class ComposeGreetingInput:
greeting: str
name: str
from polling.test_service import ComposeGreetingInput, get_service_result


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
test_service = TestService()
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await test_service.get_service_result(input)
return await get_service_result(input)
10 changes: 2 additions & 8 deletions polling/periodic_sequence/activities.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
from dataclasses import dataclass
from typing import Any, NoReturn

from temporalio import activity


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
async def compose_greeting(input: Any) -> NoReturn:
raise RuntimeError("Service is down")
6 changes: 2 additions & 4 deletions polling/periodic_sequence/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
from temporalio.exceptions import ActivityError

with workflow.unsafe.imports_passed_through():
from polling.periodic_sequence.activities import (
ComposeGreetingInput,
compose_greeting,
)
from polling.periodic_sequence.activities import compose_greeting
from polling.test_service import ComposeGreetingInput


@workflow.defn
Expand Down
37 changes: 23 additions & 14 deletions polling/test_service.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
class TestService:
def __init__(self):
self.try_attempts = 0
self.error_attempts = 5

async def get_service_result(self, input):
print(
f"Attempt {self.try_attempts}"
f" of {self.error_attempts} to invoke service"
)
self.try_attempts += 1
if self.try_attempts % self.error_attempts == 0:
return f"{input.greeting}, {input.name}!"
raise Exception("service is down")
from dataclasses import dataclass
from typing import Counter

from temporalio import activity

attempts = Counter[str]()
ERROR_ATTEMPTS = 5


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


async def get_service_result(input):
workflow_id = activity.info().workflow_id
attempts[workflow_id] += 1

print(f"Attempt {attempts[workflow_id]} of {ERROR_ATTEMPTS} to invoke service")
if attempts[workflow_id] == ERROR_ATTEMPTS:
return f"{input.greeting}, {input.name}!"
raise Exception("service is down")
Empty file.
31 changes: 31 additions & 0 deletions tests/polling/infrequent/workflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import uuid

import pytest
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from polling.infrequent.activities import compose_greeting
from polling.infrequent.workflows import GreetingWorkflow


async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment):
if not env.supports_time_skipping:
pytest.skip("Too slow to test with time-skipping disabled")

# Start a worker that hosts the workflow and activity implementations.
task_queue = f"tq-{uuid.uuid4()}"
async with Worker(
client,
task_queue=task_queue,
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
handle = await client.start_workflow(
GreetingWorkflow.run,
"Temporal",
id=f"infrequent-polling-{uuid.uuid4()}",
task_queue=task_queue,
)
result = await handle.result()
assert result == "Hello, Temporal!"

0 comments on commit 0763309

Please sign in to comment.