From d3e14f7b1d57ca400b52f6004811cca8b063221a Mon Sep 17 00:00:00 2001 From: cl0ete Date: Thu, 5 Dec 2024 12:20:22 +0200 Subject: [PATCH] use tenacity for retry --- waypoint/services/nats_service.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/waypoint/services/nats_service.py b/waypoint/services/nats_service.py index 908e52a1c..3d4ddc44d 100644 --- a/waypoint/services/nats_service.py +++ b/waypoint/services/nats_service.py @@ -4,11 +4,12 @@ from datetime import datetime, timedelta, timezone import orjson +import tenacity from nats.errors import BadSubscriptionError, Error, TimeoutError from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext from nats.js.errors import FetchTimeoutError -from retry import retry +from tenacity import RetryCallState from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT from shared.log_config import get_logger @@ -17,6 +18,15 @@ logger = get_logger(__name__) +def retry_log(retry_state: RetryCallState): + """Custom logging for retry attempts.""" + if retry_state.outcome.failed: + exception = retry_state.outcome.exception() + logger.warning( + f"Retry attempt {retry_state.attempt_number} failed due to {type(exception).__name__}: {exception}" + ) + + class NatsEventsProcessor: """ Class to handle processing of NATS events. Calling the process_events method will @@ -55,7 +65,12 @@ async def _subscribe( opt_start_time=start_time, ) - @retry(TimeoutError, delay=1, backoff=2, max_delay=16, logger=logger) + @tenacity.retry( + retry=tenacity.retry_if_exception_type(TimeoutError), + wait=tenacity.wait_exponential(multiplier=1, max=16), + after=retry_log, + stop=tenacity.stop_never, + ) async def pull_subscribe(config, **kwargs): try: logger.trace(