From 7cba95b6672e1573b4296a9c4d3a0ef1e296969b Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 23 Oct 2024 16:32:21 +0200 Subject: [PATCH] fix all tests --- .github/workflows/ci.yml | 2 +- CHANGELOG.md | 1 + relay-quotas/src/rate_limit.rs | 2 +- relay-server/src/endpoints/project_configs.rs | 2 +- relay-server/src/services/buffer/mod.rs | 50 ++---- relay-server/src/services/processor.rs | 103 ++++++----- .../src/services/processor/span/processing.rs | 2 +- .../src/services/projects/cache/handle.rs | 35 ++++ .../src/services/projects/cache/legacy.rs | 97 ++--------- .../src/services/projects/cache/project.rs | 162 +++++------------- .../src/services/projects/cache/service.rs | 2 +- .../src/services/projects/cache/state.rs | 40 ++++- .../src/services/projects/project/info.rs | 9 +- .../src/services/projects/project/mod.rs | 2 +- .../src/services/projects/source/upstream.rs | 4 +- relay-server/src/testutils.rs | 5 +- tests/integration/test_projectconfigs.py | 2 + tests/integration/test_query.py | 8 +- 18 files changed, 225 insertions(+), 303 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 28548848d31..8287d3842ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -553,7 +553,7 @@ jobs: - run: make test-integration env: PYTEST_N: 6 - RELAY_VERSION_CHAIN: "20.6.0,latest" + RELAY_VERSION_CHAIN: "23.12.0,latest" sentry-relay-integration-tests: name: Sentry-Relay Integration Tests diff --git a/CHANGELOG.md b/CHANGELOG.md index c75570ec2fe..9084b6ecf3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ **Breaking Changes**: - Removes support for metric meta envelope items. ([#4152](https://github.com/getsentry/relay/pull/4152)) +- Removes support for the project cache endpoint version 2 and before. 
([#4147](https://github.com/getsentry/relay/pull/4147)) ## 24.10.0 diff --git a/relay-quotas/src/rate_limit.rs b/relay-quotas/src/rate_limit.rs index 0c2a80de543..a77baaf1738 100644 --- a/relay-quotas/src/rate_limit.rs +++ b/relay-quotas/src/rate_limit.rs @@ -1149,7 +1149,7 @@ mod tests { #[test] fn test_cached_rate_limits_expired() { - let mut cached = CachedRateLimits::new(); + let cached = CachedRateLimits::new(); // Active error limit cached.add(RateLimit { diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs index 54080088377..38876be44c9 100644 --- a/relay-server/src/endpoints/project_configs.rs +++ b/relay-server/src/endpoints/project_configs.rs @@ -230,7 +230,7 @@ fn is_outdated(Query(query): Query) -> bool { /// Returns `true` if the `?version` query parameter is compatible with this implementation. fn is_compatible(Query(query): Query) -> bool { - query.version >= ENDPOINT_V3 && query.version <= ENDPOINT_V3 + query.version == ENDPOINT_V3 } /// Endpoint handler for the project configs endpoint. 
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 80c90b04c02..5546889283f 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -488,7 +488,7 @@ mod tests { service: EnvelopeBufferService, global_tx: watch::Sender, envelopes_rx: mpsc::Receiver, - project_cache_rx: mpsc::UnboundedReceiver, + project_cache_handle: ProjectCacheHandle, outcome_aggregator_rx: mpsc::UnboundedReceiver, } @@ -496,6 +496,8 @@ mod tests { config_json: Option, global_config_status: global_config::Status, ) -> EnvelopeBufferServiceResult { + relay_log::init_test!(); + let config_json = config_json.unwrap_or(serde_json::json!({ "spool": { "envelopes": { @@ -508,8 +510,8 @@ mod tests { let memory_stat = MemoryStat::default(); let (global_tx, global_rx) = watch::channel(global_config_status); let (envelopes_tx, envelopes_rx) = mpsc::channel(5); - let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); + let project_cache_handle = ProjectCacheHandle::for_test(); let envelope_buffer_service = EnvelopeBufferService::new( config, @@ -517,7 +519,7 @@ mod tests { global_rx, Services { envelopes_tx, - project_cache, + project_cache_handle: project_cache_handle.clone(), outcome_aggregator, test_store: Addr::dummy(), }, @@ -528,7 +530,7 @@ mod tests { service: envelope_buffer_service, global_tx, envelopes_rx, - project_cache_rx, + project_cache_handle, outcome_aggregator_rx, } } @@ -541,7 +543,7 @@ mod tests { service, global_tx: _global_tx, envelopes_rx: _envelopes_rx, - project_cache_rx: _project_cache_rx, + project_cache_handle: _project_cache_handle, outcome_aggregator_rx: _outcome_aggregator_rx, } = envelope_buffer_service(None, global_config::Status::Pending); @@ -566,8 +568,8 @@ mod tests { service, global_tx, envelopes_rx, - project_cache_rx, outcome_aggregator_rx: _outcome_aggregator_rx, + .. 
} = envelope_buffer_service(None, global_config::Status::Pending); let addr = service.start(); @@ -580,7 +582,6 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; assert_eq!(envelopes_rx.len(), 0); - assert_eq!(project_cache_rx.len(), 0); global_tx.send_replace(global_config::Status::Ready(Arc::new( GlobalConfig::default(), @@ -589,7 +590,6 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; assert_eq!(envelopes_rx.len(), 1); - assert_eq!(project_cache_rx.len(), 0); } #[tokio::test] @@ -599,9 +599,9 @@ mod tests { let EnvelopeBufferServiceResult { service, envelopes_rx, - project_cache_rx, outcome_aggregator_rx: _outcome_aggregator_rx, global_tx: _global_tx, + .. } = envelope_buffer_service( Some(serde_json::json!({ "spool": { @@ -627,7 +627,6 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; assert_eq!(envelopes_rx.len(), 0); - assert_eq!(project_cache_rx.len(), 0); } #[tokio::test] @@ -637,7 +636,7 @@ mod tests { let EnvelopeBufferServiceResult { service, envelopes_rx, - project_cache_rx, + project_cache_handle: _project_cache_handle, mut outcome_aggregator_rx, global_tx: _global_tx, } = envelope_buffer_service( @@ -664,7 +663,6 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; assert_eq!(envelopes_rx.len(), 0); - assert_eq!(project_cache_rx.len(), 0); let outcome = outcome_aggregator_rx.try_recv().unwrap(); assert_eq!(outcome.category, DataCategory::TransactionIndexed); @@ -678,7 +676,7 @@ mod tests { let EnvelopeBufferServiceResult { service, mut envelopes_rx, - mut project_cache_rx, + project_cache_handle, global_tx: _global_tx, outcome_aggregator_rx: _outcome_aggregator_rx, } = envelope_buffer_service( @@ -693,9 +691,8 @@ mod tests { addr.send(EnvelopeBuffer::Push(envelope.clone())); - tokio::time::sleep(Duration::from_secs(1)).await; - - let Some(DequeuedEnvelope(envelope)) = envelopes_rx.recv().await else { + let message = tokio::time::timeout(Duration::from_secs(3), 
envelopes_rx.recv()); + let Some(legacy::DequeuedEnvelope(envelope)) = message.await.unwrap() else { panic!(); }; @@ -703,20 +700,11 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; - assert_eq!(project_cache_rx.len(), 1); - let message = project_cache_rx.recv().await; - assert!(matches!( - message, - Some(ProjectCache::UpdateProject(key)) if key == project_key - )); + assert_eq!(project_cache_handle.test_num_fetches(), 1); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(1300)).await; - assert_eq!(project_cache_rx.len(), 1); - assert!(matches!( - message, - Some(ProjectCache::UpdateProject(key)) if key == project_key - )) + assert_eq!(project_cache_handle.test_num_fetches(), 2); } #[tokio::test] @@ -727,8 +715,8 @@ mod tests { service, mut envelopes_rx, global_tx: _global_tx, - project_cache_rx: _project_cache_rx, outcome_aggregator_rx: _outcome_aggregator_rx, + .. } = envelope_buffer_service( None, global_config::Status::Ready(Arc::new(GlobalConfig::default())), @@ -751,7 +739,7 @@ mod tests { assert_eq!( messages .iter() - .filter(|message| matches!(message, DequeuedEnvelope(..))) + .filter(|message| matches!(message, legacy::DequeuedEnvelope(..))) .count(), 5 ); @@ -764,7 +752,7 @@ mod tests { assert_eq!( messages .iter() - .filter(|message| matches!(message, DequeuedEnvelope(..))) + .filter(|message| matches!(message, legacy::DequeuedEnvelope(..))) .count(), 5 ); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 994f278b81d..35bd47ef9e5 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -3702,8 +3702,6 @@ mod tests { ] { let message = ProcessMetrics { data: MetricData::Raw(vec![item.clone()]), - project_state: ProjectState::Pending, - rate_limits: Default::default(), project_key, source, start_time, @@ -3727,10 +3725,11 @@ mod tests { let start_time = Instant::now(); let config = Config::default(); 
- let (project_cache, mut project_cache_rx) = Addr::custom(); + let (aggregator, mut aggregator_rx) = Addr::custom(); let processor = create_test_processor_with_addrs( config, Addrs { + aggregator, ..Default::default() }, ); @@ -3773,78 +3772,72 @@ mod tests { }; processor.handle_process_batched_metrics(&mut token, message); - let value = project_cache_rx.recv().await.unwrap(); - let legacy::ProjectCache::ProcessMetrics(pm1) = value else { + let value = aggregator_rx.recv().await.unwrap(); + let Aggregator::MergeBuckets(mb1) = value else { panic!() }; - let value = project_cache_rx.recv().await.unwrap(); - let legacy::ProjectCache::ProcessMetrics(pm2) = value else { + let value = aggregator_rx.recv().await.unwrap(); + let Aggregator::MergeBuckets(mb2) = value else { panic!() }; - let mut messages = vec![pm1, pm2]; + let mut messages = vec![mb1, mb2]; messages.sort_by_key(|pm| pm.project_key); let actual = messages .into_iter() - .map(|pm| (pm.project_key, pm.data, pm.source)) + .map(|pm| (pm.project_key, pm.buckets)) .collect::>(); assert_debug_snapshot!(actual, @r###" [ ( ProjectKey("11111111111111111111111111111111"), - Parsed( - [ - Bucket { - timestamp: UnixTimestamp(1615889440), - width: 0, - name: MetricName( - "d:custom/endpoint.response_time@millisecond", - ), - value: Distribution( - [ - 68.0, - ], - ), - tags: { - "route": "user_index", - }, - metadata: BucketMetadata { - merges: 1, - received_at: None, - extracted_from_indexed: false, - }, + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + width: 0, + name: MetricName( + "d:custom/endpoint.response_time@millisecond", + ), + value: Distribution( + [ + 68.0, + ], + ), + tags: { + "route": "user_index", }, - ], - ), - Internal, + metadata: BucketMetadata { + merges: 1, + received_at: None, + extracted_from_indexed: false, + }, + }, + ], ), ( ProjectKey("22222222222222222222222222222222"), - Parsed( - [ - Bucket { - timestamp: UnixTimestamp(1615889440), - width: 0, - name: MetricName( - 
"d:custom/endpoint.cache_rate@none", - ), - value: Distribution( - [ - 36.0, - ], - ), - tags: {}, - metadata: BucketMetadata { - merges: 1, - received_at: None, - extracted_from_indexed: false, - }, + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + width: 0, + name: MetricName( + "d:custom/endpoint.cache_rate@none", + ), + value: Distribution( + [ + 36.0, + ], + ), + tags: {}, + metadata: BucketMetadata { + merges: 1, + received_at: None, + extracted_from_indexed: false, }, - ], - ), - Internal, + }, + ], ), ] "###); diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index a9801e45f8c..10d44f6c17c 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -822,7 +822,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: Arc::new(Config::default()), project_info, - rate_limits: RateLimits::default(), + rate_limits: Arc::new(RateLimits::default()), sampling_project_info: None, project_id: ProjectId::new(42), managed_envelope: managed_envelope.try_into().unwrap(), diff --git a/relay-server/src/services/projects/cache/handle.rs b/relay-server/src/services/projects/cache/handle.rs index 6c77558c667..7d203252ea7 100644 --- a/relay-server/src/services/projects/cache/handle.rs +++ b/relay-server/src/services/projects/cache/handle.rs @@ -49,3 +49,38 @@ impl fmt::Debug for ProjectCacheHandle { .finish() } } + +#[cfg(test)] +mod test { + use super::*; + use crate::services::projects::project::ProjectState; + + impl ProjectCacheHandle { + pub fn for_test() -> Self { + Self { + shared: Default::default(), + config: Default::default(), + service: Addr::dummy(), + project_events: broadcast::channel(999_999).0, + } + } + + pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) { + let is_pending = state.is_pending(); + self.shared.test_set_project_state(project_key, state); + 
 if is_pending { + let _ = self.project_events.send(ProjectEvent::Evicted(project_key)); + } else { + let _ = self.project_events.send(ProjectEvent::Ready(project_key)); + } + } + + pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool { + self.shared.test_has_project_created(project_key) + } + + pub fn test_num_fetches(&self) -> u64 { + self.service.len() + } + } +} diff --git a/relay-server/src/services/projects/cache/legacy.rs b/relay-server/src/services/projects/cache/legacy.rs index c8175edf512..33850df77c4 100644 --- a/relay-server/src/services/projects/cache/legacy.rs +++ b/relay-server/src/services/projects/cache/legacy.rs @@ -31,15 +31,12 @@ use crate::utils::{ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle}; /// Validates the envelope against project configuration and rate limits. /// -/// This ensures internally that the project state is up to date and then runs the same checks as -/// [`CheckEnvelope`]. Once the envelope has been validated, remaining items are forwarded to the -/// next stage: +/// This ensures internally that the project state is up to date. +/// Once the envelope has been validated, remaining items are forwarded to the next stage: /// /// - If the envelope needs dynamic sampling, and the project state is not cached or out of the /// date, the envelopes is spooled and we continue when the state is fetched. /// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`]. -/// -/// [`EnvelopeProcessor`]: crate::services::processor::EnvelopeProcessor #[derive(Debug)] pub struct ValidateEnvelope { envelope: ManagedEnvelope, @@ -76,21 +73,9 @@ pub struct RefreshIndexCache(pub HashSet); #[derive(Debug)] pub struct DequeuedEnvelope(pub Box); -/// A cache for [`ProjectState`]s. -/// -/// The project maintains information about organizations, projects, and project keys along with -/// settings required to ingest traffic to Sentry. 
Internally, it tries to keep information -/// up-to-date and automatically retires unused old data. -/// -/// To retrieve information from the cache, use [`GetProjectState`] for guaranteed up-to-date -/// information, or [`GetCachedProjectState`] for immediately available but potentially older -/// information. +/// The legacy project cache. /// -/// There are also higher-level operations, such as [`CheckEnvelope`] and [`ValidateEnvelope`] that -/// inspect contents of envelopes for ingestion, as well as [`ProcessMetrics`] to aggregate metrics -/// associated with a project. -/// -/// See the enumerated variants for a full list of available messages for this service. +/// It manages spool v1 and some remaining messages which handle project state. #[derive(Debug)] pub enum ProjectCache { ValidateEnvelope(ValidateEnvelope), @@ -893,7 +878,7 @@ mod tests { ProjectCacheBroker { config: config.clone(), memory_checker, - projects: todo!(), + projects: ProjectCacheHandle::for_test(), services, spool_v1_unspool_handle: SleepHandle::idle(), spool_v1: Some(SpoolV1 { @@ -916,9 +901,10 @@ mod tests { let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); let (mut broker, _buffer_svc) = project_cache_broker_setup(services.clone(), buffer_tx).await; + let projects = broker.projects.clone(); + let mut project_events = projects.events(); broker.global_config = GlobalConfigStatus::Ready; - let (tx_update, mut rx_update) = mpsc::unbounded_channel(); let (tx_assert, mut rx_assert) = mpsc::unbounded_channel(); let dsn1 = "111d836b15bb49d7bbf99e64295d995b"; @@ -945,11 +931,12 @@ mod tests { tokio::task::spawn(async move { loop { select! 
{ - + Ok(project_event) = project_events.recv() => { + broker.handle_project_event(project_event); + } Some(assert) = rx_assert.recv() => { assert_eq!(broker.spool_v1.as_ref().unwrap().index.len(), assert); }, - Some(update) = rx_update.recv() => broker.merge_state(update), () = &mut broker.spool_v1_unspool_handle => broker.handle_periodic_unspool(), } } @@ -958,13 +945,11 @@ mod tests { // Before updating any project states. tx_assert.send(2).unwrap(); - let update_dsn1_project_state = UpdateProjectState { - project_key: ProjectKey::parse(dsn1).unwrap(), - state: ProjectFetchState::allowed(), - no_cache: false, - }; + projects.test_set_project_state( + ProjectKey::parse(dsn1).unwrap(), + ProjectState::new_allowed(), + ); - tx_update.send(update_dsn1_project_state).unwrap(); assert!(buffer_rx.recv().await.is_some()); // One of the project should be unspooled. tx_assert.send(1).unwrap(); @@ -972,61 +957,15 @@ mod tests { // Schedule some work... tokio::time::sleep(Duration::from_secs(2)).await; - let update_dsn2_project_state = UpdateProjectState { - project_key: ProjectKey::parse(dsn2).unwrap(), - state: ProjectFetchState::allowed(), - no_cache: false, - }; + projects.test_set_project_state( + ProjectKey::parse(dsn2).unwrap(), + ProjectState::new_allowed(), + ); - tx_update.send(update_dsn2_project_state).unwrap(); assert!(buffer_rx.recv().await.is_some()); // The last project should be unspooled. tx_assert.send(0).unwrap(); // Make sure the last assert is tested. 
tokio::time::sleep(Duration::from_millis(100)).await; } - - #[tokio::test] - async fn handle_processing_without_project() { - let services = mocked_services(); - let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); - let (mut broker, buffer_svc) = - project_cache_broker_setup(services.clone(), buffer_tx.clone()).await; - - let dsn = "111d836b15bb49d7bbf99e64295d995b"; - let project_key = ProjectKey::parse(dsn).unwrap(); - let key = QueueKey { - own_key: project_key, - sampling_key: project_key, - }; - let envelope = ManagedEnvelope::new( - empty_envelope_with_dsn(dsn), - services.outcome_aggregator.clone(), - services.test_store.clone(), - ProcessingGroup::Ungrouped, - ); - - // Index and projects are empty. - assert!(broker.spool_v1.as_mut().unwrap().index.is_empty()); - - // Since there is no project we should not process anything but create a project and spool - // the envelope. - broker.handle_processing(envelope); - - // Assert that we have a new project and also added an index. - assert!(!broker - .projects - .get(project_key) - .project_state() - .is_pending()); - assert!(broker.spool_v1.as_mut().unwrap().index.contains(&key)); - - // Check is we actually spooled anything. - buffer_svc.send(DequeueMany::new([key].into(), buffer_tx.clone())); - let UnspooledEnvelope { - managed_envelope, .. - } = buffer_rx.recv().await.unwrap(); - - assert_eq!(key, QueueKey::from_envelope(managed_envelope.envelope())); - } } diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index 239829e4d2f..93a50159c21 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -34,17 +34,15 @@ impl<'a> Project<'a> { self.shared.reservoir_counters() } - /// Runs the checks on incoming envelopes. + /// Checks the envelope against project configuration and rate limits. 
/// - /// See, [`crate::services::projects::cache::CheckEnvelope`] for more information + /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated + /// project state may be used, or otherwise the envelope is passed through unaltered. /// - /// * checks the rate limits - /// * validates the envelope meta in `check_request` - determines whether the given request - /// should be accepted or discarded - /// - /// IMPORTANT: If the [`ProjectState`] is invalid, the `check_request` will be skipped and only - /// rate limits will be validated. This function **must not** be called in the main processing - /// pipeline. + /// To check the envelope, this runs: + /// - Validate origins and public keys + /// - Quotas with a limit of `0` + /// - Cached rate limits pub fn check_envelope( &self, mut envelope: ManagedEnvelope, @@ -178,114 +176,32 @@ mod tests { use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; use crate::services::processor::ProcessingGroup; - use relay_base_schema::project::ProjectId; + use crate::services::projects::project::{ProjectInfo, PublicKeyConfig}; + use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_event_schema::protocol::EventId; - use relay_test::mock_service; use serde_json::json; - use smallvec::SmallVec; + use smallvec::smallvec; use super::*; - #[test] - fn get_state_expired() { - for expiry in [9999, 0] { - let config = Arc::new( - Config::from_json_value(json!( - { - "cache": { - "project_expiry": expiry, - "project_grace_period": 0, - "eviction_interval": 9999 // do not evict - } - } - )) - .unwrap(), - ); - - // Initialize project with a state - let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); - let project_info = ProjectInfo { - project_id: Some(ProjectId::new(123)), - ..Default::default() - }; - let mut project = Project::new(project_key, config.clone()); - project.state = ProjectFetchState::enabled(project_info); 
- - if expiry > 0 { - // With long expiry, should get a state - assert!(matches!(project.current_state(), ProjectState::Enabled(_))); - } else { - // With 0 expiry, project should expire immediately. No state can be set. - assert!(matches!(project.current_state(), ProjectState::Pending)); - } - } - } - - #[tokio::test] - async fn test_stale_cache() { - let (addr, _) = mock_service("project_cache", (), |&mut (), _| {}); - - let config = Arc::new( - Config::from_json_value(json!( - { - "cache": { - "project_expiry": 100, - "project_grace_period": 0, - "eviction_interval": 9999 // do not evict - } - } - )) - .unwrap(), - ); - - let channel = StateChannel::new(); - - // Initialize project with a state. - let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); - let mut project = Project::new(project_key, config); - project.state_channel = Some(channel); - project.state = ProjectFetchState::allowed(); - - assert!(project.next_fetch_attempt.is_none()); - // Try to update project with errored project state. - project.update_state(&addr, ProjectFetchState::pending(), false); - // Since we got invalid project state we still keep the old one meaning there - // still must be the project id set. - assert!(matches!(project.current_state(), ProjectState::Enabled(_))); - assert!(project.next_fetch_attempt.is_some()); - - // This tests that we actually initiate the backoff and the backoff mechanism works: - // * first call to `update_state` with invalid ProjectState starts the backoff, but since - // it's the first attemt, we get Duration of 0. 
- // * second call to `update_state` here will bumpt the `next_backoff` Duration to somehing - // like ~ 1s - // * and now, by calling `fetch_state` we test that it's a noop, since if backoff is active - // we should never fetch - // * without backoff it would just panic, not able to call the ProjectCache service - let channel = StateChannel::new(); - project.state_channel = Some(channel); - project.update_state(&addr, ProjectFetchState::pending(), false); - project.fetch_state(addr, false); - } - - fn create_project(config: Option) -> Project { - let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(); - let mut project = Project::new(project_key, Arc::new(Config::default())); + fn create_project(config: &Config, data: Option) -> Project<'_> { let mut project_info = ProjectInfo { project_id: Some(ProjectId::new(42)), ..Default::default() }; - let mut public_keys = SmallVec::new(); - public_keys.push(PublicKeyConfig { - public_key: project_key, + project_info.public_keys = smallvec![PublicKeyConfig { + public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), numeric_id: None, - }); - project_info.public_keys = public_keys; - if let Some(config) = config { - project_info.config = serde_json::from_value(config).unwrap(); + }]; + + if let Some(data) = data { + project_info.config = serde_json::from_value(data).unwrap(); } - project.state = ProjectFetchState::enabled(project_info); - project + + Project::new( + SharedProject::for_test(ProjectState::Enabled(project_info.into())), + config, + ) } fn request_meta() -> RequestMeta { @@ -298,18 +214,22 @@ mod tests { #[test] fn test_track_nested_spans_outcomes() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); + let config = Default::default(); + let project = create_project( + &config, + 
Some(json!({ + "features": [ + "organizations:indexed-spans-extraction" + ], + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + })), + ); let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); @@ -347,8 +267,8 @@ mod tests { envelope.add_item(transaction); - let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); - let (test_store, _) = Addr::custom(); + let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom(); + let (test_store, _) = relay_system::Addr::custom(); let managed_envelope = ManagedEnvelope::new( envelope, @@ -357,7 +277,7 @@ mod tests { ProcessingGroup::Transaction, ); - let _ = project.check_envelope(managed_envelope); + project.check_envelope(managed_envelope).unwrap(); drop(outcome_aggregator); let expected = [ diff --git a/relay-server/src/services/projects/cache/service.rs b/relay-server/src/services/projects/cache/service.rs index 270f78a7a71..e61583de399 100644 --- a/relay-server/src/services/projects/cache/service.rs +++ b/relay-server/src/services/projects/cache/service.rs @@ -113,7 +113,7 @@ impl ProjectCacheService { // TODO: cached state for delta fetches, maybe this should just be a revision? let state = match source - .fetch(fetch.project_key(), false, Default::default()) + .fetch(fetch.project_key(), false, fetch.revision()) .await { Ok(result) => result, diff --git a/relay-server/src/services/projects/cache/state.rs b/relay-server/src/services/projects/cache/state.rs index 401578a9673..c6752689c0d 100644 --- a/relay-server/src/services/projects/cache/state.rs +++ b/relay-server/src/services/projects/cache/state.rs @@ -21,7 +21,7 @@ use crate::utils::RetryBackoff; /// /// [`Shared`] can be extended through [`Shared::get_or_create`], in which case /// the private state is missing. Users of [`Shared::get_or_create`] *must* trigger -/// a fetch to create the private state when [`Missing`] is returned. 
+/// a fetch to create the private state and keep it updated. /// This gurnatuees that eventually the project state is populated, but for a undetermined, /// time it is possible that shared state exists without the respective private state. #[derive(Default)] @@ -181,6 +181,24 @@ impl Shared { } } +/// TEST ONLY bypass to make the project cache mockable. +#[cfg(test)] +impl Shared { + /// Updates the project state for a project. + /// + /// TEST ONLY! + pub fn test_set_project_state(&self, project_key: ProjectKey, state: ProjectState) { + self.projects + .pin() + .get_or_insert_with(project_key, Default::default) + .set_project_state(state); + } + + pub fn test_has_project_created(&self, project_key: ProjectKey) -> bool { + self.projects.pin().contains_key(&project_key) + } +} + impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Shared") @@ -214,6 +232,18 @@ impl SharedProject { } } +/// TEST ONLY bypass to make the project cache mockable. +#[cfg(test)] +impl SharedProject { + /// Creates a new [`SharedProject`] for testing only. + pub fn for_test(state: ProjectState) -> Self { + Self(Arc::new(SharedProjectStateInner { + state, + ..Default::default() + })) + } +} + /// Reference to a full project wrapping shared and private state. struct ProjectRef<'a> { shared: SharedProjectState, @@ -268,6 +298,14 @@ impl Fetch { self.when } + /// Returns the revisions of the currently cached project. + /// + /// If the upstream indicates it does not have a different version of this project + /// we do not need to update the local state. + pub fn revision(&self) -> Revision { + self.revision.clone() + } + /// Completes the fetch with a result and returns a [`CompletedFetch`]. 
pub fn complete(self, state: SourceProjectState) -> CompletedFetch { CompletedFetch { diff --git a/relay-server/src/services/projects/project/info.rs b/relay-server/src/services/projects/project/info.rs index aa37e46431e..ae477849ad1 100644 --- a/relay-server/src/services/projects/project/info.rs +++ b/relay-server/src/services/projects/project/info.rs @@ -188,9 +188,6 @@ impl ProjectInfo { /// This scoping amends `RequestMeta::get_partial_scoping` by adding organization and key info. /// The processor must fetch the full scoping before attempting to rate limit with partial /// scoping. - /// - /// To get the own scoping of this ProjectKey without amending request information, use - /// [`Project::scoping`](crate::services::projects::project::Project::scoping) instead. pub fn scope_request(&self, meta: &RequestMeta) -> Scoping { let mut scoping = meta.get_partial_scoping(); @@ -291,3 +288,9 @@ impl PartialEq for Revision { } } } + +impl From<&str> for Revision { + fn from(value: &str) -> Self { + Self(Some(value.into())) + } +} diff --git a/relay-server/src/services/projects/project/mod.rs b/relay-server/src/services/projects/project/mod.rs index 17fda43ef30..5e233273705 100644 --- a/relay-server/src/services/projects/project/mod.rs +++ b/relay-server/src/services/projects/project/mod.rs @@ -8,7 +8,7 @@ use relay_quotas::Scoping; mod info; -pub use self::info::{LimitedProjectInfo, ProjectInfo, Revision}; +pub use self::info::*; /// Representation of a project's current state. 
#[derive(Clone, Debug, Default)] diff --git a/relay-server/src/services/projects/source/upstream.rs b/relay-server/src/services/projects/source/upstream.rs index cab39d4c195..6f051c5ec42 100644 --- a/relay-server/src/services/projects/source/upstream.rs +++ b/relay-server/src/services/projects/source/upstream.rs @@ -692,7 +692,7 @@ mod tests { let mut response1 = service.send(FetchProjectState { project_key, - current_revision: Some("123".to_owned()), + current_revision: "123".into(), no_cache: false, }); @@ -703,7 +703,7 @@ mod tests { // request, after responding to the first inflight request. let mut response2 = service.send(FetchProjectState { project_key, - current_revision: None, + current_revision: Revision::default(), no_cache: false, }); diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 358a2a929d4..56e9742925d 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -20,6 +20,7 @@ use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; +use crate::services::projects::cache::ProjectCacheHandle; use crate::services::projects::project::ProjectInfo; use crate::services::test_store::TestStore; use crate::utils::{ThreadPool, ThreadPoolBuilder}; @@ -117,7 +118,6 @@ pub fn empty_envelope_with_dsn(dsn: &str) -> Box { pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); - let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); @@ -132,12 +132,12 @@ pub fn create_test_processor(config: Config) -> 
EnvelopeProcessorService { create_processor_pool(), Arc::clone(&config), GlobalConfigHandle::fixed(Default::default()), + ProjectCacheHandle::for_test(), Cogs::noop(), #[cfg(feature = "processing")] redis_pools, processor::Addrs { outcome_aggregator, - legacy_project_cache: project_cache, upstream_relay, test_store, #[cfg(feature = "processing")] @@ -162,6 +162,7 @@ pub fn create_test_processor_with_addrs( create_processor_pool(), Arc::clone(&config), GlobalConfigHandle::fixed(Default::default()), + ProjectCacheHandle::for_test(), Cogs::noop(), #[cfg(feature = "processing")] redis_pools, diff --git a/tests/integration/test_projectconfigs.py b/tests/integration/test_projectconfigs.py index 5d9bd7adf1a..f3e8c4ccefb 100644 --- a/tests/integration/test_projectconfigs.py +++ b/tests/integration/test_projectconfigs.py @@ -238,6 +238,8 @@ def assert_clear_test_failures(): relay.send_event(project_key) assert mini_sentry.captured_events.empty() + time.sleep(1) + # Fix the config. config = mini_sentry.project_configs[project_key] config["slug"] = "some-slug" diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index 9c3a40014a4..7acb4f2f802 100644 --- a/tests/integration/test_query.py +++ b/tests/integration/test_query.py @@ -85,6 +85,7 @@ def get_project_config(): "miss_expiry": 1, "project_expiry": 1, "project_grace_period": grace_period, + "eviction_interval": 1, } }, ) @@ -329,7 +330,8 @@ def get_project_config(): "cache": { "miss_expiry": 1, "project_expiry": 1, - "project_grace_period": 0, + "project_grace_period": 5, + "eviction_interval": 1, } }, ) @@ -348,8 +350,8 @@ def get_project_config(): relay.send_event(42) - assert project_config_fetch.wait(timeout=1) - assert with_rev.wait(timeout=1) + assert project_config_fetch.wait(timeout=2) + assert with_rev.wait(timeout=2) event = mini_sentry.captured_events.get(timeout=1).get_event() assert event["logentry"] == {"formatted": "Hello, World!"}