Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(engineio): mock http/ws connections and improve integration tests #444

Merged
merged 2 commits into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,29 @@ where
}
}

#[cfg(feature = "__test_harness")]
#[doc(hidden)]
impl<H, Svc> EngineIoService<H, Svc>
where
H: EngineIoHandler,
{
/// Create a new engine.io over websocket through a raw stream.
/// Mostly used for testing.
pub fn ws_init<S>(
&self,
conn: S,
protocol: ProtocolVersion,
sid: Option<crate::sid::Sid>,
req_data: http::request::Parts,
) -> impl std::future::Future<Output = Result<(), crate::errors::Error>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let engine = self.engine.clone();
crate::transport::ws::on_init(engine, conn, protocol, sid, req_data)
}
}

/// A MakeService that always returns a clone of the [`EngineIoService`] it was created with.
pub struct MakeEngineIoService<H: EngineIoHandler, S> {
svc: EngineIoService<H, S>,
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub fn new_req<R: Send + 'static, B, H: EngineIoHandler>(
/// Sends an open packet if it is not an upgrade from a polling request
///
/// Read packets from the websocket and handle them, it will block until the connection is closed
async fn on_init<H: EngineIoHandler, S>(
pub async fn on_init<H: EngineIoHandler, S>(
engine: Arc<EngineIo<H>>,
conn: S,
protocol: ProtocolVersion,
Expand Down
42 changes: 21 additions & 21 deletions crates/engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use tokio::sync::mpsc;

mod fixture;

use fixture::{create_server, send_req};
use fixture::{create_server, create_ws_connection, send_req};
use tokio_tungstenite::tungstenite::Message;

use crate::fixture::{create_polling_connection, create_ws_connection};
use crate::fixture::create_polling_connection;

#[derive(Debug, Clone)]
struct MyHandler {
Expand Down Expand Up @@ -53,8 +53,8 @@ impl EngineIoHandler for MyHandler {
#[tokio::test]
pub async fn polling_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1234).await;
create_polling_connection(1234).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
create_polling_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -67,8 +67,8 @@ pub async fn polling_heartbeat_timeout() {
#[tokio::test]
pub async fn ws_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12344).await;
let _stream = create_ws_connection(12344).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let _stream = create_ws_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -81,11 +81,11 @@ pub async fn ws_heartbeat_timeout() {
#[tokio::test]
pub async fn polling_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1235).await;
let sid = create_polling_connection(1235).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;

send_req(
1235,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("1".into()),
Expand All @@ -103,8 +103,8 @@ pub async fn polling_transport_closed() {
#[tokio::test]
pub async fn ws_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12345).await;
let mut stream = create_ws_connection(12345).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;

stream.send(Message::Text("1".into())).await.unwrap();

Expand All @@ -119,10 +119,10 @@ pub async fn ws_transport_closed() {
#[tokio::test]
pub async fn multiple_http_polling() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1236).await;
let sid = create_polling_connection(1236).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -131,13 +131,13 @@ pub async fn multiple_http_polling() {

tokio::spawn(futures_util::future::join_all(vec![
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -155,10 +155,10 @@ pub async fn multiple_http_polling() {
#[tokio::test]
pub async fn polling_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1237).await;
let sid = create_polling_connection(1237).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1237,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("aizdunazidaubdiz".into()),
Expand All @@ -176,8 +176,8 @@ pub async fn polling_packet_parsing() {
#[tokio::test]
pub async fn ws_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12347).await;
let mut stream = create_ws_connection(12347).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;
stream
.send(Message::Text("aizdunazidaubdiz".into()))
.await
Expand Down
179 changes: 115 additions & 64 deletions crates/engineioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr, SocketAddr},
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use engineioxide::{config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService};
use bytes::{BufMut, Bytes};
use engineioxide::{
config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, sid::Sid,
ProtocolVersion,
};
use http::Request;
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::server::conn::http1;
use hyper_util::{
client::legacy::Client,
rt::{TokioExecutor, TokioIo},
};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::mpsc,
};
use tokio_tungstenite::{
tungstenite::{handshake::client::generate_key, protocol::Role},
WebSocketStream,
};
use tower_service::Service;

/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
Expand All @@ -29,41 +38,43 @@ struct OpenPacket {
}

/// Params should be in the form of `key1=value1&key2=value2`
pub async fn send_req(
port: u16,
pub fn send_req<H: EngineIoHandler>(
svc: &mut EngineIoService<H>,
params: String,
method: http::Method,
body: Option<String>,
) -> String {
) -> impl Future<Output = String> + 'static {
let body = match body {
Some(b) => Either::Left(Full::new(VecDeque::from(b.into_bytes()))),
None => Either::Right(Empty::<VecDeque<u8>>::new()),
};

let req = Request::builder()
.method(method)
.uri(format!(
"http://127.0.0.1:{port}/engine.io/?EIO=4&{}",
params
))
.uri(format!("http://127.0.0.1/engine.io/?EIO=4&{}", params))
.body(body)
.unwrap();
let mut res = Client::builder(TokioExecutor::new())
.build_http()
.request(req)
.await
.unwrap();
let body = res.body_mut().collect().await.unwrap().to_bytes();
String::from_utf8(body.to_vec())
.unwrap()
.chars()
.skip(1)
.collect()
let res = svc.call(req);
async move {
let body = res
.await
.unwrap()
.body_mut()
.collect()
.await
.unwrap()
.to_bytes();
String::from_utf8(body.to_vec())
.unwrap()
.chars()
.skip(1)
.collect()
}
}

pub async fn create_polling_connection(port: u16) -> String {
pub async fn create_polling_connection<H: EngineIoHandler>(svc: &mut EngineIoService<H>) -> String {
let body = send_req(
port,
svc,
"transport=polling".to_string(),
http::Method::GET,
None,
Expand All @@ -72,48 +83,88 @@ pub async fn create_polling_connection(port: u16) -> String {
let open_packet: OpenPacket = serde_json::from_str(&body).unwrap();
open_packet.sid
}
pub async fn create_ws_connection(port: u16) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
tokio_tungstenite::connect_async(format!(
"ws://127.0.0.1:{port}/engine.io/?EIO=4&transport=websocket"
))
pub async fn create_ws_connection<H: EngineIoHandler>(
svc: &mut EngineIoService<H>,
) -> WebSocketStream<StreamImpl> {
new_ws_mock_conn(svc, ProtocolVersion::V4, None).await
}

pub struct StreamImpl(mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>);

impl AsyncRead for StreamImpl {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.1.poll_recv(cx).map(|e| {
if let Some(e) = e {
buf.put(e);
}
Ok(())
})
}
}
impl AsyncWrite for StreamImpl {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let len = buf.len();
self.0.send(Bytes::copy_from_slice(buf)).unwrap();
Poll::Ready(Ok(len))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
self.1.close();
Poll::Ready(Ok(()))
}
}
async fn new_ws_mock_conn<H: EngineIoHandler>(
svc: &mut EngineIoService<H>,
protocol: ProtocolVersion,
sid: Option<Sid>,
) -> WebSocketStream<StreamImpl> {
let (tx, rx) = mpsc::unbounded_channel();
let (tx1, rx1) = mpsc::unbounded_channel();

let parts = Request::builder()
.method("GET")
.header("Host", "127.0.0.1")
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key())
.uri("ws://127.0.0.1/engine.io/?EIO=4&transport=websocket")
.body(http_body_util::Empty::<Bytes>::new())
.unwrap()
.into_parts()
.0;

tokio::spawn(svc.ws_init(StreamImpl(tx, rx1), protocol, sid, parts));

tokio_tungstenite::WebSocketStream::from_raw_socket(
StreamImpl(tx1, rx),
Role::Client,
Default::default(),
)
.await
.unwrap()
.0
}

pub async fn create_server<H: EngineIoHandler>(handler: H, port: u16) {
pub async fn create_server<H: EngineIoHandler>(handler: H) -> EngineIoService<H> {
let config = EngineIoConfig::builder()
.ping_interval(Duration::from_millis(300))
.ping_timeout(Duration::from_millis(200))
.max_payload(1e6 as u64)
.build();

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

let svc = EngineIoService::with_config(Arc::new(handler), config);

let listener = TcpListener::bind(&addr).await.unwrap();
tokio::spawn(async move {
// We start a loop to continuously accept incoming connections
loop {
let (stream, _) = listener.accept().await.unwrap();

// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);
let svc = svc.clone();

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = http1::Builder::new()
.serve_connection(io, svc)
.with_upgrades()
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
});
EngineIoService::with_config(Arc::new(handler), config)
}
Loading