ref(processor): Remove the config from the state (#4393)
iambriccardo authored Dec 17, 2024
1 parent 0d5fe3f commit 2fcf51a
Showing 7 changed files with 133 additions and 86 deletions.
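For readers skimming the diff below, the shape of the change is: `ProcessEnvelopeState` no longer owns an `Arc<Config>`, and everything that needs the config (including the former `should_filter` method, now a free function) receives it as an explicit argument. The following is a minimal sketch of that pattern with simplified, assumed names and fields; it is not the actual Relay code:

```rust
use std::sync::Arc;

// Simplified stand-ins for Relay's real types; names and fields here are
// illustrative assumptions, not the actual definitions.
struct Config {
    managed_mode: bool,
}

struct ProjectInfo {
    features: Vec<String>,
}

impl ProjectInfo {
    fn has_feature(&self, feature: &str) -> bool {
        self.features.iter().any(|f| f == feature)
    }
}

// Before: the per-envelope state owned its own handle to the config,
// and helpers like `should_filter` were methods reading `self.config`.
#[allow(dead_code)]
struct ProcessEnvelopeStateBefore {
    config: Arc<Config>,
    // ... event, metrics, rate_limits, managed_envelope, ...
}

// After: the state no longer carries the config ...
struct ProcessEnvelopeStateAfter {
    // ... event, metrics, rate_limits, managed_envelope, ...
}

// ... and `should_filter` becomes a free function that is handed the
// config and project info it needs.
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: &str) -> bool {
    // Rough analogue of the managed-mode feature check in the real function.
    config.managed_mode && !project_info.has_feature(feature)
}

fn process_profiles(
    _state: &mut ProcessEnvelopeStateAfter,
    config: Arc<Config>,
    project_info: &ProjectInfo,
) {
    if should_filter(&config, project_info, "organizations:profiling") {
        // drop the profile items here ...
    }
}

fn main() {
    let config = Arc::new(Config { managed_mode: true });
    let project_info = ProjectInfo {
        features: vec!["organizations:profiling".to_string()],
    };
    let mut state = ProcessEnvelopeStateAfter {};
    process_profiles(&mut state, config, &project_info);
}
```

Call sites in the processor (the `run!(...)` invocations further down) then clone the `Arc<Config>` per call instead of reading it off the state.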
116 changes: 68 additions & 48 deletions relay-server/src/services/processor.rs
@@ -758,9 +758,6 @@ struct ProcessEnvelopeState<Group> {
#[cfg_attr(not(feature = "processing"), expect(dead_code))]
rate_limits: Arc<RateLimits>,

/// The config of this Relay instance.
config: Arc<Config>,

/// The managed envelope before processing.
managed_envelope: TypedEnvelope<Group>,
}
@@ -806,19 +803,43 @@ impl<Group> ProcessEnvelopeState<Group> {
fn remove_event(&mut self) {
self.event = Annotated::empty();
}
}

/// Function for on-off switches that filter specific item types (profiles, spans)
/// based on a feature flag.
///
/// If the project config did not come from the upstream, we keep the items.
fn should_filter(&self, feature: Feature, project_info: &ProjectInfo) -> bool {
match self.config.relay_mode() {
RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
RelayMode::Managed => !project_info.has_feature(feature),
}
/// Function for on-off switches that filter specific item types (profiles, spans)
/// based on a feature flag.
///
/// If the project config did not come from the upstream, we keep the items.
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
match config.relay_mode() {
RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
RelayMode::Managed => !project_info.has_feature(feature),
}
}

/// New type representing the normalization state of the event.
#[derive(Copy, Clone)]
struct EventFullyNormalized(bool);

impl EventFullyNormalized {
/// Returns `true` if the event is fully normalized, `false` otherwise.
pub fn new(envelope: &Envelope) -> Self {
let event_fully_normalized = envelope.meta().is_from_internal_relay()
&& envelope
.items()
.any(|item| item.creates_event() && item.fully_normalized());

Self(event_fully_normalized)
}
}

/// New type representing whether metrics were extracted from transactions/spans.
#[derive(Copy, Clone)]
struct EventMetricsExtracted(bool);

/// New type representing whether spans were extracted.
#[derive(Copy, Clone)]
struct SpansExtracted(bool);

/// The view out of the [`ProcessEnvelopeState`] after processing.
#[derive(Debug)]
struct ProcessingStateResult {
@@ -1124,30 +1145,6 @@ struct InnerProcessor {
metric_outcomes: MetricOutcomes,
}

/// New type representing the normalization state of the event.
#[derive(Copy, Clone)]
struct EventFullyNormalized(bool);

impl EventFullyNormalized {
/// Returns `true` if the event is fully normalized, `false` otherwise.
pub fn new(envelope: &Envelope) -> Self {
let event_fully_normalized = envelope.meta().is_from_internal_relay()
&& envelope
.items()
.any(|item| item.creates_event() && item.fully_normalized());

Self(event_fully_normalized)
}
}

/// New type representing whether metrics were extracted from transactions/spans.
#[derive(Copy, Clone)]
struct EventMetricsExtracted(bool);

/// New type representing whether spans were extracted.
#[derive(Copy, Clone)]
struct SpansExtracted(bool);

impl EnvelopeProcessorService {
/// Creates a multi-threaded envelope processor.
#[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
@@ -1619,6 +1616,7 @@ impl EnvelopeProcessorService {
fn process_transactions(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
mut sampling_project_info: Option<Arc<ProjectInfo>>,
@@ -1637,7 +1635,7 @@
spans_extracted = inner_spans_extracted;
};

let profile_id = profile::filter(state, project_id, project_info.clone());
let profile_id = profile::filter(state, config.clone(), project_id, project_info.clone());
profile::transfer_id(state, profile_id);

event::finalize(state, &self.inner.config)?;
@@ -1673,6 +1671,7 @@ let sampling_result = match run_dynamic_sampling {
let sampling_result = match run_dynamic_sampling {
true => dynamic_sampling::run(
state,
config.clone(),
project_info.clone(),
sampling_project_info,
&reservoir,
@@ -1689,7 +1688,7 @@
if let Some(outcome) = sampling_result.into_dropped_outcome() {
// Process profiles before dropping the transaction, if necessary.
// Before metric extraction to make sure the profile count is reflected correctly.
profile::process(state, project_info.clone(), &global_config);
profile::process(state, &global_config, config.clone(), project_info.clone());
// Extract metrics here, we're about to drop the event/transaction.
event_metrics_extracted = self.extract_transaction_metrics(
state,
@@ -1721,7 +1720,8 @@

if_processing!(self.inner.config, {
// Process profiles before extracting metrics, to make sure they are removed if they are invalid.
let profile_id = profile::process(state, project_info.clone(), &global_config);
let profile_id =
profile::process(state, &global_config, config.clone(), project_info.clone());
profile::transfer_id(state, profile_id);

// Always extract metrics in processing Relays for sampled items.
@@ -1737,8 +1737,9 @@
if project_info.has_feature(Feature::ExtractSpansFromEvent) {
spans_extracted = span::extract_from_event(
state,
project_info.clone(),
&global_config,
config,
project_info.clone(),
server_sample_rate,
event_metrics_extracted,
spans_extracted,
@@ -1792,10 +1793,11 @@ impl EnvelopeProcessorService {
fn process_standalone(
&self,
state: &mut ProcessEnvelopeState<StandaloneGroup>,
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
) -> Result<(), ProcessingError> {
profile::filter(state, project_id, project_info.clone());
profile::filter(state, config, project_id, project_info.clone());

if_processing!(self.inner.config, {
self.enforce_quotas(state, project_info.clone())?;
@@ -1823,6 +1825,7 @@ impl EnvelopeProcessorService {
fn process_client_reports(
&self,
state: &mut ProcessEnvelopeState<ClientReportGroup>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
) -> Result<(), ProcessingError> {
if_processing!(self.inner.config, {
@@ -1831,6 +1834,7 @@

report::process_client_reports(
state,
config,
project_info,
self.inner.addrs.outcome_aggregator.clone(),
);
@@ -1842,12 +1846,14 @@ impl EnvelopeProcessorService {
fn process_replays(
&self,
state: &mut ProcessEnvelopeState<ReplayGroup>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
) -> Result<(), ProcessingError> {
replay::process(
state,
project_info.clone(),
&self.inner.global_config.current(),
config,
project_info.clone(),
self.inner.geoip_lookup.as_ref(),
)?;
if_processing!(self.inner.config, {
@@ -1876,12 +1882,13 @@ impl EnvelopeProcessorService {
fn process_standalone_spans(
&self,
state: &mut ProcessEnvelopeState<SpanGroup>,
config: Arc<Config>,
#[allow(unused_variables)] project_id: ProjectId,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
span::filter(state, project_info.clone());
span::filter(state, config.clone(), project_info.clone());
span::convert_otel_traces_data(state);

if_processing!(self.inner.config, {
@@ -1893,10 +1900,11 @@

span::process(
state,
&global_config,
config,
project_id,
project_info.clone(),
sampling_project_info,
&global_config,
self.inner.geoip_lookup.as_ref(),
&reservoir,
);
@@ -1950,7 +1958,6 @@ impl EnvelopeProcessorService {
event: Annotated::empty(),
metrics: Metrics::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: self.inner.config.clone(),
rate_limits,
managed_envelope,
};
@@ -1984,19 +1991,32 @@ impl EnvelopeProcessorService {
ProcessingGroup::Transaction => {
run!(
process_transactions,
self.inner.config.clone(),
project_id,
project_info,
sampling_project_info,
reservoir_counters
)
}
ProcessingGroup::Session => run!(process_sessions, project_info),
ProcessingGroup::Standalone => run!(process_standalone, project_id, project_info),
ProcessingGroup::ClientReport => run!(process_client_reports, project_info),
ProcessingGroup::Replay => run!(process_replays, project_info),
ProcessingGroup::Standalone => run!(
process_standalone,
self.inner.config.clone(),
project_id,
project_info
),
ProcessingGroup::ClientReport => run!(
process_client_reports,
self.inner.config.clone(),
project_info
),
ProcessingGroup::Replay => {
run!(process_replays, self.inner.config.clone(), project_info)
}
ProcessingGroup::CheckIn => run!(process_checkins, project_id, project_info),
ProcessingGroup::Span => run!(
process_standalone_spans,
self.inner.config.clone(),
project_id,
project_info,
sampling_project_info,
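One property the new call sites lean on (e.g. `run!(process_transactions, self.inner.config.clone(), ...)` above) is that cloning an `Arc<Config>` only bumps a reference count; the underlying `Config` is never copied. A small self-contained sketch of that behaviour, independent of the Relay code and using an assumed stand-in `Config` type:

```rust
use std::sync::Arc;

// Stand-in for Relay's `Config`; in the real code this is a large struct.
struct Config {
    relay_mode: &'static str,
}

fn main() {
    let config = Arc::new(Config { relay_mode: "managed" });

    // `clone()` on an Arc increments the refcount; all handles point at the
    // same allocation, so handing a clone to each per-group processing call
    // is cheap.
    let for_transactions = Arc::clone(&config);
    let for_replays = config.clone();

    assert_eq!(Arc::strong_count(&config), 3);
    assert!(std::ptr::eq(
        Arc::as_ptr(&for_transactions),
        Arc::as_ptr(&for_replays)
    ));
    println!("mode: {}", for_transactions.relay_mode);
}
```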
20 changes: 13 additions & 7 deletions relay-server/src/services/processor/dynamic_sampling.rs
@@ -72,6 +72,7 @@ pub fn validate_and_set_dsc(
/// Computes the sampling decision on the incoming event
pub fn run<Group>(
state: &mut ProcessEnvelopeState<Group>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
reservoir: &ReservoirEvaluator,
@@ -97,7 +98,7 @@
let reservoir = Group::supports_reservoir_sampling().then_some(reservoir);

compute_sampling_decision(
state.config.processing_enabled(),
config.processing_enabled(),
reservoir,
sampling_config,
state.event.value(),
@@ -449,7 +450,6 @@ mod tests {
event: Annotated::from(event),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: config.clone(),
rate_limits: Default::default(),
managed_envelope: ManagedEnvelope::new(
envelope,
@@ -468,17 +468,17 @@

// None represents no TransactionMetricsConfig, DS will not be run
let (mut state, project_info) = get_state(None);
let sampling_result = run(&mut state, project_info, None, &reservoir);
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Current version is 3, so it won't run DS if it's outdated
let (mut state, project_info) = get_state(Some(2));
let sampling_result = run(&mut state, project_info, None, &reservoir);
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Dynamic sampling is run, as the transactionmetrics version is up to date.
let (mut state, project_info) = get_state(Some(3));
let sampling_result = run(&mut state, project_info, None, &reservoir);
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
}

@@ -715,11 +715,11 @@ mod tests {
r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#,
);
let envelope = Envelope::parse_bytes(bytes).unwrap();
let config = Arc::new(Config::default());
let mut state = ProcessEnvelopeState::<G> {
event: Annotated::new(Event::default()),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: Arc::new(Config::default()),
rate_limits: Default::default(),
managed_envelope: ManagedEnvelope::new(
envelope,
Expand Down Expand Up @@ -762,7 +762,13 @@ mod tests {
};

let reservoir = dummy_reservoir();
run(&mut state, project_info, sampling_project_info, &reservoir)
run(
&mut state,
config,
project_info,
sampling_project_info,
&reservoir,
)
}

#[test]
10 changes: 6 additions & 4 deletions relay-server/src/services/processor/profile.rs
@@ -13,7 +13,7 @@ use relay_protocol::Annotated;

use crate::envelope::{ContentType, Item, ItemType};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{ProcessEnvelopeState, TransactionGroup};
use crate::services::processor::{should_filter, ProcessEnvelopeState, TransactionGroup};
use crate::services::projects::project::ProjectInfo;
use crate::utils::ItemAction;

@@ -22,10 +22,11 @@ use crate::utils::ItemAction;
/// Returns the profile id of the single remaining profile, if there is one.
pub fn filter<G>(
state: &mut ProcessEnvelopeState<G>,
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
) -> Option<ProfileId> {
let profiling_disabled = state.should_filter(Feature::Profiling, &project_info);
let profiling_disabled = should_filter(&config, &project_info, Feature::Profiling);
let has_transaction = state.event_type() == Some(EventType::Transaction);
let keep_unsampled_profiles = true;

@@ -98,8 +99,9 @@ pub fn transfer_id(
/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
pub fn process(
state: &mut ProcessEnvelopeState<TransactionGroup>,
project_info: Arc<ProjectInfo>,
global_config: &GlobalConfig,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
) -> Option<ProfileId> {
let client_ip = state.managed_envelope.envelope().meta().client_addr();
let filter_settings = &project_info.config.filter_settings;
@@ -122,7 +124,7 @@ pub fn process(
match expand_profile(
item,
event,
&state.config,
&config,
client_ip,
filter_settings,
global_config,
(Diffs for the remaining 4 changed files are not shown here.)
