Skip to content

Commit

Permalink
it compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Oct 22, 2024
1 parent 5c09bff commit 44489a1
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 122 deletions.
3 changes: 1 addition & 2 deletions relay-server/src/endpoints/batch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use serde::{Deserialize, Serialize};

use crate::extractors::{SignedBytes, StartTime};
use crate::service::ServiceState;
use crate::services::processor::ProcessBatchedMetrics;
use crate::services::projects::cache::BucketSource;
use crate::services::processor::{BucketSource, ProcessBatchedMetrics};

#[derive(Debug, Serialize, Deserialize)]
struct SendMetricsResponse {}
Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::service::ServiceState;
use crate::services::buffer::EnvelopeBuffer;
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::ValidateEnvelope;
use crate::services::projects::cache::legacy::ValidateEnvelope;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};

Expand Down Expand Up @@ -312,7 +312,9 @@ fn queue_envelope(
}
None => {
relay_log::trace!("Sending envelope to project cache for V1 buffer");
state.project_cache().send(ValidateEnvelope::new(envelope));
state
.legacy_project_cache()
.send(ValidateEnvelope::new(envelope));
}
}
}
Expand Down
48 changes: 31 additions & 17 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use crate::services::metrics::{Aggregator, RouterService};
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::projects::cache::{ProjectCache, ProjectCacheService, Services};
use crate::services::projects::cache2::ProjectCacheHandle;
use crate::services::projects::cache::{
legacy, ProjectCache, ProjectCacheHandle, ProjectCacheService,
};
use crate::services::projects::source::ProjectSource;
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -62,6 +64,7 @@ pub struct Registry {
pub test_store: Addr<TestStore>,
pub relay_cache: Addr<RelayCache>,
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub project_cache: Addr<ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: Option<ObservableEnvelopeBuffer>,
Expand Down Expand Up @@ -189,12 +192,23 @@ impl ServiceState {
// service fail if the service is not running.
let global_config = global_config.start();

let (project_cache, project_cache_rx) = channel(ProjectCacheService::name());
let (legacy_project_cache, legacy_project_cache_rx) =
channel(legacy::ProjectCacheService::name());

let project_source = ProjectSource::start(
Arc::clone(&config),
upstream_relay.clone(),
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
);
let (project_cache, project_cache_handle) =
ProjectCacheService::new(Arc::clone(&config), project_source).start();

let aggregator = RouterService::new(
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
Some(legacy_project_cache.clone().recipient()),
);
let aggregator_handle = aggregator.handle();
let aggregator = aggregator.start();
Expand Down Expand Up @@ -229,11 +243,12 @@ impl ServiceState {
create_processor_pool(&config)?,
config.clone(),
global_config_handle,
project_cache_handle.clone(),
cogs,
#[cfg(feature = "processing")]
redis_pools.clone(),
processor::Addrs {
project_cache: project_cache.clone(),
legacy_project_cache: legacy_project_cache.clone(),
outcome_aggregator: outcome_aggregator.clone(),
upstream_relay: upstream_relay.clone(),
test_store: test_store.clone(),
Expand All @@ -252,35 +267,32 @@ impl ServiceState {
global_config_rx.clone(),
buffer::Services {
envelopes_tx,
project_cache: project_cache.clone(),
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.map(|b| b.start_observable());

// Keep all the services in one context.
let project_cache_services = Services {
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
project_cache: project_cache.clone(),
project_cache: legacy_project_cache.clone(),
test_store: test_store.clone(),
upstream_relay: upstream_relay.clone(),
};

ProjectCacheService::new(
legacy::ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_handle.clone(),
project_cache_services,
global_config_rx,
envelopes_rx,
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
)
.spawn_handler(project_cache_rx);
.spawn_handler(legacy_project_cache_rx);

let health_check = HealthCheckService::new(
config.clone(),
Expand Down Expand Up @@ -310,7 +322,9 @@ impl ServiceState {
test_store,
relay_cache,
global_config,
legacy_project_cache,
project_cache,
project_cache_handle,
upstream_relay,
envelope_buffer,
};
Expand Down Expand Up @@ -343,9 +357,9 @@ impl ServiceState {
self.inner.registry.envelope_buffer.as_ref()
}

/// Returns the address of the [`ProjectCache`] service.
pub fn project_cache(&self) -> &Addr<ProjectCache> {
&self.inner.registry.project_cache
/// Returns the address of the [`legacy::ProjectCache`] service.
pub fn legacy_project_cache(&self) -> &Addr<legacy::ProjectCache> {
&self.inner.registry.legacy_project_cache
}

/// Returns a [`ProjectCacheHandle`].
Expand Down
19 changes: 8 additions & 11 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::ProcessingGroup;
use crate::services::projects::cache::DequeuedEnvelope;

use crate::services::projects::cache2::ProjectCacheHandle;
use crate::services::projects::cache2::ProjectEvent;
use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectEvent};
use crate::services::test_store::TestStore;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::ManagedEnvelope;
Expand Down Expand Up @@ -101,7 +98,7 @@ impl ObservableEnvelopeBuffer {
pub struct Services {
/// Bounded channel used exclusively to handle backpressure when sending envelopes to the
/// project cache.
pub envelopes_tx: mpsc::Sender<DequeuedEnvelope>,
pub envelopes_tx: mpsc::Sender<legacy::DequeuedEnvelope>,
pub project_cache_handle: ProjectCacheHandle,
pub outcome_aggregator: Addr<TrackOutcome>,
pub test_store: Addr<TestStore>,
Expand Down Expand Up @@ -159,7 +156,7 @@ impl EnvelopeBufferService {
&mut self,
buffer: &PolymorphicEnvelopeBuffer,
dequeue: bool,
) -> Option<Permit<DequeuedEnvelope>> {
) -> Option<Permit<legacy::DequeuedEnvelope>> {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checking"
Expand Down Expand Up @@ -220,7 +217,7 @@ impl EnvelopeBufferService {
config: &Config,
buffer: &mut PolymorphicEnvelopeBuffer,
services: &Services,
envelopes_tx_permit: Permit<'a, DequeuedEnvelope>,
envelopes_tx_permit: Permit<'a, legacy::DequeuedEnvelope>,
) -> Result<Duration, EnvelopeBufferError> {
let sleep = match buffer.peek().await? {
Peek::Empty => {
Expand Down Expand Up @@ -253,7 +250,7 @@ impl EnvelopeBufferService {
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
envelopes_tx_permit.send(DequeuedEnvelope(envelope));
envelopes_tx_permit.send(legacy::DequeuedEnvelope(envelope));

Duration::ZERO // try next pop immediately
}
Expand Down Expand Up @@ -430,7 +427,7 @@ impl Service for EnvelopeBufferService {
}
}
}
ProjectEvent::Ready(project_key) = project_events.recv() => {
Ok(ProjectEvent::Ready(project_key)) = project_events.recv() => {
Self::handle_message(&mut buffer, EnvelopeBuffer::Ready(project_key)).await;
sleep = Duration::ZERO;
}
Expand Down Expand Up @@ -490,8 +487,8 @@ mod tests {
struct EnvelopeBufferServiceResult {
service: EnvelopeBufferService,
global_tx: watch::Sender<global_config::Status>,
envelopes_rx: mpsc::Receiver<DequeuedEnvelope>,
project_cache_rx: mpsc::UnboundedReceiver<ProjectCache>,
envelopes_rx: mpsc::Receiver<legacy::DequeuedEnvelope>,
project_cache_rx: mpsc::UnboundedReceiver<legacy::ProjectCache>,
outcome_aggregator_rx: mpsc::UnboundedReceiver<TrackOutcome>,
}

Expand Down
Loading

0 comments on commit 44489a1

Please sign in to comment.