Skip to content

Commit

Permalink
review changes, small stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Dec 17, 2024
1 parent ab2dcc1 commit f16e425
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
44 changes: 22 additions & 22 deletions relay-metrics/src/aggregator/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ impl fmt::Debug for Partition {
#[derive(Default, Debug)]
pub struct PartitionStats {
/// Amount of unique buckets in the partition.
#[expect(unused)]
#[expect(unused, reason = "used for snapshot tests")]
pub count: u64,
/// Amount of unique buckets in the partition by namespace.
pub count_by_namespace: ByNamespace<u64>,
/// Amount of times a bucket was merged in the partition.
#[expect(unused)]
#[expect(unused, reason = "used for snapshot tests")]
pub merges: u64,
/// Amount of times a bucket was merged in the partition by namespace.
pub merges_by_namespace: ByNamespace<u64>,
/// Cost of buckets in the partition.
#[expect(unused)]
#[expect(unused, reason = "used for snapshot tests")]
pub cost: u64,
/// Cost of buckets in the partition by namespace.
pub cost_by_namespace: ByNamespace<u64>,
Expand All @@ -72,12 +72,12 @@ impl From<&stats::Slot> for PartitionStats {
#[derive(Default, Debug)]
pub struct Stats {
/// Total amount of buckets in the aggregator.
#[expect(unused)]
#[expect(unused, reason = "used for snapshot tests")]
pub count: u64,
/// Total amount of buckets in the aggregator by namespace.
pub count_by_namespace: ByNamespace<u64>,
/// Total bucket cost in the aggregator.
#[expect(unused)]
#[expect(unused, reason = "used for snapshot tests")]
pub cost: u64,
/// Total bucket cost in the aggregator by namespace.
pub cost_by_namespace: ByNamespace<u64>,
Expand Down Expand Up @@ -184,14 +184,14 @@ pub struct Inner {
/// "time slot" and the second dimension is the assigned partition.
///
/// The total length of the ring-buffer is then determined by the amount of time slots times
/// the amount of partition. In other words, every time slot as [`Self::num_partitions`]
/// the amount of partitions. In other words, every time slot has [`Self::num_partitions`]
/// partitions.
///
/// Layout:
/// Time slots: [ ][ ][ ]
/// Partitions: [ ][ ] [ ][ ] [ ][ ]
///
/// An item is assigned by first determining it's time slot and then assigning it a partition
/// An item is assigned by first determining its time slot and then assigning it to a partition
/// based on the selected [`Self::partition_by`] strategy.
///
/// The first item in the buffer is tracked by [`Self::head`] which is at any time the
Expand All @@ -207,7 +207,7 @@ pub struct Inner {
/// Position of the first element in [`Self::slots`].
head: u64,

/// Size of each individual bucket, inputs are truncated to this value.
/// Size of each individual bucket; inputs are truncated to a multiple of this value.
bucket_interval: u32,
/// Amount of slots which is added to a bucket as a delay.
///
Expand All @@ -221,9 +221,9 @@ pub struct Inner {

/// The maximum amount of slots (size of a `bucket_interval`) the timestamp is allowed to be
/// in the past or future.
slot_diff: SlotDiff,
slot_range: RelativeRange,

/// Determines how partitions are assigned based in the input bucket.
/// Determines how partitions are assigned based on the input bucket.
partition_by: FlushBatching,
/// Hasher used to calculate partitions.
hasher: ahash::RandomState,
Expand Down Expand Up @@ -251,11 +251,11 @@ impl Inner {
}
}

let slot_diff = SlotDiff {
min: config
let slot_diff = RelativeRange {
max_in_past: config
.max_secs_in_past
.map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))),
max: config
max_in_future: config
.max_secs_in_future
.map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))),
};
Expand All @@ -276,7 +276,7 @@ impl Inner {
.map(|c| c.div_ceil(total_slots as u64))
.unwrap_or(u64::MAX),
},
slot_diff,
slot_range: slot_diff,
partition_by: FlushBatching::Partition,
hasher: build_hasher(),
}
Expand Down Expand Up @@ -326,7 +326,7 @@ impl Inner {
key.timestamp = UnixTimestamp::from_secs(time_slot * u64::from(self.bucket_interval));

let now_slot = self.head / u64::from(self.num_partitions);
if !self.slot_diff.contains(now_slot, time_slot) {
if !self.slot_range.contains(now_slot, time_slot) {
return Err(AggregateMetricsError::InvalidTimestamp(key.timestamp));
}

Expand Down Expand Up @@ -503,21 +503,21 @@ impl fmt::Debug for Slot {
}
}

struct SlotDiff {
min: u64,
max: u64,
struct RelativeRange {
max_in_past: u64,
max_in_future: u64,
}

impl SlotDiff {
pub fn contains(&self, now: u64, target: u64) -> bool {
impl RelativeRange {
fn contains(&self, now: u64, target: u64) -> bool {
if target < now {
// Timestamp/target in the past.
let diff = now - target;
diff <= self.min
diff <= self.max_in_past
} else {
// Timestamp/target in the future.
let diff = target - now;
diff <= self.max
diff <= self.max_in_future
}
}
}
Expand Down
27 changes: 20 additions & 7 deletions relay-metrics/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
//! Core functionality of metrics aggregation.
//!
//! A new implementation of the [`crate::aggregator::Aggregator`], with slightly
//! different semantics and more optimized aggregation.
//!
//! This module is supposed to replace the [`crate::aggregator`] module,
//! if proven successful.
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -43,7 +37,26 @@ pub enum AggregateMetricsError {
InvalidTimestamp(UnixTimestamp),
}

/// An aggregator for metric [`Bucket`]'s.
/// A collector of [`Bucket`] submissions.
///
/// # Aggregation
///
/// Each metric is dispatched into a [`Bucket`] depending on its project key (DSN), name, type,
/// unit, tags and timestamp. The bucket timestamp is rounded to the precision declared by the
/// `bucket_interval` field on the [`AggregatorConfig`] configuration.
///
/// Each bucket stores the accumulated value of submitted metrics:
///
/// - `Counter`: Sum of values.
/// - `Distribution`: A list of values.
/// - `Set`: A unique set of hashed values.
/// - `Gauge`: A summary of the reported values, see [`GaugeValue`](crate::GaugeValue).
///
/// # Conflicts
///
/// Metrics are uniquely identified by the combination of their name, type and unit. It is allowed
/// to send metrics of different types and units under the same name. For example, sending a metric
/// once as a set and once as a distribution will result in two actual metrics being recorded.
#[derive(Debug)]
pub struct Aggregator {
name: String,
Expand Down

0 comments on commit f16e425

Please sign in to comment.