Skip to content

Commit

Permalink
ref(processor): Remove metrics and spans extracted booleans from state (
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Dec 16, 2024
1 parent 3f7fc88 commit 960e8d9
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 70 deletions.
105 changes: 63 additions & 42 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,14 +742,6 @@ struct ProcessEnvelopeState<Group> {
/// extracted.
event: Annotated<Event>,

/// Track whether transaction metrics were already extracted.
event_metrics_extracted: bool,

/// Track whether spans and span metrics were already extracted.
///
/// Only applies to envelopes with a transaction item.
spans_extracted: bool,

/// Partial metrics of the Event during construction.
///
/// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are
Expand Down Expand Up @@ -1134,7 +1126,7 @@ struct InnerProcessor {

/// New type representing the normalization state of the event.
#[derive(Copy, Clone)]
struct EventFullyNormalized(pub bool);
struct EventFullyNormalized(bool);

impl EventFullyNormalized {
/// Returns `true` if the event is fully normalized, `false` otherwise.
Expand All @@ -1148,6 +1140,14 @@ impl EventFullyNormalized {
}
}

/// New type representing whether metrics were extracted from transactions/spans.
#[derive(Copy, Clone)]
struct EventMetricsExtracted(bool);

/// New type representing whether spans were extracted.
#[derive(Copy, Clone)]
struct SpansExtracted(bool);

impl EnvelopeProcessorService {
/// Creates a multi-threaded envelope processor.
#[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
Expand Down Expand Up @@ -1283,12 +1283,14 @@ impl EnvelopeProcessorService {
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
sampling_decision: SamplingDecision,
) -> Result<(), ProcessingError> {
if state.event_metrics_extracted {
return Ok(());
event_metrics_extracted: EventMetricsExtracted,
spans_extracted: SpansExtracted,
) -> Result<EventMetricsExtracted, ProcessingError> {
if event_metrics_extracted.0 {
return Ok(event_metrics_extracted);
}
let Some(event) = state.event.value_mut() else {
return Ok(());
return Ok(event_metrics_extracted);
};

// NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
Expand All @@ -1298,7 +1300,7 @@ impl EnvelopeProcessorService {
let combined_config = {
let config = match &project_info.config.metric_extraction {
ErrorBoundary::Ok(ref config) if config.is_supported() => config,
_ => return Ok(()),
_ => return Ok(event_metrics_extracted),
};
let global_config = match &global.metric_extraction {
ErrorBoundary::Ok(global_config) => global_config,
Expand All @@ -1313,7 +1315,7 @@ impl EnvelopeProcessorService {
// If there's an error with global metrics extraction, it is safe to assume that this
// Relay instance is not up-to-date, and we should skip extraction.
relay_log::debug!("Failed to parse global extraction config: {e}");
return Ok(());
return Ok(event_metrics_extracted);
})
}
};
Expand All @@ -1325,11 +1327,11 @@ impl EnvelopeProcessorService {
Some(ErrorBoundary::Ok(tx_config)) => tx_config,
Some(ErrorBoundary::Err(e)) => {
relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
return Ok(());
return Ok(event_metrics_extracted);
}
None => {
relay_log::debug!("Legacy transaction metrics config is missing");
return Ok(());
return Ok(event_metrics_extracted);
}
};

Expand All @@ -1344,11 +1346,11 @@ impl EnvelopeProcessorService {
}
});

return Ok(());
return Ok(event_metrics_extracted);
}

// If spans were already extracted for an event, we rely on span processing to extract metrics.
let extract_spans = !state.spans_extracted
let extract_spans = !spans_extracted.0
&& project_info.config.features.produces_spans()
&& utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0));

Expand Down Expand Up @@ -1389,9 +1391,7 @@ impl EnvelopeProcessorService {
.extend(extractor.extract(event)?, Some(sampling_decision));
}

state.event_metrics_extracted = true;

Ok(())
Ok(EventMetricsExtracted(true))
}

