Skip to content

Commit

Permalink
Reduce risk of losing data due to garbage collection or restarts (#2252)
Browse files Browse the repository at this point in the history
* Make persistence garbage collection more conservative.

Previously, we would delete all data from consensus storage up to
the decided view, with each new decide. In the case where we miss a
decide event, this can cause us to delete the associated data (e.g.
DA, VID proposals) from a view that we never actually moved to
archival storage. This in turn makes it much harder to use any
guarantees from consensus about certain nodes posessing certain data
after a decide, because we are garbage collecting such data.

One particularly bad case is about DA nodes. Consensus guarantees
that some honest DA nodes will see every DA proposal. However, if
those nodes all restart at the same time shortly after seeing a DA
proposal, they may miss the corresonding quorum proposal and decide.
Then, when they restart, they may end up garbage collecting that DA
proposal before anyone realizes that it did get decided, and now no
one has the data.

The new technique is more conservative. We only garbage collect
specific views or ranges of views for which we know we have successfully
processed all decide events. Other data, whether it is for views that
never decided or for views where we missed a decide (we cannot
immediately tell the difference) will be retained indefinitely. This
ensures we never lose data before it is archived, and allows us to
manually rebuild an incomplete archive after it is discovered.

It also enables us to implement a catchup data source that pulls
from this store of undecided, un-garbage-collected data, so that the
archive can automatically rebuild itself.

Of course, indefinitely retaining data which may not even have been
decided is undesirable. The next commit will add pruning, so that
all data is deleted after a certain number of views, even if we never
archived it. Then the guarantee will be: we can always recover a
complete archive, based on the guarantees of consensus, as long as
recover completes within a certain (configurable) time period.

* Implement pruning for old, undecided consensus data

* Implement provider traits for consensus persistence

* Regression test for deadlock

* Use consensus storage as fetching provider when running query service

* Remove dyn-clone, which is no longer necessary

* Bump query service

* Better configurability for consensus storage GC

Add target, minimum retentions and target usage, like the archival
pruner. This allows us to take full advantage of the storage space
if we have it, keeping data around for longer, while still ensuring
we keep it around long *enough* even if we are low on space.

* Log return value of background tasks

* Update lock file

* Tag query service

* Update lock file
  • Loading branch information
jbearer authored Dec 10, 2024
1 parent 979cfb3 commit a4c67eb
Show file tree
Hide file tree
Showing 17 changed files with 1,369 additions and 283 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ cld = "0.5"
derive_more = { version = "1.0", features = ["full"] }
es-version = { git = "https://github.com/EspressoSystems/es-version.git", branch = "main" }
dotenvy = "0.15"
dyn-clone = "1.0"
ethers = { version = "2.0", features = ["solc", "ws"] }
futures = "0.3"
tokio = { version = "1", default-features = false, features = [
Expand Down
2 changes: 0 additions & 2 deletions sequencer-sqlite/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ csv = "1"
derivative = "2.2"
derive_more = { workspace = true }
dotenvy = { workspace = true }
dyn-clone = { workspace = true }
espresso-types = { path = "../types" }
ethers = { workspace = true }
futures = { workspace = true }
Expand Down
21 changes: 21 additions & 0 deletions sequencer/api/migrations/postgres/V401__archive_provider.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Add information needed for consensus storage to act as a provider for archive recovery.

-- Add payload hash to DA proposal, since the query service requests missing payloads by hash.
ALTER TABLE da_proposal
ADD COLUMN payload_hash VARCHAR;
CREATE INDEX da_proposal_payload_hash_idx ON da_proposal (payload_hash);

-- Add payload hash to VID share, since the query service requests missing VID common by payload
-- hash.
ALTER TABLE vid_share
ADD COLUMN payload_hash VARCHAR;
CREATE INDEX vid_share_payload_hash_idx ON vid_share (payload_hash);

-- Add QC storage, since the query service requires missing leaves to be fetched alongside a QC with
-- that leaf hash.
CREATE TABLE quorum_certificate (
view BIGINT PRIMARY KEY,
leaf_hash VARCHAR NOT NULL,
data BYTEA NOT NULL
);
CREATE INDEX quorum_certificate_leaf_hash_idx ON quorum_certificate (leaf_hash);
21 changes: 21 additions & 0 deletions sequencer/api/migrations/sqlite/V201__archive_provider.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Add information needed for consensus storage to act as a provider for archive recovery.

-- Add payload hash to DA proposal, since the query service requests missing payloads by hash.
ALTER TABLE da_proposal
ADD COLUMN payload_hash VARCHAR;
CREATE INDEX da_proposal_payload_hash_idx ON da_proposal (payload_hash);

-- Add payload hash to VID share, since the query service requests missing VID common by payload
-- hash.
ALTER TABLE vid_share
ADD COLUMN payload_hash VARCHAR;
CREATE INDEX vid_share_payload_hash_idx ON vid_share (payload_hash);

-- Add QC storage, since the query service requires missing leaves to be fetched alongside a QC with
-- that leaf hash.
CREATE TABLE quorum_certificate (
view BIGINT PRIMARY KEY,
leaf_hash VARCHAR NOT NULL,
data BLOB NOT NULL
);
CREATE INDEX quorum_certificate_leaf_hash_idx ON quorum_certificate (leaf_hash);
3 changes: 3 additions & 0 deletions sequencer/api/public-env-vars.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ variables = [
"ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
"ESPRESSO_SEQUENCER_CDN_ENDPOINT",
"ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY",
"ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
"ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
"ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
"ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT",
"ESPRESSO_SEQUENCER_HOTSHOT_ADDRESS",
"ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT",
Expand Down
188 changes: 181 additions & 7 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,15 @@ mod api_tests {
};
use hotshot_types::drb::{INITIAL_DRB_RESULT, INITIAL_DRB_SEED_INPUT};
use hotshot_types::{
data::QuorumProposal2, event::LeafInfo, simple_certificate::QuorumCertificate,
traits::node_implementation::ConsensusTime,
data::{DaProposal, QuorumProposal2, VidDisperseShare},
event::LeafInfo,
message::Proposal,
simple_certificate::QuorumCertificate,
traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes},
vid::vid_scheme,
};

use jf_vid::VidScheme;
use portpicker::pick_unused_port;
use sequencer_utils::test_utils::setup_test;
use std::fmt::Debug;
Expand Down Expand Up @@ -1226,6 +1231,7 @@ mod api_tests {
}

setup_test();
let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);

let storage = D::create_storage().await;
let persistence = D::persistence_options(&storage).create().await.unwrap();
Expand All @@ -1240,11 +1246,13 @@ mod api_tests {
// Create two non-consecutive leaf chains.
let mut chain1 = vec![];

let genesis = Leaf::genesis(&Default::default(), &NodeState::mock()).await;
let payload = genesis.block_payload().unwrap();
let payload_bytes_arc = payload.encode();
let disperse = vid_scheme(2).disperse(payload_bytes_arc.clone()).unwrap();
let payload_commitment = disperse.commit;
let mut quorum_proposal = QuorumProposal2::<SeqTypes> {
block_header: Leaf::genesis(&Default::default(), &NodeState::mock())
.await
.block_header()
.clone(),
block_header: genesis.block_header().clone(),
view_number: ViewNumber::genesis(),
justify_qc: QuorumCertificate::genesis::<MockSequencerVersions>(
&ValidatedState::default(),
Expand Down Expand Up @@ -1274,6 +1282,50 @@ mod api_tests {
qc.data.leaf_commit = Committable::commit(&leaf);
justify_qc = qc.clone();
chain1.push((leaf.clone(), qc.clone()));

// Include a quorum proposal for each leaf.
let quorum_proposal_signature =
PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
.expect("Failed to sign quorum_proposal");
persistence
.append_quorum_proposal(&Proposal {
data: quorum_proposal.clone(),
signature: quorum_proposal_signature,
_pd: Default::default(),
})
.await
.unwrap();

// Include VID information for each leaf.
let share = VidDisperseShare::<SeqTypes> {
view_number: leaf.view_number(),
payload_commitment,
share: disperse.shares[0].clone(),
common: disperse.common.clone(),
recipient_key: pubkey,
};
persistence
.append_vid(&share.to_proposal(&privkey).unwrap())
.await
.unwrap();

// Include payload information for each leaf.
let block_payload_signature =
PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
let da_proposal_inner = DaProposal::<SeqTypes> {
encoded_transactions: payload_bytes_arc.clone(),
metadata: payload.ns_table().clone(),
view_number: leaf.view_number(),
};
let da_proposal = Proposal {
data: da_proposal_inner,
signature: block_payload_signature,
_pd: Default::default(),
};
persistence
.append_da(&da_proposal, payload_commitment)
.await
.unwrap();
}
// Split into two chains.
let mut chain2 = chain1.split_off(2);
Expand Down Expand Up @@ -1312,15 +1364,137 @@ mod api_tests {
.await
.unwrap();

// Check that the leaves were moved to archive storage.
// Check that the leaves were moved to archive storage, along with payload and VID
// information.
for (leaf, qc) in chain1.iter().chain(&chain2) {
tracing::info!(height = leaf.height(), "check archive");
let qd = data_source.get_leaf(leaf.height() as usize).await.await;
let stored_leaf: Leaf2 = qd.leaf().clone().into();
let stored_qc = qd.qc().clone().to_qc2();
assert_eq!(&stored_leaf, leaf);
assert_eq!(&stored_qc, qc);

data_source
.get_block(leaf.height() as usize)
.await
.try_resolve()
.ok()
.unwrap();
data_source
.get_vid_common(leaf.height() as usize)
.await
.try_resolve()
.ok()
.unwrap();

// Check that all data has been garbage collected for the decided views.
assert!(persistence
.load_da_proposal(leaf.view_number())
.await
.unwrap()
.is_none());
assert!(persistence
.load_vid_share(leaf.view_number())
.await
.unwrap()
.is_none());
assert!(persistence
.load_quorum_proposal(leaf.view_number())
.await
.is_err());
}

// Check that data has _not_ been garbage collected for the missing view.
assert!(persistence
.load_da_proposal(ViewNumber::new(2))
.await
.unwrap()
.is_some());
assert!(persistence
.load_vid_share(ViewNumber::new(2))
.await
.unwrap()
.is_some());
persistence
.load_quorum_proposal(ViewNumber::new(2))
.await
.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
pub async fn test_decide_missing_data<D>()
where
D: TestableSequencerDataSource + Debug + 'static,
{
setup_test();

let storage = D::create_storage().await;
let persistence = D::persistence_options(&storage).create().await.unwrap();
let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
Arc::new(StorageState::new(
D::create(D::persistence_options(&storage), Default::default(), false)
.await
.unwrap(),
ApiState::new(future::pending()),
));
let consumer = ApiEventConsumer::from(data_source.clone());

let mut qc = QuorumCertificate::genesis::<MockSequencerVersions>(
&ValidatedState::default(),
&NodeState::mock(),
)
.await
.to_qc2();
let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await;

// Append the genesis leaf. We don't use this for the test, because the update function will
// automatically fill in the missing data for genesis. We just append this to get into a
// consistent state to then append the leaf from view 1, which will have missing data.
tracing::info!(?leaf, ?qc, "decide genesis leaf");
persistence
.append_decided_leaves(
leaf.view_number(),
[(&leaf_info(leaf.clone().into()), qc.clone())],
&consumer,
)
.await
.unwrap();

// Create another leaf, with missing data.
let mut block_header = leaf.block_header().clone();
*block_header.height_mut() += 1;
let qp = QuorumProposal2 {
block_header,
view_number: leaf.view_number() + 1,
justify_qc: qc.clone(),
upgrade_certificate: None,
view_change_evidence: None,
drb_seed: INITIAL_DRB_SEED_INPUT,
drb_result: INITIAL_DRB_RESULT,
};

let leaf = Leaf2::from_quorum_proposal(&qp);
qc.view_number = leaf.view_number();
qc.data.leaf_commit = Committable::commit(&leaf);

// Decide a leaf without the corresponding payload or VID.
tracing::info!(?leaf, ?qc, "append leaf 1");
persistence
.append_decided_leaves(
leaf.view_number(),
[(&leaf_info(leaf.clone()), qc)],
&consumer,
)
.await
.unwrap();

// Check that we still processed the leaf.
assert_eq!(
leaf,
data_source.get_leaf(1).await.await.leaf().clone().into()
);
assert!(data_source.get_vid_common(1).await.is_pending());
assert!(data_source.get_block(1).await.is_pending());
}

fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
Expand Down
23 changes: 15 additions & 8 deletions sequencer/src/api/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use anyhow::{bail, Context};
use clap::Parser;
use espresso_types::{
v0::traits::{EventConsumer, NullEventConsumer, SequencerPersistence},
v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
BlockMerkleTree, PubKey,
};
use futures::{
Expand All @@ -13,6 +13,7 @@ use futures::{
use hotshot_events_service::events::Error as EventStreamingError;
use hotshot_query_service::{
data_source::{ExtensibleDataSource, MetricsDataSource},
fetching::provider::QueryServiceProvider,
status::{self, UpdateStatusData},
ApiState as AppState, Error,
};
Expand All @@ -27,7 +28,7 @@ use vbs::version::StaticVersionType;

use super::{
data_source::{
provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource,
provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
},
endpoints, fs, sql,
Expand Down Expand Up @@ -333,12 +334,18 @@ impl Options {
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
{
let ds = sql::DataSource::create(
mod_opt.clone(),
provider::<V>(query_opt.peers.clone(), bind_version),
false,
)
.await?;
let mut provider = Provider::default();

// Use the database itself as a fetching provider: sometimes we can fetch data that is
// missing from the query service from ephemeral consensus storage.
provider = provider.with_provider(mod_opt.clone().create().await?);
// If that fails, fetch missing data from peers.
for peer in query_opt.peers {
tracing::info!("will fetch missing data from {peer}");
provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
}

let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
let (metrics, ds, mut app) = self
.init_app_modules(ds, state.clone(), bind_version)
.await?;
Expand Down
Loading

0 comments on commit a4c67eb

Please sign in to comment.