Skip to content

Commit

Permalink
feat(server): collect and postpone incoming PDUs during reactivation
Browse files Browse the repository at this point in the history
The client may have pending messages while the activation-reactivation
sequence is ongoing. Let's collect them in this case and restore them
after successfull reconnection.

Signed-off-by: Marc-André Lureau <[email protected]>
  • Loading branch information
elmarco committed Aug 19, 2024
1 parent 9d4cde3 commit 8141cb5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
5 changes: 3 additions & 2 deletions crates/ironrdp-acceptor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[macro_use]
extern crate tracing;

use ironrdp_async::bytes::Bytes;
use ironrdp_async::{single_sequence_step, Framed, FramedRead, FramedWrite, StreamWrapper};
use ironrdp_connector::ConnectorResult;
use ironrdp_pdu::write_buf::WriteBuf;
Expand Down Expand Up @@ -48,6 +49,7 @@ where
pub async fn accept_finalize<S>(
mut framed: Framed<S>,
acceptor: &mut Acceptor,
mut unmatched: Option<&mut Vec<Bytes>>,
) -> ConnectorResult<(Framed<S>, AcceptorResult)>
where
S: FramedRead + FramedWrite,
Expand All @@ -58,7 +60,6 @@ where
if let Some(result) = acceptor.get_result() {
return Ok((framed, result));
}

single_sequence_step(&mut framed, acceptor, &mut buf, None).await?;
single_sequence_step(&mut framed, acceptor, &mut buf, unmatched.as_deref_mut()).await?;
}
}
22 changes: 18 additions & 4 deletions crates/ironrdp-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use ironrdp_pdu::{self, decode, encode_vec, mcs, nego, rdp, Action, PduResult};
use ironrdp_svc::{impl_as_any, server_encode_svc_messages, StaticChannelId, StaticChannelSet, SvcProcessor};
use ironrdp_tokio::{Framed, FramedRead, FramedWrite, TokioFramed};
use rdpsnd::server::{RdpsndServer, RdpsndServerMessage};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio::task;
Expand Down Expand Up @@ -797,21 +798,34 @@ impl RdpServer {
}
}

async fn accept_finalize<S>(&mut self, mut framed: Framed<S>, mut acceptor: Acceptor) -> Result<()>
async fn accept_finalize<S>(&mut self, mut framed: TokioFramed<S>, mut acceptor: Acceptor) -> Result<()>
where
S: FramedRead + FramedWrite,
S: AsyncRead + AsyncWrite + Sync + Send + Unpin,
{
let mut other_pdus = None;

loop {
let (new_framed, result) = ironrdp_acceptor::accept_finalize(framed, &mut acceptor)
let (new_framed, result) = ironrdp_acceptor::accept_finalize(framed, &mut acceptor, other_pdus.as_mut())
.await
.context("failed to accept client during finalize")?;
framed = new_framed;

let (stream, mut leftover) = new_framed.into_inner();

if let Some(pdus) = other_pdus.take() {
let unmatched_frames = pdus.into_iter().flatten();
let previous_leftover = leftover.split();
leftover.extend(unmatched_frames);
leftover.extend_from_slice(&previous_leftover);
}

framed = TokioFramed::new_with_leftover(stream, leftover);

match self.client_accepted(&mut framed, result).await? {
RunState::Continue => {
unreachable!();
}
RunState::DeactivationReactivation { desktop_size } => {
other_pdus = Some(Vec::new());
acceptor = Acceptor::new_deactivation_reactivation(acceptor, desktop_size);
self.attach_channels(&mut acceptor);
continue;
Expand Down

0 comments on commit 8141cb5

Please sign in to comment.