12 changes: 6 additions & 6 deletions bin/citrea/src/rollup/mod.rs
@@ -252,8 +252,8 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
rpc_module: RpcModule<()>,
backup_manager: Arc<BackupManager>,
) -> Result<(
FullNodeL2Syncer<Self::DaService, LedgerDB>,
FullNodeL1BlockHandler<Self::Vm, Self::DaService, LedgerDB>,
FullNodeL2Syncer<Self::DaService>,
FullNodeL1BlockHandler<Self::Vm, Self::DaService>,
Option<PrunerService>,
RpcModule<()>,
)> {
@@ -309,9 +309,9 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
rpc_module: RpcModule<()>,
backup_manager: Arc<BackupManager>,
) -> Result<(
BatchProverL2Syncer<Self::DaService, LedgerDB>,
BatchProverL1Syncer<Self::DaService, LedgerDB>,
Prover<Self::DaService, LedgerDB, Self::Vm>,
BatchProverL2Syncer<Self::DaService>,
BatchProverL1Syncer<Self::DaService>,
Prover<Self::DaService, Self::Vm>,
RpcModule<()>,
)> {
let runner_config = rollup_config.runner.expect("Runner config is missing");
@@ -381,7 +381,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
rpc_module: RpcModule<()>,
backup_manager: Arc<BackupManager>,
) -> Result<(
LightClientProverL1BlockHandler<Self::Vm, Self::DaService, LedgerDB>,
LightClientProverL1BlockHandler<Self::Vm, Self::DaService>,
RpcModule<()>,
)>
where
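The return types above drop the `LedgerDB` type parameter because the syncer, block-handler, and prover structs are no longer generic over the ledger. A minimal sketch of that shape, using hypothetical stand-in names (`LedgerOps`, `LedgerDb`) rather than the real `BatchProverLedgerOps` / `sov_db::ledger_db::LedgerDB`:

```rust
// Hypothetical stand-ins for BatchProverLedgerOps and sov_db's LedgerDB.
trait LedgerOps {}

#[derive(Clone)]
struct LedgerDb;
impl LedgerOps for LedgerDb {}

// Before: generic over any ledger implementation, so every signature that
// mentions the syncer also has to carry the `Db` parameter.
struct SyncerBefore<Da, Db: LedgerOps> {
    da: Da,
    ledger: Db,
}

// After: pinned to the one concrete ledger type used in practice; callers
// (like the blueprint return types above) drop a type parameter.
struct SyncerAfter<Da> {
    da: Da,
    ledger: LedgerDb,
}

fn main() {
    let _generic = SyncerBefore { da: (), ledger: LedgerDb };
    let _concrete = SyncerAfter { da: (), ledger: LedgerDb };
}
```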
20 changes: 17 additions & 3 deletions bin/citrea/tests/bitcoin/light_client_test.rs
@@ -295,7 +295,14 @@ impl TestCase for LightClientProvingTestMultipleProofs {
full_node
.wait_for_l1_height(batch_proof_l1_height, Some(TEN_MINS))
.await?;
let batch_proofs = wait_for_zkproofs(full_node, batch_proof_l1_height, None, 2).await?;
let batch_proofs = wait_for_zkproofs(
full_node,
batch_proof_l1_height,
Some(Duration::from_secs(30)),
2,
)
.await
.unwrap();
assert_eq!(batch_proofs.len(), 2);

// Wait for light client prover to process batch proofs.
@@ -438,7 +445,7 @@ impl TestCase for LightClientProvingTestMultipleProofs {
.await
.unwrap();

let response = wait_for_prover_job(batch_prover, job_ids[0], None)
let response = wait_for_prover_job(batch_prover, job_ids[0], Some(Duration::from_secs(30)))
.await
.unwrap();
assert_eq!(response.commitments.len(), 1);
@@ -450,7 +457,14 @@ impl TestCase for LightClientProvingTestMultipleProofs {
full_node
.wait_for_l1_height(batch_proof_l1_height, Some(TEN_MINS))
.await?;
let batch_proofs = wait_for_zkproofs(full_node, batch_proof_l1_height, None, 1).await?;
let batch_proofs = wait_for_zkproofs(
full_node,
batch_proof_l1_height,
Some(Duration::from_secs(30)),
1,
)
.await
.unwrap();
assert_eq!(batch_proofs.len(), 1);

// Wait for light client prover to process batch proofs.
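The test changes replace the `None` timeout arguments with an explicit `Some(Duration::from_secs(30))`, so a stalled prover fails the assertion quickly instead of blocking on the helper's default wait. The real `wait_for_zkproofs` / `wait_for_prover_job` signatures live in the test framework; the sketch below is only a hypothetical stand-in showing the usual shape of such a bounded polling helper:

```rust
use std::time::Duration;

use tokio::time::{sleep, timeout};

// Hypothetical helper: poll a closure until it yields Some(_) or the optional
// deadline expires. Not the real test-framework API.
async fn wait_for<T, F, Fut>(mut poll: F, dur: Option<Duration>) -> anyhow::Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Option<T>>,
{
    // A long default mirrors "None means wait a while"; passing Some(30s)
    // at the call site bounds the wait explicitly.
    let dur = dur.unwrap_or(Duration::from_secs(600));
    timeout(dur, async {
        loop {
            if let Some(v) = poll().await {
                return v;
            }
            sleep(Duration::from_millis(500)).await;
        }
    })
    .await
    .map_err(|_| anyhow::anyhow!("timed out waiting for condition"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut tries = 0u32;
    let value = wait_for(
        || {
            tries += 1;
            async move { (tries > 3).then_some(42u32) }
        },
        Some(Duration::from_secs(30)),
    )
    .await?;
    assert_eq!(value, 42);
    Ok(())
}
```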
62 changes: 40 additions & 22 deletions crates/batch-prover/src/l1_syncer.rs
@@ -7,12 +7,17 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Context;
use citrea_common::backup::BackupManager;
use citrea_common::cache::L1BlockCache;
use citrea_common::da::{extract_sequencer_commitments, sync_l1};
use citrea_common::RollupPublicKeys;
use reth_tasks::shutdown::GracefulShutdown;
use sov_db::ledger_db::BatchProverLedgerOps;
use sov_db::ledger_db::{LedgerDB, SchemaBatch};
use sov_db::schema::tables::{
CommitmentIndicesByL1, ProverLastScannedSlot, ProverPendingCommitments,
SequencerCommitmentByIndex, ShortHeaderProofBySlotHash, SlotByHash,
};
use sov_db::schema::types::SlotNumber;
use sov_modules_api::DaSpec;
use sov_rollup_interface::da::BlockHeaderTrait;
@@ -32,13 +37,12 @@ use crate::metrics::BATCH_PROVER_METRICS as BPM;
/// - Processing sequencer commitments
/// - Maintaining block processing order
/// - Managing the backup state
pub struct L1Syncer<Da, DB>
pub struct L1Syncer<Da>
where
Da: DaService,
DB: BatchProverLedgerOps,
{
/// Database for ledger operations
ledger_db: DB,
ledger_db: LedgerDB,
/// Data availability service instance
da_service: Arc<Da>,
/// Sequencer's DA public key for verifying commitments
@@ -55,10 +59,9 @@
l1_signal_tx: mpsc::Sender<()>,
}

impl<Da, DB> L1Syncer<Da, DB>
impl<Da> L1Syncer<Da>
where
Da: DaService,
DB: BatchProverLedgerOps + Clone + 'static,
{
/// Creates a new instance of `L1Syncer`
///
@@ -72,7 +75,7 @@
/// * `l1_signal_tx` - A channel sender to signal prover module when new L1 blocks are processed.
#[allow(clippy::too_many_arguments)]
pub fn new(
ledger_db: DB,
ledger_db: LedgerDB,
da_service: Arc<Da>,
public_keys: RollupPublicKeys,
scan_l1_start_height: u64,
@@ -103,7 +106,7 @@
pub async fn run(mut self, mut shutdown_signal: GracefulShutdown) {
let l1_start_height = self
.ledger_db
.get_last_scanned_l1_height()
.get::<ProverLastScannedSlot>(())
.expect("Failed to get last scanned l1 height when starting l1 syncer")
.map(|h| h.0)
.unwrap_or(self.scan_l1_start_height);
@@ -150,6 +153,8 @@
// don't ping if no new l1 blocks
let should_ping = !pending_l1_blocks.is_empty();

let mut schema_batch = SchemaBatch::new();

// process all the pending l1 blocks
while !pending_l1_blocks.is_empty() {
let l1_block = pending_l1_blocks
@@ -160,17 +165,17 @@
let l1_hash = l1_block.header().hash().into();

// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(l1_hash, l1_height)
schema_batch
.put::<SlotByHash>(&l1_hash, &SlotNumber(l1_height))
.unwrap();

// Set short header proof
let short_header_proof: <<Da as DaService>::Spec as DaSpec>::ShortHeaderProof =
Da::block_to_short_header_proof(l1_block.clone());
self.ledger_db
.put_short_header_proof_by_l1_hash(
schema_batch
.put::<ShortHeaderProofBySlotHash>(
&l1_hash,
borsh::to_vec(&short_header_proof)
&borsh::to_vec(&short_header_proof)
.expect("Should serialize short header proof"),
)
.expect("Should save short header proof to ledger db");
@@ -190,7 +195,7 @@
continue;
}

match self.ledger_db.get_commitment_by_index(index)? {
match self.ledger_db.get::<SequencerCommitmentByIndex>(index)? {
Some(db_commitment) => {
if commitment != &db_commitment {
error!("Found duplicate commitment index with different data\nDA: {:?}\nDB:{:?}", commitment, db_commitment);
@@ -204,22 +209,30 @@
index, l1_height
);

self.ledger_db
.put_commitment_by_index(commitment)
schema_batch
.put::<SequencerCommitmentByIndex>(&commitment.index, commitment)
.expect("Should store commitment");
self.ledger_db
.put_commitment_index_by_l1(SlotNumber(l1_height), index)

// put commitment index by l1
let mut indices = self
.ledger_db
.get::<CommitmentIndicesByL1>(SlotNumber(l1_height))?
.unwrap_or_default();
indices.push(index);
schema_batch
.put::<CommitmentIndicesByL1>(&SlotNumber(l1_height), &indices)
.expect("Should put commitment index by l1");
self.ledger_db
.put_prover_pending_commitment(index)

schema_batch
.put::<ProverPendingCommitments>(&index, &())
.expect("Should set commitment status to pending");
}
}
}

// Set last scanned l1 height
self.ledger_db
.set_last_scanned_l1_height(SlotNumber(l1_height))
schema_batch
.put::<ProverLastScannedSlot>(&(), &SlotNumber(l1_height))
.expect("Should put prover last scanned l1 height");

BPM.current_l1_block.set(l1_height as f64);
@@ -234,6 +247,11 @@
info!("Processed L1 block {}", l1_height);
}

// Commit all changes to the ledger db
self.ledger_db
.write_schemas(schema_batch)
.context("Failed to write schema batch to ledger db")?;

if should_ping {
// signal that new l1 blocks are processed
if let Err(e) = self.l1_signal_tx.try_send(()) {
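The substantive change in this file is that the per-key `ledger_db.put_*` calls inside the block loop are now staged into one `SchemaBatch` and committed with a single `write_schemas` call, so an L1 block's bookkeeping lands atomically. A self-contained sketch of that pattern with simplified stand-in types (the real ones are sov_db's `SchemaBatch`, `LedgerDB::write_schemas`, and the table schemas imported above):

```rust
use std::collections::HashMap;

// Simplified stand-in for sov_db's SchemaBatch: writes are only staged here.
#[derive(Default)]
struct SchemaBatch {
    staged: Vec<(String, String)>,
}

impl SchemaBatch {
    fn new() -> Self {
        Self::default()
    }

    // Stand-in for batch.put::<Table>(&key, &value).
    fn put(&mut self, table: &str, key: &str, value: &str) {
        self.staged.push((format!("{table}/{key}"), value.to_string()));
    }
}

struct LedgerDb {
    store: HashMap<String, String>,
}

impl LedgerDb {
    // Stand-in for LedgerDB::write_schemas: all staged writes land together,
    // so a crash or shutdown before this point leaves the ledger untouched.
    fn write_schemas(&mut self, batch: SchemaBatch) {
        for (key, value) in batch.staged {
            self.store.insert(key, value);
        }
    }
}

fn main() {
    let mut db = LedgerDb { store: HashMap::new() };
    let mut batch = SchemaBatch::new();

    // Mirrors the loop above: stage everything derived from one L1 block...
    batch.put("SlotByHash", "l1_hash", "1042");
    batch.put("ShortHeaderProofBySlotHash", "l1_hash", "proof bytes");
    batch.put("SequencerCommitmentByIndex", "7", "commitment");
    batch.put("ProverPendingCommitments", "7", "");
    batch.put("ProverLastScannedSlot", "", "1042");

    // ...then commit once after the loop.
    db.write_schemas(batch);
    assert_eq!(db.store.len(), 5);
}
```

One consequence of staging writes this way is that reads issued during the same loop (such as the `CommitmentIndicesByL1` lookup above) only see what is already persisted, not what is still sitting in the batch.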
32 changes: 16 additions & 16 deletions crates/batch-prover/src/l2_syncer.rs
@@ -11,12 +11,13 @@ use backoff::ExponentialBackoff;
use borsh::BorshDeserialize;
use citrea_common::backup::BackupManager;
use citrea_common::cache::L1BlockCache;
use citrea_common::l2::{apply_l2_block, commit_l2_block, sync_l2};
use citrea_common::l2::{apply_l2_block, sync_l2};
use citrea_primitives::types::L2BlockHash;
use citrea_stf::runtime::CitreaRuntime;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use reth_tasks::shutdown::GracefulShutdown;
use sov_db::ledger_db::BatchProverLedgerOps;
use sov_db::ledger_db::{LedgerDB, SharedLedgerOps};
use sov_db::schema::tables::ProverStateDiffs;
use sov_db::schema::types::L2BlockNumber;
use sov_keys::default_signature::K256PublicKey;
use sov_modules_api::default_context::DefaultContext;
@@ -40,10 +41,9 @@ use crate::{InitParams, RollupPublicKeys, RunnerConfig};
/// - Validating block signatures and contents
/// - Processing blocks to update the local state
/// - Managing forks and state transitions
pub struct L2Syncer<DA, DB>
pub struct L2Syncer<DA>
where
DA: DaService,
DB: BatchProverLedgerOps + Clone,
{
/// Starting height for L2 block synchronization
start_l2_height: u64,
@@ -54,7 +54,7 @@
/// Manager for prover storage
storage_manager: ProverStorageManager,
/// Database for ledger operations
ledger_db: DB,
ledger_db: LedgerDB,
/// Current state root hash
state_root: StorageRootHash,
/// Current L2 block hash
@@ -77,10 +77,9 @@
backup_manager: Arc<BackupManager>,
}

impl<DA, DB> L2Syncer<DA, DB>
impl<DA> L2Syncer<DA>
where
DA: DaService,
DB: BatchProverLedgerOps + Clone + Send + Sync + 'static,
{
/// Creates a new `L2Syncer` instance.
///
@@ -103,7 +102,7 @@
stf: StfBlueprint<DefaultContext, DA::Spec, CitreaRuntime<DefaultContext, DA::Spec>>,
public_keys: RollupPublicKeys,
da_service: Arc<DA>,
ledger_db: DB,
ledger_db: LedgerDB,
storage_manager: ProverStorageManager,
fork_manager: ForkManager<'static>,
l2_block_tx: broadcast::Sender<u64>,
@@ -211,16 +210,17 @@
)
.await?;

// Save state diff BEFORE committing the L2 block
// This prevents race conditions where the batch prover might shut down
// between committing the L2 block and saving the state diff
self.ledger_db
.set_l2_state_diff(L2BlockNumber(applied.l2_height), applied.state_diff.clone())?;

let l2_height = applied.l2_height;
let state_root = applied.state_root;

commit_l2_block(&self.ledger_db, applied)?;
let state_diff = applied.state_diff.clone();

let mut schema_batch = self.ledger_db.commit_l2_block(
applied.l2_block,
applied.tx_hashes,
applied.tx_bodies,
)?;
schema_batch.put::<ProverStateDiffs>(&L2BlockNumber(l2_height), &state_diff)?;
self.ledger_db.write_schemas(schema_batch)?;

let process_duration = Instant::now()
.saturating_duration_since(start)
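Here `commit_l2_block` now returns the still-open `SchemaBatch`, the syncer appends the prover state diff to it, and a single `write_schemas` call persists both. That is why the old "save state diff BEFORE committing" ordering comment could be removed: the block and its diff can no longer be persisted separately. A short hypothetical sketch of the "return the batch, extend it, write once" shape (simplified types, not the real sov_db API):

```rust
// Simplified stand-ins; the real types are sov_db's SchemaBatch and LedgerDB.
struct SchemaBatch(Vec<(String, String)>);

impl SchemaBatch {
    fn put(&mut self, key: &str, value: &str) {
        self.0.push((key.to_string(), value.to_string()));
    }
}

struct LedgerDb;

impl LedgerDb {
    // Stages the block's own tables and hands the open batch back to the caller.
    fn commit_l2_block(&self, height: u64) -> SchemaBatch {
        let mut batch = SchemaBatch(Vec::new());
        batch.put(&format!("L2Block/{height}"), "block + tx hashes + tx bodies");
        batch
    }

    // Single atomic write point.
    fn write_schemas(&self, batch: SchemaBatch) {
        for (key, value) in batch.0 {
            println!("write {key} = {value}");
        }
    }
}

fn main() {
    let db = LedgerDb;
    let height = 42;

    let mut batch = db.commit_l2_block(height);
    // The caller appends its own table entry before the one write, so the
    // block and its state diff are committed together or not at all.
    batch.put(&format!("ProverStateDiffs/{height}"), "state diff bytes");
    db.write_schemas(batch);
}
```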
16 changes: 5 additions & 11 deletions crates/batch-prover/src/lib.rs
@@ -37,7 +37,7 @@ pub use l2_syncer::L2Syncer;
pub use partition::PartitionMode;
use prover::Prover;
use prover_services::ParallelProverService;
use sov_db::ledger_db::BatchProverLedgerOps;
use sov_db::ledger_db::LedgerDB;
use sov_modules_api::default_context::DefaultContext;
use sov_modules_api::fork::ForkManager;
use sov_modules_api::{SpecId, Zkvm};
@@ -97,7 +97,7 @@ pub mod rpc;
/// - `Prover` for handling the proving process.
/// - `RpcModule` configured with the necessary RPC methods.
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
pub async fn build_services<DA, DB, Vm>(
pub async fn build_services<DA, Vm>(
network: Network,
prover_config: BatchProverConfig,
runner_config: RunnerConfig,
@@ -111,29 +111,23 @@
public_keys: RollupPublicKeys,
da_service: Arc<DA>,
prover_service: Arc<ParallelProverService<DA, Vm>>,
ledger_db: DB,
ledger_db: LedgerDB,
storage_manager: ProverStorageManager,
l2_block_tx: broadcast::Sender<u64>,
fork_manager: ForkManager<'static>,
code_commitments: HashMap<SpecId, <Vm as Zkvm>::CodeCommitment>,
elfs: HashMap<SpecId, Vec<u8>>,
rpc_module: RpcModule<()>,
backup_manager: Arc<BackupManager>,
) -> Result<(
L2Syncer<DA, DB>,
L1Syncer<DA, DB>,
Prover<DA, DB, Vm>,
RpcModule<()>,
)>
) -> Result<(L2Syncer<DA>, L1Syncer<DA>, Prover<DA, Vm>, RpcModule<()>)>
where
DA: DaService,
DB: BatchProverLedgerOps + Clone + 'static,
Vm: ZkvmHost + Zkvm + 'static,
{
let l1_block_cache = Arc::new(Mutex::new(L1BlockCache::new()));
let (request_tx, request_rx) = mpsc::channel(4);

let rpc_context = rpc::create_rpc_context::<_, _, Vm>(
let rpc_context = rpc::create_rpc_context::<_, Vm>(
ledger_db.clone(),
request_tx,
da_service.clone(),