Skip to content

Commit

Permalink
after rebase for project source move
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Oct 21, 2024
1 parent d5012d9 commit c5e9e80
Show file tree
Hide file tree
Showing 21 changed files with 1,302 additions and 1,469 deletions.
85 changes: 85 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ num-traits = "0.2.18"
num_cpus = "1.13.0"
once_cell = "1.13.1"
opentelemetry-proto = { version = "0.7.0", default-features = false }
papaya = "0.1.4"
parking_lot = "0.12.1"
path-slash = "0.2.1"
pest = "2.1.3"
Expand Down
8 changes: 8 additions & 0 deletions relay-dynamic-config/src/error_boundary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ impl<T> ErrorBoundary<T> {
}
}

/// Converts from `ErrorBoundary<T>` to `ErrorBoundary<&T>`.
pub fn as_ref(&self) -> ErrorBoundary<&T> {
match self {
Self::Ok(t) => ErrorBoundary::Ok(t),
Self::Err(e) => ErrorBoundary::Err(Arc::clone(e)),
}
}

/// Returns the contained [`Ok`] value or computes it from a closure.
#[inline]
pub fn unwrap_or_else<F>(self, op: F) -> T
Expand Down
1 change: 1 addition & 0 deletions relay-quotas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ redis = ["dep:thiserror", "dep:relay-log", "relay-redis/impl"]
workspace = true

[dependencies]
arc-swap = { workspace = true }
hashbrown = { workspace = true }
relay-base-schema = { workspace = true }
relay-common = { workspace = true }
Expand Down
49 changes: 39 additions & 10 deletions relay-quotas/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use arc_swap::ArcSwap;
use relay_base_schema::metrics::MetricNamespace;
use relay_base_schema::project::{ProjectId, ProjectKey};
use smallvec::SmallVec;
Expand Down Expand Up @@ -402,7 +404,7 @@ impl<'a> IntoIterator for &'a RateLimits {
///
/// The data structure makes sure no expired rate limits are enforced.
#[derive(Debug, Default)]
pub struct CachedRateLimits(RateLimits);
pub struct CachedRateLimits(ArcSwap<RateLimits>);

impl CachedRateLimits {
/// Creates a new, empty instance without any rate limits enforced.
Expand All @@ -413,25 +415,52 @@ impl CachedRateLimits {
/// Adds a limit to this collection.
///
/// See also: [`RateLimits::add`].
pub fn add(&mut self, limit: RateLimit) {
self.0.add(limit);
pub fn add(&self, limit: RateLimit) {
self.0.rcu(|current| {
let mut current = current.as_ref().clone();
current.add(limit.clone());
current
});
}

/// Merges more rate limits into this instance.
///
/// See also: [`RateLimits::merge`].
pub fn merge(&mut self, rate_limits: RateLimits) {
for limit in rate_limits {
self.add(limit)
}
pub fn merge(&self, limits: RateLimits) {
self.0.rcu(|current| {
let mut current = current.as_ref().clone();
for limit in limits.clone() {
current.add(limit)
}
current
});
}

/// Returns a reference to the contained rate limits.
///
/// This call gurantuess that at the time of call no returned rate limit is expired.
pub fn current_limits(&mut self) -> &RateLimits {
self.0.clean_expired();
&self.0
pub fn current_limits(&self) -> Arc<RateLimits> {
let now = Instant::now();

let mut current = self.0.load();
while current.iter().any(|rl| rl.retry_after.expired_at(now)) {
let new = {
let mut new = current.as_ref().clone();
new.clean_expired();
Arc::new(new)
};

let prev = self.0.compare_and_swap(&*current, Arc::clone(&new));

// If there was a swap, we know `new` is now stored and the most recent value.
if Arc::ptr_eq(&current, &prev) {
return new;
}

current = prev;
}

arc_swap::Guard::into_inner(current)
}
}

Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ mime = { workspace = true }
minidump = { workspace = true, optional = true }
multer = { workspace = true }
once_cell = { workspace = true }
papaya = { workspace = true }
pin-project-lite = { workspace = true }
priority-queue = { workspace = true }
rand = { workspace = true }
Expand Down
13 changes: 6 additions & 7 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, I
use crate::service::ServiceState;
use crate::services::buffer::EnvelopeBuffer;
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{MetricData, ProcessingGroup};
use crate::services::projects::cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope};
use crate::services::processor::{MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::ValidateEnvelope;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};

Expand Down Expand Up @@ -274,7 +274,7 @@ fn queue_envelope(

if !metric_items.is_empty() {
relay_log::trace!("sending metrics into processing queue");
state.project_cache().send(ProcessMetrics {
state.processor().send(ProcessMetrics {
data: MetricData::Raw(metric_items.into_vec()),
start_time: envelope.meta().start_time().into(),
sent_at: envelope.sent_at(),
Expand Down Expand Up @@ -367,10 +367,9 @@ pub async fn handle_envelope(
}

let checked = state
.project_cache()
.send(CheckEnvelope::new(managed_envelope))
.await
.map_err(|_| BadStoreRequest::ScheduleFailed)?
.project_cache_handle()
.get(managed_envelope.scoping().project_key)
.check_envelope(managed_envelope)
.map_err(BadStoreRequest::EventRejected)?;

let Some(mut managed_envelope) = checked.envelope else {
Expand Down
30 changes: 5 additions & 25 deletions relay-server/src/endpoints/project_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ struct GetProjectStatesRequest {
#[serde(default)]
full_config: bool,
#[serde(default)]
no_cache: bool,
#[serde(default)]
global: bool,
}

Expand Down Expand Up @@ -139,30 +137,9 @@ fn into_valid_keys(

async fn inner(
state: ServiceState,
Query(version): Query<VersionQuery>,
body: SignedJson<GetProjectStatesRequest>,
) -> Result<impl IntoResponse, ServiceUnavailable> {
let SignedJson { inner, relay } = body;
let project_cache = &state.project_cache().clone();

let no_cache = inner.no_cache;
let keys_len = inner.public_keys.len();

let mut futures: FuturesUnordered<_> = into_valid_keys(inner.public_keys, inner.revisions)
.map(|(project_key, revision)| async move {
let state_result = if version.version >= ENDPOINT_V3 && !no_cache {
project_cache
.send(GetCachedProjectState::new(project_key))
.await
} else {
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
};

(project_key, revision, state_result)
})
.collect();

let (global, global_status) = if inner.global {
match state.global_config().send(global_config::Get).await? {
Expand All @@ -178,12 +155,15 @@ async fn inner(
(None, None)
};

let keys_len = inner.public_keys.len();
let mut pending = Vec::with_capacity(keys_len);
let mut unchanged = Vec::with_capacity(keys_len);
let mut configs = HashMap::with_capacity(keys_len);

while let Some((project_key, revision, state_result)) = futures.next().await {
let project_info = match state_result? {
for (project_key, revision) in into_valid_keys(inner.public_keys, inner.revisions) {
let project = state.project_cache_handle().get(project_key);

let project_info = match project.project_state() {
ProjectState::Enabled(info) => info,
ProjectState::Disabled => {
// Don't insert project config. Downstream Relay will consider it disabled.
Expand Down
Loading

0 comments on commit c5e9e80

Please sign in to comment.