diff --git a/sequencer/api/public-env-vars.toml b/sequencer/api/public-env-vars.toml
index 35a5620de..4fbd6db5a 100644
--- a/sequencer/api/public-env-vars.toml
+++ b/sequencer/api/public-env-vars.toml
@@ -60,7 +60,9 @@ variables = [
   "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
   "ESPRESSO_SEQUENCER_CDN_ENDPOINT",
   "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY",
-  "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
+  "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
+  "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
+  "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
   "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT",
   "ESPRESSO_SEQUENCER_HOTSHOT_ADDRESS",
   "ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT",
diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs
index fd9458b41..6b1b8d1a4 100644
--- a/sequencer/src/api/sql.rs
+++ b/sequencer/src/api/sql.rs
@@ -53,7 +53,7 @@ impl SequencerDataSource for DataSource {
         let fetch_limit = opt.fetch_rate_limit;
         let active_fetch_delay = opt.active_fetch_delay;
         let chunk_fetch_delay = opt.chunk_fetch_delay;
-        let mut cfg = Config::try_from(opt)?;
+        let mut cfg = Config::try_from(&opt)?;

         if reset {
             cfg = cfg.reset_schema();
diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs
index 65590e622..6c12a3584 100644
--- a/sequencer/src/persistence/sql.rs
+++ b/sequencer/src/persistence/sql.rs
@@ -14,7 +14,10 @@ use hotshot_query_service::{
     data_source::{
         storage::{
             pruning::PrunerCfg,
-            sql::{include_migrations, query_as, Config, SqlStorage, Transaction, TransactionMode},
+            sql::{
+                include_migrations, query_as, Config, SqlStorage, Transaction, TransactionMode,
+                Write,
+            },
         },
         Transaction as _, VersionedDataSource,
     },
@@ -37,6 +40,7 @@ use hotshot_types::{
     vid::{VidCommitment, VidCommon},
     vote::HasViewNumber,
 };
+use itertools::Itertools;
 use sqlx::Row;
 use sqlx::{query, Executor};
 use std::sync::Arc;
@@ -103,6 +107,10 @@ pub struct Options {
     #[clap(flatten)]
     pub(crate) pruning: PruningOptions,

+    /// Pruning parameters for ephemeral consensus storage.
+    #[clap(flatten)]
+    pub(crate) consensus_pruning: ConsensusPruningOptions,
+
     #[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)]
     pub(crate) store_undecided_state: bool,

@@ -126,24 +134,6 @@ pub struct Options {
     /// fetching from peers.
     #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
     pub(crate) archive: bool,
-
-    /// Number of views to retain in consensus storage before data that hasn't been archived is
-    /// garbage collected.
-    ///
-    /// The longer this is, the more certain that all data will eventually be archived, even if
-    /// there are temporary problems with archive storage or partially missing data. This can be set
-    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
-    /// setting only applies to views which never get decided (ie forks in consensus) and views for
-    /// which this node is partially offline. These should be exceptionally rare.
-    ///
-    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
-    /// view time of 2s.
-    #[clap(
-        long,
-        env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
-        default_value = "130000"
-    )]
-    pub(crate) consensus_view_retention: u64,
 }

 impl Default for Options {
...
     }
 }

-impl TryFrom<Options> for Config {
+impl TryFrom<&Options> for Config {
     type Error = anyhow::Error;

-    fn try_from(opt: Options) -> Result<Self, Self::Error> {
-        let mut cfg = match opt.uri {
+    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
+        let mut cfg = match &opt.uri {
             Some(uri) => uri.parse()?,
             None => Self::default(),
         };
         cfg = cfg.migrations(include_migrations!("$CARGO_MANIFEST_DIR/api/migrations"));

-        if let Some(host) = opt.host {
+        if let Some(host) = &opt.host {
             cfg = cfg.host(host);
         }
         if let Some(port) = opt.port {
@@ -193,7 +183,7 @@ impl TryFrom<Options> for Config {
 }

 /// Pruning parameters.
-#[derive(Parser, Clone, Debug)]
+#[derive(Parser, Clone, Copy, Debug)]
 pub struct PruningOptions {
     /// Threshold for pruning, specified in bytes.
     /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period.
@@ -266,26 +256,84 @@ impl From<PruningOptions> for PrunerCfg {
     }
 }

+/// Pruning parameters for ephemeral consensus storage.
+#[derive(Parser, Clone, Debug)]
+pub struct ConsensusPruningOptions {
+    /// Number of views to try to retain in consensus storage before data that hasn't been archived
+    /// is garbage collected.
+    ///
+    /// The longer this is, the more certain that all data will eventually be archived, even if
+    /// there are temporary problems with archive storage or partially missing data. This can be set
+    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
+    /// setting only applies to views which never get decided (ie forks in consensus) and views for
+    /// which this node is partially offline. These should be exceptionally rare.
+    ///
+    /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION
+    /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long
+    /// consensus data will be retained, see MINIMUM_RETENTION.
+    ///
+    /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average
+    /// view time of 2s.
+    #[clap(
+        name = "TARGET_RETENTION",
+        long = "consensus-storage-target-retention",
+        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
+        default_value = "302000"
+    )]
+    target_retention: u64,
+
+    /// Minimum number of views to try to retain in consensus storage before data that hasn't been
+    /// archived is garbage collected.
+    ///
+    /// This bound allows data to be retained even if consensus storage occupies more than
+    /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival
+    /// storage as necessary, even under extreme circumstances where otherwise garbage collection
+    /// would kick in based on TARGET_RETENTION.
+    ///
+    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
+    /// view time of 2s.
+    #[clap(
+        name = "MINIMUM_RETENTION",
+        long = "consensus-storage-minimum-retention",
+        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
+        default_value = "130000"
+    )]
+    minimum_retention: u64,
+
+    /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more
+    /// aggressively.
+    ///
+    /// See also TARGET_RETENTION and MINIMUM_RETENTION.
+    #[clap(
+        name = "TARGET_USAGE",
+        long = "consensus-storage-target-usage",
+        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
+        default_value = "1000000000"
+    )]
+    target_usage: u64,
+}
+
 #[async_trait]
 impl PersistenceOptions for Options {
     type Persistence = Persistence;

     fn set_view_retention(&mut self, view_retention: u64) {
-        self.consensus_view_retention = view_retention;
+        self.consensus_pruning.target_retention = view_retention;
+        self.consensus_pruning.minimum_retention = view_retention;
     }

     async fn create(self) -> anyhow::Result<Self::Persistence> {
         let persistence = Persistence {
             store_undecided_state: self.store_undecided_state,
-            view_retention: self.consensus_view_retention,
-            db: SqlStorage::connect(self.try_into()?).await?,
+            db: SqlStorage::connect((&self).try_into()?).await?,
+            gc_opt: self.consensus_pruning,
         };
         persistence.migrate_quorum_proposal_leaf_hashes().await?;
         Ok(persistence)
     }

     async fn reset(self) -> anyhow::Result<()> {
-        SqlStorage::connect(Config::try_from(self)?.reset_schema()).await?;
+        SqlStorage::connect(Config::try_from(&self)?.reset_schema()).await?;
         Ok(())
     }
 }
@@ -295,7 +343,7 @@ impl PersistenceOptions for Options {
 pub struct Persistence {
     db: SqlStorage,
     store_undecided_state: bool,
-    view_retention: u64,
+    gc_opt: ConsensusPruningOptions,
 }

 impl Persistence {
@@ -531,58 +579,73 @@ impl Persistence {
     }

     #[tracing::instrument(skip(self))]
-    async fn prune(&self, view: ViewNumber) -> anyhow::Result<()> {
-        let view = view.u64().saturating_sub(self.view_retention) as i64;
-        if view == 0 {
-            // Nothing to prune, the entire chain is younger than the retention period.
-            return Ok(());
-        }
-
+    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
         let mut tx = self.db.write().await?;

-        let res = query("DELETE FROM anchor_leaf WHERE view < $1")
-            .bind(view)
-            .execute(tx.as_mut())
-            .await
-            .context("deleting old anchor leaves")?;
-        tracing::debug!("garbage collecting {} leaves", res.rows_affected());
+        // Prune everything older than the target retention period.
+        prune_to_view(
+            &mut tx,
+            cur_view.u64().saturating_sub(self.gc_opt.target_retention),
+        )
+        .await?;

-        let res = query("DELETE FROM vid_share WHERE view < $1")
-            .bind(view)
-            .execute(tx.as_mut())
-            .await
-            .context("deleting old VID shares")?;
-        tracing::debug!("garbage collecting {} VID shares", res.rows_affected());
+        // Check our storage usage; if necessary we will prune more aggressively (up to the minimum
+        // retention) to get below the target usage.
+        let table_sizes = PRUNE_TABLES
+            .iter()
+            .map(|table| format!("pg_table_size('{table}')"))
+            .join(" + ");
+        let usage_query = format!("SELECT {table_sizes}");
+        let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
+        tracing::debug!(usage, "consensus storage usage after pruning");
+
+        if (usage as u64) > self.gc_opt.target_usage {
+            tracing::warn!(
+                usage,
+                gc_opt = ?self.gc_opt,
+                "consensus storage is running out of space, pruning to minimum retention"
+            );
+            prune_to_view(
+                &mut tx,
+                cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
+            )
+            .await?;
+        }

-        let res = query("DELETE FROM da_proposal WHERE view < $1")
-            .bind(view)
-            .execute(tx.as_mut())
-            .await
-            .context("deleting old DA proposals")?;
-        tracing::debug!("garbage collecting {} DA proposals", res.rows_affected());
+        tx.commit().await
+    }
+}

-        let res = query("DELETE FROM quorum_proposals WHERE view < $1")
-            .bind(view)
-            .execute(tx.as_mut())
-            .await
-            .context("deleting old quorum proposals")?;
-        tracing::debug!(
-            "garbage collecting {} quorum proposals",
-            res.rows_affected()
-        );
+const PRUNE_TABLES: &[&str] = &[
+    "anchor_leaf",
+    "vid_share",
+    "da_proposal",
+    "quorum_proposals",
+    "quorum_certificate",
+];
+
+async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
+    if view == 0 {
+        // Nothing to prune, the entire chain is younger than the retention period.
+        return Ok(());
+    }
+    tracing::debug!(view, "pruning consensus storage");

-        let res = query("DELETE FROM quorum_certificate WHERE view < $1")
-            .bind(view)
+    for table in PRUNE_TABLES {
+        let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
+            .bind(view as i64)
             .execute(tx.as_mut())
             .await
-            .context("deleting old quorum certificates")?;
-        tracing::debug!(
-            "garbage collecting {} quorum certificates",
-            res.rows_affected()
-        );
-
-        tx.commit().await
+            .context(format!("pruning {table}"))?;
+        if res.rows_affected() > 0 {
+            tracing::info!(
+                "garbage collected {} rows from {table}",
+                res.rows_affected()
+            );
+        }
     }
+
+    Ok(())
 }

 #[async_trait]
@@ -1113,7 +1176,8 @@ mod testing {
         TmpDb::init().await
     }

-    fn options(db: &Self::Storage) -> impl PersistenceOptions {
+    #[allow(refining_impl_trait)]
+    fn options(db: &Self::Storage) -> Options {
         Options {
             port: Some(db.port()),
             host: Some(db.host()),
@@ -1139,11 +1203,11 @@ mod generic_tests {
 mod test {
     use super::*;
     use crate::{persistence::testing::TestablePersistence, BLSPubKey, PubKey};
-    use espresso_types::{NodeState, ValidatedState};
+    use espresso_types::{traits::NullEventConsumer, NodeState, ValidatedState};
     use futures::stream::TryStreamExt;
     use hotshot_example_types::node_types::TestVersions;
     use hotshot_types::{
-        traits::{signature_key::SignatureKey, EncodeBytes},
+        traits::{block_contents::vid_commitment, signature_key::SignatureKey, EncodeBytes},
         vid::vid_scheme,
     };
     use jf_vid::VidScheme;
@@ -1325,4 +1389,144 @@ mod test {
             .unwrap()
         );
     }
+
+    /// Test conditions that trigger pruning.
+    ///
+    /// This is a configurable test that can be used to test different configurations of GC,
+    /// `pruning_opt`. The test populates the database with some data for view 1, asserts that it is
+    /// retained for view 2, and then asserts that it is pruned by view 3. There are various
+    /// different configurations that can achieve this behavior, such that the data is retained and
+    /// then pruned due to different logic and code paths.
+    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
+        setup_test();
+
+        let tmp = Persistence::tmp_storage().await;
+        let mut opt = Persistence::options(&tmp);
+        opt.consensus_pruning = pruning_opt;
+        let storage = opt.create().await.unwrap();
+
+        let data_view = ViewNumber::new(1);
+
+        // Populate some data.
+        let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await;
+        let leaf_payload = leaf.block_payload().unwrap();
+        let leaf_payload_bytes_arc = leaf_payload.encode();
+        let disperse = vid_scheme(2)
+            .disperse(leaf_payload_bytes_arc.clone())
+            .unwrap();
+        let payload_commitment = vid_commitment(&leaf_payload_bytes_arc, 2);
+        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
+        let vid = VidDisperseShare::<SeqTypes> {
+            view_number: data_view,
+            payload_commitment,
+            share: disperse.shares[0].clone(),
+            common: disperse.common,
+            recipient_key: pubkey,
+        }
+        .to_proposal(&privkey)
+        .unwrap()
+        .clone();
+        let quorum_proposal = QuorumProposal::<SeqTypes> {
+            block_header: leaf.block_header().clone(),
+            view_number: data_view,
+            justify_qc: QuorumCertificate::genesis::<TestVersions>(
+                &ValidatedState::default(),
+                &NodeState::mock(),
+            )
+            .await,
+            upgrade_certificate: None,
+            proposal_certificate: None,
+        };
+        let quorum_proposal_signature =
+            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
+                .expect("Failed to sign quorum proposal");
+        let quorum_proposal = Proposal {
+            data: quorum_proposal,
+            signature: quorum_proposal_signature,
+            _pd: Default::default(),
+        };
+
+        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
+            .expect("Failed to sign block payload");
+        let da_proposal = Proposal {
+            data: DaProposal::<SeqTypes> {
+                encoded_transactions: leaf_payload_bytes_arc.clone(),
+                metadata: leaf_payload.ns_table().clone(),
+                view_number: data_view,
+            },
+            signature: block_payload_signature,
+            _pd: Default::default(),
+        };
+
+        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
+        storage.append_vid(&vid).await.unwrap();
+        storage
+            .append_da(&da_proposal, payload_commitment)
+            .await
+            .unwrap();
+        storage
+            .append_quorum_proposal(&quorum_proposal)
+            .await
+            .unwrap();
+
+        // The first decide doesn't trigger any garbage collection, even though our usage exceeds
+        // the target, because of the minimum retention.
+        tracing::info!("decide view 1");
+        storage
+            .append_decided_leaves(data_view + 1, [], &NullEventConsumer)
+            .await
+            .unwrap();
+        assert_eq!(
+            storage.load_vid_share(data_view).await.unwrap().unwrap(),
+            vid
+        );
+        assert_eq!(
+            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
+            da_proposal
+        );
+        assert_eq!(
+            storage.load_quorum_proposal(data_view).await.unwrap(),
+            quorum_proposal
+        );
+
+        // After another view, our data is beyond the minimum retention (though not the target
+        // retention) so it gets pruned.
+        tracing::info!("decide view 2");
+        storage
+            .append_decided_leaves(data_view + 2, [], &NullEventConsumer)
+            .await
+            .unwrap();
+        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
+        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
+        storage.load_quorum_proposal(data_view).await.unwrap_err();
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_pruning_minimum_retention() {
+        test_pruning_helper(ConsensusPruningOptions {
+            // Use a very low target usage, to show that we still retain data up to the minimum
+            // retention even when usage is above target.
+            target_usage: 0,
+            minimum_retention: 1,
+            // Use a very high target retention, so that pruning is only triggered by the minimum
+            // retention.
+            target_retention: u64::MAX,
+        })
+        .await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_pruning_target_retention() {
+        test_pruning_helper(ConsensusPruningOptions {
+            target_retention: 1,
+            // Use a very low minimum retention, so that data is only kept around due to the target
+            // retention.
+            minimum_retention: 0,
+            // Use a very high target usage, so that pruning is only triggered by the target
+            // retention.
+            target_usage: u64::MAX,
+        })
+        .await
+    }
 }
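
Taken together, and as the tests above exercise, the three new knobs interact like this: each prune pass first garbage collects back to TARGET_RETENTION views, and only if the remaining consensus tables still occupy more than TARGET_USAGE bytes does it continue, more aggressively, back to MINIMUM_RETENTION views. The sketch below restates that policy in isolation; it is illustrative only and not part of this patch: `GcPolicy` and `prune_cutoff` are invented names, and `current_usage` stands in for the `pg_table_size()` sum that `prune` computes inside its transaction.

// Illustrative sketch of the retention policy; hypothetical names, not from the patch.
struct GcPolicy {
    target_retention: u64,
    minimum_retention: u64,
    target_usage: u64,
}

impl GcPolicy {
    /// Returns the cutoff view: rows with `view` strictly below this value may be
    /// garbage collected, given the current view and current storage usage in bytes.
    fn prune_cutoff(&self, cur_view: u64, current_usage: u64) -> u64 {
        // Normally, keep everything newer than the target retention window.
        let mut cutoff = cur_view.saturating_sub(self.target_retention);
        // Under storage pressure (usage still above the target after the first pass),
        // fall back to the shorter minimum retention window.
        if current_usage > self.target_usage {
            cutoff = cutoff.max(cur_view.saturating_sub(self.minimum_retention));
        }
        cutoff
    }
}

fn main() {
    // Defaults from the patch: ~1 week target, ~3 day minimum, 1 GB target usage.
    let policy = GcPolicy {
        target_retention: 302000,
        minimum_retention: 130000,
        target_usage: 1_000_000_000,
    };
    // Normal operation: only views older than the target retention are pruned.
    assert_eq!(policy.prune_cutoff(500_000, 100_000_000), 198_000);
    // Storage pressure: pruning extends up to (but not past) the minimum retention.
    assert_eq!(policy.prune_cutoff(500_000, 2_000_000_000), 370_000);
}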