Skip to content

Commit

Permalink
test(engineio): fix rx to stream adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore committed Jan 20, 2025
1 parent dacbe2e commit 31e85e0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
3 changes: 2 additions & 1 deletion crates/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ tracing-subscriber.workspace = true
hyper = { workspace = true, features = ["server", "http1"] }
criterion.workspace = true
axum.workspace = true
hyper-util = { workspace = true, features = ["tokio", "client-legacy"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"], default-features = false }

[features]
v3 = ["memchr", "unicode-segmentation", "itoa"]
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<H, Svc> EngineIoService<H, Svc>
where
H: EngineIoHandler,
{
/// Create a new engine.io over websocket through a raw stream.
/// Create a new engine.io conn over websocket through a raw stream.
/// Mostly used for testing.
pub fn ws_init<S>(
&self,
Expand Down
48 changes: 30 additions & 18 deletions crates/engineioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
time::Duration,
};

use bytes::{BufMut, Bytes};
use bytes::Bytes;
use engineioxide::{
config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, sid::Sid,
ProtocolVersion,
Expand All @@ -20,10 +20,12 @@ use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::mpsc,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::{
tungstenite::{handshake::client::generate_key, protocol::Role},
WebSocketStream,
};
use tokio_util::io::StreamReader;
use tower_service::Service;

/// An OpenPacket is used to initiate a connection
Expand Down Expand Up @@ -89,20 +91,32 @@ pub async fn create_ws_connection<H: EngineIoHandler>(
new_ws_mock_conn(svc, ProtocolVersion::V4, None).await
}

pub struct StreamImpl(mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>);
pin_project_lite::pin_project! {
pub struct StreamImpl {
tx: mpsc::UnboundedSender<Result<Bytes, io::Error>>,
#[pin]
rx: StreamReader<UnboundedReceiverStream<Result<Bytes, io::Error>>, Bytes>,
}
}
impl StreamImpl {
pub fn new(
tx: mpsc::UnboundedSender<Result<Bytes, io::Error>>,
rx: mpsc::UnboundedReceiver<Result<Bytes, io::Error>>,
) -> Self {
Self {
tx,
rx: StreamReader::new(UnboundedReceiverStream::new(rx)),
}
}
}

impl AsyncRead for StreamImpl {
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.1.poll_recv(cx).map(|e| {
if let Some(e) = e {
buf.put(e);
}
Ok(())
})
self.project().rx.poll_read(cx, buf)
}
}
impl AsyncWrite for StreamImpl {
Expand All @@ -112,19 +126,18 @@ impl AsyncWrite for StreamImpl {
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let len = buf.len();
self.0.send(Bytes::copy_from_slice(buf)).unwrap();
self.project()
.tx
.send(Ok(Bytes::copy_from_slice(buf)))
.unwrap();
Poll::Ready(Ok(len))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
self.1.close();
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
Expand All @@ -148,11 +161,10 @@ async fn new_ws_mock_conn<H: EngineIoHandler>(
.unwrap()
.into_parts()
.0;

tokio::spawn(svc.ws_init(StreamImpl(tx, rx1), protocol, sid, parts));
tokio::spawn(svc.ws_init(StreamImpl::new(tx, rx1), protocol, sid, parts));

tokio_tungstenite::WebSocketStream::from_raw_socket(
StreamImpl(tx1, rx),
StreamImpl::new(tx1, rx),
Role::Client,
Default::default(),
)
Expand Down

0 comments on commit 31e85e0

Please sign in to comment.