Skip to content

Commit

Permalink
fix(da): fix inconsistency issue between re-exec blocks locally and r…
Browse files Browse the repository at this point in the history
…ooch network (#3077)

* fix(rooch-executor): update log levels to info for reload MoveOS

Updated log levels in `ExecutorActor` and `ReaderExecutorActor` from `debug` to `info` for better visibility during gas upgrade events. This ensures important operational messages are more prominent in logs.

* feat(rooch-db): add get_execution_info_by_order command

Introduce a new command to retrieve execution info by order. This feature supports querying transaction-related data based on order numbers for added flexibility in database operations.

* fix(rooch-da): integrate event handling via EventBus for `rooch da exec` and other fixes

1. add `EventBus` and `EventActor` support for event-driven architecture.
2. fix rollback issue caused by executing a tx immediately after a rollback; the process must now be restarted after a rollback
3. fix restart issue caused by wrong next_block_number
4. add sequence_info in moveos_tx.ctx
5. add log improvements and refined block/transaction ordering logic.
  • Loading branch information
popcnt1 authored Dec 20, 2024
1 parent a49c450 commit 5eb829d
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 12 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/rooch-executor/src/actor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl MoveFunctionCaller for ExecutorActor {
impl Handler<EventData> for ExecutorActor {
async fn handle(&mut self, message: EventData, _ctx: &mut ActorContext) -> Result<()> {
if let Ok(_gas_upgrade_msg) = message.data.downcast::<GasUpgradeEvent>() {
tracing::debug!("ExecutorActor: Reload the MoveOS instance...");
tracing::info!("ExecutorActor: Reload the MoveOS instance...");

let resolver = RootObjectResolver::new(self.root.clone(), &self.moveos_store);
let gas_parameters = FrameworksGasParameters::load_from_chain(&resolver)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch-executor/src/actor/reader_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl Handler<RefreshStateMessage> for ReaderExecutorActor {
impl Handler<EventData> for ReaderExecutorActor {
async fn handle(&mut self, message: EventData, _ctx: &mut ActorContext) -> Result<()> {
if let Ok(_gas_upgrade_msg) = message.data.downcast::<GasUpgradeEvent>() {
tracing::debug!("ReadExecutorActor: Reload the MoveOS instance...");
tracing::info!("ReadExecutorActor: Reload the MoveOS instance...");

let resolver = RootObjectResolver::new(self.root.clone(), &self.moveos_store);
let gas_parameters = FrameworksGasParameters::load_from_chain(&resolver)?;
Expand Down
2 changes: 2 additions & 0 deletions crates/rooch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ move-vm-types = { workspace = true }
moveos-stdlib = { workspace = true }
moveos-types = { workspace = true }
moveos-store = { workspace = true }
moveos-eventbus = { workspace = true }
moveos-common = { workspace = true }
moveos = { workspace = true }
moveos-verifier = { workspace = true }
Expand All @@ -94,6 +95,7 @@ rooch-rpc-server = { workspace = true }
rooch-rpc-client = { workspace = true }
rooch-integration-test-runner = { workspace = true }
rooch-indexer = { workspace = true }
rooch-event = { workspace = true }
rooch-db = { workspace = true }
rooch-common = { workspace = true }
rooch-store = { workspace = true }
Expand Down
30 changes: 20 additions & 10 deletions crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use coerce::actor::system::ActorSystem;
use coerce::actor::IntoActor;
use metrics::RegistryService;
use moveos_common::utils::to_bytes;
use moveos_eventbus::bus::EventBus;
use moveos_store::config_store::STARTUP_INFO_KEY;
use moveos_store::{MoveOSStore, CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME};
use moveos_types::h256::H256;
Expand All @@ -20,6 +21,7 @@ use raw_store::rocks::batch::WriteBatch;
use raw_store::traits::DBStore;
use rooch_config::R_OPT_NET_HELP;
use rooch_db::RoochDB;
use rooch_event::actor::EventActor;
use rooch_executor::actor::executor::ExecutorActor;
use rooch_executor::actor::reader_executor::ReaderExecutorActor;
use rooch_executor::proxy::ExecutorProxy;
Expand Down Expand Up @@ -265,13 +267,13 @@ impl ExecInner {

async fn produce_tx(&self, tx: Sender<ExecMsg>) -> anyhow::Result<()> {
let last_executed_opt = self.tx_da_indexer.find_last_executed()?;
let mut next_tx_order = last_executed_opt
let next_tx_order = last_executed_opt
.clone()
.map(|v| v.tx_order + 1)
.unwrap_or(1);
let mut next_block_number = last_executed_opt
.clone()
.map(|v| v.block_number + 1)
.map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block
.unwrap_or(0);
tracing::info!(
"next_tx_order: {:?}. need rollback soon: {:?}",
Expand Down Expand Up @@ -306,9 +308,10 @@ impl ExecInner {
let rollback_execution_info =
self.tx_da_indexer.get_execution_info(new_last.tx_hash)?;
self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?;
next_block_number = new_last.block_number;
next_tx_order = rollback + 1;
tracing::info!("Rollback transactions done",);
tracing::info!(
"Rollback transactions done. Please RESTART process without rollback."
);
return Ok(()); // rollback done, need to restart to get new state_root for startup rooch store
}
};

Expand Down Expand Up @@ -432,7 +435,7 @@ impl ExecInner {
ledger_tx: LedgerTransaction,
l1block_with_body: Option<L1BlockWithBody>,
) -> anyhow::Result<VerifiedMoveOSTransaction> {
let moveos_tx = match &ledger_tx.data {
let mut moveos_tx = match &ledger_tx.data {
LedgerTxData::L1Block(_block) => {
self.executor
.validate_l1_block(l1block_with_body.unwrap())
Expand All @@ -441,6 +444,7 @@ impl ExecInner {
LedgerTxData::L1Tx(l1_tx) => self.executor.validate_l1_tx(l1_tx.clone()).await?,
LedgerTxData::L2Tx(l2_tx) => self.executor.validate_l2_tx(l2_tx.clone()).await?,
};
moveos_tx.ctx.add(ledger_tx.sequence_info.clone())?;
Ok(moveos_tx)
}

Expand All @@ -451,17 +455,17 @@ impl ExecInner {
) -> anyhow::Result<()> {
let executor = self.executor.clone();

let (output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?;
let (_output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?;

let root = execution_info.root_metadata();
let expected_root_opt = self.order_state_pair.get(&tx_order);
match expected_root_opt {
Some(expected_root) => {
if root.state_root.unwrap() != *expected_root {
return Err(anyhow::anyhow!(
"Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}; act_changeset: {:?}",
"Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}; act_execution_info: {:?}",
tx_order,
*expected_root, root.state_root.unwrap(), output.changeset
*expected_root, root.state_root.unwrap(), execution_info
));
}
tracing::info!(
Expand Down Expand Up @@ -506,12 +510,18 @@ async fn build_executor_and_store(
build_rooch_db(base_data_dir.clone(), chain_id.clone(), enable_rocks_stats);
let (rooch_store, moveos_store) = (rooch_db.rooch_store.clone(), rooch_db.moveos_store.clone());

let event_bus = EventBus::new();
let event_actor = EventActor::new(event_bus.clone());
let event_actor_ref = event_actor
.into_actor(Some("EventActor"), actor_system)
.await?;

let executor_actor = ExecutorActor::new(
root.clone(),
moveos_store.clone(),
rooch_store.clone(),
&registry_service.default_registry(),
None,
Some(event_actor_ref.clone()),
)?;

let executor_actor_ref = executor_actor
Expand Down
4 changes: 4 additions & 0 deletions crates/rooch/src/commands/da/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ impl TxDAIndexer {
tx_order_hashes.push(item);
}
tx_order_hashes.sort_by(|a, b| a.tx_order.cmp(&b.tx_order)); // avoiding wrong order
tracing::info!(
"tx_order:tx_hash:block indexer loaded, tx cnt: {}",
tx_order_hashes.len()
);
Ok(TxDAIndexer {
tx_order_hash_blocks: tx_order_hashes,
transaction_store,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::commands::da::commands::TxDAIndexer;
use crate::commands::db::commands::init;
use clap::Parser;
use rooch_config::R_OPT_NET_HELP;
use rooch_types::error::RoochResult;
use rooch_types::rooch_network::RoochChainID;
use std::path::PathBuf;

/// Get ExecutionInfo by order
// NOTE: the `///` comments on this struct and its fields are left verbatim on purpose:
// clap's derive picks doc comments up as CLI help text, so rewording them would change
// what the command prints for `--help`. Maintainer notes below use `//` instead.
#[derive(Debug, Parser)]
pub struct GetExecutionInfoByOrderCommand {
// Transaction order to look up; `execute` passes it to
// `TxDAIndexer::get_execution_info_by_order`.
#[clap(long)]
pub order: u64,

// File that `execute` hands to `TxDAIndexer::load_from_file` to build the indexer.
#[clap(
long = "order-hash-path",
help = "Path to tx_order:tx_hash:block_number file"
)]
pub order_hash_path: PathBuf,

// Forwarded unchanged to `init` together with `chain_id`.
#[clap(long = "data-dir", short = 'd')]
/// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name
pub base_data_dir: Option<PathBuf>,

// NOTE(review): the doc text below talks about starting a service, but `execute` only
// opens the DB via `init` and reads from it — it looks copied from a server command.
// An explicit `help = R_OPT_NET_HELP` is also set; confirm which text clap actually shows.
/// If local chainid, start the service with a temporary data store.
/// All data will be deleted when the service is stopped.
#[clap(long, short = 'n', help = R_OPT_NET_HELP)]
pub chain_id: Option<RoochChainID>,
}

impl GetExecutionInfoByOrderCommand {
    /// Looks up the execution info recorded for `self.order` and prints it to stdout.
    ///
    /// The DB is opened through `init` using `base_data_dir` / `chain_id`, and a
    /// `TxDAIndexer` is built from `order_hash_path` plus the store's transaction store.
    /// On a hit the result is printed as `<tx_order>:<execution_info:?>`; on a miss a
    /// warning is logged and the command still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `TxDAIndexer::load_from_file` or
    /// `TxDAIndexer::get_execution_info_by_order`.
    pub fn execute(self) -> RoochResult<()> {
        let (_root, rooch_db, _start_time) = init(self.base_data_dir, self.chain_id);
        let moveos_store = rooch_db.moveos_store.clone();
        // `self` is owned, so the path can be moved into the indexer without cloning.
        let tx_da_indexer =
            TxDAIndexer::load_from_file(self.order_hash_path, moveos_store.transaction_store)?;

        let tx_order = self.order;

        // Bind the value in the pattern rather than testing `Some(_)` and unwrapping.
        match tx_da_indexer.get_execution_info_by_order(tx_order)? {
            Some(execution_info) => {
                println!("{}:{:?}", tx_order, execution_info);
            }
            None => {
                tracing::warn!("tx_order {} execution_info not found", tx_order);
            }
        }

        Ok(())
    }
}
1 change: 1 addition & 0 deletions crates/rooch/src/commands/db/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::time::SystemTime;
pub mod drop;
pub mod dump_tx_root;
pub mod get_changeset_by_order;
pub mod get_execution_info_by_order;
pub mod repair;
pub mod revert;
pub mod rollback;
Expand Down
7 changes: 7 additions & 0 deletions crates/rooch/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::cli_types::CommandAction;
use crate::commands::db::commands::drop::DropCommand;
use crate::commands::db::commands::dump_tx_root::DumpTxRootCommand;
use crate::commands::db::commands::get_changeset_by_order::GetChangesetByOrderCommand;
use crate::commands::db::commands::get_execution_info_by_order::GetExecutionInfoByOrderCommand;
use crate::commands::db::commands::repair::RepairCommand;
use crate::commands::db::commands::revert::RevertCommand;
use async_trait::async_trait;
Expand Down Expand Up @@ -45,6 +46,11 @@ impl CommandAction<String> for DB {
DBCommand::DumpTxRoot(dump_tx_root) => dump_tx_root.execute().await.map(|resp| {
serde_json::to_string_pretty(&resp).expect("Failed to serialize response")
}),
DBCommand::GetExecutionInfoByOrder(get_execution_info_by_order) => {
get_execution_info_by_order.execute().map(|resp| {
serde_json::to_string_pretty(&resp).expect("Failed to serialize response")
})
}
}
}
}
Expand All @@ -58,4 +64,5 @@ pub enum DBCommand {
Repair(RepairCommand),
GetChangesetByOrder(GetChangesetByOrderCommand),
DumpTxRoot(DumpTxRootCommand),
GetExecutionInfoByOrder(GetExecutionInfoByOrderCommand),
}

0 comments on commit 5eb829d

Please sign in to comment.