Skip to content

Commit 3ce1ce7

Browse files
authored
Revert "Revert "feat: middleware submission tweaks (#23)" (#29)" (#31)
re-adds middleware fixes to test on RC context
1 parent 0afa806 commit 3ce1ce7

File tree

2 files changed

+78
-44
lines changed

2 files changed

+78
-44
lines changed

ethers-middleware/src/gas_escalator/mod.rs

+52-44
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {
6161

6262
#[derive(Debug, Clone, PartialEq, Eq)]
6363
pub struct MonitoredTransaction {
64-
hash: TxHash,
64+
hash: Option<TxHash>,
6565
inner: TypedTransaction,
6666
creation_time: Instant,
6767
block: Option<BlockId>,
@@ -205,23 +205,30 @@ where
205205
) -> Result<PendingTransaction<'_, M::Provider>, GasEscalatorError<M>> {
206206
let tx = tx.into();
207207

208-
let pending_tx = self
209-
.inner
210-
.send_transaction(tx.clone(), block)
211-
.await
212-
.map_err(GasEscalatorError::MiddlewareError)?;
213-
214-
tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
215-
// insert the tx in the pending txs
216-
let mut lock = self.txs.lock().await;
217-
lock.push(MonitoredTransaction {
218-
hash: *pending_tx,
219-
inner: tx,
220-
creation_time: Instant::now(),
221-
block,
222-
});
223-
224-
Ok(pending_tx)
208+
match self.inner.send_transaction(tx.clone(), block).await {
209+
Ok(pending_tx) => {
210+
tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
211+
let mut lock = self.txs.lock().await;
212+
lock.push(MonitoredTransaction {
213+
hash: Some(*pending_tx),
214+
inner: tx,
215+
creation_time: Instant::now(),
216+
block,
217+
});
218+
Ok(pending_tx)
219+
}
220+
Err(err) => {
221+
tracing::warn!(tx = ?tx, "Failed to send tx, adding to gas escalator watcher regardless");
222+
let mut lock = self.txs.lock().await;
223+
lock.push(MonitoredTransaction {
224+
hash: None,
225+
inner: tx,
226+
creation_time: Instant::now(),
227+
block: None,
228+
});
229+
Err(GasEscalatorError::MiddlewareError(err))
230+
}
231+
}
225232
}
226233
}
227234

