Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(processor): Remove reservoir from the state #4383

Merged
merged 6 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 87 additions & 42 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::{Arc, Mutex, Once};
use std::sync::{Arc, Once};
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -33,32 +33,13 @@ use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNames
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits, Scoping};
use relay_sampling::config::RuleId;
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{smallvec, SmallVec};
use zstd::stream::Encoder as ZstdEncoder;

#[cfg(feature = "processing")]
use {
crate::services::store::{Store, StoreEnvelope},
crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
RedisSetLimiterOptions,
},
relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups},
relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
relay_redis::{RedisPool, RedisPools},
std::iter::Chain,
std::slice::Iter,
std::time::Instant,
symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::extractors::{PartialDsn, RequestMeta};
Expand All @@ -82,6 +63,24 @@ use crate::utils::{
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope,
WorkerGroup,
};
use relay_base_schema::organization::OrganizationId;
#[cfg(feature = "processing")]
use {
crate::services::store::{Store, StoreEnvelope},
crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
RedisSetLimiterOptions,
},
relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups},
relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
relay_redis::{RedisPool, RedisPools},
std::iter::Chain,
std::slice::Iter,
std::time::Instant,
symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
Expand Down Expand Up @@ -735,7 +734,7 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, aggregator: &Add

/// A state container for envelope processing.
#[derive(Debug)]
struct ProcessEnvelopeState<'a, Group> {
struct ProcessEnvelopeState<Group> {
/// The extracted event payload.
///
/// For Envelopes without event payloads, this contains `Annotated::empty`. If a single item has
Expand Down Expand Up @@ -786,12 +785,9 @@ struct ProcessEnvelopeState<'a, Group> {

/// The managed envelope before processing.
managed_envelope: TypedEnvelope<Group>,

/// Reservoir evaluator that we use for dynamic sampling.
reservoir: ReservoirEvaluator<'a>,
}

