Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(engineio): mock http/ws connections and improve integration tests #447

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ tracing-subscriber.workspace = true
hyper = { workspace = true, features = ["server", "http1"] }
criterion.workspace = true
axum.workspace = true
hyper-util = { workspace = true, features = ["tokio", "client-legacy"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"], default-features = false }

[features]
v3 = ["memchr", "unicode-segmentation", "itoa"]
Expand Down
23 changes: 23 additions & 0 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,29 @@ where
}
}

#[cfg(feature = "__test_harness")]
#[doc(hidden)]
impl<H, Svc> EngineIoService<H, Svc>
where
H: EngineIoHandler,
{
/// Create a new engine.io conn over websocket through a raw stream.
/// Mostly used for testing.
pub fn ws_init<S>(
&self,
conn: S,
protocol: ProtocolVersion,
sid: Option<crate::sid::Sid>,
req_data: http::request::Parts,
) -> impl std::future::Future<Output = Result<(), crate::errors::Error>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let engine = self.engine.clone();
crate::transport::ws::on_init(engine, conn, protocol, sid, req_data)
}
}

/// A MakeService that always returns a clone of the [`EngineIoService`] it was created with.
pub struct MakeEngineIoService<H: EngineIoHandler, S> {
svc: EngineIoService<H, S>,
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub fn new_req<R: Send + 'static, B, H: EngineIoHandler>(
/// Sends an open packet if it is not an upgrade from a polling request
///
/// Read packets from the websocket and handle them, it will block until the connection is closed
async fn on_init<H: EngineIoHandler, S>(
pub async fn on_init<H: EngineIoHandler, S>(
engine: Arc<EngineIo<H>>,
conn: S,
protocol: ProtocolVersion,
Expand Down
42 changes: 21 additions & 21 deletions crates/engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use tokio::sync::mpsc;

mod fixture;

use fixture::{create_server, send_req};
use fixture::{create_server, create_ws_connection, send_req};
use tokio_tungstenite::tungstenite::Message;

use crate::fixture::{create_polling_connection, create_ws_connection};
use crate::fixture::create_polling_connection;

#[derive(Debug, Clone)]
struct MyHandler {
Expand Down Expand Up @@ -53,8 +53,8 @@ impl EngineIoHandler for MyHandler {
#[tokio::test]
pub async fn polling_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1234).await;
create_polling_connection(1234).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
create_polling_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -67,8 +67,8 @@ pub async fn polling_heartbeat_timeout() {
#[tokio::test]
pub async fn ws_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12344).await;
let _stream = create_ws_connection(12344).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let _stream = create_ws_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -81,11 +81,11 @@ pub async fn ws_heartbeat_timeout() {
#[tokio::test]
pub async fn polling_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1235).await;
let sid = create_polling_connection(1235).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;

send_req(
1235,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("1".into()),
Expand All @@ -103,8 +103,8 @@ pub async fn polling_transport_closed() {
#[tokio::test]
pub async fn ws_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12345).await;
let mut stream = create_ws_connection(12345).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;

stream.send(Message::Text("1".into())).await.unwrap();

Expand All @@ -119,10 +119,10 @@ pub async fn ws_transport_closed() {
#[tokio::test]
pub async fn multiple_http_polling() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1236).await;
let sid = create_polling_connection(1236).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -131,13 +131,13 @@ pub async fn multiple_http_polling() {

tokio::spawn(futures_util::future::join_all(vec![
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -155,10 +155,10 @@ pub async fn multiple_http_polling() {
#[tokio::test]
pub async fn polling_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1237).await;
let sid = create_polling_connection(1237).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1237,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("aizdunazidaubdiz".into()),
Expand All @@ -176,8 +176,8 @@ pub async fn polling_packet_parsing() {
#[tokio::test]
pub async fn ws_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12347).await;
let mut stream = create_ws_connection(12347).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;
stream
.send(Message::Text("aizdunazidaubdiz".into()))
.await
Expand Down
Loading
Loading