Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: waitFor message latency causing missed packets #33

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions test-suite/node-imports.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
import fetch from "node-fetch";
import { WebSocket } from "ws";
import { WebSocket, createWebSocketStream } from "ws";
import chai from "chai";
import chaiString from "chai-string";

// Wrap WebSocket to provide a async iterator to yield messages
// This is a workaround for the lack of support for async iterators in ws
// It avoids the need to spawn a new event handler + promise for each message
class WebSocketStream extends WebSocket {
constructor(url) {
super(url);
this.stream = createWebSocketStream(this, {
objectMode: true,
readableObjectMode: true,
});
// ignore errors
this.stream.on("error", () => {});
}

// implement async iterator
async *iterator() {
for await (const message of this.stream.iterator({
destroyOnReturn: false,
})) {
yield message;
}
}
}

chai.use(chaiString);

globalThis.fetch = fetch;
globalThis.WebSocket = WebSocket;
globalThis.WebSocket = WebSocketStream;
globalThis.chai = chai;
139 changes: 0 additions & 139 deletions test-suite/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion test-suite/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
"chai": "^4.3.6",
"chai-string": "^1.5.0",
"mocha": "^9.2.1",
"node-fetch": "^3.2.0",
"prettier": "^2.5.1",
"ws": "^8.5.0"
}
Expand Down
68 changes: 60 additions & 8 deletions test-suite/test-suite.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ function sleep(delay) {
return new Promise((resolve) => setTimeout(resolve, delay));
}

function waitFor(socket, eventType) {
async function waitFor(socket, eventType) {
if (eventType == "message" && isNodejs) {
const { value: data } = await socket.iterator().next();
return { data };
}

return new Promise((resolve) => {
socket.addEventListener(
eventType,
Expand All @@ -29,8 +34,16 @@ function waitFor(socket, eventType) {
});
}

function waitForPackets(socket, count) {
async function waitForPackets(socket, count) {
const packets = [];
if (isNodejs) {
for await (const packet of socket.iterator()) {
packets.push(packet);
if (packets.length === count) {
return packets;
}
}
}

return new Promise((resolve) => {
const handler = (event) => {
Expand Down Expand Up @@ -531,11 +544,50 @@ describe("Socket.IO protocol", () => {
await waitFor(socket, "message"); // auth packet

socket.send("41/custom");
socket.send('42["message","message to main namespace"]');
socket.send('42["message","message to main namespace",1,2]');

const { data } = await waitFor(socket, "message");

expect(data).to.eql('42["message-back","message to main namespace"]');
expect(data).to.eql('42["message-back","message to main namespace",1,2]');
});
});

describe("acknowledgements", () => {
it("should emit with an ack expectation", async () => {
const socket = await initSocketIOConnection();

socket.send('42["emit-with-ack",1,"2",{"3":[true]}]');

const { data } = await waitFor(socket, "message");
expect(data).to.eql('421["emit-with-ack",1,"2",{"3":[true]}]');
socket.send('431[1,"2",{"3":[true]}]');

const { data: data2 } = await waitFor(socket, "message");
expect(data2).to.eql('42["emit-with-ack",1,"2",{"3":[true]}]');
});

it("should emit with a binary ack expectation", async () => {
const socket = await initSocketIOConnection();
const DATA =
'{"_placeholder":true,"num":0},{"_placeholder":true,"num":1},"test"';

socket.send(`452-["emit-with-ack",1,${DATA}]`);
socket.send(Uint8Array.from([1, 2, 3]));
socket.send(Uint8Array.from([4, 5, 6]));

let packets = await waitForPackets(socket, 3);
expect(packets[0]).to.eql(`452-1["emit-with-ack",1,${DATA}]`);
expect(packets[1]).to.eql(Uint8Array.from([1, 2, 3]).buffer);
expect(packets[2]).to.eql(Uint8Array.from([4, 5, 6]).buffer);

socket.send(`462-1[1,${DATA}]`);
socket.send(Uint8Array.from([1, 2, 3]));
socket.send(Uint8Array.from([4, 5, 6]));

packets = await waitForPackets(socket, 3);
expect(packets[0]).to.eql(`452-["emit-with-ack",1,${DATA}]`);
expect(packets[1]).to.eql(Uint8Array.from([1, 2, 3]).buffer);
expect(packets[2]).to.eql(Uint8Array.from([4, 5, 6]).buffer);
});
});

Expand All @@ -554,15 +606,15 @@ describe("Socket.IO protocol", () => {
const socket = await initSocketIOConnection();

socket.send(
'452-["message",{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
'452-["message",1,{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
);
socket.send(Uint8Array.from([1, 2, 3]));
socket.send(Uint8Array.from([4, 5, 6]));

const packets = await waitForPackets(socket, 3);

expect(packets[0]).to.eql(
'452-["message-back",{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
'452-["message-back",1,{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
);
expect(packets[1]).to.eql(Uint8Array.from([1, 2, 3]).buffer);
expect(packets[2]).to.eql(Uint8Array.from([4, 5, 6]).buffer);
Expand All @@ -584,15 +636,15 @@ describe("Socket.IO protocol", () => {
const socket = await initSocketIOConnection();

socket.send(
'452-789["message-with-ack",{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
'452-789["message-with-ack",1,{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
);
socket.send(Uint8Array.from([1, 2, 3]));
socket.send(Uint8Array.from([4, 5, 6]));

const packets = await waitForPackets(socket, 3);

expect(packets[0]).to.eql(
'462-789[{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
'462-789[1,{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]'
);
expect(packets[1]).to.eql(Uint8Array.from([1, 2, 3]).buffer);
expect(packets[2]).to.eql(Uint8Array.from([4, 5, 6]).buffer);
Expand Down