-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Implement eth/65 (EIP-2464) #11626
base: master
Are you sure you want to change the base?
Implement eth/65 (EIP-2464) #11626
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ rlp = "0.4.5" | |
snapshot = { path = "../snapshot" } | ||
trace-time = "0.1" | ||
triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" } | ||
transaction-pool = "2" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it might be worth to introduce some abstraction to avoid introducing this dependency. For instance |
||
|
||
[dev-dependencies] | ||
env_logger = "0.5" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,14 +26,12 @@ use crate::{ | |
sync_packet::{ | ||
PacketInfo, | ||
SyncPacket::{ | ||
self, BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket, | ||
PrivateStatePacket, PrivateTransactionPacket, ReceiptsPacket, SignedPrivateTransactionPacket, | ||
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket, | ||
self, *, | ||
} | ||
}, | ||
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester, | ||
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, | ||
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4, | ||
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, | ||
MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4, | ||
} | ||
}; | ||
|
||
|
@@ -53,6 +51,7 @@ use common_types::{ | |
verification::Unverified, | ||
snapshot::{ManifestData, RestorationStatus}, | ||
}; | ||
use transaction_pool::VerifiedTransaction; | ||
|
||
|
||
/// The Chain Sync Handler: handles responses from peers | ||
|
@@ -70,6 +69,8 @@ impl SyncHandler { | |
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), | ||
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), | ||
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), | ||
NewPooledTransactionHashesPacket => SyncHandler::on_peer_new_pooled_transactions(sync, io, peer, &rlp), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In theory The reason for that is that we try to avoid locking I think responding to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If my understanding is correct, |
||
PooledTransactionsPacket => SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp), | ||
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), | ||
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), | ||
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp), | ||
|
@@ -595,9 +596,11 @@ impl SyncHandler { | |
difficulty, | ||
latest_hash, | ||
genesis, | ||
unsent_pooled_hashes: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(io.chain().transactions_to_propagate().into_iter().map(|tx| *tx.hash()).collect()) } else { None }, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using We support a more efficient way to query all hashes from the transaction pool (see There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there is also some recommended limit in the spec, isn't there? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should I somehow expose |
||
asking: PeerAsking::Nothing, | ||
asking_blocks: Vec::new(), | ||
asking_hash: None, | ||
asking_pooled_transactions: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(Vec::new()) } else { None }, | ||
asking_private_state: None, | ||
ask_time: Instant::now(), | ||
last_sent_transactions: Default::default(), | ||
|
@@ -656,7 +659,7 @@ impl SyncHandler { | |
|
||
if false | ||
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0)) | ||
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0)) | ||
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0)) | ||
{ | ||
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); | ||
return Err(DownloaderImportError::Invalid); | ||
|
@@ -703,6 +706,61 @@ impl SyncHandler { | |
Ok(()) | ||
} | ||
|
||
/// Called when peer sends us a set of new pooled transactions | ||
pub fn on_peer_new_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> { | ||
vorot93 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for item in tx_rlp { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you considered parsing the hashes first and then passing a batch request to the transaction pool? I think lock-wise it might be a better way to actually retrieve data for 4k transactions. Especially given that the only thing you care is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should I move it into |
||
let hash = item.as_val::<H256>().map_err(|_| DownloaderImportError::Invalid)?; | ||
|
||
if io.chain().queued_transaction(hash).is_none() { | ||
let unfetched = sync.unfetched_pooled_transactions.entry(hash).or_insert_with(|| super::UnfetchedTransaction { | ||
announcer: peer_id, | ||
next_fetch: Instant::now(), | ||
tries: 0, | ||
}); | ||
|
||
// Only reset the budget if we hear from multiple sources | ||
if unfetched.announcer != peer_id { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we keep a list of announcers then? In case you:
you will keep resetting the budget, which seems weird. Why is the first peer to announce it unpriviledged? I think the idea here is that everytime we have a peer that is looking for a particular hash (and we don't have it either) we should be updating our budget (and indirectly prioritise looking for that hash), no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there is a potential attack vector in that peer A may erroneously announce (or even keep announcing) a non-existent hash and thus clog our request queue. The mitigation here is the assumption that the hash is guaranteed to exist in the network if >1 peer can corroborate its existence. |
||
unfetched.next_fetch = Instant::now(); | ||
unfetched.tries = 0; | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Called when peer sends us a list of pooled transactions | ||
pub fn on_peer_pooled_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> { | ||
let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) { | ||
Some(peer) => peer, | ||
None => { | ||
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id); | ||
return Ok(()); | ||
} | ||
}; | ||
|
||
// TODO: actually check against asked hashes | ||
let item_count = tx_rlp.item_count()?; | ||
if let Some(p) = &peer.asking_pooled_transactions { | ||
if item_count > p.len() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per todo, this should be have an additional intersection check. |
||
trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id); | ||
return Err(DownloaderImportError::Invalid); | ||
} | ||
} else { | ||
trace!(target: "sync", "{} Peer sent us pooled transactions but does not declare support for them", peer_id); | ||
return Err(DownloaderImportError::Invalid); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will that cause disconnection? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes |
||
} | ||
trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count); | ||
let mut transactions = Vec::with_capacity(item_count); | ||
for i in 0 .. item_count { | ||
let rlp = tx_rlp.at(i)?; | ||
let tx = rlp.as_raw().to_vec(); | ||
transactions.push(tx); | ||
} | ||
io.chain().queue_transactions(transactions, peer_id); | ||
Ok(()) | ||
} | ||
|
||
/// Called when peer sends us signed private transaction packet | ||
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { | ||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -166,6 +166,8 @@ impl From<DecoderError> for PacketProcessError { | |||||
} | ||||||
} | ||||||
|
||||||
/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). | ||||||
pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11); | ||||||
/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). | ||||||
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11); | ||||||
/// Version 63 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). | ||||||
|
@@ -217,6 +219,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(10); | |||||
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15); | ||||||
const BODIES_TIMEOUT: Duration = Duration::from_secs(20); | ||||||
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); | ||||||
const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10); | ||||||
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); | ||||||
/// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked. | ||||||
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); | ||||||
|
@@ -318,6 +321,7 @@ pub enum PeerAsking { | |||||
BlockHeaders, | ||||||
BlockBodies, | ||||||
BlockReceipts, | ||||||
PooledTransactions, | ||||||
SnapshotManifest, | ||||||
SnapshotData, | ||||||
PrivateState, | ||||||
|
@@ -352,6 +356,8 @@ pub struct PeerInfo { | |||||
network_id: u64, | ||||||
/// Peer best block hash | ||||||
latest_hash: H256, | ||||||
/// Unpropagated tx pool hashes | ||||||
unsent_pooled_hashes: Option<H256FastSet>, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the point of |
||||||
/// Peer total difficulty if known | ||||||
difficulty: Option<U256>, | ||||||
/// Type of data currently being requested by us from a peer. | ||||||
|
@@ -360,6 +366,8 @@ pub struct PeerInfo { | |||||
asking_blocks: Vec<H256>, | ||||||
/// Holds requested header hash if currently requesting block header by hash | ||||||
asking_hash: Option<H256>, | ||||||
/// Holds requested transaction IDs | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? |
||||||
asking_pooled_transactions: Option<Vec<H256>>, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
/// Holds requested private state hash | ||||||
asking_private_state: Option<H256>, | ||||||
/// Holds requested snapshot chunk hash if any. | ||||||
|
@@ -669,6 +677,13 @@ enum PeerState { | |||||
SameBlock | ||||||
} | ||||||
|
||||||
#[derive(Clone, MallocSizeOf)] | ||||||
struct UnfetchedTransaction { | ||||||
announcer: PeerId, | ||||||
next_fetch: Instant, | ||||||
tries: usize, | ||||||
} | ||||||
|
||||||
/// Blockchain sync handler. | ||||||
/// See module documentation for more details. | ||||||
#[derive(MallocSizeOf)] | ||||||
|
@@ -708,6 +723,8 @@ pub struct ChainSync { | |||||
sync_start_time: Option<Instant>, | ||||||
/// Transactions propagation statistics | ||||||
transactions_stats: TransactionsStats, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need changes here too to account for |
||||||
/// Transactions whose hash has been announced, but that we have not fetched | ||||||
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>, | ||||||
/// Enable ancient block downloading | ||||||
download_old_blocks: bool, | ||||||
/// Shared private tx service. | ||||||
|
@@ -751,6 +768,7 @@ impl ChainSync { | |||||
snapshot: Snapshot::new(), | ||||||
sync_start_time: None, | ||||||
transactions_stats: TransactionsStats::default(), | ||||||
unfetched_pooled_transactions: Default::default(), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
(to me it's more readable to see the actual type, but maybe it's just me) |
||||||
private_tx_handler, | ||||||
warp_sync: config.warp_sync, | ||||||
status_sinks: Vec::new() | ||||||
|
@@ -764,7 +782,7 @@ impl ChainSync { | |||||
let last_imported_number = self.new_blocks.last_imported_block_number(); | ||||||
SyncStatus { | ||||||
state: self.state.clone(), | ||||||
protocol_version: ETH_PROTOCOL_VERSION_64.0, | ||||||
protocol_version: ETH_PROTOCOL_VERSION_65.0, | ||||||
network_id: self.network_id, | ||||||
start_block_number: self.starting_block, | ||||||
last_imported_block_number: Some(last_imported_number), | ||||||
|
@@ -798,8 +816,17 @@ impl ChainSync { | |||||
|
||||||
/// Updates the set of transactions recently sent to this peer to avoid spamming. | ||||||
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) { | ||||||
if let Some(peer_info) = self.peers.get_mut(&peer_id) { | ||||||
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash())); | ||||||
for (id, peer) in &mut self.peers { | ||||||
let hashes = txs.iter().map(|tx| tx.hash()); | ||||||
if *id == peer_id { | ||||||
peer.last_sent_transactions.extend(hashes); | ||||||
} else if let Some(s) = &mut peer.unsent_pooled_hashes { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it be better to keep |
||||||
s.extend(hashes); | ||||||
} | ||||||
} | ||||||
|
||||||
for tx in txs { | ||||||
self.unfetched_pooled_transactions.remove(&tx.hash()); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -1149,6 +1176,48 @@ impl ChainSync { | |||||
} | ||||||
} | ||||||
|
||||||
// get the peer to give us at least some of announced but unfetched transactions | ||||||
if !self.unfetched_pooled_transactions.is_empty() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO the whole code could be rewritten more clearly as follows. I did change the code significantly (even the logic) cause the previous one did not make any sense to me. It could easily get stuck in an endless loop forever. // Consider asking the peer for transactions we need.
if self.unfetched_pooled_transactions.is_empty() {
return;
}
let asking = self.peers.get_mut(&peer_id).as_mut().map(|peer| &mut peer.asking_pooled_transactions);
let mut asking = match asking {
Some(ref mut asking) => asking,
// no peer or peer does not support eth65
None => return;
};
const MAX_ASKING: usize = 256;
// we already have too many requests scheduled, don't do anything.
if asking.len() > MAX_ASKING {
return;
}
let now = Instant::now();
// we assume that only few transactions will actually be added to `asking`, so we only collect
// hashes to remove.
let mut remove_from_unfetched = vec![];
for (ref hash, ref mt item) in self.unfetched_pooled_transactions.iter_mut() {
// this function moves the internal state of the item (checking against `now`, increasing counters, etc).
match item.poll(&now) {
NotReadyToRefetch => {},
ReadyToRefetch => {
asking.insert(*hash);
if asking.len() > MAX_ASKING {
break;
}
},
// I've made up this case, but I really think we should actually remove items if there is no way
// we can retrieve them. The previous code was pretty much never removing anything from `unfetched`
// (except for the case when we actually retrieve the hash)
RefetchTimeout => {
remove_from_unfetched.push(*hash);
}
}
for hash in remove_from_unfeched {
self.unfetched_pooled_transactions.remove(&hash);
}
// avoid copying, we can just pass an iterator instead of a slice.
SyncRequester::request_pooled_transactions(self, io, peer_id, asking.iter()); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have rewritten this part with additional comments, hopefully it's clear now. I could adopt your version if you still think it's bad though. |
||||||
if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions { | ||||||
let now = Instant::now(); | ||||||
|
||||||
let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>(); | ||||||
let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone(); | ||||||
for (hash, mut item) in self.unfetched_pooled_transactions.drain() { | ||||||
if new_asking_pooled_transactions.len() >= 256 { | ||||||
// can't request any more transactions | ||||||
break; | ||||||
} | ||||||
|
||||||
// if enough time has passed since last attempt... | ||||||
if item.next_fetch < now { | ||||||
// ...queue this hash for requesting | ||||||
new_asking_pooled_transactions.insert(hash); | ||||||
item.tries += 1; | ||||||
|
||||||
// if we just started asking for it, queue it to be asked later on again | ||||||
if item.tries < 5 { | ||||||
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2); | ||||||
remaining_unfetched_pooled_transactions.insert(hash, item); | ||||||
} else { | ||||||
// ...otherwise we assume this transaction does not exist and remove its hash from request queue | ||||||
remaining_unfetched_pooled_transactions.remove(&hash); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>(); | ||||||
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions); | ||||||
|
||||||
self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions); | ||||||
self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions; | ||||||
|
||||||
return; | ||||||
} else { | ||||||
trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id); | ||||||
} | ||||||
} | ||||||
|
||||||
// Only ask for old blocks if the peer has an equal or higher difficulty | ||||||
let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty); | ||||||
|
||||||
|
@@ -1340,6 +1409,7 @@ impl ChainSync { | |||||
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT, | ||||||
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT, | ||||||
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT, | ||||||
PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT, | ||||||
PeerAsking::Nothing => false, | ||||||
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, | ||||||
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, | ||||||
|
@@ -1668,10 +1738,12 @@ pub mod tests { | |||||
genesis: H256::zero(), | ||||||
network_id: 0, | ||||||
latest_hash: peer_latest_hash, | ||||||
unsent_pooled_hashes: Some(Default::default()), | ||||||
difficulty: None, | ||||||
asking: PeerAsking::Nothing, | ||||||
asking_blocks: Vec::new(), | ||||||
asking_hash: None, | ||||||
asking_pooled_transactions: Some(Vec::new()), | ||||||
asking_private_state: None, | ||||||
ask_time: Instant::now(), | ||||||
last_sent_transactions: Default::default(), | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
importer
separation was initially there to try to separate import pipeline from the client (which would become responsible only for queries). However I'm not sure how much it's still relevant, so I don't believe any change is required here, just giving some context.