diff --git a/anchor/eth/src/event_processor.rs b/anchor/eth/src/event_processor.rs index e3baed3ea..7f597dde4 100644 --- a/anchor/eth/src/event_processor.rs +++ b/anchor/eth/src/event_processor.rs @@ -324,7 +324,7 @@ impl EventProcessor { })?; // Schedule validator for index lookup - if let Err(err) = index_lookup_queue.blocking_send(validator_pubkey) { + if let Err(err) = index_lookup_queue.send(validator_pubkey) { error!(?err, "Failed to send validator to index lookup"); } diff --git a/anchor/eth/src/index_sync.rs b/anchor/eth/src/index_sync.rs index 9e73ca08a..1d2b9a55f 100644 --- a/anchor/eth/src/index_sync.rs +++ b/anchor/eth/src/index_sync.rs @@ -8,18 +8,17 @@ use ssv_types::{ValidatorIndex, ValidatorMetadata}; use task_executor::TaskExecutor; use tokio::{ select, - sync::mpsc::{channel, Receiver, Sender}, + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, time::sleep, }; use tracing::{debug, error, info, warn}; use types::PublicKeyBytes; -pub type Tx = Sender; +pub type Tx = UnboundedSender; const INDEX_SYNCER_NAME: &str = "validator_index_syncer"; const MAX_BATCH_SIZE: usize = 512; -const QUEUE_SIZE: usize = 4096; const BATCHING_DELAY: Duration = Duration::from_secs(1); const MAX_DELAY: Duration = Duration::from_secs(45); @@ -28,7 +27,7 @@ pub fn start_validator_index_syncer( db: Arc, executor: TaskExecutor, ) -> Tx { - let (tx, rx) = channel(QUEUE_SIZE); + let (tx, rx) = unbounded_channel(); executor.spawn(validator_index_syncer(nodes, db, rx), INDEX_SYNCER_NAME); tx } @@ -36,7 +35,7 @@ pub fn start_validator_index_syncer( async fn validator_index_syncer( nodes: Arc>, db: Arc, - mut validator_queue_rx: Receiver, + mut validator_queue_rx: UnboundedReceiver, ) { info!("Starting validator index syncer");