Skip to content

Commit

Permalink
Persistent connection for email sending. (dart-lang#7239)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Jan 25, 2024
1 parent c233d9c commit 5b72dca
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ AppEngine version, listed here to ease deployment and troubleshooting.
* Upgraded stable Dart analysis SDK to `3.2.5`.
* Upgraded stable Flutter analysis SDK to `3.16.8`.
* Upgraded preview Flutter analysis SDK to `3.19.0-0.2.pre`.
* Note: Sending emails keep persistent connection for a few minutes.

## `20240118t104800-all`
* Upgraded dependencies including `markdown: ^7.2.0`.
Expand Down
99 changes: 83 additions & 16 deletions app/lib/frontend/email_sender.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ abstract class EmailSender {

EmailSender createGmailRelaySender(
String serviceAccountEmail,
String impersonatedGSuiteUser,
http.Client authClient,
) =>
_GmailSmtpRelay(
serviceAccountEmail,
impersonatedGSuiteUser,
authClient,
);

Expand Down Expand Up @@ -103,7 +101,7 @@ Address? _toAddress(EmailAddress? input) =>
/// * Is configured for [domain-wide delegation][3] with the
/// `https://mail.google.com/` scope on the given GSuite.
///
/// This class then creates a JWT impersonating [_impersonatedGSuiteUser] and
/// This class then creates a JWT impersonating the sender and
/// signed by [_serviceAccountEmail] (using [signJwt API][4]). It then exchanges
/// this JWT for an `access_token` using [OAuth 2 for service accounts][4].
///
Expand All @@ -117,16 +115,17 @@ class _GmailSmtpRelay implements EmailSender {
static const _scopes = ['https://mail.google.com/'];

final String _serviceAccountEmail;
final String _impersonatedGSuiteUser;
final http.Client _authClient;

final _connectionsBySender = <String, Future<_GmailConnection>>{};
final _forceReconnectSenders = <String>{};

DateTime _accessTokenRefreshed = DateTime(0);
DateTime _backoffUntil = DateTime(0);
Future<String>? _accessToken;

_GmailSmtpRelay(
this._serviceAccountEmail,
this._impersonatedGSuiteUser,
this._authClient,
);

Expand All @@ -139,27 +138,35 @@ class _GmailSmtpRelay implements EmailSender {
'from ${message.from} '
'to ${message.recipients.join(', ')}';
_logger.info('Sending email: $debugHeader...');
final sender = message.from.email;
try {
await retry(
() async => send(
_toMessage(message),
await _getSmtpServer(),
timeout: Duration(seconds: 15),
),
() async {
final c = await _getConnection(sender);
try {
await c.connection.send(_toMessage(message));
} finally {
c.trackSentEmail(message.recipients.length);
}
},
retryIf: (e) =>
e is TimeoutException ||
e is IOException ||
e is SmtpClientCommunicationException ||
e is SmtpNoGreetingException,
delayFactor: Duration(seconds: 2),
maxAttempts: 2,
onRetry: (_) {
_forceReconnectSenders.add(sender);
},
);
} on SmtpMessageValidationException catch (e, st) {
_logger.info('Sending email failed: $debugHeader.', e, st);
throw EmailSenderException.invalid();
} on SmtpClientAuthenticationException catch (e, st) {
_logger.shout('Sending email failed due to invalid auth: $e', e, st);
_backoffUntil = clock.now().add(Duration(minutes: 2));
_forceReconnectSenders.add(sender);
_accessToken = null;
throw EmailSenderException.failed();
} on MailerException catch (e, st) {
Expand All @@ -168,22 +175,48 @@ class _GmailSmtpRelay implements EmailSender {
}
}

Future<SmtpServer> _getSmtpServer() async {
Future<_GmailConnection> _getConnection(String sender) async {
final connectionFuture = _connectionsBySender[sender];
final old = connectionFuture == null ? null : await connectionFuture;
final forceReconnect = _forceReconnectSenders.remove(sender);
if (!forceReconnect && old != null && !old.isExpired) {
return old;
}
final newConnectionFuture = Future.microtask(() async {
if (old != null) {
try {
await old.connection.close();
} catch (e, st) {
_logger.warning('Unable to close SMTP connection.', e, st);
}
}
return _GmailConnection(
PersistentConnection(
await _getSmtpServer(sender),
timeout: Duration(seconds: 15),
),
);
});
_connectionsBySender[sender] = newConnectionFuture;
return newConnectionFuture;
}

Future<SmtpServer> _getSmtpServer(String sender) async {
final maxAge = clock.now().subtract(Duration(minutes: 20));
if (_accessToken == null || _accessTokenRefreshed.isBefore(maxAge)) {
_accessToken = _createAccessToken();
_accessToken = _createAccessToken(sender);
_accessTokenRefreshed = clock.now();
}

// For documentation see:
// https://support.google.com/a/answer/176600?hl=en
return gmailRelaySaslXoauth2(_impersonatedGSuiteUser, await _accessToken!);
return gmailRelaySaslXoauth2(sender, await _accessToken!);
}

/// Create an access_token for [_impersonatedGSuiteUser] using the
/// Create an access_token for [sender] using the
/// [_serviceAccountEmail] configured for _domain-wide delegation_ following:
/// https://developers.google.com/identity/protocols/oauth2/service-account
Future<String> _createAccessToken() async {
Future<String> _createAccessToken(String sender) async {
final iam = iam_credentials.IAMCredentialsApi(_authClient);
final iat = clock.now().toUtc().millisecondsSinceEpoch ~/ 1000 - 20;
iam_credentials.SignJwtResponse jwtResponse;
Expand All @@ -196,7 +229,7 @@ class _GmailSmtpRelay implements EmailSender {
'aud': _googleOauth2TokenUrl.toString(),
'exp': iat + 3600,
'iat': iat,
'sub': _impersonatedGSuiteUser,
'sub': sender,
}),
'projects/-/serviceAccounts/$_serviceAccountEmail',
));
Expand Down Expand Up @@ -234,3 +267,37 @@ class _GmailSmtpRelay implements EmailSender {
}
}
}

class _GmailConnection {
final DateTime created;
final PersistentConnection connection;
DateTime _lastUsed;
var _sentCount = 0;

_GmailConnection(this.connection)
: created = clock.now(),
_lastUsed = clock.now();

void trackSentEmail(int count) {
_sentCount += count;
_lastUsed = clock.now();
}

bool get isExpired {
// There is a 100-recipient limit per SMTP transaction for smtp-relay.gmail.com.
// Exceeding this limit results in an error message. To send messages to
// additional recipients, start another transaction (new SMTP connection or RSET command).
if (_sentCount > 90) {
return true;
}
final age = clock.now().difference(created);
if (age > Duration(minutes: 5)) {
return true;
}
final idle = clock.now().difference(_lastUsed);
if (idle > Duration(seconds: 25)) {
return true;
}
return false;
}
}
1 change: 0 additions & 1 deletion app/lib/service/services.dart
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ Future<void> withServices(FutureOr<void> Function() fn) async {
activeConfiguration.gmailRelayImpersonatedGSuiteUser != null
? createGmailRelaySender(
activeConfiguration.gmailRelayServiceAccount!,
activeConfiguration.gmailRelayImpersonatedGSuiteUser!,
authClient,
)
: loggingEmailSender,
Expand Down

0 comments on commit 5b72dca

Please sign in to comment.