Skip to content

Commit ea4cc92

Browse files
committed
Merge branch 'master' into cpp-bindings
2 parents aa79a0f + 1d55d6c commit ea4cc92

File tree

8 files changed

+58
-48
lines changed

8 files changed

+58
-48
lines changed

lib/src/network/connection.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::{
22
peer_addr::PeerAddr,
33
peer_info::PeerInfo,
4-
peer_source::PeerSource,
4+
peer_source::{ConnectionDirection, PeerSource},
55
peer_state::PeerState,
66
runtime_id::PublicRuntimeId,
77
stats::{ByteCounters, StatsTracker},
@@ -10,7 +10,6 @@ use crate::{
1010
collections::{hash_map::Entry, HashMap},
1111
sync::{AwaitDrop, DropAwaitable, WatchSenderExt},
1212
};
13-
use serde::Serialize;
1413
use std::{
1514
fmt,
1615
sync::{
@@ -40,7 +39,7 @@ impl ConnectionSet {
4039
pub fn reserve(&self, addr: PeerAddr, source: PeerSource) -> ReserveResult {
4140
let key = ConnectionKey {
4241
addr,
43-
dir: ConnectionDirection::from_source(source),
42+
dir: source.direction(),
4443
};
4544

4645
self.connections
@@ -145,24 +144,6 @@ impl PeerInfoCollector {
145144
}
146145
}
147146

148-
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize)]
149-
pub(super) enum ConnectionDirection {
150-
Incoming,
151-
Outgoing,
152-
}
153-
154-
impl ConnectionDirection {
155-
pub fn from_source(source: PeerSource) -> Self {
156-
match source {
157-
PeerSource::Listener => Self::Incoming,
158-
PeerSource::UserProvided
159-
| PeerSource::LocalDiscovery
160-
| PeerSource::Dht
161-
| PeerSource::PeerExchange => Self::Outgoing,
162-
}
163-
}
164-
}
165-
166147
/// Connection permit that prevents another connection to the same peer (socket address) to be
167148
/// established as long as it remains in scope.
168149
pub(super) struct ConnectionPermit {

lib/src/network/connection_monitor.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use super::{
2-
connection::{ConnectionDirection, ConnectionId},
3-
peer_addr::PeerAddr,
4-
PeerSource, PublicRuntimeId,
5-
};
1+
use super::{connection::ConnectionId, peer_addr::PeerAddr, PeerSource, PublicRuntimeId};
62
use crate::crypto::sign::PublicKey;
73
use state_monitor::{MonitoredValue, StateMonitor};
84
use std::sync::atomic::{AtomicU64, Ordering};
@@ -26,10 +22,7 @@ impl ConnectionMonitor {
2622
runtime_id = field::Empty,
2723
);
2824

29-
let direction_glyph = match ConnectionDirection::from_source(source) {
30-
ConnectionDirection::Incoming => '↓',
31-
ConnectionDirection::Outgoing => '↑',
32-
};
25+
let direction_glyph = source.direction().glyph();
3326

3427
// We need to ID the StateMonitor node because it is created prior to `addr` being
3528
// deduplicated and so we'd get an ambiguous entry otherwise.

lib/src/network/message_broker.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,17 @@ use super::{
1010
server::Server,
1111
stats::ByteCounters,
1212
};
13-
use crate::{collections::HashMap, crypto::Hashable, protocol::RepositoryId, repository::Vault};
13+
use crate::{
14+
collections::HashMap,
15+
crypto::{sign::PublicKey, Hashable},
16+
protocol::RepositoryId,
17+
repository::Vault,
18+
};
1419
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
1520
use bytes::{BufMut, BytesMut};
1621
use futures_util::{SinkExt, StreamExt};
1722
use net::{bus::TopicId, unified::Connection};
18-
use state_monitor::StateMonitor;
23+
use state_monitor::{MonitoredValue, StateMonitor};
1924
use std::{collections::hash_map::Entry, sync::Arc, time::Instant};
2025
use tokio::{
2126
select,
@@ -36,6 +41,7 @@ pub(super) struct MessageBroker {
3641
links: HashMap<RepositoryId, oneshot::Sender<()>>,
3742
pex_peer: PexPeer,
3843
monitor: StateMonitor,
44+
_monitor_runtime_id: MonitoredValue<PublicKey>,
3945
span: SpanGuard,
4046
}
4147

@@ -51,6 +57,8 @@ impl MessageBroker {
5157
) -> Self {
5258
let span = SpanGuard::new(Span::current());
5359

60+
let monitor_runtime_id = monitor.make_value("runtime id", *that_runtime_id.as_public_key());
61+
5462
Self {
5563
this_runtime_id,
5664
that_runtime_id,
@@ -61,6 +69,7 @@ impl MessageBroker {
6169
links: HashMap::default(),
6270
pex_peer,
6371
monitor,
72+
_monitor_runtime_id: monitor_runtime_id,
6473
span,
6574
}
6675
}

lib/src/network/mod.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,29 +41,29 @@ pub use self::{
4141
runtime_id::{PublicRuntimeId, SecretRuntimeId},
4242
stats::Stats,
4343
};
44-
use choke::Choker;
45-
use constants::REQUEST_TIMEOUT;
46-
use event::ProtocolVersions;
4744
pub use net::stun::NatBehavior;
48-
use request_tracker::RequestTracker;
4945

5046
use self::{
47+
choke::Choker,
5148
connection::{ConnectionPermit, ConnectionSet, ReserveResult},
5249
connection_monitor::ConnectionMonitor,
50+
constants::REQUEST_TIMEOUT,
5351
dht_discovery::DhtDiscovery,
52+
event::ProtocolVersions,
5453
gateway::{Connectivity, Gateway, StackAddresses},
5554
local_discovery::LocalDiscovery,
5655
message_broker::MessageBroker,
5756
peer_addr::PeerPort,
5857
peer_exchange::{PexDiscovery, PexRepository},
58+
peer_source::ConnectionDirection,
5959
protocol::{Version, MAGIC, VERSION},
60+
request_tracker::RequestTracker,
6061
seen_peers::{SeenPeer, SeenPeers},
6162
stats::{ByteCounters, StatsTracker},
6263
stun::StunClients,
6364
};
6465
use crate::{
6566
collections::HashSet,
66-
network::connection::ConnectionDirection,
6767
protocol::RepositoryId,
6868
repository::{RepositoryHandle, Vault},
6969
};
@@ -831,7 +831,7 @@ impl Inner {
831831
&connection,
832832
VERSION,
833833
&self.this_runtime_id,
834-
ConnectionDirection::from_source(permit.source()),
834+
permit.source().direction(),
835835
)
836836
.await;
837837

@@ -883,8 +883,11 @@ impl Inner {
883883
that_runtime_id,
884884
connection,
885885
pex_peer,
886-
self.peers_monitor
887-
.make_child(format!("{:?}", that_runtime_id.as_public_key())),
886+
self.peers_monitor.make_child(format!(
887+
"{} {}",
888+
permit.source().direction().glyph(),
889+
permit.addr()
890+
)),
888891
self.stats_tracker.bytes.clone(),
889892
permit.byte_counters(),
890893
)

lib/src/network/peer_addr.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ impl FromStr for PeerAddr {
6969
let (proto, addr) = match s.split_once('/') {
7070
Some((proto, addr)) => (proto, addr),
7171
None => {
72-
return Err(format!(
73-
"Could not find '/' delimiter in the address {s:?}"
74-
));
72+
return Err(format!("Could not find '/' delimiter in the address {s:?}"));
7573
}
7674
};
7775

lib/src/network/peer_exchange.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
//! other in order to discover new peers.
33
44
use super::{
5-
connection::ConnectionDirection,
65
ip,
76
message::Message,
87
peer_addr::PeerAddr,
8+
peer_source::ConnectionDirection,
99
seen_peers::{SeenPeer, SeenPeers},
1010
PeerSource,
1111
};
@@ -159,9 +159,7 @@ impl PexPeer {
159159
/// Call this whenever new connection to this peer has been established. The `closed` should
160160
/// trigger when the connection gets closed.
161161
pub fn handle_connection(&self, addr: PeerAddr, source: PeerSource, closed: AwaitDrop) {
162-
if addr.is_tcp()
163-
&& ConnectionDirection::from_source(source) == ConnectionDirection::Incoming
164-
{
162+
if addr.is_tcp() && source.direction() == ConnectionDirection::Incoming {
165163
// Incomming TCP address can't be connected to (it has different port than the listener)
166164
// so there is no point exchanging it.
167165
return;

lib/src/network/peer_source.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ pub enum PeerSource {
3333
PeerExchange = 4,
3434
}
3535

36+
impl PeerSource {
37+
pub(super) fn direction(&self) -> ConnectionDirection {
38+
match self {
39+
Self::Listener => ConnectionDirection::Incoming,
40+
Self::UserProvided | Self::LocalDiscovery | Self::Dht | Self::PeerExchange => {
41+
ConnectionDirection::Outgoing
42+
}
43+
}
44+
}
45+
}
46+
3647
impl fmt::Display for PeerSource {
3748
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3849
match self {
@@ -44,3 +55,20 @@ impl fmt::Display for PeerSource {
4455
}
4556
}
4657
}
58+
59+
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize)]
60+
pub(super) enum ConnectionDirection {
61+
/// Peer connected to us
62+
Incoming,
63+
/// We connected to the peer
64+
Outgoing,
65+
}
66+
67+
impl ConnectionDirection {
68+
pub fn glyph(&self) -> char {
69+
match self {
70+
Self::Incoming => '↓',
71+
Self::Outgoing => '↑',
72+
}
73+
}
74+
}

service/src/logger/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ fn create_log_filter() -> EnvFilter {
4949
// time (via a command-line option or so)
5050
.parse_lossy(
5151
env::var(EnvFilter::DEFAULT_ENV)
52-
.unwrap_or_else(|_| "ouisync=debug,deadlock=warn".to_string()),
52+
.unwrap_or_else(|_| "ouisync=debug,state_monitor=warn,deadlock=warn".to_string()),
5353
)
5454
}
5555

0 commit comments

Comments
 (0)