fn normalize_event<G: EventProcessing>(
Expand All @@ -1400,20 +1400,20 @@ impl EnvelopeProcessorService {
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
mut event_fully_normalized: EventFullyNormalized,
) -> Result<Option<EventFullyNormalized>, ProcessingError> {
) -> Result<EventFullyNormalized, ProcessingError> {
if !state.has_event() {
// NOTE(iker): only processing relays create events from
// attachments, so these events won't be normalized in
// non-processing relays even if the config is set to run full
// normalization.
return Ok(None);
return Ok(event_fully_normalized);
}

let full_normalization = match self.inner.config.normalization_level() {
NormalizationLevel::Full => true,
NormalizationLevel::Default => {
if self.inner.config.processing_enabled() && event_fully_normalized.0 {
return Ok(None);
return Ok(event_fully_normalized);
}

self.inner.config.processing_enabled()
Expand Down Expand Up @@ -1531,7 +1531,7 @@ impl EnvelopeProcessorService {

event_fully_normalized.0 |= full_normalization;

Ok(Some(event_fully_normalized))
Ok(event_fully_normalized)
}

/// Processes the general errors, and the items which require or create the events.
Expand All @@ -1551,6 +1551,9 @@ impl EnvelopeProcessorService {
unreal::expand(state, &self.inner.config)?;
});

// When extracting the event when processing an error, we expect that the result of this
// function is unused since we don't have error metrics and errors are not correlated to
// extracted spans.
event::extract(state, event_fully_normalized, &self.inner.config)?;

if_processing!(self.inner.config, {
Expand All @@ -1563,14 +1566,12 @@ impl EnvelopeProcessorService {
});

event::finalize(state, &self.inner.config)?;
if let Some(inner_event_fully_normalized) = self.normalize_event(
event_fully_normalized = self.normalize_event(
state,
project_id,
project_info.clone(),
event_fully_normalized,
)? {
event_fully_normalized = inner_event_fully_normalized;
};
)?;
let filter_run = event::filter(
state,
project_info.clone(),
Expand All @@ -1591,7 +1592,12 @@ impl EnvelopeProcessorService {

if state.has_event() {
event::scrub(state, project_info.clone())?;
event::serialize(state, event_fully_normalized)?;
event::serialize(
state,
event_fully_normalized,
EventMetricsExtracted(false),
SpansExtracted(false),
)?;
event::emit_feedback_metrics(state.envelope());
}

Expand All @@ -1609,6 +1615,7 @@ impl EnvelopeProcessorService {
}

/// Processes only transactions and transaction-related items.
#[allow(unused_assignments)]
fn process_transactions(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
Expand All @@ -1618,23 +1625,28 @@ impl EnvelopeProcessorService {
reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());
let mut event_metrics_extracted = EventMetricsExtracted(false);
let mut spans_extracted = SpansExtracted(false);

let global_config = self.inner.global_config.current();

event::extract(state, event_fully_normalized, &self.inner.config)?;
if let Some((inner_event_metrics_extracted, inner_spans_extracted)) =
event::extract(state, event_fully_normalized, &self.inner.config)?
{
event_metrics_extracted = inner_event_metrics_extracted;
spans_extracted = inner_spans_extracted;
};

let profile_id = profile::filter(state, project_id, project_info.clone());
profile::transfer_id(state, profile_id);

event::finalize(state, &self.inner.config)?;
if let Some(inner_event_fully_normalized) = self.normalize_event(
event_fully_normalized = self.normalize_event(
state,
project_id,
project_info.clone(),
event_fully_normalized,
)? {
event_fully_normalized = inner_event_fully_normalized;
}
)?;

sampling_project_info = dynamic_sampling::validate_and_set_dsc(
state,
Expand Down Expand Up @@ -1679,11 +1691,13 @@ impl EnvelopeProcessorService {
// Before metric extraction to make sure the profile count is reflected correctly.
profile::process(state, project_info.clone(), &global_config);
// Extract metrics here, we're about to drop the event/transaction.
self.extract_transaction_metrics(
event_metrics_extracted = self.extract_transaction_metrics(
state,
project_id,
project_info.clone(),
SamplingDecision::Drop,
event_metrics_extracted,
spans_extracted,
)?;

dynamic_sampling::drop_unsampled_items(state, outcome);
Expand Down Expand Up @@ -1711,19 +1725,23 @@ impl EnvelopeProcessorService {
profile::transfer_id(state, profile_id);

// Always extract metrics in processing Relays for sampled items.
self.extract_transaction_metrics(
event_metrics_extracted = self.extract_transaction_metrics(
state,
project_id,
project_info.clone(),
SamplingDecision::Keep,
event_metrics_extracted,
spans_extracted,
)?;

if project_info.has_feature(Feature::ExtractSpansFromEvent) {
span::extract_from_event(
spans_extracted = span::extract_from_event(
state,
project_info.clone(),
&global_config,
server_sample_rate,
event_metrics_extracted,
spans_extracted,
);
}

Expand All @@ -1734,7 +1752,12 @@ impl EnvelopeProcessorService {

// Event may have been dropped because of a quota and the envelope can be empty.
if state.has_event() {
event::serialize(state, event_fully_normalized)?;
event::serialize(
state,
event_fully_normalized,
event_metrics_extracted,
spans_extracted,
)?;
}

if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
Expand Down Expand Up @@ -1925,8 +1948,6 @@ impl EnvelopeProcessorService {
let managed_envelope = managed_envelope.try_into()?;
let mut state = ProcessEnvelopeState {
event: Annotated::empty(),
event_metrics_extracted: false,
spans_extracted: false,
metrics: Metrics::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: self.inner.config.clone(),
Expand Down
4 changes: 0 additions & 4 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,6 @@ mod tests {
)
.try_into()
.unwrap(),
event_metrics_extracted: false,
spans_extracted: false,
};

(state, project_info)
Expand Down Expand Up @@ -719,8 +717,6 @@ mod tests {
let envelope = Envelope::parse_bytes(bytes).unwrap();
let mut state = ProcessEnvelopeState::<G> {
event: Annotated::new(Event::default()),
event_metrics_extracted: false,
spans_extracted: false,
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: Arc::new(Config::default()),
Expand Down
22 changes: 14 additions & 8 deletions relay-server/src/services/processor/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::extractors::RequestMeta;
use crate::services::outcome::Outcome;
use crate::services::processor::{
EventFullyNormalized, EventProcessing, ExtractedEvent, ProcessEnvelopeState, ProcessingError,
MINIMUM_CLOCK_DRIFT,
EventFullyNormalized, EventMetricsExtracted, EventProcessing, ExtractedEvent,
ProcessEnvelopeState, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT,
};
use crate::services::projects::project::ProjectInfo;
use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers};
Expand All @@ -43,7 +43,7 @@ pub fn extract<G: EventProcessing>(
state: &mut ProcessEnvelopeState<G>,
event_fully_normalized: EventFullyNormalized,
config: &Config,
) -> Result<(), ProcessingError> {
) -> Result<Option<(EventMetricsExtracted, SpansExtracted)>, ProcessingError> {
let envelope = &mut state.envelope_mut();

// Remove all items first, and then process them. After this function returns, only
Expand Down Expand Up @@ -72,6 +72,8 @@ pub fn extract<G: EventProcessing>(

let skip_normalization = config.processing_enabled() && event_fully_normalized.0;

let mut result = None;

let (event, event_len) = if let Some(item) = event_item.or(security_item) {
relay_log::trace!("processing json event");
metric!(timer(RelayTimers::EventProcessingDeserialize), {
Expand All @@ -87,8 +89,10 @@ pub fn extract<G: EventProcessing>(
})
} else if let Some(item) = transaction_item {
relay_log::trace!("processing json transaction");
state.event_metrics_extracted = item.metrics_extracted();
state.spans_extracted = item.spans_extracted();
result = Some((
EventMetricsExtracted(item.metrics_extracted()),
SpansExtracted(item.spans_extracted()),
));
metric!(timer(RelayTimers::EventProcessingDeserialize), {
// Transaction items can only contain transaction events. Force the event type to
// hint to normalization that we're dealing with a transaction now.
Expand Down Expand Up @@ -133,7 +137,7 @@ pub fn extract<G: EventProcessing>(
state.event = event;
state.metrics.bytes_ingested_event = Annotated::new(event_len as u64);

Ok(())
Ok(result)
}

pub fn finalize<G: EventProcessing>(
Expand Down Expand Up @@ -352,6 +356,8 @@ pub fn scrub<G: EventProcessing>(
pub fn serialize<G: EventProcessing>(
state: &mut ProcessEnvelopeState<G>,
event_fully_normalized: EventFullyNormalized,
event_metrics_extracted: EventMetricsExtracted,
spans_extracted: SpansExtracted,
) -> Result<(), ProcessingError> {
if state.event.is_empty() {
relay_log::error!("Cannot serialize empty event");
Expand All @@ -371,8 +377,8 @@ pub fn serialize<G: EventProcessing>(

// TODO: The state should simply maintain & update an `ItemHeaders` object.
// If transaction metrics were extracted, set the corresponding item header
event_item.set_metrics_extracted(state.event_metrics_extracted);
event_item.set_spans_extracted(state.spans_extracted);
event_item.set_metrics_extracted(event_metrics_extracted.0);
event_item.set_spans_extracted(spans_extracted.0);
event_item.set_fully_normalized(event_fully_normalized.0);

state.envelope_mut().add_item(event_item);
Expand Down
Loading

0 comments on commit 960e8d9

Please sign in to comment.