Skip to content

Commit

Permalink
server: send emails as a background tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
frankie567 committed Dec 20, 2024
1 parent 3bb9865 commit ef797ee
Show file tree
Hide file tree
Showing 20 changed files with 174 additions and 166 deletions.
7 changes: 2 additions & 5 deletions server/polar/customer_portal/service/customer_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from polar.customer.service import customer as customer_service
from polar.customer_session.service import customer_session as customer_session_service
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.exceptions import PolarError
from polar.kit.crypto import get_token_hash
from polar.kit.utils import utc_now
Expand Down Expand Up @@ -74,7 +74,6 @@ async def send(
email_renderer = get_email_renderer(
{"customer_portal": "polar.customer_portal"}
)
email_sender = get_email_sender()

customer = customer_session_code.customer
organization = await organization_service.get(
Expand All @@ -99,9 +98,7 @@ async def send(
},
)

await email_sender.send_to_user(
to_email_addr=customer.email, subject=subject, html_content=body
)
enqueue_email(to_email_addr=customer.email, subject=subject, html_content=body)

async def authenticate(
self, session: AsyncSession, code: str
Expand Down
66 changes: 54 additions & 12 deletions server/polar/email/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

from polar.config import EmailSender as EmailSenderType
from polar.config import settings
from polar.exceptions import PolarError
from polar.logging import Logger
from polar.worker import enqueue_job

log: Logger = structlog.get_logger()

Expand All @@ -16,9 +18,17 @@
DEFAULT_REPLY_TO_EMAIL_ADDRESS = "[email protected]"


class EmailSenderError(PolarError): ...


class SendEmailError(EmailSenderError):
def __init__(self, message: str) -> None:
super().__init__(message)


class EmailSender(ABC):
@abstractmethod
async def send_to_user(
async def send(
self,
*,
to_email_addr: str,
Expand All @@ -34,7 +44,7 @@ async def send_to_user(


class LoggingEmailSender(EmailSender):
async def send_to_user(
async def send(
self,
*,
to_email_addr: str,
Expand Down Expand Up @@ -62,7 +72,7 @@ def __init__(self) -> None:
super().__init__()
self._api_key = settings.RESEND_API_KEY

async def send_to_user(
async def send(
self,
*,
to_email_addr: str,
Expand All @@ -85,15 +95,24 @@ async def send_to_user(
payload["reply_to"] = f"{reply_to_name} <{reply_to_email_addr}>"

async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.resend.com/emails",
headers={
"Authorization": f"Bearer {self._api_key}",
},
json=payload,
)
response.raise_for_status()
email = response.json()
try:
response = await client.post(
"https://api.resend.com/emails",
headers={
"Authorization": f"Bearer {self._api_key}",
},
json=payload,
)
response.raise_for_status()
email = response.json()
except httpx.HTTPError as e:
log.warning(
"resend.send_error",
to_email_addr=to_email_addr,
subject=subject,
error=e,
)
raise SendEmailError(str(e)) from e

log.info(
"resend.send",
Expand All @@ -109,3 +128,26 @@ def get_email_sender() -> EmailSender:

# Logging in development
return LoggingEmailSender()


def enqueue_email(
to_email_addr: str,
subject: str,
html_content: str,
from_name: str = DEFAULT_FROM_NAME,
from_email_addr: str = DEFAULT_FROM_EMAIL_ADDRESS,
email_headers: dict[str, str] = {},
reply_to_name: str | None = DEFAULT_REPLY_TO_NAME,
reply_to_email_addr: str | None = DEFAULT_REPLY_TO_EMAIL_ADDRESS,
) -> None:
enqueue_job(
"email.send",
to_email_addr=to_email_addr,
subject=subject,
html_content=html_content,
from_name=from_name,
from_email_addr=from_email_addr,
email_headers=email_headers,
reply_to_name=reply_to_name,
reply_to_email_addr=reply_to_email_addr,
)
35 changes: 35 additions & 0 deletions server/polar/email/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from arq import Retry

from polar.worker import JobContext, PolarWorkerContext, compute_backoff, task

from .sender import SendEmailError, get_email_sender


@task("email.send", max_tries=10)
async def email_send(
ctx: JobContext,
to_email_addr: str,
subject: str,
html_content: str,
from_name: str,
from_email_addr: str,
email_headers: dict[str, str],
reply_to_name: str | None,
reply_to_email_addr: str | None,
polar_context: PolarWorkerContext,
) -> None:
email_sender = get_email_sender()

try:
await email_sender.send(
to_email_addr=to_email_addr,
subject=subject,
html_content=html_content,
from_name=from_name,
from_email_addr=from_email_addr,
email_headers=email_headers,
reply_to_name=reply_to_name,
reply_to_email_addr=reply_to_email_addr,
)
except SendEmailError as e:
raise Retry(compute_backoff(ctx["job_try"])) from e
5 changes: 2 additions & 3 deletions server/polar/email_update/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from polar.auth.models import AuthSubject
from polar.config import settings
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.exceptions import PolarError, PolarRequestValidationError
from polar.kit.crypto import generate_token_hash_pair, get_token_hash
from polar.kit.extensions.sqlalchemy import sql
Expand Down Expand Up @@ -75,7 +75,6 @@ async def send_email(
extra_url_params: dict[str, str] = {},
) -> None:
email_renderer = get_email_renderer({"email_update": "polar.email_update"})
email_sender = get_email_sender()

delta = email_update_record.expires_at - utc_now()
token_lifetime_minutes = int(ceil(delta.seconds / 60))
Expand All @@ -91,7 +90,7 @@ async def send_email(
},
)

await email_sender.send_to_user(
enqueue_email(
to_email_addr=email_update_record.email, subject=subject, html_content=body
)

Expand Down
5 changes: 2 additions & 3 deletions server/polar/magic_link/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from polar.config import settings
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.exceptions import PolarError
from polar.kit.crypto import generate_token_hash_pair, get_token_hash
from polar.kit.extensions.sqlalchemy import sql
Expand Down Expand Up @@ -69,7 +69,6 @@ async def send(
extra_url_params: dict[str, str] = {},
) -> None:
email_renderer = get_email_renderer({"magic_link": "polar.magic_link"})
email_sender = get_email_sender()

delta = magic_link.expires_at - utc_now()
token_lifetime_minutes = int(ceil(delta.seconds / 60))
Expand All @@ -85,7 +84,7 @@ async def send(
},
)

await email_sender.send_to_user(
enqueue_email(
to_email_addr=magic_link.user_email, subject=subject, html_content=body
)

Expand Down
6 changes: 2 additions & 4 deletions server/polar/notifications/tasks/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

import structlog

from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.notifications.service import notifications
from polar.user.service.user import user as user_service
from polar.worker import AsyncSessionMaker, JobContext, PolarWorkerContext, task

log = structlog.get_logger()

sender = get_email_sender()


@task("notifications.send")
async def notifications_send(
Expand Down Expand Up @@ -48,7 +46,7 @@ async def notifications_send(
)
return

await sender.send_to_user(
enqueue_email(
to_email_addr=user.email,
subject=f"[Polar] {subject}",
html_content=body,
Expand Down
5 changes: 2 additions & 3 deletions server/polar/oauth2/service/oauth2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from polar.auth.models import AuthSubject
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.enums import TokenType
from polar.exceptions import PolarError
from polar.kit.crypto import generate_token
Expand Down Expand Up @@ -93,7 +93,6 @@ async def revoke_leaked(
session.add(client)

email_renderer = get_email_renderer({"oauth2": "polar.oauth2"})
email_sender = get_email_sender()

subject, body = email_renderer.render_from_template(
subject,
Expand All @@ -107,7 +106,7 @@ async def revoke_leaked(
},
)

await email_sender.send_to_user(
enqueue_email(
to_email_addr=client.user.email, subject=subject, html_content=body
)

Expand Down
5 changes: 2 additions & 3 deletions server/polar/oauth2/service/oauth2_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from polar.config import settings
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.enums import TokenType
from polar.exceptions import PolarError
from polar.kit.crypto import get_token_hash
Expand Down Expand Up @@ -85,7 +85,6 @@ async def revoke_leaked(

# Notify
email_renderer = get_email_renderer({"oauth2": "polar.oauth2"})
email_sender = get_email_sender()

recipients: list[str]
sub = oauth2_token.sub
Expand Down Expand Up @@ -113,7 +112,7 @@ async def revoke_leaked(
)

for recipient in recipients:
await email_sender.send_to_user(
enqueue_email(
to_email_addr=recipient, subject=subject, html_content=body
)

Expand Down
7 changes: 2 additions & 5 deletions server/polar/order/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from polar.customer_session.service import customer_session as customer_session_service
from polar.discount.service import discount as discount_service
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.exceptions import PolarError
from polar.held_balance.service import held_balance as held_balance_service
from polar.integrations.stripe.schemas import ProductType
Expand Down Expand Up @@ -501,7 +501,6 @@ async def send_confirmation_email(
self, session: AsyncSession, organization: Organization, order: Order
) -> None:
email_renderer = get_email_renderer({"order": "polar.order"})
email_sender = get_email_sender()

product = order.product
customer = order.customer
Expand All @@ -522,9 +521,7 @@ async def send_confirmation_email(
},
)

await email_sender.send_to_user(
to_email_addr=customer.email, subject=subject, html_content=body
)
enqueue_email(to_email_addr=customer.email, subject=subject, html_content=body)

async def update_product_benefits_grants(
self, session: AsyncSession, product: Product
Expand Down
5 changes: 2 additions & 3 deletions server/polar/personal_access_token/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from polar.auth.models import AuthSubject
from polar.config import settings
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email
from polar.enums import TokenType
from polar.integrations.loops.service import loops as loops_service
from polar.kit.crypto import generate_token_hash_pair, get_token_hash
Expand Down Expand Up @@ -133,7 +133,6 @@ async def revoke_leaked(
email_renderer = get_email_renderer(
{"personal_access_token": "polar.personal_access_token"}
)
email_sender = get_email_sender()

subject, body = email_renderer.render_from_template(
"Security Notice - Your Polar Personal Access Token has been leaked",
Expand All @@ -146,7 +145,7 @@ async def revoke_leaked(
},
)

await email_sender.send_to_user(
enqueue_email(
to_email_addr=personal_access_token.user.email,
subject=subject,
html_content=body,
Expand Down
6 changes: 3 additions & 3 deletions server/polar/subscription/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from polar.customer_session.service import customer_session as customer_session_service
from polar.discount.service import discount as discount_service
from polar.email.renderer import get_email_renderer
from polar.email.sender import get_email_sender
from polar.email.sender import enqueue_email, get_email_sender
from polar.enums import SubscriptionRecurringInterval
from polar.exceptions import PolarError
from polar.integrations.stripe.service import stripe as stripe_service
Expand Down Expand Up @@ -715,7 +715,7 @@ async def send_confirmation_email(
},
)

await email_sender.send_to_user(
enqueue_email(
to_email_addr=subscription.customer.email,
subject=subject,
html_content=body,
Expand Down Expand Up @@ -748,7 +748,7 @@ async def send_cancellation_email(
},
)

await email_sender.send_to_user(
enqueue_email(
to_email_addr=subscription.customer.email,
subject=subject,
html_content=body,
Expand Down
2 changes: 2 additions & 0 deletions server/polar/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from polar.benefit import tasks as benefit
from polar.checkout import tasks as checkout
from polar.customer_session import tasks as customer_session
from polar.email import tasks as email
from polar.email_update import tasks as email_update
from polar.eventstream import tasks as eventstream
from polar.integrations.github import tasks as github
Expand All @@ -24,6 +25,7 @@
"benefit",
"checkout",
"customer_session",
"email",
"email_update",
"eventstream",
"github",
Expand Down
Loading

0 comments on commit ef797ee

Please sign in to comment.