From e47353dae8743be2077bd204c4c1da125e5ad29e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Wed, 15 Jan 2025 19:08:41 +0100 Subject: [PATCH] fix(engineio): heartbeat delay (#437) * chore(ci/e2e): fix polling ping timeout test In case of http polling heartbeat timeout, engine.io might send a close packet to an existing polling connection before closing. Fix the test for this case. * feat(engineio/socket): fix heartbeat delay. A delay was arbitrarily before starting the engine.io delay. The goal was mostly to comply with the socket.io test suite which was not checking the ping packet before doing other operations. With the js implementation it is OK because it is slow. But it was causing issue with this one. Fix many tests to work with this issue. --- crates/engineioxide/src/socket.rs | 16 ++++----- .../engineioxide/tests/disconnect_reason.rs | 10 ++++-- crates/socketioxide/src/client.rs | 19 +++++----- .../socketioxide/tests/disconnect_reason.rs | 1 - crates/socketioxide/tests/fixture.rs | 7 +++- e2e/engineioxide/test-suites/v4.ts | 22 ++++++++++-- e2e/socketioxide/socketioxide.rs | 2 +- e2e/socketioxide/test-suites/v5-msgpack.ts | 30 +++++++++++++--- e2e/socketioxide/test-suites/v5.ts | 35 ++++++++++++++----- 9 files changed, 105 insertions(+), 37 deletions(-) diff --git a/crates/engineioxide/src/socket.rs b/crates/engineioxide/src/socket.rs index 8afc0b18..2477afa9 100644 --- a/crates/engineioxide/src/socket.rs +++ b/crates/engineioxide/src/socket.rs @@ -329,13 +329,6 @@ where .try_lock() .expect("Pong rx should be locked only once"); - let instant = tokio::time::Instant::now(); - // Sleep for an interval minus the time it took to get here - tokio::time::sleep(interval.saturating_sub(Duration::from_millis( - 15 + instant.elapsed().as_millis() as u64, - ))) - .await; - #[cfg(feature = "tracing")] tracing::debug!(sid = ?self.id, "heartbeat sender routine started"); @@ -513,7 +506,14 @@ where sid: Sid, close_fn: Box, ) -> Arc> { - Socket::new_dummy_piped(sid, close_fn, 1024).0 + let (s, mut rx) = Socket::new_dummy_piped(sid, close_fn, 1024); + tokio::spawn(async move { + while let Some(el) = rx.recv().await { + #[cfg(feature = "tracing")] + tracing::debug!(?sid, ?el, "emitting eio msg"); + } + }); + s } /// Create a dummy socket for testing purpose with a diff --git a/crates/engineioxide/tests/disconnect_reason.rs b/crates/engineioxide/tests/disconnect_reason.rs index 7339f12d..b29ab819 100644 --- a/crates/engineioxide/tests/disconnect_reason.rs +++ b/crates/engineioxide/tests/disconnect_reason.rs @@ -54,7 +54,6 @@ impl EngineIoHandler for MyHandler { pub async fn polling_heartbeat_timeout() { let (disconnect_tx, mut rx) = mpsc::channel(10); create_server(MyHandler { disconnect_tx }, 1234).await; - tokio::time::sleep(Duration::from_millis(500)).await; create_polling_connection(1234).await; let data = tokio::time::timeout(Duration::from_millis(500), rx.recv()) @@ -93,7 +92,7 @@ pub async fn polling_transport_closed() { ) .await; - let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + let data = tokio::time::timeout(Duration::from_millis(10), rx.recv()) .await .expect("timeout waiting for DisconnectReason::TransportClose") .unwrap(); @@ -122,6 +121,13 @@ pub async fn multiple_http_polling() { let (disconnect_tx, mut rx) = mpsc::channel(10); create_server(MyHandler { disconnect_tx }, 1236).await; let sid = create_polling_connection(1236).await; + send_req( + 1236, + format!("transport=polling&sid={sid}"), + http::Method::GET, + None, + ) + .await; // we eat the first ping from the server. tokio::spawn(futures_util::future::join_all(vec![ send_req( diff --git a/crates/socketioxide/src/client.rs b/crates/socketioxide/src/client.rs index c1285e04..6205186d 100644 --- a/crates/socketioxide/src/client.rs +++ b/crates/socketioxide/src/client.rs @@ -419,7 +419,7 @@ mod test { use tokio::sync::mpsc; use crate::adapter::LocalAdapter; - const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10); + const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50); fn create_client() -> Arc> { let config = crate::SocketIoConfig { @@ -457,25 +457,28 @@ mod test { #[tokio::test] async fn connect_timeout_fail() { let client = create_client(); - let (tx, mut rx) = mpsc::channel(1); - let close_fn = Box::new(move |_, _| tx.try_send(()).unwrap()); + let (close_tx, mut close_rx) = mpsc::channel(1); + let close_fn = Box::new(move |_, reason| close_tx.try_send(reason).unwrap()); let sock = EIoSocket::new_dummy(Sid::new(), close_fn); client.on_connect(sock.clone()); - tokio::time::timeout(CONNECT_TIMEOUT * 2, rx.recv()) + // The socket is closed + let res = tokio::time::timeout(CONNECT_TIMEOUT * 2, close_rx.recv()) .await - .unwrap() .unwrap(); + // applied in case of ns timeout + assert_eq!(res, Some(EIoDisconnectReason::TransportClose)); } #[tokio::test] async fn connect_timeout() { let client = create_client(); - let (tx, mut rx) = mpsc::channel(1); - let close_fn = Box::new(move |_, _| tx.try_send(()).unwrap()); + let (close_tx, mut close_rx) = mpsc::channel(1); + let close_fn = Box::new(move |_, reason| close_tx.try_send(reason).unwrap()); let sock = EIoSocket::new_dummy(Sid::new(), close_fn); client.clone().on_connect(sock.clone()); client.on_message("0".into(), sock.clone()); - tokio::time::timeout(CONNECT_TIMEOUT * 2, rx.recv()) + // The socket is not closed. + tokio::time::timeout(CONNECT_TIMEOUT * 2, close_rx.recv()) .await .unwrap_err(); } diff --git a/crates/socketioxide/tests/disconnect_reason.rs b/crates/socketioxide/tests/disconnect_reason.rs index 7f893321..49168f04 100644 --- a/crates/socketioxide/tests/disconnect_reason.rs +++ b/crates/socketioxide/tests/disconnect_reason.rs @@ -253,7 +253,6 @@ pub async fn server_ws_closing() { let mut streams = futures_util::future::join_all((0..100).map(|_| create_ws_connection(12350))).await; futures_util::future::join_all(streams.iter_mut().map(|s| async move { - s.next().await; // engine.io open packet s.next().await; // socket.io open packet })) .await; diff --git a/crates/socketioxide/tests/fixture.rs b/crates/socketioxide/tests/fixture.rs index 5a6fea5c..97c0578f 100644 --- a/crates/socketioxide/tests/fixture.rs +++ b/crates/socketioxide/tests/fixture.rs @@ -7,7 +7,7 @@ use std::{ }; use engineioxide::service::NotFoundService; -use futures_util::SinkExt; +use futures_util::{SinkExt, StreamExt}; use http::Request; use http_body_util::{BodyExt, Either, Empty, Full}; use hyper::server::conn::http1; @@ -101,8 +101,13 @@ pub async fn create_ws_connection(port: u16) -> WebSocketStream { @@ -309,6 +319,7 @@ describe("Engine.IO protocol", () => { await waitForEvent(socket, "open"); await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send("4hello"); @@ -326,6 +337,7 @@ describe("Engine.IO protocol", () => { socket.binaryType = "arraybuffer"; await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send(Uint8Array.from([1, 2, 3, 4])); @@ -342,6 +354,7 @@ describe("Engine.IO protocol", () => { ); await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send("abc"); @@ -355,7 +368,11 @@ describe("Engine.IO protocol", () => { describe("heartbeat", { timeout: 5000 }, function () { describe("HTTP long-polling", () => { it("sends ping/pong packets", async () => { - const sid = await initLongPollingSession(); + const response = await fetch( + `${POLLING_URL}/engine.io/?EIO=4&transport=polling`, + ); + const content = await response.text(); + const sid = JSON.parse(content.substring(1)).sid; for (let i = 0; i < 3; i++) { const pollResponse = await fetch( @@ -462,6 +479,7 @@ describe("Engine.IO protocol", () => { ); await waitForMessage(socket); // handshake + await waitForMessage(socket); // ping socket.send("1"); diff --git a/e2e/socketioxide/socketioxide.rs b/e2e/socketioxide/socketioxide.rs index 5bb43982..14da5d77 100644 --- a/e2e/socketioxide/socketioxide.rs +++ b/e2e/socketioxide/socketioxide.rs @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box> { #[cfg(all(feature = "v4", not(feature = "msgpack")))] info!("Starting server with v4 protocol and common parser"); - let listener = TcpListener::bind("127.0.0.1:3000").await?; + let listener = TcpListener::bind("0.0.0.0:3000").await?; // We start a loop to continuously accept incoming connections loop { diff --git a/e2e/socketioxide/test-suites/v5-msgpack.ts b/e2e/socketioxide/test-suites/v5-msgpack.ts index 5aabfa14..fc6e08e0 100644 --- a/e2e/socketioxide/test-suites/v5-msgpack.ts +++ b/e2e/socketioxide/test-suites/v5-msgpack.ts @@ -17,7 +17,16 @@ async function initLongPollingSession() { `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, ); const content = await response.text(); - return JSON.parse(content.substring(1)).sid; + const sid = JSON.parse(content.substring(1)).sid; + // receive ping packet + const pingRes = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling&sid=${sid}`, + ); + assert.equal(pingRes.status, 200); + const ping = await pingRes.text(); + assert.equal(ping, "2"); + + return sid; } async function initSocketIOConnection() { @@ -27,6 +36,7 @@ async function initSocketIOConnection() { socket.binaryType = "arraybuffer"; await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/" })); @@ -181,7 +191,11 @@ describe("Engine.IO protocol", () => { describe("heartbeat", { timeout: 5000 }, function () { describe("HTTP long-polling", () => { it("should send ping/pong packets", async () => { - const sid = await initLongPollingSession(); + const response = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, + ); + const content = await response.text(); + const sid = JSON.parse(content.substring(1)).sid; for (let i = 0; i < 3; i++) { const pollResponse = await fetch( @@ -364,6 +378,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/", data: undefined })); @@ -392,6 +407,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/", data: { token: "123" } })); @@ -419,6 +435,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/custom" })); const handshake = decode(await waitForMessage(socket)); @@ -445,6 +462,8 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping + socket.send(encode({ type: 0, nsp: "/custom", data: { token: "abc" } })); const handshake = decode(await waitForMessage(socket)); @@ -471,6 +490,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(encode({ type: 0, nsp: "/random" })); const msg = decode(await waitForMessage(socket)); @@ -487,6 +507,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send(new Uint8Array([4, 132, 23, 2])); @@ -500,14 +521,13 @@ describe("Socket.IO protocol", () => { socket.send(encode({ type: 1, nsp: "/" })); - const data = await waitForMessage(socket); - assert.equal(data, "2"); + // The socket heartbeat should timeout. + await waitForEvent(socket, "close"); }); it("should connect then disconnect from a custom namespace", async () => { const socket = await initSocketIOConnection(); - await waitForMessage(socket); // ping socket.send(encode({ type: 0, nsp: "/custom" })); await waitForMessage(socket); // Socket.IO handshake diff --git a/e2e/socketioxide/test-suites/v5.ts b/e2e/socketioxide/test-suites/v5.ts index d67270fc..6d5a7379 100644 --- a/e2e/socketioxide/test-suites/v5.ts +++ b/e2e/socketioxide/test-suites/v5.ts @@ -17,7 +17,16 @@ async function initLongPollingSession() { `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, ); const content = await response.text(); - return JSON.parse(content.substring(1)).sid; + const sid = JSON.parse(content.substring(1)).sid; + // receive ping packet + const pingRes = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling&sid=${sid}`, + ); + assert.equal(pingRes.status, 200); + const ping = await pingRes.text(); + assert.equal(ping, "2"); + + return sid; } async function initSocketIOConnection() { @@ -27,6 +36,7 @@ async function initSocketIOConnection() { socket.binaryType = "arraybuffer"; await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40"); @@ -180,7 +190,11 @@ describe("Engine.IO protocol", () => { describe("heartbeat", { timeout: 5000 }, function () { describe("HTTP long-polling", () => { it("should send ping/pong packets", async () => { - const sid = await initLongPollingSession(); + const response = await fetch( + `${POLLING_URL}/socket.io/?EIO=4&transport=polling`, + ); + const content = await response.text(); + const sid = JSON.parse(content.substring(1)).sid; for (let i = 0; i < 3; i++) { const pollResponse = await fetch( @@ -361,6 +375,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40"); @@ -381,6 +396,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send('40{"token":"123"}'); @@ -404,6 +420,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40/custom,"); @@ -427,6 +444,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send('40/custom,{"token":"abc"}'); @@ -450,6 +468,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("40/random"); @@ -464,6 +483,7 @@ describe("Socket.IO protocol", () => { ); await waitForMessage(socket); // Engine.IO handshake + await waitForMessage(socket); // Engine.IO ping socket.send("4abc"); @@ -477,20 +497,17 @@ describe("Socket.IO protocol", () => { socket.send("41"); - const data = await waitForMessage(socket); - - assert.equal(data, "2"); + // The socket heartbeat will timeout and will close ultimately + await waitForEvent(socket, "close"); }); it("should connect then disconnect from a custom namespace", async () => { const socket = await initSocketIOConnection(); - await waitForMessage(socket); // ping - socket.send("40/custom"); - await waitForMessage(socket); // Socket.IO handshake - await waitForMessage(socket); // auth packet + await waitForMessage(socket); // Socket.IO handshake for /custom + await waitForMessage(socket); // auth packet for /custom socket.send("41/custom"); socket.send('42["message","message to main namespace",1,2]');