fix all tests
Dav1dde committed Oct 24, 2024
1 parent 1e94723 commit 7cba95b
Showing 18 changed files with 225 additions and 303 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -553,7 +553,7 @@ jobs:
- run: make test-integration
env:
PYTEST_N: 6
RELAY_VERSION_CHAIN: "20.6.0,latest"
RELAY_VERSION_CHAIN: "23.12.0,latest"

sentry-relay-integration-tests:
name: Sentry-Relay Integration Tests
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
**Breaking Changes**:

- Removes support for metric meta envelope items. ([#4152](https://github.com/getsentry/relay/pull/4152))
- Removes support for the project cache endpoint version 2 and before. ([#4147](https://github.com/getsentry/relay/pull/4147))

## 24.10.0

2 changes: 1 addition & 1 deletion relay-quotas/src/rate_limit.rs
@@ -1149,7 +1149,7 @@ mod tests {

#[test]
fn test_cached_rate_limits_expired() {
let mut cached = CachedRateLimits::new();
let cached = CachedRateLimits::new();

// Active error limit
cached.add(RateLimit {
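The only change in this hunk drops the `mut` binding, which suggests (an assumption, not visible in the diff itself) that `CachedRateLimits::add` now takes `&self`, for example via interior mutability. A minimal sketch of that pattern, with a placeholder type standing in for the real rate-limit entries:

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the real `CachedRateLimits`; illustrative only.
struct CachedRateLimits {
    limits: Mutex<Vec<String>>, // placeholder for the real `RateLimit` values
}

impl CachedRateLimits {
    fn new() -> Self {
        Self { limits: Mutex::new(Vec::new()) }
    }

    // Takes `&self`, so callers can bind the value without `mut`.
    fn add(&self, limit: String) {
        self.limits.lock().unwrap().push(limit);
    }
}

fn main() {
    let cached = CachedRateLimits::new(); // no `mut` required
    cached.add("active error limit".to_owned());
    assert_eq!(cached.limits.lock().unwrap().len(), 1);
}
```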
2 changes: 1 addition & 1 deletion relay-server/src/endpoints/project_configs.rs
@@ -230,7 +230,7 @@ fn is_outdated(Query(query): Query<VersionQuery>) -> bool {

/// Returns `true` if the `?version` query parameter is compatible with this implementation.
fn is_compatible(Query(query): Query<VersionQuery>) -> bool {
query.version >= ENDPOINT_V3 && query.version <= ENDPOINT_V3
query.version == ENDPOINT_V3
}

/// Endpoint handler for the project configs endpoint.
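For context, the rewritten check is behavior-preserving: with `ENDPOINT_V3` as the only supported version, `version >= ENDPOINT_V3 && version <= ENDPOINT_V3` holds exactly when `version == ENDPOINT_V3`. A minimal, self-contained sketch (the constant value `3` is an assumption, not taken from this diff):

```rust
// Assumed value for illustration; the real constant lives in project_configs.rs.
const ENDPOINT_V3: u16 = 3;

fn is_compatible(version: u16) -> bool {
    version == ENDPOINT_V3
}

fn main() {
    assert!(is_compatible(3));
    assert!(!is_compatible(2)); // v2 and earlier are rejected after this change
    assert!(!is_compatible(4));
}
```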
50 changes: 19 additions & 31 deletions relay-server/src/services/buffer/mod.rs
@@ -488,14 +488,16 @@ mod tests {
service: EnvelopeBufferService,
global_tx: watch::Sender<global_config::Status>,
envelopes_rx: mpsc::Receiver<legacy::DequeuedEnvelope>,
project_cache_rx: mpsc::UnboundedReceiver<legacy::ProjectCache>,
project_cache_handle: ProjectCacheHandle,
outcome_aggregator_rx: mpsc::UnboundedReceiver<TrackOutcome>,
}

fn envelope_buffer_service(
config_json: Option<serde_json::Value>,
global_config_status: global_config::Status,
) -> EnvelopeBufferServiceResult {
relay_log::init_test!();

let config_json = config_json.unwrap_or(serde_json::json!({
"spool": {
"envelopes": {
@@ -508,16 +510,16 @@
let memory_stat = MemoryStat::default();
let (global_tx, global_rx) = watch::channel(global_config_status);
let (envelopes_tx, envelopes_rx) = mpsc::channel(5);
let (project_cache, project_cache_rx) = Addr::custom();
let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
let project_cache_handle = ProjectCacheHandle::for_test();

let envelope_buffer_service = EnvelopeBufferService::new(
config,
memory_stat,
global_rx,
Services {
envelopes_tx,
project_cache,
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator,
test_store: Addr::dummy(),
},
@@ -528,7 +530,7 @@
service: envelope_buffer_service,
global_tx,
envelopes_rx,
project_cache_rx,
project_cache_handle,
outcome_aggregator_rx,
}
}
@@ -541,7 +543,7 @@
service,
global_tx: _global_tx,
envelopes_rx: _envelopes_rx,
project_cache_rx: _project_cache_rx,
project_cache_handle: _project_cache_handle,
outcome_aggregator_rx: _outcome_aggregator_rx,
} = envelope_buffer_service(None, global_config::Status::Pending);

@@ -566,8 +568,8 @@
service,
global_tx,
envelopes_rx,
project_cache_rx,
outcome_aggregator_rx: _outcome_aggregator_rx,
..
} = envelope_buffer_service(None, global_config::Status::Pending);

let addr = service.start();
@@ -580,7 +582,6 @@
tokio::time::sleep(Duration::from_millis(1000)).await;

assert_eq!(envelopes_rx.len(), 0);
assert_eq!(project_cache_rx.len(), 0);

global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
@@ -589,7 +590,6 @@
tokio::time::sleep(Duration::from_millis(1000)).await;

assert_eq!(envelopes_rx.len(), 1);
assert_eq!(project_cache_rx.len(), 0);
}

#[tokio::test]
@@ -599,9 +599,9 @@
let EnvelopeBufferServiceResult {
service,
envelopes_rx,
project_cache_rx,
outcome_aggregator_rx: _outcome_aggregator_rx,
global_tx: _global_tx,
..
} = envelope_buffer_service(
Some(serde_json::json!({
"spool": {
@@ -627,7 +627,6 @@
tokio::time::sleep(Duration::from_millis(1000)).await;

assert_eq!(envelopes_rx.len(), 0);
assert_eq!(project_cache_rx.len(), 0);
}

#[tokio::test]
@@ -637,7 +636,7 @@
let EnvelopeBufferServiceResult {
service,
envelopes_rx,
project_cache_rx,
project_cache_handle: _project_cache_handle,
mut outcome_aggregator_rx,
global_tx: _global_tx,
} = envelope_buffer_service(
@@ -664,7 +663,6 @@
tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(envelopes_rx.len(), 0);
assert_eq!(project_cache_rx.len(), 0);

let outcome = outcome_aggregator_rx.try_recv().unwrap();
assert_eq!(outcome.category, DataCategory::TransactionIndexed);
@@ -678,7 +676,7 @@
let EnvelopeBufferServiceResult {
service,
mut envelopes_rx,
mut project_cache_rx,
project_cache_handle,
global_tx: _global_tx,
outcome_aggregator_rx: _outcome_aggregator_rx,
} = envelope_buffer_service(
@@ -693,30 +691,20 @@

addr.send(EnvelopeBuffer::Push(envelope.clone()));

tokio::time::sleep(Duration::from_secs(1)).await;

let Some(DequeuedEnvelope(envelope)) = envelopes_rx.recv().await else {
let message = tokio::time::timeout(Duration::from_secs(3), envelopes_rx.recv());
let Some(legacy::DequeuedEnvelope(envelope)) = message.await.unwrap() else {
panic!();
};

addr.send(EnvelopeBuffer::NotReady(project_key, envelope));

tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(project_cache_rx.len(), 1);
let message = project_cache_rx.recv().await;
assert!(matches!(
message,
Some(ProjectCache::UpdateProject(key)) if key == project_key
));
assert_eq!(project_cache_handle.test_num_fetches(), 1);

tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(1300)).await;

assert_eq!(project_cache_rx.len(), 1);
assert!(matches!(
message,
Some(ProjectCache::UpdateProject(key)) if key == project_key
))
assert_eq!(project_cache_handle.test_num_fetches(), 2);
}

#[tokio::test]
@@ -727,8 +715,8 @@
service,
mut envelopes_rx,
global_tx: _global_tx,
project_cache_rx: _project_cache_rx,
outcome_aggregator_rx: _outcome_aggregator_rx,
..
} = envelope_buffer_service(
None,
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
@@ -751,7 +739,7 @@
assert_eq!(
messages
.iter()
.filter(|message| matches!(message, DequeuedEnvelope(..)))
.filter(|message| matches!(message, legacy::DequeuedEnvelope(..)))
.count(),
5
);
@@ -764,7 +752,7 @@
assert_eq!(
messages
.iter()
.filter(|message| matches!(message, DequeuedEnvelope(..)))
.filter(|message| matches!(message, legacy::DequeuedEnvelope(..)))
.count(),
5
);
103 changes: 48 additions & 55 deletions relay-server/src/services/processor.rs
@@ -3702,8 +3702,6 @@ mod tests {
] {
let message = ProcessMetrics {
data: MetricData::Raw(vec![item.clone()]),
project_state: ProjectState::Pending,
rate_limits: Default::default(),
project_key,
source,
start_time,
@@ -3727,10 +3725,11 @@
let start_time = Instant::now();
let config = Config::default();

let (project_cache, mut project_cache_rx) = Addr::custom();
let (aggregator, mut aggregator_rx) = Addr::custom();
let processor = create_test_processor_with_addrs(
config,
Addrs {
aggregator,
..Default::default()
},
);
@@ -3773,78 +3772,72 @@
};
processor.handle_process_batched_metrics(&mut token, message);

let value = project_cache_rx.recv().await.unwrap();
let legacy::ProjectCache::ProcessMetrics(pm1) = value else {
let value = aggregator_rx.recv().await.unwrap();
let Aggregator::MergeBuckets(mb1) = value else {
panic!()
};
let value = project_cache_rx.recv().await.unwrap();
let legacy::ProjectCache::ProcessMetrics(pm2) = value else {
let value = aggregator_rx.recv().await.unwrap();
let Aggregator::MergeBuckets(mb2) = value else {
panic!()
};

let mut messages = vec![pm1, pm2];
let mut messages = vec![mb1, mb2];
messages.sort_by_key(|pm| pm.project_key);

let actual = messages
.into_iter()
.map(|pm| (pm.project_key, pm.data, pm.source))
.map(|pm| (pm.project_key, pm.buckets))
.collect::<Vec<_>>();

assert_debug_snapshot!(actual, @r###"
[
(
ProjectKey("11111111111111111111111111111111"),
Parsed(
[
Bucket {
timestamp: UnixTimestamp(1615889440),
width: 0,
name: MetricName(
"d:custom/endpoint.response_time@millisecond",
),
value: Distribution(
[
68.0,
],
),
tags: {
"route": "user_index",
},
metadata: BucketMetadata {
merges: 1,
received_at: None,
extracted_from_indexed: false,
},
[
Bucket {
timestamp: UnixTimestamp(1615889440),
width: 0,
name: MetricName(
"d:custom/endpoint.response_time@millisecond",
),
value: Distribution(
[
68.0,
],
),
tags: {
"route": "user_index",
},
],
),
Internal,
metadata: BucketMetadata {
merges: 1,
received_at: None,
extracted_from_indexed: false,
},
},
],
),
(
ProjectKey("22222222222222222222222222222222"),
Parsed(
[
Bucket {
timestamp: UnixTimestamp(1615889440),
width: 0,
name: MetricName(
"d:custom/endpoint.cache_rate@none",
),
value: Distribution(
[
36.0,
],
),
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
extracted_from_indexed: false,
},
[
Bucket {
timestamp: UnixTimestamp(1615889440),
width: 0,
name: MetricName(
"d:custom/endpoint.cache_rate@none",
),
value: Distribution(
[
36.0,
],
),
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
extracted_from_indexed: false,
},
],
),
Internal,
},
],
),
]
"###);
2 changes: 1 addition & 1 deletion relay-server/src/services/processor/span/processing.rs
@@ -822,7 +822,7 @@ mod tests {
extracted_metrics: ProcessingExtractedMetrics::new(),
config: Arc::new(Config::default()),
project_info,
rate_limits: RateLimits::default(),
rate_limits: Arc::new(RateLimits::default()),
sampling_project_info: None,
project_id: ProjectId::new(42),
managed_envelope: managed_envelope.try_into().unwrap(),
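In this hunk the `rate_limits` field changes from an owned `RateLimits` to `Arc<RateLimits>`, which suggests (an assumption based only on this one-line change) that several processing steps now share the same cached limits without cloning them. A generic illustration of the pattern, using a placeholder type:

```rust
use std::sync::Arc;

// Stand-in for the real `RateLimits` type; illustrative only.
#[derive(Default, Debug)]
struct RateLimits {
    entries: Vec<String>,
}

fn main() {
    let rate_limits = Arc::new(RateLimits::default());

    // Cloning the `Arc` copies a pointer, not the underlying limits,
    // so multiple owners can hold the same state cheaply.
    let for_spans = Arc::clone(&rate_limits);
    let for_metrics = Arc::clone(&rate_limits);

    assert_eq!(Arc::strong_count(&rate_limits), 3);
    assert!(for_spans.entries.is_empty());
    assert!(for_metrics.entries.is_empty());
}
```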