Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Move project source into the source module #4154

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 2 additions & 200 deletions relay-server/src/services/projects/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -10,33 +9,23 @@ use crate::services::global_config;
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
use crate::services::projects::project::state::UpstreamProjectState;
use crate::Envelope;
use chrono::{DateTime, Utc};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
#[cfg(feature = "processing")]
use relay_config::RedisConfigRef;
use relay_config::{Config, RelayMode};
use relay_config::Config;
use relay_metrics::{Bucket, MetricMeta};
use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, Sender, Service};
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;

use crate::services::metrics::{Aggregator, FlushBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::project::{Project, ProjectFetchState, ProjectSender, ProjectState};
use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService};
#[cfg(feature = "processing")]
use crate::services::projects::source::redis::RedisProjectSource;
use crate::services::projects::source::upstream::{
UpstreamProjectSource, UpstreamProjectSourceService,
};
use crate::services::projects::source::ProjectSource;
use crate::services::spooler::{
self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex,
UnspooledEnvelope, BATCH_KEY_COUNT,
Expand All @@ -47,10 +36,6 @@ use crate::services::upstream::UpstreamRelay;
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle};

/// Default value of maximum connections to Redis. This value was arbitrarily determined.
#[cfg(feature = "processing")]
const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10;

/// Requests a refresh of a project state from one of the available sources.
///
/// The project state is resolved in the following precedence:
Expand Down Expand Up @@ -424,159 +409,6 @@ impl FromMessage<UpdateProject> for ProjectCache {
}
}

/// Helper type that contains all configured sources for project cache fetching.
///
/// See [`RequestUpdate`] for a description on how project states are fetched.
#[derive(Clone, Debug)]
struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
#[cfg(feature = "processing")]
redis_semaphore: Arc<Semaphore>,
}

impl ProjectSource {
/// Starts all project source services in the current runtime.
pub fn start(
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
_redis: Option<RedisPool>,
) -> Self {
let local_source = LocalProjectSourceService::new(config.clone()).start();
let upstream_source =
UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();

#[cfg(feature = "processing")]
let redis_max_connections = config
.redis()
.map(|configs| {
let config = match configs {
relay_config::RedisPoolConfigs::Unified(config) => config,
relay_config::RedisPoolConfigs::Individual {
project_configs: config,
..
} => config,
};
Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS)
})
.unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS);
#[cfg(feature = "processing")]
let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));

Self {
config,
local_source,
upstream_source,
#[cfg(feature = "processing")]
redis_source,
#[cfg(feature = "processing")]
redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())),
}
}

#[cfg(feature = "processing")]
fn compute_max_connections(config: RedisConfigRef) -> Option<u32> {
match config {
RedisConfigRef::Cluster { options, .. } => Some(options.max_connections),
RedisConfigRef::MultiWrite { configs } => configs
.into_iter()
.filter_map(|c| Self::compute_max_connections(c))
.max(),
RedisConfigRef::Single { options, .. } => Some(options.max_connections),
}
}

async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ProjectSourceError> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
.await?;

if let Some(state) = state_opt {
return Ok(ProjectFetchState::new(state));
}

match self.config.relay_mode() {
RelayMode::Proxy => return Ok(ProjectFetchState::allowed()),
RelayMode::Static => return Ok(ProjectFetchState::disabled()),
RelayMode::Capture => return Ok(ProjectFetchState::allowed()),
RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
}

let current_revision = cached_state.revision().map(String::from);
#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let current_revision = current_revision.clone();

let redis_permit = self.redis_semaphore.acquire().await?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, current_revision.as_deref())
})
.await?;
drop(redis_permit);

match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(UpstreamProjectState::New(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
// refresh the state and return the old cached state again.
Ok(UpstreamProjectState::NotModified) => {
return Ok(ProjectFetchState::refresh(cached_state))
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to fetch project from Redis",
);
}
};
};

let state = self
.upstream_source
.send(FetchProjectState {
project_key,
current_revision,
no_cache,
})
.await?;

match state {
UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)),
}
}
}

#[derive(Debug, thiserror::Error)]
enum ProjectSourceError {
#[error("redis permit error {0}")]
RedisPermit(#[from] tokio::sync::AcquireError),
#[error("redis join error {0}")]
RedisJoin(#[from] tokio::task::JoinError),
#[error("upstream error {0}")]
Upstream(#[from] relay_system::SendError),
}

impl From<Infallible> for ProjectSourceError {
fn from(value: Infallible) -> Self {
match value {}
}
}

/// Updates the cache with new project state information.
struct UpdateProjectState {
/// The public key to fetch the project by.
Expand Down Expand Up @@ -1528,36 +1360,6 @@ impl Service for ProjectCacheService {
}
}

#[derive(Clone, Debug)]
pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,

/// Currently cached revision if available.
///
/// The upstream is allowed to omit full project configs
/// for requests for which the requester already has the most
/// recent revision.
///
/// Settings this to `None` will essentially always re-fetch
/// the project config.
pub current_revision: Option<String>,

/// If true, all caches should be skipped and a fresh state should be computed.
pub no_cache: bool,
}

#[derive(Clone, Debug)]
pub struct FetchOptionalProjectState {
project_key: ProjectKey,
}

impl FetchOptionalProjectState {
pub fn project_key(&self) -> ProjectKey {
self.project_key
}
}

/// Sum type for all objects which need to be discareded through the [`GarbageDisposal`].
#[derive(Debug)]
#[allow(dead_code)] // Fields are never read, only used for discarding/dropping data.
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/projects/source/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Serv
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::services::projects::cache::FetchOptionalProjectState;
use crate::services::projects::project::{ParsedProjectState, ProjectState};
use crate::services::projects::source::FetchOptionalProjectState;

/// Service interface of the local project source.
#[derive(Debug)]
Expand Down
Loading
Loading