From 828b13094f969a5b74d804f84eb8267abe05f77a Mon Sep 17 00:00:00 2001
From: Riccardo Busetti
Date: Fri, 13 Sep 2024 09:56:10 +0200
Subject: [PATCH] feat(spooler): Implement shutdown behavior in the spooler
 (#3980)

---
 CHANGELOG.md                                  |   1 +
 .../services/buffer/envelope_buffer/mod.rs    |  35 +++++-
 .../services/buffer/envelope_stack/memory.rs  |   4 +
 .../src/services/buffer/envelope_stack/mod.rs |   4 +
 .../services/buffer/envelope_stack/sqlite.rs  |  32 +++++
 relay-server/src/services/buffer/mod.rs       |  69 +++++++++--
 .../services/buffer/stack_provider/memory.rs  |   8 ++
 .../src/services/buffer/stack_provider/mod.rs |   6 +
 .../services/buffer/stack_provider/sqlite.rs  | 113 +++++++++++++++++-
 relay-server/src/statsd.rs                    |   3 +
 tests/integration/test_basic.py               |  65 +++++++++-
 11 files changed, 318 insertions(+), 22 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6d0dead57..91371a6808 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@
 - Remove the `generate-schema` tool. Relay no longer exposes JSON schema for the event protocol. Consult the Rust type documentation of the `relay-event-schema` crate instead. ([#3974](https://github.com/getsentry/relay/pull/3974))
 - Allow creation of `SqliteEnvelopeBuffer` from config, and load existing stacks from db on startup. ([#3967](https://github.com/getsentry/relay/pull/3967))
 - Only tag `user.geo.subregion` on frontend and mobile projects. ([#4013](https://github.com/getsentry/relay/pull/4013), [#4023](https://github.com/getsentry/relay/pull/4023))
+- Implement graceful shutdown mechanism in the `EnvelopeBuffer`. ([#3980](https://github.com/getsentry/relay/pull/3980))
 
 ## 24.8.0
 
diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs
index e4fdd7d205..d77efb72df 100644
--- a/relay-server/src/services/buffer/envelope_buffer/mod.rs
+++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -2,6 +2,7 @@ use std::cmp::Ordering;
 use std::collections::BTreeSet;
 use std::convert::Infallible;
 use std::error::Error;
+use std::mem;
 use std::sync::atomic::AtomicI64;
 use std::sync::atomic::Ordering as AtomicOrdering;
 use std::sync::Arc;
@@ -55,9 +56,11 @@ impl PolymorphicEnvelopeBuffer {
         memory_checker: MemoryChecker,
     ) -> Result<Self, EnvelopeBufferError> {
         let buffer = if config.spool_envelopes_path().is_some() {
+            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
            let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
             Self::Sqlite(buffer)
         } else {
+            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
            let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
             Self::InMemory(buffer)
         };
@@ -137,6 +140,20 @@ impl PolymorphicEnvelopeBuffer {
             Self::InMemory(buffer) => buffer.has_capacity(),
         }
     }
+
+    /// Shuts down the [`PolymorphicEnvelopeBuffer`].
+    pub async fn shutdown(&mut self) -> bool {
+        // We flush the buffer only for the disk implementation; the in-memory
+        // implementation does nothing on shutdown and instead keeps popping as many
+        // elements as possible within the shutdown timeout.
+        let Self::Sqlite(buffer) = self else {
+            relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
+            return false;
+        };
+        buffer.flush().await;
+
+        true
+    }
 }
 
 /// Error that occurs while interacting with the envelope buffer.
@@ -374,6 +391,19 @@ where
         });
     }
 
+    /// Returns `true` if the underlying storage has the capacity to store more envelopes.
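+    ///
+    /// The capacity check is delegated to the stack provider: the in-memory provider
+    /// checks its configured memory limits, while the sqlite provider checks the maximum
+    /// allowed disk size.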
+    pub fn has_capacity(&self) -> bool {
+        self.stack_provider.has_store_capacity()
+    }
+
+    /// Flushes the envelope buffer.
+    pub async fn flush(&mut self) {
+        let priority_queue = mem::take(&mut self.priority_queue);
+        self.stack_provider
+            .flush(priority_queue.into_iter().map(|(q, _)| q.value))
+            .await;
+    }
+
     /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
     async fn push_stack(
         &mut self,
@@ -413,11 +443,6 @@ where
         Ok(())
     }
 
-    /// Returns `true` if the underlying storage has the capacity to store more envelopes.
-    pub fn has_capacity(&self) -> bool {
-        self.stack_provider.has_store_capacity()
-    }
-
     /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
     fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
         for project_key in project_key_pair.iter() {
diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs
index d9723e601a..ceb771ec95 100644
--- a/relay-server/src/services/buffer/envelope_stack/memory.rs
+++ b/relay-server/src/services/buffer/envelope_stack/memory.rs
@@ -28,4 +28,8 @@ impl EnvelopeStack for MemoryEnvelopeStack {
     async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
         Ok(self.0.pop())
     }
+
+    fn flush(self) -> Vec<Box<Envelope>> {
+        self.0
+    }
 }
diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs
index 8e48f7391f..8acfff0600 100644
--- a/relay-server/src/services/buffer/envelope_stack/mod.rs
+++ b/relay-server/src/services/buffer/envelope_stack/mod.rs
@@ -19,4 +19,8 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {
 
     /// Pops the [`Envelope`] on top of the stack.
     fn pop(&mut self) -> impl Future<Output = Result<Option<Box<Envelope>>, Self::Error>>;
+
+    /// Consumes the [`EnvelopeStack`] and returns all [`Envelope`]s that are still buffered
+    /// in memory, so that the caller can persist them to external storage, if possible.
+    fn flush(self) -> Vec<Box<Envelope>>;
 }
diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs
index 55336e9cc1..703569ddab 100644
--- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs
+++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs
@@ -234,6 +234,10 @@ impl EnvelopeStack for SqliteEnvelopeStack {
 
         Ok(result)
     }
+
+    fn flush(self) -> Vec<Box<Envelope>> {
+        self.batches_buffer.into_iter().flatten().collect()
+    }
 }
 
 #[cfg(test)]
@@ -461,4 +465,32 @@ mod tests {
         }
         assert_eq!(stack.batches_buffer_size, 0);
     }
+
+    #[tokio::test]
+    async fn test_drain() {
+        let db = setup_db(true).await;
+        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
+        let mut stack = SqliteEnvelopeStack::new(
+            envelope_store.clone(),
+            5,
+            1,
+            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
+            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
+            true,
+        );
+
+        let envelopes = mock_envelopes(5);
+
+        // We push 5 envelopes and check that there is nothing on disk.
+        for envelope in envelopes.clone() {
+            assert!(stack.push(envelope).await.is_ok());
+        }
+        assert_eq!(stack.batches_buffer_size, 5);
+        assert_eq!(envelope_store.total_count().await.unwrap(), 0);
+
+        // We drain the stack and make sure nothing was spooled to disk.
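+        // `flush` consumes the stack and returns only the envelopes that were still held
+        // in memory; anything already spooled would remain on disk.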
+        let drained_envelopes = stack.flush();
+        assert_eq!(drained_envelopes.into_iter().collect::<Vec<_>>().len(), 5);
+        assert_eq!(envelope_store.total_count().await.unwrap(), 0);
+    }
 }
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index 1955642c8b..92c31cd5eb 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -1,5 +1,6 @@
 //! Types for buffering envelopes.
 
+use std::error::Error;
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
@@ -7,10 +8,11 @@ use std::time::Duration;
 
 use relay_base_schema::project::ProjectKey;
 use relay_config::Config;
-use relay_system::Request;
 use relay_system::SendError;
 use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
+use relay_system::{Controller, Request, Shutdown};
 use tokio::sync::watch;
+use tokio::time::timeout;
 
 use crate::envelope::Envelope;
 use crate::services::buffer::envelope_buffer::Peek;
@@ -208,10 +210,10 @@ impl EnvelopeBufferService {
         &mut self,
         buffer: &mut PolymorphicEnvelopeBuffer,
     ) -> Result<(), EnvelopeBufferError> {
-        relay_log::trace!("EnvelopeBufferService peek");
+        relay_log::trace!("EnvelopeBufferService: peeking the buffer");
         match buffer.peek().await? {
             Peek::Empty => {
-                relay_log::trace!("EnvelopeBufferService empty");
+                relay_log::trace!("EnvelopeBufferService: peek returned empty");
                 relay_statsd::metric!(
                     counter(RelayCounters::BufferTryPop) += 1,
                     peek_result = "empty"
@@ -219,7 +221,7 @@
                 self.sleep = Duration::MAX; // wait for reset by `handle_message`.
             }
             Peek::Ready(_) => {
-                relay_log::trace!("EnvelopeBufferService pop");
+                relay_log::trace!("EnvelopeBufferService: popping envelope");
                 relay_statsd::metric!(
                     counter(RelayCounters::BufferTryPop) += 1,
                     peek_result = "ready"
@@ -234,7 +236,7 @@
                 self.sleep = Duration::ZERO; // try next pop immediately
             }
             Peek::NotReady(stack_key, envelope) => {
-                relay_log::trace!("EnvelopeBufferService request update");
+                relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update");
                 relay_statsd::metric!(
                     counter(RelayCounters::BufferTryPop) += 1,
                     peek_result = "not_ready"
@@ -268,23 +270,55 @@
                 // projects was already triggered (see XXX).
                 // For better separation of concerns, this prefetch should be triggered from here
                 // once buffer V1 has been removed.
-                relay_log::trace!("EnvelopeBufferService push");
+                relay_log::trace!("EnvelopeBufferService: received push message");
                 self.push(buffer, envelope).await;
             }
             EnvelopeBuffer::NotReady(project_key, envelope) => {
-                relay_log::trace!("EnvelopeBufferService project not ready");
+                relay_log::trace!(
+                    "EnvelopeBufferService: received project not ready message for project key {}",
+                    &project_key
+                );
                 buffer.mark_ready(&project_key, false);
                 relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
                 self.push(buffer, envelope).await;
             }
             EnvelopeBuffer::Ready(project_key) => {
-                relay_log::trace!("EnvelopeBufferService project ready {}", &project_key);
+                relay_log::trace!(
+                    "EnvelopeBufferService: received project ready message for project key {}",
+                    &project_key
+                );
                 buffer.mark_ready(&project_key, true);
             }
         };
         self.sleep = Duration::ZERO;
     }
 
+    async fn handle_shutdown(
+        &mut self,
+        buffer: &mut PolymorphicEnvelopeBuffer,
+        message: Shutdown,
+    ) -> bool {
+        // We gracefully shut down only if the shutdown has a timeout.
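+        // An immediate shutdown (a `Shutdown` message without a timeout) skips the flush
+        // entirely: envelopes already spooled stay on disk, in-memory envelopes are lost.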
+        if let Some(shutdown_timeout) = message.timeout {
+            relay_log::trace!("EnvelopeBufferService: shutting down gracefully");
+
+            let shutdown_result = timeout(shutdown_timeout, buffer.shutdown()).await;
+            match shutdown_result {
+                Ok(shutdown_result) => {
+                    return shutdown_result;
+                }
+                Err(error) => {
+                    relay_log::error!(
+                        error = &error as &dyn Error,
+                        "the envelope buffer didn't shut down in time, some envelopes might be lost",
+                    );
+                }
+            }
+        }
+
+        false
+    }
+
     async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
         if let Err(e) = buffer.push(envelope).await {
             relay_log::error!(
@@ -322,15 +356,17 @@
             };
             buffer.initialize().await;
 
-            relay_log::info!("EnvelopeBufferService start");
+            let mut shutdown = Controller::shutdown_handle();
+
+            relay_log::info!("EnvelopeBufferService: starting");
             let mut iteration = 0;
             loop {
                 iteration += 1;
-                relay_log::trace!("EnvelopeBufferService loop iteration {iteration}");
+                relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}");
 
                 tokio::select! {
                     // NOTE: we do not select a bias here.
-                    // On the one hand, we might want to prioritize dequeing over enqueing
+                    // On the one hand, we might want to prioritize dequeuing over enqueuing
                     // so we do not exceed the buffer capacity by starving the dequeue.
                     // on the other hand, prioritizing old messages violates the LIFO design.
                     Ok(()) = self.ready_to_pop(&mut buffer) => {
@@ -344,8 +380,15 @@
                     Some(message) = rx.recv() => {
                         self.handle_message(&mut buffer, message).await;
                     }
+                    shutdown = shutdown.notified() => {
+                        // If the shutdown was handled, we break out of the loop, signaling
+                        // that there is no need to process any more envelopes.
+                        if self.handle_shutdown(&mut buffer, shutdown).await {
+                            break;
+                        }
+                    }
                     _ = global_config_rx.changed() => {
-                        relay_log::trace!("EnvelopeBufferService received global config");
+                        relay_log::trace!("EnvelopeBufferService: received global config");
                         self.sleep = Duration::ZERO; // Try to pop
                     }
                     else => break,
@@ -354,7 +397,7 @@
                 self.update_observable_state(&mut buffer);
             }
 
-            relay_log::info!("EnvelopeBufferService stop");
+            relay_log::info!("EnvelopeBufferService: stopping");
         });
     }
 }
diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs
index 2b0b7822cf..230db32b34 100644
--- a/relay-server/src/services/buffer/stack_provider/memory.rs
+++ b/relay-server/src/services/buffer/stack_provider/memory.rs
@@ -4,6 +4,7 @@ use crate::services::buffer::stack_provider::{
     InitializationState, StackCreationType, StackProvider,
 };
 use crate::utils::MemoryChecker;
+use crate::EnvelopeStack;
 
 #[derive(Debug)]
 pub struct MemoryStackProvider {
@@ -41,4 +42,11 @@ impl StackProvider for MemoryStackProvider {
     fn stack_type<'a>(&self) -> &'a str {
         "memory"
     }
+
+    async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
+        for envelope_stack in envelope_stacks {
+            // The flushed envelopes will be immediately dropped.
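+            // The in-memory stack provider has no backing storage, so the only purpose of
+            // this loop is to consume the stacks.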
+            let _ = envelope_stack.flush();
+        }
+    }
 }
diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs
index 1b1e319509..715d70c436 100644
--- a/relay-server/src/services/buffer/stack_provider/mod.rs
+++ b/relay-server/src/services/buffer/stack_provider/mod.rs
@@ -60,4 +60,10 @@ pub trait StackProvider: std::fmt::Debug {
 
     /// Returns the string representation of the stack type offered by this [`StackProvider`].
     fn stack_type<'a>(&self) -> &'a str;
+
+    /// Flushes the supplied [`EnvelopeStack`]s, consuming each stack in the process.
+    fn flush(
+        &mut self,
+        envelope_stacks: impl IntoIterator<Item = Self::Stack>,
+    ) -> impl Future<Output = ()>;
 }
diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs
index 5399e8e6db..15e4be6c4e 100644
--- a/relay-server/src/services/buffer/stack_provider/sqlite.rs
+++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs
@@ -1,6 +1,7 @@
-use relay_config::Config;
 use std::error::Error;
 
+use relay_config::Config;
+
 use crate::services::buffer::common::ProjectKeyPair;
 use crate::services::buffer::envelope_store::sqlite::{
     SqliteEnvelopeStore, SqliteEnvelopeStoreError,
@@ -8,7 +9,8 @@ use crate::services::buffer::envelope_store::sqlite::{
 use crate::services::buffer::stack_provider::{
     InitializationState, StackCreationType, StackProvider,
 };
-use crate::SqliteEnvelopeStack;
+use crate::statsd::RelayTimers;
+use crate::{Envelope, EnvelopeStack, SqliteEnvelopeStack};
 
 #[derive(Debug)]
 pub struct SqliteStackProvider {
@@ -16,6 +18,7 @@
     disk_batch_size: usize,
     max_batches: usize,
     max_disk_size: usize,
+    drain_batch_size: usize,
 }
 
 #[warn(dead_code)]
@@ -28,9 +31,29 @@ impl SqliteStackProvider {
             disk_batch_size: config.spool_envelopes_stack_disk_batch_size(),
             max_batches: config.spool_envelopes_stack_max_batches(),
             max_disk_size: config.spool_envelopes_max_disk_size(),
+            drain_batch_size: config.spool_envelopes_stack_disk_batch_size(),
         })
     }
 
+    /// Inserts the supplied [`Envelope`]s in the database.
+    #[allow(clippy::vec_box)]
+    async fn drain_many(&mut self, envelopes: Vec<Box<Envelope>>) {
+        if let Err(error) = self
+            .envelope_store
+            .insert_many(
+                envelopes
+                    .into_iter()
+                    .filter_map(|e| e.as_ref().try_into().ok()),
+            )
+            .await
+        {
+            relay_log::error!(
+                error = &error as &dyn Error,
+                "failed to drain the envelope stacks, some envelopes might be lost",
+            );
+        }
+    }
+
+    /// Returns `true` when there might be data residing on disk, `false` otherwise.
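+    ///
+    /// Stacks created during initialization may be backed by rows spooled in a previous
+    /// run, whereas newly created stacks start out empty.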
     fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool {
         matches!(stack_creation_type, StackCreationType::Initialization)
     }
@@ -94,4 +117,90 @@ impl StackProvider for SqliteStackProvider {
     fn stack_type<'a>(&self) -> &'a str {
         "sqlite"
     }
+
+    async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
+        relay_log::trace!("Flushing sqlite envelope buffer");
+
+        relay_statsd::metric!(timer(RelayTimers::BufferDrain), {
+            let mut envelopes = Vec::with_capacity(self.drain_batch_size);
+            for envelope_stack in envelope_stacks {
+                for envelope in envelope_stack.flush() {
+                    if envelopes.len() >= self.drain_batch_size {
+                        self.drain_many(envelopes).await;
+                        envelopes = Vec::with_capacity(self.drain_batch_size);
+                    }
+
+                    envelopes.push(envelope);
+                }
+            }
+
+            if !envelopes.is_empty() {
+                self.drain_many(envelopes).await;
+            }
+        });
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use relay_base_schema::project::ProjectKey;
+    use relay_config::Config;
+    use uuid::Uuid;
+
+    use crate::services::buffer::common::ProjectKeyPair;
+    use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
+    use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
+    use crate::services::buffer::testutils::utils::mock_envelopes;
+    use crate::EnvelopeStack;
+
+    fn mock_config() -> Arc<Config> {
+        let path = std::env::temp_dir()
+            .join(Uuid::new_v4().to_string())
+            .into_os_string()
+            .into_string()
+            .unwrap();
+
+        Config::from_json_value(serde_json::json!({
+            "spool": {
+                "envelopes": {
+                    "path": path,
+                    "disk_batch_size": 100,
+                    "max_batches": 1,
+                }
+            }
+        }))
+        .unwrap()
+        .into()
+    }
+
+    #[tokio::test]
+    async fn test_flush() {
+        let config = mock_config();
+        let mut stack_provider = SqliteStackProvider::new(&config).await.unwrap();
+
+        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
+        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
+
+        let mut envelope_stack = stack_provider.create_stack(
+            StackCreationType::New,
+            ProjectKeyPair::new(own_key, sampling_key),
+        );
+
+        let envelopes = mock_envelopes(10);
+        for envelope in envelopes {
+            envelope_stack.push(envelope).await.unwrap();
+        }
+
+        let envelope_store = stack_provider.envelope_store.clone();
+
+        // We make sure that no data is on disk, since we spool only when more than 100
+        // elements are in the in-memory stack.
+        assert_eq!(envelope_store.total_count().await.unwrap(), 0);
+
+        // We flush the stack provider, and we expect all in-memory envelopes to be spooled
+        // to disk.
+        stack_provider.flush(vec![envelope_stack]).await;
+        assert_eq!(envelope_store.total_count().await.unwrap(), 10);
+    }
 }
diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs
index 4fddb01fb2..103fc03e79 100644
--- a/relay-server/src/statsd.rs
+++ b/relay-server/src/statsd.rs
@@ -530,6 +530,8 @@ pub enum RelayTimers {
     BufferPeek,
     /// Timing in milliseconds for the time it takes for the buffer to pop.
     BufferPop,
+    /// Timing in milliseconds for the time it takes for the buffer to drain its envelopes.
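+    ///
+    /// This metric is emitted when the sqlite envelope buffer flushes its stacks to disk
+    /// during shutdown.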
+    BufferDrain,
 }
 
 impl TimerMetric for RelayTimers {
@@ -577,6 +579,7 @@
             RelayTimers::BufferPush => "buffer.push.duration",
             RelayTimers::BufferPeek => "buffer.peek.duration",
             RelayTimers::BufferPop => "buffer.pop.duration",
+            RelayTimers::BufferDrain => "buffer.drain.duration",
         }
     }
 }
diff --git a/tests/integration/test_basic.py b/tests/integration/test_basic.py
index 6b6a943b20..d3e42def0f 100644
--- a/tests/integration/test_basic.py
+++ b/tests/integration/test_basic.py
@@ -2,12 +2,15 @@
 import queue
 import os
 import gzip
+import sqlite3
+import tempfile
+
 import pytest
 import signal
 import zlib
 
 
-def test_graceful_shutdown(mini_sentry, relay):
+def test_graceful_shutdown_with_in_memory_buffer(mini_sentry, relay):
     from time import sleep
 
     get_project_config_original = mini_sentry.app.view_functions["get_project_config"]
@@ -17,16 +20,74 @@ def get_project_config():
         sleep(1)  # Causes the process to wait for one second before shutting down
         return get_project_config_original()
 
-    relay = relay(mini_sentry, {"limits": {"shutdown_timeout": 2}})
     project_id = 42
     mini_sentry.add_basic_project_config(project_id)
+
+    relay = relay(
+        mini_sentry,
+        {
+            "limits": {"shutdown_timeout": 2},
+            "spool": {"envelopes": {"version": "experimental"}},
+        },
+    )
+
     relay.send_event(project_id)
     relay.shutdown(sig=signal.SIGTERM)
+
+    # When using the memory envelope buffer, we do not do anything on shutdown: the buffer
+    # optimistically keeps popping as many envelopes as it can within the shutdown timeout.
     event = mini_sentry.captured_events.get(timeout=0).get_event()
     assert event["logentry"] == {"formatted": "Hello, World!"}
 
 
+def test_graceful_shutdown_with_sqlite_buffer(mini_sentry, relay):
+    from time import sleep
+
+    # Create a temporary directory for the sqlite db.
+    db_file_path = os.path.join(tempfile.mkdtemp(), "database.db")
+
+    get_project_config_original = mini_sentry.app.view_functions["get_project_config"]
+
+    @mini_sentry.app.endpoint("get_project_config")
+    def get_project_config():
+        sleep(1)  # Causes the process to wait for one second before shutting down
+        return get_project_config_original()
+
+    project_id = 42
+    mini_sentry.add_basic_project_config(project_id)
+
+    relay = relay(
+        mini_sentry,
+        {
+            "limits": {"shutdown_timeout": 2},
+            "spool": {"envelopes": {"version": "experimental", "path": db_file_path}},
+        },
+    )
+
+    n = 10
+    for _ in range(n):
+        relay.send_event(project_id)
+
+    relay.shutdown(sig=signal.SIGTERM)
+
+    # When using the disk envelope buffer, we don't forward envelopes on shutdown, but we
+    # spool them to disk.
+    assert mini_sentry.captured_events.empty()
+
+    # Check that the envelopes were spooled to the SQLite `envelopes` table.
+    conn = sqlite3.connect(db_file_path)
+    cursor = conn.cursor()
+
+    cursor.execute("SELECT COUNT(*) FROM envelopes")
+    row_count = cursor.fetchone()[0]
+    assert (
+        row_count == n
+    ), f"Expected {n} rows in the 'envelopes' table, but found {row_count}"
+
+    conn.close()
+
+
 @pytest.mark.skip("Flaky test")
 def test_forced_shutdown(mini_sentry, relay):
     from time import sleep
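
Note on the shutdown pattern: `handle_shutdown` above bounds the buffer flush with
`tokio::time::timeout`. The following is a minimal, self-contained sketch of that pattern,
not part of the patch itself (it assumes tokio with the `macros` and `time` features, and
`flush_to_disk` is a hypothetical stand-in for `PolymorphicEnvelopeBuffer::shutdown`):

    use std::time::Duration;

    // Hypothetical stand-in for the buffer's shutdown flush.
    async fn flush_to_disk() -> bool {
        // ... write pending envelopes to disk ...
        true
    }

    #[tokio::main]
    async fn main() {
        let shutdown_timeout = Duration::from_secs(2);

        // `timeout` races the flush future against the deadline: `Ok(v)` means the flush
        // completed in time and returned `v`; `Err(_)` means the deadline elapsed and the
        // flush future was dropped mid-flight.
        match tokio::time::timeout(shutdown_timeout, flush_to_disk()).await {
            Ok(flushed) => println!("buffer flushed: {flushed}"),
            Err(_) => eprintln!("flush did not finish in time, envelopes might be lost"),
        }
    }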