@@ -279,11 +286,11 @@ impl<M, E: Clone> EscalationTask<M, E> {
279286
err_message: String,
280287
old_monitored_tx: MonitoredTransaction,
281288
new_tx: TypedTransaction,
282-
) -> Option<(H256, Instant)> {
289+
) -> Option<(Option<H256>, Instant)> {
283290
if err_message.contains("nonce too low") {
284291
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
285292
// already landed onchain, meaning we no longer need to escalate it
286-
tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
293+
tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
287294
None
288295
} else if RETRYABLE_ERRORS.iter().any(|err_msg| err_message.contains(err_msg)) {
289296
// if the error is one of the known retryable errors, we can keep trying to escalate
@@ -311,21 +318,15 @@ impl<M, E: Clone> EscalationTask<M, E> {
311318
///
312319
/// **Returns** a tx hash to monitor and the time it was created, unless the tx was already
313320
/// included or an unknown error occurred
314-
async fn broadcast_tx_if_escalated(
321+
async fn broadcast_tx(
315322
&self,
316323
old_monitored_tx: MonitoredTransaction,
317324
new_tx: TypedTransaction,
318-
) -> Option<(H256, Instant)>
325+
) -> Option<(Option<H256>, Instant)>
319326
where
320327
M: Middleware,
321328
E: GasEscalator,
322329
{
323-
// gas price wasn't escalated
324-
// keep monitoring the old tx
325-
if old_monitored_tx.inner.eq(&new_tx) {
326-
return Some((old_monitored_tx.hash, old_monitored_tx.creation_time));
327-
}
328-
329330
// send a replacement tx with the escalated gas price
330331
match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await {
331332
Ok(new_tx_hash) => {
@@ -338,7 +339,7 @@ impl<M, E: Clone> EscalationTask<M, E> {
338339
);
339340
// Return the new tx hash to monitor and the time it was created.
340341
// The latter is used to know when to escalate the gas price again
341-
Some((new_tx_hash, Instant::now()))
342+
Some((Some(new_tx_hash), Instant::now()))
342343
}
343344
Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx),
344345
}
@@ -360,34 +361,41 @@ impl<M, E: Clone> EscalationTask<M, E> {
360361
tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs");
361362
}
362363
let mut new_txs_to_monitor = vec![];
363-
for monitored_tx in monitored_txs {
364-
let receipt = self
365-
.inner
366-
.get_transaction_receipt(monitored_tx.hash)
367-
.await
368-
.map_err(GasEscalatorError::MiddlewareError)?;
369-
370-
tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists");
364+
for old_monitored_tx in monitored_txs {
365+
let receipt = if let Some(tx_hash) = old_monitored_tx.hash {
366+
tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");
367+
self.inner
368+
.get_transaction_receipt(tx_hash)
369+
.await
370+
.map_err(GasEscalatorError::MiddlewareError)?
371+
} else {
372+
None
373+
};
371374

372-
if receipt.is_some() {
375+
if let Some(receipt) = receipt {
373376
// tx was already included, can drop from escalator
374-
tracing::debug!(tx = ?monitored_tx.hash, "Transaction was included onchain, dropping from escalator");
377+
tracing::debug!(tx = ?receipt.transaction_hash, "Transaction was included onchain, dropping from escalator");
375378
continue;
376379
}
377-
let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else {
378-
tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
380+
let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else {
381+
tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
379382
continue;
380383
};
381384

382-
let maybe_tx_to_monitor =
383-
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await;
385+
// gas price wasn't escalated
386+
// keep monitoring the old tx
387+
let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) {
388+
Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
389+
} else {
390+
self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await
391+
};
384392

385393
if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
386394
new_txs_to_monitor.push(MonitoredTransaction {
387395
hash: new_txhash,
388396
inner: new_tx,
389397
creation_time: new_creation_time,
390-
block: monitored_tx.block,
398+
block: old_monitored_tx.block,
391399
});
392400
}
393401
}

ethers-middleware/src/nonce_manager.rs

+26
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
55
use thiserror::Error;
66
use tracing::instrument;
77

8+
const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10;
9+
810
#[derive(Debug)]
911
/// Middleware used for calculating nonces locally, useful for signing multiple
1012
/// consecutive transactions without waiting for them to hit the mempool
@@ -13,6 +15,8 @@ pub struct NonceManagerMiddleware<M> {
1315
init_guard: futures_locks::Mutex<()>,
1416
initialized: AtomicBool,
1517
nonce: AtomicU64,
18+
tx_count_for_resync: Option<AtomicU64>,
19+
txs_since_resync: AtomicU64,
1620
address: Address,
1721
}
1822

@@ -28,6 +32,8 @@ where
2832
init_guard: Default::default(),
2933
initialized: Default::default(),
3034
nonce: Default::default(),
35+
tx_count_for_resync: Default::default(),
36+
txs_since_resync: 0u64.into(),
3137
address,
3238
}
3339
}
@@ -38,6 +44,13 @@ where
3844
nonce.into()
3945
}
4046

47+
pub fn get_tx_count_for_resync(&self) -> u64 {
48+
self.tx_count_for_resync
49+
.as_ref()
50+
.map(|count| count.load(Ordering::SeqCst))
51+
.unwrap_or(DEFAULT_TX_COUNT_FOR_RESYNC)
52+
}
53+
4154
pub async fn initialize_nonce(
4255
&self,
4356
block: Option<BlockId>,
@@ -142,7 +155,20 @@ where
142155
tracing::debug!(?nonce, "Sending transaction");
143156
match self.inner.send_transaction(tx.clone(), block).await {
144157
Ok(pending_tx) => {
158+
let txs_since_resync = self.txs_since_resync.load(Ordering::SeqCst);
159+
let new_txs_since_resync = txs_since_resync + 1;
145160
tracing::debug!(?nonce, "Sent transaction");
161+
let tx_count_for_resync = self.get_tx_count_for_resync();
162+
if new_txs_since_resync >= tx_count_for_resync {
163+
let onchain_nonce = self.get_transaction_count(self.address, block).await?;
164+
self.nonce.store(onchain_nonce.as_u64(), Ordering::SeqCst);
165+
self.txs_since_resync.store(0, Ordering::SeqCst);
166+
tracing::debug!(?nonce, "Resynced internal nonce with onchain nonce");
167+
} else {
168+
self.txs_since_resync.store(new_txs_since_resync, Ordering::SeqCst);
169+
let txs_until_resync = tx_count_for_resync - new_txs_since_resync;
170+
tracing::debug!(?txs_until_resync, "Transactions until nonce resync");
171+
}
146172
Ok(pending_tx)
147173
}
148174
Err(err) => {

0 commit comments

Comments
 (0)