Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/actors/consumer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, error, info, warn};
use crate::{
actors::{
consumer::dispatcher::{Dispatcher, SimpleDispatcher},
monitor::{RegisterConsumer, SubmissionResult, UpdateSubmissionResult},
monitor::{RegisterConsumer, RetryDroppedTxn, SubmissionResult, UpdateSubmissionResult},
Monitor,
},
eth::EthHttpCli,
Expand Down Expand Up @@ -231,6 +231,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::Success(tx_hash)),
rpc_url,
send_time: Instant::now(),
raw_tx: Some(Arc::new(signed_txn.bytes.clone())),
});

// Update statistics and return early
Expand All @@ -251,6 +252,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::Success(tx_hash)),
rpc_url: url,
send_time: Instant::now(),
raw_tx: Some(Arc::new(signed_txn.bytes.clone())),
});

transactions_sending.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -287,6 +289,7 @@ impl Consumer {
}),
rpc_url: url,
send_time: Instant::now(),
raw_tx: None,
});
}
} else {
Expand All @@ -297,6 +300,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::ErrorWithRetry),
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
raw_tx: None,
});
}
// After encountering Nonce error, should stop retrying and return regardless
Expand Down Expand Up @@ -329,6 +333,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::ErrorWithRetry), // Mark as needing upstream retry
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
raw_tx: None,
});

transactions_sending.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -415,6 +420,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::ErrorWithRetry),
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
raw_tx: None,
});
break;
}
Expand Down Expand Up @@ -527,3 +533,29 @@ impl Handler<SignedTxnWithMetadata> for Consumer {
})
}
}

impl Handler<RetryDroppedTxn> for Consumer {
type Result = ResponseFuture<()>;

fn handle(&mut self, msg: RetryDroppedTxn, _ctx: &mut Self::Context) -> Self::Result {
debug!("Retrying dropped transaction: {:?}", msg.original_hash);
let dispatcher = self.dispatcher.clone();
let monitor_addr = self.monitor_addr.clone();
// Note: We don't update stats here as it's a silent retry, or maybe we should?
// For now, keep it simple.
Box::pin(async move {
match dispatcher.send_tx(msg.raw_tx.as_ref().clone(), msg.metadata.txn_id).await {
Ok((tx_hash, rpc_url)) => {
info!("Retried txn sent. Hash: {}, URL: {}", tx_hash, rpc_url);
// Notify monitor again? If we do, it might create a duplicate entry if not careful.
// But we updated the hash in the previous successful send.
// Actually, if we just resend, the hash should be the same (raw_tx is same).
// So Monitor will just see it eventually.
}
Err((e, _)) => {
warn!("Retry failed for txn {:?}: {}", msg.original_hash, e);
}
}
})
}
}
10 changes: 10 additions & 0 deletions src/actors/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct UpdateSubmissionResult {
pub rpc_url: String,
#[allow(unused)]
pub send_time: Instant,
// Added: raw transaction bytes for retry
pub raw_tx: Option<Arc<Vec<u8>>>,
}

#[derive(Message)]
Expand All @@ -69,4 +71,12 @@ pub struct PlanFailed {
pub reason: String,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct RetryDroppedTxn {
pub raw_tx: Arc<Vec<u8>>,
pub metadata: Arc<TxnMetadata>,
pub original_hash: TxHash,
}

pub use monitor_actor::Monitor;
13 changes: 12 additions & 1 deletion src/actors/monitor/monitor_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,18 @@ impl Handler<Tick> for Monitor {
.into_actor(self)
.map(move |results, act, _ctx| {
// Handle all parallel execution results
act.txn_tracker.handle_receipt_result(results);
let retries = act.txn_tracker.handle_receipt_result(results);

// Send retry requests if any
if !retries.is_empty() {
if let Some(consumer_addr) = &act.consumer_addr {
for retry_msg in retries {
consumer_addr.do_send(retry_msg);
}
} else {
tracing::error!("Cannot retry transactions: Consumer address not registered");
}
}
}),
);
}
Expand Down
137 changes: 87 additions & 50 deletions src/actors/monitor/txn_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::time::{Duration, Instant};

use alloy::consensus::Account;
use alloy::primitives::TxHash;
use alloy::rpc::types::TransactionReceipt;
use comfy_table::{presets::UTF8_FULL, Attribute, Cell, Color, Table};
use tracing::{debug, error, warn};