impl<Group> ProcessEnvelopeState<'_, Group> {
impl<Group> ProcessEnvelopeState<Group> {
/// Returns a reference to the contained [`Envelope`].
fn envelope(&self) -> &Envelope {
self.managed_envelope.envelope()
Expand Down Expand Up @@ -1270,7 +1266,6 @@ impl EnvelopeProcessorService {
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
sampling_project_info: Option<Arc<ProjectInfo>>,
reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
) -> ProcessEnvelopeState<G> {
let envelope = managed_envelope.envelope_mut();

Expand All @@ -1286,14 +1281,6 @@ impl EnvelopeProcessorService {
// 2. The DSN was moved and the envelope sent to the old project ID.
envelope.meta_mut().set_project_id(project_id);

#[allow(unused_mut)]
let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
#[cfg(feature = "processing")]
if let Some(quotas_pool) = self.inner.quotas_pool.as_ref() {
let org_id = managed_envelope.scoping().organization_id;
reservoir.set_redis(org_id, quotas_pool);
}

let extracted_metrics = ProcessingExtractedMetrics::new();

ProcessEnvelopeState {
Expand All @@ -1308,7 +1295,6 @@ impl EnvelopeProcessorService {
sampling_project_info,
project_id,
managed_envelope,
reservoir,
}
}

Expand Down Expand Up @@ -1667,6 +1653,7 @@ impl EnvelopeProcessorService {
fn process_transactions(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());

Expand All @@ -1693,8 +1680,13 @@ impl EnvelopeProcessorService {
let run_dynamic_sampling =
matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();

let reservoir = self.new_reservoir_evaluator(
state.managed_envelope.scoping().organization_id,
reservoir_counters,
);

let sampling_result = match run_dynamic_sampling {
true => dynamic_sampling::run(state),
true => dynamic_sampling::run(state, &reservoir),
false => SamplingResult::Pending,
};

Expand Down Expand Up @@ -1857,14 +1849,24 @@ impl EnvelopeProcessorService {
fn process_standalone_spans(
&self,
state: &mut ProcessEnvelopeState<SpanGroup>,
#[allow(unused_variables)] reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
span::filter(state);
span::convert_otel_traces_data(state);

if_processing!(self.inner.config, {
let global_config = self.inner.global_config.current();
let reservoir = self.new_reservoir_evaluator(
state.managed_envelope.scoping().organization_id,
reservoir_counters,
);

span::process(state, &global_config, self.inner.geoip_lookup.as_ref());
span::process(
state,
&global_config,
self.inner.geoip_lookup.as_ref(),
&reservoir,
);

self.enforce_quotas(state)?;
});
Expand All @@ -1878,7 +1880,7 @@ impl EnvelopeProcessorService {
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
sampling_project_info: Option<Arc<ProjectInfo>>,
reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
reservoir_counters: ReservoirCounters,
) -> Result<ProcessingStateResult, ProcessingError> {
// Get the group from the managed envelope context, and if it's not set, try to guess it
// from the contents of the envelope.
Expand All @@ -1893,6 +1895,34 @@ impl EnvelopeProcessorService {
.parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
}

macro_rules! call {
($fn_name:ident ( $($args:expr),* )) => {{
loewenheim marked this conversation as resolved.
Show resolved Hide resolved
let managed_envelope = managed_envelope.try_into()?;
let mut state = self.prepare_state(
self.inner.config.clone(),
managed_envelope,
project_id,
project_info,
rate_limits,
sampling_project_info,
);
// The state is temporarily supplied, until it will be removed.
match self.$fn_name(&mut state, $($args),*) {
Ok(()) => Ok(ProcessingStateResult {
managed_envelope: state.managed_envelope.into_processed(),
extracted_metrics: state.extracted_metrics.metrics,
}),
Err(e) => {
if let Some(outcome) = e.to_outcome() {
state.managed_envelope.reject(outcome);
}
return Err(e);
}
}

}};
}

macro_rules! run {
($fn:ident) => {{
let managed_envelope = managed_envelope.try_into()?;
Expand All @@ -1903,7 +1933,6 @@ impl EnvelopeProcessorService {
project_info,
rate_limits,
sampling_project_info,
reservoir_counters,
);
match self.$fn(&mut state) {
Ok(()) => Ok(ProcessingStateResult {
Expand All @@ -1924,13 +1953,13 @@ impl EnvelopeProcessorService {

match group {
ProcessingGroup::Error => run!(process_errors),
ProcessingGroup::Transaction => run!(process_transactions),
ProcessingGroup::Transaction => call!(process_transactions(reservoir_counters)),
ProcessingGroup::Session => run!(process_sessions),
ProcessingGroup::Standalone => run!(process_standalone),
ProcessingGroup::ClientReport => run!(process_client_reports),
ProcessingGroup::Replay => run!(process_replays),
ProcessingGroup::CheckIn => run!(process_checkins),
ProcessingGroup::Span => run!(process_standalone_spans),
ProcessingGroup::Span => call!(process_standalone_spans(reservoir_counters)),
ProcessingGroup::ProfileChunk => run!(process_profile_chunks),
// Currently is not used.
ProcessingGroup::Metrics => {
Expand Down Expand Up @@ -2807,6 +2836,21 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
}
}

fn new_reservoir_evaluator(
&self,
#[allow(unused_variables)] organization_id: OrganizationId,
reservoir_counters: ReservoirCounters,
) -> ReservoirEvaluator {
#[allow(unused_mut)]
let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
#[cfg(feature = "processing")]
if let Some(quotas_pool) = self.inner.quotas_pool.as_ref() {
reservoir.set_redis(organization_id, quotas_pool);
}

reservoir
}
}

impl Service for EnvelopeProcessorService {
Expand Down Expand Up @@ -3231,6 +3275,7 @@ impl<'a> IntoIterator for CombinedQuotas<'a> {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::env;

use insta::assert_debug_snapshot;
Expand Down
20 changes: 12 additions & 8 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ pub fn ensure_dsc(state: &mut ProcessEnvelopeState<TransactionGroup>) {
}

/// Computes the sampling decision on the incoming event
pub fn run<Group>(state: &mut ProcessEnvelopeState<Group>) -> SamplingResult
pub fn run<Group>(
state: &mut ProcessEnvelopeState<Group>,
reservoir: &ReservoirEvaluator,
) -> SamplingResult
where
Group: Sampling,
{
Expand All @@ -78,7 +81,7 @@ where
_ => None,
};

let reservoir = Group::supports_reservoir_sampling().then_some(&state.reservoir);
let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);

compute_sampling_decision(
state.config.processing_enabled(),
Expand Down Expand Up @@ -448,24 +451,25 @@ mod tests {
.try_into()
.unwrap(),
event_metrics_extracted: false,
reservoir: dummy_reservoir(),
spans_extracted: false,
}
};

let reservoir = dummy_reservoir();

// None represents no TransactionMetricsConfig, DS will not be run
let mut state = get_state(None);
let sampling_result = run(&mut state);
let sampling_result = run(&mut state, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Current version is 3, so it won't run DS if it's outdated
let mut state = get_state(Some(2));
let sampling_result = run(&mut state);
let sampling_result = run(&mut state, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Dynamic sampling is run, as the transactionmetrics version is up to date.
let mut state = get_state(Some(3));
let sampling_result = run(&mut state);
let sampling_result = run(&mut state, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
}

Expand Down Expand Up @@ -750,10 +754,10 @@ mod tests {
)
.try_into()
.unwrap(),
reservoir: dummy_reservoir(),
};

run(&mut state)
let reservoir = dummy_reservoir();
run(&mut state, &reservoir)
}

#[test]
Expand Down
Loading
Loading