ref(metrics): Rework metrics aggregator to keep internal partitions
Dav1dde committed Dec 16, 2024
1 parent 0d5fe3f commit ecf05c9
Showing 27 changed files with 2,171 additions and 1,739 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@

**Internal**:

+- Rework metrics aggregator to keep internal partitions. ([#4378](https://github.com/getsentry/relay/pull/4378))
- Remove support for metrics with profile namespace. ([#4391](https://github.com/getsentry/relay/pull/4391))

## 24.11.2
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

41 changes: 34 additions & 7 deletions relay-config/src/aggregator.rs
@@ -1,7 +1,7 @@
//! Metrics aggregator configuration.
use relay_metrics::aggregator::AggregatorConfig;
-use relay_metrics::MetricNamespace;
+use relay_metrics::{MetricNamespace, UnixTimestamp};
use serde::{Deserialize, Serialize};

/// Parameters used for metric aggregation.
@@ -12,27 +12,54 @@ pub struct AggregatorServiceConfig {
    #[serde(flatten)]
    pub aggregator: AggregatorConfig,

+    /// The length the name of a metric is allowed to be.
+    ///
+    /// Defaults to `200` bytes.
+    pub max_name_length: usize,
+
+    /// The length the tag key is allowed to be.
+    ///
+    /// Defaults to `200` bytes.
+    pub max_tag_key_length: usize,
+
+    /// The length the tag value is allowed to be.
+    ///
+    /// Defaults to `200` chars.
+    pub max_tag_value_length: usize,
+
    /// The approximate maximum number of bytes submitted within one flush cycle.
    ///
    /// This controls how big flushed batches of buckets get, depending on the number of buckets,
    /// the cumulative length of their keys, and the number of raw values. Since final serialization
-    /// adds some additional overhead, this number is approxmate and some safety margin should be
+    /// adds some additional overhead, this number is approximate and some safety margin should be
    /// left to hard limits.
    pub max_flush_bytes: usize,
-
-    /// The flushing interval in milliseconds that determines how often the aggregator is polled for
-    /// flushing new buckets.
-    ///
-    /// Defaults to `100` milliseconds.
-    pub flush_interval_ms: u64,
}

+impl AggregatorServiceConfig {
+    /// Returns the valid range for metrics timestamps.
+    ///
+    /// Metrics or buckets outside of this range should be discarded.
+    pub fn timestamp_range(&self) -> std::ops::Range<UnixTimestamp> {
+        let now = UnixTimestamp::now().as_secs();
+        let min_timestamp = UnixTimestamp::from_secs(
+            now.saturating_sub(u64::from(self.aggregator.max_secs_in_past)),
+        );
+        let max_timestamp = UnixTimestamp::from_secs(
+            now.saturating_add(u64::from(self.aggregator.max_secs_in_future)),
+        );
+        min_timestamp..max_timestamp
+    }
+}
+
impl Default for AggregatorServiceConfig {
    fn default() -> Self {
        Self {
            aggregator: AggregatorConfig::default(),
+            max_name_length: 200,
+            max_tag_key_length: 200,
+            max_tag_value_length: 200,
            max_flush_bytes: 5_000_000, // 5 MB
-            flush_interval_ms: 100, // 100 milliseconds
        }
    }
}
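The new `timestamp_range` method derives the window of acceptable metric timestamps from the aggregator's `max_secs_in_past` and `max_secs_in_future` limits. Below is a minimal sketch of the same logic using only the standard library; the constants are illustrative placeholders, not Relay's defaults, and plain `u64` seconds stand in for `UnixTimestamp`:

```rust
use std::ops::Range;
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative limits; in Relay these come from `AggregatorConfig`'s
// `max_secs_in_past` and `max_secs_in_future` fields.
const MAX_SECS_IN_PAST: u64 = 5 * 24 * 60 * 60;
const MAX_SECS_IN_FUTURE: u64 = 60;

/// Mirrors the shape of `AggregatorServiceConfig::timestamp_range`.
fn timestamp_range() -> Range<u64> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the unix epoch")
        .as_secs();
    now.saturating_sub(MAX_SECS_IN_PAST)..now.saturating_add(MAX_SECS_IN_FUTURE)
}

fn main() {
    let range = timestamp_range();
    // A timestamp just inside the window is kept; anything outside the
    // range would be discarded by the caller.
    assert!(range.contains(&(range.start + 1)));
    println!("accepting timestamps in {range:?}");
}
```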
58 changes: 0 additions & 58 deletions relay-config/src/config.rs
@@ -15,7 +15,6 @@ use relay_kafka::{
    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment,
    TopicAssignments,
};
-use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
use relay_metrics::MetricNamespace;
use serde::de::{DeserializeOwned, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -2461,63 +2460,6 @@ impl Config {
        &self.values.cogs.relay_resource_id
    }

-    /// Creates an [`AggregatorConfig`] that is compatible with every other aggregator.
-    ///
-    /// A lossless aggregator can be put in front of any of the configured aggregators without losing data that the configured aggregator would keep.
-    /// This is useful for pre-aggregating metrics together in a single aggregator instance.
-    pub fn permissive_aggregator_config(&self) -> AggregatorConfig {
-        let AggregatorConfig {
-            mut bucket_interval,
-            mut max_secs_in_past,
-            mut max_secs_in_future,
-            mut max_name_length,
-            mut max_tag_key_length,
-            mut max_tag_value_length,
-            mut max_project_key_bucket_bytes,
-            mut max_total_bucket_bytes,
-            ..
-        } = self.default_aggregator_config().aggregator;
-
-        for secondary_config in self.secondary_aggregator_configs() {
-            let agg = &secondary_config.config.aggregator;
-
-            bucket_interval = bucket_interval.min(agg.bucket_interval);
-            max_secs_in_past = max_secs_in_past.max(agg.max_secs_in_past);
-            max_secs_in_future = max_secs_in_future.max(agg.max_secs_in_future);
-            max_name_length = max_name_length.max(agg.max_name_length);
-            max_tag_key_length = max_tag_key_length.max(agg.max_tag_key_length);
-            max_tag_value_length = max_tag_value_length.max(agg.max_tag_value_length);
-            max_project_key_bucket_bytes =
-                max_project_key_bucket_bytes.max(agg.max_project_key_bucket_bytes);
-            max_total_bucket_bytes = max_total_bucket_bytes.max(agg.max_total_bucket_bytes);
-        }
-
-        for agg in self
-            .secondary_aggregator_configs()
-            .iter()
-            .map(|sc| &sc.config)
-            .chain(std::iter::once(self.default_aggregator_config()))
-        {
-            if agg.aggregator.bucket_interval % bucket_interval != 0 {
-                relay_log::error!("buckets don't align");
-            }
-        }
-
-        AggregatorConfig {
-            bucket_interval,
-            max_secs_in_past,
-            max_secs_in_future,
-            max_name_length,
-            max_tag_key_length,
-            max_tag_value_length,
-            max_project_key_bucket_bytes,
-            max_total_bucket_bytes,
-            initial_delay: 30,
-            flush_partitions: None,
-            flush_batching: FlushBatching::Project,
-        }
-    }
-
    /// Returns configuration for the default metrics aggregator.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
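The removed `permissive_aggregator_config` built a config that accepts anything any configured aggregator would keep: the smallest bucket interval and the largest of each limit. A sketch of that merging rule with a hypothetical trimmed-down limits struct (not Relay's real types):

```rust
/// Hypothetical, trimmed-down aggregator limits; Relay's real
/// `AggregatorConfig` carries more fields than these three.
#[derive(Clone, Copy, Debug)]
struct Limits {
    bucket_interval: u64,
    max_secs_in_past: u64,
    max_name_length: usize,
}

/// Merges limits so the result keeps everything either aggregator keeps:
/// the smallest interval and the largest of every other bound.
fn permissive(a: Limits, b: Limits) -> Limits {
    Limits {
        bucket_interval: a.bucket_interval.min(b.bucket_interval),
        max_secs_in_past: a.max_secs_in_past.max(b.max_secs_in_past),
        max_name_length: a.max_name_length.max(b.max_name_length),
    }
}

fn main() {
    let default = Limits { bucket_interval: 10, max_secs_in_past: 3600, max_name_length: 200 };
    let secondary = Limits { bucket_interval: 60, max_secs_in_past: 7200, max_name_length: 150 };
    let merged = permissive(default, secondary);
    // The removed code also warned when an aggregator's interval was not a
    // multiple of the merged one; here 60 % 10 == 0, so the buckets align.
    assert_eq!(secondary.bucket_interval % merged.bucket_interval, 0);
    println!("{merged:?}");
}
```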
1 change: 1 addition & 0 deletions relay-metrics/Cargo.toml
@@ -16,6 +16,7 @@ workspace = true
redis = ["relay-redis/impl"]

[dependencies]
+ahash = { workspace = true }
bytecount = { workspace = true }
chrono = { workspace = true }
fnv = { workspace = true }
11 changes: 5 additions & 6 deletions relay-metrics/benches/benches.rs
@@ -6,13 +6,14 @@ use rand::SeedableRng;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use relay_metrics::{
-    aggregator::{Aggregator, AggregatorConfig, FlushDecision},
+    aggregator::{Aggregator, AggregatorConfig},
    Bucket, BucketValue, DistributionValue, FiniteF64,
};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt;
use std::ops::Range;
+use std::time::SystemTime;

struct NumbersGenerator {
min: usize,
@@ -188,7 +189,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
            b.iter_batched(
                || {
                    let timestamp = UnixTimestamp::now();
-                    let aggregator: Aggregator = Aggregator::new(config.clone());
+                    let aggregator = Aggregator::named("default".to_owned(), &config);
                    (aggregator, input.get_buckets(timestamp))
                },
                |(mut aggregator, buckets)| {
@@ -215,7 +216,7 @@
            b.iter_batched(
                || {
                    let timestamp = UnixTimestamp::now();
-                    let mut aggregator: Aggregator = Aggregator::new(config.clone());
+                    let mut aggregator = Aggregator::named("default".to_owned(), &config);
                    for (project_key, bucket) in input.get_buckets(timestamp) {
                        aggregator.merge(project_key, bucket).unwrap();
                    }
@@ -224,9 +225,7 @@
                |mut aggregator| {
                    // XXX: Ideally we'd want to test the entire try_flush here, but spawning
                    // a service is too much work here.
-                    black_box(aggregator.pop_flush_buckets(black_box(false), |_| {
-                        FlushDecision::Flush(Vec::new())
-                    }));
+                    let _ = black_box(aggregator.try_flush_next(SystemTime::UNIX_EPOCH));
                },
                BatchSize::SmallInput,
            )
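The benchmark now exercises the reworked aggregator API: construction via `Aggregator::named` and partition-by-partition flushing via `try_flush_next`. A sketch of the call pattern, with signatures inferred from this diff rather than verified against the crate:

```rust
use std::time::SystemTime;

use relay_base_schema::project::ProjectKey;
use relay_metrics::aggregator::{Aggregator, AggregatorConfig};
use relay_metrics::Bucket;

// A sketch of the new call pattern as it appears in the benchmark changes
// above: aggregators are constructed by name from a borrowed config, and
// flushing steps through internal partitions instead of taking a callback.
fn merge_and_flush(config: &AggregatorConfig, buckets: Vec<(ProjectKey, Bucket)>) {
    let mut aggregator = Aggregator::named("default".to_owned(), config);
    for (project_key, bucket) in buckets {
        aggregator.merge(project_key, bucket).unwrap();
    }
    // The benchmark passes `SystemTime::UNIX_EPOCH` as a deterministic
    // reference point for the flush decision.
    let _ = aggregator.try_flush_next(SystemTime::UNIX_EPOCH);
}
```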
