diff --git a/crates/ironrdp-acceptor/src/lib.rs b/crates/ironrdp-acceptor/src/lib.rs index 169c239a0..24e63b1e4 100644 --- a/crates/ironrdp-acceptor/src/lib.rs +++ b/crates/ironrdp-acceptor/src/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate tracing; +use ironrdp_async::bytes::Bytes; use ironrdp_async::{single_sequence_step, Framed, FramedRead, FramedWrite, StreamWrapper}; use ironrdp_connector::ConnectorResult; use ironrdp_pdu::write_buf::WriteBuf; @@ -48,6 +49,7 @@ where pub async fn accept_finalize( mut framed: Framed, acceptor: &mut Acceptor, + mut unmatched: Option<&mut Vec>, ) -> ConnectorResult<(Framed, AcceptorResult)> where S: FramedRead + FramedWrite, @@ -58,7 +60,6 @@ where if let Some(result) = acceptor.get_result() { return Ok((framed, result)); } - - single_sequence_step(&mut framed, acceptor, &mut buf, None).await?; + single_sequence_step(&mut framed, acceptor, &mut buf, unmatched.as_deref_mut()).await?; } } diff --git a/crates/ironrdp-server/src/server.rs b/crates/ironrdp-server/src/server.rs index 53a0050b1..b046c1dcc 100644 --- a/crates/ironrdp-server/src/server.rs +++ b/crates/ironrdp-server/src/server.rs @@ -17,6 +17,7 @@ use ironrdp_pdu::{self, decode, encode_vec, mcs, nego, rdp, Action, PduResult}; use ironrdp_svc::{impl_as_any, server_encode_svc_messages, StaticChannelId, StaticChannelSet, SvcProcessor}; use ironrdp_tokio::{Framed, FramedRead, FramedWrite, TokioFramed}; use rdpsnd::server::{RdpsndServer, RdpsndServerMessage}; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use tokio::task; @@ -797,21 +798,34 @@ impl RdpServer { } } - async fn accept_finalize(&mut self, mut framed: Framed, mut acceptor: Acceptor) -> Result<()> + async fn accept_finalize(&mut self, mut framed: TokioFramed, mut acceptor: Acceptor) -> Result<()> where - S: FramedRead + FramedWrite, + S: AsyncRead + AsyncWrite + Sync + Send + Unpin, { + let mut other_pdus = None; + loop { - let (new_framed, result) = ironrdp_acceptor::accept_finalize(framed, &mut acceptor) + let (new_framed, result) = ironrdp_acceptor::accept_finalize(framed, &mut acceptor, other_pdus.as_mut()) .await .context("failed to accept client during finalize")?; - framed = new_framed; + + let (stream, mut leftover) = new_framed.into_inner(); + + if let Some(pdus) = other_pdus.take() { + let unmatched_frames = pdus.into_iter().flatten(); + let previous_leftover = leftover.split(); + leftover.extend(unmatched_frames); + leftover.extend_from_slice(&previous_leftover); + } + + framed = TokioFramed::new_with_leftover(stream, leftover); match self.client_accepted(&mut framed, result).await? { RunState::Continue => { unreachable!(); } RunState::DeactivationReactivation { desktop_size } => { + other_pdus = Some(Vec::new()); acceptor = Acceptor::new_deactivation_reactivation(acceptor, desktop_size); self.attach_channels(&mut acceptor); continue;