Skip to content

Commit

Permalink
test(engineio): mock http/ws connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore committed Jan 19, 2025
1 parent ff872c3 commit dacbe2e
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 86 deletions.
23 changes: 23 additions & 0 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,29 @@ where
}
}

#[cfg(feature = "__test_harness")]
#[doc(hidden)]
impl<H, Svc> EngineIoService<H, Svc>
where
H: EngineIoHandler,
{
/// Create a new engine.io over websocket through a raw stream.
/// Mostly used for testing.
pub fn ws_init<S>(
&self,
conn: S,
protocol: ProtocolVersion,
sid: Option<crate::sid::Sid>,
req_data: http::request::Parts,
) -> impl std::future::Future<Output = Result<(), crate::errors::Error>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let engine = self.engine.clone();
crate::transport::ws::on_init(engine, conn, protocol, sid, req_data)
}
}

/// A MakeService that always returns a clone of the [`EngineIoService`] it was created with.
pub struct MakeEngineIoService<H: EngineIoHandler, S> {
svc: EngineIoService<H, S>,
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub fn new_req<R: Send + 'static, B, H: EngineIoHandler>(
/// Sends an open packet if it is not an upgrade from a polling request
///
/// Read packets from the websocket and handle them, it will block until the connection is closed
async fn on_init<H: EngineIoHandler, S>(
pub async fn on_init<H: EngineIoHandler, S>(
engine: Arc<EngineIo<H>>,
conn: S,
protocol: ProtocolVersion,
Expand Down
42 changes: 21 additions & 21 deletions crates/engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use tokio::sync::mpsc;

mod fixture;

use fixture::{create_server, send_req};
use fixture::{create_server, create_ws_connection, send_req};
use tokio_tungstenite::tungstenite::Message;

use crate::fixture::{create_polling_connection, create_ws_connection};
use crate::fixture::create_polling_connection;

#[derive(Debug, Clone)]
struct MyHandler {
Expand Down Expand Up @@ -53,8 +53,8 @@ impl EngineIoHandler for MyHandler {
#[tokio::test]
pub async fn polling_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1234).await;
create_polling_connection(1234).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
create_polling_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -67,8 +67,8 @@ pub async fn polling_heartbeat_timeout() {
#[tokio::test]
pub async fn ws_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12344).await;
let _stream = create_ws_connection(12344).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let _stream = create_ws_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -81,11 +81,11 @@ pub async fn ws_heartbeat_timeout() {
#[tokio::test]
pub async fn polling_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1235).await;
let sid = create_polling_connection(1235).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;

send_req(
1235,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("1".into()),
Expand All @@ -103,8 +103,8 @@ pub async fn polling_transport_closed() {
#[tokio::test]
pub async fn ws_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12345).await;
let mut stream = create_ws_connection(12345).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;

stream.send(Message::Text("1".into())).await.unwrap();

Expand All @@ -119,10 +119,10 @@ pub async fn ws_transport_closed() {
#[tokio::test]
pub async fn multiple_http_polling() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1236).await;
let sid = create_polling_connection(1236).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -131,13 +131,13 @@ pub async fn multiple_http_polling() {

tokio::spawn(futures_util::future::join_all(vec![
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -155,10 +155,10 @@ pub async fn multiple_http_polling() {
#[tokio::test]
pub async fn polling_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1237).await;
let sid = create_polling_connection(1237).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1237,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("aizdunazidaubdiz".into()),
Expand All @@ -176,8 +176,8 @@ pub async fn polling_packet_parsing() {
#[tokio::test]
pub async fn ws_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12347).await;
let mut stream = create_ws_connection(12347).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;
stream
.send(Message::Text("aizdunazidaubdiz".into()))
.await
Expand Down
179 changes: 115 additions & 64 deletions crates/engineioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr, SocketAddr},
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use engineioxide::{config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService};
use bytes::{BufMut, Bytes};
use engineioxide::{
config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, sid::Sid,
ProtocolVersion,
};
use http::Request;
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::server::conn::http1;
use hyper_util::{
client::legacy::Client,
rt::{TokioExecutor, TokioIo},
};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::mpsc,
};
use tokio_tungstenite::{
tungstenite::{handshake::client::generate_key, protocol::Role},
WebSocketStream,
};
use tower_service::Service;

/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
Expand All @@ -29,41 +38,43 @@ struct OpenPacket {
}

/// Params should be in the form of `key1=value1&key2=value2`
pub async fn send_req(
port: u16,
pub fn send_req<H: EngineIoHandler>(
svc: &mut EngineIoService<H>,
params: String,
method: http::Method,
body: Option<String>,
) -> String {
) -> impl Future<Output = String> + 'static {
let body = match body {
Some(b) => Either::Left(Full::new(VecDeque::from(b.into_bytes()))),
None => Either::Right(Empty::<VecDeque<u8>>::new()),
};

let req = Request::builder()
.method(method)
.uri(format!(
"http://127.0.0.1:{port}/engine.io/?EIO=4&{}",
params
))
.uri(format!("http://127.0.0.1/engine.io/?EIO=4&{}", params))
.body(body)
.unwrap();
let mut res = Client::builder(TokioExecutor::new())
.build_http()
.request(req)
.await
.unwrap();
let body = res.body_mut().collect().await.unwrap().to_bytes();
String::from_utf8(body.to_vec())
.unwrap()
.chars()
.skip(1)
.collect()
let res = svc.call(req);
async move {
let body = res
.await
.unwrap()
.body_mut()
.collect()
.await
.unwrap()
.to_bytes();
String::from_utf8(body.to_vec())
.unwrap()
.chars()
.skip(1)
.collect()
}
}

pub async fn create_polling_connection(port: u16) -> String {
pub async fn create_polling_connection<H: EngineIoHandler>(svc: &mut EngineIoService<H>) -> String {
let body = send_req(
port,
svc,
"transport=polling".to_string(),
http::Method::GET,
None,
Expand All @@ -72,48 +83,88 @@ pub async fn create_polling_connection(port: u16) -> String {
let open_packet: OpenPacket = serde_json::from_str(&body).unwrap();
open_packet.sid
}
pub async fn create_ws_connection(port: u16) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
tokio_tungstenite::connect_async(format!(
"ws://127.0.0.1:{port}/engine.io/?EIO=4&transport=websocket"
))
pub async fn create_ws_connection<H: EngineIoHandler>(
svc: &mut EngineIoService<H>,
) -> WebSocketStream<StreamImpl> {
new_ws_mock_conn(svc, ProtocolVersion::V4, None).await
}

pub struct StreamImpl(mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>);

impl AsyncRead for StreamImpl {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.1.poll_recv(cx).map(|e| {
if let Some(e) = e {
buf.put(e);
}
Ok(())
})
}
}
impl AsyncWrite for StreamImpl {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let len = buf.len();
self.0.send(Bytes::copy_from_slice(buf)).unwrap();
Poll::Ready(Ok(len))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
self.1.close();
Poll::Ready(Ok(()))
}
}
async fn new_ws_mock_conn<H: EngineIoHandler>(
svc: &mut EngineIoService<H>,
protocol: ProtocolVersion,
sid: Option<Sid>,
) -> WebSocketStream<StreamImpl> {
let (tx, rx) = mpsc::unbounded_channel();
let (tx1, rx1) = mpsc::unbounded_channel();

let parts = Request::builder()
.method("GET")
.header("Host", "127.0.0.1")
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key())
.uri("ws://127.0.0.1/engine.io/?EIO=4&transport=websocket")
.body(http_body_util::Empty::<Bytes>::new())
.unwrap()
.into_parts()
.0;

tokio::spawn(svc.ws_init(StreamImpl(tx, rx1), protocol, sid, parts));

tokio_tungstenite::WebSocketStream::from_raw_socket(
StreamImpl(tx1, rx),
Role::Client,
Default::default(),
)
.await
.unwrap()
.0
}

pub async fn create_server<H: EngineIoHandler>(handler: H, port: u16) {
pub async fn create_server<H: EngineIoHandler>(handler: H) -> EngineIoService<H> {
let config = EngineIoConfig::builder()
.ping_interval(Duration::from_millis(300))
.ping_timeout(Duration::from_millis(200))
.max_payload(1e6 as u64)
.build();

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

let svc = EngineIoService::with_config(Arc::new(handler), config);

let listener = TcpListener::bind(&addr).await.unwrap();
tokio::spawn(async move {
// We start a loop to continuously accept incoming connections
loop {
let (stream, _) = listener.accept().await.unwrap();

// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);
let svc = svc.clone();

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = http1::Builder::new()
.serve_connection(io, svc)
.with_upgrades()
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
});
EngineIoService::with_config(Arc::new(handler), config)
}

0 comments on commit dacbe2e

Please sign in to comment.