Skip to content

Commit

Permalink
replace cached rate limits inner with mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Oct 30, 2024
1 parent e459a30 commit b43c51f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 47 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion relay-quotas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ redis = ["dep:thiserror", "dep:relay-log", "relay-redis/impl"]
workspace = true

[dependencies]
arc-swap = { workspace = true }
hashbrown = { workspace = true }
relay-base-schema = { workspace = true }
relay-common = { workspace = true }
Expand Down
59 changes: 18 additions & 41 deletions relay-quotas/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex, PoisonError};
use std::time::{Duration, Instant};

use arc_swap::ArcSwap;
use relay_base_schema::metrics::MetricNamespace;
use relay_base_schema::project::{ProjectId, ProjectKey};
use smallvec::SmallVec;
Expand Down Expand Up @@ -281,15 +280,14 @@ impl RateLimits {
}

/// Removes expired rate limits from this instance.
pub fn clean_expired(&mut self) {
let now = Instant::now();
pub fn clean_expired(&mut self, now: Instant) {
self.limits
.retain(|limit| !limit.retry_after.expired_at(now));
}

/// Checks whether any rate limits apply to the given scoping.
///
/// If no limits match, then the returned `RateLimits` instance evalutes `is_ok`. Otherwise, it
/// If no limits match, then the returned `RateLimits` instance evaluates `is_ok`. Otherwise, it
/// contains rate limits that match the given scoping.
pub fn check(&self, scoping: ItemScoping<'_>) -> Self {
self.check_with_quotas(&[], scoping)
Expand All @@ -300,7 +298,7 @@ impl RateLimits {
/// This is similar to `check`. Additionally, it checks for quotas with a static limit `0`, and
/// rejects items even if there is no active rate limit in this instance.
///
/// If no limits or quotas match, then the returned `RateLimits` instance evalutes `is_ok`.
/// If no limits or quotas match, then the returned `RateLimits` instance evaluates `is_ok`.
/// Otherwise, it contains rate limits that match the given scoping.
pub fn check_with_quotas<'a>(
&self,
Expand Down Expand Up @@ -341,7 +339,7 @@ impl RateLimits {

/// Returns `true` if there are any limits contained.
///
/// This is equavalent to checking whether [`Self::longest`] returns `Some`
/// This is equivalent to checking whether [`Self::longest`] returns `Some`
/// or [`Self::iter`] returns an iterator with at least one item.
pub fn is_empty(&self) -> bool {
self.limits.is_empty()
Expand Down Expand Up @@ -404,7 +402,7 @@ impl<'a> IntoIterator for &'a RateLimits {
///
/// The data structure makes sure no expired rate limits are enforced.
#[derive(Debug, Default)]
pub struct CachedRateLimits(ArcSwap<RateLimits>);
pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);

impl CachedRateLimits {
/// Creates a new, empty instance without any rate limits enforced.
Expand All @@ -416,51 +414,30 @@ impl CachedRateLimits {
///
/// See also: [`RateLimits::add`].
pub fn add(&self, limit: RateLimit) {
self.0.rcu(|current| {
let mut current = current.as_ref().clone();
current.add(limit.clone());
current
});
let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
let current = Arc::make_mut(&mut inner);
current.add(limit);
}

/// Merges more rate limits into this instance.
///
/// See also: [`RateLimits::merge`].
pub fn merge(&self, limits: RateLimits) {
self.0.rcu(|current| {
let mut current = current.as_ref().clone();
for limit in limits.clone() {
current.add(limit)
}
current
});
let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
let current = Arc::make_mut(&mut inner);
for limit in limits {
current.add(limit)
}
}

/// Returns a reference to the contained rate limits.
///
/// This call gurantuess that at the time of call no returned rate limit is expired.
/// This call guarantees that at the time of call no returned rate limit is expired.
pub fn current_limits(&self) -> Arc<RateLimits> {
let now = Instant::now();

let mut current = self.0.load();
while current.iter().any(|rl| rl.retry_after.expired_at(now)) {
let new = {
let mut new = current.as_ref().clone();
new.clean_expired();
Arc::new(new)
};

let prev = self.0.compare_and_swap(&*current, Arc::clone(&new));

// If there was a swap, we know `new` is now stored and the most recent value.
if Arc::ptr_eq(&current, &prev) {
return new;
}

current = prev;
}

arc_swap::Guard::into_inner(current)
let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
Arc::make_mut(&mut inner).clean_expired(now);
Arc::clone(&inner)
}
}

Expand Down
2 changes: 1 addition & 1 deletion relay-sampling/src/evaluation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<'a> ReservoirEvaluator<'a> {
Arc::clone(&self.counters)
}

/// Sets the Redis pool and organiation ID for the [`ReservoirEvaluator`].
/// Sets the Redis pool and organization ID for the [`ReservoirEvaluator`].
///
/// These values are needed to synchronize with Redis.
#[cfg(feature = "redis")]
Expand Down
3 changes: 0 additions & 3 deletions relay-server/src/services/projects/cache/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,6 @@ impl SharedProjectState {
// reservoir counters did not change.
//
// `try_lock` to not potentially block, it's a best effort cleanup.
//
// TODO: Remove the lock, we already have interior mutability with the `ArcSwap`
// and the counters themselves can be atomics.
if let Ok(mut counters) = prev.reservoir_counters.try_lock() {
counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key));
}
Expand Down

0 comments on commit b43c51f

Please sign in to comment.