Skip to content

Commit

Permalink
⚡️ Endorser resubscribe (#1229)
Browse files Browse the repository at this point in the history
* add tenacity

* add retry logic to subscribe

* 🎨

* remove unused fixture

* raise exception

* remove import

* remove import

* update logging

* add test for retry log

* 🎨

---------

Co-authored-by: Mourits de Beer <[email protected]>
  • Loading branch information
cl0ete and ff137 authored Dec 12, 2024
1 parent 07a2ca7 commit 3eed833
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 35 deletions.
31 changes: 23 additions & 8 deletions endorser/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions endorser/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pydantic = "~2.10.1"
uvicorn = "~0.32.1"
ddtrace = "^2.17.0"
scalar-fastapi = "^1.0.3"
tenacity = "^9.0.0"

[tool.poetry.dev-dependencies]
anyio = "~4.7.0"
Expand Down
58 changes: 45 additions & 13 deletions endorser/services/endorsement_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError
from tenacity import (
RetryCallState,
retry,
retry_if_exception_type,
stop_never,
wait_fixed,
)

from endorser.util.endorsement import accept_endorsement, should_accept_endorsement
from shared.constants import (
Expand Down Expand Up @@ -182,23 +189,48 @@ async def _subscribe(self) -> JetStreamContext.PullSubscription:
"""
Subscribes to the NATS subject for endorsement events.
"""
logger.info("Subscribing to NATS subject: {}", self.endorser_nats_subject)
subscribe_kwargs = {
"subject": self.endorser_nats_subject,
"durable": ENDORSER_DURABLE_CONSUMER,
"stream": NATS_STREAM,
}

logger.info("Subscribing to NATS: {}", subscribe_kwargs)

@retry(
retry=retry_if_exception_type(TimeoutError),
wait=wait_fixed(1),
stop=stop_never,
after=self._retry_log,
)
async def pull_subscribe(**kwargs):
try:
subscription = await self.jetstream.pull_subscribe(**kwargs)
logger.debug("Subscribed to NATS subject")
return subscription
except (BadSubscriptionError, Error) as e:
logger.error("Error subscribing to NATS subject: {}", e)
raise e
except Exception: # pylint: disable=W0703
logger.exception("Unknown error subscribing to NATS subject")
raise

try:
subscribe_kwargs = {
"subject": self.endorser_nats_subject,
"durable": ENDORSER_DURABLE_CONSUMER,
"stream": NATS_STREAM,
}
subscription = await self.jetstream.pull_subscribe(**subscribe_kwargs)
except (BadSubscriptionError, Error) as e:
logger.error("Error subscribing to NATS subject: {}", e)
raise e
return await pull_subscribe(**subscribe_kwargs)
except Exception: # pylint: disable=W0703
logger.exception("Unknown error subscribing to NATS subject")
logger.exception("Error subscribing to NATS subject")
raise
logger.debug("Subscribed to NATS subject")

return subscription
def _retry_log(self, retry_state: RetryCallState):
"""Custom logging for retry attempts."""
if retry_state.outcome.failed:
exception = retry_state.outcome.exception()
logger.warning(
"Retry attempt {} failed due to {}: {}",
retry_state.attempt_number,
type(exception).__name__,
exception,
)

async def check_jetstream(self):
try:
Expand Down
39 changes: 39 additions & 0 deletions endorser/tests/test_endorser_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError
from tenacity import RetryCallState

from endorser.services.endorsement_processor import EndorsementProcessor
from shared.constants import (
Expand Down Expand Up @@ -417,3 +418,41 @@ async def test_check_jetstream_exception(endorsement_processor_mock):
mock_logger.exception.assert_called_once_with(
"Caught exception while checking jetstream status"
)


class MockFuture:
"""A mock class to simulate the behavior of a Future object."""

def __init__(self, exception=None):
self._exception = exception

@property
def failed(self):
return self._exception is not None

def exception(self):
return self._exception


def test_retry_log(endorsement_processor_mock): # pylint: disable=redefined-outer-name

# Mock a retry state
mock_retry_state = MagicMock(spec=RetryCallState)

# Mock the outcome attribute with a Future-like object
mock_retry_state.outcome = MockFuture(exception=ValueError("Test retry exception"))
mock_retry_state.attempt_number = 3 # Retry attempt number

# Patch the logger to capture log calls
with patch("endorser.services.endorsement_processor.logger") as mock_logger:
endorsement_processor_mock._retry_log( # pylint: disable=protected-access
retry_state=mock_retry_state
)

# Assert that logger.warning was called with the expected message
mock_logger.warning.assert_called_once_with(
"Retry attempt {} failed due to {}: {}",
3,
"ValueError",
mock_retry_state.outcome.exception(),
)
14 changes: 0 additions & 14 deletions waypoint/tests/services/test_nats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,17 +357,3 @@ async def test_check_jetstream_exception(

assert result == {"is_working": False}
mock_nats_client.account_info.assert_called_once()


class MockFuture:
"""A mock class to simulate the behavior of a Future object."""

def __init__(self, exception=None):
self._exception = exception

@property
def failed(self):
return self._exception is not None

def exception(self):
return self._exception

0 comments on commit 3eed833

Please sign in to comment.