use crate::actors::monitor::SubmissionResult;
use crate::actors::monitor::{RetryDroppedTxn, SubmissionResult};
use crate::eth::EthHttpCli;
use crate::txn_plan::{PlanId, TxnMetadata};

Expand All @@ -17,6 +18,7 @@ use super::UpdateSubmissionResult;
const SAMPLING_SIZE: usize = 10; // Define sampling size
const TXN_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes timeout
const TPS_WINDOW: Duration = Duration::from_secs(17);
const DROP_DETECTION_THRESHOLD: Duration = Duration::from_secs(10); // Minimum time before checking for dropped txs

/// Format large numbers with appropriate suffixes (K, M, B)
fn format_large_number(num: u64) -> String {
Expand Down Expand Up @@ -78,6 +80,7 @@ pub(crate) struct PendingTxInfo {
metadata: Arc<TxnMetadata>,
rpc_url: String,
submit_time: Instant,
raw_tx: Option<Arc<Vec<u8>>>, // Added raw_tx for retry
}

//--- Core implementation required for BTreeSet sorting ---//
Expand Down Expand Up @@ -114,6 +117,16 @@ pub enum PlanStatus {
InProgress,
}

/// Status of a checked transaction
#[derive(Debug)]
pub enum TransactionCheckStatus {
Confirmed(TransactionReceipt),
PendingInPool, // Found in pool (get_tx_by_hash returned it)
Dropped, // Not in pool, not confirmed (get_tx_by_hash returned None)
RpcError(anyhow::Error),
NonceMismatch(Account), // Account nonce > tx nonce, likely confirmed but receipt missing (or reorged)
}

impl TxnTracker {
/// Create new transaction tracker
pub fn new(clients: Vec<Arc<EthHttpCli>>) -> Self {
Expand Down Expand Up @@ -187,6 +200,7 @@ impl TxnTracker {
metadata: msg.metadata.clone(),
rpc_url: msg.rpc_url.clone(),
submit_time: Instant::now(),
raw_tx: msg.raw_tx.clone(),
};

// Insert transaction into the global, time-sorted BTreeSet
Expand All @@ -203,6 +217,7 @@ impl TxnTracker {
metadata: msg.metadata.clone(),
rpc_url: msg.rpc_url.clone(),
submit_time: Instant::now(),
raw_tx: msg.raw_tx.clone(),
};
self.pending_txns.insert(pending_info);
debug!(
Expand Down Expand Up @@ -267,8 +282,7 @@ impl TxnTracker {
impl std::future::Future<
Output = (
PendingTxInfo,
Result<Account, anyhow::Error>,
Result<Option<alloy::rpc::types::TransactionReceipt>, anyhow::Error>,
TransactionCheckStatus,
),
>,
> {
Expand Down Expand Up @@ -306,16 +320,25 @@ impl TxnTracker {
let task_info = pending_info.clone();

let task = async move {
let result = client.get_transaction_receipt(task_info.tx_hash).await;
let account = client
.get_account(*task_info.metadata.from_account.as_ref())
.await;
tracing::debug!(
"checked tx_hash={:?} result={:?}",
task_info.tx_hash,
result
);
(task_info, account, result)
// 1. Check Receipt
match client.get_transaction_receipt(task_info.tx_hash).await {
Ok(Some(receipt)) => (task_info, TransactionCheckStatus::Confirmed(receipt)),
Err(e) => (task_info, TransactionCheckStatus::RpcError(e)),
Ok(None) => {
// 2. Receipt not found, check if transaction exists in pool/chain
// Only perform this check if enough time has passed to assume propagation
if task_info.submit_time.elapsed() > DROP_DETECTION_THRESHOLD {
match client.get_transaction_by_hash(task_info.tx_hash).await {
Ok(Some(_)) => (task_info, TransactionCheckStatus::PendingInPool),
Ok(None) => (task_info, TransactionCheckStatus::Dropped),
Err(e) => (task_info, TransactionCheckStatus::RpcError(e)),
}
} else {
// Too soon to check for drop, assume pending
(task_info, TransactionCheckStatus::PendingInPool)
}
}
}
};
tasks.push(task);
} else {
Expand All @@ -330,49 +353,65 @@ impl TxnTracker {
&mut self,
results: Vec<(
PendingTxInfo,
Result<Account, anyhow::Error>,
Result<Option<alloy::rpc::types::TransactionReceipt>, anyhow::Error>,
TransactionCheckStatus,
)>,
) {
) -> Vec<RetryDroppedTxn> {
let mut successful_txns = Vec::new();
let mut failed_txns = Vec::new(); // Including Pending, Timeout, Error
let mut retries = Vec::new();

// 1. Categorize results
for (info, account, result) in results {
match result {
Ok(Some(receipt)) => {
for (info, status) in results {
match status {
TransactionCheckStatus::Confirmed(receipt) => {
// Transaction successfully confirmed
self.pending_txns.remove(&info);
successful_txns.push((info, receipt.status()));
}
Ok(None) => {
// Transaction still pending
if let Ok(account) = account {
if account.nonce > info.metadata.nonce {
successful_txns.push((info, true));
}
} else {
failed_txns.push(info);
}
TransactionCheckStatus::PendingInPool => {
// Check for nonce update to detect "silent success" (optional but good)
// For now, just treat as pending
if info.submit_time.elapsed() > TXN_TIMEOUT {
failed_txns.push(info); // Will be handled as timeout
}
// Else do nothing, it stays in pending_txns
}
Err(e) => {
// RPC query failed
warn!(
"Failed to get receipt for tx_hash={:?}: {}",
info.tx_hash, e
);
failed_txns.push(info);
TransactionCheckStatus::Dropped => {
// Transaction is missing from node!
if info.submit_time.elapsed() > TXN_TIMEOUT {
failed_txns.push(info);
} else {
// Trigger Retry
if let Some(raw_tx) = &info.raw_tx {
warn!("Transaction dropped (not found in pool), retrying: {:?}", info.tx_hash);
retries.push(RetryDroppedTxn {
raw_tx: raw_tx.clone(),
metadata: info.metadata.clone(),
original_hash: info.tx_hash,
});

// IMPORTANT: Update submit_time to prevent immediate retry loop
// We remove and re-insert with new time
self.pending_txns.remove(&info);
let mut new_info = info.clone();
new_info.submit_time = Instant::now();
self.pending_txns.insert(new_info);
} else {
error!("Transaction dropped but no raw_tx available for retry: {:?}", info.tx_hash);
failed_txns.push(info);
}
}
}
TransactionCheckStatus::RpcError(e) => {
warn!("RPC error checking status for {:?}: {}", info.tx_hash, e);
// Keep pending
}
TransactionCheckStatus::NonceMismatch(_) => {
// Not implementing nonce check logic here as requested, relying on hash checks
}
}
}

if !failed_txns.is_empty() {
debug!(
"Failed to get receipt for {} transactions",
failed_txns.len()
);
}

let successful_txns_hash = successful_txns
.iter()
.map(|(info, _)| info.tx_hash)
Expand Down Expand Up @@ -451,15 +490,13 @@ impl TxnTracker {
self.resolved_txn_timestamps.push_back(Instant::now());
self.total_resolved_transactions += 1;
}
} else {
// Not timed out, put back in main queue for next round check
debug!(
"Re-inserting pending transaction: tx_hash={:?}",
info.tx_hash
);
self.pending_txns.insert(info);
}
// Remove from pending set if timed out
self.pending_txns.remove(&info);
}
// Else: it was just re-inserted or kept in pending set above
}

retries
}

pub fn log_stats(&mut self) {
Expand Down
6 changes: 6 additions & 0 deletions src/eth/eth_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ impl EthHttpCli {
self.chain_id
}

pub async fn get_transaction_by_hash(&self, tx_hash: TxHash) -> Result<Option<TransactionReceipt>> {
let idx = rand::thread_rng().gen_range(0..self.inner.len());
let result = self.inner[idx].get_transaction_receipt(tx_hash).await;
result.with_context(|| format!("Failed to get transaction receipt for hash: {:?}", tx_hash))
}

pub async fn get_txn_count(&self, address: Address) -> Result<u64> {
tokio::time::timeout(Duration::from_secs(10), async {
let nonce = self.inner[0].get_transaction_count(address).await?;
Expand Down
Loading