From c5e9e80022e4f965542e1750a914f9572455ccbd Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 16 Oct 2024 15:17:19 +0200 Subject: [PATCH] after rebase for project source move --- Cargo.lock | 85 ++ Cargo.toml | 1 + relay-dynamic-config/src/error_boundary.rs | 8 + relay-quotas/Cargo.toml | 1 + relay-quotas/src/rate_limit.rs | 49 +- relay-server/Cargo.toml | 1 + relay-server/src/endpoints/common.rs | 13 +- relay-server/src/endpoints/project_configs.rs | 30 +- relay-server/src/service.rs | 21 +- relay-server/src/services/buffer/mod.rs | 15 +- relay-server/src/services/processor.rs | 107 ++- relay-server/src/services/projects/cache.rs | 612 ++------------ .../src/services/projects/cache2/handle.rs | 52 ++ .../src/services/projects/cache2/mod.rs | 8 + .../src/services/projects/cache2/project.rs | 376 +++++++++ .../src/services/projects/cache2/service.rs | 136 ++++ .../src/services/projects/cache2/state.rs | 437 ++++++++++ relay-server/src/services/projects/mod.rs | 1 + .../src/services/projects/project/mod.rs | 747 ------------------ .../projects/project/state/fetch_state.rs | 68 +- .../services/projects/project/state/mod.rs | 3 +- 21 files changed, 1302 insertions(+), 1469 deletions(-) create mode 100644 relay-server/src/services/projects/cache2/handle.rs create mode 100644 relay-server/src/services/projects/cache2/mod.rs create mode 100644 relay-server/src/services/projects/cache2/project.rs create mode 100644 relay-server/src/services/projects/cache2/service.rs create mode 100644 relay-server/src/services/projects/cache2/state.rs diff --git a/Cargo.lock b/Cargo.lock index b5703e7f5c2..79da6b4549e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,6 +217,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-wait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55b94919229f2c42292fd71ffa4b75e83193bffdd77b1e858cd55fd2d0b0ea8" +dependencies = [ + "libc", + "windows-sys 0.42.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -2680,6 +2690,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "papaya" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "674d893d27388726e4020eaa60257b3df760392003c14f46455e3277af72cd04" +dependencies = [ + "atomic-wait", + "seize", +] + [[package]] name = "parking" version = "2.2.0" @@ -3649,6 +3669,7 @@ dependencies = [ name = "relay-quotas" version = "24.10.0" dependencies = [ + "arc-swap", "hashbrown", "insta", "itertools 0.13.0", @@ -3741,6 +3762,7 @@ dependencies = [ "minidump", "multer", "once_cell", + "papaya", "pin-project-lite", "priority-queue", "rand", @@ -4154,6 +4176,12 @@ dependencies = [ "libc", ] +[[package]] +name = "seize" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d659fa6f19e82a52ab8d3fff3c380bd8cc16462eaea411395618a38760eb85bc" + [[package]] name = "semver" version = "1.0.23" @@ -5769,6 +5797,21 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + 
"windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -5827,6 +5870,12 @@ dependencies = [ "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -5839,6 +5888,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -5851,6 +5906,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -5869,6 +5930,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -5881,6 +5948,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -5893,6 +5966,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -5905,6 +5984,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index 2865dccce0b..3fd9e67567c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,6 +119,7 @@ num-traits = "0.2.18" num_cpus = "1.13.0" once_cell = "1.13.1" opentelemetry-proto = { version = "0.7.0", default-features = false } +papaya = "0.1.4" parking_lot = "0.12.1" path-slash = "0.2.1" pest = "2.1.3" diff --git 
a/relay-dynamic-config/src/error_boundary.rs b/relay-dynamic-config/src/error_boundary.rs index c80d2ab8b7e..dc4948a3b00 100644 --- a/relay-dynamic-config/src/error_boundary.rs +++ b/relay-dynamic-config/src/error_boundary.rs @@ -42,6 +42,14 @@ impl ErrorBoundary { } } + /// Converts from `ErrorBoundary` to `ErrorBoundary<&T>`. + pub fn as_ref(&self) -> ErrorBoundary<&T> { + match self { + Self::Ok(t) => ErrorBoundary::Ok(t), + Self::Err(e) => ErrorBoundary::Err(Arc::clone(e)), + } + } + /// Returns the contained [`Ok`] value or computes it from a closure. #[inline] pub fn unwrap_or_else(self, op: F) -> T diff --git a/relay-quotas/Cargo.toml b/relay-quotas/Cargo.toml index 4ac6cdb070e..f5a3d03d210 100644 --- a/relay-quotas/Cargo.toml +++ b/relay-quotas/Cargo.toml @@ -17,6 +17,7 @@ redis = ["dep:thiserror", "dep:relay-log", "relay-redis/impl"] workspace = true [dependencies] +arc-swap = { workspace = true } hashbrown = { workspace = true } relay-base-schema = { workspace = true } relay-common = { workspace = true } diff --git a/relay-quotas/src/rate_limit.rs b/relay-quotas/src/rate_limit.rs index c1162d340ea..0c2a80de543 100644 --- a/relay-quotas/src/rate_limit.rs +++ b/relay-quotas/src/rate_limit.rs @@ -1,7 +1,9 @@ use std::fmt; use std::str::FromStr; +use std::sync::Arc; use std::time::{Duration, Instant}; +use arc_swap::ArcSwap; use relay_base_schema::metrics::MetricNamespace; use relay_base_schema::project::{ProjectId, ProjectKey}; use smallvec::SmallVec; @@ -402,7 +404,7 @@ impl<'a> IntoIterator for &'a RateLimits { /// /// The data structure makes sure no expired rate limits are enforced. #[derive(Debug, Default)] -pub struct CachedRateLimits(RateLimits); +pub struct CachedRateLimits(ArcSwap); impl CachedRateLimits { /// Creates a new, empty instance without any rate limits enforced. @@ -413,25 +415,52 @@ impl CachedRateLimits { /// Adds a limit to this collection. /// /// See also: [`RateLimits::add`]. - pub fn add(&mut self, limit: RateLimit) { - self.0.add(limit); + pub fn add(&self, limit: RateLimit) { + self.0.rcu(|current| { + let mut current = current.as_ref().clone(); + current.add(limit.clone()); + current + }); } /// Merges more rate limits into this instance. /// /// See also: [`RateLimits::merge`]. - pub fn merge(&mut self, rate_limits: RateLimits) { - for limit in rate_limits { - self.add(limit) - } + pub fn merge(&self, limits: RateLimits) { + self.0.rcu(|current| { + let mut current = current.as_ref().clone(); + for limit in limits.clone() { + current.add(limit) + } + current + }); } /// Returns a reference to the contained rate limits. /// /// This call gurantuess that at the time of call no returned rate limit is expired. - pub fn current_limits(&mut self) -> &RateLimits { - self.0.clean_expired(); - &self.0 + pub fn current_limits(&self) -> Arc { + let now = Instant::now(); + + let mut current = self.0.load(); + while current.iter().any(|rl| rl.retry_after.expired_at(now)) { + let new = { + let mut new = current.as_ref().clone(); + new.clean_expired(); + Arc::new(new) + }; + + let prev = self.0.compare_and_swap(&*current, Arc::clone(&new)); + + // If there was a swap, we know `new` is now stored and the most recent value. 
+ if Arc::ptr_eq(¤t, &prev) { + return new; + } + + current = prev; + } + + arc_swap::Guard::into_inner(current) } } diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index dbec10dc958..4ae8cbe9e63 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -53,6 +53,7 @@ mime = { workspace = true } minidump = { workspace = true, optional = true } multer = { workspace = true } once_cell = { workspace = true } +papaya = { workspace = true } pin-project-lite = { workspace = true } priority-queue = { workspace = true } rand = { workspace = true } diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 6368c4fbe74..e7a86e1d0d9 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -12,8 +12,8 @@ use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, I use crate::service::ServiceState; use crate::services::buffer::EnvelopeBuffer; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{MetricData, ProcessingGroup}; -use crate::services::projects::cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope}; +use crate::services::processor::{MetricData, ProcessMetrics, ProcessingGroup}; +use crate::services::projects::cache::ValidateEnvelope; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope}; @@ -274,7 +274,7 @@ fn queue_envelope( if !metric_items.is_empty() { relay_log::trace!("sending metrics into processing queue"); - state.project_cache().send(ProcessMetrics { + state.processor().send(ProcessMetrics { data: MetricData::Raw(metric_items.into_vec()), start_time: envelope.meta().start_time().into(), sent_at: envelope.sent_at(), @@ -367,10 +367,9 @@ pub async fn handle_envelope( } let checked = state - .project_cache() - .send(CheckEnvelope::new(managed_envelope)) - .await - .map_err(|_| BadStoreRequest::ScheduleFailed)? + .project_cache_handle() + .get(managed_envelope.scoping().project_key) + .check_envelope(managed_envelope) .map_err(BadStoreRequest::EventRejected)?; let Some(mut managed_envelope) = checked.envelope else { diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs index 391687a31f4..50ef7bc0af3 100644 --- a/relay-server/src/endpoints/project_configs.rs +++ b/relay-server/src/endpoints/project_configs.rs @@ -106,8 +106,6 @@ struct GetProjectStatesRequest { #[serde(default)] full_config: bool, #[serde(default)] - no_cache: bool, - #[serde(default)] global: bool, } @@ -139,30 +137,9 @@ fn into_valid_keys( async fn inner( state: ServiceState, - Query(version): Query, body: SignedJson, ) -> Result { let SignedJson { inner, relay } = body; - let project_cache = &state.project_cache().clone(); - - let no_cache = inner.no_cache; - let keys_len = inner.public_keys.len(); - - let mut futures: FuturesUnordered<_> = into_valid_keys(inner.public_keys, inner.revisions) - .map(|(project_key, revision)| async move { - let state_result = if version.version >= ENDPOINT_V3 && !no_cache { - project_cache - .send(GetCachedProjectState::new(project_key)) - .await - } else { - project_cache - .send(GetProjectState::new(project_key).no_cache(no_cache)) - .await - }; - - (project_key, revision, state_result) - }) - .collect(); let (global, global_status) = if inner.global { match state.global_config().send(global_config::Get).await? 
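The `CachedRateLimits` rework in relay-quotas/src/rate_limit.rs above switches from `&mut self` mutation to lock-free shared updates: writers publish a fresh `RateLimits` snapshot through `arc_swap::ArcSwap`, and `current_limits` only swaps in a cleaned copy when a limit actually expired. A minimal sketch of the same read-copy-update pattern, assuming only the `arc-swap` crate; `Limits` is an illustrative stand-in, not Relay's `RateLimits`:

use std::sync::Arc;

use arc_swap::ArcSwap;

/// Illustrative stand-in for Relay's `RateLimits`; not the real type.
#[derive(Clone, Debug, Default)]
struct Limits(Vec<String>);

/// Shared cache that can be updated through `&self`, like the patched
/// `CachedRateLimits(ArcSwap<RateLimits>)`.
#[derive(Debug, Default)]
struct SharedLimits(ArcSwap<Limits>);

impl SharedLimits {
    /// Read-copy-update: clone the current value, modify the clone, publish it.
    /// `rcu` retries the closure if another writer raced us.
    fn add(&self, limit: &str) {
        self.0.rcu(|current| {
            let mut next = current.as_ref().clone();
            next.0.push(limit.to_owned());
            next
        });
    }

    /// Readers get a cheap `Arc` snapshot instead of borrowing behind `&mut self`.
    fn current(&self) -> Arc<Limits> {
        self.0.load_full()
    }
}

fn main() {
    let limits = SharedLimits::default();
    limits.add("organization:quota_exceeded");
    limits.add("project:quota_exceeded");

    let snapshot = limits.current();
    assert_eq!(snapshot.0.len(), 2);
}

`rcu` simply retries the closure when another writer races it; the hand-rolled `compare_and_swap` loop in the patch additionally avoids publishing a new allocation when nothing has expired.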
{ @@ -178,12 +155,15 @@ async fn inner( (None, None) }; + let keys_len = inner.public_keys.len(); let mut pending = Vec::with_capacity(keys_len); let mut unchanged = Vec::with_capacity(keys_len); let mut configs = HashMap::with_capacity(keys_len); - while let Some((project_key, revision, state_result)) = futures.next().await { - let project_info = match state_result? { + for (project_key, revision) in into_valid_keys(inner.public_keys, inner.revisions) { + let project = state.project_cache_handle().get(project_key); + + let project_info = match project.project_state() { ProjectState::Enabled(info) => info, ProjectState::Disabled => { // Don't insert project config. Downstream Relay will consider it disabled. diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f4c8051284d..48f5cbc10d9 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -13,6 +13,7 @@ use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOut use crate::services::outcome_aggregator::OutcomeAggregator; use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; use crate::services::projects::cache::{ProjectCache, ProjectCacheService, Services}; +use crate::services::projects::cache2::ProjectCacheHandle; use crate::services::relays::{RelayCache, RelayCacheService}; use crate::services::stats::RelayStats; #[cfg(feature = "processing")] @@ -52,7 +53,7 @@ pub enum ServiceError { Redis, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Registry { pub aggregator: Addr, pub health_check: Addr, @@ -65,18 +66,8 @@ pub struct Registry { pub project_cache: Addr, pub upstream_relay: Addr, pub envelope_buffer: Option, -} -impl fmt::Debug for Registry { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Registry") - .field("aggregator", &self.aggregator) - .field("health_check", &self.health_check) - .field("outcome_producer", &self.outcome_producer) - .field("outcome_aggregator", &self.outcome_aggregator) - .field("processor", &format_args!("Addr")) - .finish() - } + pub project_cache_handle: ProjectCacheHandle, } /// Constructs a tokio [`Runtime`] configured for running [services](relay_system::Service). @@ -144,6 +135,7 @@ fn create_store_pool(config: &Config) -> Result { struct StateInner { config: Arc, memory_checker: MemoryChecker, + registry: Registry, } @@ -357,6 +349,11 @@ impl ServiceState { &self.inner.registry.project_cache } + /// Returns a [`ProjectCacheHandle`]. + pub fn project_cache_handle(&self) -> &ProjectCacheHandle { + &self.inner.registry.project_cache_handle + } + /// Returns the address of the [`RelayCache`] service. 
pub fn relay_cache(&self) -> &Addr { &self.inner.registry.relay_cache diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 3bcc0c5fea4..10221615b83 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -21,8 +21,10 @@ use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; use crate::services::processor::ProcessingGroup; -use crate::services::projects::cache::{DequeuedEnvelope, ProjectCache, UpdateProject}; +use crate::services::projects::cache::DequeuedEnvelope; +use crate::services::projects::cache2::ProjectCacheHandle; +use crate::services::projects::cache2::ProjectEvent; use crate::services::test_store::TestStore; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::ManagedEnvelope; @@ -100,7 +102,7 @@ pub struct Services { /// Bounded channel used exclusively to handle backpressure when sending envelopes to the /// project cache. pub envelopes_tx: mpsc::Sender, - pub project_cache: Addr, + pub project_cache_handle: ProjectCacheHandle, pub outcome_aggregator: Addr, pub test_store: Addr, } @@ -269,12 +271,12 @@ impl EnvelopeBufferService { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); let own_key = envelope.meta().public_key(); - services.project_cache.send(UpdateProject(own_key)); + services.project_cache_handle.fetch(own_key); match envelope.sampling_key() { None => {} Some(sampling_key) if sampling_key == own_key => {} // already sent. Some(sampling_key) => { - services.project_cache.send(UpdateProject(sampling_key)); + services.project_cache_handle.fetch(sampling_key); } } @@ -398,6 +400,7 @@ impl Service for EnvelopeBufferService { buffer.initialize().await; let mut shutdown = Controller::shutdown_handle(); + let mut project_events = self.services.project_cache_handle.events(); relay_log::info!("EnvelopeBufferService: starting"); loop { @@ -427,6 +430,10 @@ impl Service for EnvelopeBufferService { } } } + ProjectEvent::Ready(project_key) = project_events.recv() => { + Self::handle_message(&mut buffer, EnvelopeBuffer::Ready(project_key)).await; + sleep = Duration::ZERO; + } Some(message) = rx.recv() => { Self::handle_message(&mut buffer, message).await; sleep = Duration::ZERO; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index f1b2d3e93f5..572c2528173 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -71,9 +71,8 @@ use crate::services::global_config::GlobalConfigHandle; use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::event::FiltersStatus; -use crate::services::projects::cache::{ - BucketSource, ProcessMetrics, ProjectCache, UpdateRateLimits, -}; +use crate::services::projects::cache::{BucketSource, ProjectCache}; +use crate::services::projects::cache2::ProjectCacheHandle; use crate::services::projects::project::{ProjectInfo, ProjectState}; use crate::services::test_store::{Capture, TestStore}; use crate::services::upstream::{ @@ -875,7 +874,7 @@ pub struct ProcessEnvelope { /// The project info. pub project_info: Arc, /// Currently active cached rate limits for this project. - pub rate_limits: RateLimits, + pub rate_limits: Arc, /// Root sampling project info. pub sampling_project_info: Option>, /// Sampling reservoir counters. 
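The `EnvelopeBufferService` above now reacts to `ProjectEvent`s from `ProjectCacheHandle::events()` instead of `UpdateProject` messages. A self-contained sketch of the underlying idiom — a `tokio::sync::broadcast` receiver polled inside `select!` — with an illustrative event enum where a plain `String` stands in for `ProjectKey`; note that `recv()` returns a `Result`, so lag and channel closure need explicit handling:

use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

/// Illustrative event enum mirroring `ProjectEvent` from the patch.
#[derive(Clone, Debug)]
enum ProjectEvent {
    Ready(String),
    Evicted(String),
}

#[tokio::main]
async fn main() {
    // `ProjectCacheHandle::events()` hands out receivers like this one.
    let (tx, mut events) = broadcast::channel(16);

    tx.send(ProjectEvent::Ready("some-project-key".into())).unwrap();
    drop(tx); // close the channel so the loop below can terminate

    loop {
        tokio::select! {
            // `recv()` yields a `Result`: the receiver may lag behind or the
            // sender may be gone, so both cases need an explicit arm.
            event = events.recv() => match event {
                Ok(ProjectEvent::Ready(key)) => println!("project ready: {key}"),
                Ok(ProjectEvent::Evicted(key)) => println!("project evicted: {key}"),
                Err(RecvError::Lagged(skipped)) => eprintln!("lagged, skipped {skipped} events"),
                Err(RecvError::Closed) => break,
            },
        }
    }
}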
@@ -894,16 +893,7 @@ pub struct ProcessEnvelope { /// Additionally, processing applies clock drift correction using the system clock of this Relay, if /// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. #[derive(Debug)] -pub struct ProcessProjectMetrics { - /// The project state the metrics belong to. - /// - /// The project state can be pending, in which case cached rate limits - /// and other project specific operations are skipped and executed once - /// the project state becomes available. - pub project_state: ProjectState, - /// Currently active cached rate limits for this project. - pub rate_limits: RateLimits, - +pub struct ProcessMetrics { /// A list of metric items. pub data: MetricData, /// The target project. @@ -999,7 +989,7 @@ pub struct ProjectMetrics { /// Project info for extracting quotas. pub project_info: Arc, /// Currently cached rate limits. - pub rate_limits: RateLimits, + pub rate_limits: Arc, } /// Encodes metrics into an envelope ready to be sent upstream. @@ -1028,7 +1018,7 @@ pub struct SubmitClientReports { #[derive(Debug)] pub enum EnvelopeProcessor { ProcessEnvelope(Box), - ProcessProjectMetrics(Box), + ProcessProjectMetrics(Box), ProcessBatchedMetrics(Box), EncodeMetrics(Box), SubmitEnvelope(Box), @@ -1059,10 +1049,10 @@ impl FromMessage for EnvelopeProcessor { } } -impl FromMessage for EnvelopeProcessor { +impl FromMessage for EnvelopeProcessor { type Response = NoResponse; - fn from_message(message: ProcessProjectMetrics, _: ()) -> Self { + fn from_message(message: ProcessMetrics, _: ()) -> Self { Self::ProcessProjectMetrics(Box::new(message)) } } @@ -1136,6 +1126,7 @@ struct InnerProcessor { workers: WorkerGroup, config: Arc, global_config: GlobalConfigHandle, + project_cache_handle: ProjectCacheHandle, cogs: Cogs, #[cfg(feature = "processing")] quotas_pool: Option, @@ -1318,10 +1309,11 @@ impl EnvelopeProcessorService { // Update cached rate limits with the freshly computed ones. if !limits.is_empty() { - self.inner.addrs.project_cache.send(UpdateRateLimits::new( - state.managed_envelope.scoping().project_key, - limits, - )); + self.inner + .project_cache_handle + .get(state.managed_envelope.scoping().project_key) + .rate_limits() + .merge(limits); } Ok(()) @@ -2064,10 +2056,8 @@ impl EnvelopeProcessorService { } } - fn handle_process_project_metrics(&self, cogs: &mut Token, message: ProcessProjectMetrics) { - let ProcessProjectMetrics { - project_state, - rate_limits, + fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) { + let ProcessMetrics { data, project_key, start_time, @@ -2106,13 +2096,16 @@ impl EnvelopeProcessorService { true }); + let project = self.inner.project_cache_handle.get(project_key); + // Best effort check to filter and rate limit buckets, if there is no project state // available at the current time, we will check again after flushing. 
- let buckets = match project_state.enabled() { - Some(project_info) => { + let buckets = match project.project_state() { + ProjectState::Enabled(project_info) => { + let rate_limits = project.cached_rate_limits(); self.check_buckets(project_key, &project_info, &rate_limits, buckets) } - None => buckets, + _ => buckets, }; relay_log::trace!("merging metric buckets into the aggregator"); @@ -2147,21 +2140,17 @@ impl EnvelopeProcessorService { } }; - let mut feature_weights = FeatureWeights::none(); for (project_key, buckets) in buckets { - feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); - - self.inner.addrs.project_cache.send(ProcessMetrics { - data: MetricData::Parsed(buckets), - project_key, - source, - start_time: start_time.into(), - sent_at, - }); - } - - if !feature_weights.is_empty() { - cogs.update(feature_weights); + self.handle_process_metrics( + cogs, + ProcessMetrics { + data: MetricData::Parsed(buckets), + project_key, + source, + start_time: start_time.into(), + sent_at, + }, + ) } } @@ -2362,10 +2351,11 @@ impl EnvelopeProcessorService { Outcome::RateLimited(reason_code), ); - self.inner.addrs.project_cache.send(UpdateRateLimits::new( - item_scoping.scoping.project_key, - limits, - )); + self.inner + .project_cache_handle + .get(item_scoping.scoping.project_key) + .rate_limits() + .merge(limits); } } @@ -2440,9 +2430,10 @@ impl EnvelopeProcessorService { if was_enforced { // Update the rate limits in the project cache. self.inner - .addrs - .project_cache - .send(UpdateRateLimits::new(scoping.project_key, rate_limits)); + .project_cache_handle + .get(scoping.project_key) + .rate_limits() + .merge(rate_limits); } } } @@ -2742,7 +2733,7 @@ impl EnvelopeProcessorService { match message { EnvelopeProcessor::ProcessEnvelope(m) => self.handle_process_envelope(*m), EnvelopeProcessor::ProcessProjectMetrics(m) => { - self.handle_process_project_metrics(&mut cogs, *m) + self.handle_process_metrics(&mut cogs, *m) } EnvelopeProcessor::ProcessBatchedMetrics(m) => { self.handle_process_batched_metrics(&mut cogs, *m) @@ -2888,7 +2879,7 @@ pub struct SendEnvelope { envelope: TypedEnvelope, body: Bytes, http_encoding: HttpEncoding, - project_cache: Addr, + project_cache_handle: ProjectCacheHandle, } impl UpstreamRequest for SendEnvelope { @@ -2940,10 +2931,10 @@ impl UpstreamRequest for SendEnvelope { self.envelope.accept(); if let UpstreamRequestError::RateLimited(limits) = error { - self.project_cache.send(UpdateRateLimits::new( - scoping.project_key, - limits.scope(&scoping), - )); + self.project_cache_handle + .get(scoping.project_key) + .rate_limits() + .merge(limits.scope(&scoping)); } } Err(error) => { @@ -3683,7 +3674,7 @@ mod tests { ), (BucketSource::Internal, None), ] { - let message = ProcessProjectMetrics { + let message = ProcessMetrics { data: MetricData::Raw(vec![item.clone()]), project_state: ProjectState::Pending, rate_limits: Default::default(), @@ -3692,7 +3683,7 @@ mod tests { start_time, sent_at: Some(Utc::now()), }; - processor.handle_process_project_metrics(&mut token, message); + processor.handle_process_metrics(&mut token, message); let value = aggregator_rx.recv().await.unwrap(); let Aggregator::MergeBuckets(merge_buckets) = value else { diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache.rs index 658900e873a..2d99733db1d 100644 --- a/relay-server/src/services/projects/cache.rs +++ b/relay-server/src/services/projects/cache.rs @@ -7,25 +7,22 @@ use crate::extractors::RequestMeta; 
use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError}; use crate::services::global_config; use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, + EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; +use crate::services::projects::cache2::{CheckedEnvelope, ProjectCacheHandle, ProjectEvent}; use crate::Envelope; -use chrono::{DateTime, Utc}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_metrics::Bucket; -use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, Interface, Sender, Service}; +use relay_system::{Addr, FromMessage, Interface, Service}; use tokio::sync::{mpsc, watch}; -use tokio::time::Instant; -use crate::services::metrics::{Aggregator, FlushBuckets}; +use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::projects::project::{Project, ProjectFetchState, ProjectSender, ProjectState}; -use crate::services::projects::source::ProjectSource; +use crate::services::projects::project::{ProjectFetchState, ProjectState}; use crate::services::spooler::{ self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex, UnspooledEnvelope, BATCH_KEY_COUNT, @@ -36,102 +33,6 @@ use crate::services::upstream::UpstreamRelay; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle}; -/// Requests a refresh of a project state from one of the available sources. -/// -/// The project state is resolved in the following precedence: -/// -/// 1. Local file system -/// 2. Redis cache (processing mode only) -/// 3. Upstream (managed and processing mode only) -/// -/// Requests to the upstream are performed via `UpstreamProjectSource`, which internally batches -/// individual requests. -#[derive(Clone, Debug)] -pub struct RequestUpdate { - /// The public key to fetch the project by. - pub project_key: ProjectKey, - /// If true, all caches should be skipped and a fresh state should be computed. - pub no_cache: bool, - /// Previously cached fetch state, if available. - /// - /// The upstream request will include the revision of the currently cached state, - /// if the upstream does not have a different revision, this cached - /// state is re-used and its expiry bumped. - pub cached_state: ProjectFetchState, -} - -/// Returns the project state. -/// -/// The project state is fetched if it is missing or outdated. If `no_cache` is specified, then the -/// state is always refreshed. -#[derive(Debug)] -pub struct GetProjectState { - project_key: ProjectKey, - no_cache: bool, -} - -impl GetProjectState { - /// Fetches the project state and uses the cached version if up-to-date. - pub fn new(project_key: ProjectKey) -> Self { - Self { - project_key, - no_cache: false, - } - } - - /// Fetches the project state and conditionally skips the cache. - pub fn no_cache(mut self, no_cache: bool) -> Self { - self.no_cache = no_cache; - self - } -} - -/// Returns the project state if it is already cached. -/// -/// This is used for cases when we only want to perform operations that do -/// not require waiting for network requests. 
-#[derive(Debug)] -pub struct GetCachedProjectState { - project_key: ProjectKey, -} - -impl GetCachedProjectState { - pub fn new(project_key: ProjectKey) -> Self { - Self { project_key } - } -} - -/// A checked envelope and associated rate limits. -/// -/// Items violating the rate limits have been removed from the envelope. If all items are removed -/// from the envelope, `None` is returned in place of the envelope. -#[derive(Debug)] -pub struct CheckedEnvelope { - pub envelope: Option, - pub rate_limits: RateLimits, -} - -/// Checks the envelope against project configuration and rate limits. -/// -/// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated -/// project state may be used, or otherwise the envelope is passed through unaltered. -/// -/// To check the envelope, this runs: -/// - Validate origins and public keys -/// - Quotas with a limit of `0` -/// - Cached rate limits -#[derive(Debug)] -pub struct CheckEnvelope { - envelope: ManagedEnvelope, -} - -impl CheckEnvelope { - /// Uses a cached project state and checks the envelope. - pub fn new(envelope: ManagedEnvelope) -> Self { - Self { envelope } - } -} - /// Validates the envelope against project configuration and rate limits. /// /// This ensures internally that the project state is up to date and then runs the same checks as @@ -154,21 +55,6 @@ impl ValidateEnvelope { } } -#[derive(Debug)] -pub struct UpdateRateLimits { - project_key: ProjectKey, - rate_limits: RateLimits, -} - -impl UpdateRateLimits { - pub fn new(project_key: ProjectKey, rate_limits: RateLimits) -> UpdateRateLimits { - Self { - project_key, - rate_limits, - } - } -} - /// Source information where a metric bucket originates from. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum BucketSource { @@ -195,25 +81,6 @@ impl From<&RequestMeta> for BucketSource { } } -/// Starts the processing flow for received metrics. -/// -/// Enriches the raw data with projcet information and forwards -/// the metrics using [`ProcessProjectMetrics`](crate::services::processor::ProcessProjectMetrics). -#[derive(Debug)] -pub struct ProcessMetrics { - /// A list of metric items. - pub data: MetricData, - /// The target project. - pub project_key: ProjectKey, - /// Whether to keep or reset the metric metadata. - pub source: BucketSource, - /// The instant at which the request was received. - pub start_time: Instant, - /// The value of the Envelope's [`sent_at`](crate::envelope::Envelope::sent_at) - /// header for clock drift correction. - pub sent_at: Option>, -} - /// Updates the buffer index for [`ProjectKey`] with the [`QueueKey`] keys. /// /// This message is sent from the project buffer in case of the error while fetching the data from @@ -262,36 +129,19 @@ pub struct UpdateProject(pub ProjectKey); /// See the enumerated variants for a full list of available messages for this service. 
#[derive(Debug)] pub enum ProjectCache { - RequestUpdate(RequestUpdate), - Get(GetProjectState, ProjectSender), - GetCached(GetCachedProjectState, Sender), - CheckEnvelope( - CheckEnvelope, - Sender>, - ), ValidateEnvelope(ValidateEnvelope), - UpdateRateLimits(UpdateRateLimits), - ProcessMetrics(ProcessMetrics), FlushBuckets(FlushBuckets), UpdateSpoolIndex(UpdateSpoolIndex), RefreshIndexCache(RefreshIndexCache), - UpdateProject(ProjectKey), } impl ProjectCache { pub fn variant(&self) -> &'static str { match self { - Self::RequestUpdate(_) => "RequestUpdate", - Self::Get(_, _) => "Get", - Self::GetCached(_, _) => "GetCached", - Self::CheckEnvelope(_, _) => "CheckEnvelope", Self::ValidateEnvelope(_) => "ValidateEnvelope", - Self::UpdateRateLimits(_) => "UpdateRateLimits", - Self::ProcessMetrics(_) => "ProcessMetrics", Self::FlushBuckets(_) => "FlushBuckets", Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::RefreshIndexCache(_) => "RefreshIndexCache", - Self::UpdateProject(_) => "UpdateProject", } } } @@ -314,41 +164,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: RequestUpdate, _: ()) -> Self { - Self::RequestUpdate(message) - } -} - -impl FromMessage for ProjectCache { - type Response = relay_system::BroadcastResponse; - - fn from_message(message: GetProjectState, sender: ProjectSender) -> Self { - Self::Get(message, sender) - } -} - -impl FromMessage for ProjectCache { - type Response = relay_system::AsyncResponse; - - fn from_message(message: GetCachedProjectState, sender: Sender) -> Self { - Self::GetCached(message, sender) - } -} - -impl FromMessage for ProjectCache { - type Response = relay_system::AsyncResponse>; - - fn from_message( - message: CheckEnvelope, - sender: Sender>, - ) -> Self { - Self::CheckEnvelope(message, sender) - } -} - impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; @@ -357,22 +172,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: UpdateRateLimits, _: ()) -> Self { - Self::UpdateRateLimits(message) - } -} - -impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: ProcessMetrics, _: ()) -> Self { - Self::ProcessMetrics(message) - } -} - impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; @@ -381,27 +180,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: UpdateProject, _: ()) -> Self { - let UpdateProject(project_key) = message; - Self::UpdateProject(project_key) - } -} - -/// Updates the cache with new project state information. -struct UpdateProjectState { - /// The public key to fetch the project by. - project_key: ProjectKey, - - /// New project state information. - state: ProjectFetchState, - - /// If true, all caches should be skipped and a fresh state should be computed. - no_cache: bool, -} - /// Holds the addresses of all services required for [`ProjectCache`]. #[derive(Debug, Clone)] pub struct Services { @@ -423,14 +201,9 @@ struct ProjectCacheBroker { config: Arc, memory_checker: MemoryChecker, services: Services, - // Need hashbrown because extract_if is not stable in std yet. - projects: hashbrown::HashMap, + projects: ProjectCacheHandle, /// Utility for disposing of expired project data in a background thread. 
garbage_disposal: GarbageDisposal, - /// Source for fetching project states from the upstream or from disk. - source: ProjectSource, - /// Tx channel used to send the updated project state whenever requested. - state_tx: mpsc::UnboundedSender, /// Handle to schedule periodic unspooling of buffered envelopes (spool V1). spool_v1_unspool_handle: SleepHandle, @@ -493,194 +266,35 @@ impl ProjectCacheBroker { .send(DequeueMany::new(keys, spool_v1.buffer_tx.clone())) } - /// Evict projects that are over its expiry date. - /// - /// Ideally, we would use `check_expiry` to determine expiry here. - /// However, for eviction, we want to add an additional delay, such that we do not delete - /// a project that has expired recently and for which a fetch is already underway in - /// [`super::source::upstream`]. - fn evict_stale_project_caches(&mut self) { - let eviction_start = Instant::now(); - let delta = 2 * self.config.project_cache_expiry() + self.config.project_grace_period(); - - let expired = self - .projects - .extract_if(|_, entry| entry.last_updated_at() + delta <= eviction_start); - - // Defer dropping the projects to a dedicated thread: - let mut count = 0; - for (project_key, project) in expired { - if let Some(spool_v1) = self.spool_v1.as_mut() { - let keys = spool_v1 - .index - .extract_if(|key| key.own_key == project_key || key.sampling_key == project_key) - .collect::>(); - - if !keys.is_empty() { - spool_v1.buffer.send(RemoveMany::new(project_key, keys)) - } - } - - self.garbage_disposal.dispose(project); - count += 1; - } - metric!(counter(RelayCounters::EvictingStaleProjectCaches) += count); - - // Log garbage queue size: - let queue_size = self.garbage_disposal.queue_size() as f64; - metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size); - - metric!(timer(RelayTimers::ProjectStateEvictionDuration) = eviction_start.elapsed()); - } - - fn get_or_create_project(&mut self, project_key: ProjectKey) -> &mut Project { - metric!(histogram(RelayHistograms::ProjectStateCacheSize) = self.projects.len() as u64); - - let config = self.config.clone(); - - self.projects - .entry(project_key) - .and_modify(|_| { - metric!(counter(RelayCounters::ProjectCacheHit) += 1); - }) - .or_insert_with(move || { - metric!(counter(RelayCounters::ProjectCacheMiss) += 1); - Project::new(project_key, config) - }) - } - - /// Updates the [`Project`] with received [`ProjectState`]. - /// - /// If the project state is valid we also send the message to the buffer service to dequeue the - /// envelopes for this project. - fn merge_state(&mut self, message: UpdateProjectState) { - let UpdateProjectState { - project_key, - state, - no_cache, - } = message; - - let project_cache = self.services.project_cache.clone(); - - let old_state = - self.get_or_create_project(project_key) - .update_state(&project_cache, state, no_cache); - - if let Some(old_state) = old_state { - self.garbage_disposal.dispose(old_state); - } - - // Try to schedule unspool if it's not scheduled yet. - self.schedule_unspool(); - - if let Some(envelope_buffer) = self.services.envelope_buffer.as_ref() { - envelope_buffer.send(EnvelopeBuffer::Ready(project_key)) + fn evict_project(&mut self, project_key: ProjectKey) { + let Some(ref mut spool_v1) = self.spool_v1 else { + return; }; - } - - fn handle_request_update(&mut self, message: RequestUpdate) { - let RequestUpdate { - project_key, - no_cache, - cached_state, - } = message; - - // Bump the update time of the project in our hashmap to evade eviction. 
- let project = self.get_or_create_project(project_key); - project.refresh_updated_timestamp(); - let next_attempt = project.next_fetch_attempt(); - - let source = self.source.clone(); - let sender = self.state_tx.clone(); - - tokio::spawn(async move { - // Wait on the new attempt time when set. - if let Some(next_attempt) = next_attempt { - tokio::time::sleep_until(next_attempt).await; - } - let state = source - .fetch(project_key, no_cache, cached_state) - .await - .unwrap_or_else(|e| { - relay_log::error!( - error = &e as &dyn Error, - tags.project_key = %project_key, - "Failed to fetch project from source" - ); - // TODO: change this to ProjectFetchState::pending() once we consider it safe to do so. - // see https://github.com/getsentry/relay/pull/4140. - ProjectFetchState::disabled() - }); - - let message = UpdateProjectState { - project_key, - no_cache, - state, - }; - - sender.send(message).ok(); - }); - } - fn handle_get(&mut self, message: GetProjectState, sender: ProjectSender) { - let GetProjectState { - project_key, - no_cache, - } = message; - let project_cache = self.services.project_cache.clone(); - let project = self.get_or_create_project(project_key); - - project.get_state(project_cache, sender, no_cache); - } + let keys = spool_v1 + .index + .extract_if(|key| key.own_key == project_key || key.sampling_key == project_key) + .collect::>(); - fn handle_get_cached(&mut self, message: GetCachedProjectState) -> ProjectState { - let project_cache = self.services.project_cache.clone(); - self.get_or_create_project(message.project_key) - .get_cached_state(project_cache, false) + if !keys.is_empty() { + spool_v1.buffer.send(RemoveMany::new(project_key, keys)) + } } - fn handle_check_envelope( - &mut self, - message: CheckEnvelope, - ) -> Result { - let CheckEnvelope { envelope: context } = message; - let project_cache = self.services.project_cache.clone(); - let project_key = context.envelope().meta().public_key(); - if let Some(sampling_key) = context.envelope().sampling_key() { - if sampling_key != project_key { - let sampling_project = self.get_or_create_project(sampling_key); - sampling_project.prefetch(project_cache.clone(), false); - } + fn handle_project_event(&mut self, event: ProjectEvent) { + match event { + ProjectEvent::Ready(_) => self.schedule_unspool(), + ProjectEvent::Evicted(project_key) => self.evict_project(project_key), } - let project = self.get_or_create_project(project_key); - - // Preload the project cache so that it arrives a little earlier in processing. However, - // do not pass `no_cache`. In case the project is rate limited, we do not want to force - // a full reload. Fetching must not block the store request. - project.prefetch(project_cache, false); - - project.check_envelope(context) } /// Handles the processing of the provided envelope. 
fn handle_processing(&mut self, key: QueueKey, mut managed_envelope: ManagedEnvelope) { let project_key = managed_envelope.envelope().meta().public_key(); - let Some(project) = self.projects.get_mut(&project_key) else { - relay_log::error!( - tags.project_key = %project_key, - "project could not be found in the cache", - ); + let project = self.projects.get(project_key); - let mut project = Project::new(project_key, self.config.clone()); - project.prefetch(self.services.project_cache.clone(), false); - self.projects.insert(project_key, project); - self.enqueue(key, managed_envelope); - return; - }; - - let project_cache = self.services.project_cache.clone(); - let project_info = match project.get_cached_state(project_cache.clone(), false) { + let project_info = match project.project_state() { ProjectState::Enabled(info) => info, ProjectState::Disabled => { managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); @@ -707,20 +321,19 @@ impl ProjectCacheBroker { .. }) = project.check_envelope(managed_envelope) { - let rate_limits = project.current_rate_limits().clone(); - - let reservoir_counters = project.reservoir_counters(); + let rate_limits = project.rate_limits().current_limits(); + let reservoir_counters = project.reservoir_counters().clone(); let sampling_project_info = managed_envelope .envelope() .sampling_key() - .and_then(|key| self.projects.get_mut(&key)) - .and_then(|p| p.get_cached_state(project_cache, false).enabled()) - .filter(|state| state.organization_id == project_info.organization_id); + .map(|key| self.projects.get(key)) + .and_then(|p| p.project_state().clone().enabled()) + .filter(|info| info.organization_id == project_info.organization_id); let process = ProcessEnvelope { envelope: managed_envelope, - project_info, + project_info: Arc::clone(&project_info), rate_limits, sampling_project_info, reservoir_counters, @@ -748,17 +361,13 @@ impl ProjectCacheBroker { envelope: mut managed_envelope, } = message; - let project_cache = self.services.project_cache.clone(); let envelope = managed_envelope.envelope(); // Fetch the project state for our key and make sure it's not invalid. let own_key = envelope.meta().public_key(); - let project = self.get_or_create_project(own_key); - - let project_state = - project.get_cached_state(project_cache.clone(), envelope.meta().no_cache()); + let project = self.projects.get(own_key); - let project_state = match project_state { + let project_state = match project.project_state() { ProjectState::Enabled(state) => Some(state), ProjectState::Disabled => { managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); @@ -770,12 +379,10 @@ impl ProjectCacheBroker { // Also, fetch the project state for sampling key and make sure it's not invalid. let sampling_key = envelope.sampling_key(); let mut requires_sampling_state = sampling_key.is_some(); - let sampling_state = if let Some(sampling_key) = sampling_key { - let state = self - .get_or_create_project(sampling_key) - .get_cached_state(project_cache, envelope.meta().no_cache()); - match state { - ProjectState::Enabled(state) => Some(state), + let sampling_info = if let Some(sampling_key) = sampling_key { + let sampling_project = self.projects.get(sampling_key); + match sampling_project.project_state() { + ProjectState::Enabled(info) => Some(Arc::clone(&info)), ProjectState::Disabled => { relay_log::trace!("Sampling state is disabled ({sampling_key})"); // We accept events even if its root project has been disabled. 
@@ -796,7 +403,7 @@ impl ProjectCacheBroker { // Trigger processing once we have a project state and we either have a sampling project // state or we do not need one. if project_state.is_some() - && (sampling_state.is_some() || !requires_sampling_state) + && (sampling_info.is_some() || !requires_sampling_state) && self.memory_checker.check_memory().has_capacity() && self.global_config.is_ready() { @@ -809,37 +416,20 @@ impl ProjectCacheBroker { self.enqueue(key, managed_envelope); } - fn handle_rate_limits(&mut self, message: UpdateRateLimits) { - self.get_or_create_project(message.project_key) - .merge_rate_limits(message.rate_limits); - } - - fn handle_process_metrics(&mut self, message: ProcessMetrics) { - let project_cache = self.services.project_cache.clone(); - - let message = self - .get_or_create_project(message.project_key) - .prefetch(project_cache, false) - .process_metrics(message); - - self.services.envelope_processor.send(message); - } - fn handle_flush_buckets(&mut self, message: FlushBuckets) { let aggregator = self.services.aggregator.clone(); - let project_cache = self.services.project_cache.clone(); let mut no_project = 0; let mut scoped_buckets = BTreeMap::new(); for (project_key, buckets) in message.buckets { - let project = self.get_or_create_project(project_key); + let project = self.projects.get(project_key); - let project_info = match project.current_state() { + let project_info = match project.project_state() { ProjectState::Pending => { no_project += 1; - // Schedule an update for the project just in case. - project.prefetch(project_cache.clone(), false); - project.return_buckets(&aggregator, buckets); + + // Return the buckets to the aggregator. + aggregator.send(MergeBuckets::new(project_key, buckets)); continue; } ProjectState::Disabled => { @@ -854,7 +444,7 @@ impl ProjectCacheBroker { ProjectState::Enabled(project_info) => project_info, }; - let Some(scoping) = project.scoping() else { + let Some(scoping) = project_info.scoping(project_key) else { relay_log::error!( tags.project_key = project_key.as_str(), "there is no scoping: dropping {} buckets", @@ -867,8 +457,8 @@ impl ProjectCacheBroker { match scoped_buckets.entry(scoping) { Vacant(entry) => { entry.insert(ProjectMetrics { - project_info, - rate_limits: project.current_rate_limits().clone(), + project_info: Arc::clone(project_info), + rate_limits: project.rate_limits().current_limits(), buckets, }); } @@ -895,16 +485,14 @@ impl ProjectCacheBroker { fn handle_refresh_index_cache(&mut self, message: RefreshIndexCache) { let RefreshIndexCache(index) = message; - let project_cache = self.services.project_cache.clone(); for key in index { let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); spool_v1.index.insert(key); - self.get_or_create_project(key.own_key) - .prefetch(project_cache.clone(), false); + + self.projects.fetch(key.own_key); if key.own_key != key.sampling_key { - self.get_or_create_project(key.sampling_key) - .prefetch(project_cache.clone(), false); + self.projects.fetch(key.sampling_key); } } } @@ -918,11 +506,10 @@ impl ProjectCacheBroker { let services = self.services.clone(); let own_key = envelope.meta().public_key(); - let project = self.get_or_create_project(own_key); - let project_state = project.get_cached_state(services.project_cache.clone(), false); + let project = self.projects.get(own_key); // Check if project config is enabled. 
- let project_info = match project_state { + let project_info = match project.project_state() { ProjectState::Enabled(info) => info, ProjectState::Disabled => { let mut managed_envelope = ManagedEnvelope::new( @@ -944,8 +531,7 @@ impl ProjectCacheBroker { let sampling_project_info = match sampling_key.map(|sampling_key| { ( sampling_key, - self.get_or_create_project(sampling_key) - .get_cached_state(services.project_cache, false), + self.projects.get(sampling_key).project_state().clone(), ) }) { Some((_, ProjectState::Enabled(info))) => { @@ -964,8 +550,6 @@ impl ProjectCacheBroker { None => None, }; - let project = self.get_or_create_project(own_key); - // Reassign processing groups and proceed to processing. for (group, envelope) in ProcessingGroup::split_envelope(*envelope) { let managed_envelope = ManagedEnvelope::new( @@ -983,11 +567,11 @@ impl ProjectCacheBroker { continue; // Outcomes are emitted by check_envelope }; - let reservoir_counters = project.reservoir_counters(); + let reservoir_counters = project.reservoir_counters().clone(); services.envelope_processor.send(ProcessEnvelope { envelope: managed_envelope, project_info: project_info.clone(), - rate_limits: project.current_rate_limits().clone(), + rate_limits: project.rate_limits().current_limits(), sampling_project_info: sampling_project_info.clone(), reservoir_counters, }); @@ -996,22 +580,6 @@ impl ProjectCacheBroker { Ok(()) } - fn handle_update_project(&mut self, project_key: ProjectKey) { - let project_cache = self.services.project_cache.clone(); - let envelope_buffer = self.services.envelope_buffer.clone(); - let project = self.get_or_create_project(project_key); - - // If the project is already loaded, inform the envelope buffer. - if !project.current_state().is_pending() { - if let Some(envelope_buffer) = envelope_buffer { - envelope_buffer.send(EnvelopeBuffer::Ready(project_key)); - } - } - - let no_cache = false; - project.prefetch(project_cache, no_cache); - } - /// Returns backoff timeout for an unspool attempt. fn next_unspool_attempt(&mut self) -> Duration { let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); @@ -1036,15 +604,9 @@ impl ProjectCacheBroker { /// Which includes the own key and the sampling key for the project. /// Note: this function will trigger [`ProjectState`] refresh if it's already expired. fn is_state_cached(&mut self, key: &QueueKey) -> bool { - key.unique_keys().iter().all(|key| { - self.projects.get_mut(key).is_some_and(|project| { - // Returns `Some` if the project is cached otherwise None and also triggers refresh - // in background. 
- !project - .get_cached_state(self.services.project_cache.clone(), false) - .is_pending() - }) - }) + key.unique_keys() + .iter() + .all(|key| !self.projects.get(*key).project_state().is_pending()) } /// Iterates the buffer index and tries to unspool the envelopes for projects with a valid @@ -1108,25 +670,14 @@ impl ProjectCacheBroker { message = ty, { match message { - ProjectCache::RequestUpdate(message) => self.handle_request_update(message), - ProjectCache::Get(message, sender) => self.handle_get(message, sender), - ProjectCache::GetCached(message, sender) => { - sender.send(self.handle_get_cached(message)) - } - ProjectCache::CheckEnvelope(message, sender) => { - sender.send(self.handle_check_envelope(message)) - } ProjectCache::ValidateEnvelope(message) => { self.handle_validate_envelope(message) } - ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message), - ProjectCache::ProcessMetrics(message) => self.handle_process_metrics(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } - ProjectCache::UpdateProject(project) => self.handle_update_project(project), } } ) @@ -1153,6 +704,7 @@ impl ProjectCacheBroker { pub struct ProjectCacheService { config: Arc, memory_checker: MemoryChecker, + project_cache: ProjectCacheHandle, services: Services, global_config_rx: watch::Receiver, /// Bounded channel used exclusively to receive envelopes from the envelope buffer. @@ -1165,6 +717,7 @@ impl ProjectCacheService { pub fn new( config: Arc, memory_checker: MemoryChecker, + project_cache: ProjectCacheHandle, services: Services, global_config_rx: watch::Receiver, envelopes_rx: mpsc::Receiver, @@ -1173,6 +726,7 @@ impl ProjectCacheService { Self { config, memory_checker, + project_cache, services, global_config_rx, envelopes_rx, @@ -1188,22 +742,20 @@ impl Service for ProjectCacheService { let Self { config, memory_checker, + project_cache, services, mut global_config_rx, mut envelopes_rx, redis, } = self; + let project_events = project_cache.events(); let project_cache = services.project_cache.clone(); let outcome_aggregator = services.outcome_aggregator.clone(); let test_store = services.test_store.clone(); tokio::spawn(async move { - let mut ticker = tokio::time::interval(config.cache_eviction_interval()); relay_log::info!("project cache started"); - // Channel for async project state responses back into the project cache. - let (state_tx, mut state_rx) = mpsc::unbounded_channel(); - let global_config = match global_config_rx.borrow().clone() { global_config::Status::Ready(_) => { relay_log::info!("global config received"); @@ -1256,20 +808,12 @@ impl Service for ProjectCacheService { }), }; - // Main broker that serializes public and internal messages, and triggers project state - // fetches via the project source. 
let mut broker = ProjectCacheBroker { config: config.clone(), memory_checker, - projects: hashbrown::HashMap::new(), + projects: todo!(), garbage_disposal: GarbageDisposal::new(), - source: ProjectSource::start( - config.clone(), - services.upstream_relay.clone(), - redis, - ), services, - state_tx, spool_v1_unspool_handle: SleepHandle::idle(), spool_v1, global_config, @@ -1289,9 +833,9 @@ impl Service for ProjectCacheService { } }) }, - Some(message) = state_rx.recv() => { - metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "merge_state", { - broker.merge_state(message) + project_event = project_events.recv() => { + metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_project_event", { + broker.handle_project_event(project_event); }) } // Buffer will not dequeue the envelopes from the spool if there is not enough @@ -1301,11 +845,6 @@ impl Service for ProjectCacheService { broker.handle_processing(key, managed_envelope) }) }, - _ = ticker.tick() => { - metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "evict_project_caches", { - broker.evict_stale_project_caches() - }) - } () = &mut broker.spool_v1_unspool_handle => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "periodic_unspool", { broker.handle_periodic_unspool() @@ -1334,17 +873,10 @@ impl Service for ProjectCacheService { #[derive(Debug)] #[allow(dead_code)] // Fields are never read, only used for discarding/dropping data. enum ProjectGarbage { - Project(Project), ProjectFetchState(ProjectFetchState), Metrics(Vec), } -impl From for ProjectGarbage { - fn from(value: Project) -> Self { - Self::Project(value) - } -} - impl From for ProjectGarbage { fn from(value: ProjectFetchState) -> Self { Self::ProjectFetchState(value) @@ -1390,7 +922,6 @@ mod tests { async fn project_cache_broker_setup( services: Services, - state_tx: mpsc::UnboundedSender, buffer_tx: mpsc::UnboundedSender, ) -> (ProjectCacheBroker, Addr) { let config: Arc<_> = Config::from_json_value(serde_json::json!({ @@ -1429,11 +960,9 @@ mod tests { ProjectCacheBroker { config: config.clone(), memory_checker, - projects: hashbrown::HashMap::new(), + projects: todo!(), garbage_disposal: GarbageDisposal::new(), - source: ProjectSource::start(config, services.upstream_relay.clone(), None), services, - state_tx, spool_v1_unspool_handle: SleepHandle::idle(), spool_v1: Some(SpoolV1 { buffer_tx, @@ -1452,10 +981,9 @@ mod tests { relay_log::init_test!(); let services = mocked_services(); - let (state_tx, _) = mpsc::unbounded_channel(); let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); let (mut broker, _buffer_svc) = - project_cache_broker_setup(services.clone(), state_tx, buffer_tx).await; + project_cache_broker_setup(services.clone(), buffer_tx).await; broker.global_config = GlobalConfigStatus::Ready; let (tx_update, mut rx_update) = mpsc::unbounded_channel(); @@ -1529,10 +1057,9 @@ mod tests { #[tokio::test] async fn handle_processing_without_project() { let services = mocked_services(); - let (state_tx, _) = mpsc::unbounded_channel(); let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); let (mut broker, buffer_svc) = - project_cache_broker_setup(services.clone(), state_tx, buffer_tx.clone()).await; + project_cache_broker_setup(services.clone(), buffer_tx.clone()).await; let dsn = "111d836b15bb49d7bbf99e64295d995b"; let project_key = ProjectKey::parse(dsn).unwrap(); @@ -1548,7 +1075,6 @@ mod tests { ); // Index and projects are empty. 
- assert!(broker.projects.is_empty()); assert!(broker.spool_v1.as_mut().unwrap().index.is_empty()); // Since there is no project we should not process anything but create a project and spool @@ -1556,7 +1082,11 @@ mod tests { broker.handle_processing(key, envelope); // Assert that we have a new project and also added an index. - assert!(broker.projects.get(&project_key).is_some()); + assert!(!broker + .projects + .get(project_key) + .project_state() + .is_pending()); assert!(broker.spool_v1.as_mut().unwrap().index.contains(&key)); // Check is we actually spooled anything. diff --git a/relay-server/src/services/projects/cache2/handle.rs b/relay-server/src/services/projects/cache2/handle.rs new file mode 100644 index 00000000000..e7afb43454e --- /dev/null +++ b/relay-server/src/services/projects/cache2/handle.rs @@ -0,0 +1,52 @@ +use core::fmt; +use std::sync::Arc; + +use relay_base_schema::project::ProjectKey; +use relay_config::Config; +use relay_system::Addr; +use tokio::sync::broadcast; + +use super::state::Shared; +use crate::services::projects::cache2::service::ProjectEvent; +use crate::services::projects::cache2::{Project, ProjectCache}; + +#[derive(Clone)] +pub struct ProjectCacheHandle { + shared: Arc, + config: Arc, + service: Addr, + project_events: broadcast::Sender, +} + +impl ProjectCacheHandle { + /// Returns the current project state for the `project_key`. + pub fn get(&self, project_key: ProjectKey) -> Project<'_> { + // TODO: maybe we should always trigger a fetch? + // We need a way to continously keep projects updated while at the same time + // let unused projects expire. + // TODO: trigger prefetch for the sampling projects, maybe take a resolver trait which can + // also resolve the sampling project and fetch? Or do it explicit. + let project = match self.shared.get_or_create(project_key) { + Ok(project) => project, + Err(missing) => missing.fetch(&self.service), + }; + Project::new(project, &self.config) + } + + pub fn fetch(&self, project_key: ProjectKey) { + // TODO: does this make sense? 
+ self.service.send(ProjectCache::Fetch(project_key)); + } + + pub fn events(&self) -> broadcast::Receiver { + self.project_events.subscribe() + } +} + +impl fmt::Debug for ProjectCacheHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ProjectCacheHandle") + .field("shared", &self.shared) + .finish() + } +} diff --git a/relay-server/src/services/projects/cache2/mod.rs b/relay-server/src/services/projects/cache2/mod.rs new file mode 100644 index 00000000000..62d55aff707 --- /dev/null +++ b/relay-server/src/services/projects/cache2/mod.rs @@ -0,0 +1,8 @@ +mod handle; +mod project; +mod service; +mod state; + +pub use self::handle::ProjectCacheHandle; +pub use self::project::{CheckedEnvelope, Project}; +pub use self::service::{ProjectCache, ProjectCacheService, ProjectEvent}; diff --git a/relay-server/src/services/projects/cache2/project.rs b/relay-server/src/services/projects/cache2/project.rs new file mode 100644 index 00000000000..27223609541 --- /dev/null +++ b/relay-server/src/services/projects/cache2/project.rs @@ -0,0 +1,376 @@ +use std::sync::Arc; + +use relay_config::Config; +use relay_dynamic_config::Feature; +use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits}; +use relay_sampling::evaluation::ReservoirCounters; + +use crate::envelope::ItemType; +use crate::services::outcome::{DiscardReason, Outcome}; +use crate::services::projects::cache2::state::SharedProject; +use crate::services::projects::project::ProjectState; +use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ManagedEnvelope}; + +/// A loaded project. +pub struct Project<'a> { + shared: SharedProject, + config: &'a Config, +} + +impl<'a> Project<'a> { + pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self { + Self { shared, config } + } + + pub fn project_state(&self) -> &ProjectState { + self.shared.project_state() + } + + pub fn rate_limits(&self) -> &CachedRateLimits { + self.shared.cached_rate_limits() + } + + pub fn reservoir_counters(&self) -> &ReservoirCounters { + self.shared.reservoir_counters() + } + + /// Runs the checks on incoming envelopes. + /// + /// See, [`crate::services::projects::cache::CheckEnvelope`] for more information + /// + /// * checks the rate limits + /// * validates the envelope meta in `check_request` - determines whether the given request + /// should be accepted or discarded + /// + /// IMPORTANT: If the [`ProjectState`] is invalid, the `check_request` will be skipped and only + /// rate limits will be validated. This function **must not** be called in the main processing + /// pipeline. + pub fn check_envelope( + &self, + mut envelope: ManagedEnvelope, + ) -> Result { + let state = match self.project_state() { + ProjectState::Enabled(state) => Some(Arc::clone(&state)), + ProjectState::Disabled => { + // TODO(jjbayer): We should refactor this function to either return a Result or + // handle envelope rejections internally, but not both. 
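Taken together, the handle provides non-blocking access to project state while fetches run in the background. A minimal usage sketch, assuming a `ProjectCacheHandle` cloned from the service; the function name is illustrative:

fn log_project_status(handle: &ProjectCacheHandle, project_key: ProjectKey) {
    // `get` does not block: it returns whatever is cached and, for a key that is
    // not cached yet, schedules a fetch as a side effect.
    let project = handle.get(project_key);

    match project.project_state() {
        ProjectState::Enabled(info) => {
            // The full project configuration is available, e.g. quotas and features.
            let _quotas = info.get_quotas();
            relay_log::trace!(project_key = project_key.as_str(), "project enabled");
        }
        ProjectState::Disabled => {
            relay_log::trace!(project_key = project_key.as_str(), "project disabled");
        }
        ProjectState::Pending => {
            // No state yet: a fetch is in flight or was just scheduled. Callers
            // typically spool the envelope and wait for `ProjectEvent::Ready`.
        }
    }
}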
+ envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); + return Err(DiscardReason::ProjectId); + } + ProjectState::Pending => None, + }; + + let mut scoping = envelope.scoping(); + + if let Some(ref state) = state { + scoping = state.scope_request(envelope.envelope().meta()); + envelope.scope(scoping); + + if let Err(reason) = state.check_envelope(envelope.envelope(), &self.config) { + envelope.reject(Outcome::Invalid(reason)); + return Err(reason); + } + } + + let current_limits = self.rate_limits().current_limits(); + + let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]); + let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| { + Ok(current_limits.check_with_quotas(quotas, item_scoping)) + }); + + let (mut enforcement, mut rate_limits) = + envelope_limiter.compute(envelope.envelope_mut(), &scoping)?; + + let check_nested_spans = state + .as_ref() + .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent)); + + // If we can extract spans from the event, we want to try and count the number of nested + // spans to correctly emit negative outcomes in case the transaction itself is dropped. + if check_nested_spans { + sync_spans_to_enforcement(&envelope, &mut enforcement); + } + + enforcement.apply_with_outcomes(&mut envelope); + + envelope.update(); + + // Special case: Expose active rate limits for all metric namespaces if there is at least + // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary + // because `EnvelopeLimiter` cannot not check metrics without parsing item contents. + if envelope.envelope().items().any(|i| i.ty().is_metrics()) { + let mut metrics_scoping = scoping.item(DataCategory::MetricBucket); + metrics_scoping.namespace = MetricNamespaceScoping::Any; + rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping)); + } + + let envelope = if envelope.envelope().is_empty() { + // Individual rate limits have already been issued above + envelope.reject(Outcome::RateLimited(None)); + None + } else { + Some(envelope) + }; + + Ok(CheckedEnvelope { + envelope, + rate_limits, + }) + } +} + +/// A checked envelope and associated rate limits. +/// +/// Items violating the rate limits have been removed from the envelope. If all items are removed +/// from the envelope, `None` is returned in place of the envelope. +#[derive(Debug)] +pub struct CheckedEnvelope { + pub envelope: Option, + pub rate_limits: RateLimits, +} + +/// Adds category limits for the nested spans inside a transaction. +/// +/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted +/// as top-level spans, thus if we limited a transaction, we want to count and emit negative +/// outcomes for each of the spans nested inside that transaction. +fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) { + if !enforcement.is_event_active() { + return; + } + + let spans_count = count_nested_spans(envelope); + if spans_count == 0 { + return; + } + + if enforcement.event.is_active() { + enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count); + } + + if enforcement.event_indexed.is_active() { + enforcement.spans_indexed = enforcement + .event_indexed + .clone_for(DataCategory::SpanIndexed, spans_count); + } +} + +/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope). 
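A caller-side sketch of this fast-path check, assuming an already scoped `ManagedEnvelope` and that `RateLimits::is_limited` is available; the function name is illustrative:

fn enforce_fast_path(
    project: &Project<'_>,
    envelope: ManagedEnvelope,
) -> Option<ManagedEnvelope> {
    match project.check_envelope(envelope) {
        // Rate limits were applied; items or the whole envelope may have been
        // dropped, with outcomes already emitted.
        Ok(CheckedEnvelope {
            envelope,
            rate_limits,
        }) => {
            if rate_limits.is_limited() {
                relay_log::trace!("envelope was rate limited");
            }
            envelope
        }
        // The request itself was rejected, e.g. for a disabled project.
        Err(_reason) => None,
    }
}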
+fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { + #[derive(Debug, serde::Deserialize)] + struct PartialEvent { + spans: crate::utils::SeqCount, + } + + envelope + .envelope() + .items() + .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) + .and_then(|item| serde_json::from_slice::(&item.payload()).ok()) + // We do + 1, since we count the transaction itself because it will be extracted + // as a span and counted during the slow path of rate limiting. + .map_or(0, |event| event.spans.0 + 1) +} + +#[cfg(test)] +mod tests { + use crate::envelope::{ContentType, Envelope, Item}; + use crate::extractors::RequestMeta; + use crate::services::processor::ProcessingGroup; + use relay_base_schema::project::ProjectId; + use relay_event_schema::protocol::EventId; + use relay_test::mock_service; + use serde_json::json; + use smallvec::SmallVec; + + use super::*; + + #[test] + fn get_state_expired() { + for expiry in [9999, 0] { + let config = Arc::new( + Config::from_json_value(json!( + { + "cache": { + "project_expiry": expiry, + "project_grace_period": 0, + "eviction_interval": 9999 // do not evict + } + } + )) + .unwrap(), + ); + + // Initialize project with a state + let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let project_info = ProjectInfo { + project_id: Some(ProjectId::new(123)), + ..Default::default() + }; + let mut project = Project::new(project_key, config.clone()); + project.state = ProjectFetchState::enabled(project_info); + + if expiry > 0 { + // With long expiry, should get a state + assert!(matches!(project.current_state(), ProjectState::Enabled(_))); + } else { + // With 0 expiry, project should expire immediately. No state can be set. + assert!(matches!(project.current_state(), ProjectState::Pending)); + } + } + } + + #[tokio::test] + async fn test_stale_cache() { + let (addr, _) = mock_service("project_cache", (), |&mut (), _| {}); + + let config = Arc::new( + Config::from_json_value(json!( + { + "cache": { + "project_expiry": 100, + "project_grace_period": 0, + "eviction_interval": 9999 // do not evict + } + } + )) + .unwrap(), + ); + + let channel = StateChannel::new(); + + // Initialize project with a state. + let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let mut project = Project::new(project_key, config); + project.state_channel = Some(channel); + project.state = ProjectFetchState::allowed(); + + assert!(project.next_fetch_attempt.is_none()); + // Try to update project with errored project state. + project.update_state(&addr, ProjectFetchState::pending(), false); + // Since we got invalid project state we still keep the old one meaning there + // still must be the project id set. + assert!(matches!(project.current_state(), ProjectState::Enabled(_))); + assert!(project.next_fetch_attempt.is_some()); + + // This tests that we actually initiate the backoff and the backoff mechanism works: + // * first call to `update_state` with invalid ProjectState starts the backoff, but since + // it's the first attemt, we get Duration of 0. 
+ // * second call to `update_state` here will bumpt the `next_backoff` Duration to somehing + // like ~ 1s + // * and now, by calling `fetch_state` we test that it's a noop, since if backoff is active + // we should never fetch + // * without backoff it would just panic, not able to call the ProjectCache service + let channel = StateChannel::new(); + project.state_channel = Some(channel); + project.update_state(&addr, ProjectFetchState::pending(), false); + project.fetch_state(addr, false); + } + + fn create_project(config: Option) -> Project { + let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(); + let mut project = Project::new(project_key, Arc::new(Config::default())); + let mut project_info = ProjectInfo { + project_id: Some(ProjectId::new(42)), + ..Default::default() + }; + let mut public_keys = SmallVec::new(); + public_keys.push(PublicKeyConfig { + public_key: project_key, + numeric_id: None, + }); + project_info.public_keys = public_keys; + if let Some(config) = config { + project_info.config = serde_json::from_value(config).unwrap(); + } + project.state = ProjectFetchState::enabled(project_info); + project + } + + fn request_meta() -> RequestMeta { + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + + RequestMeta::new(dsn) + } + + #[test] + fn test_track_nested_spans_outcomes() { + let mut project = create_project(Some(json!({ + "features": [ + "organizations:indexed-spans-extraction" + ], + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + }))); + + let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); + + let mut transaction = Item::new(ItemType::Transaction); + transaction.set_payload( + ContentType::Json, + r#"{ + "event_id": "52df9022835246eeb317dbd739ccd059", + "type": "transaction", + "transaction": "I have a stale timestamp, but I'm recent!", + "start_timestamp": 1, + "timestamp": 2, + "contexts": { + "trace": { + "trace_id": "ff62a8b040f340bda5d830223def1d81", + "span_id": "bd429c44b67a3eb4" + } + }, + "spans": [ + { + "span_id": "bd429c44b67a3eb4", + "start_timestamp": 1, + "timestamp": null, + "trace_id": "ff62a8b040f340bda5d830223def1d81" + }, + { + "span_id": "bd429c44b67a3eb5", + "start_timestamp": 1, + "timestamp": null, + "trace_id": "ff62a8b040f340bda5d830223def1d81" + } + ] +}"#, + ); + + envelope.add_item(transaction); + + let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); + let (test_store, _) = Addr::custom(); + + let managed_envelope = ManagedEnvelope::new( + envelope, + outcome_aggregator.clone(), + test_store, + ProcessingGroup::Transaction, + ); + + let _ = project.check_envelope(managed_envelope); + drop(outcome_aggregator); + + let expected = [ + (DataCategory::Transaction, 1), + (DataCategory::TransactionIndexed, 1), + (DataCategory::Span, 3), + (DataCategory::SpanIndexed, 3), + ]; + + for (expected_category, expected_quantity) in expected { + let outcome = outcome_aggregator_rx.blocking_recv().unwrap(); + assert_eq!(outcome.category, expected_category); + assert_eq!(outcome.quantity, expected_quantity); + } + } +} diff --git a/relay-server/src/services/projects/cache2/service.rs b/relay-server/src/services/projects/cache2/service.rs new file mode 100644 index 00000000000..1115a8feeb6 --- /dev/null +++ b/relay-server/src/services/projects/cache2/service.rs @@ -0,0 +1,136 @@ +use std::sync::Arc; + +use relay_base_schema::project::ProjectKey; +use 
relay_config::Config; +use tokio::sync::{broadcast, mpsc}; + +use crate::services::buffer::EnvelopeBuffer; +use crate::services::projects::cache2::state::{CompletedFetch, Fetch}; +use crate::services::projects::project::{ProjectFetchState, ProjectState}; +use crate::services::projects::source::ProjectSource; + +pub enum ProjectCache { + Fetch(ProjectKey), +} + +impl relay_system::Interface for ProjectCache {} + +impl relay_system::FromMessage for ProjectCache { + type Response = relay_system::NoResponse; + + fn from_message(message: Self, _: ()) -> Self { + message + } +} + +pub enum ProjectEvent { + Ready(ProjectKey), + Evicted(ProjectKey), +} + +pub struct ProjectCacheService { + store: super::state::ProjectStore, + source: ProjectSource, + config: Arc, + + buffer: relay_system::Addr, + + project_update_rx: mpsc::UnboundedReceiver, + project_update_tx: mpsc::UnboundedSender, + + project_events_tx: broadcast::Sender, +} + +impl ProjectCacheService { + fn schedule_fetch(&self, fetch: Fetch) { + let source = self.source.clone(); + let project_updates = self.project_update_tx.clone(); + + tokio::spawn(async move { + tokio::time::sleep_until(fetch.when().into()).await; + + // TODO: cached state for delta fetches, maybe this should just be a revision? + let state = match source + .fetch(fetch.project_key(), false, ProjectFetchState::pending()) + .await + { + // TODO: verify if the sanitized here is correct + Ok(state) => state.sanitized().into(), + Err(err) => { + relay_log::error!( + error = &err as &dyn std::error::Error, + "failed to fetch project state for {fetch:?}" + ); + ProjectState::Pending + } + }; + + project_updates.send(fetch.complete(state)); + }); + } +} + +impl ProjectCacheService { + fn handle_fetch(&mut self, project_key: ProjectKey) { + if let Some(fetch) = self.store.try_begin_fetch(project_key, &self.config) { + self.schedule_fetch(fetch); + } + } + + fn handle_project_update(&mut self, fetch: CompletedFetch) { + let project_key = fetch.project_key(); + + if let Some(fetch) = self.store.complete_fetch(fetch, &self.config) { + relay_log::trace!( + project_key = fetch.project_key().as_str(), + "re-scheduling project fetch: {fetch:?}" + ); + self.schedule_fetch(fetch); + return; + } + + self.project_events_tx + .send(ProjectEvent::Ready(project_key)); + } + + fn handle_evict_stale_projects(&mut self) { + let on_evict = |project_key| { + self.project_events_tx + .send(ProjectEvent::Evicted(project_key)); + }; + + self.store.evict_stale_projects(&self.config, on_evict); + } + + fn handle(&mut self, message: ProjectCache) { + match message { + ProjectCache::Fetch(project_key) => self.handle_fetch(project_key), + } + } +} + +impl relay_system::Service for ProjectCacheService { + type Interface = ProjectCache; + + fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + tokio::spawn(async move { + let mut eviction_ticker = tokio::time::interval(self.config.cache_eviction_interval()); + + loop { + tokio::select! 
{ + biased; + + Some(update) = self.project_update_rx.recv() => { + self.handle_project_update(update) + }, + Some(message) = rx.recv() => { + self.handle(message); + }, + _ = eviction_ticker.tick() => { + self.handle_evict_stale_projects() + } + } + } + }); + } +} diff --git a/relay-server/src/services/projects/cache2/state.rs b/relay-server/src/services/projects/cache2/state.rs new file mode 100644 index 00000000000..722e20f1829 --- /dev/null +++ b/relay-server/src/services/projects/cache2/state.rs @@ -0,0 +1,437 @@ +use std::fmt; +use std::sync::Arc; +use std::time::Instant; + +use arc_swap::ArcSwap; +use relay_base_schema::project::ProjectKey; +use relay_config::Config; +use relay_quotas::{CachedRateLimits, RateLimits}; +use relay_sampling::evaluation::ReservoirCounters; + +use crate::services::projects::cache2::ProjectCache; +use crate::services::projects::project::ProjectState; +use crate::utils::RetryBackoff; + +/// The backing storage for a project cache. +/// +/// Exposes the only interface to delete from [`Shared`], gurnatueed by +/// requiring exclusive/mutable access to [`ProjectStore`]. +/// +/// [`Shared`] can be extended through [`Shared::get_or_create`], in which case +/// the private state is missing. Users of [`Shared::get_or_create`] *must* trigger +/// a fetch to create the private state when [`Missing`] is returned. +/// This gurnatuees that eventually the project state is populated, but for a undetermined, +/// time it is possible that shared state exists without the respective private state. +pub struct ProjectStore { + shared: Arc, + private: hashbrown::HashMap, +} + +impl ProjectStore { + pub fn get(&mut self, project_key: ProjectKey) -> Option> { + let private = self.private.get_mut(&project_key)?; + let shared = self.shared.projects.pin().get(&project_key).cloned(); + debug_assert!( + shared.is_some(), + "there must be a shared project if private state exists" + ); + + Some(ProjectRef { + private, + shared: shared?, + }) + } + + pub fn try_begin_fetch(&mut self, project_key: ProjectKey, config: &Config) -> Option { + self.get_or_create(project_key, config) + .try_begin_fetch(config) + } + + #[must_use = "an incomplete fetch must be retried"] + pub fn complete_fetch(&mut self, fetch: CompletedFetch, config: &Config) -> Option { + // TODO: what if in the meantime the state expired and was evicted? + // Should eviction be possible for states that are currently in progress, I don't think so. + // Maybe need to discard outdated fetches (for evicted projects)? + debug_assert!(self.shared.projects.pin().get(&fetch.project_key).is_some()); + debug_assert!(self.private.get(&fetch.project_key).is_some()); + + let mut project = self.get_or_create(fetch.project_key, config); + project.complete_fetch(fetch); + // Schedule another fetch if necessary, usually should only happen if + // the completed fetch is pending. + project.try_begin_fetch(config) + } + + pub fn evict_stale_projects(&mut self, config: &Config, mut on_evict: F) -> usize + where + F: FnMut(ProjectKey), + { + let eviction_start = Instant::now(); + let delta = 2 * config.project_cache_expiry() + config.project_grace_period(); + + // TODO: what do we do with forever fetching projects, do we fail eventually? + let expired = self.private.extract_if(|_, private| { + if private.has_fetch_in_progress() { + return false; + } + + // Invariant: if there is no last successful fetch, + // there must be a fetch currently in progress. 
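The fetch life cycle in this store is driven by `try_begin_fetch` and `complete_fetch` above: `try_begin_fetch` hands out a `Fetch` token, the state is resolved asynchronously, and `complete_fetch` either finishes or reschedules it. A condensed sketch of that loop, where `fetch_from_source` is a hypothetical stand-in for the `ProjectSource` call; in the actual service the sleep and upstream request run on a spawned task and complete through `project_update_tx`:

async fn fetch_project(store: &mut ProjectStore, config: &Config, project_key: ProjectKey) {
    // `None` means a fetch is already in flight or the cached state is still fresh.
    let Some(mut fetch) = store.try_begin_fetch(project_key, config) else {
        return;
    };

    loop {
        // Honor the backoff baked into the fetch before querying upstream.
        tokio::time::sleep_until(fetch.when().into()).await;
        let state = fetch_from_source(fetch.project_key()).await;

        // A pending result keeps the previous state and yields a rescheduled fetch.
        match store.complete_fetch(fetch.complete(state), config) {
            Some(next) => fetch = next,
            None => break,
        }
    }
}

// Hypothetical stand-in for `ProjectSource::fetch`; failures degrade to `Pending`.
async fn fetch_from_source(_project_key: ProjectKey) -> ProjectState {
    ProjectState::Pending
}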
+ debug_assert!(private.last_non_pending_fetch.is_some()); + + private + .last_non_pending_fetch + .map_or(true, |ts| ts + delta <= eviction_start) + }); + + let mut evicted = 0; + + let shared = self.shared.projects.pin(); + for (project_key, _) in expired { + let _removed = shared.remove(&project_key); + debug_assert!( + _removed.is_some(), + "an expired project must exist in the shared state" + ); + + evicted += 1; + + // TODO: garbage disposal? Do we still need that, we shouldn't have a problem with + // timings anymore. + + on_evict(project_key); + } + drop(shared); + + evicted + } + + fn get_or_create(&mut self, project_key: ProjectKey, config: &Config) -> ProjectRef<'_> { + #[cfg(debug_assertions)] + if self.private.contains_key(&project_key) { + // We have exclusive access to the private part, there are no concurrent deletions + // hence when if we have a private state there must always be a shared state as well. + debug_assert!(self.shared.projects.pin().contains_key(&project_key)); + } + + let private = self + .private + .entry(project_key) + .or_insert_with(|| PrivateProjectState::new(project_key, config)); + + let shared = self + .shared + .projects + .pin() + .get_or_insert_with(project_key, Default::default) + .clone(); + + ProjectRef { private, shared } + } +} + +pub struct ProjectRef<'a> { + shared: SharedProjectState, + private: &'a mut PrivateProjectState, +} + +impl ProjectRef<'_> { + pub fn merge_rate_limits(&mut self, rate_limits: RateLimits) { + self.shared.merge_rate_limits(rate_limits) + } + + fn try_begin_fetch(&mut self, config: &Config) -> Option { + self.private.try_begin_fetch(config) + } + + fn complete_fetch(&mut self, fetch: CompletedFetch) { + self.private.complete_fetch(&fetch); + + // Keep the old state around if the current fetch is pending. + // It may still be useful to callers. + if !fetch.project_state.is_pending() { + self.shared.set_project_state(fetch.project_state); + } + } +} + +pub struct Shared { + projects: papaya::HashMap, +} + +impl Shared { + // pub fn get(&self, project_key: ProjectKey) -> Option { + // self.projects + // .pin() + // .get(&project_key) + // .map(|v| v.state.load().as_ref().clone()) + // } + + /// Returns the existing project state or creates a new one and returns `Err([`Missing`])`. + /// + /// The returned [`Missing`] value must be used to trigger a fetch for this project + /// or it will stay pending forever. + pub fn get_or_create(&self, project_key: ProjectKey) -> Result { + // TODO: do we need to check for expiry here? + // TODO: if yes, we need to include the timestamp in the shared project state. + // TODO: grace periods? + + // The fast path, we expect the project to exist. + let projects = self.projects.pin(); + if let Some(project) = projects.get(&project_key) { + return Ok(project.to_shared_project()); + } + + // The slow path, try to attempt to insert, somebody else may have been faster, but that's okay. 
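From the reader side, the contract described above reduces to: either a `SharedProject` is returned, or a `Missing` token that must be redeemed by triggering a fetch. A sketch of that invariant, assuming access to the `Shared` handle and the cache service address; the function name is illustrative:

fn read_project(
    shared: &Shared,
    service: &relay_system::Addr<ProjectCache>,
    project_key: ProjectKey,
) -> SharedProject {
    match shared.get_or_create(project_key) {
        // Fast path: the project already has a slot in the shared map.
        Ok(project) => project,
        // Slow path: a default (pending) slot was just inserted; the fetch must be
        // triggered, otherwise the entry would stay pending forever.
        Err(missing) => missing.fetch(service),
    }
}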
+ match projects.try_insert(project_key, Default::default()) { + Ok(inserted) => Err(Missing { + project_key, + shared_project: inserted.to_shared_project(), + }), + Err(occupied) => Ok(occupied.current.to_shared_project()), + } + } +} + +impl fmt::Debug for Shared { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Shared") + .field("num_projects", &self.projects.len()) + .finish() + } +} + +#[must_use = "a missing project must be fetched"] +pub struct Missing { + project_key: ProjectKey, + shared_project: SharedProject, +} + +impl Missing { + pub fn fetch(self, project_cache: &relay_system::Addr) -> SharedProject { + project_cache.send(ProjectCache::Fetch(self.project_key)); + self.shared_project + } +} + +pub struct SharedProject(Arc); + +impl SharedProject { + pub fn project_state(&self) -> &ProjectState { + &self.0.state + } + + pub fn cached_rate_limits(&self) -> &CachedRateLimits { + // TODO: exposing cached rate limits may be a bad idea, this allows mutation + // and caching of rate limits for pending projects, which may or may not be fine. + // + // Read only access is easily achievable if we return only the current rate limits. + &self.0.rate_limits + } + + pub fn reservoir_counters(&self) -> &ReservoirCounters { + &self.0.reservoir_counters + } +} + +#[derive(Debug, Default, Clone)] +struct SharedProjectState(Arc>); + +impl SharedProjectState { + fn set_project_state(&self, state: ProjectState) { + let prev = self.0.rcu(|stored| SharedProjectStateInner { + state: state.clone(), + rate_limits: Arc::clone(&stored.rate_limits), + reservoir_counters: Arc::clone(&stored.reservoir_counters), + }); + + // Try clean expired reservoir counters. + // + // We do it after the `rcu`, to not re-run this more often than necessary. + if let Some(state) = state.enabled() { + let config = state.config.sampling.as_ref(); + if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) { + // We can safely use previous here, the `rcu` just replaced the state, the + // reservoir counters did not change. + // + // `try_lock` to not potentially block, it's a best effort cleanup. + // + // TODO: Remove the lock, we already have interior mutability with the `ArcSwap` + // and the counters themselves can be atomics. + if let Ok(mut counters) = prev.reservoir_counters.try_lock() { + counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key)); + } + } + } + } + + fn merge_rate_limits(&self, rate_limits: RateLimits) { + self.0.load().rate_limits.merge(rate_limits) + } + + fn to_shared_project(&self) -> SharedProject { + SharedProject(self.0.as_ref().load_full()) + } +} + +/// The data contained in a [`SharedProjectState`]. +/// +/// All fields must be cheap to clone and are ideally just a single `Arc`. +/// Partial updates to [`SharedProjectState`], are performed using `rcu` cloning all fields. 
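Because the whole inner value lives behind a single `ArcSwap`, a partial update is expressed as an `rcu` that builds a new inner struct while re-using the `Arc`s of all untouched fields. A reduced illustration of the pattern with made-up types (`Inner`, `bump_version`), independent of the project state itself:

use std::sync::Arc;
use arc_swap::ArcSwap;

struct Inner {
    version: u64,
    // Stands in for further fields that are themselves cheap-to-clone `Arc`s.
    payload: Arc<String>,
}

fn bump_version(slot: &ArcSwap<Inner>) {
    // `rcu` may re-run the closure on contention; each run clones only the `Arc`
    // handles, not the payload, and the previously stored value is returned.
    let _prev = slot.rcu(|stored| Inner {
        version: stored.version + 1,
        payload: Arc::clone(&stored.payload),
    });
}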
+#[derive(Debug, Default)] +struct SharedProjectStateInner { + state: ProjectState, + rate_limits: Arc, + reservoir_counters: ReservoirCounters, +} + +struct PrivateProjectState { + project_key: ProjectKey, + current_fetch: Option<()>, + + last_non_pending_fetch: Option, + + next_fetch_attempt: Option, + backoff: RetryBackoff, +} + +impl PrivateProjectState { + fn new(project_key: ProjectKey, config: &Config) -> Self { + Self { + project_key, + current_fetch: None, + last_non_pending_fetch: None, + next_fetch_attempt: None, + backoff: RetryBackoff::new(config.http_max_retry_interval()), + } + } + + fn has_fetch_in_progress(&self) -> bool { + self.current_fetch.is_some() + } + + fn try_begin_fetch(&mut self, config: &Config) -> Option { + if self.current_fetch.is_some() { + // Already a fetch in progress. + relay_log::trace!( + project_key = self.project_key.as_str(), + "project fetch skipped, fetch in progress" + ); + return None; + } + + if matches!(self.check_expiry(config), Expiry::Updated) { + // The current state is up to date, no need to start another fetch. + relay_log::trace!( + project_key = self.project_key.as_str(), + "project fetch skipped, already up to date" + ); + return None; + } + + // Mark a current fetch in progress. + self.current_fetch.insert(()); + + // Schedule a new fetch, even if there is a backoff, it will just be sleeping for a while. + let when = self.next_fetch_attempt.take().unwrap_or_else(Instant::now); + + relay_log::debug!( + project_key = &self.project_key.as_str(), + attempts = self.backoff.attempt() + 1, + "project state fetch scheduled in {:?}", + when.saturating_duration_since(Instant::now()), + ); + + Some(Fetch { + project_key: self.project_key, + when, + }) + } + + fn complete_fetch(&mut self, fetch: &CompletedFetch) { + if fetch.project_state.is_pending() { + self.next_fetch_attempt = Instant::now().checked_add(self.backoff.next_backoff()); + } else { + debug_assert!( + self.next_fetch_attempt.is_none(), + "the scheduled fetch should have cleared the next attempt" + ); + self.next_fetch_attempt = None; + self.backoff.reset(); + self.last_non_pending_fetch = Some(Instant::now()); + } + + let _current_fetch = self.current_fetch.take(); + debug_assert!( + _current_fetch.is_some(), + "fetch completed while there was no current fetch registered" + ); + } + + fn check_expiry(&self, config: &Config) -> Expiry { + let Some(last_fetch) = self.last_non_pending_fetch else { + return Expiry::Expired; + }; + + let expiry = config.project_cache_expiry(); + let elapsed = last_fetch.elapsed(); + + if elapsed >= expiry + config.project_grace_period() { + Expiry::Expired + } else if elapsed >= expiry { + Expiry::Stale + } else { + Expiry::Updated + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +enum Expiry { + /// The project state is perfectly up to date. + Updated, + /// The project state is outdated but events depending on this project state can still be + /// processed. The state should be refreshed in the background though. + Stale, + /// The project state is completely outdated and events need to be buffered up until the new + /// state has been fetched. 
+ Expired, +} + +#[derive(Debug)] +pub struct Fetch { + project_key: ProjectKey, + when: Instant, +} + +impl Fetch { + pub fn project_key(&self) -> ProjectKey { + self.project_key + } + + pub fn when(&self) -> Instant { + self.when + } + + pub fn complete(self, state: ProjectState) -> CompletedFetch { + CompletedFetch { + project_key: self.project_key, + project_state: state, + } + } +} + +pub struct CompletedFetch { + project_key: ProjectKey, + project_state: ProjectState, +} + +impl CompletedFetch { + pub fn project_key(&self) -> ProjectKey { + self.project_key + } + + pub fn project_state(&self) -> &ProjectState { + &self.project_state + } +} diff --git a/relay-server/src/services/projects/mod.rs b/relay-server/src/services/projects/mod.rs index bc8f253eee7..ef8b6fead27 100644 --- a/relay-server/src/services/projects/mod.rs +++ b/relay-server/src/services/projects/mod.rs @@ -1,3 +1,4 @@ pub mod cache; +pub mod cache2; pub mod project; pub mod source; diff --git a/relay-server/src/services/projects/project/mod.rs b/relay-server/src/services/projects/project/mod.rs index cf4f387c9a8..16421edec93 100644 --- a/relay-server/src/services/projects/project/mod.rs +++ b/relay-server/src/services/projects/project/mod.rs @@ -1,753 +1,6 @@ -use std::sync::Arc; -use std::time::Duration; - -use relay_base_schema::project::ProjectKey; -use relay_config::Config; -use relay_dynamic_config::{ErrorBoundary, Feature}; -use relay_metrics::Bucket; -use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits, Scoping}; -use relay_sampling::evaluation::ReservoirCounters; -use relay_statsd::metric; -use relay_system::{Addr, BroadcastChannel}; -use serde::{Deserialize, Serialize}; -use tokio::time::Instant; - -use crate::envelope::ItemType; -use crate::services::metrics::{Aggregator, MergeBuckets}; -use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::ProcessProjectMetrics; -use crate::services::projects::cache::{ - CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate, -}; -use crate::utils::{Enforcement, SeqCount}; - -use crate::statsd::RelayCounters; -use crate::utils::{CheckLimits, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; - pub mod state; pub use state::{ ExpiryState, LimitedParsedProjectState, ParsedProjectState, ProjectFetchState, ProjectInfo, ProjectState, }; - -/// Sender type for messages that respond with project states. -pub type ProjectSender = relay_system::BroadcastSender; - -/// Represents a public key received from the projectconfig endpoint. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PublicKeyConfig { - /// Public part of key (random hash). - pub public_key: ProjectKey, - - /// The primary key of the DSN in Sentry's main database. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub numeric_id: Option, -} - -/// Channel used to respond to state requests (e.g. by project config endpoint). -#[derive(Debug)] -struct StateChannel { - inner: BroadcastChannel, - no_cache: bool, -} - -impl StateChannel { - pub fn new() -> Self { - Self { - inner: BroadcastChannel::new(), - no_cache: false, - } - } - - pub fn no_cache(&mut self, no_cache: bool) -> &mut Self { - self.no_cache = no_cache; - self - } -} - -#[derive(Debug)] -enum GetOrFetch<'a> { - Cached(ProjectState), - Scheduled(&'a mut StateChannel), -} - -/// Structure representing organization and project configuration for a project key. -/// -/// This structure no longer uniquely identifies a project. 
Instead, it identifies a project key. -/// Projects can define multiple keys, in which case this structure is duplicated for each instance. -#[derive(Debug)] -pub struct Project { - backoff: RetryBackoff, - next_fetch_attempt: Option, - last_updated_at: Instant, - project_key: ProjectKey, - config: Arc, - state: ProjectFetchState, - state_channel: Option, - rate_limits: CachedRateLimits, - last_no_cache: Instant, - reservoir_counters: ReservoirCounters, -} - -impl Project { - /// Creates a new `Project`. - pub fn new(key: ProjectKey, config: Arc) -> Self { - Project { - backoff: RetryBackoff::new(config.http_max_retry_interval()), - next_fetch_attempt: None, - last_updated_at: Instant::now(), - project_key: key, - state: ProjectFetchState::expired(), - state_channel: None, - rate_limits: CachedRateLimits::new(), - last_no_cache: Instant::now(), - reservoir_counters: Arc::default(), - config, - } - } - - /// Returns the [`ReservoirCounters`] for the project. - pub fn reservoir_counters(&self) -> ReservoirCounters { - self.reservoir_counters.clone() - } - - /// Returns the current [`ProjectState`] attached to the project. - pub fn current_state(&self) -> ProjectState { - self.state.current_state(&self.config) - } - - /// Returns the currently active cached rate limits. - pub fn current_rate_limits(&mut self) -> &RateLimits { - self.rate_limits.current_limits() - } - - /// If a reservoir rule is no longer in the sampling config, we will remove those counters. - fn remove_expired_reservoir_rules(&self) { - let Some(state) = self.current_state().enabled() else { - return; - }; - - let Some(ErrorBoundary::Ok(config)) = state.config.sampling.as_ref() else { - return; - }; - - // Using try_lock to not slow down the project cache service. - if let Ok(mut guard) = self.reservoir_counters.try_lock() { - guard.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key)); - } - } - - pub fn merge_rate_limits(&mut self, rate_limits: RateLimits) { - self.rate_limits.merge(rate_limits); - } - - /// Returns the next attempt `Instant` if backoff is initiated, or None otherwise. - pub fn next_fetch_attempt(&self) -> Option { - self.next_fetch_attempt - } - - /// The last time the project state was updated - pub fn last_updated_at(&self) -> Instant { - self.last_updated_at - } - - /// Refresh the update time of the project in order to delay eviction. - /// - /// Called by the project cache when the project state is refreshed. - pub fn refresh_updated_timestamp(&mut self) { - self.last_updated_at = Instant::now(); - } - - /// Collects internal project state and assembles a [`ProcessProjectMetrics`] message. - pub fn process_metrics(&mut self, message: ProcessMetrics) -> ProcessProjectMetrics { - let project_state = self.current_state(); - let rate_limits = self.rate_limits.current_limits().clone(); - - ProcessProjectMetrics { - project_state, - rate_limits, - - data: message.data, - project_key: message.project_key, - source: message.source, - start_time: message.start_time.into(), - sent_at: message.sent_at, - } - } - - /// Returns a list of buckets back to the aggregator. - /// - /// This is used to return flushed buckets back to the aggregator if the project has not been - /// loaded at the time of flush. - /// - /// Buckets at this stage are expected to be validated already. 
- pub fn return_buckets(&self, aggregator: &Addr, buckets: Vec) { - aggregator.send(MergeBuckets::new( - self.project_key, - buckets.into_iter().collect(), - )); - } - - /// Returns `true` if backoff expired and new attempt can be triggered. - fn can_fetch(&self) -> bool { - self.next_fetch_attempt - .map(|next_attempt_at| next_attempt_at <= Instant::now()) - .unwrap_or(true) - } - - /// Triggers a debounced refresh of the project state. - /// - /// If the state is already being updated in the background, this method checks if the request - /// needs to be upgraded with the `no_cache` flag to ensure a more recent update. - fn fetch_state( - &mut self, - project_cache: Addr, - no_cache: bool, - ) -> &mut StateChannel { - // If there is a running request and we do not need to upgrade it to no_cache, or if the - // backoff is started and the new attempt is still somewhere in the future, skip - // scheduling a new fetch. - let should_fetch = !matches!(self.state_channel, Some(ref channel) if channel.no_cache || !no_cache) - && self.can_fetch(); - - let channel = self.state_channel.get_or_insert_with(StateChannel::new); - - if should_fetch { - channel.no_cache(no_cache); - let attempts = self.backoff.attempt() + 1; - relay_log::debug!( - "project {} state requested {attempts} times", - self.project_key - ); - project_cache.send(RequestUpdate { - project_key: self.project_key, - no_cache, - cached_state: self.state.clone(), - }); - } - - channel - } - - fn get_or_fetch_state( - &mut self, - project_cache: Addr, - mut no_cache: bool, - ) -> GetOrFetch<'_> { - // count number of times we are looking for the project state - metric!(counter(RelayCounters::ProjectStateGet) += 1); - - // Allow at most 1 no_cache request per second. Gracefully degrade to cached requests. - if no_cache { - if self.last_no_cache.elapsed() < Duration::from_secs(1) { - no_cache = false; - } else { - metric!(counter(RelayCounters::ProjectStateNoCache) += 1); - self.last_no_cache = Instant::now(); - } - } - - let cached_state = match self.state.expiry_state(&self.config) { - // Never use the cached state if `no_cache` is set. - _ if no_cache => None, - // There is no project state that can be used, fetch a state and return it. - ExpiryState::Expired => None, - // The project is semi-outdated, fetch new state but return old one. - ExpiryState::Stale(state) => Some(state.clone()), - // The project is not outdated, return early here to jump over fetching logic below. - ExpiryState::Updated(state) => return GetOrFetch::Cached(state.clone()), - }; - - let channel = self.fetch_state(project_cache, no_cache); - - match cached_state { - Some(state) => GetOrFetch::Cached(state), - None => GetOrFetch::Scheduled(channel), - } - } - - /// Returns the cached project state if it is valid. - /// - /// Depending on the state of the cache, this method takes different action: - /// - /// - If the cached state is up-to-date, this method simply returns `Some`. - /// - If the cached state is stale, this method triggers a refresh in the background and - /// returns `Some`. The stale period can be configured through - /// [`Config::project_grace_period`]. - /// - If there is no cached state or the cached state is fully outdated, this method triggers a - /// refresh in the background and returns `None`. - /// - /// If `no_cache` is set to true, this method always returns `None` and always triggers a - /// background refresh. - /// - /// To wait for a valid state instead, use [`get_state`](Self::get_state). 
- pub fn get_cached_state( - &mut self, - project_cache: Addr, - no_cache: bool, - ) -> ProjectState { - match self.get_or_fetch_state(project_cache, no_cache) { - GetOrFetch::Cached(state) => state, - GetOrFetch::Scheduled(_) => ProjectState::Pending, - } - } - - /// Obtains a valid project state and passes it to the sender once ready. - /// - /// This first checks if the state needs to be updated. This is the case if the project state - /// has passed its cache timeout. The `no_cache` flag forces an update. This does nothing if an - /// update is already running in the background. - /// - /// Independent of updating, _stale_ states are passed to the sender immediately as long as they - /// are in the [grace period](Config::project_grace_period). - pub fn get_state( - &mut self, - project_cache: Addr, - sender: ProjectSender, - no_cache: bool, - ) { - match self.get_or_fetch_state(project_cache, no_cache) { - GetOrFetch::Cached(state) => { - sender.send(state); - } - - GetOrFetch::Scheduled(channel) => { - channel.inner.attach(sender); - } - } - } - - /// Ensures the project state gets updated. - /// - /// This first checks if the state needs to be updated. This is the case if the project state - /// has passed its cache timeout. The `no_cache` flag forces another update unless one is - /// already running in the background. - /// - /// If an update is required, the update will start in the background and complete at a later - /// point. Therefore, this method is useful to trigger an update early if it is already clear - /// that the project state will be needed soon. To retrieve an updated state, use - /// [`Project::get_state`] instead. - pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) -> &mut Self { - self.get_cached_state(project_cache, no_cache); - self - } - - /// Replaces the internal project state with a new one and triggers pending actions. - /// - /// Returns the *old* project state if it was replaced. - /// - /// This flushes pending envelopes from [`ValidateEnvelope`] and - /// notifies all pending receivers from [`get_state`](Self::get_state). - /// - /// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always - /// take precedence. - /// - /// [`ValidateEnvelope`]: crate::services::projects::cache::ValidateEnvelope - pub fn update_state( - &mut self, - project_cache: &Addr, - state: ProjectFetchState, - no_cache: bool, - ) -> Option { - // Initiate the backoff if the incoming state is invalid. Reset it otherwise. - if state.is_pending() { - self.next_fetch_attempt = Instant::now().checked_add(self.backoff.next_backoff()); - } else { - self.next_fetch_attempt = None; - self.backoff.reset(); - } - - let Some(channel) = self.state_channel.take() else { - relay_log::error!(tags.project_key = %self.project_key, "channel is missing for the state update"); - return None; - }; - - // If the channel has `no_cache` set but we are not a `no_cache` request, we have - // been superseeded. Put it back and let the other request take precedence. - if channel.no_cache && !no_cache { - self.state_channel = Some(channel); - return None; - } - - // If the state is pending, return back the taken channel and schedule state update. 
- if state.is_pending() { - // Only overwrite if the old state is expired: - let is_expired = matches!(self.state.expiry_state(&self.config), ExpiryState::Expired); - let old_state = match is_expired { - true => Some(std::mem::replace(&mut self.state, state)), - false => None, - }; - - self.state_channel = Some(channel); - let attempts = self.backoff.attempt() + 1; - relay_log::debug!( - "project {} state requested {attempts} times", - self.project_key - ); - - project_cache.send(RequestUpdate { - project_key: self.project_key, - no_cache, - cached_state: self.state.clone(), - }); - return old_state; - } - - let old_state = std::mem::replace(&mut self.state, state); - - // Flush all waiting recipients. - relay_log::debug!("project state {} updated", self.project_key); - channel.inner.send(self.state.current_state(&self.config)); - - self.after_state_updated(); - - Some(old_state) - } - - /// Called after all state validations and after the project state is updated. - /// - /// See also: [`Self::update_state`]. - fn after_state_updated(&mut self) { - // Check if the new sampling config got rid of any reservoir rules we have counters for. - self.remove_expired_reservoir_rules(); - } - - /// Creates `Scoping` for this project if the state is loaded. - /// - /// Returns `Some` if the project state has been fetched and contains a project identifier, - /// otherwise `None`. - pub fn scoping(&self) -> Option { - self.current_state().scoping(self.project_key) - } - - /// Runs the checks on incoming envelopes. - /// - /// See, [`crate::services::projects::cache::CheckEnvelope`] for more information - /// - /// * checks the rate limits - /// * validates the envelope meta in `check_request` - determines whether the given request - /// should be accepted or discarded - /// - /// IMPORTANT: If the [`ProjectState`] is invalid, the `check_request` will be skipped and only - /// rate limits will be validated. This function **must not** be called in the main processing - /// pipeline. - pub fn check_envelope( - &mut self, - mut envelope: ManagedEnvelope, - ) -> Result { - let state = match self.current_state() { - ProjectState::Enabled(state) => Some(state.clone()), - ProjectState::Disabled => { - // TODO(jjbayer): We should refactor this function to either return a Result or - // handle envelope rejections internally, but not both. - envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); - return Err(DiscardReason::ProjectId); - } - ProjectState::Pending => None, - }; - let mut scoping = envelope.scoping(); - - if let Some(ref state) = state { - scoping = state.scope_request(envelope.envelope().meta()); - envelope.scope(scoping); - - if let Err(reason) = state.check_envelope(envelope.envelope(), &self.config) { - envelope.reject(Outcome::Invalid(reason)); - return Err(reason); - } - } - - let current_limits = self.rate_limits.current_limits(); - - let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]); - let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| { - Ok(current_limits.check_with_quotas(quotas, item_scoping)) - }); - - let (mut enforcement, mut rate_limits) = - envelope_limiter.compute(envelope.envelope_mut(), &scoping)?; - - let check_nested_spans = state - .as_ref() - .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent)); - - // If we can extract spans from the event, we want to try and count the number of nested - // spans to correctly emit negative outcomes in case the transaction itself is dropped. 
- if check_nested_spans { - sync_spans_to_enforcement(&envelope, &mut enforcement); - } - - enforcement.apply_with_outcomes(&mut envelope); - - envelope.update(); - - // Special case: Expose active rate limits for all metric namespaces if there is at least - // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary - // because `EnvelopeLimiter` cannot not check metrics without parsing item contents. - if envelope.envelope().items().any(|i| i.ty().is_metrics()) { - let mut metrics_scoping = scoping.item(DataCategory::MetricBucket); - metrics_scoping.namespace = MetricNamespaceScoping::Any; - rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping)); - } - - let envelope = if envelope.envelope().is_empty() { - // Individual rate limits have already been issued above - envelope.reject(Outcome::RateLimited(None)); - None - } else { - Some(envelope) - }; - - Ok(CheckedEnvelope { - envelope, - rate_limits, - }) - } -} - -/// Adds category limits for the nested spans inside a transaction. -/// -/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted -/// as top-level spans, thus if we limited a transaction, we want to count and emit negative -/// outcomes for each of the spans nested inside that transaction. -fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) { - if !enforcement.is_event_active() { - return; - } - - let spans_count = count_nested_spans(envelope); - if spans_count == 0 { - return; - } - - if enforcement.event.is_active() { - enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count); - } - - if enforcement.event_indexed.is_active() { - enforcement.spans_indexed = enforcement - .event_indexed - .clone_for(DataCategory::SpanIndexed, spans_count); - } -} - -/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope). -fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { - #[derive(Debug, Deserialize)] - struct PartialEvent { - spans: SeqCount, - } - - envelope - .envelope() - .items() - .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) - .and_then(|item| serde_json::from_slice::(&item.payload()).ok()) - // We do + 1, since we count the transaction itself because it will be extracted - // as a span and counted during the slow path of rate limiting. 
- .map_or(0, |event| event.spans.0 + 1) -} - -#[cfg(test)] -mod tests { - use crate::envelope::{ContentType, Envelope, Item}; - use crate::extractors::RequestMeta; - use crate::services::processor::ProcessingGroup; - use relay_base_schema::project::ProjectId; - use relay_event_schema::protocol::EventId; - use relay_test::mock_service; - use serde_json::json; - use smallvec::SmallVec; - - use super::*; - - #[test] - fn get_state_expired() { - for expiry in [9999, 0] { - let config = Arc::new( - Config::from_json_value(json!( - { - "cache": { - "project_expiry": expiry, - "project_grace_period": 0, - "eviction_interval": 9999 // do not evict - } - } - )) - .unwrap(), - ); - - // Initialize project with a state - let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); - let project_info = ProjectInfo { - project_id: Some(ProjectId::new(123)), - ..Default::default() - }; - let mut project = Project::new(project_key, config.clone()); - project.state = ProjectFetchState::enabled(project_info); - - if expiry > 0 { - // With long expiry, should get a state - assert!(matches!(project.current_state(), ProjectState::Enabled(_))); - } else { - // With 0 expiry, project should expire immediately. No state can be set. - assert!(matches!(project.current_state(), ProjectState::Pending)); - } - } - } - - #[tokio::test] - async fn test_stale_cache() { - let (addr, _) = mock_service("project_cache", (), |&mut (), _| {}); - - let config = Arc::new( - Config::from_json_value(json!( - { - "cache": { - "project_expiry": 100, - "project_grace_period": 0, - "eviction_interval": 9999 // do not evict - } - } - )) - .unwrap(), - ); - - let channel = StateChannel::new(); - - // Initialize project with a state. - let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); - let mut project = Project::new(project_key, config); - project.state_channel = Some(channel); - project.state = ProjectFetchState::allowed(); - - assert!(project.next_fetch_attempt.is_none()); - // Try to update project with errored project state. - project.update_state(&addr, ProjectFetchState::pending(), false); - // Since we got invalid project state we still keep the old one meaning there - // still must be the project id set. - assert!(matches!(project.current_state(), ProjectState::Enabled(_))); - assert!(project.next_fetch_attempt.is_some()); - - // This tests that we actually initiate the backoff and the backoff mechanism works: - // * first call to `update_state` with invalid ProjectState starts the backoff, but since - // it's the first attemt, we get Duration of 0. 
- // * second call to `update_state` here will bumpt the `next_backoff` Duration to somehing - // like ~ 1s - // * and now, by calling `fetch_state` we test that it's a noop, since if backoff is active - // we should never fetch - // * without backoff it would just panic, not able to call the ProjectCache service - let channel = StateChannel::new(); - project.state_channel = Some(channel); - project.update_state(&addr, ProjectFetchState::pending(), false); - project.fetch_state(addr, false); - } - - fn create_project(config: Option) -> Project { - let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(); - let mut project = Project::new(project_key, Arc::new(Config::default())); - let mut project_info = ProjectInfo { - project_id: Some(ProjectId::new(42)), - ..Default::default() - }; - let mut public_keys = SmallVec::new(); - public_keys.push(PublicKeyConfig { - public_key: project_key, - numeric_id: None, - }); - project_info.public_keys = public_keys; - if let Some(config) = config { - project_info.config = serde_json::from_value(config).unwrap(); - } - project.state = ProjectFetchState::enabled(project_info); - project - } - - fn request_meta() -> RequestMeta { - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - - RequestMeta::new(dsn) - } - - #[test] - fn test_track_nested_spans_outcomes() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - - let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); - - let mut transaction = Item::new(ItemType::Transaction); - transaction.set_payload( - ContentType::Json, - r#"{ - "event_id": "52df9022835246eeb317dbd739ccd059", - "type": "transaction", - "transaction": "I have a stale timestamp, but I'm recent!", - "start_timestamp": 1, - "timestamp": 2, - "contexts": { - "trace": { - "trace_id": "ff62a8b040f340bda5d830223def1d81", - "span_id": "bd429c44b67a3eb4" - } - }, - "spans": [ - { - "span_id": "bd429c44b67a3eb4", - "start_timestamp": 1, - "timestamp": null, - "trace_id": "ff62a8b040f340bda5d830223def1d81" - }, - { - "span_id": "bd429c44b67a3eb5", - "start_timestamp": 1, - "timestamp": null, - "trace_id": "ff62a8b040f340bda5d830223def1d81" - } - ] -}"#, - ); - - envelope.add_item(transaction); - - let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); - let (test_store, _) = Addr::custom(); - - let managed_envelope = ManagedEnvelope::new( - envelope, - outcome_aggregator.clone(), - test_store, - ProcessingGroup::Transaction, - ); - - let _ = project.check_envelope(managed_envelope); - drop(outcome_aggregator); - - let expected = [ - (DataCategory::Transaction, 1), - (DataCategory::TransactionIndexed, 1), - (DataCategory::Span, 3), - (DataCategory::SpanIndexed, 3), - ]; - - for (expected_category, expected_quantity) in expected { - let outcome = outcome_aggregator_rx.blocking_recv().unwrap(); - assert_eq!(outcome.category, expected_category); - assert_eq!(outcome.quantity, expected_quantity); - } - } -} diff --git a/relay-server/src/services/projects/project/state/fetch_state.rs b/relay-server/src/services/projects/project/state/fetch_state.rs index 020150f303a..3a61f838b80 100644 --- a/relay-server/src/services/projects/project/state/fetch_state.rs +++ b/relay-server/src/services/projects/project/state/fetch_state.rs @@ -2,7 +2,6 @@ use 
std::sync::Arc; use tokio::time::Instant; -use relay_config::Config; use relay_dynamic_config::ProjectConfig; use crate::services::projects::project::state::info::ProjectInfo; @@ -88,47 +87,6 @@ impl ProjectFetchState { matches!(self.state, ProjectState::Pending) } - /// Returns information about the expiry of a project state. - /// - /// If no detailed information is needed, use [`Self::current_state`] instead. - pub fn expiry_state(&self, config: &Config) -> ExpiryState { - match self.check_expiry(config) { - Expiry::Updated => ExpiryState::Updated(&self.state), - Expiry::Stale => ExpiryState::Stale(&self.state), - Expiry::Expired => ExpiryState::Expired, - } - } - - /// Returns the current project state, if it has not yet expired. - pub fn current_state(&self, config: &Config) -> ProjectState { - match self.expiry_state(config) { - ExpiryState::Updated(state) | ExpiryState::Stale(state) => state.clone(), - ExpiryState::Expired => ProjectState::Pending, - } - } - - /// Returns whether this state is outdated and needs to be refetched. - fn check_expiry(&self, config: &Config) -> Expiry { - let Some(last_fetch) = self.last_fetch else { - return Expiry::Expired; - }; - let expiry = match &self.state { - ProjectState::Enabled(info) if info.project_id.is_some() => { - config.project_cache_expiry() - } - _ => config.cache_miss_expiry(), - }; - - let elapsed = last_fetch.elapsed(); - if elapsed >= expiry + config.project_grace_period() { - Expiry::Expired - } else if elapsed >= expiry { - Expiry::Stale - } else { - Expiry::Updated - } - } - /// Returns the revision of the contained project state. /// /// See: [`ProjectState::revision`]. @@ -137,26 +95,8 @@ impl ProjectFetchState { } } -/// Wrapper for a project state, with expiry information. -#[derive(Clone, Copy, Debug)] -pub enum ExpiryState<'a> { - /// An up-to-date project state. See [`Expiry::Updated`]. - Updated(&'a ProjectState), - /// A stale project state that can still be used. See [`Expiry::Stale`]. - Stale(&'a ProjectState), - /// An expired project state that should not be used. See [`Expiry::Expired`]. - Expired, -} - -/// The expiry status of a project state. Return value of [`ProjectFetchState::check_expiry`]. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] -enum Expiry { - /// The project state is perfectly up to date. - Updated, - /// The project state is outdated but events depending on this project state can still be - /// processed. The state should be refreshed in the background though. - Stale, - /// The project state is completely outdated and events need to be buffered up until the new - /// state has been fetched. - Expired, +impl From for ProjectState { + fn from(value: ProjectFetchState) -> Self { + value.state + } } diff --git a/relay-server/src/services/projects/project/state/mod.rs b/relay-server/src/services/projects/project/state/mod.rs index 868ad5ed4a9..efd7094589f 100644 --- a/relay-server/src/services/projects/project/state/mod.rs +++ b/relay-server/src/services/projects/project/state/mod.rs @@ -13,7 +13,7 @@ pub use self::fetch_state::{ExpiryState, ProjectFetchState}; pub use self::info::{LimitedProjectInfo, ProjectInfo}; /// Representation of a project's current state. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub enum ProjectState { /// A valid project that is not disabled. Enabled(Arc), @@ -25,6 +25,7 @@ pub enum ProjectState { /// - The upstream returned "pending" for this project (see [`crate::services::projects::source::upstream`]). 
/// - The upstream returned an unparsable project so we have to try again. /// - The project has expired and must be treated as "has not been fetched". + #[default] Pending, }
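With `Pending` as the derived default and the new `From<ProjectFetchState>` conversion, an absent or failed fetch needs no special casing. A small sketch of both properties; the function name is illustrative:

fn state_from_fetch(result: Option<ProjectFetchState>) -> ProjectState {
    // `ProjectState::default()` is `Pending`, so "no result yet" and "fetch
    // failed" collapse into the same representation.
    result.map(ProjectState::from).unwrap_or_default()
}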