Skip to content

Commit

Permalink
ref(server): Move project source into the source module
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Oct 18, 2024
1 parent a100236 commit 790cc6a
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 202 deletions.
202 changes: 2 additions & 200 deletions relay-server/src/services/projects/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -10,33 +9,23 @@ use crate::services::global_config;
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
use crate::services::projects::project::state::UpstreamProjectState;
use crate::Envelope;
use chrono::{DateTime, Utc};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
#[cfg(feature = "processing")]
use relay_config::RedisConfigRef;
use relay_config::{Config, RelayMode};
use relay_config::Config;
use relay_metrics::{Bucket, MetricMeta};
use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, Sender, Service};
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;

use crate::services::metrics::{Aggregator, FlushBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::project::{Project, ProjectFetchState, ProjectSender, ProjectState};
use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService};
#[cfg(feature = "processing")]
use crate::services::projects::source::redis::RedisProjectSource;
use crate::services::projects::source::upstream::{
UpstreamProjectSource, UpstreamProjectSourceService,
};
use crate::services::projects::source::ProjectSource;
use crate::services::spooler::{
self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex,
UnspooledEnvelope, BATCH_KEY_COUNT,
Expand All @@ -47,10 +36,6 @@ use crate::services::upstream::UpstreamRelay;
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle};

/// Default value of maximum connections to Redis. This value was arbitrarily determined.
#[cfg(feature = "processing")]
const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10;

/// Requests a refresh of a project state from one of the available sources.
///
/// The project state is resolved in the following precedence:
Expand Down Expand Up @@ -424,159 +409,6 @@ impl FromMessage<UpdateProject> for ProjectCache {
}
}

/// Helper type that contains all configured sources for project cache fetching.
///
/// See [`RequestUpdate`] for a description on how project states are fetched.
#[derive(Clone, Debug)]
struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
#[cfg(feature = "processing")]
redis_semaphore: Arc<Semaphore>,
}

impl ProjectSource {
/// Starts all project source services in the current runtime.
pub fn start(
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
_redis: Option<RedisPool>,
) -> Self {
let local_source = LocalProjectSourceService::new(config.clone()).start();
let upstream_source =
UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();

#[cfg(feature = "processing")]
let redis_max_connections = config
.redis()
.map(|configs| {
let config = match configs {
relay_config::RedisPoolConfigs::Unified(config) => config,
relay_config::RedisPoolConfigs::Individual {
project_configs: config,
..
} => config,
};
Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS)
})
.unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS);
#[cfg(feature = "processing")]
let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));

Self {
config,
local_source,
upstream_source,
#[cfg(feature = "processing")]
redis_source,
#[cfg(feature = "processing")]
redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())),
}
}

#[cfg(feature = "processing")]
fn compute_max_connections(config: RedisConfigRef) -> Option<u32> {
match config {
RedisConfigRef::Cluster { options, .. } => Some(options.max_connections),
RedisConfigRef::MultiWrite { configs } => configs
.into_iter()
.filter_map(|c| Self::compute_max_connections(c))
.max(),
RedisConfigRef::Single { options, .. } => Some(options.max_connections),
}
}

async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ProjectSourceError> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
.await?;

if let Some(state) = state_opt {
return Ok(ProjectFetchState::new(state));
}

match self.config.relay_mode() {
RelayMode::Proxy => return Ok(ProjectFetchState::allowed()),
RelayMode::Static => return Ok(ProjectFetchState::disabled()),
RelayMode::Capture => return Ok(ProjectFetchState::allowed()),
RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
}

let current_revision = cached_state.revision().map(String::from);
#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let current_revision = current_revision.clone();

let redis_permit = self.redis_semaphore.acquire().await?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, current_revision.as_deref())
})
.await?;
drop(redis_permit);

match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(UpstreamProjectState::New(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
// refresh the state and return the old cached state again.
Ok(UpstreamProjectState::NotModified) => {
return Ok(ProjectFetchState::refresh(cached_state))
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to fetch project from Redis",
);
}
};
};

let state = self
.upstream_source
.send(FetchProjectState {
project_key,
current_revision,
no_cache,
})
.await?;

match state {
UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)),
}
}
}

#[derive(Debug, thiserror::Error)]
enum ProjectSourceError {
#[error("redis permit error {0}")]
RedisPermit(#[from] tokio::sync::AcquireError),
#[error("redis join error {0}")]
RedisJoin(#[from] tokio::task::JoinError),
#[error("upstream error {0}")]
Upstream(#[from] relay_system::SendError),
}

impl From<Infallible> for ProjectSourceError {
fn from(value: Infallible) -> Self {
match value {}
}
}

/// Updates the cache with new project state information.
struct UpdateProjectState {
/// The public key to fetch the project by.
Expand Down Expand Up @@ -1528,36 +1360,6 @@ impl Service for ProjectCacheService {
}
}

#[derive(Clone, Debug)]
pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,

/// Currently cached revision if available.
///
/// The upstream is allowed to omit full project configs
/// for requests for which the requester already has the most
/// recent revision.
///
/// Settings this to `None` will essentially always re-fetch
/// the project config.
pub current_revision: Option<String>,

/// If true, all caches should be skipped and a fresh state should be computed.
pub no_cache: bool,
}

#[derive(Clone, Debug)]
pub struct FetchOptionalProjectState {
project_key: ProjectKey,
}

impl FetchOptionalProjectState {
pub fn project_key(&self) -> ProjectKey {
self.project_key
}
}

/// Sum type for all objects which need to be discareded through the [`GarbageDisposal`].
#[derive(Debug)]
#[allow(dead_code)] // Fields are never read, only used for discarding/dropping data.
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/projects/source/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Serv
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::services::projects::cache::FetchOptionalProjectState;
use crate::services::projects::project::{ParsedProjectState, ProjectState};
use crate::services::projects::source::FetchOptionalProjectState;

/// Service interface of the local project source.
#[derive(Debug)]
Expand Down
Loading

0 comments on commit 790cc6a

Please sign in to comment.