Skip to content

Commit

Permalink
feat: improve the transaction for data space
Browse files Browse the repository at this point in the history
  • Loading branch information
huster-zhangpeng committed Nov 19, 2024
1 parent bcd396a commit 5e4a74f
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 109 deletions.
45 changes: 45 additions & 0 deletions qbase/src/cid/local_cid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ where
}
}

fn initial_scid(&self) -> Option<ConnectionId> {
self.cid_deque.get(0)?.map(|(cid, _)| cid)
}

/// Set the maximum number of active connection IDs.
///
/// The value of the active_connection_id_limit parameter MUST be at least 2.
Expand Down Expand Up @@ -159,6 +163,47 @@ where
Self(Arc::new(Mutex::new(raw_local_cids)))
}

/// Get the initial source connection ID.
///
/// 0-RTT packets in the first flight use the same Destination Connection ID
/// and Source Connection ID values as the client's first Initial packet.
/// see [Section 7.2.6](https://datatracker.ietf.org/doc/html/rfc9000#section-7.2-6)
/// of [RFC9000](https://datatracker.ietf.org/doc/html/rfc9000).
///
/// Once a client has received a valid Initial packet from the server,
/// it MUST discard any subsequent packet it receives on that connection
/// with a different Source Connection ID,
/// see [Section 7.2.7](https://datatracker.ietf.org/doc/html/rfc9000#section-7.2-7)
/// of [RFC9000](https://datatracker.ietf.org/doc/html/rfc9000).
///
/// Any further changes to the Destination Connection ID are only permitted
/// if the values are taken from NEW_CONNECTION_ID frames;
/// if subsequent Initial packets include a different Source Connection ID,
/// they MUST be discarded,
/// see [Section 7.2.8](https://datatracker.ietf.org/doc/html/rfc9000#section-7.2-8)
/// of [RFC9000](https://datatracker.ietf.org/doc/html/rfc9000) for more details.
///
/// It means that the initial source connection ID is the only one that can be used
/// to send the Initial, 0Rtt and handshake packets.
/// Changing the scid is like issuing a new connection ID to the other party,
/// without specifying a sequence number or Stateless Reset Token.
/// Changing the scid during the Handshake phase is meaningless and harmful.
///
/// For the server, even though the server provides the preferred address
/// as the first connection ID, and even though the server can use this
/// connection ID as the scid in the Handshake packet, it is not necessary.
/// The client does not eliminate the zero connection ID.
/// When the client actually eliminates the zero connection ID,
/// it means that 1RTT packets have already started to be transmitted,
/// and all subsequent transmissions should be through 1RTT packets.
///
/// Return None if the initial source connection ID has been retired,
/// which indicates that the connection has been established,
/// and only the short header packet should be used.
pub fn initial_scid(&self) -> Option<ConnectionId> {
self.0.lock().unwrap().initial_scid()
}

