diff --git a/relay-metrics/src/aggregator/inner.rs b/relay-metrics/src/aggregator/inner.rs index de149d15f8..08281ffb0d 100644 --- a/relay-metrics/src/aggregator/inner.rs +++ b/relay-metrics/src/aggregator/inner.rs @@ -40,17 +40,17 @@ impl fmt::Debug for Partition { #[derive(Default, Debug)] pub struct PartitionStats { /// Amount of unique buckets in the partition. - #[expect(unused)] + #[expect(unused, reason = "used for snapshot tests")] pub count: u64, /// Amount of unique buckets in the partition by namespace. pub count_by_namespace: ByNamespace, /// Amount of times a bucket was merged in the partition. - #[expect(unused)] + #[expect(unused, reason = "used for snapshot tests")] pub merges: u64, /// Amount of times a bucket was merged in the partition by namespace. pub merges_by_namespace: ByNamespace, /// Cost of buckets in the partition. - #[expect(unused)] + #[expect(unused, reason = "used for snapshot tests")] pub cost: u64, /// Cost of buckets in the partition by namespace. pub cost_by_namespace: ByNamespace, @@ -72,12 +72,12 @@ impl From<&stats::Slot> for PartitionStats { #[derive(Default, Debug)] pub struct Stats { /// Total amount of buckets in the aggregator. - #[expect(unused)] + #[expect(unused, reason = "used for snapshot tests")] pub count: u64, /// Total amount of buckets in the aggregator by namespace. pub count_by_namespace: ByNamespace, /// Total bucket cost in the aggregator. - #[expect(unused)] + #[expect(unused, reason = "used for snapshot tests")] pub cost: u64, /// Total bucket cost in the aggregator by namespace. pub cost_by_namespace: ByNamespace, @@ -184,14 +184,14 @@ pub struct Inner { /// "time slot" and the second dimension is the assigned partition. /// /// The total length of the ring-buffer is then determined by the amount of time slots times - /// the amount of partition. In other words, every time slot as [`Self::num_partitions`] + /// the amount of partitions. In other words, every time slot has [`Self::num_partitions`] /// partitions. /// /// Layout: /// Time slots: [ ][ ][ ] /// Partitions: [ ][ ] [ ][ ] [ ][ ] /// - /// An item is assigned by first determining it's time slot and then assigning it a partition + /// An item is assigned by first determining its time slot and then assigning it to a partition /// based on selected [`Self::partition_by`] strategy. /// /// The first item in the buffer is tracked by [`Self::head`] which is at any time the @@ -207,7 +207,7 @@ pub struct Inner { /// Position of the first element in [`Self::slots`]. head: u64, - /// Size of each individual bucket, inputs are truncated to this value. + /// Size of each individual bucket, inputs are truncated modulo to this value. bucket_interval: u32, /// Amount of slots which is added to a bucket as a delay. /// @@ -221,9 +221,9 @@ pub struct Inner { /// The maximum amount of slots (size of a `bucket_interval`) the timestamp is allowed to be /// in the past or future. - slot_diff: SlotDiff, + slot_range: RelativeRange, - /// Determines how partitions are assigned based in the input bucket. + /// Determines how partitions are assigned based on the input bucket. partition_by: FlushBatching, /// Hasher used to calculate partitions. hasher: ahash::RandomState, @@ -251,11 +251,11 @@ impl Inner { } } - let slot_diff = SlotDiff { - min: config + let slot_diff = RelativeRange { + max_in_past: config .max_secs_in_past .map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))), - max: config + max_in_future: config .max_secs_in_future .map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))), }; @@ -276,7 +276,7 @@ impl Inner { .map(|c| c.div_ceil(total_slots as u64)) .unwrap_or(u64::MAX), }, - slot_diff, + slot_range: slot_diff, partition_by: FlushBatching::Partition, hasher: build_hasher(), } @@ -326,7 +326,7 @@ impl Inner { key.timestamp = UnixTimestamp::from_secs(time_slot * u64::from(self.bucket_interval)); let now_slot = self.head / u64::from(self.num_partitions); - if !self.slot_diff.contains(now_slot, time_slot) { + if !self.slot_range.contains(now_slot, time_slot) { return Err(AggregateMetricsError::InvalidTimestamp(key.timestamp)); } @@ -503,21 +503,21 @@ impl fmt::Debug for Slot { } } -struct SlotDiff { - min: u64, - max: u64, +struct RelativeRange { + max_in_past: u64, + max_in_future: u64, } -impl SlotDiff { - pub fn contains(&self, now: u64, target: u64) -> bool { +impl RelativeRange { + fn contains(&self, now: u64, target: u64) -> bool { if target < now { // Timestamp/target in the past. let diff = now - target; - diff <= self.min + diff <= self.max_in_past } else { // Timestamp/target in the future. let diff = target - now; - diff <= self.max + diff <= self.max_in_future } } } diff --git a/relay-metrics/src/aggregator/mod.rs b/relay-metrics/src/aggregator/mod.rs index 7f61cbbb30..801e750c93 100644 --- a/relay-metrics/src/aggregator/mod.rs +++ b/relay-metrics/src/aggregator/mod.rs @@ -1,10 +1,4 @@ //! Core functionality of metrics aggregation. -//! -//! A new implementation of the [`crate::aggregator::Aggregator`], with slightly -//! different semantics and more optimized aggregation. -//! -//! This module is supposed to replace the [`crate::aggregator`] module, -//! if proven successful. use std::time::{Duration, SystemTime}; @@ -43,7 +37,26 @@ pub enum AggregateMetricsError { InvalidTimestamp(UnixTimestamp), } -/// An aggregator for metric [`Bucket`]'s. +/// A collector of [`Bucket`] submissions. +/// +/// # Aggregation +/// +/// Each metric is dispatched into the a [`Bucket`] depending on its project key (DSN), name, type, +/// unit, tags and timestamp. The bucket timestamp is rounded to the precision declared by the +/// `bucket_interval` field on the [AggregatorConfig] configuration. +/// +/// Each bucket stores the accumulated value of submitted metrics: +/// +/// - `Counter`: Sum of values. +/// - `Distribution`: A list of values. +/// - `Set`: A unique set of hashed values. +/// - `Gauge`: A summary of the reported values, see [`GaugeValue`](crate::GaugeValue). +/// +/// # Conflicts +/// +/// Metrics are uniquely identified by the combination of their name, type and unit. It is allowed +/// to send metrics of different types and units under the same name. For example, sending a metric +/// once as set and once as distribution will result in two actual metrics being recorded. #[derive(Debug)] pub struct Aggregator { name: String,