Skip to content

Commit

Permalink
ref(processor): Remove rate limiting field from state (#4394)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Dec 17, 2024
1 parent 2fcf51a commit 06b174a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 29 deletions.
76 changes: 52 additions & 24 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,6 @@ struct ProcessEnvelopeState<Group> {
/// configuration objects in the project config.
extracted_metrics: ProcessingExtractedMetrics,

/// Currently active cached rate limits of the project this envelope belongs to.
#[cfg_attr(not(feature = "processing"), expect(dead_code))]
rate_limits: Arc<RateLimits>,

/// The managed envelope before processing.
managed_envelope: TypedEnvelope<Group>,
}
Expand Down Expand Up @@ -1246,6 +1242,7 @@ impl EnvelopeProcessorService {
&self,
state: &mut ProcessEnvelopeState<G>,
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
let global_config = self.inner.global_config.current();
let rate_limiter = match self.inner.rate_limiter.as_ref() {
Expand All @@ -1255,11 +1252,20 @@ impl EnvelopeProcessorService {

// Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
// applied in the fast path, all cached quotas can be applied here.
let _ = RateLimiter::Cached.enforce(&global_config, state, project_info.clone())?;
let _ = RateLimiter::Cached.enforce(
&global_config,
state,
project_info.clone(),
rate_limits.clone(),
)?;

// Enforce all quotas consistently with Redis.
let limits =
RateLimiter::Consistent(rate_limiter).enforce(&global_config, state, project_info)?;
let limits = RateLimiter::Consistent(rate_limiter).enforce(
&global_config,
state,
project_info,
rate_limits,
)?;

// Update cached rate limits with the freshly computed ones.
if !limits.is_empty() {
Expand Down Expand Up @@ -1538,6 +1544,7 @@ impl EnvelopeProcessorService {
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());

Expand Down Expand Up @@ -1584,7 +1591,7 @@ impl EnvelopeProcessorService {
}

if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info.clone())?;
self.enforce_quotas(state, project_info.clone(), rate_limits)?;
});

if state.has_event() {
Expand Down Expand Up @@ -1613,13 +1620,15 @@ impl EnvelopeProcessorService {

/// Processes only transactions and transaction-related items.
#[allow(unused_assignments)]
#[allow(clippy::too_many_arguments)]
fn process_transactions(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
mut sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());
Expand Down Expand Up @@ -1706,7 +1715,7 @@ impl EnvelopeProcessorService {
// - An envelope containing only processed profiles.
// We need to make sure there are enough quotas for these profiles.
if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info.clone())?;
self.enforce_quotas(state, project_info.clone(), rate_limits)?;
});

return Ok(());
Expand Down Expand Up @@ -1746,7 +1755,7 @@ impl EnvelopeProcessorService {
);
}

self.enforce_quotas(state, project_info.clone())?;
self.enforce_quotas(state, project_info.clone(), rate_limits)?;

span::maybe_discard_transaction(state, project_info);
});
Expand Down Expand Up @@ -1796,11 +1805,12 @@ impl EnvelopeProcessorService {
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
profile::filter(state, config, project_id, project_info.clone());

if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info.clone())?;
self.enforce_quotas(state, project_info.clone(), rate_limits)?;
});

report::process_user_reports(state);
Expand All @@ -1813,10 +1823,11 @@ impl EnvelopeProcessorService {
&self,
state: &mut ProcessEnvelopeState<SessionGroup>,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
session::process(state, project_info.clone(), &self.inner.config);
if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info)?;
self.enforce_quotas(state, project_info, rate_limits)?;
});
Ok(())
}
Expand All @@ -1827,9 +1838,10 @@ impl EnvelopeProcessorService {
state: &mut ProcessEnvelopeState<ClientReportGroup>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info.clone())?;
self.enforce_quotas(state, project_info.clone(), rate_limits)?;
});

