Skip to content

Commit

Permalink
Return closest nodes to topics
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Jun 16, 2022
1 parent b87ad5c commit 7676752
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 25 deletions.
5 changes: 1 addition & 4 deletions src/advertisement/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ async fn ticket_wait_time_duration() {
// Add an ad for the topic
ads.insert(enr, topic).unwrap();

assert_gt!(
ads.ticket_wait_time(topic),
Some(Duration::from_secs(2))
);
assert_gt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(2)));
assert_lt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(3)));
}

Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct Discv5Config {
pub topic_radius: u64,

pub topic_query_timeout: Duration,
pub topics_num_results: usize,
pub topic_query_peers: usize,

/// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with
/// timing support. By default, the executor that created the discv5 struct will be used.
Expand Down Expand Up @@ -146,7 +146,7 @@ impl Default for Discv5Config {
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
topic_radius: 256,
topic_query_timeout: Duration::from_secs(60),
topics_num_results: 16,
topic_query_peers: 10,
ip_mode: IpMode::default(),
executor: None,
}
Expand Down
16 changes: 8 additions & 8 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::{
};
use enr::{CombinedKey, NodeId};
use futures::prelude::*;
use more_asserts::debug_unreachable;
use parking_lot::RwLock;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -694,7 +693,10 @@ impl Handler {
topic,
enr: _,
ticket: _,
} => HandlerOut::EstablishedTopic(enr, connection_direction, topic),
}
| RequestBody::TopicQuery { topic } => {
HandlerOut::EstablishedTopic(enr, connection_direction, topic)
}
_ => HandlerOut::Established(enr, connection_direction),
};
self.service_send
Expand Down Expand Up @@ -1006,16 +1008,14 @@ impl Handler {
if let Some(remaining_responses) = request_call.remaining_responses.as_mut() {
*remaining_responses -= 1;
let reinsert = match request_call.request.body {
RequestBody::FindNode { .. } | RequestBody::TopicQuery { .. } => {
remaining_responses > &mut 0
}
// The request is reinserted for either another nodes response, a ticket or a
// register confirmation response that may come, otherwise the request times out.
RequestBody::RegisterTopic { .. } => remaining_responses >= &mut 0,
_ => {
debug_unreachable!("Only FINDNODE, TOPICQUERY and REGISTERTOPIC expect nodes response");
false
RequestBody::TopicQuery { .. } => {
// TODO(review): remove this request from whichever map tracks pending NODES and ADNODES responses
remaining_responses >= &mut 0
}
_ => remaining_responses > &mut 0,
};
if reinsert {
// more responses remaining, add back the request and send the response
Expand Down
101 changes: 91 additions & 10 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl Service {
ticket_pools: TicketPools::default(),
active_topic_queries: ActiveTopicQueries::new(
config.topic_query_timeout,
config.topics_num_results,
config.max_nodes_response,
),
exit,
config: config.clone(),
Expand Down Expand Up @@ -666,12 +666,6 @@ impl Service {
if callback.send(found_enrs).is_err() {
warn!("Callback dropped for query {}. Results dropped", *id);
}
} else {
let QueryType::FindNode(node_id) = result.target.query_type;
let topic = TopicHash::from_raw(node_id.raw());
if self.topics.contains_key(&topic){
// add to topic kbuckets?
}
}
}
}
Expand Down Expand Up @@ -978,6 +972,13 @@ impl Service {
if enr.node_id() == node_address.node_id
&& enr.udp4_socket().map(SocketAddr::V4) == Some(node_address.socket_addr)
{
self.send_topic_nodes_response(
topic,
node_address.clone(),
id.clone(),
"REGTOPIC".into(),
);

let wait_time = self
.ads
.ticket_wait_time(topic)
Expand Down Expand Up @@ -1060,6 +1061,12 @@ impl Service {
}
}
RequestBody::TopicQuery { topic } => {
self.send_topic_nodes_response(
topic,
node_address.clone(),
id.clone(),
"REGTOPIC".into(),
);
self.send_topic_query_response(node_address, id, topic);
}
}
Expand Down Expand Up @@ -1243,6 +1250,40 @@ impl Service {
});
}
}
RequestBody::RegisterTopic {
topic,
enr: _,
ticket: _,
} => {
if let Some(kbuckets) = self.topics_kbuckets.get_mut(&topic) {
for enr in nodes {
let peer_key: kbucket::Key<NodeId> = enr.node_id().into();
match kbuckets.insert_or_update(
&peer_key,
enr,
NodeStatus {
state: ConnectionState::Disconnected,
direction: ConnectionDirection::Incoming,
},
) {
InsertResult::Failed(FailureReason::BucketFull) => {
error!("Table full")
}
InsertResult::Failed(FailureReason::BucketFilter) => {
error!("Failed bucket filter")
}
InsertResult::Failed(FailureReason::TableFilter) => {
error!("Failed table filter")
}
InsertResult::Failed(FailureReason::InvalidSelfUpdate) => {
error!("Invalid self update")
}
InsertResult::Failed(_) => error!("Failed to insert ENR"),
_ => {}
}
}
}
}
RequestBody::FindNode { .. } => {
self.discovered(&node_id, nodes, active_request.query_id)
}
Expand Down Expand Up @@ -1599,6 +1640,46 @@ impl Service {
self.send_nodes_response(nodes_to_send, node_address, rpc_id, "TOPICQUERY");
}

