From 0ae6b29aee5d39e3821043904d41fbb343a86f6c Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 18 Oct 2024 12:54:13 +0200 Subject: [PATCH] ref(server): Move project source into the source module --- relay-server/src/services/projects/cache.rs | 202 +---------------- .../src/services/projects/source/local.rs | 2 +- .../src/services/projects/source/mod.rs | 205 ++++++++++++++++++ .../src/services/projects/source/upstream.rs | 2 +- 4 files changed, 209 insertions(+), 202 deletions(-) diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache.rs index 6bab22f4934..dc29f20bc23 100644 --- a/relay-server/src/services/projects/cache.rs +++ b/relay-server/src/services/projects/cache.rs @@ -1,5 +1,4 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::convert::Infallible; use std::error::Error; use std::sync::Arc; use std::time::Duration; @@ -10,33 +9,23 @@ use crate::services::global_config; use crate::services::processor::{ EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; -use crate::services::projects::project::state::UpstreamProjectState; use crate::Envelope; use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; -#[cfg(feature = "processing")] -use relay_config::RedisConfigRef; -use relay_config::{Config, RelayMode}; +use relay_config::Config; use relay_metrics::{Bucket, MetricMeta}; use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Sender, Service}; -#[cfg(feature = "processing")] -use tokio::sync::Semaphore; use tokio::sync::{mpsc, watch}; use tokio::time::Instant; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::projects::project::{Project, ProjectFetchState, ProjectSender, ProjectState}; -use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService}; -#[cfg(feature = "processing")] -use crate::services::projects::source::redis::RedisProjectSource; -use crate::services::projects::source::upstream::{ - UpstreamProjectSource, UpstreamProjectSourceService, -}; +use crate::services::projects::source::ProjectSource; use crate::services::spooler::{ self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex, UnspooledEnvelope, BATCH_KEY_COUNT, @@ -47,10 +36,6 @@ use crate::services::upstream::UpstreamRelay; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle}; -/// Default value of maximum connections to Redis. This value was arbitrarily determined. -#[cfg(feature = "processing")] -const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10; - /// Requests a refresh of a project state from one of the available sources. /// /// The project state is resolved in the following precedence: @@ -424,159 +409,6 @@ impl FromMessage for ProjectCache { } } -/// Helper type that contains all configured sources for project cache fetching. -/// -/// See [`RequestUpdate`] for a description on how project states are fetched. -#[derive(Clone, Debug)] -struct ProjectSource { - config: Arc, - local_source: Addr, - upstream_source: Addr, - #[cfg(feature = "processing")] - redis_source: Option, - #[cfg(feature = "processing")] - redis_semaphore: Arc, -} - -impl ProjectSource { - /// Starts all project source services in the current runtime. - pub fn start( - config: Arc, - upstream_relay: Addr, - _redis: Option, - ) -> Self { - let local_source = LocalProjectSourceService::new(config.clone()).start(); - let upstream_source = - UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); - - #[cfg(feature = "processing")] - let redis_max_connections = config - .redis() - .map(|configs| { - let config = match configs { - relay_config::RedisPoolConfigs::Unified(config) => config, - relay_config::RedisPoolConfigs::Individual { - project_configs: config, - .. - } => config, - }; - Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS) - }) - .unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS); - #[cfg(feature = "processing")] - let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); - - Self { - config, - local_source, - upstream_source, - #[cfg(feature = "processing")] - redis_source, - #[cfg(feature = "processing")] - redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())), - } - } - - #[cfg(feature = "processing")] - fn compute_max_connections(config: RedisConfigRef) -> Option { - match config { - RedisConfigRef::Cluster { options, .. } => Some(options.max_connections), - RedisConfigRef::MultiWrite { configs } => configs - .into_iter() - .filter_map(|c| Self::compute_max_connections(c)) - .max(), - RedisConfigRef::Single { options, .. } => Some(options.max_connections), - } - } - - async fn fetch( - self, - project_key: ProjectKey, - no_cache: bool, - cached_state: ProjectFetchState, - ) -> Result { - let state_opt = self - .local_source - .send(FetchOptionalProjectState { project_key }) - .await?; - - if let Some(state) = state_opt { - return Ok(ProjectFetchState::new(state)); - } - - match self.config.relay_mode() { - RelayMode::Proxy => return Ok(ProjectFetchState::allowed()), - RelayMode::Static => return Ok(ProjectFetchState::disabled()), - RelayMode::Capture => return Ok(ProjectFetchState::allowed()), - RelayMode::Managed => (), // Proceed with loading the config from redis or upstream - } - - let current_revision = cached_state.revision().map(String::from); - #[cfg(feature = "processing")] - if let Some(redis_source) = self.redis_source { - let current_revision = current_revision.clone(); - - let redis_permit = self.redis_semaphore.acquire().await?; - let state_fetch_result = tokio::task::spawn_blocking(move || { - redis_source.get_config_if_changed(project_key, current_revision.as_deref()) - }) - .await?; - drop(redis_permit); - - match state_fetch_result { - // New state fetched from Redis, possibly pending. - Ok(UpstreamProjectState::New(state)) => { - let state = state.sanitized(); - if !state.is_pending() { - return Ok(ProjectFetchState::new(state)); - } - } - // Redis reported that we're holding an up-to-date version of the state already, - // refresh the state and return the old cached state again. - Ok(UpstreamProjectState::NotModified) => { - return Ok(ProjectFetchState::refresh(cached_state)) - } - Err(error) => { - relay_log::error!( - error = &error as &dyn Error, - "failed to fetch project from Redis", - ); - } - }; - }; - - let state = self - .upstream_source - .send(FetchProjectState { - project_key, - current_revision, - no_cache, - }) - .await?; - - match state { - UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())), - UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)), - } - } -} - -#[derive(Debug, thiserror::Error)] -enum ProjectSourceError { - #[error("redis permit error {0}")] - RedisPermit(#[from] tokio::sync::AcquireError), - #[error("redis join error {0}")] - RedisJoin(#[from] tokio::task::JoinError), - #[error("upstream error {0}")] - Upstream(#[from] relay_system::SendError), -} - -impl From for ProjectSourceError { - fn from(value: Infallible) -> Self { - match value {} - } -} - /// Updates the cache with new project state information. struct UpdateProjectState { /// The public key to fetch the project by. @@ -1528,36 +1360,6 @@ impl Service for ProjectCacheService { } } -#[derive(Clone, Debug)] -pub struct FetchProjectState { - /// The public key to fetch the project by. - pub project_key: ProjectKey, - - /// Currently cached revision if available. - /// - /// The upstream is allowed to omit full project configs - /// for requests for which the requester already has the most - /// recent revision. - /// - /// Settings this to `None` will essentially always re-fetch - /// the project config. - pub current_revision: Option, - - /// If true, all caches should be skipped and a fresh state should be computed. - pub no_cache: bool, -} - -#[derive(Clone, Debug)] -pub struct FetchOptionalProjectState { - project_key: ProjectKey, -} - -impl FetchOptionalProjectState { - pub fn project_key(&self) -> ProjectKey { - self.project_key - } -} - /// Sum type for all objects which need to be discareded through the [`GarbageDisposal`]. #[derive(Debug)] #[allow(dead_code)] // Fields are never read, only used for discarding/dropping data. diff --git a/relay-server/src/services/projects/source/local.rs b/relay-server/src/services/projects/source/local.rs index 94f3a50835b..9bffd851ef0 100644 --- a/relay-server/src/services/projects/source/local.rs +++ b/relay-server/src/services/projects/source/local.rs @@ -9,8 +9,8 @@ use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Serv use tokio::sync::mpsc; use tokio::time::Instant; -use crate::services::projects::cache::FetchOptionalProjectState; use crate::services::projects::project::{ParsedProjectState, ProjectState}; +use crate::services::projects::source::FetchOptionalProjectState; /// Service interface of the local project source. #[derive(Debug)] diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs index 80f6a34d782..e5d02e88ed2 100644 --- a/relay-server/src/services/projects/source/mod.rs +++ b/relay-server/src/services/projects/source/mod.rs @@ -1,4 +1,209 @@ +use std::convert::Infallible; +use std::sync::Arc; + +use relay_base_schema::project::ProjectKey; +#[cfg(feature = "processing")] +use relay_config::RedisConfigRef; +use relay_config::{Config, RelayMode}; +use relay_redis::RedisPool; +use relay_system::{Addr, Service as _}; +use tokio::sync::Semaphore; + pub mod local; #[cfg(feature = "processing")] pub mod redis; pub mod upstream; + +use crate::services::projects::project::state::UpstreamProjectState; +use crate::services::projects::project::ProjectFetchState; +use crate::services::upstream::UpstreamRelay; + +use self::local::{LocalProjectSource, LocalProjectSourceService}; +#[cfg(feature = "processing")] +use self::redis::RedisProjectSource; +use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService}; + +/// Default value of maximum connections to Redis. This value was arbitrarily determined. +#[cfg(feature = "processing")] +const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10; + +/// Helper type that contains all configured sources for project cache fetching. +#[derive(Clone, Debug)] +pub struct ProjectSource { + config: Arc, + local_source: Addr, + upstream_source: Addr, + #[cfg(feature = "processing")] + redis_source: Option, + #[cfg(feature = "processing")] + redis_semaphore: Arc, +} + +impl ProjectSource { + /// Starts all project source services in the current runtime. + pub fn start( + config: Arc, + upstream_relay: Addr, + _redis: Option, + ) -> Self { + let local_source = LocalProjectSourceService::new(config.clone()).start(); + let upstream_source = + UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); + + #[cfg(feature = "processing")] + let redis_max_connections = config + .redis() + .map(|configs| { + let config = match configs { + relay_config::RedisPoolConfigs::Unified(config) => config, + relay_config::RedisPoolConfigs::Individual { + project_configs: config, + .. + } => config, + }; + Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS) + }) + .unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS); + #[cfg(feature = "processing")] + let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); + + Self { + config, + local_source, + upstream_source, + #[cfg(feature = "processing")] + redis_source, + #[cfg(feature = "processing")] + redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())), + } + } + + #[cfg(feature = "processing")] + fn compute_max_connections(config: RedisConfigRef) -> Option { + match config { + RedisConfigRef::Cluster { options, .. } => Some(options.max_connections), + RedisConfigRef::MultiWrite { configs } => configs + .into_iter() + .filter_map(|c| Self::compute_max_connections(c)) + .max(), + RedisConfigRef::Single { options, .. } => Some(options.max_connections), + } + } + + pub async fn fetch( + self, + project_key: ProjectKey, + no_cache: bool, + cached_state: ProjectFetchState, + ) -> Result { + let state_opt = self + .local_source + .send(FetchOptionalProjectState { project_key }) + .await?; + + if let Some(state) = state_opt { + return Ok(ProjectFetchState::new(state)); + } + + match self.config.relay_mode() { + RelayMode::Proxy => return Ok(ProjectFetchState::allowed()), + RelayMode::Static => return Ok(ProjectFetchState::disabled()), + RelayMode::Capture => return Ok(ProjectFetchState::allowed()), + RelayMode::Managed => (), // Proceed with loading the config from redis or upstream + } + + let current_revision = cached_state.revision().map(String::from); + #[cfg(feature = "processing")] + if let Some(redis_source) = self.redis_source { + let current_revision = current_revision.clone(); + + let redis_permit = self.redis_semaphore.acquire().await?; + let state_fetch_result = tokio::task::spawn_blocking(move || { + redis_source.get_config_if_changed(project_key, current_revision.as_deref()) + }) + .await?; + drop(redis_permit); + + match state_fetch_result { + // New state fetched from Redis, possibly pending. + Ok(UpstreamProjectState::New(state)) => { + let state = state.sanitized(); + if !state.is_pending() { + return Ok(ProjectFetchState::new(state)); + } + } + // Redis reported that we're holding an up-to-date version of the state already, + // refresh the state and return the old cached state again. + Ok(UpstreamProjectState::NotModified) => { + return Ok(ProjectFetchState::refresh(cached_state)) + } + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to fetch project from Redis", + ); + } + }; + }; + + let state = self + .upstream_source + .send(FetchProjectState { + project_key, + current_revision, + no_cache, + }) + .await?; + + match state { + UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())), + UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ProjectSourceError { + #[error("redis permit error {0}")] + RedisPermit(#[from] tokio::sync::AcquireError), + #[error("redis join error {0}")] + RedisJoin(#[from] tokio::task::JoinError), + #[error("upstream error {0}")] + Upstream(#[from] relay_system::SendError), +} + +impl From for ProjectSourceError { + fn from(value: Infallible) -> Self { + match value {} + } +} + +#[derive(Clone, Debug)] +pub struct FetchProjectState { + /// The public key to fetch the project by. + pub project_key: ProjectKey, + + /// Currently cached revision if available. + /// + /// The upstream is allowed to omit full project configs + /// for requests for which the requester already has the most + /// recent revision. + /// + /// Settings this to `None` will essentially always re-fetch + /// the project config. + pub current_revision: Option, + + /// If true, all caches should be skipped and a fresh state should be computed. + pub no_cache: bool, +} + +#[derive(Clone, Debug)] +pub struct FetchOptionalProjectState { + project_key: ProjectKey, +} + +impl FetchOptionalProjectState { + pub fn project_key(&self) -> ProjectKey { + self.project_key + } +} diff --git a/relay-server/src/services/projects/source/upstream.rs b/relay-server/src/services/projects/source/upstream.rs index 960aa4c8823..bcbfd5a279c 100644 --- a/relay-server/src/services/projects/source/upstream.rs +++ b/relay-server/src/services/projects/source/upstream.rs @@ -18,10 +18,10 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::Instant; -use crate::services::projects::cache::FetchProjectState; use crate::services::projects::project::state::UpstreamProjectState; use crate::services::projects::project::ParsedProjectState; use crate::services::projects::project::ProjectState; +use crate::services::projects::source::FetchProjectState; use crate::services::upstream::{ Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError, };