report::process_client_reports(
Expand All @@ -1848,6 +1860,7 @@ impl EnvelopeProcessorService {
state: &mut ProcessEnvelopeState<ReplayGroup>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
replay::process(
state,
Expand All @@ -1857,7 +1870,7 @@ impl EnvelopeProcessorService {
self.inner.geoip_lookup.as_ref(),
)?;
if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info)?;
self.enforce_quotas(state, project_info, rate_limits)?;
});
Ok(())
}
Expand All @@ -1868,9 +1881,10 @@ impl EnvelopeProcessorService {
#[allow(unused_variables)] state: &mut ProcessEnvelopeState<CheckInGroup>,
#[allow(unused_variables)] project_id: ProjectId,
#[allow(unused_variables)] project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<(), ProcessingError> {
if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info)?;
self.enforce_quotas(state, project_info, rate_limits)?;
self.process_check_ins(state, project_id);
});
Ok(())
Expand All @@ -1879,13 +1893,15 @@ impl EnvelopeProcessorService {
/// Processes standalone spans.
///
/// This function does *not* run for spans extracted from transactions.
#[allow(clippy::too_many_arguments)]
fn process_standalone_spans(
&self,
state: &mut ProcessEnvelopeState<SpanGroup>,
config: Arc<Config>,
#[allow(unused_variables)] project_id: ProjectId,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
#[allow(unused_variables)] reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
span::filter(state, config.clone(), project_info.clone());
Expand All @@ -1909,7 +1925,7 @@ impl EnvelopeProcessorService {
&reservoir,
);

self.enforce_quotas(state, project_info)?;
self.enforce_quotas(state, project_info, rate_limits)?;
});
Ok(())
}
Expand Down Expand Up @@ -1958,7 +1974,6 @@ impl EnvelopeProcessorService {
event: Annotated::empty(),
metrics: Metrics::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
rate_limits,
managed_envelope,
};

Expand Down Expand Up @@ -1986,7 +2001,8 @@ impl EnvelopeProcessorService {
process_errors,
project_id,
project_info,
sampling_project_info
sampling_project_info,
rate_limits
),
ProcessingGroup::Transaction => {
run!(
Expand All @@ -1995,31 +2011,42 @@ impl EnvelopeProcessorService {
project_id,
project_info,
sampling_project_info,
rate_limits,
reservoir_counters
)
}
ProcessingGroup::Session => run!(process_sessions, project_info),
ProcessingGroup::Session => run!(process_sessions, project_info, rate_limits),
ProcessingGroup::Standalone => run!(
process_standalone,
self.inner.config.clone(),
project_id,
project_info
project_info,
rate_limits
),
ProcessingGroup::ClientReport => run!(
process_client_reports,
self.inner.config.clone(),
project_info
project_info,
rate_limits
),
ProcessingGroup::Replay => {
run!(process_replays, self.inner.config.clone(), project_info)
run!(
process_replays,
self.inner.config.clone(),
project_info,
rate_limits
)
}
ProcessingGroup::CheckIn => {
run!(process_checkins, project_id, project_info, rate_limits)
}
ProcessingGroup::CheckIn => run!(process_checkins, project_id, project_info),
ProcessingGroup::Span => run!(
process_standalone_spans,
self.inner.config.clone(),
project_id,
project_info,
sampling_project_info,
rate_limits,
reservoir_counters
),
ProcessingGroup::ProfileChunk => run!(process_profile_chunks, project_info),
Expand Down Expand Up @@ -2942,6 +2969,7 @@ impl RateLimiter<'_> {
global_config: &GlobalConfig,
state: &mut ProcessEnvelopeState<G>,
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
) -> Result<RateLimits, ProcessingError> {
if state.envelope().is_empty() && !state.has_event() {
return Ok(RateLimits::default());
Expand All @@ -2958,7 +2986,7 @@ impl RateLimiter<'_> {
// remove it from the processing state eventually.
let mut envelope_limiter =
EnvelopeLimiter::new(CheckLimits::All, |item_scope, quantity| match self {
RateLimiter::Cached => Ok(state.rate_limits.check_with_quotas(quotas, item_scope)),
RateLimiter::Cached => Ok(rate_limits.check_with_quotas(quotas, item_scope)),
RateLimiter::Consistent(rl) => Ok::<_, ProcessingError>(
rl.is_rate_limited(quotas, item_scope, quantity, false)?,
),
Expand Down
4 changes: 1 addition & 3 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ mod tests {
event: Annotated::from(event),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
rate_limits: Default::default(),
managed_envelope: ManagedEnvelope::new(
envelope,
outcome_aggregator.clone(),
Expand All @@ -476,7 +475,7 @@ mod tests {
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Dynamic sampling is run, as the transactionmetrics version is up to date.
// Dynamic sampling is run, as the transaction metrics version is up to date.
let (mut state, project_info) = get_state(Some(3));
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
Expand Down Expand Up @@ -720,7 +719,6 @@ mod tests {
event: Annotated::new(Event::default()),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
rate_limits: Default::default(),
managed_envelope: ManagedEnvelope::new(
envelope,
Addr::dummy(),
Expand Down
2 changes: 0 additions & 2 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,6 @@ mod tests {
};
use relay_event_schema::protocol::{Contexts, Event, Span};
use relay_protocol::get_value;
use relay_quotas::RateLimits;
use relay_system::Addr;

use crate::envelope::Envelope;
Expand Down Expand Up @@ -868,7 +867,6 @@ mod tests {
event: Annotated::from(event),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
rate_limits: Arc::new(RateLimits::default()),
managed_envelope: managed_envelope.try_into().unwrap(),
};

Expand Down

0 comments on commit 06b174a

Please sign in to comment.