move source out
Dav1dde committed Oct 18, 2024
1 parent a4775d9 commit c2bf66a
Showing 3 changed files with 219 additions and 223 deletions.
227 changes: 8 additions & 219 deletions relay-server/src/services/projects/cache.rs
@@ -1,5 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
@@ -8,34 +7,22 @@ use crate::extractors::RequestMeta;
use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError};
use crate::services::global_config;
use crate::services::processor::{
EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
};
use crate::services::projects::cache2::{CheckedEnvelope, ProjectCacheHandle};
use crate::services::projects::project::state::UpstreamProjectState;
use crate::Envelope;
use chrono::{DateTime, Utc};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
#[cfg(feature = "processing")]
use relay_config::RedisConfigRef;
use relay_config::{Config, RelayMode};
use relay_metrics::Bucket;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, Service};
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, watch};

use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::project::{ProjectFetchState, ProjectState};
use crate::services::projects::source::local::{LocalProjectSource, LocalProjectSourceService};
#[cfg(feature = "processing")]
use crate::services::projects::source::redis::RedisProjectSource;
use crate::services::projects::source::upstream::{
UpstreamProjectSource, UpstreamProjectSourceService,
};
use crate::services::spooler::{
self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex,
UnspooledEnvelope, BATCH_KEY_COUNT,
@@ -46,10 +33,6 @@ use crate::services::upstream::UpstreamRelay;
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle};

/// Default value of maximum connections to Redis. This value was arbitrarily determined.
#[cfg(feature = "processing")]
const DEFAULT_REDIS_MAX_CONNECTIONS: u32 = 10;

/// Validates the envelope against project configuration and rate limits.
///
/// This ensures internally that the project state is up to date and then runs the same checks as
@@ -197,159 +180,6 @@ impl FromMessage<FlushBuckets> for ProjectCache {
}
}

/// Helper type that contains all configured sources for project cache fetching.
///
/// See [`RequestUpdate`] for a description of how project states are fetched.
#[derive(Clone, Debug)]
pub struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
#[cfg(feature = "processing")]
redis_semaphore: Arc<Semaphore>,
}

impl ProjectSource {
/// Starts all project source services in the current runtime.
pub fn start(
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
_redis: Option<RedisPool>,
) -> Self {
let local_source = LocalProjectSourceService::new(config.clone()).start();
let upstream_source =
UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();

#[cfg(feature = "processing")]
let redis_max_connections = config
.redis()
.map(|configs| {
let config = match configs {
relay_config::RedisPoolConfigs::Unified(config) => config,
relay_config::RedisPoolConfigs::Individual {
project_configs: config,
..
} => config,
};
Self::compute_max_connections(config).unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS)
})
.unwrap_or(DEFAULT_REDIS_MAX_CONNECTIONS);
#[cfg(feature = "processing")]
let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));

Self {
config,
local_source,
upstream_source,
#[cfg(feature = "processing")]
redis_source,
#[cfg(feature = "processing")]
redis_semaphore: Arc::new(Semaphore::new(redis_max_connections.try_into().unwrap())),
}
}

#[cfg(feature = "processing")]
fn compute_max_connections(config: RedisConfigRef) -> Option<u32> {
match config {
RedisConfigRef::Cluster { options, .. } => Some(options.max_connections),
RedisConfigRef::MultiWrite { configs } => configs
.into_iter()
.filter_map(|c| Self::compute_max_connections(c))
.max(),
RedisConfigRef::Single { options, .. } => Some(options.max_connections),
}
}

pub async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ProjectSourceError> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
.await?;

if let Some(state) = state_opt {
return Ok(ProjectFetchState::new(state));
}

match self.config.relay_mode() {
RelayMode::Proxy => return Ok(ProjectFetchState::allowed()),
RelayMode::Static => return Ok(ProjectFetchState::disabled()),
RelayMode::Capture => return Ok(ProjectFetchState::allowed()),
RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
}

let current_revision = cached_state.revision().map(String::from);
#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let current_revision = current_revision.clone();

let redis_permit = self.redis_semaphore.acquire().await?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, current_revision.as_deref())
})
.await?;
drop(redis_permit);

match state_fetch_result {
// New state fetched from Redis, possibly pending.
Ok(UpstreamProjectState::New(state)) => {
let state = state.sanitized();
if !state.is_pending() {
return Ok(ProjectFetchState::new(state));
}
}
// Redis reported that we're holding an up-to-date version of the state already,
// refresh the state and return the old cached state again.
Ok(UpstreamProjectState::NotModified) => {
return Ok(ProjectFetchState::refresh(cached_state))
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to fetch project from Redis",
);
}
};
};

let state = self
.upstream_source
.send(FetchProjectState {
project_key,
current_revision,
no_cache,
})
.await?;

match state {
UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
UpstreamProjectState::NotModified => Ok(ProjectFetchState::refresh(cached_state)),
}
}
}
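
The removed `fetch` above gates its blocking Redis lookup behind a semaphore plus `tokio::task::spawn_blocking`. Below is a minimal, self-contained sketch of that pattern, assuming a Tokio runtime; `query_redis` is a hypothetical stand-in for `RedisProjectSource::get_config_if_changed`, and this sketch releases the permit inside the blocking task rather than after the join:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical stand-in for the synchronous Redis round trip done by
// `RedisProjectSource::get_config_if_changed`.
fn query_redis(project_key: &str) -> std::io::Result<String> {
    Ok(format!("config for {project_key}"))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Cap concurrent blocking Redis calls, mirroring `redis_semaphore`.
    let semaphore = Arc::new(Semaphore::new(10));

    let permit = semaphore.clone().acquire_owned().await?;
    let state = tokio::task::spawn_blocking(move || {
        let result = query_redis("my-project-key");
        drop(permit); // Release the slot as soon as the blocking call returns.
        result
    })
    .await??; // Outer `?`: task join error; inner `?`: Redis error.

    println!("{state}");
    Ok(())
}

Bounding the permits keeps a burst of project-config fetches from saturating the Redis pool, which is why the cap falls back to `DEFAULT_REDIS_MAX_CONNECTIONS` when no pool size is configured.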

#[derive(Debug, thiserror::Error)]
enum ProjectSourceError {
#[error("redis permit error {0}")]
RedisPermit(#[from] tokio::sync::AcquireError),
#[error("redis join error {0}")]
RedisJoin(#[from] tokio::task::JoinError),
#[error("upstream error {0}")]
Upstream(#[from] relay_system::SendError),
}

impl From<Infallible> for ProjectSourceError {
fn from(value: Infallible) -> Self {
match value {}
}
}
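
The `From<Infallible>` impl exists so `?` can convert errors from operations whose error type is `Infallible`: since `Infallible` has no values, the empty `match` is exhaustive and the conversion is statically unreachable. A small, self-contained illustration of the same pattern, using a hypothetical `MyError`:

use std::convert::Infallible;

#[derive(Debug)]
enum MyError {
    _Other, // hypothetical variant, unused here
}

impl From<Infallible> for MyError {
    fn from(value: Infallible) -> Self {
        // No variants to match; this body can never execute.
        match value {}
    }
}

fn never_fails() -> Result<u32, Infallible> {
    Ok(42)
}

fn caller() -> Result<u32, MyError> {
    // `?` applies the `From<Infallible>` conversion on the error path.
    Ok(never_fails()?)
}

fn main() {
    assert_eq!(caller().unwrap(), 42);
}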

/// Updates the cache with new project state information.
struct UpdateProjectState {
/// The public key to fetch the project by.
@@ -388,8 +218,6 @@ struct ProjectCacheBroker {
projects: ProjectCacheHandle,
/// Utility for disposing of expired project data in a background thread.
garbage_disposal: GarbageDisposal<ProjectGarbage>,
/// Source for fetching project states from the upstream or from disk.
source: ProjectSource,

/// Handle to schedule periodic unspooling of buffered envelopes (spool V1).
spool_v1_unspool_handle: SleepHandle,
@@ -967,18 +795,11 @@ impl Service for ProjectCacheService {
}),
};

// Main broker that serializes public and internal messages, and triggers project state
// fetches via the project source.
let mut broker = ProjectCacheBroker {
config: config.clone(),
memory_checker,
projects: todo!(),
garbage_disposal: GarbageDisposal::new(),
source: ProjectSource::start(
config.clone(),
services.upstream_relay.clone(),
redis,
),
services,
spool_v1_unspool_handle: SleepHandle::idle(),
spool_v1,
@@ -1030,36 +851,6 @@ impl Service for ProjectCacheService {
}
}

#[derive(Clone, Debug)]
pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,

/// Currently cached revision if available.
///
/// The upstream is allowed to omit full project configs
/// for requests for which the requester already has the most
/// recent revision.
///
/// Setting this to `None` will essentially always re-fetch
/// the project config.
pub current_revision: Option<String>,

/// If true, all caches should be skipped and a fresh state should be computed.
pub no_cache: bool,
}

#[derive(Clone, Debug)]
pub struct FetchOptionalProjectState {
project_key: ProjectKey,
}

impl FetchOptionalProjectState {
pub fn project_key(&self) -> ProjectKey {
self.project_key
}
}
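
The `current_revision` field of the removed `FetchProjectState` enables a not-modified handshake: the upstream may answer `NotModified` instead of shipping a full config when the requester's revision is already current. A hedged sketch of that server-side decision, with simplified stand-in types (the real `New` variant carries a parsed project config, not a `String`):

enum UpstreamProjectState {
    New(String), // full config; payload simplified to `String` here
    NotModified, // requester's revision is already current
}

fn respond(requested: Option<&str>, latest: &str, config: &str) -> UpstreamProjectState {
    match requested {
        // The requester already holds the latest revision: omit the config.
        Some(rev) if rev == latest => UpstreamProjectState::NotModified,
        // `None` (or a stale revision) always yields the full config, which
        // is why setting `current_revision` to `None` re-fetches.
        _ => UpstreamProjectState::New(config.to_owned()),
    }
}

fn main() {
    assert!(matches!(respond(Some("r1"), "r1", "cfg"), UpstreamProjectState::NotModified));
    assert!(matches!(respond(None, "r1", "cfg"), UpstreamProjectState::New(_)));
}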

/// Sum type for all objects which need to be discarded through the [`GarbageDisposal`].
#[derive(Debug)]
#[allow(dead_code)] // Fields are never read, only used for discarding/dropping data.
@@ -1113,7 +904,6 @@ mod tests {

async fn project_cache_broker_setup(
services: Services,
state_tx: mpsc::UnboundedSender<UpdateProjectState>,
buffer_tx: mpsc::UnboundedSender<UnspooledEnvelope>,
) -> (ProjectCacheBroker, Addr<Buffer>) {
let config: Arc<_> = Config::from_json_value(serde_json::json!({
@@ -1154,9 +944,7 @@
memory_checker,
projects: todo!(),
garbage_disposal: GarbageDisposal::new(),
source: ProjectSource::start(config, services.upstream_relay.clone(), None),
services,
state_tx,
spool_v1_unspool_handle: SleepHandle::idle(),
spool_v1: Some(SpoolV1 {
buffer_tx,
@@ -1175,10 +963,9 @@
relay_log::init_test!();

let services = mocked_services();
let (state_tx, _) = mpsc::unbounded_channel();
let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
let (mut broker, _buffer_svc) =
project_cache_broker_setup(services.clone(), state_tx, buffer_tx).await;
project_cache_broker_setup(services.clone(), buffer_tx).await;

broker.global_config = GlobalConfigStatus::Ready;
let (tx_update, mut rx_update) = mpsc::unbounded_channel();
@@ -1252,10 +1039,9 @@
#[tokio::test]
async fn handle_processing_without_project() {
let services = mocked_services();
let (state_tx, _) = mpsc::unbounded_channel();
let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
let (mut broker, buffer_svc) =
project_cache_broker_setup(services.clone(), state_tx, buffer_tx.clone()).await;
project_cache_broker_setup(services.clone(), buffer_tx.clone()).await;

let dsn = "111d836b15bb49d7bbf99e64295d995b";
let project_key = ProjectKey::parse(dsn).unwrap();
@@ -1271,15 +1057,18 @@
);

// Index and projects are empty.
assert!(broker.projects.is_empty());
assert!(broker.spool_v1.as_mut().unwrap().index.is_empty());

// Since there is no project, we should not process anything but create a project and spool
// the envelope.
broker.handle_processing(key, envelope);

// Assert that we have a new project and also added an index.
assert!(broker.projects.get(&project_key).is_some());
assert!(!broker
.projects
.get(project_key)
.project_state()
.is_pending());
assert!(broker.spool_v1.as_mut().unwrap().index.contains(&key));

// Check if we actually spooled anything.
2 changes: 1 addition & 1 deletion relay-server/src/services/projects/cache2/service.rs
@@ -5,9 +5,9 @@ use relay_config::Config;
use tokio::sync::mpsc;

use crate::services::buffer::EnvelopeBuffer;
use crate::services::projects::cache::ProjectSource;
use crate::services::projects::cache2::state::{CompletedFetch, Fetch};
use crate::services::projects::project::{ProjectFetchState, ProjectState};
use crate::services::projects::source::ProjectSource;

pub enum ProjectCache {
Fetch(ProjectKey),