🚑 Hotfix: handle redis pubsub connection error (#651)
* 🚑 Hotfix: handle redis pubsub connection error

* 🎨 parameterise the retry_duration
ff137 authored Feb 15, 2024
1 parent 81da4e8 commit cb9d783
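
As the diff below shows, the pubsub listener previously had no handling for a dropped Redis connection: any aioredis.ConnectionError killed the background task. The fix wraps the subscribe-and-read loop in a bounded retry loop that resets its counter on a successful reconnect and terminates the service once max_retries consecutive attempts fail. A minimal, runnable sketch of that pattern, using a hypothetical connect_and_listen stub in place of the real pubsub loop:

import asyncio
import sys

async def connect_and_listen() -> None:
    # Hypothetical stub standing in for the real subscribe-and-read loop
    raise ConnectionError("redis unavailable")

async def listen_with_retries(max_retries: int = 5, retry_duration: float = 0.33) -> None:
    retry_count = 0
    while retry_count < max_retries:
        try:
            await connect_and_listen()
            retry_count = 0  # a successful (re)connection restores the retry budget
        except ConnectionError as e:
            print(f"ConnectionError detected: {e}.")
        finally:
            retry_count += 1
            print(f"Attempt #{retry_count} to reconnect in {retry_duration}s ...")
            await asyncio.sleep(retry_duration)
    print(f"Failed to connect after {max_retries} attempts. Terminating.")
    sys.exit(1)

asyncio.run(listen_with_retries())

Running the sketch prints five failed attempts, 0.33 s apart, then exits with status 1 — the same terminal behaviour the hotfix gives the webhooks service.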
Showing 1 changed file with 76 additions and 47 deletions.
webhooks/dependencies/sse_manager.py
@@ -1,9 +1,11 @@
 import asyncio
+import sys
 import time
 from collections import defaultdict as ddict
 from datetime import datetime, timedelta
-from typing import Any, AsyncGenerator, List, NoReturn, Tuple
+from typing import Any, AsyncGenerator, Dict, List, NoReturn, Tuple

+import aioredis
 from pydantic import ValidationError

 from shared.constants import (
@@ -65,62 +67,89 @@ def _start_background_tasks(self) -> None:
         asyncio.create_task(self._process_incoming_events())
         asyncio.create_task(self._cleanup_cache())

-    async def _listen_for_new_events(self) -> NoReturn:
+    async def _listen_for_new_events(
+        self, max_retries=5, retry_duration=0.33
+    ) -> NoReturn:
         """
-        Listen on redis pubsub channel for new SSE events; read the event and add to queue
+        Listen on redis pubsub channel for new SSE events; read the event and add to queue.
+        Terminates after exceeding max_retries connection attempts.
         """
-        pubsub = self.redis_service.redis.pubsub()
+        retry_count = 0

-        await pubsub.subscribe(self.redis_service.sse_event_pubsub_channel)
+        while retry_count < max_retries:
+            try:
+                pubsub = self.redis_service.redis.pubsub()

-        while True:
-            message = await pubsub.get_message(ignore_subscribe_messages=True)
-            if message:
-                try:
-                    message_data = message["data"]
-                    if isinstance(message_data, bytes):
-                        message_data = message_data.decode("utf-8")
+                await pubsub.subscribe(self.redis_service.sse_event_pubsub_channel)

-                    wallet_id, timestamp_ns_str = message_data.split(":")
-                    timestamp_ns = int(timestamp_ns_str)
+                # Reset retry_count upon successful connection
+                retry_count = 0

-                    # Fetch the event with the exact timestamp from the sorted set
-                    json_events = await self.redis_service.get_json_events_by_timestamp(
-                        wallet_id, timestamp_ns, timestamp_ns
-                    )
+                while True:
+                    message = await pubsub.get_message(ignore_subscribe_messages=True)
+                    if message:
+                        await self._process_redis_event(message)
+                    else:
+                        await asyncio.sleep(0.01)  # Prevent a busy loop if no message
+            except aioredis.ConnectionError as e:
+                logger.error(f"ConnectionError detected: {e}.")
+            except Exception as e:  # General exception catch
+                logger.exception(f"Unexpected error: {e}.")
+            finally:
+                retry_count += 1
+                logger.warning(
+                    f"Attempt #{retry_count} to reconnect in {retry_duration}s ..."
+                )
+                await asyncio.sleep(retry_duration)  # Wait a bit before retrying

-                    for json_event in json_events:
-                        try:
-                            parsed_event = (
-                                CloudApiWebhookEventGeneric.model_validate_json(
-                                    json_event
-                                )
-                            )
-                            topic = parsed_event.topic
+        # If the loop exits due to retry limit exceeded
+        logger.critical(
+            f"Failed to connect to Redis after {max_retries} attempts. Terminating service."
+        )
+        sys.exit(1)  # todo: Not graceful

-                            # Add event to SSE queue for processing
-                            await self.incoming_events.put(parsed_event)
+    async def _process_redis_event(self, message: Dict[str, Any]) -> None:
+        try:
+            message_data = message["data"]
+            if isinstance(message_data, bytes):
+                message_data = message_data.decode("utf-8")

-                            # Also publish event to websocket
-                            # Doing it here makes websockets stateless as well
-                            await publish_event_on_websocket(
-                                event_json=json_event,
-                                wallet_id=wallet_id,
-                                topic=topic,
-                            )
-                        except ValidationError as e:
-                            error_message = (
-                                "Could not parse json event retreived from redis "
-                                f"into a `CloudApiWebhookEventGeneric`. Error: `{str(e)}`."
-                            )
-                            logger.error(error_message)
+            wallet_id, timestamp_ns_str = message_data.split(":")
+            timestamp_ns = int(timestamp_ns_str)

-                except (KeyError, ValueError) as e:
-                    logger.error("Could not unpack redis pubsub message: {}", e)
-                except Exception:
-                    logger.exception("Exception caught while processing redis event")
-            else:
-                await asyncio.sleep(0.01)  # Prevent a busy loop if no new messages
+            # Fetch the event with the exact timestamp from the sorted set
+            json_events = await self.redis_service.get_json_events_by_timestamp(
+                wallet_id, timestamp_ns, timestamp_ns
+            )
+
+            for json_event in json_events:
+                try:
+                    parsed_event = CloudApiWebhookEventGeneric.model_validate_json(
+                        json_event
+                    )
+                    topic = parsed_event.topic
+
+                    # Add event to SSE queue for processing
+                    await self.incoming_events.put(parsed_event)
+
+                    # Also publish event to websocket
+                    # Doing it here makes websockets stateless as well
+                    await publish_event_on_websocket(
+                        event_json=json_event,
+                        wallet_id=wallet_id,
+                        topic=topic,
+                    )
+                except ValidationError as e:
+                    error_message = (
+                        "Could not parse json event retreived from redis "
+                        f"into a `CloudApiWebhookEventGeneric`. Error: `{str(e)}`."
+                    )
+                    logger.error(error_message)
+
+        except (KeyError, ValueError) as e:
+            logger.error("Could not unpack redis pubsub message: {}", e)
+        except Exception:
+            logger.exception("Exception caught while processing redis event")

     async def _backfill_events(self) -> None:
         """
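With the defaults (max_retries=5, retry_duration=0.33) the listener sleeps 0.33 s between attempts, so an unreachable Redis gets roughly 5 × 0.33 ≈ 1.65 s of reconnect attempts before sys.exit(1). Because retry_duration is now parameterised, callers can widen that window. A hedged example of such a call; the actual wiring inside _start_background_tasks is collapsed in the diff above, so the create_task shape here is an assumption:

# Assumed wiring inside _start_background_tasks (that method body is collapsed above)
asyncio.create_task(self._listen_for_new_events(max_retries=10, retry_duration=1.0))
# -> up to ~10 s of reconnect attempts before the service terminates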
