Skip to content

Commit

Permalink
fix(engineio): heartbeat delay (#437)
Browse files Browse the repository at this point in the history
* chore(ci/e2e): fix polling ping timeout test

In case of http polling heartbeat timeout, engine.io might send a close packet to an existing polling connection before closing.
Fix the test for this case.

* feat(engineio/socket): fix heartbeat delay.

A delay was arbitrarily before starting the engine.io delay.
The goal was mostly to comply with the socket.io test suite which was
not checking the ping packet before doing other operations.

With the js implementation it is OK because it is slow.
But it was causing issue with this one.

Fix many tests to work with this issue.
  • Loading branch information
Totodore authored Jan 15, 2025
1 parent a3f48f7 commit e47353d
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 37 deletions.
16 changes: 8 additions & 8 deletions crates/engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,6 @@ where
.try_lock()
.expect("Pong rx should be locked only once");

let instant = tokio::time::Instant::now();
// Sleep for an interval minus the time it took to get here
tokio::time::sleep(interval.saturating_sub(Duration::from_millis(
15 + instant.elapsed().as_millis() as u64,
)))
.await;

#[cfg(feature = "tracing")]
tracing::debug!(sid = ?self.id, "heartbeat sender routine started");

Expand Down Expand Up @@ -513,7 +506,14 @@ where
sid: Sid,
close_fn: Box<dyn Fn(Sid, DisconnectReason) + Send + Sync>,
) -> Arc<Socket<D>> {
Socket::new_dummy_piped(sid, close_fn, 1024).0
let (s, mut rx) = Socket::new_dummy_piped(sid, close_fn, 1024);
tokio::spawn(async move {
while let Some(el) = rx.recv().await {
#[cfg(feature = "tracing")]
tracing::debug!(?sid, ?el, "emitting eio msg");
}
});
s
}

/// Create a dummy socket for testing purpose with a
Expand Down
10 changes: 8 additions & 2 deletions crates/engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl EngineIoHandler for MyHandler {
pub async fn polling_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1234).await;
tokio::time::sleep(Duration::from_millis(500)).await;
create_polling_connection(1234).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
Expand Down Expand Up @@ -93,7 +92,7 @@ pub async fn polling_transport_closed() {
)
.await;

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
let data = tokio::time::timeout(Duration::from_millis(10), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::TransportClose")
.unwrap();
Expand Down Expand Up @@ -122,6 +121,13 @@ pub async fn multiple_http_polling() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1236).await;
let sid = create_polling_connection(1236).await;
send_req(
1236,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
)
.await; // we eat the first ping from the server.

