Skip to content

Commit

Permalink
✨ Implement retry logic in RichAsyncClient (#1141)
Browse files Browse the repository at this point in the history
* 🎨 deduplicate RichAsyncClient

* ✨ retry on 503 errors

* ✨ make retry logic more configurable

* 🎨 docstrings

* 🎨

* 🔊 log warning when retrying
  • Loading branch information
ff137 authored Oct 29, 2024
1 parent ff08431 commit ee8912d
Showing 1 changed file with 57 additions and 53 deletions.
110 changes: 57 additions & 53 deletions shared/util/rich_async_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import ssl
from typing import Optional
from typing import List, Optional

from fastapi import HTTPException
from httpx import AsyncClient, HTTPStatusError, Response
Expand All @@ -10,74 +11,77 @@
ssl_context = ssl.create_default_context()


# Async Client with built in error handling and re-using SSL certs
class RichAsyncClient(AsyncClient):
"""Async Client that extends httpx.AsyncClient with built-in error handling and SSL cert reuse.
- Reuses SSL context for better performance
- Retries requests on 503 Service Unavailable errors
- Raises HTTPException with detailed error messages
Args:
name (Optional[str]): Optional name for the client, prepended to exceptions.
verify: SSL certificate verification context.
raise_status_error (bool): Whether to raise an error for 4xx and 5xx status codes.
retries (int): Number of retry attempts for failed requests.
retry_on (List[int]): List of HTTP status codes that should trigger a retry.
"""

def __init__(
self,
*args,
name: Optional[str] = None,
verify=ssl_context,
raise_status_error=True,
retries: int = 3,
retry_on: List[int] = [503],
**kwargs,
) -> None:
super().__init__(verify=verify, *args, **kwargs)
self.name = (
name + " - HTTP" if name else "HTTP"
) # prepend to exception messages to add context
self.name = name + " - HTTP" if name else "HTTP" # prepended to exceptions
self.raise_status_error = raise_status_error
self.retries = retries
self.retry_on = retry_on

async def post(self, url: str, **kwargs) -> Response:
try:
response = await super().post(url, **kwargs)
if self.raise_status_error:
response.raise_for_status() # Raise exception for 4xx and 5xx status codes
except HTTPStatusError as e:
code = e.response.status_code
message = e.response.text
log_message = f"{self.name} POST `{url}` failed. Status code: {code}. Response: `{message}`."
logger.error(log_message)

raise HTTPException(status_code=code, detail=message) from e
async def _handle_response(self, response: Response) -> Response:
if self.raise_status_error:
response.raise_for_status() # Raise exception for 4xx and 5xx status codes
return response

async def get(self, url: str, **kwargs) -> Response:
try:
response = await super().get(url, **kwargs)
if self.raise_status_error:
response.raise_for_status()
except HTTPStatusError as e:
code = e.response.status_code
message = e.response.text
log_message = f"{self.name} GET `{url}` failed. Status code: {code}. Response: `{message}`."
logger.error(log_message)
async def _handle_error(self, e: HTTPStatusError, url: str, method: str) -> None:
code = e.response.status_code
message = e.response.text
log_message = (
f"{self.name} {method} `{url}` failed. "
f"Status code: {code}. Response: `{message}`."
)
logger.error(log_message)
raise HTTPException(status_code=code, detail=message) from e

raise HTTPException(status_code=code, detail=message) from e
return response
async def _request_with_retries(self, method: str, url: str, **kwargs) -> Response:
for attempt in range(self.retries):
try:
response = await getattr(super(), method)(url, **kwargs)
return await self._handle_response(response)
except HTTPStatusError as e:
code = e.response.status_code
if code in self.retry_on and attempt < self.retries - 1:
log_message = (
f"{self.name} {method} `{url}` failed with status code {code}. "
f"Retrying attempt {attempt + 1}/{self.retries}."
)
logger.warning(log_message)
await asyncio.sleep(0.5) # Wait before retrying
continue # Retry the request
await self._handle_error(e, url, method)

async def delete(self, url: str, **kwargs) -> Response:
try:
response = await super().delete(url, **kwargs)
if self.raise_status_error:
response.raise_for_status()
except HTTPStatusError as e:
code = e.response.status_code
message = e.response.text
log_message = f"{self.name} DELETE `{url}` failed. Status code: {code}. Response: `{message}`."
logger.error(log_message)
async def post(self, url: str, **kwargs) -> Response:
return await self._request_with_retries("post", url, **kwargs)

raise HTTPException(status_code=code, detail=message) from e
return response
async def get(self, url: str, **kwargs) -> Response:
return await self._request_with_retries("get", url, **kwargs)

async def put(self, url: str, **kwargs) -> Response:
try:
response = await super().put(url, **kwargs)
if self.raise_status_error:
response.raise_for_status()
except HTTPStatusError as e:
code = e.response.status_code
message = e.response.text
log_message = f"{self.name} PUT `{url}` failed. Status code: {code}. Response: `{message}`."
logger.error(log_message)
async def delete(self, url: str, **kwargs) -> Response:
return await self._request_with_retries("delete", url, **kwargs)

raise HTTPException(status_code=code, detail=message) from e
return response
async def put(self, url: str, **kwargs) -> Response:
return await self._request_with_retries("put", url, **kwargs)

0 comments on commit ee8912d

Please sign in to comment.