/// Sends a NODES response containing the peers from the local routing table
/// that are closest to the given topic hash, so the requester can walk
/// towards the topic. Peers are drawn from the bucket at the topic's log2
/// distance first, then topped up from the two adjacent buckets, capped at
/// `config.max_nodes_response` in total.
///
/// * `topic` - hash identifying the topic; its raw bytes are reinterpreted
///   as a `NodeId` to compute a distance in the routing table.
/// * `node_address` - the peer the response is sent to.
/// * `id` - request id the response answers.
/// * `req_type` - label used only for logging in `send_nodes_response`
///   (e.g. "REGTOPIC" or "TOPICQUERY").
fn send_topic_nodes_response(
    &mut self,
    topic: TopicHash,
    node_address: NodeAddress,
    id: RequestId,
    req_type: String,
) {
    let local_key: kbucket::Key<NodeId> = self.local_enr.read().node_id().into();
    let topic_key: kbucket::Key<NodeId> = NodeId::new(&topic.as_bytes()).into();
    // None if the local node id equals the topic-derived id.
    let distance_to_topic = local_key.log2_distance(&topic_key);

    let mut closest_peers: Vec<Enr> = Vec::new();
    if let Some(distance) = distance_to_topic {
        self.kbuckets
            .write()
            .nodes_by_distances(&[distance], self.config.max_nodes_response)
            .iter()
            .for_each(|entry| closest_peers.push(entry.node.value.clone()));

        // Top up from the adjacent buckets if the closest bucket did not
        // hold enough peers. The length must be re-read here: snapshotting
        // it before the pushes above (as the original code did) made the
        // top-up always request `max_nodes_response` more peers and made
        // the break below unreachable.
        if closest_peers.len() < self.config.max_nodes_response {
            let remaining = self.config.max_nodes_response - closest_peers.len();
            // NOTE(review): assumes log2_distance returns >= 1 for distinct
            // keys so `distance - 1` cannot underflow — confirm against the
            // kbucket Key implementation.
            for entry in self
                .kbuckets
                .write()
                .nodes_by_distances(&[distance - 1, distance + 1], remaining)
                .iter()
            {
                if closest_peers.len() >= self.config.max_nodes_response {
                    break;
                }
                closest_peers.push(entry.node.value.clone());
            }
        }
    }
    self.send_nodes_response(closest_peers, node_address, id, &req_type);
}

/// Sends a NODES response, given a list of found ENR's. This function splits the nodes up
/// into multiple responses to ensure the response stays below the maximum packet size.
fn send_find_nodes_response(
Expand Down Expand Up @@ -1645,7 +1726,7 @@ impl Service {
nodes_to_send: Vec<Enr>,
node_address: NodeAddress,
rpc_id: RequestId,
query: &str,
req_type: &str,
) {
// if there are no nodes, send an empty response
if nodes_to_send.is_empty() {
Expand All @@ -1658,7 +1739,7 @@ impl Service {
};
trace!(
"Sending empty {} response to: {}",
query,
req_type,
node_address.node_id
);
if let Err(e) = self
Expand Down Expand Up @@ -1718,7 +1799,7 @@ impl Service {
for response in responses {
trace!(
"Sending {} response to: {}. Response: {} ",
query,
req_type,
node_address,
response
);
Expand Down
2 changes: 1 addition & 1 deletion src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn build_service(
ticket_pools: TicketPools::default(),
active_topic_queries: ActiveTopicQueries::new(
config.topic_query_timeout,
config.topics_num_results,
config.max_nodes_response,
),
exit,
config,
Expand Down

0 comments on commit 7676752

Please sign in to comment.