Skip to content

Commit 49ed01d

Browse files
eareimuhuster-zhangpeng
authored andcommitted
fix(quic): adapt QuicServer::accpet more usages
1 parent e60764c commit 49ed01d

File tree

3 files changed

+154
-31
lines changed

3 files changed

+154
-31
lines changed

gm-quic/src/server.rs

+21-29
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,10 @@ use std::{
77
use dashmap::DashMap;
88
use qbase::{
99
cid::ConnectionId,
10-
packet::{
11-
header::{GetDcid, GetScid},
12-
long, DataHeader, DataPacket, InitialHeader, RetryHeader,
13-
},
10+
packet::{header::GetScid, long, DataHeader, DataPacket, InitialHeader, RetryHeader},
1411
param::{Parameters, ServerParameters},
1512
sid::{handy::ConsistentConcurrency, ControlConcurrency},
1613
token::{ArcTokenRegistry, TokenProvider},
17-
util::ArcAsyncDeque,
1814
};
1915
use qconnection::{conn::ArcConnection, path::Pathway, router::Router, usc::ArcUsc};
2016
use rustls::{
@@ -26,7 +22,7 @@ use rustls::{
2622
use crate::{get_or_create_usc, util, ConnKey, QuicConnection, CONNECTIONS};
2723

2824
type TlsServerConfigBuilder<T> = ConfigBuilder<TlsServerConfig, T>;
29-
type QuicListner = ArcAsyncDeque<(Arc<QuicConnection>, Pathway)>;
25+
type QuicListner = Arc<util::Channel<(Arc<QuicConnection>, Pathway)>>;
3026

3127
/// 理应全局只有一个server
3228
static SERVER: LazyLock<RwLock<Weak<QuicServer>>> = LazyLock::new(RwLock::default);
@@ -132,20 +128,18 @@ impl QuicServer {
132128
///
133129
/// If all listening udp sockets are closed, this method will return an error.
134130
pub async fn accept(&self) -> io::Result<(Arc<QuicConnection>, Pathway)> {
135-
// TODO: 错误
136-
let error = || {
137-
// let msg = "All listening udp sockets are closed";
138-
// io::Error::new(io::ErrorKind::AddrNotAvailable, msg)
139-
unreachable!()
131+
let no_address_listening = || {
132+
let error = "all listening udp sockets are closed";
133+
io::Error::new(io::ErrorKind::AddrNotAvailable, error)
140134
};
141-
// 这个VecDeque永远不会被close,所以这里不会是None
142-
self.listener.pop().await.ok_or_else(error)
135+
self.listener.recv().await.ok_or_else(no_address_listening)
143136
}
144137
}
145138

146139
// internal methods
147140
impl QuicServer {
148141
pub(crate) fn try_to_accept_conn_from(mut packet: DataPacket, pathway: Pathway, usc: &ArcUsc) {
142+
log::info!("try to accept connection from {}", pathway.dst_addr());
149143
let Some(server) = SERVER.read().unwrap().upgrade() else {
150144
return;
151145
};
@@ -161,13 +155,11 @@ impl QuicServer {
161155
.unwrap();
162156
let (initial_dcid, client_initial_dcid) = match &mut packet.header {
163157
DataHeader::Long(long::DataHeader::Initial(hdr)) => {
164-
let client_dcid = *hdr.get_dcid();
165-
hdr.dcid = initial_scid;
158+
let client_dcid = core::mem::replace(&mut hdr.dcid, initial_scid);
166159
(*hdr.get_scid(), client_dcid)
167160
}
168161
DataHeader::Long(long::DataHeader::ZeroRtt(hdr)) => {
169-
let client_dcid = *hdr.get_dcid();
170-
hdr.dcid = initial_scid;
162+
let client_dcid = core::mem::replace(&mut hdr.dcid, initial_scid);
171163
(*hdr.get_scid(), client_dcid)
172164
}
173165
_ => return,
@@ -195,25 +187,25 @@ impl QuicServer {
195187
token_registry,
196188
);
197189
inner.add_initial_path(pathway, usc.clone());
198-
let conn = QuicConnection {
190+
let conn = Arc::new(QuicConnection {
199191
key: ConnKey::Server(initial_scid),
200192
inner: inner.clone(), // emm...
201-
};
202-
log::info!("incoming connection established");
203-
server.listener.push_back((Arc::new(conn), pathway.filp()));
204-
CONNECTIONS.insert(ConnKey::Server(initial_scid), inner);
205-
_ = Router::try_to_route_packet_from(packet, pathway, usc);
193+
});
194+
195+
if server.listener.send((conn, pathway.filp())).is_ok() {
196+
CONNECTIONS.insert(ConnKey::Server(initial_scid), inner);
197+
_ = Router::try_to_route_packet_from(packet, pathway, usc);
198+
}
206199
}
207200

208201
pub(crate) fn on_socket_close(addr: SocketAddr) {
209202
if let Some(server) = SERVER.read().unwrap().upgrade() {
210203
let bind_address_removed = server.sockets.remove(&addr).is_some();
211-
// 所有已经绑定的地址都被关闭了,并且被动监听没有开启,那么就关闭server的监听...
212-
// THINK: 不可能再接收新连接的情况下,Listener应该被立刻关闭?否
213-
// if bind_address_removed && !server.passive_listening && server.sockets.is_empty() {
214-
// server.listener.close();
215-
// }
216-
_ = bind_address_removed;
204+
// when: add listening sockets are removed, and passive listening is not enabled, it's not possiable
205+
// to accept new connections anymore, so close the server's listener...
206+
if bind_address_removed && !server.passive_listening && server.sockets.is_empty() {
207+
server.listener.close();
208+
}
217209
}
218210
}
219211

gm-quic/src/util.rs

+132-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use std::{io, path::Path};
1+
use std::{collections::VecDeque, io, path::Path, sync::Mutex};
22

33
use rustls::pki_types::{pem::PemObject, CertificateDer, PrivateKeyDer};
4+
use tokio::sync::Notify;
45

56
pub fn parse_cert_files(
67
cert_chain_file: impl AsRef<Path>,
@@ -22,3 +23,133 @@ pub fn parse_cert_files(
2223
.map_err(|e| cast_pem_error(e, key_file.as_ref()))?;
2324
Ok((cert_chain, key_der))
2425
}
26+
27+
/// Unbound multi-producer multi-consumer (mpmc) channel.
28+
#[derive(Debug)]
29+
pub struct Channel<T> {
30+
deque: Mutex<Option<VecDeque<T>>>,
31+
notify: Notify,
32+
}
33+
34+
impl<T> Channel<T> {
35+
/// Create a new empty channel.
36+
pub fn new() -> Self {
37+
Self {
38+
deque: Some(VecDeque::new()).into(),
39+
notify: Default::default(),
40+
}
41+
}
42+
43+
/// Send an item to the channel.
44+
///
45+
/// If the channel is closed, the item is returned as [`Err`].
46+
pub fn send(&self, item: T) -> Result<(), T> {
47+
log::info!("send {} to channel", core::any::type_name_of_val(&item));
48+
let mut deque_guard = self.deque.lock().unwrap();
49+
50+
match deque_guard.as_mut() {
51+
None => Err(item),
52+
Some(deque) => {
53+
deque.push_back(item);
54+
drop(deque_guard);
55+
self.notify.notify_one();
56+
Ok(())
57+
}
58+
}
59+
}
60+
61+
/// Close the channel.
62+
///
63+
/// All unrecieved items will be dropped, and all pending [`Channel::recv`] calls will return [`None`].
64+
pub fn close(&self) {
65+
drop(self.deque.lock().unwrap().take());
66+
self.notify.notify_waiters();
67+
}
68+
69+
/// Try to recieve an item from the channel.
70+
///
71+
/// If the channel is empty, [`TryRecvError::Empty`] is returned.
72+
/// If the channel is closed, [`TryRecvError::Closed`] is returned.
73+
pub fn try_recv(&self) -> Result<T, TryRecvError> {
74+
let mut deque_guard = self.deque.lock().unwrap();
75+
let deque = deque_guard.as_mut().ok_or(TryRecvError::Closed)?;
76+
deque.pop_front().ok_or(TryRecvError::Empty)
77+
}
78+
79+
/// Recieve an item from the channel.
80+
///
81+
/// If the channel is empty, the call will wait until an item is sent.
82+
///
83+
/// If the channel is closed, [`None`] is returned.
84+
pub async fn recv(&self) -> Option<T> {
85+
let fut = self.notify.notified();
86+
tokio::pin!(fut);
87+
88+
loop {
89+
// because the Notify::notify_one will only store one permit,
90+
// we need to enable the notified to avoid missing the notification
91+
fut.as_mut().enable();
92+
93+
match self.try_recv() {
94+
Ok(item) => return Some(item),
95+
Err(TryRecvError::Empty) => {
96+
fut.as_mut().await;
97+
fut.set(self.notify.notified());
98+
}
99+
Err(TryRecvError::Closed) => return None,
100+
}
101+
}
102+
}
103+
}
104+
105+
/// Error type for [`Channel::try_recv`].
106+
#[derive(thiserror::Error, Debug, Clone, Copy)]
107+
pub enum TryRecvError {
108+
#[error("channel is empty")]
109+
Empty,
110+
#[error("channel is closed")]
111+
Closed,
112+
}
113+
114+
impl<T> Default for Channel<T> {
115+
fn default() -> Self {
116+
Self::new()
117+
}
118+
}
119+
120+
#[cfg(test)]
121+
mod tests {
122+
use std::sync::Arc;
123+
124+
use super::*;
125+
126+
#[tokio::test]
127+
async fn chennel() {
128+
let ch = Arc::new(Channel::new());
129+
let ch2 = ch.clone();
130+
assert_eq!(ch.send(0), Ok(()));
131+
132+
let t1 = tokio::spawn(async move {
133+
assert_eq!(ch.recv().await, Some(0));
134+
assert_eq!(ch.recv().await, Some(1));
135+
assert_eq!(ch.recv().await, Some(2));
136+
assert_eq!(ch.recv().await, Some(3));
137+
assert_eq!(ch.recv().await, None);
138+
});
139+
140+
let t2 = tokio::spawn(async move {
141+
assert_eq!(ch2.send(1), Ok(()));
142+
tokio::task::yield_now().await;
143+
assert_eq!(ch2.send(2), Ok(()));
144+
tokio::task::yield_now().await;
145+
assert_eq!(ch2.send(3), Ok(()));
146+
tokio::task::yield_now().await;
147+
ch2.close();
148+
assert_eq!(ch2.send(4), Err(4));
149+
assert_eq!(ch2.send(5), Err(5));
150+
});
151+
152+
t1.await.unwrap();
153+
t2.await.unwrap();
154+
}
155+
}

qconnection/src/usc.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl UscRegistry {
9191
/// This struct also provide useful methods to send datagrams via a given [`Pathway`].
9292
///
9393
/// [`UdpSocketController`]: qudp::UdpSocketController
94-
#[derive(Clone, Deref)]
94+
#[derive(Debug, Clone, Deref)]
9595
pub struct ArcUsc {
9696
#[deref]
9797
usc: Arc<qudp::UdpSocketController>,

0 commit comments

Comments
 (0)