diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs
index d7d0576..37ccea0 100644
--- a/src/actors/consumer/actor.rs
+++ b/src/actors/consumer/actor.rs
@@ -14,7 +14,7 @@ use tracing::{debug, error, info, warn};
 use crate::{
     actors::{
         consumer::dispatcher::{Dispatcher, SimpleDispatcher},
-        monitor::{RegisterConsumer, SubmissionResult, UpdateSubmissionResult},
+        monitor::{RegisterConsumer, RetryTxn, SubmissionResult, UpdateSubmissionResult},
         Monitor,
     },
     eth::EthHttpCli,
@@ -231,6 +231,7 @@ impl Consumer {
                 result: Arc::new(SubmissionResult::Success(tx_hash)),
                 rpc_url,
                 send_time: Instant::now(),
+                signed_bytes: Arc::new(signed_txn.bytes.clone()),
             });
 
             // Update statistics and return early
@@ -244,14 +245,20 @@ impl Consumer {
         // --- Handle "already known" error ---
         // This means the transaction is already in the node's mempool
-        if error_string.contains("already known") || error_string.contains("already imported") {
+        if error_string.contains("already known")
+            || error_string.contains("already imported")
+        {
             let tx_hash = keccak256(&signed_txn.bytes);
-            debug!("Transaction already known by node: {:?}, treating as success", tx_hash);
+            debug!(
+                "Transaction already known by node: {:?}, treating as success",
+                tx_hash
+            );
             monitor_addr.do_send(UpdateSubmissionResult {
                 metadata,
                 result: Arc::new(SubmissionResult::Success(tx_hash)),
                 rpc_url: url,
                 send_time: Instant::now(),
+                signed_bytes: Arc::new(signed_txn.bytes.clone()),
             });
 
             transactions_sending.fetch_sub(1, Ordering::Relaxed);
@@ -268,6 +275,7 @@ impl Consumer {
                 result: Arc::new(SubmissionResult::Success(tx_hash)),
                 rpc_url: url,
                 send_time: Instant::now(),
+                signed_bytes: Arc::new(signed_txn.bytes.clone()),
             });
 
             transactions_sending.fetch_sub(1, Ordering::Relaxed);
@@ -304,6 +312,7 @@ impl Consumer {
                 }),
                 rpc_url: url,
                 send_time: Instant::now(),
+                signed_bytes: Arc::new(signed_txn.bytes.clone()),
             });
         }
     } else {
@@ -314,6 +323,7 @@ impl Consumer {
             result: Arc::new(SubmissionResult::ErrorWithRetry),
             rpc_url: "unknown".to_string(),
             send_time: Instant::now(),
+            signed_bytes: Arc::new(signed_txn.bytes.clone()),
         });
     }
     // After encountering Nonce error, should stop retrying and return regardless
@@ -346,6 +356,7 @@ impl Consumer {
             result: Arc::new(SubmissionResult::ErrorWithRetry), // Mark as needing upstream retry
             rpc_url: "unknown".to_string(),
             send_time: Instant::now(),
+            signed_bytes: Arc::new(signed_txn.bytes.clone()),
         });
 
         transactions_sending.fetch_sub(1, Ordering::Relaxed);
@@ -428,10 +439,11 @@ impl Consumer {
             Err(_) => {
                 error!("Semaphore has been closed, stopping consumer loop.");
                 monitor_addr.do_send(UpdateSubmissionResult {
-                    metadata: signed_txn.metadata,
+                    metadata: signed_txn.metadata.clone(),
                     result: Arc::new(SubmissionResult::ErrorWithRetry),
                     rpc_url: "unknown".to_string(),
                     send_time: Instant::now(),
+                    signed_bytes: Arc::new(signed_txn.bytes.clone()),
                 });
                 break;
             }
@@ -544,3 +556,34 @@ impl Handler for Consumer {
         })
     }
 }
+
+/// Handler for retry requests from Monitor for timed-out transactions
+impl Handler<RetryTxn> for Consumer {
+    type Result = ();
+
+    fn handle(&mut self, msg: RetryTxn, _ctx: &mut Self::Context) -> Self::Result {
+        debug!("Retrying transaction: {:?}", msg.metadata.txn_id);
+
+        // Convert to SignedTxnWithMetadata and send through normal channel
+        let signed_txn = SignedTxnWithMetadata {
+            bytes: (*msg.signed_bytes).clone(),
+            metadata: msg.metadata,
+        };
+
+        let sender = self.pool_sender.clone();
+        let pool_size = self.stats.pool_size.clone();
+
+        // Spawn to avoid blocking the actor
+        actix::spawn(async move {
+            match sender.send(signed_txn).await {
+                Ok(_) => {
+                    pool_size.fetch_add(1, Ordering::Relaxed);
+                    debug!("Retry transaction added to pool");
+                }
+                Err(e) => {
+                    error!("Failed to add retry transaction to pool: {:?}", e);
+                }
+            }
+        });
+    }
+}
diff --git a/src/actors/monitor/mod.rs b/src/actors/monitor/mod.rs
index f9a425e..5fd8f9d 100644
--- a/src/actors/monitor/mod.rs
+++ b/src/actors/monitor/mod.rs
@@ -49,6 +49,8 @@ pub struct UpdateSubmissionResult {
     pub rpc_url: String,
     #[allow(unused)]
     pub send_time: Instant,
+    /// Signed transaction bytes for retry support
+    pub signed_bytes: Arc<Vec<u8>>,
 }
 
 #[derive(Message)]
@@ -76,4 +78,12 @@ pub struct PlanFailed {
     pub reason: String,
 }
 
+/// Message to retry a timed-out transaction
+#[derive(Message, Clone)]
+#[rtype(result = "()")]
+pub struct RetryTxn {
+    pub signed_bytes: Arc<Vec<u8>>,
+    pub metadata: Arc,
+}
+
 pub use monitor_actor::Monitor;
diff --git a/src/actors/monitor/monitor_actor.rs b/src/actors/monitor/monitor_actor.rs
index a888712..4171765 100644
--- a/src/actors/monitor/monitor_actor.rs
+++ b/src/actors/monitor/monitor_actor.rs
@@ -12,11 +12,12 @@ use crate::actors::producer::Producer;
 use crate::eth::EthHttpCli;
 use crate::txn_plan::PlanId;
 
-use super::txn_tracker::{PlanStatus, TxnTracker};
+use super::txn_tracker::{BackpressureAction, PlanStatus, TxnTracker};
 use super::{
     PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer,
-    ReportProducerStats, Tick, UpdateSubmissionResult,
+    ReportProducerStats, RetryTxn, Tick, UpdateSubmissionResult,
 };
+use crate::actors::{PauseProducer, ResumeProducer};
 
 #[derive(Message)]
 #[rtype(result = "()")]
@@ -154,15 +155,41 @@ impl Handler<Tick> for Monitor {
     type Result = ();
 
     fn handle(&mut self, _msg: Tick, ctx: &mut Self::Context) {
-        // Perform sampling check
+        // 1. Check local pending backpressure
+        match self.txn_tracker.check_pending_backpressure() {
+            BackpressureAction::Pause => {
+                if let Some(producer_addr) = &self.producer_addr {
+                    producer_addr.do_send(PauseProducer);
+                }
+            }
+            BackpressureAction::Resume => {
+                if let Some(producer_addr) = &self.producer_addr {
+                    producer_addr.do_send(ResumeProducer);
+                }
+            }
+            BackpressureAction::None => {}
+        }
+
+        // 2. Perform sampling check
         let tasks = self.txn_tracker.perform_sampling_check();
+        let consumer_addr = self.consumer_addr.clone();
         if !tasks.is_empty() {
             ctx.spawn(
                 future::join_all(tasks)
                     .into_actor(self)
                     .map(move |results, act, _ctx| {
-                        // Handle all parallel execution results
-                        act.txn_tracker.handle_receipt_result(results);
+                        // Process results and get retry queue
+                        let retry_queue = act.txn_tracker.handle_receipt_result(results);
+
+                        // 3. Send retries to consumer
+                        if let Some(consumer) = &consumer_addr {
+                            for retry_txn in retry_queue {
+                                consumer.do_send(RetryTxn {
+                                    signed_bytes: retry_txn.signed_bytes,
+                                    metadata: retry_txn.metadata,
+                                });
+                            }
+                        }
                     }),
             );
         }
diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs
index 8bafead..6b0aeee 100644
--- a/src/actors/monitor/txn_tracker.rs
+++ b/src/actors/monitor/txn_tracker.rs
@@ -18,6 +18,14 @@ const SAMPLING_SIZE: usize = 10; // Define sampling size
 const TXN_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes timeout
 const TPS_WINDOW: Duration = Duration::from_secs(17);
 
+// Backpressure configuration
+const MAX_PENDING_TXNS: usize = 100_000;
+const BACKPRESSURE_RESUME_THRESHOLD: usize = 80_000; // 80% of max
+
+// Retry configuration
+const RETRY_TIMEOUT: Duration = Duration::from_secs(120); // Retry if stuck for 120s
+const MAX_RETRY_BATCH: usize = 10_000; // Max transactions to retry per batch
+
 /// Format large numbers with appropriate suffixes (K, M, B)
 fn format_large_number(num: u64) -> String {
     if num >= 1_000_000_000 {
@@ -53,6 +61,8 @@ pub struct TxnTracker {
     producer_sending_txns: u64,
     mempool_pending: u64,
     mempool_queued: u64,
+    /// Track if producer was paused due to pending txn limit
+    producer_paused_by_pending: bool,
 }
 
 /// Tracking status of a single transaction plan
@@ -82,6 +92,8 @@ pub(crate) struct PendingTxInfo {
     metadata: Arc,
     rpc_url: String,
     submit_time: Instant,
+    /// Signed transaction bytes for retry support
+    signed_bytes: Arc<Vec<u8>>,
 }
 
 //--- Core implementation required for BTreeSet sorting ---//
@@ -118,6 +130,21 @@ pub enum PlanStatus {
     InProgress,
 }
 
+/// Backpressure action to control producer
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BackpressureAction {
+    Pause,
+    Resume,
+    None,
+}
+
+/// Transaction ready for retry
+#[derive(Debug, Clone)]
+pub struct RetryTxnInfo {
+    pub signed_bytes: Arc<Vec<u8>>,
+    pub metadata: Arc,
+}
+
 impl TxnTracker {
     /// Create new transaction tracker
     pub fn new(clients: Vec>) -> Self {
@@ -142,6 +169,7 @@ impl TxnTracker {
             producer_sending_txns: 0,
             mempool_pending: 0,
             mempool_queued: 0,
+            producer_paused_by_pending: false,
         }
     }
 
@@ -155,6 +183,29 @@ impl TxnTracker {
         self.mempool_queued = queued;
     }
 
+    /// Check if backpressure should be applied based on pending txn count
+    pub fn check_pending_backpressure(&mut self) -> BackpressureAction {
+        let current = self.pending_txns.len();
+        if current >= MAX_PENDING_TXNS && !self.producer_paused_by_pending {
+            self.producer_paused_by_pending = true;
+            warn!(
+                "Pending txns {} >= {}, pausing producer",
+                current, MAX_PENDING_TXNS
+            );
+            BackpressureAction::Pause
+        } else if current < BACKPRESSURE_RESUME_THRESHOLD && self.producer_paused_by_pending {
+            self.producer_paused_by_pending = false;
+            tracing::info!(
+                "Pending txns {} < {}, resuming producer",
+                current,
+                BACKPRESSURE_RESUME_THRESHOLD
+            );
+            BackpressureAction::Resume
+        } else {
+            BackpressureAction::None
+        }
+    }
+
     pub fn handler_produce_txns(&mut self, plan_id: PlanId, count: usize) {
         if let Some(tracker) = self.plan_trackers.get_mut(&plan_id) {
             tracker.produce_transactions += count;
@@ -205,6 +256,7 @@ impl TxnTracker {
             metadata: msg.metadata.clone(),
             rpc_url: msg.rpc_url.clone(),
             submit_time: Instant::now(),
+            signed_bytes: msg.signed_bytes.clone(),
         };
 
         // Insert transaction into the global, time-sorted BTreeSet
@@ -221,6 +273,7 @@ impl TxnTracker {
             metadata: msg.metadata.clone(),
             rpc_url: msg.rpc_url.clone(),
             submit_time: Instant::now(),
+            signed_bytes: msg.signed_bytes.clone(),
         };
         self.pending_txns.insert(pending_info);
         debug!(
@@ -351,9 +404,10 @@ impl TxnTracker {
             Result,
             Result, anyhow::Error>,
         )>,
-    ) {
+    ) -> Vec<RetryTxnInfo> {
         let mut successful_txns = Vec::new();
         let mut failed_txns = Vec::new(); // Including Pending, Timeout, Error
+        let mut retry_queue = Vec::new();
 
         // 1. Categorize results
         for (info, account, result) in results {
@@ -454,30 +508,81 @@ impl TxnTracker {
             }
         }
 
-        // 4. Process failed or still pending transactions from this sampling
-        for info in failed_txns {
-            if info.submit_time.elapsed() > TXN_TIMEOUT {
-                // Transaction timed out, completely failed
-                error!(
-                    "Transaction timed out: plan_id={}, tx_hash={:?}",
-                    info.metadata.plan_id, info.tx_hash
+        // 4. Process failed or still pending transactions
+        // Find the youngest (latest submit_time) transaction that has timed out (>120s)
+        // Then retry ALL transactions older than or equal to it (up to MAX_RETRY_BATCH)
+        let retry_cutoff: Option<PendingTxInfo> = failed_txns
+            .iter()
+            .filter(|info| info.submit_time.elapsed() > RETRY_TIMEOUT)
+            .max_by_key(|info| info.submit_time)
+            .cloned();
+
+        if let Some(cutoff) = retry_cutoff {
+            // Collect all transactions older than or equal to the cutoff
+            let to_process: Vec<_> = self
+                .pending_txns
+                .iter()
+                .take_while(|info| info.submit_time <= cutoff.submit_time)
+                .take(MAX_RETRY_BATCH)
+                .cloned()
+                .collect();
+
+            let batch_size = to_process.len();
+            if batch_size > 0 {
+                warn!(
+                    "Detected stuck transactions, processing interval of {} txns (cutoff tx={}, stuck for {}s)",
+                    batch_size,
+                    cutoff.tx_hash,
+                    cutoff.submit_time.elapsed().as_secs()
                 );
-                if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) {
-                    plan_tracker.resolved_transactions += 1;
-                    plan_tracker.failed_executions += 1;
-                    self.total_failed_executions += 1;
-                    self.resolved_txn_timestamps.push_back(Instant::now());
-                    self.total_resolved_transactions += 1;
+            }
+
+            for info in to_process {
+                self.pending_txns.remove(&info);
+
+                if info.submit_time.elapsed() > TXN_TIMEOUT {
+                    // Transaction has completely timed out (10 min), mark as failed
+                    error!(
+                        "Transaction completely timed out: plan_id={}, tx_hash={:?}",
+                        info.metadata.plan_id, info.tx_hash
+                    );
+                    if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) {
+                        plan_tracker.resolved_transactions += 1;
+                        plan_tracker.failed_executions += 1;
+                        self.total_failed_executions += 1;
+                        self.resolved_txn_timestamps.push_back(Instant::now());
+                        self.total_resolved_transactions += 1;
+                    }
+                } else {
+                    // Queue for retry (and remove from pending to avoid duplicate retry)
+                    retry_queue.push(RetryTxnInfo {
+                        signed_bytes: info.signed_bytes.clone(),
+                        metadata: info.metadata.clone(),
+                    });
                 }
-            } else {
-                // Not timed out, put back in main queue for next round check
-                debug!(
-                    "Re-inserting pending transaction: tx_hash={:?}",
-                    info.tx_hash
-                );
-                self.pending_txns.insert(info);
+            }
+        } else {
+            // No timed-out transactions in samples, just re-insert failed ones
+            for info in failed_txns {
+                if info.submit_time.elapsed() > TXN_TIMEOUT {
+                    error!(
+                        "Transaction completely timed out: plan_id={}, tx_hash={:?}",
+                        info.metadata.plan_id, info.tx_hash
+                    );
+                    self.pending_txns.remove(&info);
+                    if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) {
+                        plan_tracker.resolved_transactions += 1;
+                        plan_tracker.failed_executions += 1;
+                        self.total_failed_executions += 1;
+                        self.resolved_txn_timestamps.push_back(Instant::now());
+                        self.total_resolved_transactions += 1;
+                    }
+                }
+                // If not timed out, they stay in pending_txns (already there from earlier)
             }
         }
+
+        retry_queue
     }
 
     pub fn log_stats(&mut self) {
diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs
index a4dedef..605785a 100644
--- a/src/actors/producer/producer_actor.rs
+++ b/src/actors/producer/producer_actor.rs
@@ -113,7 +113,7 @@ impl Producer {
             consumer_addr,
             plan_queue: VecDeque::new(),
             pending_plans: HashMap::new(),
-            max_queue_size: 100000,
+            max_queue_size: 10,
         })
     }
diff --git a/src/main.rs b/src/main.rs
index 6cde07b..1e20475 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,7 +16,7 @@ use std::{
     time::{Duration, Instant},
 };
 use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
-use tracing::{info, Level};
+use tracing::{error, info, Level};
 
 use crate::{
     actors::{consumer::Consumer, producer::Producer, Monitor, RegisterTxnPlan},
@@ -105,8 +105,11 @@ async fn execute_faucet_distribution(
         let faucet_level_plan =
             faucet_builder.create_plan_for_level(level, init_nonce_map.clone(), chain_id);
 
-        let rx = run_plan(faucet_level_plan, producer).await?;
-        rx.await??;
+        let rx = run_plan(faucet_level_plan, producer).await;
+        match rx {
+            Ok(rx) => rx.await??,
+            Err(err) => tracing::error!("Failed to run plan: {}", err),
+        }
         if wait_duration_secs > 0 {
             tokio::time::sleep(std::time::Duration::from_secs(wait_duration_secs)).await;
         }
@@ -162,7 +165,20 @@ async fn test_uniswap(
             contract_config.get_router_address().unwrap(),
             tps,
         );
-        let _rx = run_plan(plan, producer).await?;
+        let rx = match run_plan(plan, producer).await {
+            Ok(rx) => rx,
+            Err(e) => {
+                info!("Failed to submit plan: {}. Retrying...", e);
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                continue;
+            }
+        };
+        // Spawn waiting in background to allow parallel plan execution
+        tokio::spawn(async move {
+            if let Err(e) = rx.await {
+                error!("Plan execution failed: {:?}", e);
+            }
+        });
         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     }
     Ok(())
@@ -193,7 +209,20 @@ async fn test_erc20_transfer(
            address_pool.clone(),
            tps,
        );
-        let _rx = run_plan(erc20_transfer, producer).await?;
+        let rx = match run_plan(erc20_transfer, producer).await {
+            Ok(rx) => rx,
+            Err(e) => {
+                info!("Failed to submit plan: {}. Retrying...", e);
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                continue;
+            }
+        };
+        // Spawn waiting in background to allow parallel plan execution
+        tokio::spawn(async move {
+            if let Err(e) = rx.await {
+                error!("Plan execution failed: {:?}", e);
+            }
+        });
         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     }
     Ok(())
@@ -447,14 +476,27 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc {
-                nonce.store(init_nonce, Ordering::Relaxed);
+            let mut init_nonce = None;
+            {
+                for _ in 0..5 {
+                    let res = tokio::time::timeout(
+                        std::time::Duration::from_secs(10),
+                        client.get_txn_count(addr),
+                    )
+                    .await;
+                    if res.is_ok() {
+                        init_nonce = res.ok();
+                        break;
+                    }
+                }
+            }
+            match &init_nonce {
+                Some(Ok(init_nonce)) => {
+                    nonce.store(*init_nonce, Ordering::Relaxed);
                     pb.inc(1);
                 }
-                Err(e) => {
-                    tracing::error!("Failed to get nonce for address: {}: {}", addr, e);
+                _ => {
+                    tracing::error!("Failed to get nonce for address: {:?}", init_nonce);
                     panic!("Failed to get nonce for address: {}", addr);
                 }
             }
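
Note on the backpressure change: `check_pending_backpressure` pauses the producer once `pending_txns` reaches `MAX_PENDING_TXNS` (100k) and only resumes it after the backlog drains below `BACKPRESSURE_RESUME_THRESHOLD` (80k), so the producer is not toggled on every tick while the count hovers near the limit. Below is a minimal, self-contained sketch of that hysteresis; the `Tracker` struct is a hypothetical stand-in for `TxnTracker`, not code from this patch.

```rust
// Illustrative sketch of the pause/resume hysteresis, mirroring the constants
// added in txn_tracker.rs. `Tracker` is a stand-in for TxnTracker.
const MAX_PENDING_TXNS: usize = 100_000;
const BACKPRESSURE_RESUME_THRESHOLD: usize = 80_000; // 80% of max

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BackpressureAction {
    Pause,
    Resume,
    None,
}

struct Tracker {
    pending: usize,
    paused: bool,
}

impl Tracker {
    fn check_pending_backpressure(&mut self) -> BackpressureAction {
        if self.pending >= MAX_PENDING_TXNS && !self.paused {
            self.paused = true;
            BackpressureAction::Pause
        } else if self.pending < BACKPRESSURE_RESUME_THRESHOLD && self.paused {
            self.paused = false;
            BackpressureAction::Resume
        } else {
            // Between the two thresholds the previous decision is kept,
            // so the producer is not flapped on every tick.
            BackpressureAction::None
        }
    }
}

fn main() {
    let mut t = Tracker { pending: 0, paused: false };

    t.pending = 100_000;
    assert_eq!(t.check_pending_backpressure(), BackpressureAction::Pause);

    // Dropping below the max but above the resume threshold changes nothing.
    t.pending = 90_000;
    assert_eq!(t.check_pending_backpressure(), BackpressureAction::None);

    // Only falling under 80% of the max resumes the producer.
    t.pending = 79_999;
    assert_eq!(t.check_pending_backpressure(), BackpressureAction::Resume);
}
```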
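Note on the retry change: instead of re-checking every stuck transaction individually, `handle_receipt_result` takes the most recently submitted sampled transaction that has been pending longer than `RETRY_TIMEOUT` (120s) as a cutoff, then drains every pending transaction at least as old as that cutoff (capped at `MAX_RETRY_BATCH`), failing the ones past `TXN_TIMEOUT` and handing the rest back to the consumer as `RetryTxn` messages. A simplified sketch of the cutoff/interval selection is shown below, using integer timestamps instead of `Instant` and a tuple as a stand-in for `PendingTxInfo`; the function and field names here are illustrative, not from the patch.

```rust
use std::collections::BTreeSet;

// Simplified stand-in for the retry selection: the BTreeSet is ordered by
// submit time, oldest first, just like pending_txns in the patch.
const MAX_RETRY_BATCH: usize = 10_000;
const RETRY_TIMEOUT: u64 = 120; // seconds a sampled txn may be stuck before it counts

fn select_retry_batch(
    pending: &BTreeSet<(u64, u32)>, // (submit_time, txn_id), oldest first
    sampled_stuck: &[(u64, u32)],   // sampled txns that still have no receipt
    now: u64,
) -> Vec<(u64, u32)> {
    // Youngest sampled transaction that has already been stuck for > RETRY_TIMEOUT.
    let cutoff = sampled_stuck
        .iter()
        .filter(|(submit_time, _)| now.saturating_sub(*submit_time) > RETRY_TIMEOUT)
        .max_by_key(|(submit_time, _)| *submit_time);

    match cutoff {
        Some(&(cutoff_time, _)) => pending
            .iter()
            // Everything at least as old as the cutoff is treated as stuck too.
            .take_while(|(submit_time, _)| *submit_time <= cutoff_time)
            .take(MAX_RETRY_BATCH)
            .copied()
            .collect(),
        None => Vec::new(),
    }
}

fn main() {
    let pending: BTreeSet<(u64, u32)> = [(10, 1), (50, 2), (200, 3), (400, 4)].into();
    // The sample says txn 2 (submitted at t=50) is still unconfirmed at t=300.
    let batch = select_retry_batch(&pending, &[(50, 2)], 300);
    // Txns 1 and 2 are at or before the cutoff; 3 and 4 are left alone.
    assert_eq!(batch, vec![(10, 1), (50, 2)]);
}
```

Because `pending_txns` is ordered by submit time, everything older than a transaction that is known to be stuck can be swept in one pass without issuing a receipt query per entry.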