Skip to content
Merged
2 changes: 1 addition & 1 deletion src/actors/consumer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl Consumer {
.provider(&url)
.await
.unwrap()
.get_txn_count(*metadata.from_account.as_ref())
.get_txn_count(metadata.from_account.as_ref().clone())
.await
{
// If on-chain nonce is greater than our attempted nonce, our transaction is indeed outdated
Expand Down
45 changes: 25 additions & 20 deletions src/actors/producer/producer_actor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use actix::prelude::*;
use alloy::primitives::Address;
use dashmap::DashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

use crate::actors::consumer::Consumer;
use crate::actors::monitor::monitor_actor::{PlanProduced, ProduceTxns};
Expand All @@ -15,7 +13,7 @@ use crate::actors::monitor::{
};
use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer};
use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan};
use crate::util::gen_account::AccountGenerator;
use crate::util::gen_account::{AccountId, AccountManager};

use super::messages::RegisterTxnPlan;

Expand Down Expand Up @@ -71,7 +69,9 @@ pub struct Producer {
monitor_addr: Addr<Monitor>,
consumer_addr: Addr<Consumer>,

nonce_cache: Arc<DashMap<Arc<Address>, u32>>,
nonce_cache: Arc<DashMap<AccountId, u32>>,

account_generator: AccountManager,

/// A queue of plans waiting to be executed. Plans are processed in FIFO order.
plan_queue: VecDeque<Box<dyn TxnPlan>>,
Expand All @@ -86,16 +86,14 @@ impl Producer {
address_pool: Arc<dyn AddressPool>,
consumer_addr: Addr<Consumer>,
monitor_addr: Addr<Monitor>,
account_generator: Arc<RwLock<AccountGenerator>>,
account_generator: AccountManager,
) -> Result<Self, anyhow::Error> {
let nonce_cache = Arc::new(DashMap::new());
let account_generator = account_generator.read().await;
address_pool.clean_ready_accounts();
for (account, nonce) in account_generator.accouts_nonce_iter() {
let address = Arc::new(account.address());
for (account_id, nonce) in account_generator.account_ids_with_nonce() {
let nonce = nonce.load(Ordering::Relaxed) as u32;
nonce_cache.insert(address.clone(), nonce);
address_pool.unlock_correct_nonce(address.clone(), nonce);
nonce_cache.insert(account_id, nonce);
address_pool.unlock_correct_nonce(account_id, nonce);
}
Ok(Self {
state: ProducerState::running(),
Expand All @@ -110,6 +108,7 @@ impl Producer {
},
address_pool,
nonce_cache,
account_generator,
monitor_addr,
consumer_addr,
plan_queue: VecDeque::new(),
Expand Down Expand Up @@ -152,17 +151,20 @@ impl Producer {
monitor_addr: Addr<Monitor>,
consumer_addr: Addr<Consumer>,
address_pool: Arc<dyn AddressPool>,
account_generator: AccountManager,
mut plan: Box<dyn TxnPlan>,
sending_txns: Arc<AtomicU64>,
state: ProducerState,
nonce_cache: Arc<DashMap<Arc<Address>, u32>>,
nonce_cache: Arc<DashMap<AccountId, u32>>,
) -> Result<(), anyhow::Error> {
let plan_id = plan.id().clone();

// Fetch accounts and build transactions
let ready_accounts =
address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len()));
let iterator = plan.as_mut().build_txns(ready_accounts)?;
let iterator = plan
.as_mut()
.build_txns(ready_accounts, account_generator.clone())?;

// If the plan doesn't consume nonces, accounts can be used by other processes immediately.
if !iterator.consume_nonce {
Expand All @@ -183,7 +185,8 @@ impl Producer {
tracing::debug!("Producer is paused");
tokio::time::sleep(Duration::from_millis(500)).await;
}
let next_nonce = match nonce_cache.get(signed_txn.metadata.from_account.as_ref()) {
let account_id = signed_txn.metadata.from_account_id;
let next_nonce = match nonce_cache.get(&account_id) {
Some(nonce) => *nonce,
None => 0,
};
Expand Down Expand Up @@ -269,6 +272,7 @@ impl Handler<ExeFrontPlan> for Producer {
let plan = self.plan_queue.pop_front().unwrap();
self.stats.remain_plans_num -= 1;
let address_pool = self.address_pool.clone();
let account_generator = self.account_generator.clone();
let monitor_addr = self.monitor_addr.clone();
let consumer_addr = self.consumer_addr.clone();
let self_addr = ctx.address();
Expand All @@ -292,6 +296,7 @@ impl Handler<ExeFrontPlan> for Producer {
monitor_addr,
consumer_addr,
address_pool,
account_generator,
plan,
sending_txns,
state,
Expand Down Expand Up @@ -401,16 +406,15 @@ impl Handler<UpdateSubmissionResult> for Producer {

fn handle(&mut self, msg: UpdateSubmissionResult, _ctx: &mut Self::Context) -> Self::Result {
let address_pool = self.address_pool.clone();
let account = msg.metadata.from_account.clone();
self.stats.sending_txns.fetch_sub(1, Ordering::Relaxed);
match msg.result.as_ref() {
SubmissionResult::Success(_) => {
self.stats.success_txns += 1;
}
SubmissionResult::NonceTooLow { expect_nonce, .. } => {
self.stats.success_txns += 1;
self.nonce_cache
.insert(account.clone(), *expect_nonce as u32);
let account_id = msg.metadata.from_account_id;
self.nonce_cache.insert(account_id, *expect_nonce as u32);
}
SubmissionResult::ErrorWithRetry => {
self.stats.failed_txns += 1;
Expand All @@ -419,21 +423,22 @@ impl Handler<UpdateSubmissionResult> for Producer {
let ready_accounts = self.stats.ready_accounts.clone();
Box::pin(
async move {
let account_id = msg.metadata.from_account_id;
match msg.result.as_ref() {
SubmissionResult::Success(_) => {
address_pool.unlock_next_nonce(account);
address_pool.unlock_next_nonce(account_id);
}
SubmissionResult::NonceTooLow { expect_nonce, .. } => {
tracing::debug!(
"Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}",
account,
account_id,
expect_nonce,
msg.metadata.nonce
);
address_pool.unlock_correct_nonce(account, *expect_nonce as u32);
address_pool.unlock_correct_nonce(account_id, *expect_nonce as u32);
}
SubmissionResult::ErrorWithRetry => {
address_pool.retry_current_nonce(account);
address_pool.retry_current_nonce(account_id);
}
}
ready_accounts.store(address_pool.ready_len() as u64, Ordering::Relaxed);
Expand Down
123 changes: 58 additions & 65 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader},
sync::RwLock,
};
use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
use tracing::{info, Level};

use crate::{
Expand All @@ -30,7 +27,7 @@ use crate::{
faucet_txn_builder::{Erc20FaucetTxnBuilder, EthFaucetTxnBuilder, FaucetTxnBuilder},
PlanBuilder, TxnPlan,
},
util::gen_account::AccountGenerator,
util::gen_account::{AccountGenerator, AccountManager},
};

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -212,11 +209,11 @@ fn run_command(command: &str) -> Result<Output> {
}

async fn get_init_nonce_map(
accout_generator: Arc<RwLock<AccountGenerator>>,
accout_generator: AccountManager,
faucet_private_key: &str,
eth_client: Arc<EthHttpCli>,
) -> Arc<HashMap<Address, u64>> {
let mut init_nonce_map = accout_generator.read().await.init_nonce_map();
let mut init_nonce_map = accout_generator.init_nonce_map();
let faucet_signer = PrivateKeySigner::from_str(faucet_private_key).unwrap();
let faucet_address = faucet_signer.address();
init_nonce_map.insert(
Expand Down Expand Up @@ -264,19 +261,18 @@ async fn start_bench() -> Result<()> {
});
contract_config
};

let accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts);
let accounts = accout_generator
.write()
.await
let mut accout_generator = AccountGenerator::with_capacity(
PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap(),
);
let account_ids = accout_generator
.gen_account(0, benchmark_config.accounts.num_accounts as u64)
.unwrap();
let account_addresses = Arc::new(
accounts
let account_addresses = Arc::new({
account_ids
.iter()
.map(|(address, _)| address.clone())
.collect::<Vec<_>>(),
);
.map(|&id| Arc::new(accout_generator.get_address_by_id(id)))
.collect::<Vec<_>>()
});
// Create EthHttpCli instance
let eth_clients: Vec<Arc<EthHttpCli>> = benchmark_config
.nodes
Expand All @@ -287,10 +283,6 @@ async fn start_bench() -> Result<()> {
})
.collect();

let address_pool: Arc<dyn AddressPool> = Arc::new(
txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(accounts.clone()),
);

let chain_id = benchmark_config.nodes[0].chain_id;

info!("Initializing Faucet constructor...");
Expand All @@ -305,32 +297,51 @@ async fn start_bench() -> Result<()> {
U256::from(benchmark_config.num_tokens)
* U256::from(21000)
* U256::from(1000_000_000_000u64),
accout_generator.clone(),
&mut accout_generator,
)
.await
.unwrap();
if args.recover {
init_nonce(accout_generator.clone(), eth_clients[0].clone()).await;
init_nonce(&mut accout_generator, eth_clients[0].clone()).await;
}
let monitor = Monitor::new_with_clients(
eth_clients.clone(),
benchmark_config.performance.max_pool_size,
)
.start();
// let mut file = tokio::fs::File::create("accounts.txt").await.unwrap();
// for (sign, nonce) in accout_generator.read().await.accouts_nonce_iter() {
// file.write(
// format!(
// "{}, {}, {}\n",
// hex::encode(sign.to_bytes()),
// sign.address().to_string(),
// nonce.load(Ordering::Relaxed),
// )
// .as_bytes(),
// )
// .await
// .unwrap();
// }

let tokens = contract_config.get_all_token();
let mut tokens_plan = Vec::new();
for token in &tokens {
start_nonce += benchmark_config.faucet.faucet_level as u64;
info!("distributing token: {}", token.address);
let token_address = Address::from_str(&token.address).unwrap();
let faucet_token_balance = U256::from_str(&token.faucet_balance).unwrap();
info!("balance of token: {}", faucet_token_balance);
let token_faucet_builder = PlanBuilder::create_faucet_tree_plan_builder(
benchmark_config.faucet.faucet_level as usize,
faucet_token_balance,
&benchmark_config.faucet.private_key,
start_nonce,
account_addresses.clone(),
Arc::new(Erc20FaucetTxnBuilder::new(token_address)),
U256::ZERO,
&mut accout_generator,
)
.await
.unwrap();
tokens_plan.push(token_faucet_builder);
}

let account_manager = accout_generator.to_manager();

let address_pool: Arc<dyn AddressPool> = Arc::new(
txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(
account_ids.clone(),
account_manager.clone(),
),
);

// Use the same client instances for Consumer to share metrics
let eth_providers: Vec<EthHttpCli> = eth_clients
.iter()
Expand All @@ -346,7 +357,7 @@ async fn start_bench() -> Result<()> {
)
.start();
let init_nonce_map = get_init_nonce_map(
accout_generator.clone(),
account_manager.clone(),
benchmark_config.faucet.private_key.as_str(),
eth_clients[0].clone(),
)
Expand All @@ -356,7 +367,7 @@ async fn start_bench() -> Result<()> {
address_pool.clone(),
consumer,
monitor,
accout_generator.clone(),
account_manager.clone(),
)
.await
.unwrap()
Expand All @@ -371,29 +382,9 @@ async fn start_bench() -> Result<()> {
)
.await?;

let tokens = contract_config.get_all_token();

for token in &tokens {
start_nonce += benchmark_config.faucet.faucet_level as u64;
info!("distributing token: {}", token.address);
let token_address = Address::from_str(&token.address).unwrap();
let faucet_token_balance = U256::from_str(&token.faucet_balance).unwrap();
info!("balance of token: {}", faucet_token_balance);
let token_faucet_builder = PlanBuilder::create_faucet_tree_plan_builder(
benchmark_config.faucet.faucet_level as usize,
faucet_token_balance,
&benchmark_config.faucet.private_key,
start_nonce,
account_addresses.clone(),
Arc::new(Erc20FaucetTxnBuilder::new(token_address)),
U256::ZERO,
accout_generator.clone(),
)
.await
.unwrap();

for (token_plan, token) in tokens_plan.into_iter().zip(tokens.iter()) {
execute_faucet_distribution(
token_faucet_builder,
token_plan,
chain_id,
&producer,
&format!("Token {}", token.symbol),
Expand Down Expand Up @@ -431,14 +422,13 @@ async fn start_bench() -> Result<()> {
Ok(())
}

async fn init_nonce(accout_generator: Arc<RwLock<AccountGenerator>>, eth_client: Arc<EthHttpCli>) {
async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc<EthHttpCli>) {
tracing::info!("Initializing nonce...");
let accout_generator = accout_generator.read().await;
let tasks = accout_generator
.accouts_nonce_iter()
.map(|(account, nonce)| {
let client = eth_client.clone();
let addr = account.address();
let addr = account.clone();
async move {
let init_nonce = client.get_txn_count(addr).await;
match init_nonce {
Expand All @@ -462,7 +452,10 @@ static ALLOC: dhat::Alloc = dhat::Alloc;
#[actix::main]
async fn main() -> Result<()> {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();
let _profiler = {
println!("starting heap profiler...");
dhat::Profiler::new_heap()
};
let res = async { start_bench().await };
let ctrl_c = async {
tokio::signal::ctrl_c()
Expand Down
Loading