Skip to content

Commit

Permalink
feat(rooch-proposer): add last proposed block repair logic (#3104)
Browse files Browse the repository at this point in the history
## Summary

Add a mechanism to repair the last proposed block inconsistencies during SCC initialization. This ensures better handling of rollback/revert scenarios and maintains proper state alignment in the background submission process.

- Closes #3073
  • Loading branch information
popcnt1 authored Dec 25, 2024
1 parent c4c6b87 commit b37107d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
1 change: 1 addition & 0 deletions crates/rooch-proposer/src/actor/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl ProposerActor {
rooch_store.set_last_proposed(init_offset - 1)?;
}
};

let scc = StateCommitmentChain::new(rooch_store, moveos_store)?;

Ok(Self {
Expand Down
26 changes: 26 additions & 0 deletions crates/rooch-proposer/src/scc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct StateCommitmentChain {
impl StateCommitmentChain {
/// Create a new SCC
pub fn new(rooch_store: RoochStore, moveos_store: MoveOSStore) -> anyhow::Result<Self> {
Self::repair_last_proposed(rooch_store.clone())?;

let last_proposed_block_number = rooch_store.get_last_proposed()?;

let last_proposed_block_accumulator_root: H256 = match last_proposed_block_number {
Expand All @@ -45,6 +47,30 @@ impl StateCommitmentChain {
})
}

// last_proposed may beyond the DA submitted caused by manual rollback/revert
// we need to repair the last proposed block number
// invoke it when new scc is created
fn repair_last_proposed(rooch_store: RoochStore) -> anyhow::Result<()> {
let last_proposed_block_number = rooch_store.get_last_proposed()?;
if last_proposed_block_number.is_none() {
return Ok(());
}
let last_proposed = last_proposed_block_number.unwrap();

let background_submit_block_cursor = rooch_store.get_background_submit_block_cursor()?;
match background_submit_block_cursor {
Some(background_submit_block_cursor) => {
if background_submit_block_cursor < last_proposed {
rooch_store.set_last_proposed(background_submit_block_cursor)?;
}
}
None => {
rooch_store.clear_last_proposed()?;
}
}
Ok(())
}

#[allow(dead_code)]
fn get_block(&self, block_number: u128) -> anyhow::Result<Block> {
match self.last_proposed_block_number {
Expand Down
49 changes: 36 additions & 13 deletions crates/rooch-store/src/da_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl DAMetaDBStore {
} else {
Some(min_block_number_wait_rm - 1)
};
let last_block_number_wait_rm = *remove_blocks.last().unwrap();

let inner_store = self.block_submit_state_store.get_store().store();
let mut cf_batches: Vec<WriteBatchCF> = Vec::new();
Expand All @@ -148,6 +149,20 @@ impl DAMetaDBStore {
cf_name: DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME.to_string(),
};
cf_batches.push(last_block_batch);

// update background_submit_block_cursor
let background_submit_block_cursor = self.get_background_submit_block_cursor()?;
if let Some(background_submit_block_cursor) = background_submit_block_cursor {
if background_submit_block_cursor > new_last_block_number {
cf_batches.push(WriteBatchCF {
batch: WriteBatch::new_with_rows(vec![(
to_bytes(BACKGROUND_SUBMIT_BLOCK_CURSOR_KEY).unwrap(),
WriteOp::Value(to_bytes(&new_last_block_number).unwrap()),
)]),
cf_name: DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME.to_string(),
});
}
}
}
None => {
let last_block_batch = WriteBatchCF {
Expand All @@ -158,22 +173,29 @@ impl DAMetaDBStore {
cf_name: DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME.to_string(),
};
cf_batches.push(last_block_batch);

// If no block left, remove background_submit_block_cursor directly
cf_batches.push(WriteBatchCF {
batch: WriteBatch::new_with_rows(vec![(
to_bytes(BACKGROUND_SUBMIT_BLOCK_CURSOR_KEY).unwrap(),
WriteOp::Deletion,
)]),
cf_name: DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME.to_string(),
});
}
}
// remove background_submit_block_cursor directly, since we could catch up with the last order by background submitter
// will just ignore the blocks that have been submitted
cf_batches.push(WriteBatchCF {
batch: WriteBatch::new_with_rows(vec![(
to_bytes(BACKGROUND_SUBMIT_BLOCK_CURSOR_KEY).unwrap(),
WriteOp::Deletion,
)]),
cf_name: DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME.to_string(),
});

inner_store.write_cf_batch(cf_batches, true)
inner_store.write_cf_batch(cf_batches, true)?;
tracing::info!(
"rollback to block {:?} successfully, removed blocks: [{},{}]",
remove_blocks,
min_block_number_wait_rm,
last_block_number_wait_rm
);
Ok(())
}

// generate the blocks need to be removed by tx_order_end > last_order
// generate the block need to be removed by tx_order_end > last_order
pub(crate) fn generate_remove_blocks_after_order(
&self,
last_block_number: Option<u128>,
Expand Down Expand Up @@ -377,9 +399,10 @@ impl DAMetaDBStore {
Some(last_block_number),
last_order,
)?;
issues += remove_blocks.len();
fixed += remove_blocks.len();
let remove_blocks_len = remove_blocks.len();
issues += remove_blocks_len;
self.inner_rollback(remove_blocks)?;
fixed += remove_blocks_len;
self.try_repair_blocks(last_order, issues, fixed)
}
Ordering::Equal => Ok((issues, fixed)),
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch-store/src/tests/test_da_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn get_submitting_blocks() {
}

#[tokio::test]
async fn rollback_to_last_tx_order() {
async fn generate_remove_blocks() {
let (rooch_store, _) = RoochStore::mock_rooch_store().unwrap();
let da_meta_store = rooch_store.get_da_meta_store();

Expand Down

0 comments on commit b37107d

Please sign in to comment.