pre rm metric meta rebase
Dav1dde committed Oct 18, 2024
1 parent 76d15be · commit e4eab90
Showing 4 changed files with 44 additions and 18 deletions.
8 changes: 5 additions & 3 deletions relay-server/src/services/buffer/mod.rs
@@ -21,8 +21,9 @@ use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::ProcessingGroup;
use crate::services::projects::cache::{DequeuedEnvelope, ProjectCache, UpdateProject};
use crate::services::projects::cache::DequeuedEnvelope;

use crate::services::projects::cache2::ProjectCacheHandle;
use crate::services::test_store::TestStore;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::ManagedEnvelope;
@@ -100,7 +101,7 @@ pub struct Services {
/// Bounded channel used exclusively to handle backpressure when sending envelopes to the
/// project cache.
pub envelopes_tx: mpsc::Sender<DequeuedEnvelope>,
pub project_cache: Addr<ProjectCache>,
pub project_cache_handle: ProjectCacheHandle,
pub outcome_aggregator: Addr<TrackOutcome>,
pub test_store: Addr<TestStore>,
}
@@ -269,12 +270,13 @@ impl EnvelopeBufferService {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let own_key = envelope.meta().public_key();

services.project_cache_handle.fetch(own_key);
services.project_cache.send(UpdateProject(own_key));
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == own_key => {} // already sent.
Some(sampling_key) => {
services.project_cache.send(UpdateProject(sampling_key));
services.project_cache_handle.fetch(sampling_key);
}
}

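The hunks above swap the actor-message path (`Addr<ProjectCache>` plus `UpdateProject`) for direct calls on the new `ProjectCacheHandle`. A minimal sketch of the resulting request logic inside `EnvelopeBufferService`, using only identifiers that appear in this diff (a paraphrase, not the commit's exact code):

let own_key = envelope.meta().public_key();
// Request a project update through the handle instead of messaging the cache actor.
services.project_cache_handle.fetch(own_key);

// Also fetch the sampling project, unless it is the same project as the own key.
if let Some(sampling_key) = envelope.sampling_key() {
    if sampling_key != own_key {
        services.project_cache_handle.fetch(sampling_key);
    }
}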
30 changes: 21 additions & 9 deletions relay-server/src/services/projects/cache.rs
@@ -288,6 +288,10 @@ pub enum ProjectCache {
ValidateEnvelope(ValidateEnvelope),
// UpdateRateLimits(UpdateRateLimits),
// ProcessMetrics(ProcessMetrics),
<<<<<<< HEAD
=======
// AddMetricMeta(AddMetricMeta),
>>>>>>> da6fbcc87 (pre rm metric meta rebase)
FlushBuckets(FlushBuckets),
UpdateSpoolIndex(UpdateSpoolIndex),
RefreshIndexCache(RefreshIndexCache),
@@ -975,6 +979,16 @@ impl ProjectCacheBroker {
// self.services.envelope_processor.send(message);
// }

<<<<<<< HEAD
=======
// fn handle_add_metric_meta(&mut self, message: AddMetricMeta) {
// let envelope_processor = self.services.envelope_processor.clone();
//
// self.get_or_create_project(message.project_key)
// .add_metric_meta(message.meta, envelope_processor);
// }

>>>>>>> da6fbcc87 (pre rm metric meta rebase)
fn handle_flush_buckets(&mut self, message: FlushBuckets) {
let aggregator = self.services.aggregator.clone();

@@ -1179,15 +1193,9 @@ impl ProjectCacheBroker {
/// Which includes the own key and the sampling key for the project.
/// Note: this function will trigger [`ProjectState`] refresh if it's already expired.
fn is_state_cached(&mut self, key: &QueueKey) -> bool {
key.unique_keys().iter().all(|key| {
self.projects.get_mut(key).is_some_and(|project| {
// Returns `Some` if the project is cached otherwise None and also triggers refresh
// in background.
!project
.get_cached_state(self.services.project_cache.clone(), false)
.is_pending()
})
})
key.unique_keys()
.iter()
.all(|key| !self.projects.get(*key).project_state().is_pending())
}

/// Iterates the buffer index and tries to unspool the envelopes for projects with a valid
@@ -1264,6 +1272,10 @@ impl ProjectCacheBroker {
}
// ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message),
// ProjectCache::ProcessMetrics(message) => self.handle_process_metrics(message),
<<<<<<< HEAD
=======
// ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message),
>>>>>>> da6fbcc87 (pre rm metric meta rebase)
ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message),
ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message),
ProjectCache::RefreshIndexCache(message) => {
8 changes: 8 additions & 0 deletions relay-server/src/services/projects/cache2/service.rs
@@ -4,6 +4,7 @@ use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use tokio::sync::mpsc;

use crate::services::buffer::EnvelopeBuffer;
use crate::services::projects::cache::ProjectSource;
use crate::services::projects::cache2::state::{CompletedFetch, Fetch};
use crate::services::projects::project::{ProjectFetchState, ProjectState};
@@ -27,6 +28,8 @@ pub struct ProjectCacheService {
source: ProjectSource,
config: Arc<Config>,

buffer: relay_system::Addr<EnvelopeBuffer>,

project_update_rx: mpsc::UnboundedReceiver<CompletedFetch>,
project_update_tx: mpsc::UnboundedSender<CompletedFetch>,
}
@@ -68,13 +71,18 @@ impl ProjectCacheService {
}

fn handle_project_update(&mut self, fetch: CompletedFetch) {
let project_key = fetch.project_key();

if let Some(fetch) = self.store.complete_fetch(fetch, &self.config) {
relay_log::trace!(
project_key = fetch.project_key().as_str(),
"re-scheduling project fetch: {fetch:?}"
);
self.schedule_fetch(fetch);
return;
}

self.buffer.send(EnvelopeBuffer::Ready(project_key));
}

fn handle_evict_stale_projects(&mut self) {
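In cache2/service.rs above, a completed fetch now wakes the envelope buffer. A condensed sketch of `handle_project_update` as changed in this commit, with the tracing call omitted (a paraphrase of the hunk, not the exact code):

fn handle_project_update(&mut self, fetch: CompletedFetch) {
    let project_key = fetch.project_key();

    // If the store re-schedules the fetch (e.g. the state is still pending),
    // try again later and do not notify the buffer yet.
    if let Some(fetch) = self.store.complete_fetch(fetch, &self.config) {
        self.schedule_fetch(fetch);
        return;
    }

    // The fetch is final: tell the buffer that envelopes for this project can proceed.
    self.buffer.send(EnvelopeBuffer::Ready(project_key));
}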
16 changes: 10 additions & 6 deletions relay-server/src/services/projects/cache2/state.rs
@@ -170,8 +170,8 @@ impl ProjectRef<'_> {

// Keep the old state around if the current fetch is pending.
// It may still be useful to callers.
if !fetch.state.is_pending() {
self.shared.set_project_state(fetch.state);
if !fetch.project_state.is_pending() {
self.shared.set_project_state(fetch.project_state);
}
}
}
@@ -370,7 +370,7 @@ impl PrivateProjectState {
}

fn complete_fetch(&mut self, fetch: &CompletedFetch) {
if fetch.state.is_pending() {
if fetch.project_state.is_pending() {
self.next_fetch_attempt = Instant::now().checked_add(self.backoff.next_backoff());
} else {
debug_assert!(
@@ -437,18 +437,22 @@ impl Fetch {
pub fn complete(self, state: ProjectState) -> CompletedFetch {
CompletedFetch {
project_key: self.project_key,
state,
project_state: state,
}
}
}

pub struct CompletedFetch {
project_key: ProjectKey,
state: ProjectState,
project_state: ProjectState,
}

impl CompletedFetch {
fn project_key(&self) -> ProjectKey {
pub fn project_key(&self) -> ProjectKey {
self.project_key
}

pub fn project_state(&self) -> &ProjectState {
&self.project_state
}
}
