From 2a32d567c14426d53df338fa1de56fdbf81f8a83 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 12 Nov 2024 17:20:36 -0800 Subject: [PATCH] Better configurability for consensus storage GC Add target, minimum retentions and target usage, like the archival pruner. This allows us to take full advantage of the storage space if we have it, keeping data around for longer, while still ensuring we keep it around long *enough* even if we are low on space. --- sequencer/api/public-env-vars.toml | 4 +- sequencer/src/api/sql.rs | 2 +- sequencer/src/persistence/sql.rs | 354 +++++++++++++++++++++++------ 3 files changed, 283 insertions(+), 77 deletions(-) diff --git a/sequencer/api/public-env-vars.toml b/sequencer/api/public-env-vars.toml index 35a5620de..4fbd6db5a 100644 --- a/sequencer/api/public-env-vars.toml +++ b/sequencer/api/public-env-vars.toml @@ -60,7 +60,9 @@ variables = [ "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY", "ESPRESSO_SEQUENCER_CDN_ENDPOINT", "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", - "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION", + "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION", + "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION", + "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE", "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT", "ESPRESSO_SEQUENCER_HOTSHOT_ADDRESS", "ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT", diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs index fd9458b41..6b1b8d1a4 100644 --- a/sequencer/src/api/sql.rs +++ b/sequencer/src/api/sql.rs @@ -53,7 +53,7 @@ impl SequencerDataSource for DataSource { let fetch_limit = opt.fetch_rate_limit; let active_fetch_delay = opt.active_fetch_delay; let chunk_fetch_delay = opt.chunk_fetch_delay; - let mut cfg = Config::try_from(opt)?; + let mut cfg = Config::try_from(&opt)?; if reset { cfg = cfg.reset_schema(); diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index 65590e622..6c12a3584 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -14,7 +14,10 @@ use hotshot_query_service::{ data_source::{ storage::{ pruning::PrunerCfg, - sql::{include_migrations, query_as, Config, SqlStorage, Transaction, TransactionMode}, + sql::{ + include_migrations, query_as, Config, SqlStorage, Transaction, TransactionMode, + Write, + }, }, Transaction as _, VersionedDataSource, }, @@ -37,6 +40,7 @@ use hotshot_types::{ vid::{VidCommitment, VidCommon}, vote::HasViewNumber, }; +use itertools::Itertools; use sqlx::Row; use sqlx::{query, Executor}; use std::sync::Arc; @@ -103,6 +107,10 @@ pub struct Options { #[clap(flatten)] pub(crate) pruning: PruningOptions, + /// Pruning parameters for ephemeral consensus storage. + #[clap(flatten)] + pub(crate) consensus_pruning: ConsensusPruningOptions, + #[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)] pub(crate) store_undecided_state: bool, @@ -126,24 +134,6 @@ pub struct Options { /// fetching from peers. #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")] pub(crate) archive: bool, - - /// Number of views to retain in consensus storage before data that hasn't been archived is - /// garbage collected. - /// - /// The longer this is, the more certain that all data will eventually be archived, even if - /// there are temporary problems with archive storage or partially missing data. This can be set - /// very large, as most data is garbage collected as soon as it is finalized by consensus. This - /// setting only applies to views which never get decided (ie forks in consensus) and views for - /// which this node is partially offline. These should be exceptionally rare. - /// - /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average - /// view time of 2s. - #[clap( - long, - env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION", - default_value = "130000" - )] - pub(crate) consensus_view_retention: u64, } impl Default for Options { @@ -152,17 +142,17 @@ impl Default for Options { } } -impl TryFrom for Config { +impl TryFrom<&Options> for Config { type Error = anyhow::Error; - fn try_from(opt: Options) -> Result { - let mut cfg = match opt.uri { + fn try_from(opt: &Options) -> Result { + let mut cfg = match &opt.uri { Some(uri) => uri.parse()?, None => Self::default(), }; cfg = cfg.migrations(include_migrations!("$CARGO_MANIFEST_DIR/api/migrations")); - if let Some(host) = opt.host { + if let Some(host) = &opt.host { cfg = cfg.host(host); } if let Some(port) = opt.port { @@ -193,7 +183,7 @@ impl TryFrom for Config { } /// Pruning parameters. -#[derive(Parser, Clone, Debug)] +#[derive(Parser, Clone, Copy, Debug)] pub struct PruningOptions { /// Threshold for pruning, specified in bytes. /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period. @@ -266,26 +256,84 @@ impl From for PrunerCfg { } } +/// Pruning parameters for ephemeral consensus storage. +#[derive(Parser, Clone, Debug)] +pub struct ConsensusPruningOptions { + /// Number of views to try to retain in consensus storage before data that hasn't been archived + /// is garbage collected. + /// + /// The longer this is, the more certain that all data will eventually be archived, even if + /// there are temporary problems with archive storage or partially missing data. This can be set + /// very large, as most data is garbage collected as soon as it is finalized by consensus. This + /// setting only applies to views which never get decided (ie forks in consensus) and views for + /// which this node is partially offline. These should be exceptionally rare. + /// + /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION + /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long + /// consensus data will be retained, see MINIMUM_RETENTION. + /// + /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average + /// view time of 2s. + #[clap( + name = "TARGET_RETENTION", + long = "consensus-storage-target-retention", + env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION", + default_value = "302000" + )] + target_retention: u64, + + /// Minimum number of views to try to retain in consensus storage before data that hasn't been + /// archived is garbage collected. + /// + /// This bound allows data to be retained even if consensus storage occupies more than + /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival + /// storage as necessary, even under extreme circumstances where otherwise garbage collection + /// would kick in based on TARGET_RETENTION. + /// + /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average + /// view time of 2s. + #[clap( + name = "MINIMUM_RETENTION", + long = "consensus-storage-minimum-retention", + env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION", + default_value = "130000" + )] + minimum_retention: u64, + + /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more + /// aggressively. + /// + /// See also TARGET_RETENTION and MINIMUM_RETENTION. + #[clap( + name = "TARGET_USAGE", + long = "consensus-storage-target-usage", + env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE", + default_value = "1000000000" + )] + target_usage: u64, +} + #[async_trait] impl PersistenceOptions for Options { type Persistence = Persistence; fn set_view_retention(&mut self, view_retention: u64) { - self.consensus_view_retention = view_retention; + self.consensus_pruning.target_retention = view_retention; + self.consensus_pruning.minimum_retention = view_retention; } async fn create(self) -> anyhow::Result { let persistence = Persistence { store_undecided_state: self.store_undecided_state, - view_retention: self.consensus_view_retention, - db: SqlStorage::connect(self.try_into()?).await?, + db: SqlStorage::connect((&self).try_into()?).await?, + gc_opt: self.consensus_pruning, }; persistence.migrate_quorum_proposal_leaf_hashes().await?; Ok(persistence) } async fn reset(self) -> anyhow::Result<()> { - SqlStorage::connect(Config::try_from(self)?.reset_schema()).await?; + SqlStorage::connect(Config::try_from(&self)?.reset_schema()).await?; Ok(()) } } @@ -295,7 +343,7 @@ impl PersistenceOptions for Options { pub struct Persistence { db: SqlStorage, store_undecided_state: bool, - view_retention: u64, + gc_opt: ConsensusPruningOptions, } impl Persistence { @@ -531,58 +579,73 @@ impl Persistence { } #[tracing::instrument(skip(self))] - async fn prune(&self, view: ViewNumber) -> anyhow::Result<()> { - let view = view.u64().saturating_sub(self.view_retention) as i64; - if view == 0 { - // Nothing to prune, the entire chain is younger than the retention period. - return Ok(()); - } - + async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> { let mut tx = self.db.write().await?; - let res = query("DELETE FROM anchor_leaf WHERE view < $1") - .bind(view) - .execute(tx.as_mut()) - .await - .context("deleting old anchor leaves")?; - tracing::debug!("garbage collecting {} leaves", res.rows_affected()); + // Prune everything older than the target retention period. + prune_to_view( + &mut tx, + cur_view.u64().saturating_sub(self.gc_opt.target_retention), + ) + .await?; - let res = query("DELETE FROM vid_share WHERE view < $1") - .bind(view) - .execute(tx.as_mut()) - .await - .context("deleting old VID shares")?; - tracing::debug!("garbage collecting {} VID shares", res.rows_affected()); + // Check our storage usage; if necessary we will prune more aggressively (up to the minimum + // retention) to get below the target usage. + let table_sizes = PRUNE_TABLES + .iter() + .map(|table| format!("pg_table_size('{table}')")) + .join(" + "); + let usage_query = format!("SELECT {table_sizes}"); + let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?; + tracing::debug!(usage, "consensus storage usage after pruning"); + + if (usage as u64) > self.gc_opt.target_usage { + tracing::warn!( + usage, + gc_opt = ?self.gc_opt, + "consensus storage is running out of space, pruning to minimum retention" + ); + prune_to_view( + &mut tx, + cur_view.u64().saturating_sub(self.gc_opt.minimum_retention), + ) + .await?; + } - let res = query("DELETE FROM da_proposal WHERE view < $1") - .bind(view) - .execute(tx.as_mut()) - .await - .context("deleting old DA proposals")?; - tracing::debug!("garbage collecting {} DA proposals", res.rows_affected()); + tx.commit().await + } +} - let res = query("DELETE FROM quorum_proposals WHERE view < $1") - .bind(view) - .execute(tx.as_mut()) - .await - .context("deleting old quorum proposals")?; - tracing::debug!( - "garbage collecting {} quorum proposals", - res.rows_affected() - ); +const PRUNE_TABLES: &[&str] = &[ + "anchor_leaf", + "vid_share", + "da_proposal", + "quorum_proposals", + "quorum_certificate", +]; + +async fn prune_to_view(tx: &mut Transaction, view: u64) -> anyhow::Result<()> { + if view == 0 { + // Nothing to prune, the entire chain is younger than the retention period. + return Ok(()); + } + tracing::debug!(view, "pruning consensus storage"); - let res = query("DELETE FROM quorum_certificate WHERE view < $1") - .bind(view) + for table in PRUNE_TABLES { + let res = query(&format!("DELETE FROM {table} WHERE view < $1")) + .bind(view as i64) .execute(tx.as_mut()) .await - .context("deleting old quorum certificates")?; - tracing::debug!( - "garbage collecting {} quorum certificates", - res.rows_affected() - ); - - tx.commit().await + .context(format!("pruning {table}"))?; + if res.rows_affected() > 0 { + tracing::info!( + "garbage collected {} rows from {table}", + res.rows_affected() + ); + } } + + Ok(()) } #[async_trait] @@ -1113,7 +1176,8 @@ mod testing { TmpDb::init().await } - fn options(db: &Self::Storage) -> impl PersistenceOptions { + #[allow(refining_impl_trait)] + fn options(db: &Self::Storage) -> Options { Options { port: Some(db.port()), host: Some(db.host()), @@ -1139,11 +1203,11 @@ mod generic_tests { mod test { use super::*; use crate::{persistence::testing::TestablePersistence, BLSPubKey, PubKey}; - use espresso_types::{NodeState, ValidatedState}; + use espresso_types::{traits::NullEventConsumer, NodeState, ValidatedState}; use futures::stream::TryStreamExt; use hotshot_example_types::node_types::TestVersions; use hotshot_types::{ - traits::{signature_key::SignatureKey, EncodeBytes}, + traits::{block_contents::vid_commitment, signature_key::SignatureKey, EncodeBytes}, vid::vid_scheme, }; use jf_vid::VidScheme; @@ -1325,4 +1389,144 @@ mod test { .unwrap() ); } + + /// Test conditions that trigger pruning. + /// + /// This is a configurable test that can be used to test different configurations of GC, + /// `pruning_opt`. The test populates the database with some data for view 1, asserts that it is + /// retained for view 2, and then asserts that it is pruned by view 3. There are various + /// different configurations that can achieve this behavior, such that the data is retained and + /// then pruned due to different logic and code paths. + async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) { + setup_test(); + + let tmp = Persistence::tmp_storage().await; + let mut opt = Persistence::options(&tmp); + opt.consensus_pruning = pruning_opt; + let storage = opt.create().await.unwrap(); + + let data_view = ViewNumber::new(1); + + // Populate some data. + let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await; + let leaf_payload = leaf.block_payload().unwrap(); + let leaf_payload_bytes_arc = leaf_payload.encode(); + + let disperse = vid_scheme(2) + .disperse(leaf_payload_bytes_arc.clone()) + .unwrap(); + let payload_commitment = vid_commitment(&leaf_payload_bytes_arc, 2); + let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1); + let vid = VidDisperseShare:: { + view_number: data_view, + payload_commitment, + share: disperse.shares[0].clone(), + common: disperse.common, + recipient_key: pubkey, + } + .to_proposal(&privkey) + .unwrap() + .clone(); + let quorum_proposal = QuorumProposal:: { + block_header: leaf.block_header().clone(), + view_number: data_view, + justify_qc: QuorumCertificate::genesis::( + &ValidatedState::default(), + &NodeState::mock(), + ) + .await, + upgrade_certificate: None, + proposal_certificate: None, + }; + let quorum_proposal_signature = + BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap()) + .expect("Failed to sign quorum proposal"); + let quorum_proposal = Proposal { + data: quorum_proposal, + signature: quorum_proposal_signature, + _pd: Default::default(), + }; + + let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc) + .expect("Failed to sign block payload"); + let da_proposal = Proposal { + data: DaProposal:: { + encoded_transactions: leaf_payload_bytes_arc.clone(), + metadata: leaf_payload.ns_table().clone(), + view_number: data_view, + }, + signature: block_payload_signature, + _pd: Default::default(), + }; + + tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data"); + storage.append_vid(&vid).await.unwrap(); + storage + .append_da(&da_proposal, payload_commitment) + .await + .unwrap(); + storage + .append_quorum_proposal(&quorum_proposal) + .await + .unwrap(); + + // The first decide doesn't trigger any garbage collection, even though our usage exceeds + // the target, because of the minimum retention. + tracing::info!("decide view 1"); + storage + .append_decided_leaves(data_view + 1, [], &NullEventConsumer) + .await + .unwrap(); + assert_eq!( + storage.load_vid_share(data_view).await.unwrap().unwrap(), + vid + ); + assert_eq!( + storage.load_da_proposal(data_view).await.unwrap().unwrap(), + da_proposal + ); + assert_eq!( + storage.load_quorum_proposal(data_view).await.unwrap(), + quorum_proposal + ); + + // After another view, our data is beyond the minimum retention (though not the target + // retention) so it gets pruned. + tracing::info!("decide view 2"); + storage + .append_decided_leaves(data_view + 2, [], &NullEventConsumer) + .await + .unwrap(); + assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),); + assert!(storage.load_da_proposal(data_view).await.unwrap().is_none()); + storage.load_quorum_proposal(data_view).await.unwrap_err(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_pruning_minimum_retention() { + test_pruning_helper(ConsensusPruningOptions { + // Use a very low target usage, to show that we still retain data up to the minimum + // retention even when usage is above target. + target_usage: 0, + minimum_retention: 1, + // Use a very high target retention, so that pruning is only triggered by the minimum + // retention. + target_retention: u64::MAX, + }) + .await + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_pruning_target_retention() { + test_pruning_helper(ConsensusPruningOptions { + target_retention: 1, + // Use a very low minimum retention, so that data is only kept around due to the target + // retention. + minimum_retention: 0, + // Use a very high target usage, so that pruning is only triggered by the target + // retention. + target_usage: u64::MAX, + }) + .await + } }