tokio::spawn(futures_util::future::join_all(vec![
send_req(
Expand Down
19 changes: 11 additions & 8 deletions crates/socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ mod test {
use tokio::sync::mpsc;

use crate::adapter::LocalAdapter;
const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);

fn create_client() -> Arc<super::Client<LocalAdapter>> {
let config = crate::SocketIoConfig {
Expand Down Expand Up @@ -457,25 +457,28 @@ mod test {
#[tokio::test]
async fn connect_timeout_fail() {
let client = create_client();
let (tx, mut rx) = mpsc::channel(1);
let close_fn = Box::new(move |_, _| tx.try_send(()).unwrap());
let (close_tx, mut close_rx) = mpsc::channel(1);
let close_fn = Box::new(move |_, reason| close_tx.try_send(reason).unwrap());
let sock = EIoSocket::new_dummy(Sid::new(), close_fn);
client.on_connect(sock.clone());
tokio::time::timeout(CONNECT_TIMEOUT * 2, rx.recv())
// The socket is closed
let res = tokio::time::timeout(CONNECT_TIMEOUT * 2, close_rx.recv())
.await
.unwrap()
.unwrap();
// applied in case of ns timeout
assert_eq!(res, Some(EIoDisconnectReason::TransportClose));
}

#[tokio::test]
async fn connect_timeout() {
let client = create_client();
let (tx, mut rx) = mpsc::channel(1);
let close_fn = Box::new(move |_, _| tx.try_send(()).unwrap());
let (close_tx, mut close_rx) = mpsc::channel(1);
let close_fn = Box::new(move |_, reason| close_tx.try_send(reason).unwrap());
let sock = EIoSocket::new_dummy(Sid::new(), close_fn);
client.clone().on_connect(sock.clone());
client.on_message("0".into(), sock.clone());
tokio::time::timeout(CONNECT_TIMEOUT * 2, rx.recv())
// The socket is not closed.
tokio::time::timeout(CONNECT_TIMEOUT * 2, close_rx.recv())
.await
.unwrap_err();
}
Expand Down
1 change: 0 additions & 1 deletion crates/socketioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ pub async fn server_ws_closing() {
let mut streams =
futures_util::future::join_all((0..100).map(|_| create_ws_connection(12350))).await;
futures_util::future::join_all(streams.iter_mut().map(|s| async move {
s.next().await; // engine.io open packet
s.next().await; // socket.io open packet
}))
.await;
Expand Down
7 changes: 6 additions & 1 deletion crates/socketioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use engineioxide::service::NotFoundService;
use futures_util::SinkExt;
use futures_util::{SinkExt, StreamExt};
use http::Request;
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::server::conn::http1;
Expand Down Expand Up @@ -101,8 +101,13 @@ pub async fn create_ws_connection(port: u16) -> WebSocketStream<MaybeTlsStream<T
.unwrap()
.0;

assert!(matches!(ws.next().await, Some(Ok(Message::Text(msg))) if msg.starts_with("0"))); // engine.io connect.
assert!(matches!(ws.next().await, Some(Ok(Message::Text(msg))) if msg == "2")); // engine.io ping.
ws.send(Message::Text("40{}".into())).await.unwrap();

// wait the time that the socket is connected.
tokio::time::sleep(Duration::from_millis(10)).await;

ws
}

Expand Down
22 changes: 20 additions & 2 deletions e2e/engineioxide/test-suites/v4.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ async function initLongPollingSession() {
`${POLLING_URL}/engine.io/?EIO=4&transport=polling`,
);
const content = await response.text();
return JSON.parse(content.substring(1)).sid;
const sid = JSON.parse(content.substring(1)).sid;

// receive ping packet
const pingRes = await fetch(
`${POLLING_URL}/engine.io/?EIO=4&transport=polling&sid=${sid}`,
);
assert.equal(pingRes.status, 200);
const ping = await pingRes.text();
assert.equal(ping, "2");

return sid;
}

describe("Engine.IO protocol", () => {
Expand Down Expand Up @@ -309,6 +319,7 @@ describe("Engine.IO protocol", () => {
await waitForEvent(socket, "open");

await waitForMessage(socket); // handshake
await waitForMessage(socket); // ping

socket.send("4hello");

Expand All @@ -326,6 +337,7 @@ describe("Engine.IO protocol", () => {
socket.binaryType = "arraybuffer";

await waitForMessage(socket); // handshake
await waitForMessage(socket); // ping

socket.send(Uint8Array.from([1, 2, 3, 4]));

Expand All @@ -342,6 +354,7 @@ describe("Engine.IO protocol", () => {
);

await waitForMessage(socket); // handshake
await waitForMessage(socket); // ping

socket.send("abc");

Expand All @@ -355,7 +368,11 @@ describe("Engine.IO protocol", () => {
describe("heartbeat", { timeout: 5000 }, function () {
describe("HTTP long-polling", () => {
it("sends ping/pong packets", async () => {
const sid = await initLongPollingSession();
const response = await fetch(
`${POLLING_URL}/engine.io/?EIO=4&transport=polling`,
);
const content = await response.text();
const sid = JSON.parse(content.substring(1)).sid;

for (let i = 0; i < 3; i++) {
const pollResponse = await fetch(
Expand Down Expand Up @@ -462,6 +479,7 @@ describe("Engine.IO protocol", () => {
);

await waitForMessage(socket); // handshake
await waitForMessage(socket); // ping

socket.send("1");

Expand Down
2 changes: 1 addition & 1 deletion e2e/socketioxide/socketioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(all(feature = "v4", not(feature = "msgpack")))]
info!("Starting server with v4 protocol and common parser");

let listener = TcpListener::bind("127.0.0.1:3000").await?;
let listener = TcpListener::bind("0.0.0.0:3000").await?;

// We start a loop to continuously accept incoming connections
loop {
Expand Down
30 changes: 25 additions & 5 deletions e2e/socketioxide/test-suites/v5-msgpack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ async function initLongPollingSession() {
`${POLLING_URL}/socket.io/?EIO=4&transport=polling`,
);
const content = await response.text();
return JSON.parse(content.substring(1)).sid;
const sid = JSON.parse(content.substring(1)).sid;
// receive ping packet
const pingRes = await fetch(
`${POLLING_URL}/socket.io/?EIO=4&transport=polling&sid=${sid}`,
);
assert.equal(pingRes.status, 200);
const ping = await pingRes.text();
assert.equal(ping, "2");

return sid;
}

async function initSocketIOConnection() {
Expand All @@ -27,6 +36,7 @@ async function initSocketIOConnection() {
socket.binaryType = "arraybuffer";

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping

socket.send(encode({ type: 0, nsp: "/" }));

Expand Down Expand Up @@ -181,7 +191,11 @@ describe("Engine.IO protocol", () => {
describe("heartbeat", { timeout: 5000 }, function () {
describe("HTTP long-polling", () => {
it("should send ping/pong packets", async () => {
const sid = await initLongPollingSession();
const response = await fetch(
`${POLLING_URL}/socket.io/?EIO=4&transport=polling`,
);
const content = await response.text();
const sid = JSON.parse(content.substring(1)).sid;

for (let i = 0; i < 3; i++) {
const pollResponse = await fetch(
Expand Down Expand Up @@ -364,6 +378,7 @@ describe("Socket.IO protocol", () => {
);

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping

socket.send(encode({ type: 0, nsp: "/", data: undefined }));

Expand Down Expand Up @@ -392,6 +407,7 @@ describe("Socket.IO protocol", () => {
);

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping

socket.send(encode({ type: 0, nsp: "/", data: { token: "123" } }));

Expand Down Expand Up @@ -419,6 +435,7 @@ describe("Socket.IO protocol", () => {
);

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping
socket.send(encode({ type: 0, nsp: "/custom" }));

const handshake = decode(await waitForMessage(socket));
Expand All @@ -445,6 +462,8 @@ describe("Socket.IO protocol", () => {
);

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping

socket.send(encode({ type: 0, nsp: "/custom", data: { token: "abc" } }));

const handshake = decode(await waitForMessage(socket));
Expand All @@ -471,6 +490,7 @@ describe("Socket.IO protocol", () => {
);

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping
socket.send(encode({ type: 0, nsp: "/random" }));

const msg = decode(await waitForMessage(socket));
Expand All @@ -487,6 +507,7 @@ describe("Socket.IO protocol", () => {
);

await waitForMessage(socket); // Engine.IO handshake
await waitForMessage(socket); // Engine.IO ping

socket.send(new Uint8Array([4, 132, 23, 2]));

Expand All @@ -500,14 +521,13 @@ describe("Socket.IO protocol", () => {

socket.send(encode({ type: 1, nsp: "/" }));

const data = await waitForMessage(socket);
assert.equal(data, "2");
// The socket heartbeat should timeout.
await waitForEvent(socket, "close");
});

it("should connect then disconnect from a custom namespace", async () => {
const socket = await initSocketIOConnection();

await waitForMessage(socket); // ping
socket.send(encode({ type: 0, nsp: "/custom" }));

await waitForMessage(socket); // Socket.IO handshake
Expand Down
Loading

0 comments on commit e47353d

Please sign in to comment.