diff --git a/server/polar/customer_portal/service/customer_session.py b/server/polar/customer_portal/service/customer_session.py index dc52e829fb..56577c553b 100644 --- a/server/polar/customer_portal/service/customer_session.py +++ b/server/polar/customer_portal/service/customer_session.py @@ -10,7 +10,7 @@ from polar.customer.service import customer as customer_service from polar.customer_session.service import customer_session as customer_session_service from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.exceptions import PolarError from polar.kit.crypto import get_token_hash from polar.kit.utils import utc_now @@ -74,7 +74,6 @@ async def send( email_renderer = get_email_renderer( {"customer_portal": "polar.customer_portal"} ) - email_sender = get_email_sender() customer = customer_session_code.customer organization = await organization_service.get( @@ -99,9 +98,7 @@ async def send( }, ) - await email_sender.send_to_user( - to_email_addr=customer.email, subject=subject, html_content=body - ) + enqueue_email(to_email_addr=customer.email, subject=subject, html_content=body) async def authenticate( self, session: AsyncSession, code: str diff --git a/server/polar/email/sender.py b/server/polar/email/sender.py index b6586d9393..ae076c8412 100644 --- a/server/polar/email/sender.py +++ b/server/polar/email/sender.py @@ -6,7 +6,9 @@ from polar.config import EmailSender as EmailSenderType from polar.config import settings +from polar.exceptions import PolarError from polar.logging import Logger +from polar.worker import enqueue_job log: Logger = structlog.get_logger() @@ -16,9 +18,17 @@ DEFAULT_REPLY_TO_EMAIL_ADDRESS = "support@polar.sh" +class EmailSenderError(PolarError): ... + + +class SendEmailError(EmailSenderError): + def __init__(self, message: str) -> None: + super().__init__(message) + + class EmailSender(ABC): @abstractmethod - async def send_to_user( + async def send( self, *, to_email_addr: str, @@ -34,7 +44,7 @@ async def send_to_user( class LoggingEmailSender(EmailSender): - async def send_to_user( + async def send( self, *, to_email_addr: str, @@ -62,7 +72,7 @@ def __init__(self) -> None: super().__init__() self._api_key = settings.RESEND_API_KEY - async def send_to_user( + async def send( self, *, to_email_addr: str, @@ -85,15 +95,24 @@ async def send_to_user( payload["reply_to"] = f"{reply_to_name} <{reply_to_email_addr}>" async with httpx.AsyncClient() as client: - response = await client.post( - "https://api.resend.com/emails", - headers={ - "Authorization": f"Bearer {self._api_key}", - }, - json=payload, - ) - response.raise_for_status() - email = response.json() + try: + response = await client.post( + "https://api.resend.com/emails", + headers={ + "Authorization": f"Bearer {self._api_key}", + }, + json=payload, + ) + response.raise_for_status() + email = response.json() + except httpx.HTTPError as e: + log.warning( + "resend.send_error", + to_email_addr=to_email_addr, + subject=subject, + error=e, + ) + raise SendEmailError(str(e)) from e log.info( "resend.send", @@ -109,3 +128,26 @@ def get_email_sender() -> EmailSender: # Logging in development return LoggingEmailSender() + + +def enqueue_email( + to_email_addr: str, + subject: str, + html_content: str, + from_name: str = DEFAULT_FROM_NAME, + from_email_addr: str = DEFAULT_FROM_EMAIL_ADDRESS, + email_headers: dict[str, str] = {}, + reply_to_name: str | None = DEFAULT_REPLY_TO_NAME, + reply_to_email_addr: str | None = DEFAULT_REPLY_TO_EMAIL_ADDRESS, +) -> None: + enqueue_job( + "email.send", + to_email_addr=to_email_addr, + subject=subject, + html_content=html_content, + from_name=from_name, + from_email_addr=from_email_addr, + email_headers=email_headers, + reply_to_name=reply_to_name, + reply_to_email_addr=reply_to_email_addr, + ) diff --git a/server/polar/email/tasks.py b/server/polar/email/tasks.py new file mode 100644 index 0000000000..a416eb1904 --- /dev/null +++ b/server/polar/email/tasks.py @@ -0,0 +1,35 @@ +from arq import Retry + +from polar.worker import JobContext, PolarWorkerContext, compute_backoff, task + +from .sender import SendEmailError, get_email_sender + + +@task("email.send", max_tries=10) +async def email_send( + ctx: JobContext, + to_email_addr: str, + subject: str, + html_content: str, + from_name: str, + from_email_addr: str, + email_headers: dict[str, str], + reply_to_name: str | None, + reply_to_email_addr: str | None, + polar_context: PolarWorkerContext, +) -> None: + email_sender = get_email_sender() + + try: + await email_sender.send( + to_email_addr=to_email_addr, + subject=subject, + html_content=html_content, + from_name=from_name, + from_email_addr=from_email_addr, + email_headers=email_headers, + reply_to_name=reply_to_name, + reply_to_email_addr=reply_to_email_addr, + ) + except SendEmailError as e: + raise Retry(compute_backoff(ctx["job_try"])) from e diff --git a/server/polar/email_update/service.py b/server/polar/email_update/service.py index 8331df9374..09bc125424 100644 --- a/server/polar/email_update/service.py +++ b/server/polar/email_update/service.py @@ -8,7 +8,7 @@ from polar.auth.models import AuthSubject from polar.config import settings from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.exceptions import PolarError, PolarRequestValidationError from polar.kit.crypto import generate_token_hash_pair, get_token_hash from polar.kit.extensions.sqlalchemy import sql @@ -75,7 +75,6 @@ async def send_email( extra_url_params: dict[str, str] = {}, ) -> None: email_renderer = get_email_renderer({"email_update": "polar.email_update"}) - email_sender = get_email_sender() delta = email_update_record.expires_at - utc_now() token_lifetime_minutes = int(ceil(delta.seconds / 60)) @@ -91,7 +90,7 @@ async def send_email( }, ) - await email_sender.send_to_user( + enqueue_email( to_email_addr=email_update_record.email, subject=subject, html_content=body ) diff --git a/server/polar/magic_link/service.py b/server/polar/magic_link/service.py index b50007efa7..8a70c05057 100644 --- a/server/polar/magic_link/service.py +++ b/server/polar/magic_link/service.py @@ -7,7 +7,7 @@ from polar.config import settings from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.exceptions import PolarError from polar.kit.crypto import generate_token_hash_pair, get_token_hash from polar.kit.extensions.sqlalchemy import sql @@ -69,7 +69,6 @@ async def send( extra_url_params: dict[str, str] = {}, ) -> None: email_renderer = get_email_renderer({"magic_link": "polar.magic_link"}) - email_sender = get_email_sender() delta = magic_link.expires_at - utc_now() token_lifetime_minutes = int(ceil(delta.seconds / 60)) @@ -85,7 +84,7 @@ async def send( }, ) - await email_sender.send_to_user( + enqueue_email( to_email_addr=magic_link.user_email, subject=subject, html_content=body ) diff --git a/server/polar/notifications/tasks/email.py b/server/polar/notifications/tasks/email.py index 9fb6c2f9ea..f03f898ca5 100644 --- a/server/polar/notifications/tasks/email.py +++ b/server/polar/notifications/tasks/email.py @@ -2,15 +2,13 @@ import structlog -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.notifications.service import notifications from polar.user.service.user import user as user_service from polar.worker import AsyncSessionMaker, JobContext, PolarWorkerContext, task log = structlog.get_logger() -sender = get_email_sender() - @task("notifications.send") async def notifications_send( @@ -48,7 +46,7 @@ async def notifications_send( ) return - await sender.send_to_user( + enqueue_email( to_email_addr=user.email, subject=f"[Polar] {subject}", html_content=body, diff --git a/server/polar/oauth2/service/oauth2_client.py b/server/polar/oauth2/service/oauth2_client.py index 137a008629..96f81e0a85 100644 --- a/server/polar/oauth2/service/oauth2_client.py +++ b/server/polar/oauth2/service/oauth2_client.py @@ -7,7 +7,7 @@ from polar.auth.models import AuthSubject from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.enums import TokenType from polar.exceptions import PolarError from polar.kit.crypto import generate_token @@ -93,7 +93,6 @@ async def revoke_leaked( session.add(client) email_renderer = get_email_renderer({"oauth2": "polar.oauth2"}) - email_sender = get_email_sender() subject, body = email_renderer.render_from_template( subject, @@ -107,7 +106,7 @@ async def revoke_leaked( }, ) - await email_sender.send_to_user( + enqueue_email( to_email_addr=client.user.email, subject=subject, html_content=body ) diff --git a/server/polar/oauth2/service/oauth2_token.py b/server/polar/oauth2/service/oauth2_token.py index fa8575940c..b94c8bc73f 100644 --- a/server/polar/oauth2/service/oauth2_token.py +++ b/server/polar/oauth2/service/oauth2_token.py @@ -8,7 +8,7 @@ from polar.config import settings from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.enums import TokenType from polar.exceptions import PolarError from polar.kit.crypto import get_token_hash @@ -85,7 +85,6 @@ async def revoke_leaked( # Notify email_renderer = get_email_renderer({"oauth2": "polar.oauth2"}) - email_sender = get_email_sender() recipients: list[str] sub = oauth2_token.sub @@ -113,7 +112,7 @@ async def revoke_leaked( ) for recipient in recipients: - await email_sender.send_to_user( + enqueue_email( to_email_addr=recipient, subject=subject, html_content=body ) diff --git a/server/polar/order/service.py b/server/polar/order/service.py index 53973b9510..fb8cbbb053 100644 --- a/server/polar/order/service.py +++ b/server/polar/order/service.py @@ -17,7 +17,7 @@ from polar.customer_session.service import customer_session as customer_session_service from polar.discount.service import discount as discount_service from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.exceptions import PolarError from polar.held_balance.service import held_balance as held_balance_service from polar.integrations.stripe.schemas import ProductType @@ -501,7 +501,6 @@ async def send_confirmation_email( self, session: AsyncSession, organization: Organization, order: Order ) -> None: email_renderer = get_email_renderer({"order": "polar.order"}) - email_sender = get_email_sender() product = order.product customer = order.customer @@ -522,9 +521,7 @@ async def send_confirmation_email( }, ) - await email_sender.send_to_user( - to_email_addr=customer.email, subject=subject, html_content=body - ) + enqueue_email(to_email_addr=customer.email, subject=subject, html_content=body) async def update_product_benefits_grants( self, session: AsyncSession, product: Product diff --git a/server/polar/personal_access_token/service.py b/server/polar/personal_access_token/service.py index 16c65979ed..a8d426308e 100644 --- a/server/polar/personal_access_token/service.py +++ b/server/polar/personal_access_token/service.py @@ -9,7 +9,7 @@ from polar.auth.models import AuthSubject from polar.config import settings from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email from polar.enums import TokenType from polar.integrations.loops.service import loops as loops_service from polar.kit.crypto import generate_token_hash_pair, get_token_hash @@ -133,7 +133,6 @@ async def revoke_leaked( email_renderer = get_email_renderer( {"personal_access_token": "polar.personal_access_token"} ) - email_sender = get_email_sender() subject, body = email_renderer.render_from_template( "Security Notice - Your Polar Personal Access Token has been leaked", @@ -146,7 +145,7 @@ async def revoke_leaked( }, ) - await email_sender.send_to_user( + enqueue_email( to_email_addr=personal_access_token.user.email, subject=subject, html_content=body, diff --git a/server/polar/subscription/service.py b/server/polar/subscription/service.py index 2d3d1587b9..97b38209fe 100644 --- a/server/polar/subscription/service.py +++ b/server/polar/subscription/service.py @@ -21,7 +21,7 @@ from polar.customer_session.service import customer_session as customer_session_service from polar.discount.service import discount as discount_service from polar.email.renderer import get_email_renderer -from polar.email.sender import get_email_sender +from polar.email.sender import enqueue_email, get_email_sender from polar.enums import SubscriptionRecurringInterval from polar.exceptions import PolarError from polar.integrations.stripe.service import stripe as stripe_service @@ -715,7 +715,7 @@ async def send_confirmation_email( }, ) - await email_sender.send_to_user( + enqueue_email( to_email_addr=subscription.customer.email, subject=subject, html_content=body, @@ -748,7 +748,7 @@ async def send_cancellation_email( }, ) - await email_sender.send_to_user( + enqueue_email( to_email_addr=subscription.customer.email, subject=subject, html_content=body, diff --git a/server/polar/tasks.py b/server/polar/tasks.py index 06fa61360c..2b1739d27b 100644 --- a/server/polar/tasks.py +++ b/server/polar/tasks.py @@ -3,6 +3,7 @@ from polar.benefit import tasks as benefit from polar.checkout import tasks as checkout from polar.customer_session import tasks as customer_session +from polar.email import tasks as email from polar.email_update import tasks as email_update from polar.eventstream import tasks as eventstream from polar.integrations.github import tasks as github @@ -24,6 +25,7 @@ "benefit", "checkout", "customer_session", + "email", "email_update", "eventstream", "github", diff --git a/server/tests/fixtures/__init__.py b/server/tests/fixtures/__init__.py index 73059a724e..d9ca0a5d1f 100644 --- a/server/tests/fixtures/__init__.py +++ b/server/tests/fixtures/__init__.py @@ -3,6 +3,7 @@ from tests.fixtures.auth import * # noqa: F401, F403 from tests.fixtures.base import * # noqa: F401, F403 from tests.fixtures.database import * # noqa: F401, F403 +from tests.fixtures.email import * # noqa: F401, F403 from tests.fixtures.file import * # noqa: F401, F403 from tests.fixtures.locker import * # noqa: F401, F403 from tests.fixtures.predictable_objects import * # noqa: F401, F403 diff --git a/server/tests/fixtures/email.py b/server/tests/fixtures/email.py index a682b4a8be..cc20abba09 100644 --- a/server/tests/fixtures/email.py +++ b/server/tests/fixtures/email.py @@ -15,19 +15,18 @@ DEFAULT_FROM_NAME, DEFAULT_REPLY_TO_EMAIL_ADDRESS, DEFAULT_REPLY_TO_NAME, - EmailSender, ) if TYPE_CHECKING: from tempfile import _TemporaryFileWrapper as TemporaryFileWrapper -class WatcherEmailSender(EmailSender): +class WatcherEmailRenderer: def __init__(self) -> None: self._temporary_file: TemporaryFileWrapper[str] | None = None super().__init__() - def __enter__(self) -> "WatcherEmailSender": + def __enter__(self) -> "WatcherEmailRenderer": self._temporary_file = tempfile.NamedTemporaryFile(suffix=".html", mode="w") return self @@ -39,7 +38,7 @@ def __exit__( ) -> None: self.temporary_file.close() - async def send_to_user( + def __call__( self, *, to_email_addr: str, diff --git a/server/tests/magic_link/test_service.py b/server/tests/magic_link/test_service.py index 8c4386250c..c32d436917 100644 --- a/server/tests/magic_link/test_service.py +++ b/server/tests/magic_link/test_service.py @@ -1,7 +1,7 @@ import os from collections.abc import Callable, Coroutine from datetime import UTC, datetime, timedelta -from unittest.mock import ANY, AsyncMock, MagicMock +from unittest.mock import ANY, MagicMock from uuid import UUID import pytest @@ -21,6 +21,11 @@ ] +@pytest.fixture(autouse=True) +def enqueue_email_mock(mocker: MockerFixture) -> MagicMock: + return mocker.patch("polar.magic_link.service.enqueue_email", autospec=True) + + @pytest_asyncio.fixture async def generate_magic_link_token( save_fixture: SaveFixture, @@ -84,31 +89,18 @@ async def test_authenticate_expired_token( @pytest.mark.asyncio async def test_send( - generate_magic_link_token: GenerateMagicLinkToken, - mocker: MockerFixture, - session: AsyncSession, + generate_magic_link_token: GenerateMagicLinkToken, enqueue_email_mock: MagicMock ) -> None: - email_sender_mock = AsyncMock() - mocker.patch( - "polar.magic_link.service.get_email_sender", return_value=email_sender_mock - ) - - # then - session.expunge_all() - magic_link, _ = await generate_magic_link_token("user@example.com", None, None) await magic_link_service.send(magic_link, "TOKEN", "BASE_URL") - send_to_user_mock: MagicMock = email_sender_mock.send_to_user - assert send_to_user_mock.called - - send_to_user_mock.assert_called_once_with( + enqueue_email_mock.assert_called_once_with( to_email_addr="user@example.com", html_content=ANY, subject="Sign in to Polar" ) - sent_subject = send_to_user_mock.call_args_list[0].kwargs["subject"] - sent_body = send_to_user_mock.call_args_list[0].kwargs["html_content"] + sent_subject = enqueue_email_mock.call_args_list[0].kwargs["subject"] + sent_body = enqueue_email_mock.call_args_list[0].kwargs["html_content"] sent_content = f"{sent_subject}\n