Commit a4775d9

project cleanup

Dav1dde committed Oct 18, 2024
1 parent 6799ec4 commit a4775d9

Showing 6 changed files with 203 additions and 823 deletions.
9 changes: 1 addition & 8 deletions relay-server/src/services/projects/cache.rs
@@ -29,7 +29,7 @@ use tokio::sync::{mpsc, watch};

use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::project::{Project, ProjectFetchState, ProjectState};
use crate::services::projects::project::{ProjectFetchState, ProjectState};
use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService};
#[cfg(feature = "processing")]
use crate::services::projects::source::redis::RedisProjectSource;
@@ -1064,17 +1064,10 @@ impl FetchOptionalProjectState
#[derive(Debug)]
#[allow(dead_code)] // Fields are never read, only used for discarding/dropping data.
enum ProjectGarbage {
Project(Project),
ProjectFetchState(ProjectFetchState),
Metrics(Vec<Bucket>),
}

impl From<Project> for ProjectGarbage {
fn from(value: Project) -> Self {
Self::Project(value)
}
}

impl From<ProjectFetchState> for ProjectGarbage {
fn from(value: ProjectFetchState) -> Self {
Self::ProjectFetchState(value)
1 change: 0 additions & 1 deletion relay-server/src/services/projects/cache2/mod.rs
@@ -1,6 +1,5 @@
mod handle;
mod project;
mod refresh;
mod service;
mod state;

202 changes: 202 additions & 0 deletions relay-server/src/services/projects/cache2/project.rs
@@ -172,3 +172,205 @@ fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
// as a span and counted during the slow path of rate limiting.
.map_or(0, |event| event.spans.0 + 1)
}

#[cfg(test)]
mod tests {
use crate::envelope::{ContentType, Envelope, Item};
use crate::extractors::RequestMeta;
use crate::services::processor::ProcessingGroup;
use relay_base_schema::project::ProjectId;
use relay_event_schema::protocol::EventId;
use relay_test::mock_service;
use serde_json::json;
use smallvec::SmallVec;

use super::*;

#[test]
fn get_state_expired() {
for expiry in [9999, 0] {
let config = Arc::new(
Config::from_json_value(json!(
{
"cache": {
"project_expiry": expiry,
"project_grace_period": 0,
"eviction_interval": 9999 // do not evict
}
}
))
.unwrap(),
);

// Initialize project with a state
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let project_info = ProjectInfo {
project_id: Some(ProjectId::new(123)),
..Default::default()
};
let mut project = Project::new(project_key, config.clone());
project.state = ProjectFetchState::enabled(project_info);

if expiry > 0 {
// With long expiry, should get a state
assert!(matches!(project.current_state(), ProjectState::Enabled(_)));
} else {
// With 0 expiry, project should expire immediately. No state can be set.
assert!(matches!(project.current_state(), ProjectState::Pending));
}
}
}

#[tokio::test]
async fn test_stale_cache() {
let (addr, _) = mock_service("project_cache", (), |&mut (), _| {});

let config = Arc::new(
Config::from_json_value(json!(
{
"cache": {
"project_expiry": 100,
"project_grace_period": 0,
"eviction_interval": 9999 // do not evict
}
}
))
.unwrap(),
);

let channel = StateChannel::new();

// Initialize project with a state.
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut project = Project::new(project_key, config);
project.state_channel = Some(channel);
project.state = ProjectFetchState::allowed();

assert!(project.next_fetch_attempt.is_none());
// Try to update the project with an errored project state.
project.update_state(&addr, ProjectFetchState::pending(), false);
// Since we got an invalid project state, we keep the old one, which means the project id
// must still be set.
assert!(matches!(project.current_state(), ProjectState::Enabled(_)));
assert!(project.next_fetch_attempt.is_some());

// This tests that we actually initiate the backoff and that the backoff mechanism works:
// * The first call to `update_state` with an invalid ProjectState starts the backoff, but
//   since it is the first attempt, we get a Duration of 0.
// * The second call to `update_state` bumps the `next_backoff` Duration to something like ~1s.
// * Calling `fetch_state` afterwards tests that it is a no-op, since we should never fetch
//   while the backoff is active.
// * Without the backoff it would just panic, unable to call the ProjectCache service.
let channel = StateChannel::new();
project.state_channel = Some(channel);
project.update_state(&addr, ProjectFetchState::pending(), false);
project.fetch_state(addr, false);
}

fn create_project(config: Option<serde_json::Value>) -> Project {
let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap();
let mut project = Project::new(project_key, Arc::new(Config::default()));
let mut project_info = ProjectInfo {
project_id: Some(ProjectId::new(42)),
..Default::default()
};
let mut public_keys = SmallVec::new();
public_keys.push(PublicKeyConfig {
public_key: project_key,
numeric_id: None,
});
project_info.public_keys = public_keys;
if let Some(config) = config {
project_info.config = serde_json::from_value(config).unwrap();
}
project.state = ProjectFetchState::enabled(project_info);
project
}

fn request_meta() -> RequestMeta {
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();

RequestMeta::new(dsn)
}

#[test]
fn test_track_nested_spans_outcomes() {
let mut project = create_project(Some(json!({
"features": [
"organizations:indexed-spans-extraction"
],
"quotas": [{
"id": "foo",
"categories": ["transaction"],
"window": 3600,
"limit": 0,
"reasonCode": "foo",
}]
})));

let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

let mut transaction = Item::new(ItemType::Transaction);
transaction.set_payload(
ContentType::Json,
r#"{
"event_id": "52df9022835246eeb317dbd739ccd059",
"type": "transaction",
"transaction": "I have a stale timestamp, but I'm recent!",
"start_timestamp": 1,
"timestamp": 2,
"contexts": {
"trace": {
"trace_id": "ff62a8b040f340bda5d830223def1d81",
"span_id": "bd429c44b67a3eb4"
}
},
"spans": [
{
"span_id": "bd429c44b67a3eb4",
"start_timestamp": 1,
"timestamp": null,
"trace_id": "ff62a8b040f340bda5d830223def1d81"
},
{
"span_id": "bd429c44b67a3eb5",
"start_timestamp": 1,
"timestamp": null,
"trace_id": "ff62a8b040f340bda5d830223def1d81"
}
]
}"#,
);

envelope.add_item(transaction);

let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
let (test_store, _) = Addr::custom();

let managed_envelope = ManagedEnvelope::new(
envelope,
outcome_aggregator.clone(),
test_store,
ProcessingGroup::Transaction,
);

let _ = project.check_envelope(managed_envelope);
drop(outcome_aggregator);

let expected = [
(DataCategory::Transaction, 1),
(DataCategory::TransactionIndexed, 1),
(DataCategory::Span, 3),
(DataCategory::SpanIndexed, 3),
];

for (expected_category, expected_quantity) in expected {
let outcome = outcome_aggregator_rx.blocking_recv().unwrap();
assert_eq!(outcome.category, expected_category);
assert_eq!(outcome.quantity, expected_quantity);
}
}
}
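
Note: the comment block in `test_stale_cache` describes the backoff gating the test asserts. As an illustration only, a minimal self-contained sketch of that idea follows; `FetchGate`, its fields, and the exact delay schedule are hypothetical stand-ins, not the actual logic in `Project::update_state` / `fetch_state`.

// Hypothetical sketch: a failed state update schedules the next allowed fetch
// attempt, and fetching becomes a no-op until that deadline has passed.
use std::time::{Duration, Instant};

struct FetchGate {
    attempt: u32,
    next_fetch_attempt: Option<Instant>,
}

impl FetchGate {
    fn new() -> Self {
        Self { attempt: 0, next_fetch_attempt: None }
    }

    /// Called after an errored/pending project state was received.
    fn on_failed_update(&mut self) {
        // First failure backs off by 0s, later failures by ~1s, 2s, 4s, ... (capped).
        let delay = if self.attempt == 0 {
            Duration::ZERO
        } else {
            Duration::from_secs(1u64 << (self.attempt - 1).min(6))
        };
        self.attempt += 1;
        self.next_fetch_attempt = Some(Instant::now() + delay);
    }

    /// Returns true if a fetch may be issued right now.
    fn can_fetch(&self) -> bool {
        self.next_fetch_attempt
            .map_or(true, |at| Instant::now() >= at)
    }
}

fn main() {
    let mut gate = FetchGate::new();
    gate.on_failed_update(); // first attempt: zero delay
    assert!(gate.can_fetch());
    gate.on_failed_update(); // second attempt: ~1s delay
    assert!(!gate.can_fetch()); // fetching is skipped while the backoff is active
}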
1 change: 0 additions & 1 deletion relay-server/src/services/projects/cache2/refresh.rs

This file was deleted.
