Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions src/actors/consumer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, error, info, warn};
use crate::{
actors::{
consumer::dispatcher::{Dispatcher, SimpleDispatcher},
monitor::{RegisterConsumer, SubmissionResult, UpdateSubmissionResult},
monitor::{RegisterConsumer, RetryTxn, SubmissionResult, UpdateSubmissionResult},
Monitor,
},
eth::EthHttpCli,
Expand Down Expand Up @@ -231,6 +231,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::Success(tx_hash)),
rpc_url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});

// Update statistics and return early
Expand All @@ -244,14 +245,20 @@ impl Consumer {

// --- Handle "already known" error ---
// This means the transaction is already in the node's mempool
if error_string.contains("already known") || error_string.contains("already imported") {
if error_string.contains("already known")
|| error_string.contains("already imported")
{
let tx_hash = keccak256(&signed_txn.bytes);
debug!("Transaction already known by node: {:?}, treating as success", tx_hash);
debug!(
"Transaction already known by node: {:?}, treating as success",
tx_hash
);
monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::Success(tx_hash)),
rpc_url: url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});

transactions_sending.fetch_sub(1, Ordering::Relaxed);
Expand All @@ -268,6 +275,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::Success(tx_hash)),
rpc_url: url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});

transactions_sending.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -304,6 +312,7 @@ impl Consumer {
}),
rpc_url: url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});
}
} else {
Expand All @@ -314,6 +323,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::ErrorWithRetry),
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});
}
// After encountering Nonce error, should stop retrying and return regardless
Expand Down Expand Up @@ -346,6 +356,7 @@ impl Consumer {
result: Arc::new(SubmissionResult::ErrorWithRetry), // Mark as needing upstream retry
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});

transactions_sending.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -428,10 +439,11 @@ impl Consumer {
Err(_) => {
error!("Semaphore has been closed, stopping consumer loop.");
monitor_addr.do_send(UpdateSubmissionResult {
metadata: signed_txn.metadata,
metadata: signed_txn.metadata.clone(),
result: Arc::new(SubmissionResult::ErrorWithRetry),
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});
break;
}
Expand Down Expand Up @@ -544,3 +556,34 @@ impl Handler<SignedTxnWithMetadata> for Consumer {
})
}
}

/// Handler for retry requests from Monitor for timed-out transactions
impl Handler<RetryTxn> for Consumer {
type Result = ();

fn handle(&mut self, msg: RetryTxn, _ctx: &mut Self::Context) -> Self::Result {
debug!("Retrying transaction: {:?}", msg.metadata.txn_id);

// Convert to SignedTxnWithMetadata and send through normal channel
let signed_txn = SignedTxnWithMetadata {
bytes: (*msg.signed_bytes).clone(),
metadata: msg.metadata,
};

let sender = self.pool_sender.clone();
let pool_size = self.stats.pool_size.clone();

// Spawn to avoid blocking the actor
actix::spawn(async move {
match sender.send(signed_txn).await {
Ok(_) => {
pool_size.fetch_add(1, Ordering::Relaxed);
debug!("Retry transaction added to pool");
}
Err(e) => {
error!("Failed to add retry transaction to pool: {:?}", e);
}
}
});
}
}
10 changes: 10 additions & 0 deletions src/actors/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct UpdateSubmissionResult {
pub rpc_url: String,
#[allow(unused)]
pub send_time: Instant,
/// Signed transaction bytes for retry support
pub signed_bytes: Arc<Vec<u8>>,
}

#[derive(Message)]
Expand Down Expand Up @@ -76,4 +78,12 @@ pub struct PlanFailed {
pub reason: String,
}

/// Message to retry a timed-out transaction
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub struct RetryTxn {
pub signed_bytes: Arc<Vec<u8>>,
pub metadata: Arc<TxnMetadata>,
}

pub use monitor_actor::Monitor;
37 changes: 32 additions & 5 deletions src/actors/monitor/monitor_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use crate::actors::producer::Producer;
use crate::eth::EthHttpCli;
use crate::txn_plan::PlanId;

use super::txn_tracker::{PlanStatus, TxnTracker};
use super::txn_tracker::{BackpressureAction, PlanStatus, TxnTracker};
use super::{
PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer,
ReportProducerStats, Tick, UpdateSubmissionResult,
ReportProducerStats, RetryTxn, Tick, UpdateSubmissionResult,
};
use crate::actors::{PauseProducer, ResumeProducer};

#[derive(Message)]
#[rtype(result = "()")]
Expand Down Expand Up @@ -154,15 +155,41 @@ impl Handler<Tick> for Monitor {
type Result = ();

fn handle(&mut self, _msg: Tick, ctx: &mut Self::Context) {
// Perform sampling check
// 1. Check local pending backpressure
match self.txn_tracker.check_pending_backpressure() {
BackpressureAction::Pause => {
if let Some(producer_addr) = &self.producer_addr {
producer_addr.do_send(PauseProducer);
}
}
BackpressureAction::Resume => {
if let Some(producer_addr) = &self.producer_addr {
producer_addr.do_send(ResumeProducer);
}
}
BackpressureAction::None => {}
}

// 2. Perform sampling check
let tasks = self.txn_tracker.perform_sampling_check();
let consumer_addr = self.consumer_addr.clone();
if !tasks.is_empty() {
ctx.spawn(
future::join_all(tasks)
.into_actor(self)
.map(move |results, act, _ctx| {
// Handle all parallel execution results
act.txn_tracker.handle_receipt_result(results);
// Process results and get retry queue
let retry_queue = act.txn_tracker.handle_receipt_result(results);

// 3. Send retries to consumer
if let Some(consumer) = &consumer_addr {
for retry_txn in retry_queue {
consumer.do_send(RetryTxn {
signed_bytes: retry_txn.signed_bytes,
metadata: retry_txn.metadata,
});
}
}
}),
);
}
Expand Down
Loading
Loading