/// Get all active connection IDs.
///
/// This method will be useful when finally releasing connection resources,
Expand Down
2 changes: 1 addition & 1 deletion qbase/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ where
TX: SendFrame<DataBlockedFrame>,
{
/// Updates the amount of new data sent.
pub fn post_sent(mut self, amount: usize) {
pub fn post_sent(&mut self, amount: usize) {
match self.0.deref_mut() {
Ok(inner) => {
debug_assert!(inner.sent_data + amount as u64 <= inner.max_data);
Expand Down
4 changes: 2 additions & 2 deletions qbase/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub enum DataHeader {
/// |X|1|X X 0 0|0 0| ...hdr | len(0..16) | pn(8..32) | body... | tag |
/// +---+-------+-+-+--------+------------+-----+-----+---......--+-------+
/// | |
/// +---> packet number length +---> actual encoded packet number
/// +---> encoded pn length +---> encoded packet number
/// ```
#[derive(Debug, Clone, Deref, DerefMut)]
pub struct DataPacket {
Expand Down Expand Up @@ -233,7 +233,7 @@ impl<'b> PacketWriter<'b> {
}

pub fn is_empty(&self) -> bool {
self.cursor == self.hdr_len + self.len_encoding
self.cursor == self.hdr_len + self.len_encoding + self.pn.1.size()
}

pub fn encrypt_long_packet(
Expand Down
11 changes: 3 additions & 8 deletions qbase/src/sid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,17 +283,12 @@ where
max_bi_streams: u64,
max_uni_streams: u64,
sid_frames_tx: T,
strategy: Box<dyn ControlConcurrency>,
ctrl: Box<dyn ControlConcurrency>,
) -> Self {
// 缺省为0
let local = ArcLocalStreamIds::new(role, 0, 0, sid_frames_tx.clone());
let remote = ArcRemoteStreamIds::new(
!role,
max_bi_streams,
max_uni_streams,
sid_frames_tx,
strategy,
);
let remote =
ArcRemoteStreamIds::new(!role, max_bi_streams, max_uni_streams, sid_frames_tx, ctrl);
Self { local, remote }
}
}
24 changes: 12 additions & 12 deletions qbase/src/sid/remote_sid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ impl Iterator for NeedCreate {
/// Remote stream IDs management.
#[derive(Debug)]
struct RemoteStreamIds<MAX> {
role: Role, // The role of the peer
max: [u64; 2], // The maximum stream ID that limit peer to create
unallocated: [StreamId; 2], // The stream ID that peer has not used
strategy: Box<dyn ControlConcurrency>, // The strategy to control the concurrency of streams
max_tx: MAX, // The channel to send the MAX_STREAMS frame to peer
role: Role, // The role of the peer
max: [u64; 2], // The maximum stream ID that limit peer to create
unallocated: [StreamId; 2], // The stream ID that peer has not used
ctrl: Box<dyn ControlConcurrency>, // The strategy to control the concurrency of streams
max_tx: MAX, // The channel to send the MAX_STREAMS frame to peer
}

impl<MAX> RemoteStreamIds<MAX>
Expand All @@ -71,7 +71,7 @@ where
max_bi: u64,
max_uni: u64,
max_tx: MAX,
strategy: Box<dyn ControlConcurrency>,
ctrl: Box<dyn ControlConcurrency>,
) -> Self {
Self {
role,
Expand All @@ -80,7 +80,7 @@ where
StreamId::new(role, Dir::Bi, 0),
StreamId::new(role, Dir::Uni, 0),
],
strategy,
ctrl,
max_tx,
}
}
Expand All @@ -103,7 +103,7 @@ where
let start = *cur;
*cur = unsafe { sid.next_unchecked() };
log::debug!("unallocated: {:?}", self.unallocated[idx]);
if let Some(max_streams) = self.strategy.on_accept_streams(sid.dir(), sid.id()) {
if let Some(max_streams) = self.ctrl.on_accept_streams(sid.dir(), sid.id()) {
self.max[idx] = max_streams;
self.max_tx.send_frame([MaxStreamsFrame::with(
sid.dir(),
Expand All @@ -120,7 +120,7 @@ where
return;
}

if let Some(max_streams) = self.strategy.on_end_of_stream(sid.dir(), sid.id()) {
if let Some(max_streams) = self.ctrl.on_end_of_stream(sid.dir(), sid.id()) {
self.max[sid.dir() as usize] = max_streams;
self.max_tx.send_frame([MaxStreamsFrame::with(
sid.dir(),
Expand All @@ -134,7 +134,7 @@ where
StreamsBlockedFrame::Bi(max) => (Dir::Bi, (*max).into_inner()),
StreamsBlockedFrame::Uni(max) => (Dir::Uni, (*max).into_inner()),
};
if let Some(max_streams) = self.strategy.on_streams_blocked(dir, max_streams) {
if let Some(max_streams) = self.ctrl.on_streams_blocked(dir, max_streams) {
self.max[dir as usize] = max_streams;
self.max_tx.send_frame([MaxStreamsFrame::with(
dir,
Expand Down Expand Up @@ -181,10 +181,10 @@ where
max_bi: u64,
max_uni: u64,
max_tx: MAX,
strategy: Box<dyn ControlConcurrency>,
ctrl: Box<dyn ControlConcurrency>,
) -> Self {
Self(Arc::new(Mutex::new(RemoteStreamIds::new(
role, max_bi, max_uni, max_tx, strategy,
role, max_bi, max_uni, max_tx, ctrl,
))))
}

Expand Down
12 changes: 6 additions & 6 deletions qconnection/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl ArcConnection {

(
connection.params.remote.clone(),
connection.streams.clone(),
connection.data.streams.clone(),
connection.error.clone(),
)
};
Expand All @@ -278,7 +278,7 @@ impl ArcConnection {

(
connection.params.remote.clone(),
connection.streams.clone(),
connection.data.streams.clone(),
connection.error.clone(),
)
};
Expand All @@ -305,7 +305,7 @@ impl ArcConnection {

(
connection.params.remote.clone(),
connection.streams.clone(),
connection.data.streams.clone(),
connection.error.clone(),
)
};
Expand All @@ -330,7 +330,7 @@ impl ArcConnection {
Invalid => unreachable!(),
};

(connection.streams.clone(), connection.error.clone())
(connection.data.streams.clone(), connection.error.clone())
};

let result = data_streams
Expand All @@ -344,7 +344,7 @@ impl ArcConnection {
let guard = self.0.lock().unwrap();

match guard.deref() {
Normal(raw) => raw.datagrams.reader(),
Normal(raw) => raw.data.datagrams.reader(),
Closing(closing) => Err(closing.error().clone())?,
Draining(draining) => Err(draining.error().clone())?,
Closed(error) => Err(error.clone())?,
Expand All @@ -365,7 +365,7 @@ impl ArcConnection {

(
connection.params.remote.clone(),
connection.datagrams.clone(),
connection.data.datagrams.clone(),
)
};

Expand Down
23 changes: 6 additions & 17 deletions qconnection/src/conn/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use qbase::{
Epoch,
};
use qcongestion::{ArcCC, CongestionAlgorithm, CongestionControl};
use qrecovery::reliable::ArcReliableFrameDeque;
use qunreliable::DatagramFlow;
use rustls::quic::Keys;
use tokio::{sync::Notify, task::JoinHandle};

Expand All @@ -28,7 +26,7 @@ use super::{
handshake::{HandshakeSpace, HandshakeTracker},
initial::{InitialSpace, InitialTracker},
},
ArcLocalCids, ArcRemoteCids, CidRegistry, DataStreams, FlowController, Handshake, RcvdPackets,
ArcLocalCids, ArcRemoteCids, CidRegistry, FlowController, Handshake, RcvdPackets,
};
use crate::{
error::ConnError,
Expand All @@ -47,9 +45,6 @@ pub struct Connection {
pub(super) flow_ctrl: FlowController,
pub(super) error: ConnError,

pub(super) streams: DataStreams,
pub(super) datagrams: DatagramFlow,

pub(super) initial: InitialSpace,
pub(super) hs: HandshakeSpace,
pub(super) data: DataSpace,
Expand Down Expand Up @@ -77,10 +72,12 @@ impl Connection {
let (hs_packets_entry, rcvd_hs_packets) = mpsc::unbounded();
let (one_rtt_packets_entry, rcvd_1rtt_packets) = mpsc::unbounded();

let reliable_frames = ArcReliableFrameDeque::with_capacity(0);
let initial = InitialSpace::new(ArcKeys::with_keys(initial_keys));
let hs = HandshakeSpace::default();
let data = DataSpace::default();
let data = DataSpace::new(role, &local_params, streams_ctrl);
let reliable_frames = &data.reliable_frames;
let streams = &data.streams;
let datagrams = &data.datagrams;

let router_registry = Router::registry(
initial_scid,
Expand All @@ -103,9 +100,6 @@ impl Connection {
let flow_ctrl = FlowController::new(65535, 65535, reliable_frames.clone());
let conn_error = ConnError::default();

let streams = DataStreams::new(role, &local_params, streams_ctrl, reliable_frames.clone());
let datagrams = DatagramFlow::new(0);

let token = match token_registry.deref() {
TokenRegistry::Client((server_name, client)) => {
Arc::new(Mutex::new(client.fetch_token(server_name)))
Expand Down Expand Up @@ -256,8 +250,6 @@ impl Connection {
let (join_0rtt, join_1rtt) = data.build(
&pathes,
&handshake,
&streams,
&datagrams,
&cid_registry,
&flow_ctrl,
&notify,
Expand All @@ -274,8 +266,6 @@ impl Connection {
paths: pathes,
cid_registry,
flow_ctrl,
streams,
datagrams,
initial,
hs,
data,
Expand All @@ -295,9 +285,8 @@ impl Connection {
}

pub fn abort_with_error(&self, error: &Error) {
self.datagrams.on_conn_error(error);
self.data.on_conn_error(error);
self.flow_ctrl.on_conn_error(error);
self.streams.on_conn_error(error);
self.params.on_conn_error(error);
self.tls_session.abort();
self.notify.notify_waiters();
Expand Down
Loading

0 comments on commit 5e4a74f

Please sign in to comment.