diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index e6a889921b..e75d5f96a4 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -668,7 +668,7 @@ impl ProcessingExtractedMetrics { if limit.is_active() { drop_namespaces.push(namespace); } else if indexed.is_active() && !enforced_consistently { - // If the enforcment was not computed by consistently checking the limits, + // If the enforcement was not computed by consistently checking the limits, // the quota for the metrics has not yet been incremented. // In this case we have a dropped indexed payload but a metric which still needs to // be accounted for, make sure the metric will still be rate limited. @@ -732,22 +732,6 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, aggregator: &Add } } -/// A state container for envelope processing. -#[derive(Debug)] -struct ProcessEnvelopeState { - /// Partial metrics of the Event during construction. - /// - /// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are - /// persisted into the Event. All modifications afterwards will have no effect. - metrics: Metrics, - - /// Metrics extracted from items in the envelope. - /// - /// Relay can extract metrics for sessions and transactions, which is controlled by - /// configuration objects in the project config. - extracted_metrics: ProcessingExtractedMetrics, -} - /// Returns the data category if there is an event. /// /// The data category is computed from the event type. Both `Default` and `Error` events map to @@ -801,11 +785,27 @@ struct EventMetricsExtracted(bool); #[derive(Debug, Copy, Clone)] struct SpansExtracted(bool); -/// The view out of the [`ProcessEnvelopeState`] after processing. +/// The result of the envelope processing containing the processed envelope along with the partial +/// result. #[derive(Debug)] -struct ProcessingStateResult { +struct ProcessingResult { managed_envelope: TypedEnvelope, - extracted_metrics: ExtractedMetrics, + extracted_metrics: ProcessingExtractedMetrics, +} + +impl ProcessingResult { + /// Creates a [`ProcessingResult`] with no metrics. + fn no_metrics(managed_envelope: TypedEnvelope) -> Self { + Self { + managed_envelope, + extracted_metrics: ProcessingExtractedMetrics::new(), + } + } + + /// Returns the components of the [`ProcessingResult`]. + fn into_inner(self) -> (TypedEnvelope, ExtractedMetrics) { + (self.managed_envelope, self.extracted_metrics.metrics) + } } /// Response of the [`ProcessEnvelope`] message. @@ -1205,9 +1205,9 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] fn enforce_quotas( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, event: Annotated, + extracted_metrics: &mut ProcessingExtractedMetrics, project_info: Arc, rate_limits: Arc, ) -> Result, ProcessingError> { @@ -1220,9 +1220,9 @@ impl EnvelopeProcessorService { // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not // applied in the fast path, all cached quotas can be applied here. let cached_result = RateLimiter::Cached.enforce( - state, managed_envelope, event, + extracted_metrics, &global_config, project_info.clone(), rate_limits.clone(), @@ -1230,9 +1230,9 @@ impl EnvelopeProcessorService { // Enforce all quotas consistently with Redis. let consistent_result = RateLimiter::Consistent(rate_limiter).enforce( - state, managed_envelope, cached_result.event, + extracted_metrics, &global_config, project_info, rate_limits, @@ -1254,9 +1254,9 @@ impl EnvelopeProcessorService { #[allow(clippy::too_many_arguments)] fn extract_transaction_metrics( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, event: &mut Annotated, + extracted_metrics: &mut ProcessingExtractedMetrics, project_id: ProjectId, project_info: Arc, sampling_decision: SamplingDecision, @@ -1343,9 +1343,7 @@ impl EnvelopeProcessorService { extract_spans, ); - state - .extracted_metrics - .extend(metrics, Some(sampling_decision)); + extracted_metrics.extend(metrics, Some(sampling_decision)); if !project_info.has_feature(Feature::DiscardTransaction) { let transaction_from_dsc = managed_envelope @@ -1361,9 +1359,7 @@ impl EnvelopeProcessorService { target_project_id: project_id, }; - state - .extracted_metrics - .extend(extractor.extract(event)?, Some(sampling_decision)); + extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision)); } Ok(EventMetricsExtracted(true)) @@ -1507,14 +1503,16 @@ impl EnvelopeProcessorService { /// Processes the general errors, and the items which require or create the events. fn process_errors( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, project_id: ProjectId, project_info: Arc, sampling_project_info: Option>, #[allow(unused_variables)] rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope()); + let mut metrics = Metrics::default(); + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); // Events can also contain user reports. report::process_user_reports(managed_envelope); @@ -1524,8 +1522,8 @@ impl EnvelopeProcessorService { }); let extraction_result = event::extract( - state, managed_envelope, + &mut metrics, event_fully_normalized, &self.inner.config, )?; @@ -1538,13 +1536,18 @@ impl EnvelopeProcessorService { event_fully_normalized = inner_event_fully_normalized; } if let Some(inner_event_fully_normalized) = - attachment::create_placeholders(state, managed_envelope, &mut event) + attachment::create_placeholders(managed_envelope, &mut event, &mut metrics) { event_fully_normalized = inner_event_fully_normalized; } }); - event::finalize(state, managed_envelope, &mut event, &self.inner.config)?; + event::finalize( + managed_envelope, + &mut event, + &mut metrics, + &self.inner.config, + )?; event_fully_normalized = self.normalize_event( managed_envelope, &mut event, @@ -1570,9 +1573,9 @@ impl EnvelopeProcessorService { if_processing!(self.inner.config, { event = self.enforce_quotas( - state, managed_envelope, event, + &mut extracted_metrics, project_info.clone(), rate_limits, )?; @@ -1600,7 +1603,7 @@ impl EnvelopeProcessorService { ); } - Ok(()) + Ok(Some(extracted_metrics)) } /// Processes only transactions and transaction-related items. @@ -1608,7 +1611,6 @@ impl EnvelopeProcessorService { #[allow(clippy::too_many_arguments)] fn process_transactions( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, config: Arc, project_id: ProjectId, @@ -1616,17 +1618,19 @@ impl EnvelopeProcessorService { mut sampling_project_info: Option>, #[allow(unused_variables)] rate_limits: Arc, reservoir_counters: ReservoirCounters, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope()); let mut event_metrics_extracted = EventMetricsExtracted(false); let mut spans_extracted = SpansExtracted(false); + let mut metrics = Metrics::default(); + let mut extracted_metrics = ProcessingExtractedMetrics::new(); let global_config = self.inner.global_config.current(); // We extract the main event from the envelope. let extraction_result = event::extract( - state, managed_envelope, + &mut metrics, event_fully_normalized, &self.inner.config, )?; @@ -1651,7 +1655,12 @@ impl EnvelopeProcessorService { ); profile::transfer_id(&mut event, profile_id); - event::finalize(state, managed_envelope, &mut event, &self.inner.config)?; + event::finalize( + managed_envelope, + &mut event, + &mut metrics, + &self.inner.config, + )?; event_fully_normalized = self.normalize_event( managed_envelope, &mut event, @@ -1714,9 +1723,9 @@ impl EnvelopeProcessorService { ); // Extract metrics here, we're about to drop the event/transaction. event_metrics_extracted = self.extract_transaction_metrics( - state, managed_envelope, &mut event, + &mut extracted_metrics, project_id, project_info.clone(), SamplingDecision::Drop, @@ -1732,15 +1741,15 @@ impl EnvelopeProcessorService { // We need to make sure there are enough quotas for these profiles. if_processing!(self.inner.config, { event = self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info.clone(), rate_limits, )?; }); - return Ok(()); + return Ok(Some(extracted_metrics)); } // Need to scrub the transaction before extracting spans. @@ -1762,9 +1771,9 @@ impl EnvelopeProcessorService { // Always extract metrics in processing Relays for sampled items. event_metrics_extracted = self.extract_transaction_metrics( - state, managed_envelope, &mut event, + &mut extracted_metrics, project_id, project_info.clone(), SamplingDecision::Keep, @@ -1786,9 +1795,9 @@ impl EnvelopeProcessorService { } event = self.enforce_quotas( - state, managed_envelope, event, + &mut extracted_metrics, project_info.clone(), rate_limits, )?; @@ -1815,15 +1824,14 @@ impl EnvelopeProcessorService { ); }; - Ok(()) + Ok(Some(extracted_metrics)) } fn process_profile_chunks( &self, - _state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, project_info: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { profile_chunk::filter(managed_envelope, project_info.clone()); if_processing!(self.inner.config, { profile_chunk::process( @@ -1833,19 +1841,22 @@ impl EnvelopeProcessorService { &self.inner.config, ); }); - Ok(()) + + Ok(None) } /// Processes standalone items that require an event ID, but do not have an event on the same envelope. fn process_standalone( &self, - #[allow(unused_variables)] state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, config: Arc, project_id: ProjectId, project_info: Arc, #[allow(unused_variables)] rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + profile::filter( managed_envelope, &Annotated::empty(), @@ -1856,9 +1867,9 @@ impl EnvelopeProcessorService { if_processing!(self.inner.config, { self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info.clone(), rate_limits, )?; @@ -1866,49 +1877,54 @@ impl EnvelopeProcessorService { report::process_user_reports(managed_envelope); attachment::scrub(managed_envelope, project_info); - Ok(()) + + Ok(Some(extracted_metrics)) } /// Processes user sessions. fn process_sessions( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, project_info: Arc, #[allow(unused_variables)] rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + session::process( - state, managed_envelope, + &mut extracted_metrics, project_info.clone(), &self.inner.config, ); if_processing!(self.inner.config, { self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info, rate_limits, )?; }); - Ok(()) + + Ok(Some(extracted_metrics)) } /// Processes user and client reports. fn process_client_reports( &self, - #[allow(unused_variables)] state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, config: Arc, project_info: Arc, #[allow(unused_variables)] rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + if_processing!(self.inner.config, { self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info.clone(), rate_limits, )?; @@ -1921,18 +1937,20 @@ impl EnvelopeProcessorService { self.inner.addrs.outcome_aggregator.clone(), ); - Ok(()) + Ok(Some(extracted_metrics)) } /// Processes replays. fn process_replays( &self, - #[allow(unused_variables)] state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, config: Arc, project_info: Arc, #[allow(unused_variables)] rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + replay::process( managed_envelope, &self.inner.global_config.current(), @@ -1940,38 +1958,43 @@ impl EnvelopeProcessorService { project_info.clone(), self.inner.geoip_lookup.as_ref(), )?; + if_processing!(self.inner.config, { self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info, rate_limits, )?; }); - Ok(()) + + Ok(Some(extracted_metrics)) } /// Processes cron check-ins. fn process_checkins( &self, - #[allow(unused_variables)] state: &mut ProcessEnvelopeState, #[allow(unused_variables)] managed_envelope: &mut TypedEnvelope, #[allow(unused_variables)] project_id: ProjectId, #[allow(unused_variables)] project_info: Arc, #[allow(unused_variables)] rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + if_processing!(self.inner.config, { self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info, rate_limits, )?; self.normalize_checkins(managed_envelope, project_id); }); - Ok(()) + + Ok(Some(extracted_metrics)) } /// Processes standalone spans. @@ -1980,7 +2003,6 @@ impl EnvelopeProcessorService { #[allow(clippy::too_many_arguments)] fn process_standalone_spans( &self, - #[allow(unused_variables)] state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, config: Arc, #[allow(unused_variables)] project_id: ProjectId, @@ -1988,7 +2010,10 @@ impl EnvelopeProcessorService { #[allow(unused_variables)] sampling_project_info: Option>, #[allow(unused_variables)] rate_limits: Arc, #[allow(unused_variables)] reservoir_counters: ReservoirCounters, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + span::filter(managed_envelope, config.clone(), project_info.clone()); span::convert_otel_traces_data(managed_envelope); @@ -2000,9 +2025,9 @@ impl EnvelopeProcessorService { ); span::process( - state, managed_envelope, &mut Annotated::empty(), + &mut extracted_metrics, &global_config, config, project_id, @@ -2013,15 +2038,15 @@ impl EnvelopeProcessorService { ); self.enforce_quotas( - state, managed_envelope, Annotated::empty(), + &mut extracted_metrics, project_info, rate_limits, )?; }); - Ok(()) + Ok(Some(extracted_metrics)) } fn process_envelope( @@ -2032,7 +2057,7 @@ impl EnvelopeProcessorService { rate_limits: Arc, sampling_project_info: Option>, reservoir_counters: ReservoirCounters, - ) -> Result { + ) -> Result { // Get the group from the managed envelope context, and if it's not set, try to guess it // from the contents of the envelope. let group = managed_envelope.group(); @@ -2064,22 +2089,17 @@ impl EnvelopeProcessorService { macro_rules! run { ($fn_name:ident $(, $args:expr)*) => {{ let mut managed_envelope = managed_envelope.try_into()?; - let mut state = ProcessEnvelopeState { - metrics: Metrics::default(), - extracted_metrics: ProcessingExtractedMetrics::new(), - }; - - // The state is temporarily supplied, until it will be removed. - match self.$fn_name(&mut state, &mut managed_envelope, $($args),*) { - Ok(()) => Ok(ProcessingStateResult { + match self.$fn_name(&mut managed_envelope, $($args),*) { + Ok(extracted_metrics) => Ok(ProcessingResult { managed_envelope: managed_envelope.into_processed(), - extracted_metrics: state.extracted_metrics.metrics, + extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e) }), - Err(e) => { - if let Some(outcome) = e.to_outcome() { + Err(error) => { + if let Some(outcome) = error.to_outcome() { managed_envelope.reject(outcome); } - return Err(e); + + return Err(error); } } @@ -2154,10 +2174,9 @@ impl EnvelopeProcessorService { ); } - Ok(ProcessingStateResult { - managed_envelope: managed_envelope.into_processed(), - extracted_metrics: Default::default(), - }) + Ok(ProcessingResult::no_metrics( + managed_envelope.into_processed(), + )) } // Fallback to the legacy process_state implementation for Ungrouped events. ProcessingGroup::Ungrouped => { @@ -2166,18 +2185,17 @@ impl EnvelopeProcessorService { items = ?managed_envelope.envelope().items().next().map(Item::ty), "could not identify the processing group based on the envelope's items" ); - Ok(ProcessingStateResult { - managed_envelope: managed_envelope.into_processed(), - extracted_metrics: Default::default(), - }) + + Ok(ProcessingResult::no_metrics( + managed_envelope.into_processed(), + )) } // Leave this group unchanged. // // This will later be forwarded to upstream. - ProcessingGroup::ForwardUnknown => Ok(ProcessingStateResult { - managed_envelope: managed_envelope.into_processed(), - extracted_metrics: Default::default(), - }), + ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics( + managed_envelope.into_processed(), + )), } } @@ -2241,28 +2259,33 @@ impl EnvelopeProcessorService { sampling_project_info, reservoir_counters, ) { - Ok(mut state) => { + Ok(result) => { + let (mut managed_envelope, extracted_metrics) = result.into_inner(); + // The envelope could be modified or even emptied during processing, which - // requires recomputation of the context. - state.managed_envelope.update(); - - let has_metrics = !state.extracted_metrics.project_metrics.is_empty(); - send_metrics( - state.extracted_metrics, - state.managed_envelope.envelope(), - &self.inner.addrs.aggregator, - ); + // requires re-computation of the context. + managed_envelope.update(); + + let has_metrics = !extracted_metrics.project_metrics.is_empty(); + if has_metrics { + send_metrics( + extracted_metrics, + managed_envelope.envelope(), + &self.inner.addrs.aggregator, + ); + } - let envelope_response = if state.managed_envelope.envelope().is_empty() { + let envelope_response = if managed_envelope.envelope().is_empty() { if !has_metrics { // Individual rate limits have already been issued - state.managed_envelope.reject(Outcome::RateLimited(None)); + managed_envelope.reject(Outcome::RateLimited(None)); } else { - state.managed_envelope.accept(); + managed_envelope.accept(); } + None } else { - Some(state.managed_envelope) + Some(managed_envelope) }; Ok(ProcessEnvelopeResponse { @@ -3078,9 +3101,9 @@ enum RateLimiter<'a> { impl RateLimiter<'_> { fn enforce( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, event: Annotated, + extracted_metrics: &mut ProcessingExtractedMetrics, global_config: &GlobalConfig, project_info: Arc, rate_limits: Arc, @@ -3122,9 +3145,7 @@ impl RateLimiter<'_> { // Use the same rate limits as used for the envelope on the metrics. // Those rate limits should not be checked for expiry or similar to ensure a consistent // limiting of envelope items and metrics. - state - .extracted_metrics - .apply_enforcement(&enforcement, matches!(self, Self::Consistent(_))); + extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_))); enforcement.apply_with_outcomes(managed_envelope); if event_active { diff --git a/relay-server/src/services/processor/attachment.rs b/relay-server/src/services/processor/attachment.rs index 7741ad6b2e..8eedabd219 100644 --- a/relay-server/src/services/processor/attachment.rs +++ b/relay-server/src/services/processor/attachment.rs @@ -14,9 +14,9 @@ use crate::services::projects::project::ProjectInfo; use crate::utils::TypedEnvelope; #[cfg(feature = "processing")] use { - crate::services::processor::{ErrorGroup, EventFullyNormalized, ProcessEnvelopeState}, + crate::services::processor::{ErrorGroup, EventFullyNormalized}, crate::utils, - relay_event_schema::protocol::Event, + relay_event_schema::protocol::{Event, Metrics}, relay_protocol::Annotated, }; @@ -28,9 +28,9 @@ use { /// If the event payload was empty before, it is created. #[cfg(feature = "processing")] pub fn create_placeholders( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, event: &mut Annotated, + metrics: &mut Metrics, ) -> Option { let envelope = managed_envelope.envelope(); let minidump_attachment = @@ -40,12 +40,12 @@ pub fn create_placeholders( if let Some(item) = minidump_attachment { let event = event.get_or_insert_with(Event::default); - state.metrics.bytes_ingested_event_minidump = Annotated::new(item.len() as u64); + metrics.bytes_ingested_event_minidump = Annotated::new(item.len() as u64); utils::process_minidump(event, &item.payload()); return Some(EventFullyNormalized(false)); } else if let Some(item) = apple_crash_report_attachment { let event = event.get_or_insert_with(Event::default); - state.metrics.bytes_ingested_event_applecrashreport = Annotated::new(item.len() as u64); + metrics.bytes_ingested_event_applecrashreport = Annotated::new(item.len() as u64); utils::process_apple_crash_report(event, &item.payload()); return Some(EventFullyNormalized(false)); } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index caf4ddb706..671831c96c 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -11,8 +11,8 @@ use relay_dynamic_config::GlobalConfig; use relay_event_normalization::{nel, ClockDriftProcessor}; use relay_event_schema::processor::{self, ProcessingState}; use relay_event_schema::protocol::{ - Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, NetworkReportError, - OtelContext, RelayInfo, SecurityReportType, Values, + Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, + NetworkReportError, OtelContext, RelayInfo, SecurityReportType, Values, }; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Array, Empty, Object, Value}; @@ -25,7 +25,7 @@ use crate::extractors::RequestMeta; use crate::services::outcome::Outcome; use crate::services::processor::{ event_category, event_type, EventFullyNormalized, EventMetricsExtracted, EventProcessing, - ExtractedEvent, ProcessEnvelopeState, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, + ExtractedEvent, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, }; use crate::services::projects::project::ProjectInfo; use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers}; @@ -48,8 +48,8 @@ pub struct ExtractionResult { /// 4. A multipart form data body. /// 5. If none match, `Annotated::empty()`. pub fn extract( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + metrics: &mut Metrics, event_fully_normalized: EventFullyNormalized, config: &Config, ) -> Result { @@ -143,7 +143,7 @@ pub fn extract( (Annotated::empty(), 0) }; - state.metrics.bytes_ingested_event = Annotated::new(event_len as u64); + metrics.bytes_ingested_event = Annotated::new(event_len as u64); Ok(ExtractionResult { event, @@ -153,9 +153,9 @@ pub fn extract( } pub fn finalize( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, event: &mut Annotated, + metrics: &mut Metrics, config: &Config, ) -> Result<(), ProcessingError> { let envelope = managed_envelope.envelope_mut(); @@ -194,7 +194,7 @@ pub fn finalize( // In processing mode, also write metrics into the event. Most metrics have already been // collected at this state, except for the combined size of all attachments. if config.processing_enabled() { - let mut metrics = std::mem::take(&mut state.metrics); + let mut metrics = std::mem::take(metrics); let attachment_size = envelope .items() diff --git a/relay-server/src/services/processor/session.rs b/relay-server/src/services/processor/session.rs index 608ec641d5..11c00c27cd 100644 --- a/relay-server/src/services/processor/session.rs +++ b/relay-server/src/services/processor/session.rs @@ -15,7 +15,7 @@ use relay_metrics::Bucket; use relay_statsd::metric; use crate::envelope::{ContentType, Item, ItemType}; -use crate::services::processor::{ProcessEnvelopeState, SessionGroup, MINIMUM_CLOCK_DRIFT}; +use crate::services::processor::{ProcessingExtractedMetrics, SessionGroup, MINIMUM_CLOCK_DRIFT}; use crate::services::projects::project::ProjectInfo; use crate::statsd::RelayTimers; use crate::utils::{ItemAction, TypedEnvelope}; @@ -25,8 +25,8 @@ use crate::utils::{ItemAction, TypedEnvelope}; /// Both are removed from the envelope if they contain invalid JSON or if their timestamps /// are out of range after clock drift correction. pub fn process( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + extracted_metrics: &mut ProcessingExtractedMetrics, project_info: Arc, config: &Config, ) { @@ -39,7 +39,7 @@ pub fn process( let clock_drift_processor = ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT); - let mut extracted_metrics = Vec::new(); + let mut session_extracted_metrics = Vec::new(); managed_envelope.retain_items(|item| { let should_keep = match item.ty() { ItemType::Session => process_session( @@ -50,7 +50,7 @@ pub fn process( client_addr, metrics_config, &clock_drift_processor, - &mut extracted_metrics, + &mut session_extracted_metrics, ), ItemType::Sessions => process_session_aggregates( item, @@ -60,7 +60,7 @@ pub fn process( client_addr, metrics_config, &clock_drift_processor, - &mut extracted_metrics, + &mut session_extracted_metrics, ), _ => true, // Keep all other item types }; @@ -71,9 +71,7 @@ pub fn process( } }); - state - .extracted_metrics - .extend_project_metrics(extracted_metrics, None); + extracted_metrics.extend_project_metrics(session_extracted_metrics, None); } /// Returns Ok(true) if attributes were modified. diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index e511e9190f..250dc26b21 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -8,8 +8,8 @@ use crate::metrics_extraction::{event, generic}; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::span::extract_transaction_span; use crate::services::processor::{ - dynamic_sampling, event_type, EventMetricsExtracted, ProcessEnvelopeState, ProcessingError, - SpanGroup, SpansExtracted, TransactionGroup, + dynamic_sampling, event_type, EventMetricsExtracted, ProcessingError, + ProcessingExtractedMetrics, SpanGroup, SpansExtracted, TransactionGroup, }; use crate::services::projects::project::ProjectInfo; use crate::utils::{sample, ItemAction, ManagedEnvelope, TypedEnvelope}; @@ -47,9 +47,9 @@ struct ValidationError(#[from] anyhow::Error); #[allow(clippy::too_many_arguments)] pub fn process( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, event: &mut Annotated, + extracted_metrics: &mut ProcessingExtractedMetrics, global_config: &GlobalConfig, config: Arc, project_id: ProjectId, @@ -155,9 +155,7 @@ pub fn process( CombinedMetricExtractionConfig::new(global_metrics_config, config), ); - state - .extracted_metrics - .extend_project_metrics(metrics, Some(sampling_decision)); + extracted_metrics.extend_project_metrics(metrics, Some(sampling_decision)); if project_info.config.features.produces_spans() { let transaction = span @@ -172,9 +170,7 @@ pub fn process( sampling_decision, project_id, ); - state - .extracted_metrics - .extend_sampling_metrics(bucket, Some(sampling_decision)); + extracted_metrics.extend_sampling_metrics(bucket, Some(sampling_decision)); } item.set_metrics_extracted(true);