diff --git a/crates/engineioxide/src/socket.rs b/crates/engineioxide/src/socket.rs index 8afc0b18..2477afa9 100644 --- a/crates/engineioxide/src/socket.rs +++ b/crates/engineioxide/src/socket.rs @@ -329,13 +329,6 @@ where .try_lock() .expect("Pong rx should be locked only once"); - let instant = tokio::time::Instant::now(); - // Sleep for an interval minus the time it took to get here - tokio::time::sleep(interval.saturating_sub(Duration::from_millis( - 15 + instant.elapsed().as_millis() as u64, - ))) - .await; - #[cfg(feature = "tracing")] tracing::debug!(sid = ?self.id, "heartbeat sender routine started"); @@ -513,7 +506,14 @@ where sid: Sid, close_fn: Box, ) -> Arc> { - Socket::new_dummy_piped(sid, close_fn, 1024).0 + let (s, mut rx) = Socket::new_dummy_piped(sid, close_fn, 1024); + tokio::spawn(async move { + while let Some(el) = rx.recv().await { + #[cfg(feature = "tracing")] + tracing::debug!(?sid, ?el, "emitting eio msg"); + } + }); + s } /// Create a dummy socket for testing purpose with a diff --git a/crates/engineioxide/tests/disconnect_reason.rs b/crates/engineioxide/tests/disconnect_reason.rs index 7339f12d..b29ab819 100644 --- a/crates/engineioxide/tests/disconnect_reason.rs +++ b/crates/engineioxide/tests/disconnect_reason.rs @@ -54,7 +54,6 @@ impl EngineIoHandler for MyHandler { pub async fn polling_heartbeat_timeout() { let (disconnect_tx, mut rx) = mpsc::channel(10); create_server(MyHandler { disconnect_tx }, 1234).await; - tokio::time::sleep(Duration::from_millis(500)).await; create_polling_connection(1234).await; let data = tokio::time::timeout(Duration::from_millis(500), rx.recv()) @@ -93,7 +92,7 @@ pub async fn polling_transport_closed() { ) .await; - let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + let data = tokio::time::timeout(Duration::from_millis(10), rx.recv()) .await .expect("timeout waiting for DisconnectReason::TransportClose") .unwrap(); @@ -122,6 +121,13 @@ pub async fn multiple_http_polling() { let (disconnect_tx, mut rx) = mpsc::channel(10); create_server(MyHandler { disconnect_tx }, 1236).await; let sid = create_polling_connection(1236).await; + send_req( + 1236, + format!("transport=polling&sid={sid}"), + http::Method::GET, + None, + ) + .await; // we eat the first ping from the server. tokio::spawn(futures_util::future::join_all(vec![ send_req( diff --git a/crates/socketioxide/src/client.rs b/crates/socketioxide/src/client.rs index c1285e04..6205186d 100644 --- a/crates/socketioxide/src/client.rs +++ b/crates/socketioxide/src/client.rs @@ -419,7 +419,7 @@ mod test { use tokio::sync::mpsc; use crate::adapter::LocalAdapter; - const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10); + const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50); fn create_client() -> Arc> { let config = crate::SocketIoConfig { @@ -457,25 +457,28 @@ mod test { #[tokio::test] async fn connect_timeout_fail() { let client = create_client(); - let (tx, mut rx) = mpsc::channel(1); - let close_fn = Box::new(move |_, _| tx.try_send(()).unwrap()); + let (close_tx, mut close_rx) = mpsc::channel(1); + let close_fn = Box::new(move |_, reason| close_tx.try_send(reason).unwrap()); let sock = EIoSocket::new_dummy(Sid::new(), close_fn); client.on_connect(sock.clone()); - tokio::time::timeout(CONNECT_TIMEOUT * 2, rx.recv()) + // The socket is closed + let res = tokio::time::timeout(CONNECT_TIMEOUT * 2, close_rx.recv()) .await - .unwrap() .unwrap(); + // applied in case of ns timeout + assert_eq!(res, Some(EIoDisconnectReason::TransportClose)); } #[tokio::test] async fn connect_timeout() { let client = create_client(); - let (tx, mut rx) = mpsc::channel(1); - let close_fn = Box::new(move |_, _| tx.try_send(()).unwrap()); + let (close_tx, mut close_rx) = mpsc::channel(1); + let close_fn = Box::new(move |_, reason| close_tx.try_send(reason).unwrap()); let sock = EIoSocket::new_dummy(Sid::new(), close_fn); client.clone().on_connect(sock.clone()); client.on_message("0".into(), sock.clone()); - tokio::time::timeout(CONNECT_TIMEOUT * 2, rx.recv()) + // The socket is not closed. + tokio::time::timeout(CONNECT_TIMEOUT * 2, close_rx.recv()) .await .unwrap_err(); } diff --git a/crates/socketioxide/tests/disconnect_reason.rs b/crates/socketioxide/tests/disconnect_reason.rs index 7f893321..49168f04 100644 --- a/crates/socketioxide/tests/disconnect_reason.rs +++ b/crates/socketioxide/tests/disconnect_reason.rs @@ -253,7 +253,6 @@ pub async fn server_ws_closing() { let mut streams = futures_util::future::join_all((0..100).map(|_| create_ws_connection(12350))).await; futures_util::future::join_all(streams.iter_mut().map(|s| async move { - s.next().await; // engine.io open packet s.next().await; // socket.io open packet })) .await; diff --git a/crates/socketioxide/tests/fixture.rs b/crates/socketioxide/tests/fixture.rs index 5a6fea5c..97c0578f 100644 --- a/crates/socketioxide/tests/fixture.rs +++ b/crates/socketioxide/tests/fixture.rs @@ -7,7 +7,7 @@ use std::{ }; use engineioxide::service::NotFoundService; -use futures_util::SinkExt; +use futures_util::{SinkExt, StreamExt}; use http::Request; use http_body_util::{BodyExt, Either, Empty, Full}; use hyper::server::conn::http1; @@ -101,8 +101,13 @@ pub async fn create_ws_connection(port: u16) -> WebSocketStream { @@ -309,6 +319,7 @@ describe("Engine.IO protocol", () => { await waitForEvent(socket, "open"); await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send("4hello"); @@ -326,6 +337,7 @@ describe("Engine.IO protocol", () => { socket.binaryType = "arraybuffer"; await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send(Uint8Array.from([1, 2, 3, 4])); @@ -342,6 +354,7 @@ describe("Engine.IO protocol", () => { ); await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send("abc"); @@ -355,7 +368,11 @@ describe("Engine.IO protocol", () => { describe("heartbeat", { timeout: 5000 }, function () { describe("HTTP long-polling", () => { it("sends ping/pong packets", async () => { - const sid = await initLongPollingSession(); + const response = await fetch( + `${POLLING_URL}/engine.io/?EIO=4&transport=polling`, + ); + const content = await response.text(); + const sid = JSON.parse(content.substring(1)).sid; for (let i = 0; i < 3; i++) { const pollResponse = await fetch( @@ -462,6 +479,7 @@ describe("Engine.IO protocol", () => { ); await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send("1"); diff --git a/e2e/socketioxide/socketioxide.rs b/e2e/socketioxide/socketioxide.rs index 5bb43982..14da5d77 100644 --- a/e2e/socketioxide/socketioxide.rs +++ b/e2e/socketioxide/socketioxide.rs @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box> { #[cfg(all(feature = "v4", not(feature = "msgpack")))] info!("Starting server with v4 protocol and common parser"); - let listener = TcpListener::bind("127.0.0.1:3000").await?; + let listener = TcpListener::bind("0.0.0.0:3000").await?; // We start a loop to continuously accept incoming connections loop { diff --git a/e2e/socketioxide/test-suites/v5-msgpack.ts b/e2e/socketioxide/test-suites/v5-msgpack.ts index 5aabfa14..fc6e08e0 100644 --- a/e2e/socketioxide/test-suites/v5-msgpack.ts +++ b/e2e/socketioxide/test-suites/v5-msgpack.ts @@ -17,7 +17,16 @@ async function initLongPollingSession() { `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, ); const content = await response.text(); - return JSON.parse(content.substring(1)).sid; + const sid = JSON.parse(content.substring(1)).sid; + // receive ping packet + const pingRes = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling&sid=${sid}`, + ); + assert.equal(pingRes.status, 200); + const ping = await pingRes.text(); + assert.equal(ping, "2"); + + return sid; } async function initSocketIOConnection() { @@ -27,6 +36,7 @@ async function initSocketIOConnection() { socket.binaryType = "arraybuffer"; await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/" })); @@ -181,7 +191,11 @@ describe("Engine.IO protocol", () => { describe("heartbeat", { timeout: 5000 }, function () { describe("HTTP long-polling", () => { it("should send ping/pong packets", async () => { - const sid = await initLongPollingSession(); + const response = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, + ); + const content = await response.text(); + const sid = JSON.parse(content.substring(1)).sid; for (let i = 0; i < 3; i++) { const pollResponse = await fetch( @@ -364,6 +378,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/", data: undefined })); @@ -392,6 +407,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/", data: { token: "123" } })); @@ -419,6 +435,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/custom" })); const handshake = decode(await waitForMessage(socket)); @@ -445,6 +462,8 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping + socket.send(encode({ type: 0, nsp: "/custom", data: { token: "abc" } })); const handshake = decode(await waitForMessage(socket)); @@ -471,6 +490,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/random" })); const msg = decode(await waitForMessage(socket)); @@ -487,6 +507,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(new Uint8Array([4, 132, 23, 2])); @@ -500,14 +521,13 @@ describe("Socket.IO protocol", () => { socket.send(encode({ type: 1, nsp: "/" })); - const data = await waitForMessage(socket); - assert.equal(data, "2"); + // The socket heartbeat should timeout. + await waitForEvent(socket, "close"); }); it("should connect then disconnect from a custom namespace", async () => { const socket = await initSocketIOConnection(); - await waitForMessage(socket); // ping socket.send(encode({ type: 0, nsp: "/custom" })); await waitForMessage(socket); // Socket.IO handshake diff --git a/e2e/socketioxide/test-suites/v5.ts b/e2e/socketioxide/test-suites/v5.ts index d67270fc..6d5a7379 100644 --- a/e2e/socketioxide/test-suites/v5.ts +++ b/e2e/socketioxide/test-suites/v5.ts @@ -17,7 +17,16 @@ async function initLongPollingSession() { `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, ); const content = await response.text(); - return JSON.parse(content.substring(1)).sid; + const sid = JSON.parse(content.substring(1)).sid; + // receive ping packet + const pingRes = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling&sid=${sid}`, + ); + assert.equal(pingRes.status, 200); + const ping = await pingRes.text(); + assert.equal(ping, "2"); + + return sid; } async function initSocketIOConnection() { @@ -27,6 +36,7 @@ async function initSocketIOConnection() { socket.binaryType = "arraybuffer"; await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40"); @@ -180,7 +190,11 @@ describe("Engine.IO protocol", () => { describe("heartbeat", { timeout: 5000 }, function () { describe("HTTP long-polling", () => { it("should send ping/pong packets", async () => { - const sid = await initLongPollingSession(); + const response = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, + ); + const content = await response.text(); + const sid = JSON.parse(content.substring(1)).sid; for (let i = 0; i < 3; i++) { const pollResponse = await fetch( @@ -361,6 +375,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40"); @@ -381,6 +396,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send('40{"token":"123"}'); @@ -404,6 +420,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40/custom,"); @@ -427,6 +444,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send('40/custom,{"token":"abc"}'); @@ -450,6 +468,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40/random"); @@ -464,6 +483,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("4abc"); @@ -477,20 +497,17 @@ describe("Socket.IO protocol", () => { socket.send("41"); - const data = await waitForMessage(socket); - - assert.equal(data, "2"); + // The socket heartbeat will timeout and will close ultimately + await waitForEvent(socket, "close"); }); it("should connect then disconnect from a custom namespace", async () => { const socket = await initSocketIOConnection(); - await waitForMessage(socket); // ping - socket.send("40/custom"); - await waitForMessage(socket); // Socket.IO handshake - await waitForMessage(socket); // auth packet + await waitForMessage(socket); // Socket.IO handshake for /custom + await waitForMessage(socket); // auth packet for /custom socket.send("41/custom"); socket.send('42["message","message to main namespace",1,2]');