diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs
index 6dec921f656..f1749bdf83a 100644
--- a/relay-config/src/redis.rs
+++ b/relay-config/src/redis.rs
@@ -229,7 +229,7 @@ fn build_redis_config_options(
     let max_connections = options.max_connections.unwrap_or(default_connections);
     let min_idle = options
         .min_idle
-        .unwrap_or_else(|| max_connections.div_ceil(crate::redis::DEFAULT_MIN_IDLE_RATIO));
+        .unwrap_or_else(|| max_connections.div_ceil(DEFAULT_MIN_IDLE_RATIO));
 
     RedisConfigOptions {
         max_connections,
@@ -250,7 +250,6 @@ pub(super) fn create_redis_pool(
         RedisConfig::Cluster {
             cluster_nodes,
             options,
-            ..
         } => RedisConfigRef::Cluster {
             cluster_nodes,
             options: build_redis_config_options(options, default_connections),
@@ -277,10 +276,8 @@ pub(super) fn create_redis_pool(
 pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs {
     // Default `max_connections` for the `project_configs` pool.
     // In a unified config, this is used for all pools.
-    let project_configs_default_connections = std::cmp::max(
-        cpu_concurrency * 2,
-        crate::redis::DEFAULT_MIN_MAX_CONNECTIONS,
-    );
+    let project_configs_default_connections =
+        std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
 
     match configs {
         RedisConfigs::Unified(cfg) => {
diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs
index bd8b58972f2..30fa44f773b 100644
--- a/relay-redis/src/real.rs
+++ b/relay-redis/src/real.rs
@@ -194,7 +194,7 @@ impl PooledClient {
     /// Recursively computes the [`ConnectionInner`] from a [`PooledClient`].
     fn connection_inner(&mut self) -> Result<ConnectionInner<'_>, RedisError> {
         let inner = match self {
-            PooledClient::Cluster(ref mut connection, opts) => {
+            PooledClient::Cluster(connection, opts) => {
                 connection
                     .set_read_timeout(Some(Duration::from_secs(opts.read_timeout)))
                     .map_err(RedisError::Redis)?;
@@ -220,7 +220,7 @@
                     secondaries: secondary_connections,
                 }
             }
-            PooledClient::Single(ref mut connection, opts) => {
+            PooledClient::Single(connection, opts) => {
                 connection
                     .set_read_timeout(Some(Duration::from_secs(opts.read_timeout)))
                     .map_err(RedisError::Redis)?;
@@ -295,7 +295,9 @@ impl RedisPool {
 
     /// Creates a [`RedisPool`] in single-node configuration.
     pub fn single(server: &str, opts: RedisConfigOptions) -> Result<Self, RedisError> {
-        let pool = Self::client_pool(server, &opts)?;
+        let pool = Self::base_pool_builder(&opts)
+            .build(redis::Client::open(server).map_err(RedisError::Redis)?)
+            .map_err(RedisError::Pool)?;
 
         Ok(RedisPool::Single(pool, opts))
     }
@@ -303,7 +305,7 @@ impl RedisPool {
     /// Returns a pooled connection to a client.
     pub fn client(&self) -> Result<PooledClient, RedisError> {
         let pool = match self {
-            RedisPool::Cluster(ref pool, opts) => PooledClient::Cluster(
+            RedisPool::Cluster(pool, opts) => PooledClient::Cluster(
                 Box::new(pool.get().map_err(RedisError::Pool)?),
                 opts.clone(),
             ),
@@ -323,7 +325,7 @@
                     secondaries: secondary_clients,
                 }
             }
-            RedisPool::Single(ref pool, opts) => PooledClient::Single(
+            RedisPool::Single(pool, opts) => PooledClient::Single(
                 Box::new(pool.get().map_err(RedisError::Pool)?),
                 opts.clone(),
             ),
@@ -341,16 +343,6 @@
         }
     }
 
-    /// Returns a [`Pool`] with a [`redis::Client`].
-    fn client_pool(
-        server: &str,
-        opts: &RedisConfigOptions,
-    ) -> Result<Pool<redis::Client>, RedisError> {
-        Self::base_pool_builder(opts)
-            .build(redis::Client::open(server).map_err(RedisError::Redis)?)
-            .map_err(RedisError::Pool)
-    }
-
     /// Returns the base builder for the pool with the options applied.
     fn base_pool_builder<M: ManageConnection>(opts: &RedisConfigOptions) -> Builder<M> {
         Pool::builder()
diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs
index 0ff0bc2b7ec..02871016c4d 100644
--- a/relay-server/src/services/buffer/envelope_buffer/mod.rs
+++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -14,6 +14,7 @@ use relay_config::Config;
 use tokio::time::{timeout, Instant};
 
 use crate::envelope::Envelope;
+use crate::envelope::Item;
 use crate::services::buffer::common::ProjectKeyPair;
 use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
 use crate::services::buffer::envelope_stack::EnvelopeStack;
@@ -78,6 +79,11 @@ impl PolymorphicEnvelopeBuffer {
 
     /// Adds an envelope to the buffer.
     pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
+        relay_statsd::metric!(
+            histogram(RelayHistograms::BufferEnvelopeBodySize) =
+                envelope.items().map(Item::len).sum::<usize>() as u64
+        );
+
         relay_statsd::metric!(timer(RelayTimers::BufferPush), {
             match self {
                 Self::Sqlite(buffer) => buffer.push(envelope).await,
diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache.rs
index 354a668af2f..658900e873a 100644
--- a/relay-server/src/services/projects/cache.rs
+++ b/relay-server/src/services/projects/cache.rs
@@ -1,5 +1,4 @@
 use std::collections::{BTreeMap, BTreeSet};
-use std::convert::Infallible;
 use std::error::Error;
 use std::sync::Arc;
 use std::time::Duration;
@@ -10,33 +9,23 @@ use crate::services::global_config;
 use crate::services::processor::{
     EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
 };
-use crate::services::projects::project::state::UpstreamProjectState;
 use crate::Envelope;
 use chrono::{DateTime, Utc};
 use hashbrown::HashSet;
 use relay_base_schema::project::ProjectKey;
-#[cfg(feature = "processing")]
-use relay_config::RedisConfigRef;
-use relay_config::{Config, RelayMode};
+use relay_config::Config;
 use relay_metrics::Bucket;
 use relay_quotas::RateLimits;
 use relay_redis::RedisPool;
 use relay_statsd::metric;
 use relay_system::{Addr, FromMessage, Interface, Sender, Service};
-#[cfg(feature = "processing")]
-use tokio::sync::Semaphore;
 use tokio::sync::{mpsc, watch};
 use tokio::time::Instant;
 
 use crate::services::metrics::{Aggregator, FlushBuckets};
 use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::services::projects::project::{Project, ProjectFetchState, ProjectSender, ProjectState};
-use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService};
-#[cfg(feature = "processing")]
-use crate::services::projects::source::redis::RedisProjectSource;
-use crate::services::projects::source::upstream::{
-    UpstreamProjectSource, UpstreamProjectSourceService,
-};
+use crate::services::projects::source::ProjectSource;
 use crate::services::spooler::{
     self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex,
     UnspooledEnvelope, BATCH_KEY_COUNT,
@@ -47,10 +36,6 @@ use crate::services::upstream::UpstreamRelay;
 use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
 use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle};
 
-/// Default value of maximum connections to Redis. This value was arbitrarily determined.
-#[cfg(feature = "processing")]
-const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10;
-
 /// Requests a refresh of a project state from one of the available sources.
 ///
 /// The project state is resolved in the following precedence:
@@ -405,159 +390,6 @@ impl FromMessage for ProjectCache {
     }
 }
 
-/// Helper type that contains all configured sources for project cache fetching.
-///
-/// See [`RequestUpdate`] for a description on how project states are fetched.
-#[derive(Clone, Debug)]
-struct ProjectSource {
-    config: Arc<Config>,
-    local_source: Addr<LocalProjectSource>,
-    upstream_source: Addr<UpstreamProjectSource>,
-    #[cfg(feature = "processing")]
-    redis_source: Option<RedisProjectSource>,
-    #[cfg(feature = "processing")]
-    redis_semaphore: Arc<Semaphore>,
-}
-
-impl ProjectSource {
-    /// Starts all project source services in the current runtime.
-    pub fn start(
-        config: Arc<Config>,
-        upstream_relay: Addr<UpstreamRelay>,
-        _redis: Option<RedisPool>,
-    ) -> Self {
-        let local_source = LocalProjectSourceService::new(config.clone()).start();
-        let upstream_source =
-            UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();
-
-        #[cfg(feature = "processing")]
-        let redis_max_connections = config
-            .redis()
-            .map(|configs| {
-                let config = match configs {
-                    relay_config::RedisPoolConfigs::Unified(config) => config,
-                    relay_config::RedisPoolConfigs::Individual {
-                        project_configs: config,
-                        ..
-                    } => config,
-                };
-                Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS)
-            })
-            .unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS);
-        #[cfg(feature = "processing")]
-        let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));
-
-        Self {
-            config,
-            local_source,
-            upstream_source,
-            #[cfg(feature = "processing")]
-            redis_source,
-            #[cfg(feature = "processing")]
-            redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())),
-        }
-    }
-
-    #[cfg(feature = "processing")]
-    fn compute_max_connections(config: RedisConfigRef) -> Option<u32> {
-        match config {
-            RedisConfigRef::Cluster { options, .. } => Some(options.max_connections),
-            RedisConfigRef::MultiWrite { configs } => configs
-                .into_iter()
-                .filter_map(|c| Self::compute_max_connections(c))
-                .max(),
-            RedisConfigRef::Single { options, .. } => Some(options.max_connections),
-        }
-    }
-
-    async fn fetch(
-        self,
-        project_key: ProjectKey,
-        no_cache: bool,
-        cached_state: ProjectFetchState,
-    ) -> Result<ProjectFetchState, ProjectSourceError> {
-        let state_opt = self
-            .local_source
-            .send(FetchOptionalProjectState { project_key })
-            .await?;
-
-        if let Some(state) = state_opt {
-            return Ok(ProjectFetchState::new(state));
-        }
-
-        match self.config.relay_mode() {
-            RelayMode::Proxy => return Ok(ProjectFetchState::allowed()),
-            RelayMode::Static => return Ok(ProjectFetchState::disabled()),
-            RelayMode::Capture => return Ok(ProjectFetchState::allowed()),
-            RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
-        }
-
-        let current_revision = cached_state.revision().map(String::from);
-        #[cfg(feature = "processing")]
-        if let Some(redis_source) = self.redis_source {
-            let current_revision = current_revision.clone();
-
-            let redis_permit = self.redis_semaphore.acquire().await?;
-            let state_fetch_result = tokio::task::spawn_blocking(move || {
-                redis_source.get_config_if_changed(project_key, current_revision.as_deref())
-            })
-            .await?;
-            drop(redis_permit);
-
-            match state_fetch_result {
-                // New state fetched from Redis, possibly pending.
-                Ok(UpstreamProjectState::New(state)) => {
-                    let state = state.sanitized();
-                    if !state.is_pending() {
-                        return Ok(ProjectFetchState::new(state));
-                    }
-                }
-                // Redis reported that we're holding an up-to-date version of the state already,
-                // refresh the state and return the old cached state again.
-                Ok(UpstreamProjectState::NotModified) => {
-                    return Ok(ProjectFetchState::refresh(cached_state))
-                }
-                Err(error) => {
-                    relay_log::error!(
-                        error = &error as &dyn Error,
-                        "failed to fetch project from Redis",
-                    );
-                }
-            };
-        };
-
-        let state = self
-            .upstream_source
-            .send(FetchProjectState {
-                project_key,
-                current_revision,
-                no_cache,
-            })
-            .await?;
-
-        match state {
-            UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
-            UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)),
-        }
-    }
-}
-
-#[derive(Debug, thiserror::Error)]
-enum ProjectSourceError {
-    #[error("redis permit error {0}")]
-    RedisPermit(#[from] tokio::sync::AcquireError),
-    #[error("redis join error {0}")]
-    RedisJoin(#[from] tokio::task::JoinError),
-    #[error("upstream error {0}")]
-    Upstream(#[from] relay_system::SendError),
-}
-
-impl From<Infallible> for ProjectSourceError {
-    fn from(value: Infallible) -> Self {
-        match value {}
-    }
-}
-
 /// Updates the cache with new project state information.
 struct UpdateProjectState {
     /// The public key to fetch the project by.
@@ -1498,36 +1330,6 @@ impl Service for ProjectCacheService {
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct FetchProjectState {
-    /// The public key to fetch the project by.
-    pub project_key: ProjectKey,
-
-    /// Currently cached revision if available.
-    ///
-    /// The upstream is allowed to omit full project configs
-    /// for requests for which the requester already has the most
-    /// recent revision.
-    ///
-    /// Settings this to `None` will essentially always re-fetch
-    /// the project config.
-    pub current_revision: Option<String>,
-
-    /// If true, all caches should be skipped and a fresh state should be computed.
-    pub no_cache: bool,
-}
-
-#[derive(Clone, Debug)]
-pub struct FetchOptionalProjectState {
-    project_key: ProjectKey,
-}
-
-impl FetchOptionalProjectState {
-    pub fn project_key(&self) -> ProjectKey {
-        self.project_key
-    }
-}
-
 /// Sum type for all objects which need to be discareded through the [`GarbageDisposal`].
 #[derive(Debug)]
 #[allow(dead_code)] // Fields are never read, only used for discarding/dropping data.
diff --git a/relay-server/src/services/projects/source/local.rs b/relay-server/src/services/projects/source/local.rs
index 94f3a50835b..9bffd851ef0 100644
--- a/relay-server/src/services/projects/source/local.rs
+++ b/relay-server/src/services/projects/source/local.rs
@@ -9,8 +9,8 @@ use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Serv
 use tokio::sync::mpsc;
 use tokio::time::Instant;
 
-use crate::services::projects::cache::FetchOptionalProjectState;
 use crate::services::projects::project::{ParsedProjectState, ProjectState};
+use crate::services::projects::source::FetchOptionalProjectState;
 
 /// Service interface of the local project source.
 #[derive(Debug)]
diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs
index 80f6a34d782..5b208ef3aed 100644
--- a/relay-server/src/services/projects/source/mod.rs
+++ b/relay-server/src/services/projects/source/mod.rs
@@ -1,4 +1,210 @@
+use std::convert::Infallible;
+use std::sync::Arc;
+
+use relay_base_schema::project::ProjectKey;
+#[cfg(feature = "processing")]
+use relay_config::RedisConfigRef;
+use relay_config::{Config, RelayMode};
+use relay_redis::RedisPool;
+use relay_system::{Addr, Service as _};
+#[cfg(feature = "processing")]
+use tokio::sync::Semaphore;
+
 pub mod local;
 #[cfg(feature = "processing")]
 pub mod redis;
 pub mod upstream;
+
+use crate::services::projects::project::state::UpstreamProjectState;
+use crate::services::projects::project::ProjectFetchState;
+use crate::services::upstream::UpstreamRelay;
+
+use self::local::{LocalProjectSource, LocalProjectSourceService};
+#[cfg(feature = "processing")]
+use self::redis::RedisProjectSource;
+use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
+
+/// Default value of maximum connections to Redis. This value was arbitrarily determined.
+#[cfg(feature = "processing")]
+const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10;
+
+/// Helper type that contains all configured sources for project cache fetching.
+#[derive(Clone, Debug)]
+pub struct ProjectSource {
+    config: Arc<Config>,
+    local_source: Addr<LocalProjectSource>,
+    upstream_source: Addr<UpstreamProjectSource>,
+    #[cfg(feature = "processing")]
+    redis_source: Option<RedisProjectSource>,
+    #[cfg(feature = "processing")]
+    redis_semaphore: Arc<Semaphore>,
+}
+
+impl ProjectSource {
+    /// Starts all project source services in the current runtime.
+    pub fn start(
+        config: Arc<Config>,
+        upstream_relay: Addr<UpstreamRelay>,
+        _redis: Option<RedisPool>,
+    ) -> Self {
+        let local_source = LocalProjectSourceService::new(config.clone()).start();
+        let upstream_source =
+            UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();
+
+        #[cfg(feature = "processing")]
+        let redis_max_connections = config
+            .redis()
+            .map(|configs| {
+                let config = match configs {
+                    relay_config::RedisPoolConfigs::Unified(config) => config,
+                    relay_config::RedisPoolConfigs::Individual {
+                        project_configs: config,
+                        ..
+                    } => config,
+                };
+                Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS)
+            })
+            .unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS);
+        #[cfg(feature = "processing")]
+        let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));
+
+        Self {
+            config,
+            local_source,
+            upstream_source,
+            #[cfg(feature = "processing")]
+            redis_source,
+            #[cfg(feature = "processing")]
+            redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())),
+        }
+    }
+
+    #[cfg(feature = "processing")]
+    fn compute_max_connections(config: RedisConfigRef) -> Option<u32> {
+        match config {
+            RedisConfigRef::Cluster { options, .. } => Some(options.max_connections),
+            RedisConfigRef::MultiWrite { configs } => configs
+                .into_iter()
+                .filter_map(|c| Self::compute_max_connections(c))
+                .max(),
+            RedisConfigRef::Single { options, .. } => Some(options.max_connections),
+        }
+    }
+
+    pub async fn fetch(
+        self,
+        project_key: ProjectKey,
+        no_cache: bool,
+        cached_state: ProjectFetchState,
+    ) -> Result<ProjectFetchState, ProjectSourceError> {
+        let state_opt = self
+            .local_source
+            .send(FetchOptionalProjectState { project_key })
+            .await?;
+
+        if let Some(state) = state_opt {
+            return Ok(ProjectFetchState::new(state));
+        }
+
+        match self.config.relay_mode() {
+            RelayMode::Proxy => return Ok(ProjectFetchState::allowed()),
+            RelayMode::Static => return Ok(ProjectFetchState::disabled()),
+            RelayMode::Capture => return Ok(ProjectFetchState::allowed()),
+            RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
+        }
+
+        let current_revision = cached_state.revision().map(String::from);
+        #[cfg(feature = "processing")]
+        if let Some(redis_source) = self.redis_source {
+            let current_revision = current_revision.clone();
+
+            let redis_permit = self.redis_semaphore.acquire().await?;
+            let state_fetch_result = tokio::task::spawn_blocking(move || {
+                redis_source.get_config_if_changed(project_key, current_revision.as_deref())
+            })
+            .await?;
+            drop(redis_permit);
+
+            match state_fetch_result {
+                // New state fetched from Redis, possibly pending.
+                Ok(UpstreamProjectState::New(state)) => {
+                    let state = state.sanitized();
+                    if !state.is_pending() {
+                        return Ok(ProjectFetchState::new(state));
+                    }
+                }
+                // Redis reported that we're holding an up-to-date version of the state already,
+                // refresh the state and return the old cached state again.
+                Ok(UpstreamProjectState::NotModified) => {
+                    return Ok(ProjectFetchState::refresh(cached_state))
+                }
+                Err(error) => {
+                    relay_log::error!(
+                        error = &error as &dyn std::error::Error,
+                        "failed to fetch project from Redis",
+                    );
+                }
+            };
+        };
+
+        let state = self
+            .upstream_source
+            .send(FetchProjectState {
+                project_key,
+                current_revision,
+                no_cache,
+            })
+            .await?;
+
+        match state {
+            UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
+            UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)),
+        }
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ProjectSourceError {
+    #[error("redis permit error {0}")]
+    RedisPermit(#[from] tokio::sync::AcquireError),
+    #[error("redis join error {0}")]
+    RedisJoin(#[from] tokio::task::JoinError),
+    #[error("upstream error {0}")]
+    Upstream(#[from] relay_system::SendError),
+}
+
+impl From<Infallible> for ProjectSourceError {
+    fn from(value: Infallible) -> Self {
+        match value {}
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FetchProjectState {
+    /// The public key to fetch the project by.
+    pub project_key: ProjectKey,
+
+    /// Currently cached revision if available.
+    ///
+    /// The upstream is allowed to omit full project configs
+    /// for requests for which the requester already has the most
+    /// recent revision.
+    ///
+    /// Settings this to `None` will essentially always re-fetch
+    /// the project config.
+    pub current_revision: Option<String>,
+
+    /// If true, all caches should be skipped and a fresh state should be computed.
+    pub no_cache: bool,
+}
+
+#[derive(Clone, Debug)]
+pub struct FetchOptionalProjectState {
+    project_key: ProjectKey,
+}
+
+impl FetchOptionalProjectState {
+    pub fn project_key(&self) -> ProjectKey {
+        self.project_key
+    }
+}
diff --git a/relay-server/src/services/projects/source/upstream.rs b/relay-server/src/services/projects/source/upstream.rs
index 960aa4c8823..bcbfd5a279c 100644
--- a/relay-server/src/services/projects/source/upstream.rs
+++ b/relay-server/src/services/projects/source/upstream.rs
@@ -18,10 +18,10 @@ use serde::{Deserialize, Serialize};
 use tokio::sync::mpsc;
 use tokio::time::Instant;
 
-use crate::services::projects::cache::FetchProjectState;
 use crate::services::projects::project::state::UpstreamProjectState;
 use crate::services::projects::project::ParsedProjectState;
 use crate::services::projects::project::ProjectState;
+use crate::services::projects::source::FetchProjectState;
 use crate::services::upstream::{
     Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
 };
diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs
index deaed0bc572..644beb882f0 100644
--- a/relay-server/src/statsd.rs
+++ b/relay-server/src/statsd.rs
@@ -181,6 +181,11 @@ pub enum RelayHistograms {
     /// Number of envelopes in the backpressure buffer between the envelope buffer
     /// and the project cache.
     BufferBackpressureEnvelopesCount,
+    /// The amount of bytes in the item payloads of an envelope pushed to the envelope buffer.
+    ///
+    /// This is not quite the same as the actual size of a serialized envelope, because it ignores
+    /// the envelope header and item headers.
+    BufferEnvelopeBodySize,
     /// The number of batches emitted per partition.
     BatchesPerPartition,
     /// The number of buckets in a batch emitted.
@@ -309,6 +314,7 @@ impl HistogramMetric for RelayHistograms {
             RelayHistograms::BufferBackpressureEnvelopesCount => {
                 "buffer.backpressure_envelopes_count"
             }
+            RelayHistograms::BufferEnvelopeBodySize => "buffer.envelope_body_size",
             RelayHistograms::ProjectStatePending => "project_state.pending",
             RelayHistograms::ProjectStateAttempts => "project_state.attempts",
            RelayHistograms::ProjectStateRequestBatchSize => "project_state.request.batch_size",
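Note on the new `buffer.envelope_body_size` histogram added above: the value recorded in `PolymorphicEnvelopeBuffer::push` is the sum of the item payload lengths, so the envelope header and item headers are not counted. The following is a minimal, self-contained sketch of that accounting; the `Item` and `Envelope` types here are simplified stand-ins for illustration only and are not Relay's actual envelope types.

// Simplified stand-ins for illustration; not Relay's actual envelope types.
struct Item {
    payload: Vec<u8>,
}

impl Item {
    /// Length of the item payload in bytes, excluding the item header.
    fn len(&self) -> usize {
        self.payload.len()
    }
}

struct Envelope {
    items: Vec<Item>,
}

impl Envelope {
    fn items(&self) -> std::slice::Iter<'_, Item> {
        self.items.iter()
    }
}

/// The value recorded by the histogram: payload bytes only, ignoring all headers.
fn envelope_body_size(envelope: &Envelope) -> u64 {
    envelope.items().map(Item::len).sum::<usize>() as u64
}

fn main() {
    let envelope = Envelope {
        items: vec![
            Item { payload: vec![0u8; 10] },
            Item { payload: vec![0u8; 32] },
        ],
    };
    // Two items with 10 + 32 payload bytes; headers would not add to the total.
    assert_eq!(envelope_body_size(&envelope), 42);
}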