diff --git a/binaries/cuprated/src/config/p2p.rs b/binaries/cuprated/src/config/p2p.rs
index 98ce9adad..77eeec093 100644
--- a/binaries/cuprated/src/config/p2p.rs
+++ b/binaries/cuprated/src/config/p2p.rs
@@ -101,6 +101,8 @@ impl From<BlockDownloaderConfig> for cuprate_p2p::block_downloader::BlockDownloaderConfig
         Self {
             buffer_bytes: value.buffer_bytes,
             in_progress_queue_bytes: value.in_progress_queue_bytes,
+            order_blocks: true,
+            stop_height: None,
             check_client_pool_interval: value.check_client_pool_interval,
             target_batch_bytes: value.target_batch_bytes,
             initial_batch_len: 1,
diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs
index fbd27d1af..64579cd52 100644
--- a/p2p/p2p/src/block_downloader.rs
+++ b/p2p/p2p/src/block_downloader.rs
@@ -65,6 +65,12 @@ pub struct BlockDownloaderConfig {
     pub buffer_bytes: usize,
     /// The size of the in progress queue (in bytes) at which we stop requesting more blocks.
     pub in_progress_queue_bytes: usize,
+    /// Whether blocks should be ordered before being returned.
+    pub order_blocks: bool,
+    /// The height we should sync to, the block at this height will _not_ be synced.
+    ///
+    /// If [`None`] we will sync all blocks we can find.
+    pub stop_height: Option<usize>,
     /// The [`Duration`] between checking the client pool for free peers.
     pub check_client_pool_interval: Duration,
     /// The target size of a single batch of blocks (in bytes).
@@ -251,7 +257,7 @@ where
             block_download_tasks: JoinSet::new(),
             chain_entry_task: JoinSet::new(),
             inflight_requests: BTreeMap::new(),
-            block_queue: BlockQueue::new(buffer_appender),
+            block_queue: BlockQueue::new(buffer_appender, config.order_blocks),
             failed_batches: BinaryHeap::new(),
             config,
         }
@@ -450,6 +456,7 @@ where
     ) -> Option<ClientDropGuard<N>> {
        // We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup. 
if self.chain_entry_task.len() < 2 + && self.config.stop_height.is_none_or(|stop_height| chain_tracker.top_height() < stop_height) // If we have had too many failures then assume the tip has been found so no more chain entries. && self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED // Check we have a big buffer of pending block IDs to retrieve, we don't want to be waiting around @@ -633,7 +640,7 @@ where /// Starts the main loop of the block downloader. async fn run(mut self) -> Result<(), BlockDownloadError> { let mut chain_tracker = - initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc).await?; + initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc, self.config.stop_height).await?; let mut pending_peers = BTreeMap::new(); @@ -652,7 +659,7 @@ where self.check_for_free_clients(&mut chain_tracker, &mut pending_peers).await?; // If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found. - if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED { + if self.inflight_requests.is_empty() && (self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED || self.config.stop_height.is_some_and(|h| chain_tracker.top_height() == h)) { tracing::debug!("Failed to find any more chain entries, probably fround the top"); return Ok(()); } @@ -666,7 +673,7 @@ where self.handle_download_batch_res(start_height, result, &mut chain_tracker, &mut pending_peers).await?; // If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found. 
- if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED { + if self.inflight_requests.is_empty() && (self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED || self.config.stop_height.is_some_and(|h| chain_tracker.top_height() == h )) { tracing::debug!("Failed to find any more chain entries, probably fround the top"); return Ok(()); } @@ -674,7 +681,7 @@ where Some(Ok(res)) = self.chain_entry_task.join_next() => { match res { Ok((client, entry)) => { - match chain_tracker.add_entry(entry, &mut self.our_chain_svc).await { + match chain_tracker.add_entry(entry, &mut self.our_chain_svc, self.config.stop_height).await { Ok(()) => { tracing::debug!("Successfully added chain entry to chain tracker."); self.amount_of_empty_chain_entries = 0; diff --git a/p2p/p2p/src/block_downloader/block_queue.rs b/p2p/p2p/src/block_downloader/block_queue.rs index ba7c02ba0..e16ad75c3 100644 --- a/p2p/p2p/src/block_downloader/block_queue.rs +++ b/p2p/p2p/src/block_downloader/block_queue.rs @@ -41,9 +41,92 @@ impl Ord for ReadyQueueBatch { } } +/// The block queue that passes downloaded block batches to the receiver. +pub(crate) enum BlockQueue { + /// A queue that does order the batches. + Ordered(BlockQueueOrdered), + /// A queue that does not order the batches. + Unordered(BlockQueueUnordered), +} + +impl BlockQueue { + /// Creates a new [`BlockQueue`]. + pub(crate) const fn new( + buffer_appender: BufferAppender, + order_blocks: bool, + ) -> Self { + if order_blocks { + Self::Ordered(BlockQueueOrdered::new(buffer_appender)) + } else { + Self::Unordered(BlockQueueUnordered::new(buffer_appender)) + } + } + + /// Returns the oldest batch that has not been put in the [`async_buffer`] yet. 
+ pub(crate) fn oldest_ready_batch(&self) -> Option { + match self { + Self::Ordered(q) => q.oldest_ready_batch(), + Self::Unordered(_) => None, + } + } + + /// Returns the size of all the batches that have not been put into the [`async_buffer`] yet. + pub(crate) const fn size(&self) -> usize { + match self { + Self::Ordered(q) => q.size(), + Self::Unordered(_) => 0, + } + } + + /// Adds an incoming batch to the queue and checks if we can push any batches into the [`async_buffer`]. + /// + /// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if + /// there are no batches inflight then this should be [`None`]. + pub(crate) async fn add_incoming_batch( + &mut self, + new_batch: ReadyQueueBatch, + oldest_in_flight_start_height: Option, + ) -> Result<(), BlockDownloadError> { + match self { + Self::Ordered(q) => { + q.add_incoming_batch(new_batch, oldest_in_flight_start_height) + .await + } + Self::Unordered(q) => q.add_incoming_batch(new_batch).await, + } + } +} + +/// A block queue that does not order the batches before giving them to the receiver. +pub(crate) struct BlockQueueUnordered { + /// The [`BufferAppender`] that gives blocks to Cuprate. + buffer_appender: BufferAppender, +} + +impl BlockQueueUnordered { + /// Creates a new [`BlockQueueUnordered`]. + const fn new(buffer_appender: BufferAppender) -> Self { + Self { buffer_appender } + } + + /// Pushes the batch into the [`async_buffer`]. + pub(crate) async fn add_incoming_batch( + &mut self, + new_batch: ReadyQueueBatch, + ) -> Result<(), BlockDownloadError> { + let size = new_batch.block_batch.size; + self.buffer_appender + .send(new_batch.block_batch, size) + .await + .map_err(|_| BlockDownloadError::BufferWasClosed)?; + + Ok(()) + } +} + /// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the /// oldest batch has been downloaded. 
-pub(crate) struct BlockQueue {
+pub(crate) struct BlockQueueOrdered {
     /// A queue of ready batches.
     ready_batches: BinaryHeap<ReadyQueueBatch>,
     /// The size, in bytes, of all the batches in [`Self::ready_batches`].
@@ -53,8 +136,8 @@ pub(crate) struct BlockQueue {
     buffer_appender: BufferAppender<BlockBatch>,
 }
 
-impl BlockQueue {
-    /// Creates a new [`BlockQueue`].
+impl BlockQueueOrdered {
+    /// Creates a new [`BlockQueueOrdered`].
     pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
         Self {
             ready_batches: BinaryHeap::new(),
@@ -146,7 +229,7 @@ mod tests {
         block_on(async move {
             let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);
 
-            let mut queue = BlockQueue::new(buffer_tx);
+            let mut queue = BlockQueueOrdered::new(buffer_tx);
 
             let mut sorted_batches = BTreeSet::from_iter(batches.clone());
             let mut soreted_batch_2 = sorted_batches.clone();
diff --git a/p2p/p2p/src/block_downloader/chain_tracker.rs b/p2p/p2p/src/block_downloader/chain_tracker.rs
index 33f294f0c..fff13d211 100644
--- a/p2p/p2p/src/block_downloader/chain_tracker.rs
+++ b/p2p/p2p/src/block_downloader/chain_tracker.rs
@@ -76,15 +76,29 @@ pub(crate) struct ChainTracker {
 impl<N: NetworkZone> ChainTracker<N> {
     /// Creates a new chain tracker.
     pub(crate) async fn new<C>(
-        new_entry: ChainEntry<N>,
+        mut new_entry: ChainEntry<N>,
         first_height: usize,
         our_genesis: [u8; 32],
         previous_hash: [u8; 32],
         our_chain_svc: &mut C,
+        stop_height: Option<usize>,
     ) -> Result<Self, ChainTrackerError>
     where
         C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
     {
+        if let Some(stop_height) = stop_height {
+            let new_top_height = first_height + new_entry.ids.len();
+            if new_top_height >= stop_height {
+                new_entry
+                    .ids
+                    .truncate(stop_height.saturating_sub(first_height));
+            }
+
+            if new_entry.ids.is_empty() {
+                return Err(ChainTrackerError::NewEntryIsEmpty);
+            }
+        }
+
         let top_seen_hash = *new_entry.ids.last().unwrap();
         let mut entries = VecDeque::with_capacity(1);
         entries.push_back(new_entry);
@@ -149,6 +163,7 @@ impl ChainTracker {
         &mut self,
         mut chain_entry: ChainEntry<N>,
         our_chain_svc: &mut C,
+        stop_height: Option<usize>,
     ) -> Result<(), ChainTrackerError>
     where
         C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
     {
@@ -167,13 +182,30 @@ impl ChainTracker {
             return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
         }
 
-        let new_entry = ChainEntry {
+        let mut new_entry = ChainEntry {
            // ignore the first block - we already know it. 
             ids: chain_entry.ids.split_off(1),
             peer: chain_entry.peer,
             handle: chain_entry.handle,
         };
 
+        if let Some(stop_height) = stop_height {
+            let new_top_height = self.top_height() + new_entry.ids.len();
+            if new_top_height >= stop_height {
+                new_entry
+                    .ids
+                    .truncate(stop_height.saturating_sub(self.top_height()));
+            }
+
+            debug_assert!(
+                self.top_height() + new_entry.ids.len() <= stop_height
+            );
+
+            if new_entry.ids.is_empty() {
+                return Err(ChainTrackerError::NewEntryIsEmpty);
+            }
+        }
+
         self.top_seen_hash = *new_entry.ids.last().unwrap();
         self.unknown_entries.push_back(new_entry);
 
diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs
index 659574ad2..ed52d0e6a 100644
--- a/p2p/p2p/src/block_downloader/request_chain.rs
+++ b/p2p/p2p/src/block_downloader/request_chain.rs
@@ -82,6 +82,7 @@ pub(crate) async fn request_chain_entry_from_peer(
 pub(super) async fn initial_chain_search<N: NetworkZone, C>(
     peer_set: &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
     mut our_chain_svc: C,
+    stop_height: Option<usize>,
 ) -> Result<ChainTracker<N>, BlockDownloadError>
 where
     C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
 {
@@ -220,6 +221,7 @@ where
         our_genesis,
         previous_id,
         &mut our_chain_svc,
+        stop_height,
     )
     .await
     .map_err(|_| BlockDownloadError::ChainInvalid)?;
diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs
index 098c5ee42..2fd7b4fed 100644
--- a/p2p/p2p/src/block_downloader/tests.rs
+++ b/p2p/p2p/src/block_downloader/tests.rs
@@ -15,6 +15,9 @@ use monero_serai::{
     transaction::{Input, Timelock, Transaction, TransactionPrefix},
 };
 use proptest::{collection::vec, prelude::*};
+use proptest::sample::SizeRange;
+use proptest::strategy::ValueTree;
+use proptest::test_runner::TestRunner;
 use tokio::{sync::mpsc, time::timeout};
 use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
 
@@ -43,9 +46,9 @@ proptest! {
         timeout: 60 * 1000,
        .. 
ProptestConfig::default() })] - + #[test] - fn test_block_downloader(blockchain in dummy_blockchain_stragtegy(), peers in 1_usize..128) { + fn test_block_downloader(blockchain in dummy_blockchain_stragtegy(1..50_000), peers in 1_usize..128) { let blockchain = Arc::new(blockchain); let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); @@ -70,6 +73,8 @@ proptest! { BlockDownloaderConfig { buffer_bytes: 1_000, in_progress_queue_bytes: 10_000, + order_blocks: true, + stop_height: None, check_client_pool_interval: Duration::from_secs(5), target_batch_bytes: 5_000, initial_batch_len: 1, @@ -85,6 +90,50 @@ proptest! { }).await }).unwrap(); } + + #[test] + fn test_block_downloader_unordered(blockchain in dummy_blockchain_stragtegy(15_000..20_000), peers in 1_usize..128) { + let blockchain = Arc::new(blockchain); + + let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + + tokio_pool.block_on(async move { + timeout(Duration::from_secs(600), async move { + let (new_connection_tx, new_connection_rx) = mpsc::channel(peers); + + let peer_set = PeerSet::new(new_connection_rx); + + for _ in 0..peers { + let client = mock_block_downloader_client(Arc::clone(&blockchain)); + + new_connection_tx.try_send(client).unwrap(); + } + + let stream = download_blocks( + Buffer::new(peer_set, 10).boxed_clone(), + OurChainSvc { + genesis: *blockchain.blocks.first().unwrap().0 + }, + BlockDownloaderConfig { + buffer_bytes: 1_000, + in_progress_queue_bytes: 10_000, + order_blocks: false, + stop_height: Some(15_000), + check_client_pool_interval: Duration::from_secs(5), + target_batch_bytes: 5_000, + initial_batch_len: 1, + }); + + let blocks = stream.map(|blocks| blocks.blocks).concat().await; + + assert_eq!(blocks.len(), 14_999); + + for block in blocks.into_iter() { + assert!(block.0.number().unwrap() < 15_000); + } + }).await + }).unwrap(); + } } prop_compose! 
{ @@ -148,8 +197,8 @@ impl Debug for MockBlockchain { prop_compose! { /// Returns a strategy to generate a [`MockBlockchain`]. - fn dummy_blockchain_stragtegy()( - blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000), + fn dummy_blockchain_stragtegy(size: impl Into)( + blocks in vec(dummy_block_stragtegy(0, [0; 32]), size), ) -> MockBlockchain { let mut blockchain = IndexMap::new();