From c2bf66a3ee985f74d1e186b4cd1d8d834a9314eb Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 18 Oct 2024 12:48:38 +0200 Subject: [PATCH] move source out --- relay-server/src/services/projects/cache.rs | 227 +----------------- .../src/services/projects/cache2/service.rs | 2 +- .../src/services/projects/source/mod.rs | 213 +++++++++++++++- 3 files changed, 219 insertions(+), 223 deletions(-) diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache.rs index 97869afcfa1..42d0a08dea6 100644 --- a/relay-server/src/services/projects/cache.rs +++ b/relay-server/src/services/projects/cache.rs @@ -1,5 +1,4 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::convert::Infallible; use std::error::Error; use std::sync::Arc; use std::time::Duration; @@ -8,34 +7,22 @@ use crate::extractors::RequestMeta; use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError}; use crate::services::global_config; use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, + EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; use crate::services::projects::cache2::{CheckedEnvelope, ProjectCacheHandle}; -use crate::services::projects::project::state::UpstreamProjectState; use crate::Envelope; -use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; -#[cfg(feature = "processing")] -use relay_config::RedisConfigRef; use relay_config::{Config, RelayMode}; use relay_metrics::Bucket; use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Service}; -#[cfg(feature = "processing")] -use tokio::sync::Semaphore; use tokio::sync::{mpsc, watch}; use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::projects::project::{ProjectFetchState, ProjectState}; -use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService}; -#[cfg(feature = "processing")] -use crate::services::projects::source::redis::RedisProjectSource; -use crate::services::projects::source::upstream::{ - UpstreamProjectSource, UpstreamProjectSourceService, -}; use crate::services::spooler::{ self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex, UnspooledEnvelope, BATCH_KEY_COUNT, @@ -46,10 +33,6 @@ use crate::services::upstream::UpstreamRelay; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle}; -/// Default value of maximum connections to Redis. This value was arbitrarily determined. -#[cfg(feature = "processing")] -const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10; - /// Validates the envelope against project configuration and rate limits. /// /// This ensures internally that the project state is up to date and then runs the same checks as @@ -197,159 +180,6 @@ impl FromMessage for ProjectCache { } } -/// Helper type that contains all configured sources for project cache fetching. -/// -/// See [`RequestUpdate`] for a description on how project states are fetched. -#[derive(Clone, Debug)] -pub struct ProjectSource { - config: Arc, - local_source: Addr, - upstream_source: Addr, - #[cfg(feature = "processing")] - redis_source: Option, - #[cfg(feature = "processing")] - redis_semaphore: Arc, -} - -impl ProjectSource { - /// Starts all project source services in the current runtime. - pub fn start( - config: Arc, - upstream_relay: Addr, - _redis: Option, - ) -> Self { - let local_source = LocalProjectSourceService::new(config.clone()).start(); - let upstream_source = - UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); - - #[cfg(feature = "processing")] - let redis_max_connections = config - .redis() - .map(|configs| { - let config = match configs { - relay_config::RedisPoolConfigs::Unified(config) => config, - relay_config::RedisPoolConfigs::Individual { - project_configs: config, - .. - } => config, - }; - Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS) - }) - .unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS); - #[cfg(feature = "processing")] - let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); - - Self { - config, - local_source, - upstream_source, - #[cfg(feature = "processing")] - redis_source, - #[cfg(feature = "processing")] - redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())), - } - } - - #[cfg(feature = "processing")] - fn compute_max_connections(config: RedisConfigRef) -> Option { - match config { - RedisConfigRef::Cluster { options, .. } => Some(options.max_connections), - RedisConfigRef::MultiWrite { configs } => configs - .into_iter() - .filter_map(|c| Self::compute_max_connections(c)) - .max(), - RedisConfigRef::Single { options, .. } => Some(options.max_connections), - } - } - - pub async fn fetch( - self, - project_key: ProjectKey, - no_cache: bool, - cached_state: ProjectFetchState, - ) -> Result { - let state_opt = self - .local_source - .send(FetchOptionalProjectState { project_key }) - .await?; - - if let Some(state) = state_opt { - return Ok(ProjectFetchState::new(state)); - } - - match self.config.relay_mode() { - RelayMode::Proxy => return Ok(ProjectFetchState::allowed()), - RelayMode::Static => return Ok(ProjectFetchState::disabled()), - RelayMode::Capture => return Ok(ProjectFetchState::allowed()), - RelayMode::Managed => (), // Proceed with loading the config from redis or upstream - } - - let current_revision = cached_state.revision().map(String::from); - #[cfg(feature = "processing")] - if let Some(redis_source) = self.redis_source { - let current_revision = current_revision.clone(); - - let redis_permit = self.redis_semaphore.acquire().await?; - let state_fetch_result = tokio::task::spawn_blocking(move || { - redis_source.get_config_if_changed(project_key, current_revision.as_deref()) - }) - .await?; - drop(redis_permit); - - match state_fetch_result { - // New state fetched from Redis, possibly pending. - Ok(UpstreamProjectState::New(state)) => { - let state = state.sanitized(); - if !state.is_pending() { - return Ok(ProjectFetchState::new(state)); - } - } - // Redis reported that we're holding an up-to-date version of the state already, - // refresh the state and return the old cached state again. - Ok(UpstreamProjectState::NotModified) => { - return Ok(ProjectFetchState::refresh(cached_state)) - } - Err(error) => { - relay_log::error!( - error = &error as &dyn Error, - "failed to fetch project from Redis", - ); - } - }; - }; - - let state = self - .upstream_source - .send(FetchProjectState { - project_key, - current_revision, - no_cache, - }) - .await?; - - match state { - UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())), - UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)), - } - } -} - -#[derive(Debug, thiserror::Error)] -enum ProjectSourceError { - #[error("redis permit error {0}")] - RedisPermit(#[from] tokio::sync::AcquireError), - #[error("redis join error {0}")] - RedisJoin(#[from] tokio::task::JoinError), - #[error("upstream error {0}")] - Upstream(#[from] relay_system::SendError), -} - -impl From for ProjectSourceError { - fn from(value: Infallible) -> Self { - match value {} - } -} - /// Updates the cache with new project state information. struct UpdateProjectState { /// The public key to fetch the project by. @@ -388,8 +218,6 @@ struct ProjectCacheBroker { projects: ProjectCacheHandle, /// Utility for disposing of expired project data in a background thread. garbage_disposal: GarbageDisposal, - /// Source for fetching project states from the upstream or from disk. - source: ProjectSource, /// Handle to schedule periodic unspooling of buffered envelopes (spool V1). spool_v1_unspool_handle: SleepHandle, @@ -967,18 +795,11 @@ impl Service for ProjectCacheService { }), }; - // Main broker that serializes public and internal messages, and triggers project state - // fetches via the project source. let mut broker = ProjectCacheBroker { config: config.clone(), memory_checker, projects: todo!(), garbage_disposal: GarbageDisposal::new(), - source: ProjectSource::start( - config.clone(), - services.upstream_relay.clone(), - redis, - ), services, spool_v1_unspool_handle: SleepHandle::idle(), spool_v1, @@ -1030,36 +851,6 @@ impl Service for ProjectCacheService { } } -#[derive(Clone, Debug)] -pub struct FetchProjectState { - /// The public key to fetch the project by. - pub project_key: ProjectKey, - - /// Currently cached revision if available. - /// - /// The upstream is allowed to omit full project configs - /// for requests for which the requester already has the most - /// recent revision. - /// - /// Settings this to `None` will essentially always re-fetch - /// the project config. - pub current_revision: Option, - - /// If true, all caches should be skipped and a fresh state should be computed. - pub no_cache: bool, -} - -#[derive(Clone, Debug)] -pub struct FetchOptionalProjectState { - project_key: ProjectKey, -} - -impl FetchOptionalProjectState { - pub fn project_key(&self) -> ProjectKey { - self.project_key - } -} - /// Sum type for all objects which need to be discareded through the [`GarbageDisposal`]. #[derive(Debug)] #[allow(dead_code)] // Fields are never read, only used for discarding/dropping data. @@ -1113,7 +904,6 @@ mod tests { async fn project_cache_broker_setup( services: Services, - state_tx: mpsc::UnboundedSender, buffer_tx: mpsc::UnboundedSender, ) -> (ProjectCacheBroker, Addr) { let config: Arc<_> = Config::from_json_value(serde_json::json!({ @@ -1154,9 +944,7 @@ mod tests { memory_checker, projects: todo!(), garbage_disposal: GarbageDisposal::new(), - source: ProjectSource::start(config, services.upstream_relay.clone(), None), services, - state_tx, spool_v1_unspool_handle: SleepHandle::idle(), spool_v1: Some(SpoolV1 { buffer_tx, @@ -1175,10 +963,9 @@ mod tests { relay_log::init_test!(); let services = mocked_services(); - let (state_tx, _) = mpsc::unbounded_channel(); let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); let (mut broker, _buffer_svc) = - project_cache_broker_setup(services.clone(), state_tx, buffer_tx).await; + project_cache_broker_setup(services.clone(), buffer_tx).await; broker.global_config = GlobalConfigStatus::Ready; let (tx_update, mut rx_update) = mpsc::unbounded_channel(); @@ -1252,10 +1039,9 @@ mod tests { #[tokio::test] async fn handle_processing_without_project() { let services = mocked_services(); - let (state_tx, _) = mpsc::unbounded_channel(); let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); let (mut broker, buffer_svc) = - project_cache_broker_setup(services.clone(), state_tx, buffer_tx.clone()).await; + project_cache_broker_setup(services.clone(), buffer_tx.clone()).await; let dsn = "111d836b15bb49d7bbf99e64295d995b"; let project_key = ProjectKey::parse(dsn).unwrap(); @@ -1271,7 +1057,6 @@ mod tests { ); // Index and projects are empty. - assert!(broker.projects.is_empty()); assert!(broker.spool_v1.as_mut().unwrap().index.is_empty()); // Since there is no project we should not process anything but create a project and spool @@ -1279,7 +1064,11 @@ mod tests { broker.handle_processing(key, envelope); // Assert that we have a new project and also added an index. - assert!(broker.projects.get(&project_key).is_some()); + assert!(!broker + .projects + .get(project_key) + .project_state() + .is_pending()); assert!(broker.spool_v1.as_mut().unwrap().index.contains(&key)); // Check is we actually spooled anything. diff --git a/relay-server/src/services/projects/cache2/service.rs b/relay-server/src/services/projects/cache2/service.rs index 74be9e9952e..208622ed8c6 100644 --- a/relay-server/src/services/projects/cache2/service.rs +++ b/relay-server/src/services/projects/cache2/service.rs @@ -5,9 +5,9 @@ use relay_config::Config; use tokio::sync::mpsc; use crate::services::buffer::EnvelopeBuffer; -use crate::services::projects::cache::ProjectSource; use crate::services::projects::cache2::state::{CompletedFetch, Fetch}; use crate::services::projects::project::{ProjectFetchState, ProjectState}; +use crate::services::projects::source::ProjectSource; pub enum ProjectCache { Fetch(ProjectKey), diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs index 80f6a34d782..59f6ee4d99a 100644 --- a/relay-server/src/services/projects/source/mod.rs +++ b/relay-server/src/services/projects/source/mod.rs @@ -1,4 +1,211 @@ -pub mod local; +use std::convert::Infallible; +use std::sync::Arc; + +use relay_base_schema::project::ProjectKey; #[cfg(feature = "processing")] -pub mod redis; -pub mod upstream; +use relay_config::RedisConfigRef; +use relay_config::{Config, RelayMode}; +use relay_redis::RedisPool; +use relay_system::Addr; +use tokio::sync::Semaphore; + +mod local; +#[cfg(feature = "processing")] +mod redis; +mod upstream; + +use crate::services::projects::project::state::UpstreamProjectState; +use crate::services::projects::project::ProjectFetchState; +use crate::services::upstream::UpstreamRelay; + +use self::local::{LocalProjectSource, LocalProjectSourceService}; +#[cfg(feature = "processing")] +use self::redis::RedisProjectSource; +use self::upstream::{UpstreamProjectSource, UpstreamProjectSourceService}; + +/// Default value of maximum connections to Redis. This value was arbitrarily determined. +#[cfg(feature = "processing")] +const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10; + +/// Helper type that contains all configured sources for project cache fetching. +/// +/// See [`RequestUpdate`] for a description on how project states are fetched. +#[derive(Clone, Debug)] +pub struct ProjectSource { + config: Arc, + local_source: Addr, + upstream_source: Addr, + #[cfg(feature = "processing")] + redis_source: Option, + #[cfg(feature = "processing")] + redis_semaphore: Arc, +} + +impl ProjectSource { + /// Starts all project source services in the current runtime. + pub fn start( + config: Arc, + upstream_relay: Addr, + _redis: Option, + ) -> Self { + let local_source = LocalProjectSourceService::new(config.clone()).start(); + let upstream_source = + UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); + + #[cfg(feature = "processing")] + let redis_max_connections = config + .redis() + .map(|configs| { + let config = match configs { + relay_config::RedisPoolConfigs::Unified(config) => config, + relay_config::RedisPoolConfigs::Individual { + project_configs: config, + .. + } => config, + }; + Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS) + }) + .unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS); + #[cfg(feature = "processing")] + let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); + + Self { + config, + local_source, + upstream_source, + #[cfg(feature = "processing")] + redis_source, + #[cfg(feature = "processing")] + redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())), + } + } + + #[cfg(feature = "processing")] + fn compute_max_connections(config: RedisConfigRef) -> Option { + match config { + RedisConfigRef::Cluster { options, .. } => Some(options.max_connections), + RedisConfigRef::MultiWrite { configs } => configs + .into_iter() + .filter_map(|c| Self::compute_max_connections(c)) + .max(), + RedisConfigRef::Single { options, .. } => Some(options.max_connections), + } + } + + pub async fn fetch( + self, + project_key: ProjectKey, + no_cache: bool, + cached_state: ProjectFetchState, + ) -> Result { + let state_opt = self + .local_source + .send(FetchOptionalProjectState { project_key }) + .await?; + + if let Some(state) = state_opt { + return Ok(ProjectFetchState::new(state)); + } + + match self.config.relay_mode() { + RelayMode::Proxy => return Ok(ProjectFetchState::allowed()), + RelayMode::Static => return Ok(ProjectFetchState::disabled()), + RelayMode::Capture => return Ok(ProjectFetchState::allowed()), + RelayMode::Managed => (), // Proceed with loading the config from redis or upstream + } + + let current_revision = cached_state.revision().map(String::from); + #[cfg(feature = "processing")] + if let Some(redis_source) = self.redis_source { + let current_revision = current_revision.clone(); + + let redis_permit = self.redis_semaphore.acquire().await?; + let state_fetch_result = tokio::task::spawn_blocking(move || { + redis_source.get_config_if_changed(project_key, current_revision.as_deref()) + }) + .await?; + drop(redis_permit); + + match state_fetch_result { + // New state fetched from Redis, possibly pending. + Ok(UpstreamProjectState::New(state)) => { + let state = state.sanitized(); + if !state.is_pending() { + return Ok(ProjectFetchState::new(state)); + } + } + // Redis reported that we're holding an up-to-date version of the state already, + // refresh the state and return the old cached state again. + Ok(UpstreamProjectState::NotModified) => { + return Ok(ProjectFetchState::refresh(cached_state)) + } + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to fetch project from Redis", + ); + } + }; + }; + + let state = self + .upstream_source + .send(FetchProjectState { + project_key, + current_revision, + no_cache, + }) + .await?; + + match state { + UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())), + UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)), + } + } +} + +#[derive(Debug, thiserror::Error)] +enum ProjectSourceError { + #[error("redis permit error {0}")] + RedisPermit(#[from] tokio::sync::AcquireError), + #[error("redis join error {0}")] + RedisJoin(#[from] tokio::task::JoinError), + #[error("upstream error {0}")] + Upstream(#[from] relay_system::SendError), +} + +impl From for ProjectSourceError { + fn from(value: Infallible) -> Self { + match value {} + } +} + +#[derive(Clone, Debug)] +pub struct FetchProjectState { + /// The public key to fetch the project by. + pub project_key: ProjectKey, + + /// Currently cached revision if available. + /// + /// The upstream is allowed to omit full project configs + /// for requests for which the requester already has the most + /// recent revision. + /// + /// Settings this to `None` will essentially always re-fetch + /// the project config. + pub current_revision: Option, + + /// If true, all caches should be skipped and a fresh state should be computed. + pub no_cache: bool, +} + +#[derive(Clone, Debug)] +pub struct FetchOptionalProjectState { + project_key: ProjectKey, +} + +impl FetchOptionalProjectState { + pub fn project_key(&self) -> ProjectKey { + self.project_key + } +}