From d82089122a162bc162731e5ba1dd53c79e4ac406 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 18 Oct 2024 11:00:59 +0200 Subject: [PATCH] ref(metric-meta): Removes all metric meta code from Relay --- CHANGELOG.md | 12 +- relay-cogs/src/lib.rs | 4 - relay-config/src/config.rs | 38 +--- relay-config/src/redis.rs | 63 ------- relay-metrics/src/lib.rs | 4 - relay-metrics/src/meta/aggregator.rs | 152 ---------------- relay-metrics/src/meta/mod.rs | 11 -- relay-metrics/src/meta/protocol.rs | 165 ----------------- relay-metrics/src/meta/redis.rs | 170 ------------------ relay-metrics/src/statsd.rs | 13 -- relay-redis/src/noop.rs | 2 - relay-redis/src/real.rs | 2 - relay-server/src/endpoints/common.rs | 12 +- relay-server/src/envelope.rs | 13 +- relay-server/src/service.rs | 7 +- relay-server/src/services/processor.rs | 140 +-------------- relay-server/src/services/processor/event.rs | 1 - relay-server/src/services/projects/cache.rs | 40 +---- .../src/services/projects/project/mod.rs | 114 +----------- relay-server/src/services/stats.rs | 2 - relay-server/src/statsd.rs | 9 - relay-server/src/utils/rate_limits.rs | 2 - relay-server/src/utils/sizes.rs | 2 - tests/integration/test_metric_meta.py | 99 ---------- tests/integration/test_metrics.py | 50 +----- tests/integration/test_redis.py | 64 ------- 26 files changed, 33 insertions(+), 1158 deletions(-) delete mode 100644 relay-metrics/src/meta/aggregator.rs delete mode 100644 relay-metrics/src/meta/mod.rs delete mode 100644 relay-metrics/src/meta/protocol.rs delete mode 100644 relay-metrics/src/meta/redis.rs delete mode 100644 tests/integration/test_metric_meta.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d6004291635..c75570ec2fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,14 @@ # Changelog +## Unreleased + +**Breaking Changes**: + +- Removes support for metric meta envelope items. ([#4152](https://github.com/getsentry/relay/pull/4152)) + ## 24.10.0 -**Breaking Changes:** +**Breaking Changes**: - Only allow processing enabled in managed mode. ([#4087](https://github.com/getsentry/relay/pull/4087)) @@ -12,7 +18,7 @@ - Use the duration reported by the profiler instead of the transaction. ([#4058](https://github.com/getsentry/relay/pull/4058)) - Incorrect pattern matches involving adjacent any and wildcard matchers. ([#4072](https://github.com/getsentry/relay/pull/4072)) -**Features:** +**Features**: - Add a config option to add default tags to all Relay Sentry events. ([#3944](https://github.com/getsentry/relay/pull/3944)) - Automatically derive `client.address` and `user.geo` for standalone spans. ([#4047](https://github.com/getsentry/relay/pull/4047)) @@ -23,7 +29,7 @@ - Add support for creating User from LoginId in Unreal Crash Context. ([#4093](https://github.com/getsentry/relay/pull/4093)) - Add multi-write Redis client. ([#4064](https://github.com/getsentry/relay/pull/4064)) -**Internal:** +**Internal**: - Remove unused `cogs.enabled` configuration option. ([#4060](https://github.com/getsentry/relay/pull/4060)) - Add the dynamic sampling rate to standalone spans as a measurement so that it can be stored, queried, and used for extrapolation. ([#4063](https://github.com/getsentry/relay/pull/4063)) diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index 7c5a08fbfed..68401d11ebc 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -132,9 +132,6 @@ pub enum AppFeature { /// This app feature is for continuous profiling. Profiles, - /// Metric metadata. 
- MetricMeta, - /// Metrics in the transactions namespace. MetricsTransactions, /// Metrics in the spans namespace. @@ -168,7 +165,6 @@ impl AppFeature { Self::ClientReports => "client_reports", Self::CheckIns => "check_ins", Self::Replays => "replays", - Self::MetricMeta => "metric_meta", Self::MetricsTransactions => "metrics_transactions", Self::MetricsSpans => "metrics_spans", Self::MetricsProfiles => "metrics_profiles", diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 54a6b35b606..2a98bc63a33 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -559,17 +559,9 @@ impl Default for Metrics { } /// Controls processing of Sentry metrics and metric metadata. -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] #[serde(default)] struct SentryMetrics { - /// Code locations expiry in seconds. - /// - /// Defaults to 15 days. - pub meta_locations_expiry: u64, - /// Maximum amount of code locations to store per metric. - /// - /// Defaults to 5. - pub meta_locations_max: usize, /// Whether metric stats are collected and emitted. /// /// Metric stats are always collected and emitted when processing @@ -583,16 +575,6 @@ struct SentryMetrics { pub metric_stats_enabled: bool, } -impl Default for SentryMetrics { - fn default() -> Self { - Self { - meta_locations_expiry: 15 * 24 * 60 * 60, - meta_locations_max: 5, - metric_stats_enabled: false, - } - } -} - /// Controls various limits #[derive(Serialize, Deserialize, Debug)] #[serde(default)] @@ -633,8 +615,6 @@ struct Limits { max_statsd_size: ByteSize, /// The maximum payload size for metric buckets. max_metric_buckets_size: ByteSize, - /// The maximum payload size for metric metadata. - max_metric_meta_size: ByteSize, /// The maximum payload size for a compressed replay. max_replay_compressed_size: ByteSize, /// The maximum payload size for an uncompressed replay. @@ -686,7 +666,6 @@ impl Default for Limits { max_span_size: ByteSize::mebibytes(1), max_statsd_size: ByteSize::mebibytes(1), max_metric_buckets_size: ByteSize::mebibytes(1), - max_metric_meta_size: ByteSize::mebibytes(1), max_replay_compressed_size: ByteSize::mebibytes(10), max_replay_uncompressed_size: ByteSize::mebibytes(100), max_replay_message_size: ByteSize::mebibytes(15), @@ -2071,16 +2050,6 @@ impl Config { self.values.metrics.sample_rate } - /// Returns the maximum amount of code locations per metric. - pub fn metrics_meta_locations_max(&self) -> usize { - self.values.sentry_metrics.meta_locations_max - } - - /// Returns the expiry for code locations. - pub fn metrics_meta_locations_expiry(&self) -> Duration { - Duration::from_secs(self.values.sentry_metrics.meta_locations_expiry) - } - /// Returns the interval for periodic metrics emitted from Relay. /// /// `None` if periodic metrics are disabled. @@ -2296,11 +2265,6 @@ impl Config { self.values.limits.max_metric_buckets_size.as_bytes() } - /// Returns the maximum payload size of metric metadata in bytes. - pub fn max_metric_meta_size(&self) -> usize { - self.values.limits.max_metric_meta_size.as_bytes() - } - /// Whether metric stats are collected and emitted. /// /// Metric stats are always collected and emitted when processing diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 48748631957..6dec921f656 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -178,8 +178,6 @@ pub enum RedisConfigs { cardinality: Box, /// Configuration for the `quotas` pool. 
quotas: Box, - /// Configuration for the `misc` pool. - misc: Box, }, } @@ -221,8 +219,6 @@ pub enum RedisPoolConfigs<'a> { cardinality: RedisConfigRef<'a>, /// Configuration for the `quotas` pool. quotas: RedisConfigRef<'a>, - /// Configuration for the `misc` pool. - misc: RedisConfigRef<'a>, }, } @@ -295,18 +291,15 @@ pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) - project_configs, cardinality, quotas, - misc, } => { let project_configs = create_redis_pool(project_configs, project_configs_default_connections); let cardinality = create_redis_pool(cardinality, cpu_concurrency); let quotas = create_redis_pool(quotas, cpu_concurrency); - let misc = create_redis_pool(misc, cpu_concurrency); RedisPoolConfigs::Individual { project_configs, cardinality, quotas, - misc, } } } @@ -381,16 +374,6 @@ quotas: - "redis://127.0.0.2:6379" max_connections: 17 connection_timeout: 5 -misc: - configs: - - cluster_nodes: - - "redis://127.0.0.1:6379" - - "redis://127.0.0.2:6379" - max_connections: 42 - connection_timeout: 5 - - server: "redis://127.0.0.1:6379" - max_connections: 84 - connection_timeout: 10 "#; let configs: RedisConfigs = serde_yaml::from_str(yaml) @@ -420,29 +403,6 @@ misc: ..Default::default() }, }), - misc: Box::new(RedisConfig::MultiWrite { - configs: vec![ - RedisConfig::Cluster { - cluster_nodes: vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ], - options: PartialRedisConfigOptions { - max_connections: Some(42), - connection_timeout: 5, - ..Default::default() - }, - }, - RedisConfig::Single(SingleRedisConfig::Detailed { - server: "redis://127.0.0.1:6379".to_owned(), - options: PartialRedisConfigOptions { - max_connections: Some(84), - connection_timeout: 10, - ..Default::default() - }, - }), - ], - }), }; assert_eq!(configs, expected); @@ -736,17 +696,6 @@ read_timeout: 10 }), ], }), - misc: Box::new(RedisConfig::Cluster { - cluster_nodes: vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ], - options: PartialRedisConfigOptions { - max_connections: Some(84), - connection_timeout: 10, - ..Default::default() - }, - }), }; assert_json_snapshot!(configs, @r###" @@ -792,18 +741,6 @@ read_timeout: 10 "write_timeout": 3 } ] - }, - "misc": { - "cluster_nodes": [ - "redis://127.0.0.1:6379", - "redis://127.0.0.2:6379" - ], - "max_connections": 84, - "connection_timeout": 10, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 3, - "write_timeout": 3 } } "###); diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs index 396236c2c45..4897553525b 100644 --- a/relay-metrics/src/lib.rs +++ b/relay-metrics/src/lib.rs @@ -70,7 +70,6 @@ pub mod aggregator; pub mod cogs; -pub mod meta; mod bucket; mod finite; @@ -80,8 +79,5 @@ mod view; pub use bucket::*; pub use finite::*; -#[cfg(feature = "redis")] -pub use meta::RedisMetricMetaStore; -pub use meta::{MetaAggregator, MetricMeta}; pub use protocol::*; pub use view::*; diff --git a/relay-metrics/src/meta/aggregator.rs b/relay-metrics/src/meta/aggregator.rs deleted file mode 100644 index db7c051efb8..00000000000 --- a/relay-metrics/src/meta/aggregator.rs +++ /dev/null @@ -1,152 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - hash::Hash, -}; - -use relay_base_schema::project::ProjectKey; - -use super::{Item, Location, MetricMeta, StartOfDayUnixTimestamp}; -use crate::{statsd::MetricCounters, MetricResourceIdentifier}; - -/// A metrics meta aggregator. 
-///
-/// Aggregates metric metadata based on their scope (project, mri, timestamp) and
-/// only keeps the most relevant entries.
-///
-/// Currently we track the first N unique metric meta elements we receive.
-///
-/// This should represent the actual adoption rate of different code versions.
-///
-/// This aggregator is purely in-memory and will lose its state on restart,
-/// which may cause multiple different items to be emitted after restarts.
-/// For this we have de-duplication in the storage, and the overall volume of
-/// duplicates is small enough to just add them to the storage in the worst case.
-#[derive(Debug)]
-pub struct MetaAggregator {
-    /// All tracked code locations.
-    locations: hashbrown::HashMap<Scope, HashSet<Location>>,
-
-    /// Maximum tracked locations.
-    max_locations: usize,
-}
-
-impl MetaAggregator {
-    /// Creates a new metrics meta aggregator.
-    pub fn new(max_locations: usize) -> Self {
-        Self {
-            locations: hashbrown::HashMap::new(),
-            max_locations,
-        }
-    }
-
-    /// Adds a new meta item to the aggregator.
-    ///
-    /// Returns a new [`MetricMeta`] element when the element should be stored
-    /// or sent upstream for storage.
-    ///
-    /// Returns `None` when the meta item was already seen or is not considered relevant.
-    pub fn add(&mut self, project_key: ProjectKey, meta: MetricMeta) -> Option<MetricMeta> {
-        let mut send_upstream = HashMap::new();
-
-        let mut total = 0;
-        for (mri, items) in meta.mapping {
-            let scope = Scope {
-                timestamp: meta.timestamp,
-                project_key,
-                mri,
-            };
-
-            total += items.len();
-            if let Some(items) = self.add_scoped(&scope, items) {
-                send_upstream.insert(scope.mri, items);
-            }
-        }
-
-        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorItems) += total as i64);
-
-        if send_upstream.is_empty() {
-            return None;
-        }
-
-        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorUpdate) += 1);
-        Some(MetricMeta {
-            timestamp: meta.timestamp,
-            mapping: send_upstream,
-        })
-    }
-
-    /// Retrieves all currently relevant metric meta for a project.
-    pub fn get_all_relevant(&self, project_key: ProjectKey) -> impl Iterator<Item = MetricMeta> {
-        let locations = self
-            .locations
-            .iter()
-            .filter(|(scope, _)| scope.project_key == project_key);
-
-        let mut result = HashMap::new();
-
-        for (scope, locations) in locations {
-            result
-                .entry(scope.timestamp)
-                .or_insert_with(|| MetricMeta {
-                    timestamp: scope.timestamp,
-                    mapping: HashMap::new(),
-                })
-                .mapping
-                .entry(scope.mri.clone()) // This clone sucks
-                .or_insert_with(Vec::new)
-                .extend(locations.iter().cloned().map(Item::Location));
-        }
-
-        result.into_values()
-    }
-
-    /// Removes all contained state related to a project.
-    pub fn clear(&mut self, project_key: ProjectKey) {
-        self.locations
-            .retain(|scope, _| scope.project_key != project_key);
-    }
-
-    fn add_scoped(&mut self, scope: &Scope, items: Vec<Item>) -> Option<Vec<Item>> {
-        // `entry_ref` needs hashbrown; without it we would have to clone the scope or do a separate lookup.
-        let locations = self.locations.entry_ref(scope).or_default();
-        let mut send_upstream = Vec::new();
-
-        for item in items {
-            match item {
-                Item::Location(location) => {
-                    if locations.len() > self.max_locations {
-                        break;
-                    }
-
-                    if !locations.contains(&location) {
-                        locations.insert(location.clone());
-                        send_upstream.push(Item::Location(location));
-                    }
-                }
-                Item::Unknown => {}
-            }
-        }
-
-        (!send_upstream.is_empty()).then_some(send_upstream)
-    }
-}
-
-/// The metadata scope.
-///
-/// We scope metadata by project, MRI and day,
-/// represented as a unix timestamp at the beginning of the day.
-///
-/// The technical scope (e.g. redis key) also includes the organization id, but this
-/// can be inferred from the project.
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-struct Scope {
-    pub timestamp: StartOfDayUnixTimestamp,
-    pub project_key: ProjectKey,
-    pub mri: MetricResourceIdentifier<'static>,
-}
-
-impl From<&Scope> for Scope {
-    fn from(value: &Scope) -> Self {
-        value.clone()
-    }
-}
diff --git a/relay-metrics/src/meta/mod.rs b/relay-metrics/src/meta/mod.rs
deleted file mode 100644
index 5504e0dc5a3..00000000000
--- a/relay-metrics/src/meta/mod.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-//! Functionality for aggregating and storing metric metadata.
-
-mod aggregator;
-mod protocol;
-#[cfg(feature = "redis")]
-mod redis;
-
-pub use self::aggregator::*;
-pub use self::protocol::*;
-#[cfg(feature = "redis")]
-pub use self::redis::*;
diff --git a/relay-metrics/src/meta/protocol.rs b/relay-metrics/src/meta/protocol.rs
deleted file mode 100644
index 7a1223cf8ef..00000000000
--- a/relay-metrics/src/meta/protocol.rs
+++ /dev/null
@@ -1,165 +0,0 @@
-use std::collections::HashMap;
-
-use chrono::{DateTime, Utc};
-use relay_common::time::UnixTimestamp;
-use serde::{Deserialize, Serialize};
-
-use crate::MetricResourceIdentifier;
-
-/// A metric metadata item.
-#[derive(Clone, Debug, Deserialize, Serialize)]
-pub struct MetricMeta {
-    /// Timestamp scope for the contained metadata.
-    ///
-    /// Metric metadata is collected in daily intervals, so this may be truncated
-    /// to the start of the day (UTC) already.
-    pub timestamp: StartOfDayUnixTimestamp,
-
-    /// The contained metadata mapped by MRI.
-    pub mapping: HashMap<MetricResourceIdentifier<'static>, Vec<Item>>,
-}
-
-/// A metadata item.
-#[derive(Clone, Debug, Deserialize, Serialize)]
-#[serde(tag = "type", rename_all = "lowercase")]
-pub enum Item {
-    /// A location metadata pointing to the code location where the metric originates from.
-    Location(Location),
-    /// Unknown item.
-    #[serde(other)]
-    Unknown,
-}
-
-/// A code location.
-#[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
-pub struct Location {
-    /// The relative file path.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub filename: Option<String>,
-    /// The absolute file path.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub abs_path: Option<String>,
-    /// The containing module name or path.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub module: Option<String>,
-    /// The containing function name.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub function: Option<String>,
-    /// The line number.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub lineno: Option<u64>,
-    /// Source code leading up to `lineno`.
-    #[serde(default, skip_serializing_if = "Vec::is_empty")]
-    pub pre_context: Vec<Option<String>>,
-    /// Source code of the current line (`lineno`).
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub context_line: Option<String>,
-    /// Source code of the lines after `lineno`.
-    #[serde(default, skip_serializing_if = "Vec::is_empty")]
-    pub post_context: Vec<Option<String>>,
-}
-
-/// A Unix timestamp that is truncated to the start of the day.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub struct StartOfDayUnixTimestamp(UnixTimestamp);
-
-impl StartOfDayUnixTimestamp {
-    /// Creates a new `StartOfDayUnixTimestamp` from a timestamp by truncating it.
-    ///
-    /// May return `None` when passed an invalid date, but in practice this never fails
-    /// since the [`UnixTimestamp`] is already sufficiently validated.
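// [Editorial aside, not part of the patch] The truncation implemented by `new`
// below can be sketched standalone with `chrono` (the same calls the removed
// code uses); `start_of_day` is a hypothetical helper name, and the sample
// value is the timestamp used in the removed protocol test:
//
//     use chrono::{DateTime, Utc};
//
//     /// Truncates a unix timestamp (in seconds) to 00:00:00 UTC of the same day.
//     fn start_of_day(secs: i64) -> Option<i64> {
//         let dt: DateTime<Utc> = DateTime::from_timestamp(secs, 0)?;
//         Some(dt.date_naive().and_hms_opt(0, 0, 0)?.and_utc().timestamp())
//     }
//
//     // 2024-05-17T01:02:03Z -> 2024-05-17T00:00:00Z
//     assert_eq!(start_of_day(1_715_907_723), Some(1_715_904_000));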
-    pub fn new(ts: UnixTimestamp) -> Option<Self> {
-        let dt: DateTime<Utc> = DateTime::from_timestamp(ts.as_secs().try_into().ok()?, 0)?;
-        let beginning_of_day = dt.date_naive().and_hms_opt(0, 0, 0)?.and_utc();
-        Some(Self(UnixTimestamp::from_datetime(beginning_of_day)?))
-    }
-
-    /// Returns the underlying unix timestamp, truncated to the start of the day.
-    pub fn as_timestamp(&self) -> UnixTimestamp {
-        self.0
-    }
-}
-
-impl std::ops::Deref for StartOfDayUnixTimestamp {
-    type Target = UnixTimestamp;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl Serialize for StartOfDayUnixTimestamp {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        self.0.serialize(serializer)
-    }
-}
-
-impl<'de> Deserialize<'de> for StartOfDayUnixTimestamp {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        let ts = UnixTimestamp::deserialize(deserializer)?;
-        StartOfDayUnixTimestamp::new(ts)
-            .ok_or_else(|| serde::de::Error::custom("invalid timestamp"))
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use insta::assert_json_snapshot;
-
-    use super::*;
-
-    #[test]
-    fn test_deserialize_null_context() {
-        let json = r#"{
-            "timestamp": 1715904000,
-            "mapping": {
-                "d:memory.allocations@allocations": [{
-                    "abs_path": "/rails/config/initializers/active_job.rb",
-                    "function": "block in
", - "lineno": 5, - "filename": "config/initializers/active_job.rb", - "pre_context": [null, " allocations = event.allocations\n", " allocations=#{allocations}\"\n"], - "context_line": " Sentry::Metrics.distribution('memory.allocations')\n", - "post_context": ["end\n",null,null], - "type":"location" - }] - } - }"#; - - let r: MetricMeta = serde_json::from_str(json).unwrap(); - - assert_json_snapshot!(r, @r###" - { - "timestamp": 1715904000, - "mapping": { - "d:custom/memory.allocations@allocations": [ - { - "type": "location", - "filename": "config/initializers/active_job.rb", - "abs_path": "/rails/config/initializers/active_job.rb", - "function": "block in
", - "lineno": 5, - "pre_context": [ - null, - " allocations = event.allocations\n", - " allocations=#{allocations}\"\n" - ], - "context_line": " Sentry::Metrics.distribution('memory.allocations')\n", - "post_context": [ - "end\n", - null, - null - ] - } - ] - } - } - "###); - } -} diff --git a/relay-metrics/src/meta/redis.rs b/relay-metrics/src/meta/redis.rs deleted file mode 100644 index 60cda8d50fb..00000000000 --- a/relay-metrics/src/meta/redis.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::time::Duration; - -use hash32::{FnvHasher, Hasher as _}; -use relay_base_schema::project::ProjectId; -use relay_common::time::UnixTimestamp; -use relay_redis::{RedisError, RedisPool}; - -use super::{Item, MetricMeta}; -use crate::{statsd::MetricCounters, MetricResourceIdentifier}; - -/// Redis metric meta -pub struct RedisMetricMetaStore { - redis: RedisPool, - expiry: Duration, -} - -impl RedisMetricMetaStore { - /// Creates a new Redis metrics meta store. - pub fn new(redis: RedisPool, expiry: Duration) -> Self { - Self { redis, expiry } - } - - /// Stores metric metadata in Redis. - pub fn store( - &self, - organization_id: u64, - project_id: ProjectId, - meta: MetricMeta, - ) -> Result<(), RedisError> { - let mut client = self.redis.client()?; - let mut connection = client.connection()?; - - let mut redis_updates = 0; - - let mut pipe = relay_redis::redis::pipe(); - for (mri, items) in meta.mapping { - let key = self.build_redis_key(organization_id, project_id, *meta.timestamp, &mri); - - // Should be fine if we don't batch here, we expect a very small amount of locations - // from the aggregator. - let location_cmd = pipe.cmd("SADD").arg(&key); - for item in items { - match item { - Item::Location(location) => { - let member = serde_json::to_string(&location).unwrap(); - location_cmd.arg(member); - } - Item::Unknown => {} - } - } - location_cmd.ignore(); - - redis_updates += 1; - relay_log::trace!("storing metric meta for project {organization_id}:{project_id}"); - - // use original timestamp to not bump expiry - let expire_at = meta.timestamp.as_secs() + self.expiry.as_secs(); - pipe.cmd("EXPIREAT").arg(key).arg(expire_at).ignore(); - } - pipe.query::<()>(&mut connection) - .map_err(RedisError::Redis)?; - - relay_statsd::metric!(counter(MetricCounters::MetaRedisUpdate) += redis_updates); - - Ok(()) - } - - fn build_redis_key( - &self, - organization_id: u64, - project_id: ProjectId, - timestamp: UnixTimestamp, - mri: &MetricResourceIdentifier<'_>, - ) -> String { - let mri_hash = mri_to_fnv1a32(mri); - - format!("mm:l:{{{organization_id}}}:{project_id}:{mri_hash}:{timestamp}") - } -} - -/// Converts an MRI to a fnv1a32 hash. -/// -/// Sentry also uses the exact same algorithm to hash MRIs. 
-fn mri_to_fnv1a32(mri: &MetricResourceIdentifier<'_>) -> u32 {
-    let mut hasher = FnvHasher::default();
-
-    let s = mri.to_string();
-    // Hash the bytes directly; `write_str` on the hasher would add a 0xff byte at the end.
-    std::hash::Hasher::write(&mut hasher, s.as_bytes());
-    hasher.finish32()
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashMap;
-
-    use relay_redis::RedisConfigOptions;
-
-    use crate::meta::{Location, StartOfDayUnixTimestamp};
-
-    use super::*;
-
-    fn build_store() -> RedisMetricMetaStore {
-        let url = std::env::var("RELAY_REDIS_URL")
-            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned());
-
-        let redis = RedisPool::single(&url, RedisConfigOptions::default()).unwrap();
-
-        RedisMetricMetaStore::new(redis, Duration::from_secs(86400))
-    }
-
-    #[test]
-    fn test_store() {
-        let store = build_store();
-
-        let organization_id = 1000;
-        let project_id = ProjectId::new(2);
-        let mri = MetricResourceIdentifier::parse("c:foo").unwrap();
-
-        let timestamp = StartOfDayUnixTimestamp::new(UnixTimestamp::now()).unwrap();
-
-        let location = Location {
-            filename: Some("foobar".to_owned()),
-            abs_path: None,
-            module: None,
-            function: None,
-            lineno: Some(42),
-            pre_context: Vec::new(),
-            context_line: None,
-            post_context: Vec::new(),
-        };
-
-        store
-            .store(
-                organization_id,
-                project_id,
-                MetricMeta {
-                    timestamp,
-                    mapping: HashMap::from([(mri.clone(), vec![Item::Location(location.clone())])]),
-                },
-            )
-            .unwrap();
-
-        let mut client = store.redis.client().unwrap();
-        let mut connection = client.connection().unwrap();
-        let key = store.build_redis_key(organization_id, project_id, *timestamp, &mri);
-        let locations: Vec<String> = relay_redis::redis::cmd("SMEMBERS")
-            .arg(key)
-            .query(&mut connection)
-            .unwrap();
-
-        assert_eq!(locations, vec![serde_json::to_string(&location).unwrap()]);
-    }
-
-    #[test]
-    fn test_mri_hash() {
-        fn test_mri(s: &str, expected_hash: u32) {
-            let mri = MetricResourceIdentifier::parse(s).unwrap();
-            assert_eq!(mri_to_fnv1a32(&mri), expected_hash);
-        }
-
-        // Sentry has the same tests.
-        test_mri("c:transactions/count_per_root_project@none", 2684394786);
-        test_mri("d:transactions/duration@millisecond", 1147819254);
-        test_mri("s:transactions/user@none", 1739810785);
-        test_mri("c:custom/user.click@none", 1248146441);
-        test_mri("d:custom/page.load@millisecond", 2103554973);
-        test_mri("s:custom/username@none", 670706478);
-    }
-}
diff --git a/relay-metrics/src/statsd.rs b/relay-metrics/src/statsd.rs
index ae7976a1fa3..f77ff107a22 100644
--- a/relay-metrics/src/statsd.rs
+++ b/relay-metrics/src/statsd.rs
@@ -40,15 +40,6 @@ pub enum MetricCounters {
     /// - `aggregator`: The name of the metrics aggregator (usually `"default"`).
     /// - `namespace`: The namespace of the metric.
     MergeMiss,
-
-    /// Incremented every time the meta aggregator emits an update that needs to be stored or
-    /// sent upstream.
-    MetaAggregatorUpdate,
-    /// Incremented for every metric meta item added to the metric meta aggregator.
-    MetaAggregatorItems,
-    /// Incremented every time a redis key is updated to store or update metadata.
- #[cfg(feature = "redis")] - MetaRedisUpdate, } impl CounterMetric for MetricCounters { @@ -56,10 +47,6 @@ impl CounterMetric for MetricCounters { match *self { Self::MergeHit => "metrics.buckets.merge.hit", Self::MergeMiss => "metrics.buckets.merge.miss", - Self::MetaAggregatorUpdate => "metrics.meta.agg.update", - Self::MetaAggregatorItems => "metrics.meta.agg.items", - #[cfg(feature = "redis")] - Self::MetaRedisUpdate => "metrics.meta.redis.update", } } } diff --git a/relay-redis/src/noop.rs b/relay-redis/src/noop.rs index 452fed960ad..944c9929c5c 100644 --- a/relay-redis/src/noop.rs +++ b/relay-redis/src/noop.rs @@ -52,6 +52,4 @@ pub struct RedisPools { pub cardinality: RedisPool, /// The pool used for rate limiting/quotas. pub quotas: RedisPool, - /// The pool used for metrics metadata. - pub misc: RedisPool, } diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index f47d4c929e6..bd8b58972f2 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -381,8 +381,6 @@ pub struct RedisPools { pub cardinality: RedisPool, /// The pool used for rate limiting/quotas. pub quotas: RedisPool, - /// The pool used for metrics metadata. - pub misc: RedisPool, } /// Stats about how the [`RedisPool`] is performing. diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 5a124b84b73..6368c4fbe74 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -12,7 +12,7 @@ use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, I use crate::service::ServiceState; use crate::services::buffer::EnvelopeBuffer; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{MetricData, ProcessMetricMeta, ProcessingGroup}; +use crate::services::processor::{MetricData, ProcessingGroup}; use crate::services::projects::cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope}; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope}; @@ -282,16 +282,6 @@ fn queue_envelope( source: envelope.meta().into(), }); } - - // Remove metric meta from the envelope and send them directly to processing. - let metric_meta = envelope.take_items_by(|item| matches!(item.ty(), ItemType::MetricMeta)); - if !metric_meta.is_empty() { - relay_log::trace!("sending metric meta into processing queue"); - state.processor().send(ProcessMetricMeta { - items: metric_meta.into_vec(), - project_key: envelope.meta().public_key(), - }) - } } // Split off the envelopes by item type. diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index afc9b3fb787..5e9673509cc 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -105,8 +105,6 @@ pub enum ItemType { Statsd, /// Buckets of preaggregated metrics encoded as JSON. MetricBuckets, - /// Additional metadata for metrics - MetricMeta, /// Client internal report (eg: outcomes). ClientReport, /// Profile event payload encoded as JSON. @@ -168,7 +166,6 @@ impl ItemType { Self::Sessions => "sessions", Self::Statsd => "statsd", Self::MetricBuckets => "metric_buckets", - Self::MetricMeta => "metric_meta", Self::ClientReport => "client_report", Self::Profile => "profile", Self::ReplayEvent => "replay_event", @@ -192,10 +189,7 @@ impl ItemType { /// Returns `true` if the item is a metric type. 
pub fn is_metrics(&self) -> bool { - matches!( - self, - ItemType::Statsd | ItemType::MetricBuckets | ItemType::MetricMeta - ) + matches!(self, ItemType::Statsd | ItemType::MetricBuckets) } } @@ -224,7 +218,6 @@ impl std::str::FromStr for ItemType { "sessions" => Self::Sessions, "statsd" => Self::Statsd, "metric_buckets" => Self::MetricBuckets, - "metric_meta" => Self::MetricMeta, "client_report" => Self::ClientReport, "profile" => Self::Profile, "replay_event" => Self::ReplayEvent, @@ -685,7 +678,7 @@ impl Item { ItemType::UnrealReport => Some(DataCategory::Error), ItemType::Attachment => Some(DataCategory::Attachment), ItemType::Session | ItemType::Sessions => None, - ItemType::Statsd | ItemType::MetricBuckets | ItemType::MetricMeta => None, + ItemType::Statsd | ItemType::MetricBuckets => None, ItemType::FormData => None, ItemType::UserReport => None, ItemType::UserReportV2 => Some(DataCategory::UserReportV2), @@ -915,7 +908,6 @@ impl Item { | ItemType::Sessions | ItemType::Statsd | ItemType::MetricBuckets - | ItemType::MetricMeta | ItemType::ClientReport | ItemType::ReplayEvent | ItemType::ReplayRecording @@ -952,7 +944,6 @@ impl Item { ItemType::Sessions => false, ItemType::Statsd => false, ItemType::MetricBuckets => false, - ItemType::MetricMeta => false, ItemType::ClientReport => false, ItemType::ReplayRecording => false, ItemType::ReplayVideo => false, diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index d7e954fe86d..f4c8051284d 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -431,25 +431,21 @@ pub fn create_redis_pools(configs: RedisPoolConfigs) -> Result { let project_configs = create_redis_pool(project_configs)?; let cardinality = create_redis_pool(cardinality)?; let quotas = create_redis_pool(quotas)?; - let misc = create_redis_pool(misc)?; Ok(RedisPools { project_configs, cardinality, quotas, - misc, }) } } @@ -460,11 +456,10 @@ fn initialize_redis_scripts_for_pools(redis_pools: &RedisPools) -> Result<(), Re let project_configs = redis_pools.project_configs.client()?; let cardinality = redis_pools.cardinality.client()?; let quotas = redis_pools.quotas.client()?; - let misc = redis_pools.misc.client()?; let scripts = RedisScripts::all(); - let pools = [project_configs, cardinality, quotas, misc]; + let pools = [project_configs, cardinality, quotas]; for pool in pools { initialize_redis_scripts(pool, &scripts)?; } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 494630a01bc..f1b2d3e93f5 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -30,7 +30,7 @@ use relay_event_schema::protocol::{ ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError, }; use relay_filter::FilterStatKey; -use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricMeta, MetricNamespace}; +use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace}; use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::Annotated; @@ -52,7 +52,6 @@ use { RedisSetLimiterOptions, }, relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups}, - relay_metrics::RedisMetricMetaStore, relay_quotas::{Quota, RateLimitingError, RedisRateLimiter}, relay_redis::{RedisPool, RedisPools}, std::iter::Chain, @@ -73,7 +72,7 @@ use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use 
crate::services::processor::event::FiltersStatus; use crate::services::projects::cache::{ - AddMetricMeta, BucketSource, ProcessMetrics, ProjectCache, UpdateRateLimits, + BucketSource, ProcessMetrics, ProjectCache, UpdateRateLimits, }; use crate::services::projects::project::{ProjectInfo, ProjectState}; use crate::services::test_store::{Capture, TestStore}; @@ -992,15 +991,6 @@ pub struct ProcessBatchedMetrics { pub sent_at: Option>, } -/// Parses a list of metric meta items and pushes them to the project cache for aggregation. -#[derive(Debug)] -pub struct ProcessMetricMeta { - /// A list of metric meta items. - pub items: Vec, - /// The target project. - pub project_key: ProjectKey, -} - /// Metric buckets with additional project. #[derive(Debug, Clone)] pub struct ProjectMetrics { @@ -1019,18 +1009,6 @@ pub struct EncodeMetrics { pub scopes: BTreeMap, } -/// Encodes metric meta into an [`Envelope`] and sends it upstream. -/// -/// At the moment, upstream means directly into Redis for processing relays -/// and otherwise submitting the Envelope via HTTP to the [`UpstreamRelay`]. -#[derive(Debug)] -pub struct EncodeMetricMeta { - /// Scoping of the meta. - pub scoping: Scoping, - /// The metric meta. - pub meta: MetricMeta, -} - /// Sends an envelope to the upstream or Kafka. #[derive(Debug)] pub struct SubmitEnvelope { @@ -1052,9 +1030,7 @@ pub enum EnvelopeProcessor { ProcessEnvelope(Box), ProcessProjectMetrics(Box), ProcessBatchedMetrics(Box), - ProcessMetricMeta(Box), EncodeMetrics(Box), - EncodeMetricMeta(Box), SubmitEnvelope(Box), SubmitClientReports(Box), } @@ -1066,9 +1042,7 @@ impl EnvelopeProcessor { EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope", EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics", EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics", - EnvelopeProcessor::ProcessMetricMeta(_) => "ProcessMetricMeta", EnvelopeProcessor::EncodeMetrics(_) => "EncodeMetrics", - EnvelopeProcessor::EncodeMetricMeta(_) => "EncodeMetricMeta", EnvelopeProcessor::SubmitEnvelope(_) => "SubmitEnvelope", EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports", } @@ -1101,14 +1075,6 @@ impl FromMessage for EnvelopeProcessor { } } -impl FromMessage for EnvelopeProcessor { - type Response = NoResponse; - - fn from_message(message: ProcessMetricMeta, _: ()) -> Self { - Self::ProcessMetricMeta(Box::new(message)) - } -} - impl FromMessage for EnvelopeProcessor { type Response = NoResponse; @@ -1117,14 +1083,6 @@ impl FromMessage for EnvelopeProcessor { } } -impl FromMessage for EnvelopeProcessor { - type Response = NoResponse; - - fn from_message(message: EncodeMetricMeta, _: ()) -> Self { - Self::EncodeMetricMeta(Box::new(message)) - } -} - impl FromMessage for EnvelopeProcessor { type Response = NoResponse; @@ -1186,8 +1144,6 @@ struct InnerProcessor { rate_limiter: Option, geoip_lookup: Option, #[cfg(feature = "processing")] - metric_meta_store: Option, - #[cfg(feature = "processing")] cardinality_limiter: Option, metric_outcomes: MetricOutcomes, } @@ -1214,14 +1170,13 @@ impl EnvelopeProcessorService { }); #[cfg(feature = "processing")] - let (cardinality, quotas, misc) = match redis { + let (cardinality, quotas) = match redis { Some(RedisPools { cardinality, quotas, - misc, .. 
- }) => (Some(cardinality), Some(quotas), Some(misc)), - None => (None, None, None), + }) => (Some(cardinality), Some(quotas)), + None => (None, None), }; let inner = InnerProcessor { @@ -1236,10 +1191,6 @@ impl EnvelopeProcessorService { addrs, geoip_lookup, #[cfg(feature = "processing")] - metric_meta_store: misc.map(|misc| { - RedisMetricMetaStore::new(misc, config.metrics_meta_locations_expiry()) - }), - #[cfg(feature = "processing")] cardinality_limiter: cardinality .map(|cardinality| { RedisSetLimiter::new( @@ -2214,35 +2165,6 @@ impl EnvelopeProcessorService { } } - fn handle_process_metric_meta(&self, message: ProcessMetricMeta) { - let ProcessMetricMeta { items, project_key } = message; - - for item in items { - if item.ty() != &ItemType::MetricMeta { - relay_log::error!( - "invalid item of type {} passed to ProcessMetricMeta", - item.ty() - ); - continue; - } - - let payload = item.payload(); - match serde_json::from_slice::(&payload) { - Ok(meta) => { - relay_log::trace!("adding metric metadata to project cache"); - self.inner - .addrs - .project_cache - .send(AddMetricMeta { project_key, meta }); - } - Err(error) => { - metric!(counter(RelayCounters::MetricMetaParsingFailed) += 1); - relay_log::debug!(error = &error as &dyn Error, "failed to parse metric meta"); - } - } - } - } - fn handle_submit_envelope(&self, message: SubmitEnvelope) { let SubmitEnvelope { mut envelope } = message; @@ -2805,54 +2727,6 @@ impl EnvelopeProcessorService { } } - fn handle_encode_metric_meta(&self, message: EncodeMetricMeta) { - #[cfg(feature = "processing")] - if self.inner.config.processing_enabled() { - return self.store_metric_meta(message); - } - - self.encode_metric_meta(message); - } - - fn encode_metric_meta(&self, message: EncodeMetricMeta) { - let EncodeMetricMeta { scoping, meta } = message; - - let upstream = self.inner.config.upstream_descriptor(); - let dsn = PartialDsn::outbound(&scoping, upstream); - - let mut item = Item::new(ItemType::MetricMeta); - item.set_payload(ContentType::Json, serde_json::to_vec(&meta).unwrap()); - let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn)); - envelope.add_item(item); - - let envelope = ManagedEnvelope::new( - envelope, - self.inner.addrs.outcome_aggregator.clone(), - self.inner.addrs.test_store.clone(), - ProcessingGroup::Metrics, - ); - self.handle_submit_envelope(SubmitEnvelope { - envelope: envelope.into_processed(), - }); - } - - #[cfg(feature = "processing")] - fn store_metric_meta(&self, message: EncodeMetricMeta) { - let EncodeMetricMeta { scoping, meta } = message; - - let Some(ref metric_meta_store) = self.inner.metric_meta_store else { - return; - }; - - let r = metric_meta_store.store(scoping.organization_id, scoping.project_id, meta); - if let Err(error) = r { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to store metric meta in redis" - ) - } - } - #[cfg(all(test, feature = "processing"))] fn redis_rate_limiter_enabled(&self) -> bool { self.inner.rate_limiter.is_some() @@ -2873,9 +2747,7 @@ impl EnvelopeProcessorService { EnvelopeProcessor::ProcessBatchedMetrics(m) => { self.handle_process_batched_metrics(&mut cogs, *m) } - EnvelopeProcessor::ProcessMetricMeta(m) => self.handle_process_metric_meta(*m), EnvelopeProcessor::EncodeMetrics(m) => self.handle_encode_metrics(*m), - EnvelopeProcessor::EncodeMetricMeta(m) => self.handle_encode_metric_meta(*m), EnvelopeProcessor::SubmitEnvelope(m) => self.handle_submit_envelope(*m), EnvelopeProcessor::SubmitClientReports(m) => 
self.handle_submit_client_reports(*m), } @@ -2887,7 +2759,6 @@ impl EnvelopeProcessorService { EnvelopeProcessor::ProcessEnvelope(v) => AppFeature::from(v.envelope.group()).into(), EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(), EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(), - EnvelopeProcessor::ProcessMetricMeta(_) => AppFeature::MetricMeta.into(), EnvelopeProcessor::EncodeMetrics(v) => v .scopes .values() @@ -2901,7 +2772,6 @@ impl EnvelopeProcessorService { } }) .fold(FeatureWeights::none(), FeatureWeights::merge), - EnvelopeProcessor::EncodeMetricMeta(_) => AppFeature::MetricMeta.into(), EnvelopeProcessor::SubmitEnvelope(v) => AppFeature::from(v.envelope.group()).into(), EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(), } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 1759d0d078c..8f9b22ae142 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -435,7 +435,6 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::Sessions => false, ItemType::Statsd => false, ItemType::MetricBuckets => false, - ItemType::MetricMeta => false, ItemType::ClientReport => false, ItemType::Profile => false, ItemType::ReplayEvent => false, diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache.rs index 6bab22f4934..354a668af2f 100644 --- a/relay-server/src/services/projects/cache.rs +++ b/relay-server/src/services/projects/cache.rs @@ -18,7 +18,7 @@ use relay_base_schema::project::ProjectKey; #[cfg(feature = "processing")] use relay_config::RedisConfigRef; use relay_config::{Config, RelayMode}; -use relay_metrics::{Bucket, MetricMeta}; +use relay_metrics::Bucket; use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; @@ -229,15 +229,6 @@ pub struct ProcessMetrics { pub sent_at: Option>, } -/// Add metric metadata to the aggregator. -#[derive(Debug)] -pub struct AddMetricMeta { - /// The project key. - pub project_key: ProjectKey, - /// The metadata. - pub meta: MetricMeta, -} - /// Updates the buffer index for [`ProjectKey`] with the [`QueueKey`] keys. 
/// /// This message is sent from the project buffer in case of the error while fetching the data from @@ -296,7 +287,6 @@ pub enum ProjectCache { ValidateEnvelope(ValidateEnvelope), UpdateRateLimits(UpdateRateLimits), ProcessMetrics(ProcessMetrics), - AddMetricMeta(AddMetricMeta), FlushBuckets(FlushBuckets), UpdateSpoolIndex(UpdateSpoolIndex), RefreshIndexCache(RefreshIndexCache), @@ -313,7 +303,6 @@ impl ProjectCache { Self::ValidateEnvelope(_) => "ValidateEnvelope", Self::UpdateRateLimits(_) => "UpdateRateLimits", Self::ProcessMetrics(_) => "ProcessMetrics", - Self::AddMetricMeta(_) => "AddMetricMeta", Self::FlushBuckets(_) => "FlushBuckets", Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::RefreshIndexCache(_) => "RefreshIndexCache", @@ -399,14 +388,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: AddMetricMeta, _: ()) -> Self { - Self::AddMetricMeta(message) - } -} - impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; @@ -748,14 +729,11 @@ impl ProjectCacheBroker { } = message; let project_cache = self.services.project_cache.clone(); - let envelope_processor = self.services.envelope_processor.clone(); - let old_state = self.get_or_create_project(project_key).update_state( - &project_cache, - state, - &envelope_processor, - no_cache, - ); + let old_state = + self.get_or_create_project(project_key) + .update_state(&project_cache, state, no_cache); + if let Some(old_state) = old_state { self.garbage_disposal.dispose(old_state); } @@ -1015,13 +993,6 @@ impl ProjectCacheBroker { self.services.envelope_processor.send(message); } - fn handle_add_metric_meta(&mut self, message: AddMetricMeta) { - let envelope_processor = self.services.envelope_processor.clone(); - - self.get_or_create_project(message.project_key) - .add_metric_meta(message.meta, envelope_processor); - } - fn handle_flush_buckets(&mut self, message: FlushBuckets) { let aggregator = self.services.aggregator.clone(); let project_cache = self.services.project_cache.clone(); @@ -1318,7 +1289,6 @@ impl ProjectCacheBroker { } ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message), ProjectCache::ProcessMetrics(message) => self.handle_process_metrics(message), - ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), ProjectCache::RefreshIndexCache(message) => { diff --git a/relay-server/src/services/projects/project/mod.rs b/relay-server/src/services/projects/project/mod.rs index a553ebd89b1..cf4f387c9a8 100644 --- a/relay-server/src/services/projects/project/mod.rs +++ b/relay-server/src/services/projects/project/mod.rs @@ -4,7 +4,7 @@ use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_dynamic_config::{ErrorBoundary, Feature}; -use relay_metrics::{Bucket, MetaAggregator, MetricMeta}; +use relay_metrics::Bucket; use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits, Scoping}; use relay_sampling::evaluation::ReservoirCounters; use relay_statsd::metric; @@ -15,7 +15,7 @@ use tokio::time::Instant; use crate::envelope::ItemType; use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, 
ProcessProjectMetrics};
+use crate::services::processor::ProcessProjectMetrics;
 use crate::services::projects::cache::{
     CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate,
 };
@@ -89,8 +89,6 @@ pub struct Project {
     rate_limits: CachedRateLimits,
     last_no_cache: Instant,
     reservoir_counters: ReservoirCounters,
-    metric_meta_aggregator: MetaAggregator,
-    has_pending_metric_meta: bool,
 }
 
 impl Project {
@@ -106,8 +104,6 @@ impl Project {
             rate_limits: CachedRateLimits::new(),
             last_no_cache: Instant::now(),
             reservoir_counters: Arc::default(),
-            metric_meta_aggregator: MetaAggregator::new(config.metrics_meta_locations_max()),
-            has_pending_metric_meta: false,
             config,
         }
     }
@@ -194,91 +190,6 @@ impl Project {
         ));
     }
 
-    pub fn add_metric_meta(
-        &mut self,
-        meta: MetricMeta,
-        envelope_processor: Addr<EnvelopeProcessor>,
-    ) {
-        // Only track metadata if custom metrics are enabled, or we don't know yet whether they are
-        // enabled.
-        let is_enabled = match self.current_state() {
-            ProjectState::Enabled(info) => info.has_feature(Feature::CustomMetrics),
-            ProjectState::Disabled => false,
-            ProjectState::Pending => true,
-        };
-
-        if !is_enabled {
-            relay_log::trace!("metric meta not enabled for project {}", self.project_key);
-            return;
-        }
-
-        let Some(meta) = self.metric_meta_aggregator.add(self.project_key, meta) else {
-            // Nothing to do, which also means there is no pending data.
-            relay_log::trace!("metric meta aggregator already has data, nothing to send upstream");
-            return;
-        };
-
-        let scoping = self.scoping();
-        match scoping {
-            Some(scoping) => {
-                // We can only have a scoping if we also have a state, which means at this point feature
-                // flags are already checked.
-                envelope_processor.send(EncodeMetricMeta { scoping, meta })
-            }
-            None => self.has_pending_metric_meta = true,
-        }
-    }
-
-    fn flush_metric_meta(&mut self, envelope_processor: &Addr<EnvelopeProcessor>) {
-        if !self.has_pending_metric_meta {
-            return;
-        }
-        let is_enabled = match self.current_state() {
-            ProjectState::Enabled(project_info) => project_info.has_feature(Feature::CustomMetrics),
-            ProjectState::Disabled => false,
-            ProjectState::Pending => {
-                // Cannot flush, wait for project state to be loaded.
-                return;
-            }
-        };
-
-        let Some(scoping) = self.scoping() else {
-            return;
-        };
-
-        // All relevant info has been gathered, consider us flushed.
-        self.has_pending_metric_meta = false;
-
-        if !is_enabled {
-            relay_log::debug!(
-                "clearing metric meta aggregator, because project {} does not have feature flag enabled",
-                self.project_key,
-            );
-            // Project/Org does not have the feature, forget everything.
-            self.metric_meta_aggregator.clear(self.project_key);
-            return;
-        }
-
-        // Flush the entire aggregator containing all code locations for the project.
-        //
-        // There is a rare case where this flushes items which have already been flushed before.
-        // This happens only if we temporarily lose a previously valid and loaded project state
-        // and then receive an update for the project state.
-        // While this is a possible occurrence, the effect should be relatively limited,
-        // especially since the final store does de-duplication.
-        for meta in self
-            .metric_meta_aggregator
-            .get_all_relevant(self.project_key)
-        {
-            relay_log::debug!(
-                "flushing aggregated metric meta for project {}",
-                self.project_key
-            );
-            metric!(counter(RelayCounters::ProjectStateFlushAllMetricMeta) += 1);
-            envelope_processor.send(EncodeMetricMeta { scoping, meta });
-        }
-    }
-
     /// Returns `true` if the backoff expired and a new attempt can be triggered.
fn can_fetch(&self) -> bool { self.next_fetch_attempt @@ -438,7 +349,6 @@ impl Project { &mut self, project_cache: &Addr, state: ProjectFetchState, - envelope_processor: &Addr, no_cache: bool, ) -> Option { // Initiate the backoff if the incoming state is invalid. Reset it otherwise. @@ -491,7 +401,7 @@ impl Project { relay_log::debug!("project state {} updated", self.project_key); channel.inner.send(self.state.current_state(&self.config)); - self.after_state_updated(envelope_processor); + self.after_state_updated(); Some(old_state) } @@ -499,8 +409,7 @@ impl Project { /// Called after all state validations and after the project state is updated. /// /// See also: [`Self::update_state`]. - fn after_state_updated(&mut self, envelope_processor: &Addr) { - self.flush_metric_meta(envelope_processor); + fn after_state_updated(&mut self) { // Check if the new sampling config got rid of any reservoir rules we have counters for. self.remove_expired_reservoir_rules(); } @@ -692,7 +601,6 @@ mod tests { #[tokio::test] async fn test_stale_cache() { let (addr, _) = mock_service("project_cache", (), |&mut (), _| {}); - let (envelope_processor, _) = mock_service("envelope_processor", (), |&mut (), _| {}); let config = Arc::new( Config::from_json_value(json!( @@ -717,12 +625,7 @@ mod tests { assert!(project.next_fetch_attempt.is_none()); // Try to update project with errored project state. - project.update_state( - &addr, - ProjectFetchState::pending(), - &envelope_processor, - false, - ); + project.update_state(&addr, ProjectFetchState::pending(), false); // Since we got invalid project state we still keep the old one meaning there // still must be the project id set. assert!(matches!(project.current_state(), ProjectState::Enabled(_))); @@ -738,12 +641,7 @@ mod tests { // * without backoff it would just panic, not able to call the ProjectCache service let channel = StateChannel::new(); project.state_channel = Some(channel); - project.update_state( - &addr, - ProjectFetchState::pending(), - &envelope_processor, - false, - ); + project.update_state(&addr, ProjectFetchState::pending(), false); project.fetch_state(addr, false); } diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a7b5adc66e6..8fe47474547 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -122,13 +122,11 @@ impl RelayStats { project_configs, cardinality, quotas, - misc, }) = self.redis_pools.as_ref() { Self::redis_pool(project_configs, "project_configs"); Self::redis_pool(cardinality, "cardinality"); Self::redis_pool(quotas, "quotas"); - Self::redis_pool(misc, "misc"); } } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 3526e30b0a7..deaed0bc572 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -727,11 +727,6 @@ pub enum RelayCounters { /// Failure can happen, for example, when there's a network error. Refer to /// [`UpstreamRequestError`](crate::services::upstream::UpstreamRequestError) for all cases. ProjectUpstreamFailed, - /// Number of full metric data flushes. - /// - /// A full flush takes all contained items of the aggregator and flushes them upstream, - /// at best this happens once per freshly loaded project. - ProjectStateFlushAllMetricMeta, /// Number of Relay server starts. /// /// This can be used to track unwanted restarts due to crashes or termination. 
@@ -816,8 +811,6 @@ pub enum RelayCounters { EvictingStaleProjectCaches, /// Number of times that parsing a metrics bucket item from an envelope failed. MetricBucketsParsingFailed, - /// Number of times that parsing a metric meta item from an envelope failed. - MetricMetaParsingFailed, /// Count extraction of transaction names. Tag with the decision to drop / replace / use original. MetricsTransactionNameExtracted, /// Number of Events with an OpenTelemetry Context @@ -876,7 +869,6 @@ impl CounterMetric for RelayCounters { RelayCounters::ProjectStateGet => "project_state.get", RelayCounters::ProjectStateRequest => "project_state.request", RelayCounters::ProjectStateNoCache => "project_state.no_cache", - RelayCounters::ProjectStateFlushAllMetricMeta => "project_state.flush_all_metric_meta", #[cfg(feature = "processing")] RelayCounters::ProjectStateRedis => "project_state.redis.requests", RelayCounters::ProjectUpstreamCompleted => "project_upstream.completed", @@ -893,7 +885,6 @@ impl CounterMetric for RelayCounters { RelayCounters::ResponsesStatusCodes => "responses.status_codes", RelayCounters::EvictingStaleProjectCaches => "project_cache.eviction", RelayCounters::MetricBucketsParsingFailed => "metrics.buckets.parsing_failed", - RelayCounters::MetricMetaParsingFailed => "metrics.meta.parsing_failed", RelayCounters::MetricsTransactionNameExtracted => "metrics.transaction_name", RelayCounters::OpenTelemetryEvent => "event.opentelemetry", RelayCounters::GlobalConfigFetched => "global_config.fetch", diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 2dabb2ed3e9..8db1f37a849 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -120,7 +120,6 @@ fn infer_event_category(item: &Item) -> Option { ItemType::Sessions => None, ItemType::Statsd => None, ItemType::MetricBuckets => None, - ItemType::MetricMeta => None, ItemType::FormData => None, ItemType::UserReport => None, ItemType::Profile => None, @@ -492,7 +491,6 @@ impl Enforcement { | ItemType::Sessions | ItemType::Statsd | ItemType::MetricBuckets - | ItemType::MetricMeta | ItemType::ClientReport | ItemType::UserReportV2 | ItemType::ProfileChunk diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 725306d1d8a..4bb4b9d88dc 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -16,7 +16,6 @@ use crate::utils::{ItemAction, ManagedEnvelope}; /// - `max_check_in_size` /// - `max_event_size` /// - `max_metric_buckets_size` -/// - `max_metric_meta_size` /// - `max_profile_size` /// - `max_replay_compressed_size` /// - `max_session_count` @@ -62,7 +61,6 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> Resul ItemType::UserReport => NO_LIMIT, ItemType::Statsd => config.max_statsd_size(), ItemType::MetricBuckets => config.max_metric_buckets_size(), - ItemType::MetricMeta => config.max_metric_meta_size(), ItemType::Span | ItemType::OtelSpan => config.max_span_size(), ItemType::ProfileChunk => config.max_profile_size(), ItemType::Unknown(_) => NO_LIMIT, diff --git a/tests/integration/test_metric_meta.py b/tests/integration/test_metric_meta.py deleted file mode 100644 index 8dc0822008b..00000000000 --- a/tests/integration/test_metric_meta.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -import time - -from sentry_sdk.envelope import Envelope, Item, PayloadRef -from datetime import datetime, timezone, time as dttime - - -def assert_location(expected, actual): - # Compare without the 
'type' which is not persisted into storage. - assert {k: expected[k] for k in expected.keys() - {"type"}} == actual - - -def test_metric_meta(mini_sentry, redis_client, relay_with_processing): - project_id = 42 - now = datetime.now(tz=timezone.utc) - start_of_day = datetime.combine(now, dttime.min, tzinfo=timezone.utc) - - # Key magic number is the fnv32 hash from the MRI. - redis_key = "mm:l:{1}:42:2718098263:" + str(int(start_of_day.timestamp())) - # Clear just so there is no overlap with previous tests. - redis_client.delete(redis_key) - - relay = relay_with_processing() - - project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append("organizations:custom-metrics") - - location1 = { - "type": "location", - "function": "_scan_for_suspect_projects", - "module": "sentry.tasks.low_priority_symbolication", - "filename": "sentry/tasks/low_priority_symbolication.py", - "abs_path": "/usr/src/sentry/src/sentry/tasks/low_priority_symbolication.py", - "lineno": 45, - } - location2 = { - "type": "location", - "function": "_scan_for_suspect_projects_the_second", - "module": "sentry.tasks.low_priority_symbolication", - "filename": "sentry/tasks/low_priority_symbolication.py", - "abs_path": "/usr/src/sentry/src/sentry/tasks/low_priority_symbolication.py", - "lineno": 120, - } - location3 = { - "type": "location", - "function": "_scan_for_suspect_projects_the_second", - "module": "sentry.tasks.low_priority_symbolication", - "filename": "sentry/tasks/low_priority_symbolication.py", - "abs_path": "/usr/src/sentry/src/sentry/tasks/low_priority_symbolication.py", - "lineno": 123, - } - - envelope = Envelope() - payload = { - "timestamp": now.isoformat(), - "mapping": { - "d:custom/sentry.process_profile.track_outcome@second": [ - location1, - location2, - ] - }, - } - envelope.add_item(Item(PayloadRef(json=payload), type="metric_meta")) - relay.send_envelope(project_id, envelope) - - time.sleep(1) - - stored = sorted( - (json.loads(v) for v in redis_client.smembers(redis_key)), - key=lambda x: x["lineno"], - ) - assert len(stored) == 2 - assert_location(location1, stored[0]) - assert_location(location2, stored[1]) - - # Resend one location and add a new location. 
- envelope = Envelope() - payload = { - "timestamp": now.isoformat(), - "mapping": { - "d:custom/sentry.process_profile.track_outcome@second": [ - location1, - location3, - ] - }, - } - envelope.add_item(Item(PayloadRef(json=payload), type="metric_meta")) - relay.send_envelope(project_id, envelope) - - time.sleep(1) - - stored = sorted( - (json.loads(v) for v in redis_client.smembers(redis_key)), - key=lambda x: x["lineno"], - ) - assert len(stored) == 3 - assert_location(location1, stored[0]) - assert_location(location2, stored[1]) - assert_location(location3, stored[2]) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index c168a1b2e28..8ef8953408c 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -2,7 +2,6 @@ from collections import defaultdict from datetime import UTC, datetime, timedelta, timezone from pathlib import Path -from sentry_sdk.envelope import Envelope, Item, PayloadRef import json import signal import time @@ -134,52 +133,9 @@ def test_metrics_proxy_mode_statsd(mini_sentry, relay): relay.send_metrics(project_id, metrics_payload) envelope = mini_sentry.captured_events.get(timeout=3) assert len(envelope.items) == 1 - metric_meta_item = envelope.items[0] - assert metric_meta_item.type == "statsd" - assert metric_meta_item.get_bytes().decode() == metrics_payload - - -def test_metrics_proxy_mode_metrics_meta(mini_sentry, relay): - relay = relay( - mini_sentry, - options={ - "relay": {"mode": "proxy"}, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - "shift_key": "none", - }, - }, - ) - - location = { - "type": "location", - "function": "_scan_for_suspect_projects", - "module": "sentry.tasks.low_priority_symbolication", - "filename": "sentry/tasks/low_priority_symbolication.py", - "abs_path": "/usr/src/sentry/src/sentry/tasks/low_priority_symbolication.py", - "lineno": 45, - } - - meta_payload = { - "timestamp": datetime.now(tz=timezone.utc).isoformat(), - "mapping": { - "d:custom/sentry.process_profile.track_outcome@second": [ - location, - ] - }, - } - meta_payload = json.dumps(meta_payload, sort_keys=True) - - envelope = Envelope() - envelope.add_item(Item(PayloadRef(json=meta_payload), type="metric_meta")) - relay.send_envelope(42, envelope) - - envelope = mini_sentry.captured_events.get(timeout=3) - assert len(envelope.items) == 1 - metric_meta_item = envelope.items[0] - assert metric_meta_item.type == "metric_meta" - assert json.loads(metric_meta_item.get_bytes().decode()) == meta_payload + item = envelope.items[0] + assert item.type == "statsd" + assert item.get_bytes().decode() == metrics_payload def test_metrics(mini_sentry, relay): diff --git a/tests/integration/test_redis.py b/tests/integration/test_redis.py index c263d01ae64..8520e08b831 100644 --- a/tests/integration/test_redis.py +++ b/tests/integration/test_redis.py @@ -1,70 +1,6 @@ -import json -import time import uuid from sentry_sdk.envelope import Envelope, Item, PayloadRef -from datetime import datetime, timezone, time as dttime - - -def test_multi_write_redis_client_with_metric_meta( - mini_sentry, redis_client, secondary_redis_client, relay_with_processing -): - project_id = 42 - now = datetime.now(tz=timezone.utc) - start_of_day = datetime.combine(now, dttime.min, tzinfo=timezone.utc) - - # Key magic number is the fnv32 hash from the MRI. - redis_key = "mm:l:{1}:42:2718098263:" + str(int(start_of_day.timestamp())) - # Clear just so there is no overlap with previous tests. 
- redis_client.delete(redis_key) - - relay = relay_with_processing( - { - "processing": { - "redis": { - "configs": [ - {"server": "redis://127.0.0.1:6379"}, - # We want to test with multiple nesting levels to make sure the multi-write - # works nonetheless. - {"configs": ["redis://127.0.0.1:6380"]}, - ] - } - } - } - ) - - project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append("organizations:custom-metrics") - - location = { - "type": "location", - "function": "_scan_for_suspect_projects", - "module": "sentry.tasks.low_priority_symbolication", - "filename": "sentry/tasks/low_priority_symbolication.py", - "abs_path": "/usr/src/sentry/src/sentry/tasks/low_priority_symbolication.py", - "lineno": 45, - } - - envelope = Envelope() - payload = { - "timestamp": now.isoformat(), - "mapping": { - "d:custom/sentry.process_profile.track_outcome@second": [ - location, - ] - }, - } - envelope.add_item(Item(PayloadRef(json=payload), type="metric_meta")) - relay.send_envelope(project_id, envelope) - - time.sleep(1) - - for client in [redis_client, secondary_redis_client]: - stored = sorted( - (json.loads(v) for v in client.smembers(redis_key)), - key=lambda x: x["lineno"], - ) - assert len(stored) == 1 def test_multi_write_redis_client_with_rate_limiting(