From c40e8ce2474bb9da1c09f3bbc74bc6e32faae761 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Thu, 11 Dec 2025 12:20:00 -0500 Subject: [PATCH 01/14] Compiles --- rust/cloud-storage/email_service/Cargo.toml | 4 + .../email_service/src/api/context.rs | 3 +- .../api/email/attachments/get_document_id.rs | 2 +- .../email_service/src/api/email/links/list.rs | 2 +- .../src/api/email/messages/send.rs | 2 +- .../src/api/email/sync/enable.rs | 2 +- .../email_service/src/api/gmail/webhook.rs | 2 +- .../src/api/internal/backfill.rs | 3 +- .../src/api/internal/delete_user.rs | 27 +- .../src/api/middleware/gmail_token.rs | 2 +- .../src/bin/pubsub_workers/pubsub_workers.rs | 369 ++++++++++++++++++ rust/cloud-storage/email_service/src/lib.rs | 3 + rust/cloud-storage/email_service/src/main.rs | 257 +----------- .../email_service/src/pubsub/backfill/mod.rs | 2 +- .../email_service/src/pubsub/mod.rs | 10 +- .../email_service/src/pubsub/refresh/mod.rs | 2 +- .../email_service/src/pubsub/scheduled/mod.rs | 2 +- .../src/pubsub/sfs_uploader/mod.rs | 2 +- .../email_service/src/pubsub/webhook/mod.rs | 2 +- .../email_service/src/util/gmail/auth.rs | 2 +- .../src/util/process_pre_insert/mod.rs | 7 +- .../email_service/src/util/redis/backfill.rs | 2 +- .../src/util/upload_attachment.rs | 10 +- 23 files changed, 427 insertions(+), 292 deletions(-) create mode 100644 rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs create mode 100644 rust/cloud-storage/email_service/src/lib.rs diff --git a/rust/cloud-storage/email_service/Cargo.toml b/rust/cloud-storage/email_service/Cargo.toml index 1f23920d4..e0ad55e75 100644 --- a/rust/cloud-storage/email_service/Cargo.toml +++ b/rust/cloud-storage/email_service/Cargo.toml @@ -21,6 +21,10 @@ path = "src/bin/backfill_gmail_attachments/backfill_gmail_attachments.rs" name = "backfill_search" path = "src/bin/backfill_search/backfill_search.rs" +[[bin]] +name = "pubsub_workers" +path = "src/bin/pubsub_workers/pubsub_workers.rs" + [features] attachment_upload = [] connection_gateway = [] diff --git a/rust/cloud-storage/email_service/src/api/context.rs b/rust/cloud-storage/email_service/src/api/context.rs index 5a007a0ae..5f1e3ee01 100644 --- a/rust/cloud-storage/email_service/src/api/context.rs +++ b/rust/cloud-storage/email_service/src/api/context.rs @@ -1,7 +1,8 @@ -use crate::{config::Config, util::redis::RedisClient}; use axum::extract::FromRef; use document_storage_service_client::DocumentStorageServiceClient; use email::{domain::service::EmailServiceImpl, inbound::EmailPreviewState, outbound::EmailPgRepo}; +use email_service::config::Config; +use email_service::util::redis::RedisClient; use frecency::{domain::services::FrecencyQueryServiceImpl, outbound::postgres::FrecencyPgStorage}; use macro_auth::middleware::decode_jwt::JwtValidationArgs; use macro_middleware::auth::internal_access::InternalApiSecretKey; diff --git a/rust/cloud-storage/email_service/src/api/email/attachments/get_document_id.rs b/rust/cloud-storage/email_service/src/api/email/attachments/get_document_id.rs index 897bc9d0a..983a30f46 100644 --- a/rust/cloud-storage/email_service/src/api/email/attachments/get_document_id.rs +++ b/rust/cloud-storage/email_service/src/api/email/attachments/get_document_id.rs @@ -1,9 +1,9 @@ use crate::api::context::ApiContext; -use crate::util::upload_attachment::{UploadAttachmentContext, upload_attachment}; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::{Extension, Json}; +use email_service::util::upload_attachment::{UploadAttachmentContext, upload_attachment}; use model::response::ErrorResponse; use models_email::db::address::EmailRecipientType; use models_email::email::service::link::Link; diff --git a/rust/cloud-storage/email_service/src/api/email/links/list.rs b/rust/cloud-storage/email_service/src/api/email/links/list.rs index 9888b0a14..322049ba8 100644 --- a/rust/cloud-storage/email_service/src/api/email/links/list.rs +++ b/rust/cloud-storage/email_service/src/api/email/links/list.rs @@ -1,9 +1,9 @@ use crate::api::context::ApiContext; -use crate::util::gmail::auth::fetch_gmail_access_token_from_link; use axum::extract::State; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::{Extension, Json}; +use email_service::util::gmail::auth::fetch_gmail_access_token_from_link; use futures::future::join_all; use model::response::ErrorResponse; use model::user::UserContext; diff --git a/rust/cloud-storage/email_service/src/api/email/messages/send.rs b/rust/cloud-storage/email_service/src/api/email/messages/send.rs index d4d142664..a0033c99d 100644 --- a/rust/cloud-storage/email_service/src/api/email/messages/send.rs +++ b/rust/cloud-storage/email_service/src/api/email/messages/send.rs @@ -1,6 +1,5 @@ use crate::api::context::ApiContext; use crate::api::email::validation::{self, ValidationError}; -use crate::util::gmail::send; use anyhow::Context; use axum::extract::State; use axum::http::StatusCode; @@ -9,6 +8,7 @@ use axum::{Extension, Json}; use base64::Engine; use base64::engine::general_purpose::URL_SAFE_NO_PAD; use email_db_client::messages::insert::insert_message_to_send; +use email_service::util::gmail::send; use model::response::ErrorResponse; use model::user::UserContext; use models_email::email::service::address::ContactInfo; diff --git a/rust/cloud-storage/email_service/src/api/email/sync/enable.rs b/rust/cloud-storage/email_service/src/api/email/sync/enable.rs index e2950430d..d006564a3 100644 --- a/rust/cloud-storage/email_service/src/api/email/sync/enable.rs +++ b/rust/cloud-storage/email_service/src/api/email/sync/enable.rs @@ -112,7 +112,7 @@ pub async fn enable_gmail_sync( ) -> Result { let token = match gmail_access_token { Some(token) => token.to_string(), - None => crate::util::gmail::auth::fetch_gmail_token_usercontext_response( + None => email_service::util::gmail::auth::fetch_gmail_token_usercontext_response( user_context, &ctx.redis_client, &ctx.auth_service_client, diff --git a/rust/cloud-storage/email_service/src/api/gmail/webhook.rs b/rust/cloud-storage/email_service/src/api/gmail/webhook.rs index fc0c6ea1f..5a5d9eed7 100644 --- a/rust/cloud-storage/email_service/src/api/gmail/webhook.rs +++ b/rust/cloud-storage/email_service/src/api/gmail/webhook.rs @@ -1,5 +1,4 @@ use crate::api::context::ApiContext; -use crate::util; use axum::extract::State; use axum::http::HeaderMap; use axum::{ @@ -7,6 +6,7 @@ use axum::{ http::StatusCode, response::{IntoResponse, Json, Response}, }; +use email_service::util; use model::response::ErrorResponse; use models_email::gmail::webhook::{ GmailMessagePayload, GmailWebhookPayload, JwtVerificationError, WebhookOperation, diff --git a/rust/cloud-storage/email_service/src/api/internal/backfill.rs b/rust/cloud-storage/email_service/src/api/internal/backfill.rs index 043fa7c91..96fa2204a 100644 --- a/rust/cloud-storage/email_service/src/api/internal/backfill.rs +++ b/rust/cloud-storage/email_service/src/api/internal/backfill.rs @@ -1,10 +1,11 @@ -use crate::{api::ApiContext, util::backfill::backfill_insights::backfill_email_insights}; +use crate::api::ApiContext; use axum::{ extract::State, http::StatusCode, response::{IntoResponse, Json, Response}, }; +use email_service::util::backfill::backfill_insights::backfill_email_insights; use model::insight_context::email_insights::BackfillEmailInsightsFilter; /// Internal endpoint to backfill insights from emails diff --git a/rust/cloud-storage/email_service/src/api/internal/delete_user.rs b/rust/cloud-storage/email_service/src/api/internal/delete_user.rs index 9b63722f5..dfeeec1ef 100644 --- a/rust/cloud-storage/email_service/src/api/internal/delete_user.rs +++ b/rust/cloud-storage/email_service/src/api/internal/delete_user.rs @@ -39,19 +39,20 @@ pub async fn handler( tracing::error!(error=?e, link_id=?link.id, "Failed to update backfill job statuses"); }; - let gmail_access_token = match crate::util::gmail::auth::fetch_gmail_access_token_from_link( - &link, - &ctx.redis_client, - &ctx.auth_service_client, - ) - .await - { - Ok(token) => Some(token), - Err(e) => { - tracing::error!(error=?e, link_id=?link.id, "unable to fetch access token - skipping stop watch"); - None - } - }; + let gmail_access_token = + match email_service::util::gmail::auth::fetch_gmail_access_token_from_link( + &link, + &ctx.redis_client, + &ctx.auth_service_client, + ) + .await + { + Ok(token) => Some(token), + Err(e) => { + tracing::error!(error=?e, link_id=?link.id, "unable to fetch access token - skipping stop watch"); + None + } + }; if let Some(token) = gmail_access_token && let Err(e) = ctx.gmail_client.stop_watch(&token).await diff --git a/rust/cloud-storage/email_service/src/api/middleware/gmail_token.rs b/rust/cloud-storage/email_service/src/api/middleware/gmail_token.rs index 9ad10aa08..badc158a8 100644 --- a/rust/cloud-storage/email_service/src/api/middleware/gmail_token.rs +++ b/rust/cloud-storage/email_service/src/api/middleware/gmail_token.rs @@ -3,7 +3,7 @@ use axum::{Extension, extract::Request, middleware::Next, response::Response}; use model::user::UserContext; use crate::api::context::ApiContext; -use crate::util::gmail::auth::fetch_gmail_token_usercontext_response; +use email_service::util::gmail::auth::fetch_gmail_token_usercontext_response; pub(in crate::api) async fn attach_gmail_token( State(ctx): State, diff --git a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs new file mode 100644 index 000000000..a7c5725c9 --- /dev/null +++ b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs @@ -0,0 +1,369 @@ +use anyhow::Context; +use document_storage_service_client::DocumentStorageServiceClient; +use email_service::config::{CloudfrontSignerPrivateKey, Config}; +use macro_entrypoint::MacroEntrypoint; +use macro_env::Environment; +use macro_middleware::auth::internal_access::InternalApiSecretKey; +use secretsmanager_client::SecretManager; +use sqlx::postgres::PgPoolOptions; +use static_file_service_client::StaticFileServiceClient; +use std::sync::Arc; +use system_properties::{PgSystemPropertiesRepository, SystemPropertiesServiceImpl}; + +#[tokio::main] +#[tracing::instrument(err)] +async fn main() -> anyhow::Result<()> { + MacroEntrypoint::default().init(); + let env = Environment::new_or_prod(); + + // new + let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region("us-east-1") + .load() + .await; + + let secretsmanager_client = secretsmanager_client::SecretsManager::new( + aws_sdk_secretsmanager::Client::new(&aws_config), + ); + + let cloudfront_signer_private_key = secretsmanager_client + .get_maybe_secret_value(env, CloudfrontSignerPrivateKey::new()?) + .await?; + + // Parse our configuration from the environment. + let config = Config::from_env(cloudfront_signer_private_key) + .context("expected to be able to generate config")?; + + let auth_service_secret_key = match config.environment { + Environment::Local => config.auth_service_secret_key.clone(), + _ => secretsmanager_client + .get_secret_value(config.auth_service_secret_key.clone()) + .await + .context("unable to get secret")? + .to_string(), + }; + + let (min_connections, max_connections): (u32, u32) = match config.environment { + Environment::Production => (3, 30), + Environment::Develop => (1, 10), + Environment::Local => (1, 10), + }; + + let (min_connections_backfill, max_connections_backfill): (u32, u32) = match config.environment + { + Environment::Production => (3, 30), + Environment::Develop => (1, 30), + Environment::Local => (1, 50), + }; + + // all non-backfill workers share a connection pool + let db = PgPoolOptions::new() + .min_connections(min_connections) + .max_connections(max_connections) + .connect(&config.macro_db_url) + .await + .context("could not connect to db")?; + + let db_backfill = PgPoolOptions::new() + .min_connections(min_connections_backfill) + .max_connections(max_connections_backfill) + .connect(&config.macro_db_url) + .await + .context("could not connect to backfill db")?; + + let gmail_queue_aws_config = if cfg!(feature = "local_queue") { + aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region("us-east-1") + .endpoint_url(&config.gmail_webhook_queue) + .load() + .await + } else { + aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region("us-east-1") + .load() + .await + }; + + let sqs_client = sqs_client::SQS::new(aws_sdk_sqs::Client::new(&gmail_queue_aws_config)) + .gmail_webhook_queue(&config.gmail_webhook_queue) + .gmail_webhook_retry_queue(&config.gmail_webhook_retry_queue) + .search_event_queue(&config.search_event_queue) + .insight_context_queue(&config.insight_context_queue) + .email_backfill_queue(&config.backfill_queue) + .email_scheduled_queue(&config.email_scheduled_queue) + .sfs_uploader_queue(&config.sfs_uploader_queue) + .contacts_queue(&config.contacts_queue); + + let macro_notify_client = macro_notify::MacroNotify::new( + config.notification_queue.clone(), + "email_service".to_string(), + ) + .await; + + let refresh_worker = sqs_worker::SQSWorker::new( + aws_sdk_sqs::Client::new(&gmail_queue_aws_config), + config.email_refresh_queue.clone(), + config.queue_max_messages, + config.queue_wait_time_seconds, + ); + + let scheduled_worker = sqs_worker::SQSWorker::new( + aws_sdk_sqs::Client::new(&gmail_queue_aws_config), + config.email_scheduled_queue.clone(), + config.queue_max_messages, + config.queue_wait_time_seconds, + ); + + let sfs_uploader_workers = (0..config.sfs_uploader_workers) + .map(|_| { + sqs_worker::SQSWorker::new( + aws_sdk_sqs::Client::new(&gmail_queue_aws_config), + config.sfs_uploader_queue.clone(), + config.queue_max_messages, + config.queue_wait_time_seconds, + ) + }) + .collect::>(); + + let backfill_workers = (0..config.backfill_queue_workers) + .map(|_| { + sqs_worker::SQSWorker::new( + aws_sdk_sqs::Client::new(&gmail_queue_aws_config), + config.backfill_queue.clone(), + config.backfill_queue_max_messages, + config.queue_wait_time_seconds, + ) + }) + .collect::>(); + + let webhook_workers = (0..config.webhook_queue_workers) + .map(|_| { + sqs_worker::SQSWorker::new( + aws_sdk_sqs::Client::new(&gmail_queue_aws_config), + config.gmail_webhook_queue.clone(), + config.webhook_queue_max_messages, + config.queue_wait_time_seconds, + ) + }) + .collect::>(); + + let webhook_retry_workers = (0..config.webhook_retry_queue_workers) + .map(|_| { + sqs_worker::SQSWorker::new( + aws_sdk_sqs::Client::new(&gmail_queue_aws_config), + config.gmail_webhook_retry_queue.clone(), + config.webhook_retry_queue_max_messages, + config.queue_wait_time_seconds, + ) + }) + .collect::>(); + + let auth_service_client = authentication_service_client::AuthServiceClient::new( + auth_service_secret_key, + config.auth_service_url.clone(), + ); + + let gmail_client = gmail_client::GmailClient::new(config.gmail_gcp_queue.clone()); + + let redis_inner_client = redis::Client::open(config.redis_uri.as_str()) + .inspect(|client| { + client + .get_connection() + .map(|_| tracing::info!("initialized redis connection")) + .inspect_err(|e| { + tracing::error!(error=?e, "failed to connect to redis"); + }) + .ok(); + }) + .context("failed to connect to redis")?; + + let redis_client = email_service::util::redis::RedisClient::new( + redis_inner_client, + config.redis_rate_limit_reqs, + config.redis_rate_limit_window_secs, + ); + + let internal_auth_key = InternalApiSecretKey::new()?; + + let sfs_client = StaticFileServiceClient::new( + internal_auth_key.as_ref().to_string(), + config.static_file_service_url.clone(), + ); + + let dss_client = DocumentStorageServiceClient::new( + internal_auth_key.as_ref().to_string(), + config.document_storage_service_url.clone(), + ); + + let connection_gateway_client = connection_gateway_client::client::ConnectionGatewayClient::new( + internal_auth_key.as_ref().to_string(), + config.connection_gateway_url.clone(), + ); + + let system_properties_service = Arc::new(SystemPropertiesServiceImpl::new( + PgSystemPropertiesRepository::new(db.clone()), + )); + + // process user inbox updates from gmail webhook queue, triggered by update pubsub messages from Google + for worker in webhook_workers { + let db_webhook = db.clone(); + let sqs_client_webhook = sqs_client.clone(); + let gmail_client_webhook = gmail_client.clone(); + let auth_service_client_webhook = auth_service_client.clone(); + let redis_client_webhook = redis_client.clone(); + let macro_notify_client_webhook = macro_notify_client.clone(); + let sfs_client_webhook = sfs_client.clone(); + let connection_gateway_client_webhook = connection_gateway_client.clone(); + let dss_client_webhook = dss_client.clone(); + let system_properties_service_webhook = system_properties_service.clone(); + tokio::spawn(async move { + email_service::pubsub::webhook::worker::run_worker( + db_webhook, + worker, + sqs_client_webhook, + gmail_client_webhook, + auth_service_client_webhook, + redis_client_webhook, + macro_notify_client_webhook, + sfs_client_webhook, + connection_gateway_client_webhook, + dss_client_webhook, + system_properties_service_webhook, + config.notifications_enabled, + false, + ) + .await; + }); + } + tracing::info!( + num_workers = config.webhook_queue_workers, + "webhook workers started" + ); + + // separate queue for retries to avoid backups for large inbox updates that hit gmail api rate limit + for worker in webhook_retry_workers { + let db_webhook = db.clone(); + let sqs_client_webhook = sqs_client.clone(); + let gmail_client_webhook = gmail_client.clone(); + let auth_service_client_webhook = auth_service_client.clone(); + let redis_client_webhook = redis_client.clone(); + let macro_notify_client_webhook = macro_notify_client.clone(); + let sfs_client_webhook = sfs_client.clone(); + let connection_gateway_client_webhook = connection_gateway_client.clone(); + let dss_client_webhook = dss_client.clone(); + let system_properties_service_webhook = system_properties_service.clone(); + tokio::spawn(async move { + email_service::pubsub::webhook::worker::run_worker( + db_webhook, + worker, + sqs_client_webhook, + gmail_client_webhook, + auth_service_client_webhook, + redis_client_webhook, + macro_notify_client_webhook, + sfs_client_webhook, + connection_gateway_client_webhook, + dss_client_webhook, + system_properties_service_webhook, + config.notifications_enabled, + true, + ) + .await; + }); + } + tracing::info!( + num_workers = config.webhook_queue_workers, + "webhook workers started" + ); + + // backfill user emails upon signup + for worker in backfill_workers { + let db_backfill = db_backfill.clone(); + let sqs_client_backfill = sqs_client.clone(); + let gmail_client_backfill = gmail_client.clone(); + let auth_service_client_backfill = auth_service_client.clone(); + let redis_client_backfill = redis_client.clone(); + let macro_notify_client_backfill = macro_notify_client.clone(); + let sfs_client_backfill = sfs_client.clone(); + let connection_gateway_client_backfill = connection_gateway_client.clone(); + let dss_client_backfill = dss_client.clone(); + let system_properties_service_backfill = system_properties_service.clone(); + tokio::spawn(async move { + email_service::pubsub::backfill::worker::run_worker( + db_backfill, + worker, + sqs_client_backfill, + gmail_client_backfill, + auth_service_client_backfill, + redis_client_backfill, + macro_notify_client_backfill, + sfs_client_backfill, + connection_gateway_client_backfill, + dss_client_backfill, + system_properties_service_backfill, + config.notifications_enabled, + ) + .await; + }); + } + tracing::info!( + num_workers = config.backfill_queue_workers, + "backfill workers started" + ); + + let db_refresh = db.clone(); + let gmail_client_refresh = gmail_client.clone(); + let auth_service_client_refresh = auth_service_client.clone(); + let redis_client_refresh = redis_client.clone(); + let sqs_client_refresh = sqs_client.clone(); + // daily refresh operations for user contacts and inbox subscriptions + tokio::spawn(async move { + email_service::pubsub::refresh::worker::run_worker( + refresh_worker, + db_refresh, + gmail_client_refresh, + auth_service_client_refresh, + redis_client_refresh, + sqs_client_refresh, + ) + .await; + }); + + let db_scheduled = db.clone(); + let gmail_client_scheduled = gmail_client.clone(); + let auth_service_client_scheduled = auth_service_client.clone(); + let redis_client_scheduled = redis_client.clone(); + // send scheduled emails + tokio::spawn(async move { + email_service::pubsub::scheduled::worker::run_worker( + scheduled_worker, + db_scheduled, + gmail_client_scheduled, + auth_service_client_scheduled, + redis_client_scheduled, + ) + .await; + }); + + if cfg!(feature = "sfs_map") { + for worker in sfs_uploader_workers { + let db_sfs_uploader = db.clone(); + let sfs_client_sfs_uploader = sfs_client.clone(); + // upload user contact images to sfs from contact sync + tokio::spawn(async move { + email_service::pubsub::sfs_uploader::worker::run_worker( + worker, + db_sfs_uploader, + sfs_client_sfs_uploader, + ) + .await; + }); + } + tracing::info!( + num_workers = config.sfs_uploader_workers, + "sfs uploader workers started" + ); + } + + Ok(()) +} diff --git a/rust/cloud-storage/email_service/src/lib.rs b/rust/cloud-storage/email_service/src/lib.rs new file mode 100644 index 000000000..fb0829176 --- /dev/null +++ b/rust/cloud-storage/email_service/src/lib.rs @@ -0,0 +1,3 @@ +pub mod config; +pub mod pubsub; +pub mod util; diff --git a/rust/cloud-storage/email_service/src/main.rs b/rust/cloud-storage/email_service/src/main.rs index bec29ae4b..73706bc04 100644 --- a/rust/cloud-storage/email_service/src/main.rs +++ b/rust/cloud-storage/email_service/src/main.rs @@ -1,12 +1,12 @@ use crate::api::context::ApiContext; -use crate::config::CloudfrontSignerPrivateKey; use anyhow::Context; -use config::{Config, Environment}; use document_storage_service_client::DocumentStorageServiceClient; use email::{domain::service::EmailServiceImpl, inbound::EmailPreviewState, outbound::EmailPgRepo}; +use email_service::config::CloudfrontSignerPrivateKey; use frecency::{domain::services::FrecencyQueryServiceImpl, outbound::postgres::FrecencyPgStorage}; use macro_auth::middleware::decode_jwt::JwtValidationArgs; use macro_entrypoint::MacroEntrypoint; +use macro_env::Environment; use macro_middleware::auth::internal_access::InternalApiSecretKey; use secretsmanager_client::{LocalOrRemoteSecret, SecretManager}; use sqlx::postgres::PgPoolOptions; @@ -15,9 +15,6 @@ use std::sync::Arc; use system_properties::{PgSystemPropertiesRepository, SystemPropertiesServiceImpl}; mod api; -mod config; -mod pubsub; -mod util; mod utils; #[tokio::main] @@ -43,7 +40,7 @@ async fn main() -> anyhow::Result<()> { .await?; // Parse our configuration from the environment. - let config = Config::from_env(cloudfront_signer_private_key) + let config = email_service::config::Config::from_env(cloudfront_signer_private_key) .context("expected to be able to generate config")?; let auth_service_secret_key = match config.environment { @@ -57,18 +54,11 @@ async fn main() -> anyhow::Result<()> { // limiting to max of 400 connections (25% of macrodb total) in prod. (10 service + 30 backfill) * 10 pod max let (min_connections, max_connections): (u32, u32) = match config.environment { - Environment::Production => (3, 30), + Environment::Production => (3, 20), Environment::Develop => (1, 10), Environment::Local => (1, 10), }; - let (min_connections_backfill, max_connections_backfill): (u32, u32) = match config.environment - { - Environment::Production => (3, 30), - Environment::Develop => (1, 30), - Environment::Local => (1, 50), - }; - let db = PgPoolOptions::new() .min_connections(min_connections) .max_connections(max_connections) @@ -76,13 +66,6 @@ async fn main() -> anyhow::Result<()> { .await .context("could not connect to db")?; - let db_backfill = PgPoolOptions::new() - .min_connections(min_connections_backfill) - .max_connections(max_connections_backfill) - .connect(&config.macro_db_url) - .await - .context("could not connect to backfill db")?; - let gmail_queue_aws_config = if cfg!(feature = "local_queue") { aws_config::defaults(aws_config::BehaviorVersion::latest()) .region("us-east-1") @@ -106,70 +89,6 @@ async fn main() -> anyhow::Result<()> { .sfs_uploader_queue(&config.sfs_uploader_queue) .contacts_queue(&config.contacts_queue); - let macro_notify_client = macro_notify::MacroNotify::new( - config.notification_queue.clone(), - "email_service".to_string(), - ) - .await; - - let refresh_worker = sqs_worker::SQSWorker::new( - aws_sdk_sqs::Client::new(&gmail_queue_aws_config), - config.email_refresh_queue.clone(), - config.queue_max_messages, - config.queue_wait_time_seconds, - ); - - let scheduled_worker = sqs_worker::SQSWorker::new( - aws_sdk_sqs::Client::new(&gmail_queue_aws_config), - config.email_scheduled_queue.clone(), - config.queue_max_messages, - config.queue_wait_time_seconds, - ); - - let sfs_uploader_workers = (0..config.sfs_uploader_workers) - .map(|_| { - sqs_worker::SQSWorker::new( - aws_sdk_sqs::Client::new(&gmail_queue_aws_config), - config.sfs_uploader_queue.clone(), - config.queue_max_messages, - config.queue_wait_time_seconds, - ) - }) - .collect::>(); - - let backfill_workers = (0..config.backfill_queue_workers) - .map(|_| { - sqs_worker::SQSWorker::new( - aws_sdk_sqs::Client::new(&gmail_queue_aws_config), - config.backfill_queue.clone(), - config.backfill_queue_max_messages, - config.queue_wait_time_seconds, - ) - }) - .collect::>(); - - let webhook_workers = (0..config.webhook_queue_workers) - .map(|_| { - sqs_worker::SQSWorker::new( - aws_sdk_sqs::Client::new(&gmail_queue_aws_config), - config.gmail_webhook_queue.clone(), - config.webhook_queue_max_messages, - config.queue_wait_time_seconds, - ) - }) - .collect::>(); - - let webhook_retry_workers = (0..config.webhook_retry_queue_workers) - .map(|_| { - sqs_worker::SQSWorker::new( - aws_sdk_sqs::Client::new(&gmail_queue_aws_config), - config.gmail_webhook_retry_queue.clone(), - config.webhook_retry_queue_max_messages, - config.queue_wait_time_seconds, - ) - }) - .collect::>(); - let auth_service_client = authentication_service_client::AuthServiceClient::new( auth_service_secret_key, config.auth_service_url.clone(), @@ -189,7 +108,7 @@ async fn main() -> anyhow::Result<()> { }) .context("failed to connect to redis")?; - let redis_client = util::redis::RedisClient::new( + let redis_client = email_service::util::redis::RedisClient::new( redis_inner_client, config.redis_rate_limit_reqs, config.redis_rate_limit_window_secs, @@ -207,176 +126,10 @@ async fn main() -> anyhow::Result<()> { config.document_storage_service_url.clone(), ); - let connection_gateway_client = connection_gateway_client::client::ConnectionGatewayClient::new( - internal_auth_key.as_ref().to_string(), - config.connection_gateway_url.clone(), - ); - let system_properties_service = Arc::new(SystemPropertiesServiceImpl::new( PgSystemPropertiesRepository::new(db.clone()), )); - // process user inbox updates from gmail webhook queue, triggered by update pubsub messages from Google - for worker in webhook_workers { - let db_webhook = db.clone(); - let sqs_client_webhook = sqs_client.clone(); - let gmail_client_webhook = gmail_client.clone(); - let auth_service_client_webhook = auth_service_client.clone(); - let redis_client_webhook = redis_client.clone(); - let macro_notify_client_webhook = macro_notify_client.clone(); - let sfs_client_webhook = sfs_client.clone(); - let connection_gateway_client_webhook = connection_gateway_client.clone(); - let dss_client_webhook = dss_client.clone(); - let system_properties_service_webhook = system_properties_service.clone(); - tokio::spawn(async move { - pubsub::webhook::worker::run_worker( - db_webhook, - worker, - sqs_client_webhook, - gmail_client_webhook, - auth_service_client_webhook, - redis_client_webhook, - macro_notify_client_webhook, - sfs_client_webhook, - connection_gateway_client_webhook, - dss_client_webhook, - system_properties_service_webhook, - config.notifications_enabled, - false, - ) - .await; - }); - } - tracing::info!( - num_workers = config.webhook_queue_workers, - "webhook workers started" - ); - - // separate queue for retries to avoid backups for large inbox updates that hit gmail api rate limit - for worker in webhook_retry_workers { - let db_webhook = db.clone(); - let sqs_client_webhook = sqs_client.clone(); - let gmail_client_webhook = gmail_client.clone(); - let auth_service_client_webhook = auth_service_client.clone(); - let redis_client_webhook = redis_client.clone(); - let macro_notify_client_webhook = macro_notify_client.clone(); - let sfs_client_webhook = sfs_client.clone(); - let connection_gateway_client_webhook = connection_gateway_client.clone(); - let dss_client_webhook = dss_client.clone(); - let system_properties_service_webhook = system_properties_service.clone(); - tokio::spawn(async move { - pubsub::webhook::worker::run_worker( - db_webhook, - worker, - sqs_client_webhook, - gmail_client_webhook, - auth_service_client_webhook, - redis_client_webhook, - macro_notify_client_webhook, - sfs_client_webhook, - connection_gateway_client_webhook, - dss_client_webhook, - system_properties_service_webhook, - config.notifications_enabled, - true, - ) - .await; - }); - } - tracing::info!( - num_workers = config.webhook_queue_workers, - "webhook workers started" - ); - - // backfill user emails upon signup - for worker in backfill_workers { - let db_backfill = db_backfill.clone(); - let sqs_client_backfill = sqs_client.clone(); - let gmail_client_backfill = gmail_client.clone(); - let auth_service_client_backfill = auth_service_client.clone(); - let redis_client_backfill = redis_client.clone(); - let macro_notify_client_backfill = macro_notify_client.clone(); - let sfs_client_backfill = sfs_client.clone(); - let connection_gateway_client_backfill = connection_gateway_client.clone(); - let dss_client_backfill = dss_client.clone(); - let system_properties_service_backfill = system_properties_service.clone(); - tokio::spawn(async move { - pubsub::backfill::worker::run_worker( - db_backfill, - worker, - sqs_client_backfill, - gmail_client_backfill, - auth_service_client_backfill, - redis_client_backfill, - macro_notify_client_backfill, - sfs_client_backfill, - connection_gateway_client_backfill, - dss_client_backfill, - system_properties_service_backfill, - config.notifications_enabled, - ) - .await; - }); - } - tracing::info!( - num_workers = config.backfill_queue_workers, - "backfill workers started" - ); - - let db_refresh = db.clone(); - let gmail_client_refresh = gmail_client.clone(); - let auth_service_client_refresh = auth_service_client.clone(); - let redis_client_refresh = redis_client.clone(); - let sqs_client_refresh = sqs_client.clone(); - // daily refresh operations for user contacts and inbox subscriptions - tokio::spawn(async move { - pubsub::refresh::worker::run_worker( - refresh_worker, - db_refresh, - gmail_client_refresh, - auth_service_client_refresh, - redis_client_refresh, - sqs_client_refresh, - ) - .await; - }); - - let db_scheduled = db.clone(); - let gmail_client_scheduled = gmail_client.clone(); - let auth_service_client_scheduled = auth_service_client.clone(); - let redis_client_scheduled = redis_client.clone(); - // send scheduled emails - tokio::spawn(async move { - pubsub::scheduled::worker::run_worker( - scheduled_worker, - db_scheduled, - gmail_client_scheduled, - auth_service_client_scheduled, - redis_client_scheduled, - ) - .await; - }); - - if cfg!(feature = "sfs_map") { - for worker in sfs_uploader_workers { - let db_sfs_uploader = db.clone(); - let sfs_client_sfs_uploader = sfs_client.clone(); - // upload user contact images to sfs from contact sync - tokio::spawn(async move { - pubsub::sfs_uploader::worker::run_worker( - worker, - db_sfs_uploader, - sfs_client_sfs_uploader, - ) - .await; - }); - } - tracing::info!( - num_workers = config.sfs_uploader_workers, - "sfs uploader workers started" - ); - } - let jwt_args = JwtValidationArgs::new_with_secret_manager(config.environment, &secretsmanager_client) .await?; diff --git a/rust/cloud-storage/email_service/src/pubsub/backfill/mod.rs b/rust/cloud-storage/email_service/src/pubsub/backfill/mod.rs index 183c7df44..4762bcc49 100644 --- a/rust/cloud-storage/email_service/src/pubsub/backfill/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/backfill/mod.rs @@ -7,4 +7,4 @@ mod init; mod list_threads; mod process; mod update_metadata; -pub(crate) mod worker; +pub mod worker; diff --git a/rust/cloud-storage/email_service/src/pubsub/mod.rs b/rust/cloud-storage/email_service/src/pubsub/mod.rs index 193a40550..0aff8044c 100644 --- a/rust/cloud-storage/email_service/src/pubsub/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/mod.rs @@ -1,7 +1,7 @@ -pub(crate) mod backfill; +pub mod backfill; pub(crate) mod context; -pub(crate) mod refresh; -pub(crate) mod scheduled; -pub(crate) mod sfs_uploader; +pub mod refresh; +pub mod scheduled; +pub mod sfs_uploader; pub(crate) mod util; -pub(crate) mod webhook; +pub mod webhook; diff --git a/rust/cloud-storage/email_service/src/pubsub/refresh/mod.rs b/rust/cloud-storage/email_service/src/pubsub/refresh/mod.rs index bbea3c649..4ea071886 100644 --- a/rust/cloud-storage/email_service/src/pubsub/refresh/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/refresh/mod.rs @@ -1,3 +1,3 @@ pub(crate) mod context; pub(crate) mod process; -pub(crate) mod worker; +pub mod worker; diff --git a/rust/cloud-storage/email_service/src/pubsub/scheduled/mod.rs b/rust/cloud-storage/email_service/src/pubsub/scheduled/mod.rs index bbea3c649..4ea071886 100644 --- a/rust/cloud-storage/email_service/src/pubsub/scheduled/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/scheduled/mod.rs @@ -1,3 +1,3 @@ pub(crate) mod context; pub(crate) mod process; -pub(crate) mod worker; +pub mod worker; diff --git a/rust/cloud-storage/email_service/src/pubsub/sfs_uploader/mod.rs b/rust/cloud-storage/email_service/src/pubsub/sfs_uploader/mod.rs index bbea3c649..4ea071886 100644 --- a/rust/cloud-storage/email_service/src/pubsub/sfs_uploader/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/sfs_uploader/mod.rs @@ -1,3 +1,3 @@ pub(crate) mod context; pub(crate) mod process; -pub(crate) mod worker; +pub mod worker; diff --git a/rust/cloud-storage/email_service/src/pubsub/webhook/mod.rs b/rust/cloud-storage/email_service/src/pubsub/webhook/mod.rs index fb443e85c..59b1da237 100644 --- a/rust/cloud-storage/email_service/src/pubsub/webhook/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/webhook/mod.rs @@ -1,4 +1,4 @@ mod error_handlers; mod operations; pub(crate) mod process; -pub(crate) mod worker; +pub mod worker; diff --git a/rust/cloud-storage/email_service/src/util/gmail/auth.rs b/rust/cloud-storage/email_service/src/util/gmail/auth.rs index c7b0799bd..95effa640 100644 --- a/rust/cloud-storage/email_service/src/util/gmail/auth.rs +++ b/rust/cloud-storage/email_service/src/util/gmail/auth.rs @@ -1,9 +1,9 @@ -use crate::util::redis::RedisClient; use anyhow::Context; use authentication_service_client::AuthServiceClient; use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use crate::util::redis::RedisClient; use gmail_client::GmailClient; use model::response::ErrorResponse; use model::user::UserContext; diff --git a/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs b/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs index 9d453187c..47c0d9e09 100644 --- a/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs +++ b/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs @@ -1,5 +1,7 @@ use crate::util::process_pre_insert::clean_message::{clean_message, clean_threads}; -use crate::util::process_pre_insert::sfs_map::{store_message_images, store_threads_images}; +use crate::util::process_pre_insert::sfs_map::{ + store_message_images, store_threads_images, +}; use models_email::email::service; use static_file_service_client::StaticFileServiceClient; @@ -22,7 +24,8 @@ pub async fn process_threads_pre_insert( } // perform necessary processing on a message before inserting into the database -#[tracing::instrument(skip(db, sfs_client, message), fields(message_id = %message.provider_id.clone().unwrap_or_default()))] +#[tracing::instrument(skip(db, sfs_client, message), fields(message_id = %message.provider_id.clone().unwrap_or_default() +))] pub async fn process_message_pre_insert( db: &sqlx::PgPool, sfs_client: &StaticFileServiceClient, diff --git a/rust/cloud-storage/email_service/src/util/redis/backfill.rs b/rust/cloud-storage/email_service/src/util/redis/backfill.rs index a261ad6ee..3e2f203e2 100644 --- a/rust/cloud-storage/email_service/src/util/redis/backfill.rs +++ b/rust/cloud-storage/email_service/src/util/redis/backfill.rs @@ -1,5 +1,5 @@ -use crate::util::redis::RedisClient; use anyhow::Context; +use crate::util::redis::RedisClient; use redis::AsyncCommands; use uuid::Uuid; diff --git a/rust/cloud-storage/email_service/src/util/upload_attachment.rs b/rust/cloud-storage/email_service/src/util/upload_attachment.rs index dc35419b7..f6ff3175b 100644 --- a/rust/cloud-storage/email_service/src/util/upload_attachment.rs +++ b/rust/cloud-storage/email_service/src/util/upload_attachment.rs @@ -1,9 +1,9 @@ use crate::pubsub::util::check_gmail_rate_limit; -use crate::util::redis::RedisClient; use anyhow::{Context, anyhow}; use base64::Engine; use base64::engine::general_purpose::STANDARD; use document_storage_service_client::DocumentStorageServiceClient; +use crate::util::redis::RedisClient; use gmail_client::GmailClient; use macro_user_id::cowlike::ArcCowStr; use macro_user_id::user_id::MacroUserId; @@ -42,8 +42,8 @@ pub async fn upload_attachment( GmailApiOperation::MessagesAttachmentsGet, true, ) - .await - .context("Rate limit check failed")?; + .await + .context("Rate limit check failed")?; // 2. Fetch the raw attachment data from Gmail. let attachment_data = fetch_gmail_attachment_data( @@ -51,7 +51,7 @@ pub async fn upload_attachment( ctx.access_token, &attachment_args.attachment_metadata, ) - .await?; + .await?; // 3. Calculate hashes required for the upload process. let (hex_hash, base64_hash) = calculate_hashes(&attachment_data); @@ -69,7 +69,7 @@ pub async fn upload_attachment( &file_type, attachment_args.backfill, ) - .await?; + .await?; // 6. Upload the attachment data to the presigned URL. upload_data_to_presigned_url(&dss_response, attachment_data, &base64_hash).await?; From be4ba76c25a54797ab9e4b1bd262dd2028b4b570 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Thu, 11 Dec 2025 22:11:50 -0500 Subject: [PATCH 02/14] Moving role outside of service --- .../resources/src/resources/bucket.ts | 8 +- infra/stacks/email-service/index.ts | 71 ++++++++++++++++- infra/stacks/email-service/service.ts | 78 +------------------ 3 files changed, 76 insertions(+), 81 deletions(-) diff --git a/infra/packages/resources/src/resources/bucket.ts b/infra/packages/resources/src/resources/bucket.ts index b37aa8d00..3161948c5 100644 --- a/infra/packages/resources/src/resources/bucket.ts +++ b/infra/packages/resources/src/resources/bucket.ts @@ -46,12 +46,14 @@ export function createBucket({ maxAgeSeconds: 3000, }, ], - logging: + loggings: stack === 'prod' - ? { + ? [ + { targetBucket: 'macro-logging-bucket', targetPrefix: `${bucketName}/`, - } + }, + ] : undefined, }); } diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index 232364df1..49604946e 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -1,7 +1,7 @@ import * as aws from '@pulumi/aws'; import * as pulumi from '@pulumi/pulumi'; import * as tls from '@pulumi/tls'; -import { Queue, Redis } from '@resources'; +import {createFrecencyTablePolicy, Queue, Redis} from '@resources'; import { config, getMacroApiToken, @@ -236,17 +236,82 @@ const queueArns = [ contactsQueueArn, ]; +const emailServiceSecretsPolicy = new aws.iam.Policy( + 'email-service-secrets-policy', + { + policy: { + Version: '2012-10-17', + Statement: [ + { + Action: ['secretsmanager:GetSecretValue'], + Resource: [...secretKeyArns], + Effect: 'Allow', + }, + ], + }, + tags: tags, + } +); + +const emailServiceSqsPolicy = new aws.iam.Policy( + 'email-service-sqs-policy', + { + policy: pulumi.output({ + Version: '2012-10-17', + Statement: [ + { + Action: ['sqs:*'], + Resource: queueArns, + Effect: 'Allow', + }, + ], + }), + tags: tags, + } +); + +const emailServiceFrecencyPolicy = createFrecencyTablePolicy( + 'email-service-frecency-policy' +); + +// Create IAM role for email service +const emailServiceRole = new aws.iam.Role( + 'email-service-role', + { + name: `email-service-role-${stack}`, + assumeRolePolicy: { + Version: '2012-10-17', + Statement: [ + { + Action: 'sts:AssumeRole', + Principal: { + Service: 'ecs-tasks.amazonaws.com', + }, + Effect: 'Allow', + Sid: '', + }, + ], + }, + tags: tags, + managedPolicyArns: [ + emailServiceSecretsPolicy.arn, + emailServiceSqsPolicy.arn, + emailServiceFrecencyPolicy.arn, + ], + } +); + + const emailService = new EmailService('email-service', { vpc: coparse_api_vpc, tags, ecsClusterArn: cloudStorageClusterArn, clusterName: cloudStorageClusterName, - secretKeyArns, + role: emailServiceRole, serviceContainerPort: 8080, isPrivate: false, healthCheckPath: '/health', platform: { family: 'linux', architecture: 'amd64' }, - queueArns, cfKeyPair: cfKeyPair, containerEnvVars: [ { diff --git a/infra/stacks/email-service/service.ts b/infra/stacks/email-service/service.ts index 1360028c3..e355ebb2b 100644 --- a/infra/stacks/email-service/service.ts +++ b/infra/stacks/email-service/service.ts @@ -3,7 +3,6 @@ import * as awsx from '@pulumi/awsx'; import * as pulumi from '@pulumi/pulumi'; import type * as tls from '@pulumi/tls'; import { - createFrecencyTablePolicy, DATADOG_API_KEY, datadogAgentContainer, fargateLogRouterSidecarContainer, @@ -22,7 +21,7 @@ export const SERVICE_DOMAIN_NAME = `email-service${ }.${BASE_DOMAIN}`; type Args = { - secretKeyArns: (pulumi.Output | string)[]; + role: aws.iam.Role; clusterName: pulumi.Output | string; ecsClusterArn: pulumi.Output | string; vpc: { @@ -36,7 +35,6 @@ type Args = { containerEnvVars: { name: string; value: pulumi.Output | string }[]; healthCheckPath: string; tags: { [key: string]: string }; - queueArns: pulumi.Output[]; cfKeyPair: tls.PrivateKey; }; @@ -56,6 +54,7 @@ export class EmailService extends pulumi.ComponentResource { constructor( name: string, { + role, ecsClusterArn, vpc, platform, @@ -65,8 +64,6 @@ export class EmailService extends pulumi.ComponentResource { containerEnvVars, clusterName, tags, - secretKeyArns, - queueArns, cfKeyPair, }: Args, opts?: pulumi.ComponentResourceOptions @@ -75,76 +72,7 @@ export class EmailService extends pulumi.ComponentResource { this.tags = tags; this.clusterName = clusterName; - - // role - const secretsPolicy = new aws.iam.Policy( - `${BASE_NAME}-secrets-policy`, - { - policy: { - Version: '2012-10-17', - Statement: [ - { - Action: ['secretsmanager:GetSecretValue'], - Resource: [...secretKeyArns], - Effect: 'Allow', - }, - ], - }, - tags: this.tags, - }, - { parent: this } - ); - - const gmailSqsPolicy = new aws.iam.Policy( - `${BASE_NAME}-gmail-sqs-policy`, - { - policy: pulumi.output({ - Version: '2012-10-17', - Statement: [ - { - Action: ['sqs:*'], - Resource: queueArns, - Effect: 'Allow', - }, - ], - }), - tags: tags, - }, - { parent: this } - ); - - // Create frecency table policy - const frecencyPolicy = createFrecencyTablePolicy( - `${BASE_NAME}-frecency-policy`, - { parent: this } - ); - - this.role = new aws.iam.Role( - `${BASE_NAME}-role`, - { - name: `${BASE_NAME}-role-${stack}`, - assumeRolePolicy: { - Version: '2012-10-17', - Statement: [ - { - Action: 'sts:AssumeRole', - Principal: { - Service: 'ecs-tasks.amazonaws.com', - }, - Effect: 'Allow', - Sid: '', - }, - ], - }, - tags: this.tags, - managedPolicyArns: [ - secretsPolicy.arn, - gmailSqsPolicy.arn, - frecencyPolicy.arn, - ], - }, - { parent: this } - ); + this.role = role; // ecr image const image = new EcrImage( From 68598cb227345515bb78b03cb5b856b9b4ed533c Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Thu, 11 Dec 2025 22:23:22 -0500 Subject: [PATCH 03/14] Move bucket into index --- infra/stacks/email-service/index.ts | 40 ++++++++++++++++++++++-- infra/stacks/email-service/service.ts | 45 --------------------------- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index 49604946e..cab825b3a 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -10,10 +10,11 @@ import { stack, } from '@shared'; import { EmailRefreshHandler } from '@stacks/email-service/refresh_lambda'; -import { cloudfrontPrivateKeySecret } from '@stacks/email-service/s3-cloudfront-distribution'; +import {cloudfrontPrivateKeySecret, getCloudfrontDistribution} from '@stacks/email-service/s3-cloudfront-distribution'; import { EmailScheduledHandler } from '@stacks/email-service/scheduled_lambda'; import { get_coparse_api_vpc } from '@vpc'; import { EmailService } from './service'; +import {EmailAttachmentsBucket} from "@stacks/email-service/attachments-bucket"; const tags = { environment: stack, @@ -301,6 +302,30 @@ const emailServiceRole = new aws.iam.Role( } ); +let emailAttachmentBucket: EmailAttachmentsBucket; +if (stack !== 'local') { + emailAttachmentBucket = new EmailAttachmentsBucket( + `email-attachments-bucket-${stack}`, + { + emailServiceRoleArn: emailServiceRole.arn, + } + ); +} else { + emailAttachmentBucket = new EmailAttachmentsBucket( + `email-attachments-bucket-${stack}`, + {} + ); +} + +const cloudfrontDistribution = getCloudfrontDistribution({ + bucket: emailAttachmentBucket.bucket, + keyPair: cfKeyPair, +}); + +emailAttachmentBucket.attachCloudfrontPolicy({ + cloudfrontDistributionArn: cloudfrontDistribution.distribution.arn, + emailServiceRoleArn: emailServiceRole.arn, +}); const emailService = new EmailService('email-service', { vpc: coparse_api_vpc, @@ -312,7 +337,6 @@ const emailService = new EmailService('email-service', { isPrivate: false, healthCheckPath: '/health', platform: { family: 'linux', architecture: 'amd64' }, - cfKeyPair: cfKeyPair, containerEnvVars: [ { name: 'RUST_LOG', @@ -466,6 +490,18 @@ const emailService = new EmailService('email-service', { name: 'CONTACTS_QUEUE', value: pulumi.interpolate`${contactsQueueName}`, }, + { + name: 'ATTACHMENT_BUCKET', + value: emailAttachmentBucket.bucket.id, + }, + { + name: 'CLOUDFRONT_DISTRIBUTION_URL', + value: pulumi.interpolate`${cloudfrontDistribution.domain}`, + }, + { + name: 'CLOUDFRONT_SIGNER_PUBLIC_KEY_ID', + value: pulumi.interpolate`${cloudfrontDistribution.publicKey.id}`, + } ], }); diff --git a/infra/stacks/email-service/service.ts b/infra/stacks/email-service/service.ts index e355ebb2b..b80920e22 100644 --- a/infra/stacks/email-service/service.ts +++ b/infra/stacks/email-service/service.ts @@ -1,7 +1,6 @@ import * as aws from '@pulumi/aws'; import * as awsx from '@pulumi/awsx'; import * as pulumi from '@pulumi/pulumi'; -import type * as tls from '@pulumi/tls'; import { DATADOG_API_KEY, datadogAgentContainer, @@ -10,8 +9,6 @@ import { } from '@resources'; import { EcrImage } from '@service'; import { BASE_DOMAIN, CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '@shared'; -import { EmailAttachmentsBucket } from '@stacks/email-service/attachments-bucket'; -import { getCloudfrontDistribution } from '@stacks/email-service/s3-cloudfront-distribution'; const BASE_NAME = 'email-service'; const BASE_PATH = '../../../rust/cloud-storage'; @@ -35,7 +32,6 @@ type Args = { containerEnvVars: { name: string; value: pulumi.Output | string }[]; healthCheckPath: string; tags: { [key: string]: string }; - cfKeyPair: tls.PrivateKey; }; export class EmailService extends pulumi.ComponentResource { @@ -64,7 +60,6 @@ export class EmailService extends pulumi.ComponentResource { containerEnvVars, clusterName, tags, - cfKeyPair, }: Args, opts?: pulumi.ComponentResourceOptions ) { @@ -101,46 +96,6 @@ export class EmailService extends pulumi.ComponentResource { this.serviceAlbSg = sg.serviceAlbSg; this.serviceSg = sg.serviceSg; - let emailAttachmentBucket: EmailAttachmentsBucket; - if (stack !== 'local') { - emailAttachmentBucket = new EmailAttachmentsBucket( - `email-attachments-bucket-${stack}`, - { - emailServiceRoleArn: this.role.arn, - } - ); - } else { - emailAttachmentBucket = new EmailAttachmentsBucket( - `email-attachments-bucket-${stack}`, - {} - ); - } - - const cloudfrontDistribution = getCloudfrontDistribution({ - bucket: emailAttachmentBucket.bucket, - keyPair: cfKeyPair, - }); - - emailAttachmentBucket.attachCloudfrontPolicy({ - cloudfrontDistributionArn: cloudfrontDistribution.distribution.arn, - emailServiceRoleArn: this.role.arn, - }); - - containerEnvVars.push( - { - name: 'ATTACHMENT_BUCKET', - value: emailAttachmentBucket.bucket.id, - }, - { - name: 'CLOUDFRONT_DISTRIBUTION_URL', - value: pulumi.interpolate`${cloudfrontDistribution.domain}`, - }, - { - name: 'CLOUDFRONT_SIGNER_PUBLIC_KEY_ID', - value: pulumi.interpolate`${cloudfrontDistribution.publicKey.id}`, - } - ); - // lb const { targetGroup, lb, listener } = serviceLoadBalancer(this, { serviceName: BASE_NAME, // service name From 8cfb5e684c2cdbafef80f42c390061379c782871 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Thu, 11 Dec 2025 23:40:20 -0500 Subject: [PATCH 04/14] Add pubsub_workers.ts, call in index.ts --- infra/stacks/email-service/index.ts | 345 ++++++++++--------- infra/stacks/email-service/pubsub_workers.ts | 274 +++++++++++++++ 2 files changed, 453 insertions(+), 166 deletions(-) create mode 100644 infra/stacks/email-service/pubsub_workers.ts diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index cab825b3a..491809ef3 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -15,6 +15,7 @@ import { EmailScheduledHandler } from '@stacks/email-service/scheduled_lambda'; import { get_coparse_api_vpc } from '@vpc'; import { EmailService } from './service'; import {EmailAttachmentsBucket} from "@stacks/email-service/attachments-bucket"; +import {EmailPubSubWorkers} from "@stacks/email-service/pubsub_workers"; const tags = { environment: stack, @@ -327,6 +328,173 @@ emailAttachmentBucket.attachCloudfrontPolicy({ emailServiceRoleArn: emailServiceRole.arn, }); +const containerEnvVars = [ + { + name: 'RUST_LOG', + value: `email=${stack === 'prod' ? 'debug' : 'debug'},email_service=${stack === 'prod' ? 'debug' : 'debug'},pubsub_workers=${stack === 'prod' ? 'debug' : 'debug'},email_db_client=${stack === 'prod' ? 'info' : 'debug'},gmail_client=${stack === 'prod' ? 'info' : 'debug'},tower_http=info,insight_service_client=${stack === 'prod' ? 'info' : 'debug'}`, + }, + { + name: 'ENVIRONMENT', + value: stack, + }, + { + name: 'MACRO_DB_URL', + value: pulumi.interpolate`${MACRO_DB_URL}`, + }, + { + name: 'REDIS_URI', + value: pulumi.interpolate`redis://${emailServiceRedis.endpoint}`, + }, + { + name: 'EMAIL_REFRESH_QUEUE', + value: refreshQueueName, + }, + { + name: 'EMAIL_SCHEDULED_QUEUE', + value: scheduledQueueName, + }, + { + name: 'GMAIL_WEBHOOK_QUEUE', + value: webhookQueueName, + }, + { + name: 'GMAIL_WEBHOOK_RETRY_QUEUE', + value: webhookRetryQueueName, + }, + { + name: 'BACKFILL_QUEUE', + value: backfillQueueName, + }, + { + name: 'SFS_UPLOADER_QUEUE', + value: sfsUploaderQueueName, + }, + { + name: 'GMAIL_GCP_QUEUE', + value: pulumi.interpolate`${GMAIL_GCP_QUEUE}`, + }, + { + name: 'NOTIFICATION_QUEUE', + value: pulumi.interpolate`${notificationQueueName}`, + }, + { + name: 'INSIGHT_CONTEXT_QUEUE', + value: pulumi.interpolate`${insightContextQueueName}`, + }, + { + name: 'JWT_SECRET_KEY', + value: pulumi.interpolate`${JWT_SECRET_KEY}`, + }, + { + name: 'AUDIENCE', + value: pulumi.interpolate`${AUDIENCE}`, + }, + { + name: 'ISSUER', + value: pulumi.interpolate`${ISSUER}`, + }, + { + name: 'INTERNAL_API_SECRET_KEY', + value: pulumi.interpolate`${INTERNAL_AUTH_KEY}`, + }, + { + name: 'AUTHENTICATION_SERVICE_URL', + value: pulumi.interpolate`https://auth-service${stack === 'prod' ? '' : `-${stack}`}.macro.com`, + }, + { + name: 'AUTHENTICATION_SERVICE_SECRET_KEY', + value: pulumi.interpolate`${AUTHENTICATION_SERVICE_INTERNAL_API_KEY}`, + }, + { + name: 'STATIC_FILE_SERVICE_URL', + value: `https://static-file-service${stack === 'prod' ? '' : `-${stack}`}.macro.com`, + }, + { + name: 'DOCUMENT_STORAGE_SERVICE_URL', + value: `https://cloud-storage${stack === 'prod' ? '' : `-${stack}`}.macro.com`, + }, + { + name: 'CONNECTION_GATEWAY_URL', + value: `https://connection-gateway${stack === 'prod' ? '' : `-${stack}`}.macro.com`, + }, + { + name: 'NOTIFICATIONS_ENABLED', + value: pulumi.interpolate`${NOTIFICATIONS_ENABLED}`, + }, + { + name: 'SEARCH_EVENT_QUEUE', + value: pulumi.interpolate`${searchEventQueueName}`, + }, + { + name: 'REDIS_RATE_LIMIT_REQS', + value: pulumi.interpolate`${REDIS_RATE_LIMIT_REQS}`, + }, + { + name: 'REDIS_RATE_LIMIT_WINDOW_SECS', + value: pulumi.interpolate`${REDIS_RATE_LIMIT_WINDOW_SECS}`, + }, + { + name: 'BACKFILL_QUEUE_WORKERS', + value: pulumi.interpolate`${BACKFILL_QUEUE_WORKERS}`, + }, + { + name: 'BACKFILL_QUEUE_MAX_MESSAGES', + value: pulumi.interpolate`${BACKFILL_QUEUE_MAX_MESSAGES}`, + }, + { + name: 'WEBHOOK_QUEUE_WORKERS', + value: pulumi.interpolate`${WEBHOOK_QUEUE_WORKERS}`, + }, + { + name: 'WEBHOOK_QUEUE_MAX_MESSAGES', + value: pulumi.interpolate`${WEBHOOK_QUEUE_MAX_MESSAGES}`, + }, + { + name: 'WEBHOOK_RETRY_QUEUE_WORKERS', + value: pulumi.interpolate`${WEBHOOK_RETRY_QUEUE_WORKERS}`, + }, + { + name: 'WEBHOOK_RETRY_QUEUE_MAX_MESSAGES', + value: pulumi.interpolate`${WEBHOOK_RETRY_QUEUE_MAX_MESSAGES}`, + }, + { + name: 'SFS_UPLOADER_WORKERS', + value: pulumi.interpolate`${SFS_UPLOADER_WORKERS}`, + }, + { + name: 'MACRO_API_TOKEN_ISSUER', + value: pulumi.interpolate`${MACRO_API_TOKENS.macroApiTokenIssuer}`, + }, + { + name: 'MACRO_API_TOKEN_PUBLIC_KEY', + value: pulumi.interpolate`${MACRO_API_TOKENS.macroApiTokenPublicKey}`, + }, + { + name: 'PRESIGNED_URL_TTL_SECS', + value: pulumi.interpolate`${PRESIGNED_URL_TTL_SECS}`, + }, + { + name: 'CLOUDFRONT_SIGNER_PRIVATE_KEY', + value: pulumi.interpolate`${CLOUDFRONT_PRIVATE_KEY}`, + }, + { + name: 'CONTACTS_QUEUE', + value: pulumi.interpolate`${contactsQueueName}`, + }, + { + name: 'ATTACHMENT_BUCKET', + value: emailAttachmentBucket.bucket.id, + }, + { + name: 'CLOUDFRONT_DISTRIBUTION_URL', + value: pulumi.interpolate`${cloudfrontDistribution.domain}`, + }, + { + name: 'CLOUDFRONT_SIGNER_PUBLIC_KEY_ID', + value: pulumi.interpolate`${cloudfrontDistribution.publicKey.id}`, + } +]; + const emailService = new EmailService('email-service', { vpc: coparse_api_vpc, tags, @@ -337,176 +505,21 @@ const emailService = new EmailService('email-service', { isPrivate: false, healthCheckPath: '/health', platform: { family: 'linux', architecture: 'amd64' }, - containerEnvVars: [ - { - name: 'RUST_LOG', - value: `email=${stack === 'prod' ? 'debug' : 'debug'},email_service=${stack === 'prod' ? 'debug' : 'debug'},email_db_client=${stack === 'prod' ? 'info' : 'debug'},gmail_client=${stack === 'prod' ? 'info' : 'debug'},tower_http=info,insight_service_client=${stack === 'prod' ? 'info' : 'debug'}`, - }, - { - name: 'ENVIRONMENT', - value: stack, - }, - { - name: 'MACRO_DB_URL', - value: pulumi.interpolate`${MACRO_DB_URL}`, - }, - { - name: 'REDIS_URI', - value: pulumi.interpolate`redis://${emailServiceRedis.endpoint}`, - }, - { - name: 'EMAIL_REFRESH_QUEUE', - value: refreshQueueName, - }, - { - name: 'EMAIL_SCHEDULED_QUEUE', - value: scheduledQueueName, - }, - { - name: 'GMAIL_WEBHOOK_QUEUE', - value: webhookQueueName, - }, - { - name: 'GMAIL_WEBHOOK_RETRY_QUEUE', - value: webhookRetryQueueName, - }, - { - name: 'BACKFILL_QUEUE', - value: backfillQueueName, - }, - { - name: 'SFS_UPLOADER_QUEUE', - value: sfsUploaderQueueName, - }, - { - name: 'GMAIL_GCP_QUEUE', - value: pulumi.interpolate`${GMAIL_GCP_QUEUE}`, - }, - { - name: 'NOTIFICATION_QUEUE', - value: pulumi.interpolate`${notificationQueueName}`, - }, - { - name: 'INSIGHT_CONTEXT_QUEUE', - value: pulumi.interpolate`${insightContextQueueName}`, - }, - { - name: 'JWT_SECRET_KEY', - value: pulumi.interpolate`${JWT_SECRET_KEY}`, - }, - { - name: 'AUDIENCE', - value: pulumi.interpolate`${AUDIENCE}`, - }, - { - name: 'ISSUER', - value: pulumi.interpolate`${ISSUER}`, - }, - { - name: 'INTERNAL_API_SECRET_KEY', - value: pulumi.interpolate`${INTERNAL_AUTH_KEY}`, - }, - { - name: 'AUTHENTICATION_SERVICE_URL', - value: pulumi.interpolate`https://auth-service${stack === 'prod' ? '' : `-${stack}`}.macro.com`, - }, - { - name: 'AUTHENTICATION_SERVICE_SECRET_KEY', - value: pulumi.interpolate`${AUTHENTICATION_SERVICE_INTERNAL_API_KEY}`, - }, - { - name: 'STATIC_FILE_SERVICE_URL', - value: `https://static-file-service${stack === 'prod' ? '' : `-${stack}`}.macro.com`, - }, - { - name: 'DOCUMENT_STORAGE_SERVICE_URL', - value: `https://cloud-storage${stack === 'prod' ? '' : `-${stack}`}.macro.com`, - }, - { - name: 'CONNECTION_GATEWAY_URL', - value: `https://connection-gateway${stack === 'prod' ? '' : `-${stack}`}.macro.com`, - }, - { - name: 'NOTIFICATIONS_ENABLED', - value: pulumi.interpolate`${NOTIFICATIONS_ENABLED}`, - }, - { - name: 'SEARCH_EVENT_QUEUE', - value: pulumi.interpolate`${searchEventQueueName}`, - }, - { - name: 'REDIS_RATE_LIMIT_REQS', - value: pulumi.interpolate`${REDIS_RATE_LIMIT_REQS}`, - }, - { - name: 'REDIS_RATE_LIMIT_WINDOW_SECS', - value: pulumi.interpolate`${REDIS_RATE_LIMIT_WINDOW_SECS}`, - }, - { - name: 'BACKFILL_QUEUE_WORKERS', - value: pulumi.interpolate`${BACKFILL_QUEUE_WORKERS}`, - }, - { - name: 'BACKFILL_QUEUE_MAX_MESSAGES', - value: pulumi.interpolate`${BACKFILL_QUEUE_MAX_MESSAGES}`, - }, - { - name: 'WEBHOOK_QUEUE_WORKERS', - value: pulumi.interpolate`${WEBHOOK_QUEUE_WORKERS}`, - }, - { - name: 'WEBHOOK_QUEUE_MAX_MESSAGES', - value: pulumi.interpolate`${WEBHOOK_QUEUE_MAX_MESSAGES}`, - }, - { - name: 'WEBHOOK_RETRY_QUEUE_WORKERS', - value: pulumi.interpolate`${WEBHOOK_RETRY_QUEUE_WORKERS}`, - }, - { - name: 'WEBHOOK_RETRY_QUEUE_MAX_MESSAGES', - value: pulumi.interpolate`${WEBHOOK_RETRY_QUEUE_MAX_MESSAGES}`, - }, - { - name: 'SFS_UPLOADER_WORKERS', - value: pulumi.interpolate`${SFS_UPLOADER_WORKERS}`, - }, - { - name: 'MACRO_API_TOKEN_ISSUER', - value: pulumi.interpolate`${MACRO_API_TOKENS.macroApiTokenIssuer}`, - }, - { - name: 'MACRO_API_TOKEN_PUBLIC_KEY', - value: pulumi.interpolate`${MACRO_API_TOKENS.macroApiTokenPublicKey}`, - }, - { - name: 'PRESIGNED_URL_TTL_SECS', - value: pulumi.interpolate`${PRESIGNED_URL_TTL_SECS}`, - }, - { - name: 'CLOUDFRONT_SIGNER_PRIVATE_KEY', - value: pulumi.interpolate`${CLOUDFRONT_PRIVATE_KEY}`, - }, - { - name: 'CONTACTS_QUEUE', - value: pulumi.interpolate`${contactsQueueName}`, - }, - { - name: 'ATTACHMENT_BUCKET', - value: emailAttachmentBucket.bucket.id, - }, - { - name: 'CLOUDFRONT_DISTRIBUTION_URL', - value: pulumi.interpolate`${cloudfrontDistribution.domain}`, - }, - { - name: 'CLOUDFRONT_SIGNER_PUBLIC_KEY_ID', - value: pulumi.interpolate`${cloudfrontDistribution.publicKey.id}`, - } - ], + containerEnvVars, }); export const emailServiceUrl = pulumi.interpolate`${emailService.domain}`; +new EmailPubSubWorkers('email-pubsub-workers', { + vpc: coparse_api_vpc, + tags, + ecsClusterArn: cloudStorageClusterArn, + clusterName: cloudStorageClusterName, + role: emailServiceRole, + platform: { family: 'linux', architecture: 'amd64' }, + containerEnvVars, +}); + const emailRefreshHandler = new EmailRefreshHandler('email-refresh-handler', { queueArns: [refreshQueueArn], vpc: coparse_api_vpc, diff --git a/infra/stacks/email-service/pubsub_workers.ts b/infra/stacks/email-service/pubsub_workers.ts new file mode 100644 index 000000000..b95cdc6be --- /dev/null +++ b/infra/stacks/email-service/pubsub_workers.ts @@ -0,0 +1,274 @@ +import * as aws from '@pulumi/aws'; +import * as awsx from '@pulumi/awsx'; +import * as pulumi from '@pulumi/pulumi'; +import { + DATADOG_API_KEY, + datadogAgentContainer, + fargateLogRouterSidecarContainer, +} from '@resources'; +import { EcrImage } from '@service'; +import { CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '@shared'; + +const BASE_NAME = 'email-service-pubsub-workers'; +const BASE_PATH = '../../../rust/cloud-storage'; + +type Args = { + role: aws.iam.Role; + clusterName: pulumi.Output | string; + ecsClusterArn: pulumi.Output | string; + vpc: { + vpcId: pulumi.Output | string; + publicSubnetIds: pulumi.Output | string[]; + privateSubnetIds: pulumi.Output | string[]; + }; + platform: { family: string; architecture: 'amd64' | 'arm64' }; + containerEnvVars: { name: string; value: pulumi.Output | string }[]; + tags: { [key: string]: string }; +}; + +export class EmailPubSubWorkers extends pulumi.ComponentResource { + public role: aws.iam.Role; + public ecr: awsx.ecr.Repository; + public serviceSg: aws.ec2.SecurityGroup; + public service: awsx.ecs.FargateService; + public clusterName: pulumi.Output | string; + public tags: { [key: string]: string }; + + constructor( + name: string, + { + role, + ecsClusterArn, + vpc, + platform, + containerEnvVars, + clusterName, + tags, + }: Args, + opts?: pulumi.ComponentResourceOptions + ) { + super('my:components:PubSubWorkers', name, {}, opts); + this.tags = tags; + + this.clusterName = clusterName; + this.role = role; + + // ecr image + const image = new EcrImage( + `${BASE_NAME}-ecr-image-${stack}`, + { + repositoryId: `${BASE_NAME}-ecr-${stack}`, + repositoryName: `${BASE_NAME}-${stack}`, + imageId: `${BASE_NAME}-image-${stack}`, + imagePath: BASE_PATH, + dockerfile: 'Dockerfile', + platform, + tags: this.tags, + buildArgs: { + SERVICE_NAME: 'pubsub_workers', + }, + }, + { parent: this } + ); + this.ecr = image.ecr; + + // sg - workers only need egress + this.serviceSg = this.initializeSecurityGroup({ + vpcId: vpc.vpcId, + }); + + // service + const service = new awsx.ecs.FargateService( + `${BASE_NAME}`, + { + tags, + cluster: ecsClusterArn, + networkConfiguration: { + subnets: vpc.privateSubnetIds, + securityGroups: [this.serviceSg.id], + }, + taskDefinitionArgs: { + taskRole: { + roleArn: this.role.arn, + }, + containers: { + log_router: fargateLogRouterSidecarContainer, + datadog_agent: datadogAgentContainer, + service: { + name: BASE_NAME, + image: image.image.imageUri, + stopTimeout: 10, // 10 seconds to force kill the task + cpu: stack === 'prod' ? 2048 : 1024, + memory: stack === 'prod' ? 3742 : 1742, // 2048 minimum - 256 for datadog - 50 for log_router + environment: [...containerEnvVars], + logConfiguration: { + logDriver: 'awsfirelens', + options: { + Name: 'datadog', + Host: 'http-intake.logs.us5.datadoghq.com', + apikey: DATADOG_API_KEY, + dd_service: `${BASE_NAME}-${stack}`, + dd_source: 'fargate', + dd_tags: `project:cloudstorage, env:${stack}`, + provider: 'ecs', + }, + }, + // No portMappings - workers don't expose ports + }, + }, + runtimePlatform: { + operatingSystemFamily: `${platform.family.toUpperCase()}`, + cpuArchitecture: `${ + platform.architecture === 'amd64' + ? 'X86_64' + : platform.architecture.toUpperCase() + }`, + }, + }, + desiredCount: stack === 'prod' ? 5 : 1, + }, + { parent: this } + ); + + this.service = service; + + this.setupAutoScaling(); + + this.setupServiceAlarms(); + } + + initializeSecurityGroup({ + vpcId, + }: { + vpcId: pulumi.Output | string; + }) { + const serviceSg = new aws.ec2.SecurityGroup( + `${BASE_NAME}-sg-${stack}`, + { + name: `${BASE_NAME}-sg-${stack}`, + vpcId, + description: `${BASE_NAME} security group that is attached directly to the service`, + tags: this.tags, + }, + { parent: this } + ); + + new aws.vpc.SecurityGroupEgressRule( + `${BASE_NAME}-all-out`, + { + securityGroupId: serviceSg.id, + description: 'Allow all outbound', + cidrIpv4: '0.0.0.0/0', + ipProtocol: '-1', + tags: this.tags, + }, + { parent: this } + ); + + return serviceSg; + } + + setupAutoScaling() { + if (!this.service) return; + + const serviceScalableTarget = new aws.appautoscaling.Target( + `${BASE_NAME}-scalable-target-${stack}`, + { + maxCapacity: stack === 'prod' ? 10 : 2, + minCapacity: stack === 'prod' ? 5 : 1, + resourceId: pulumi.interpolate`service/${this.clusterName}/${this.service.service.name}`, + scalableDimension: 'ecs:service:DesiredCount', + serviceNamespace: 'ecs', + tags: this.tags, + }, + { parent: this } + ); + + // Create an Auto Scaling policy for CPU utilization. + new aws.appautoscaling.Policy( + `${BASE_NAME}-scaling-policy-cpu-${stack}`, + { + policyType: 'TargetTrackingScaling', + resourceId: serviceScalableTarget.resourceId, + scalableDimension: serviceScalableTarget.scalableDimension, + serviceNamespace: serviceScalableTarget.serviceNamespace, + targetTrackingScalingPolicyConfiguration: { + targetValue: 70.0, + predefinedMetricSpecification: { + predefinedMetricType: 'ECSServiceAverageCPUUtilization', + }, + scaleInCooldown: 100, + scaleOutCooldown: 300, + }, + }, + { parent: this } + ); + + new aws.appautoscaling.Policy( + `${BASE_NAME}-scaling-policy-memory-${stack}`, + { + policyType: 'TargetTrackingScaling', + resourceId: serviceScalableTarget.resourceId, + scalableDimension: serviceScalableTarget.scalableDimension, + serviceNamespace: serviceScalableTarget.serviceNamespace, + targetTrackingScalingPolicyConfiguration: { + targetValue: 70.0, + predefinedMetricSpecification: { + predefinedMetricType: 'ECSServiceAverageMemoryUtilization', + }, + scaleInCooldown: 100, + scaleOutCooldown: 300, + }, + }, + { parent: this } + ); + } + + setupServiceAlarms() { + new aws.cloudwatch.MetricAlarm( + `${BASE_NAME}-high-cpu-alarm`, + { + name: `${BASE_NAME}-high-cpu-alarm-${stack}`, + metricName: 'CPUUtilization', + namespace: 'AWS/ECS', + statistic: 'Average', + period: 180, + evaluationPeriods: 1, + threshold: 80, + comparisonOperator: 'GreaterThanThreshold', + dimensions: { + ClusterName: this.clusterName, + ServiceName: this.service.service.name, + }, + alarmDescription: `High CPU usage alarm for ${BASE_NAME} service.`, + actionsEnabled: true, + alarmActions: [CLOUD_TRAIL_SNS_TOPIC_ARN], + tags: this.tags, + }, + { parent: this } + ); + + new aws.cloudwatch.MetricAlarm( + `${BASE_NAME}-high-mem-alarm`, + { + name: `${BASE_NAME}-high-mem-alarm-${stack}`, + metricName: 'MemoryUtilization', + namespace: 'AWS/ECS', + statistic: 'Average', + period: 180, + evaluationPeriods: 1, + threshold: 80, + comparisonOperator: 'GreaterThanThreshold', + dimensions: { + ClusterName: this.clusterName, + ServiceName: this.service.service.name, + }, + alarmDescription: `High Memory usage alarm for ${BASE_NAME} service.`, + actionsEnabled: true, + alarmActions: [CLOUD_TRAIL_SNS_TOPIC_ARN], + tags: this.tags, + }, + { parent: this } + ); + } +} From ab6fa22ae1abca1cd852c149736291911e4141d6 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Thu, 11 Dec 2025 23:40:47 -0500 Subject: [PATCH 05/14] pubsub_workers.ts handle shutdown --- rust/cloud-storage/email_service/Cargo.toml | 2 +- .../src/bin/pubsub_workers/pubsub_workers.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/rust/cloud-storage/email_service/Cargo.toml b/rust/cloud-storage/email_service/Cargo.toml index e0ad55e75..6b9b9c7fa 100644 --- a/rust/cloud-storage/email_service/Cargo.toml +++ b/rust/cloud-storage/email_service/Cargo.toml @@ -113,7 +113,7 @@ static_file_service_client = { path = "../static_file_service_client" } strum_macros = { workspace = true } system_properties = { path = "../system_properties" } thiserror = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["signal"] } tower = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } diff --git a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs index a7c5725c9..429a8d291 100644 --- a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs +++ b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs @@ -365,5 +365,23 @@ async fn main() -> anyhow::Result<()> { ); } + tracing::info!("All workers started successfully"); + + // Wait for shutdown signal (SIGTERM from ECS or SIGINT from Ctrl+C) + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("Received SIGINT (Ctrl+C)"); + } + _ = async { + let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler"); + term.recv().await + } => { + tracing::info!("Received SIGTERM"); + } + } + + tracing::info!("Shutdown signal received, exiting gracefully..."); + Ok(()) } From 9ac6e0c4da4770be5232f4e6daa43638058a05ed Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 09:37:15 -0500 Subject: [PATCH 06/14] Pre fix merge conflicts --- infra/stacks/email-service/index.ts | 14 +++++++------- infra/stacks/email-service/service.ts | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index 491809ef3..a62527729 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -1,21 +1,21 @@ import * as aws from '@pulumi/aws'; import * as pulumi from '@pulumi/pulumi'; import * as tls from '@pulumi/tls'; -import {createFrecencyTablePolicy, Queue, Redis} from '@resources'; +import {createFrecencyTablePolicy, Queue, Redis} from '../../packages/resources'; import { config, getMacroApiToken, getMacroNotify, getSearchEventQueue, stack, -} from '@shared'; -import { EmailRefreshHandler } from '@stacks/email-service/refresh_lambda'; -import {cloudfrontPrivateKeySecret, getCloudfrontDistribution} from '@stacks/email-service/s3-cloudfront-distribution'; +} from '../../packages/shared'; +import { EmailRefreshHandler } from './refresh_lambda'; +import {cloudfrontPrivateKeySecret, getCloudfrontDistribution} from './s3-cloudfront-distribution'; import { EmailScheduledHandler } from '@stacks/email-service/scheduled_lambda'; -import { get_coparse_api_vpc } from '@vpc'; +import { get_coparse_api_vpc } from '../../packages/vpc'; import { EmailService } from './service'; -import {EmailAttachmentsBucket} from "@stacks/email-service/attachments-bucket"; -import {EmailPubSubWorkers} from "@stacks/email-service/pubsub_workers"; +import {EmailAttachmentsBucket} from "./attachments-bucket"; +import {EmailPubSubWorkers} from "./pubsub_workers"; const tags = { environment: stack, diff --git a/infra/stacks/email-service/service.ts b/infra/stacks/email-service/service.ts index b80920e22..b4efaaacd 100644 --- a/infra/stacks/email-service/service.ts +++ b/infra/stacks/email-service/service.ts @@ -6,9 +6,9 @@ import { datadogAgentContainer, fargateLogRouterSidecarContainer, serviceLoadBalancer, -} from '@resources'; -import { EcrImage } from '@service'; -import { BASE_DOMAIN, CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '@shared'; +} from '../../packages/resources'; +import { EcrImage } from '../../packages/service'; +import { BASE_DOMAIN, CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '../../packages/shared'; const BASE_NAME = 'email-service'; const BASE_PATH = '../../../rust/cloud-storage'; From 56f101b4449d40bcf24f0dd2094c2b084a7efc50 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 10:03:31 -0500 Subject: [PATCH 07/14] Cleanup --- infra/stacks/email-service/index.ts | 2 +- infra/stacks/email-service/pubsub_workers.ts | 6 +++--- infra/stacks/email-service/service.ts | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index a62527729..d02f9887d 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -11,7 +11,7 @@ import { } from '../../packages/shared'; import { EmailRefreshHandler } from './refresh_lambda'; import {cloudfrontPrivateKeySecret, getCloudfrontDistribution} from './s3-cloudfront-distribution'; -import { EmailScheduledHandler } from '@stacks/email-service/scheduled_lambda'; +import { EmailScheduledHandler } from './scheduled_lambda'; import { get_coparse_api_vpc } from '../../packages/vpc'; import { EmailService } from './service'; import {EmailAttachmentsBucket} from "./attachments-bucket"; diff --git a/infra/stacks/email-service/pubsub_workers.ts b/infra/stacks/email-service/pubsub_workers.ts index b95cdc6be..44996c2a3 100644 --- a/infra/stacks/email-service/pubsub_workers.ts +++ b/infra/stacks/email-service/pubsub_workers.ts @@ -5,9 +5,9 @@ import { DATADOG_API_KEY, datadogAgentContainer, fargateLogRouterSidecarContainer, -} from '@resources'; -import { EcrImage } from '@service'; -import { CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '@shared'; +} from '../../packages/resources'; +import { EcrImage } from '../../packages/service'; +import { CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '../../packages/shared'; const BASE_NAME = 'email-service-pubsub-workers'; const BASE_PATH = '../../../rust/cloud-storage'; diff --git a/infra/stacks/email-service/service.ts b/infra/stacks/email-service/service.ts index b4efaaacd..ee4f2be2c 100644 --- a/infra/stacks/email-service/service.ts +++ b/infra/stacks/email-service/service.ts @@ -131,8 +131,8 @@ export class EmailService extends pulumi.ComponentResource { name: BASE_NAME, image: image.image.imageUri, stopTimeout: 10, // 10 seconds to force kill the task - cpu: stack === 'prod' ? 2048 : 1024, - memory: stack === 'prod' ? 3742 : 1742, // 2048 minimum - 256 for datadog - 50 for log_router + cpu: stack === 'prod' ? 1024 : 256, + memory: stack === 'prod' ? 1742 : 717, // 2048 minimum - 256 for datadog - 50 for log_router environment: [...containerEnvVars], logConfiguration: { logDriver: 'awsfirelens', From 7c2b6cb91dab7d1ee913fb6a18385f1496b727d2 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 10:09:06 -0500 Subject: [PATCH 08/14] Change bucket back --- infra/packages/resources/src/resources/bucket.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/infra/packages/resources/src/resources/bucket.ts b/infra/packages/resources/src/resources/bucket.ts index 79c9db0ea..fd62ab67e 100644 --- a/infra/packages/resources/src/resources/bucket.ts +++ b/infra/packages/resources/src/resources/bucket.ts @@ -46,14 +46,12 @@ export function createBucket({ maxAgeSeconds: 3000, }, ], - loggings: + logging: stack === 'prod' - ? [ - { + ? { targetBucket: 'macro-logging-bucket', targetPrefix: `${bucketName}/`, - }, - ] + } : undefined, }); } From 4824a1432ebba0a1972ca1d73638f567254bcb85 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 10:29:20 -0500 Subject: [PATCH 09/14] Do it in ci --- infra/stacks/email-service/index.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index d02f9887d..885cf5ef5 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -239,7 +239,7 @@ const queueArns = [ ]; const emailServiceSecretsPolicy = new aws.iam.Policy( - 'email-service-secrets-policy', + 'email-service-secrets-policy-2', { policy: { Version: '2012-10-17', @@ -256,7 +256,7 @@ const emailServiceSecretsPolicy = new aws.iam.Policy( ); const emailServiceSqsPolicy = new aws.iam.Policy( - 'email-service-sqs-policy', + 'email-service-sqs-policy-2', { policy: pulumi.output({ Version: '2012-10-17', @@ -273,14 +273,14 @@ const emailServiceSqsPolicy = new aws.iam.Policy( ); const emailServiceFrecencyPolicy = createFrecencyTablePolicy( - 'email-service-frecency-policy' + 'email-service-frecency-policy-2' ); // Create IAM role for email service const emailServiceRole = new aws.iam.Role( - 'email-service-role', + 'email-service-role-2', { - name: `email-service-role-${stack}`, + name: `email-service-role-2-${stack}`, assumeRolePolicy: { Version: '2012-10-17', Statement: [ From e0beefc321640c088ab0537f70d1f6278539cb5c Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 12:27:55 -0500 Subject: [PATCH 10/14] Lower number of workers in dev --- infra/stacks/email-service/Pulumi.dev.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infra/stacks/email-service/Pulumi.dev.yaml b/infra/stacks/email-service/Pulumi.dev.yaml index 13c61008e..eaf84b712 100644 --- a/infra/stacks/email-service/Pulumi.dev.yaml +++ b/infra/stacks/email-service/Pulumi.dev.yaml @@ -11,9 +11,9 @@ config: email-service:notifications_enabled: true email-service:redis_rate_limit_reqs: 14000 email-service:redis_rate_limit_window_secs: 60 - email-service:backfill_queue_workers: 25 + email-service:backfill_queue_workers: 15 email-service:backfill_queue_max_messages: 1 - email-service:webhook_queue_workers: 25 + email-service:webhook_queue_workers: 15 email-service:webhook_queue_max_messages: 1 email-service:webhook_retry_queue_workers: 15 email-service:webhook_retry_queue_max_messages: 1 From b6e773731f5dcf89b5913f04cdb7cd8c8645fcd1 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 12:45:04 -0500 Subject: [PATCH 11/14] Threads --- .../email_service/src/bin/pubsub_workers/pubsub_workers.rs | 4 ++-- rust/cloud-storage/email_service/src/main.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs index 429a8d291..37b5c521b 100644 --- a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs +++ b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs @@ -44,14 +44,14 @@ async fn main() -> anyhow::Result<()> { }; let (min_connections, max_connections): (u32, u32) = match config.environment { - Environment::Production => (3, 30), + Environment::Production => (3, 15), Environment::Develop => (1, 10), Environment::Local => (1, 10), }; let (min_connections_backfill, max_connections_backfill): (u32, u32) = match config.environment { - Environment::Production => (3, 30), + Environment::Production => (3, 25), Environment::Develop => (1, 30), Environment::Local => (1, 50), }; diff --git a/rust/cloud-storage/email_service/src/main.rs b/rust/cloud-storage/email_service/src/main.rs index 73706bc04..a2aad56e9 100644 --- a/rust/cloud-storage/email_service/src/main.rs +++ b/rust/cloud-storage/email_service/src/main.rs @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> { .to_string(), }; - // limiting to max of 400 connections (25% of macrodb total) in prod. (10 service + 30 backfill) * 10 pod max + // limiting to max of 200 connections (12.5% of macrodb total) in prod. let (min_connections, max_connections): (u32, u32) = match config.environment { Environment::Production => (3, 20), Environment::Develop => (1, 10), From dc4f61461f0434df6533880ac529b5eb95d34ad4 Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 12:49:18 -0500 Subject: [PATCH 12/14] Format --- infra/stacks/email-service/index.ts | 89 ++++++++++++++------------- infra/stacks/email-service/service.ts | 6 +- 2 files changed, 50 insertions(+), 45 deletions(-) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index 885cf5ef5..7ed3f2ff4 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -1,7 +1,11 @@ import * as aws from '@pulumi/aws'; import * as pulumi from '@pulumi/pulumi'; import * as tls from '@pulumi/tls'; -import {createFrecencyTablePolicy, Queue, Redis} from '../../packages/resources'; +import { + createFrecencyTablePolicy, + Queue, + Redis, +} from '../../packages/resources'; import { config, getMacroApiToken, @@ -10,12 +14,15 @@ import { stack, } from '../../packages/shared'; import { EmailRefreshHandler } from './refresh_lambda'; -import {cloudfrontPrivateKeySecret, getCloudfrontDistribution} from './s3-cloudfront-distribution'; +import { + cloudfrontPrivateKeySecret, + getCloudfrontDistribution, +} from './s3-cloudfront-distribution'; import { EmailScheduledHandler } from './scheduled_lambda'; import { get_coparse_api_vpc } from '../../packages/vpc'; import { EmailService } from './service'; -import {EmailAttachmentsBucket} from "./attachments-bucket"; -import {EmailPubSubWorkers} from "./pubsub_workers"; +import { EmailAttachmentsBucket } from './attachments-bucket'; +import { EmailPubSubWorkers } from './pubsub_workers'; const tags = { environment: stack, @@ -255,53 +262,47 @@ const emailServiceSecretsPolicy = new aws.iam.Policy( } ); -const emailServiceSqsPolicy = new aws.iam.Policy( - 'email-service-sqs-policy-2', - { - policy: pulumi.output({ - Version: '2012-10-17', - Statement: [ - { - Action: ['sqs:*'], - Resource: queueArns, - Effect: 'Allow', - }, - ], - }), - tags: tags, - } -); +const emailServiceSqsPolicy = new aws.iam.Policy('email-service-sqs-policy-2', { + policy: pulumi.output({ + Version: '2012-10-17', + Statement: [ + { + Action: ['sqs:*'], + Resource: queueArns, + Effect: 'Allow', + }, + ], + }), + tags: tags, +}); const emailServiceFrecencyPolicy = createFrecencyTablePolicy( 'email-service-frecency-policy-2' ); // Create IAM role for email service -const emailServiceRole = new aws.iam.Role( - 'email-service-role-2', - { - name: `email-service-role-2-${stack}`, - assumeRolePolicy: { - Version: '2012-10-17', - Statement: [ - { - Action: 'sts:AssumeRole', - Principal: { - Service: 'ecs-tasks.amazonaws.com', - }, - Effect: 'Allow', - Sid: '', +const emailServiceRole = new aws.iam.Role('email-service-role-2', { + name: `email-service-role-2-${stack}`, + assumeRolePolicy: { + Version: '2012-10-17', + Statement: [ + { + Action: 'sts:AssumeRole', + Principal: { + Service: 'ecs-tasks.amazonaws.com', }, - ], - }, - tags: tags, - managedPolicyArns: [ - emailServiceSecretsPolicy.arn, - emailServiceSqsPolicy.arn, - emailServiceFrecencyPolicy.arn, + Effect: 'Allow', + Sid: '', + }, ], - } -); + }, + tags: tags, + managedPolicyArns: [ + emailServiceSecretsPolicy.arn, + emailServiceSqsPolicy.arn, + emailServiceFrecencyPolicy.arn, + ], +}); let emailAttachmentBucket: EmailAttachmentsBucket; if (stack !== 'local') { @@ -492,7 +493,7 @@ const containerEnvVars = [ { name: 'CLOUDFRONT_SIGNER_PUBLIC_KEY_ID', value: pulumi.interpolate`${cloudfrontDistribution.publicKey.id}`, - } + }, ]; const emailService = new EmailService('email-service', { diff --git a/infra/stacks/email-service/service.ts b/infra/stacks/email-service/service.ts index ee4f2be2c..ed6f841bf 100644 --- a/infra/stacks/email-service/service.ts +++ b/infra/stacks/email-service/service.ts @@ -8,7 +8,11 @@ import { serviceLoadBalancer, } from '../../packages/resources'; import { EcrImage } from '../../packages/service'; -import { BASE_DOMAIN, CLOUD_TRAIL_SNS_TOPIC_ARN, stack } from '../../packages/shared'; +import { + BASE_DOMAIN, + CLOUD_TRAIL_SNS_TOPIC_ARN, + stack, +} from '../../packages/shared'; const BASE_NAME = 'email-service'; const BASE_PATH = '../../../rust/cloud-storage'; From 7a0c6ea43bbae5d3aa8202f7b85ff3e2ae43820e Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 12:51:07 -0500 Subject: [PATCH 13/14] biome --- infra/stacks/email-service/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index 7ed3f2ff4..ccc274198 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -13,16 +13,16 @@ import { getSearchEventQueue, stack, } from '../../packages/shared'; +import { get_coparse_api_vpc } from '../../packages/vpc'; +import { EmailAttachmentsBucket } from './attachments-bucket'; +import { EmailPubSubWorkers } from './pubsub_workers'; import { EmailRefreshHandler } from './refresh_lambda'; import { cloudfrontPrivateKeySecret, getCloudfrontDistribution, } from './s3-cloudfront-distribution'; import { EmailScheduledHandler } from './scheduled_lambda'; -import { get_coparse_api_vpc } from '../../packages/vpc'; import { EmailService } from './service'; -import { EmailAttachmentsBucket } from './attachments-bucket'; -import { EmailPubSubWorkers } from './pubsub_workers'; const tags = { environment: stack, From 9d6a7e2b011299fc1caa825737d44c68dc29960b Mon Sep 17 00:00:00 2001 From: Evan Hutnik Date: Fri, 12 Dec 2025 12:57:23 -0500 Subject: [PATCH 14/14] Formatting --- .../cloud-storage/email_service/src/util/gmail/auth.rs | 2 +- .../email_service/src/util/process_pre_insert/mod.rs | 4 +--- .../email_service/src/util/redis/backfill.rs | 2 +- .../email_service/src/util/upload_attachment.rs | 10 +++++----- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/rust/cloud-storage/email_service/src/util/gmail/auth.rs b/rust/cloud-storage/email_service/src/util/gmail/auth.rs index 95effa640..c7b0799bd 100644 --- a/rust/cloud-storage/email_service/src/util/gmail/auth.rs +++ b/rust/cloud-storage/email_service/src/util/gmail/auth.rs @@ -1,9 +1,9 @@ +use crate::util::redis::RedisClient; use anyhow::Context; use authentication_service_client::AuthServiceClient; use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use crate::util::redis::RedisClient; use gmail_client::GmailClient; use model::response::ErrorResponse; use model::user::UserContext; diff --git a/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs b/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs index 47c0d9e09..6802e84e8 100644 --- a/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs +++ b/rust/cloud-storage/email_service/src/util/process_pre_insert/mod.rs @@ -1,7 +1,5 @@ use crate::util::process_pre_insert::clean_message::{clean_message, clean_threads}; -use crate::util::process_pre_insert::sfs_map::{ - store_message_images, store_threads_images, -}; +use crate::util::process_pre_insert::sfs_map::{store_message_images, store_threads_images}; use models_email::email::service; use static_file_service_client::StaticFileServiceClient; diff --git a/rust/cloud-storage/email_service/src/util/redis/backfill.rs b/rust/cloud-storage/email_service/src/util/redis/backfill.rs index 3e2f203e2..a261ad6ee 100644 --- a/rust/cloud-storage/email_service/src/util/redis/backfill.rs +++ b/rust/cloud-storage/email_service/src/util/redis/backfill.rs @@ -1,5 +1,5 @@ -use anyhow::Context; use crate::util::redis::RedisClient; +use anyhow::Context; use redis::AsyncCommands; use uuid::Uuid; diff --git a/rust/cloud-storage/email_service/src/util/upload_attachment.rs b/rust/cloud-storage/email_service/src/util/upload_attachment.rs index 058a2719d..ed0911c80 100644 --- a/rust/cloud-storage/email_service/src/util/upload_attachment.rs +++ b/rust/cloud-storage/email_service/src/util/upload_attachment.rs @@ -1,9 +1,9 @@ use crate::pubsub::util::check_gmail_rate_limit; +use crate::util::redis::RedisClient; use anyhow::{Context, anyhow}; use base64::Engine; use base64::engine::general_purpose::STANDARD; use document_storage_service_client::DocumentStorageServiceClient; -use crate::util::redis::RedisClient; use gmail_client::GmailClient; use macro_user_id::cowlike::ArcCowStr; use macro_user_id::user_id::MacroUserId; @@ -48,8 +48,8 @@ pub async fn upload_attachment( GmailApiOperation::MessagesAttachmentsGet, true, ) - .await - .context("Rate limit check failed")?; + .await + .context("Rate limit check failed")?; // 2. Fetch the raw attachment data from Gmail. let attachment_data = fetch_gmail_attachment_data( @@ -57,7 +57,7 @@ pub async fn upload_attachment( ctx.access_token, &args.attachment_metadata, ) - .await?; + .await?; let mime_type = args.attachment_metadata.mime_type.clone(); @@ -126,7 +126,7 @@ async fn upload_document_attachment( &file_type, args.backfill, ) - .await?; + .await?; // 4. Upload the attachment data to the presigned URL. upload_data_to_presigned_url(&dss_response, attachment_data, &base64_hash).await?;