diff --git a/.github/workflows/github-ci.yml b/.github/workflows/github-ci.yml index 8a45f14e..e40f51cd 100644 --- a/.github/workflows/github-ci.yml +++ b/.github/workflows/github-ci.yml @@ -196,7 +196,8 @@ jobs: run: | cd e2e/engineioxide/test-suites && npm install && cd ../../.. cargo build -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r - cargo run -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r > server.txt & node --test-reporter=spec --experimental-strip-types e2e/engineioxide/test-suites/${{ matrix.engineio-version }}.ts > client.txt + cargo run -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r > server.txt && sleep 1 & \ + node --test-reporter=spec --experimental-strip-types e2e/engineioxide/test-suites/${{ matrix.engineio-version }}.ts > client.txt - name: Server output if: always() run: cat server.txt @@ -233,7 +234,8 @@ jobs: VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1) cd e2e/socketioxide/test-suites && npm install && cd ../../.. cargo build -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r - cargo run -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r > server.txt & node --test-reporter=spec --experimental-strip-types e2e/socketioxide/test-suites/${{ matrix.socketio-version }}.ts > client.txt + cargo run -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r > server.txt && sleep 1 & \ + node --test-reporter=spec --experimental-strip-types e2e/socketioxide/test-suites/${{ matrix.socketio-version }}.ts > client.txt - name: Server output if: always() run: cat server.txt @@ -274,7 +276,7 @@ jobs: PARSER=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f2 -s) VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1) cargo build -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER - cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" node --experimental-strip-types client.ts + cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" node --experimental-strip-types --test-reporter=spec client.ts - name: Server output if: always() run: cat e2e/adapter/*.log diff --git a/crates/socketioxide-redis/src/lib.rs b/crates/socketioxide-redis/src/lib.rs index 23cce2d5..2df2bb4a 100644 --- a/crates/socketioxide-redis/src/lib.rs +++ b/crates/socketioxide-redis/src/lib.rs @@ -537,6 +537,7 @@ impl CoreAdapter for CustomRedisAdapter { timeout: Option, ) -> Result { if is_local_op(self.uid, &opts) { + tracing::debug!(?opts, "broadcast with ack is local"); let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout); let stream = AckStream::new_local(local); return Ok(stream); @@ -588,7 +589,9 @@ impl CoreAdapter for CustomRedisAdapter { // First get the remote stream because redis might send // the responses before subscription is done. - let stream = self.get_res::<()>(req_id, PACKET_IDX).await?; + let stream = self + .get_res::<()>(req_id, PACKET_IDX, opts.server_id) + .await?; self.send_req(req, opts.server_id).await?; let local = self.local.rooms(opts); let rooms = stream @@ -641,7 +644,9 @@ impl CoreAdapter for CustomRedisAdapter { let req_id = req.id; // First get the remote stream because redis might send // the responses before subscription is done. - let remote = self.get_res::(req_id, PACKET_IDX).await?; + let remote = self + .get_res::(req_id, PACKET_IDX, opts.server_id) + .await?; self.send_req(req, opts.server_id).await?; let local = self.local.fetch_sockets(opts); @@ -909,8 +914,14 @@ impl CustomRedisAdapter { &self, req_id: Sid, response_idx: u8, + target_uid: Option, ) -> Result>, Error> { - let remote_serv_cnt = self.server_count().await?.saturating_sub(1) as usize; + // Check for specific target node + let remote_serv_cnt = if target_uid.is_none() { + self.server_count().await?.saturating_sub(1) as usize + } else { + 1 + }; let (tx, rx) = mpsc::channel(std::cmp::max(remote_serv_cnt, 1)); self.responses.lock().unwrap().insert(req_id, tx); let stream = MessageStream::new(rx) @@ -946,11 +957,17 @@ impl CustomRedisAdapter { /// sent to the current server. #[inline] fn is_local_op(uid: Uid, opts: &BroadcastOptions) -> bool { - opts.has_flag(BroadcastFlags::Local) + if opts.has_flag(BroadcastFlags::Local) || (!opts.has_flag(BroadcastFlags::Broadcast) && opts.server_id == Some(uid) && opts.rooms.is_empty() && opts.sid.is_some()) + { + tracing::debug!(?opts, "operation is local"); + true + } else { + false + } } /// Checks if the namespace path is valid diff --git a/crates/socketioxide-redis/src/stream.rs b/crates/socketioxide-redis/src/stream.rs index b0580b9a..1853c734 100644 --- a/crates/socketioxide-redis/src/stream.rs +++ b/crates/socketioxide-redis/src/stream.rs @@ -139,13 +139,18 @@ where S: Stream> + FusedStream, { type Item = AckStreamItem; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { match self.as_mut().project().local.poll_next(cx) { - Poll::Pending | Poll::Ready(None) => self.poll_remote(cx), + Poll::Pending => match self.poll_remote(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Pending, + }, Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => self.poll_remote(cx), } } + fn size_hint(&self) -> (usize, Option) { let (lower, upper) = self.local.size_hint(); (lower, upper.map(|upper| upper + self.total_ack_cnt)) diff --git a/crates/socketioxide/src/ack.rs b/crates/socketioxide/src/ack.rs index de281634..40c7bfd5 100644 --- a/crates/socketioxide/src/ack.rs +++ b/crates/socketioxide/src/ack.rs @@ -255,9 +255,7 @@ impl Future for AckStream { match self.project().inner.poll_next(cx) { Poll::Ready(Some(v)) => Poll::Ready(map_ack_response(v.1, parser)), Poll::Pending => Poll::Pending, - Poll::Ready(None) => { - unreachable!("stream should at least yield 1 value") - } + Poll::Ready(None) => Poll::Ready(Err(AckError::Timeout)), } } } diff --git a/crates/socketioxide/src/client.rs b/crates/socketioxide/src/client.rs index 6205186d..504328aa 100644 --- a/crates/socketioxide/src/client.rs +++ b/crates/socketioxide/src/client.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::fmt; use std::sync::{Arc, Mutex, OnceLock, RwLock}; use bytes::Bytes; @@ -212,7 +213,6 @@ impl Client { } } -#[derive(Debug)] pub struct SocketData { pub parser_state: ParserState, /// Channel used to notify the socket that it has been connected to a namespace for v5 @@ -230,6 +230,14 @@ impl Default for SocketData { } } } +impl fmt::Debug for SocketData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SocketData") + .field("parser_state", &self.parser_state) + .field("connect_recv_tx", &self.connect_recv_tx) + .finish() + } +} impl EngineIoHandler for Client { type Data = SocketData; diff --git a/crates/socketioxide/src/extract/socket.rs b/crates/socketioxide/src/extract/socket.rs index aeb05b24..e5a14cde 100644 --- a/crates/socketioxide/src/extract/socket.rs +++ b/crates/socketioxide/src/extract/socket.rs @@ -1,4 +1,5 @@ use std::convert::Infallible; +use std::fmt; use std::sync::Arc; use crate::{ @@ -14,7 +15,6 @@ use socketioxide_core::{errors::SocketError, packet::Packet, parser::Parse, Valu /// /// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default, /// make sure to have a handler that is [generic over the adapter type](crate#adapters). -#[derive(Debug)] pub struct SocketRef(Arc>); impl FromConnectParts for SocketRef { @@ -75,13 +75,17 @@ impl SocketRef { self.0.disconnect() } } +impl fmt::Debug for SocketRef { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} /// An Extractor to send an ack response corresponding to the current event. /// If the client sent a normal message without expecting an ack, the ack callback will do nothing. /// /// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default, /// make sure to have a handler that is [generic over the adapter type](crate#adapters). -#[derive(Debug)] pub struct AckSender { socket: Arc>, ack_id: Option, diff --git a/crates/socketioxide/src/socket.rs b/crates/socketioxide/src/socket.rs index 8f317070..e06a0304 100644 --- a/crates/socketioxide/src/socket.rs +++ b/crates/socketioxide/src/socket.rs @@ -4,6 +4,7 @@ use std::{ borrow::Cow, collections::HashMap, fmt::{self, Debug}, + future::Future, sync::{ atomic::{AtomicBool, AtomicI64, Ordering}, Arc, Mutex, RwLock, @@ -130,7 +131,6 @@ impl<'a> PermitExt<'a> for Permit<'a> { /// A RemoteSocket is a [`Socket`] that is remotely connected on another server. /// It implements a subset of the [`Socket`] API. -#[derive(Clone)] pub struct RemoteSocket { adapter: Arc, parser: Parser, @@ -203,16 +203,19 @@ impl RemoteSocket { /// /// See [`Socket::join`] for more info. #[inline] - pub async fn join(&self, rooms: impl RoomParam) -> Result<(), A::Error> { - self.adapter.add_sockets(self.get_opts(), rooms).await + pub fn join(&self, rooms: impl RoomParam) -> impl Future> + '_ { + self.adapter.add_sockets(self.get_opts(), rooms) } /// # Remove the remote socket from the specified room(s). /// /// See [`Socket::leave`] for more info. #[inline] - pub async fn leave(&self, rooms: impl RoomParam) -> Result<(), A::Error> { - self.adapter.del_sockets(self.get_opts(), rooms).await + pub fn leave( + &self, + rooms: impl RoomParam, + ) -> impl Future> + Send + '_ { + self.adapter.del_sockets(self.get_opts(), rooms) } /// # Disconnect the remote socket from the current namespace, @@ -238,6 +241,15 @@ impl fmt::Debug for RemoteSocket { .finish() } } +impl Clone for RemoteSocket { + fn clone(&self) -> Self { + Self { + adapter: self.adapter.clone(), + parser: self.parser, + data: self.data.clone(), + } + } +} /// A error that can occur when emitting a message to a remote socket. #[derive(Debug, thiserror::Error)] diff --git a/e2e/adapter/client.ts b/e2e/adapter/client.ts index 7d152e43..ba85f58f 100644 --- a/e2e/adapter/client.ts +++ b/e2e/adapter/client.ts @@ -1,117 +1,250 @@ -import { spawn_servers, spawn_sockets, TEST, timeout } from "./fixture.ts"; +import { kill_servers, spawn_servers, spawn_sockets } from "./fixture.ts"; import assert from "assert"; +import { describe, it, after, before, beforeEach, afterEach } from "node:test"; +import { Socket } from "socket.io-client"; assert(!!process.env.CMD, "CMD env var must be set"); -// * Spawn 10 sockets on 3 servers -// * Call a `broadcast` event on each socket -// * Expect the socket to broadcast a message to all other sockets -async function broadcast() { - const sockets = await spawn_sockets([3000, 3001, 3002], 10); - for (const socket of sockets) { - let msgs: string[] = []; - const prom = new Promise((resolve) => { +describe("adapter tests", { timeout: 10000 }, () => { + before(async (ctx) => { + const servers = await spawn_servers([3000, 3001, 3002]); + (ctx as any).servers = servers; + }); + after(async (ctx) => { + kill_servers((ctx as any).servers); + await new Promise((resolve) => setTimeout(resolve, 1000)); + process.exit(0); + }); + + beforeEach(async (ctx) => { + const sockets = await spawn_sockets([3000, 3001, 3002], 10); + (ctx as any).sockets = sockets; + }); + + afterEach(async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets ?? []; + for (const socket of sockets) { + socket.disconnect(); + } + }); + + describe("broadcast tests", () => { + it("should broadcast a packet sent from a socket to all other sockets", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; for (const socket of sockets) { - socket.once("broadcast", (data: string) => { - msgs.push(data); - if (msgs.length === sockets.length) resolve(null); + let msgs: string[] = []; + const prom = new Promise((resolve) => { + for (const socket of sockets) { + socket.once("broadcast", (data: string) => { + msgs.push(data); + if (msgs.length === sockets.length) resolve(null); + }); + } }); + + socket.emit("broadcast"); + await prom; + assert.equal(Object.values(msgs).length, sockets.length); + for (const msg of msgs) { + assert.deepStrictEqual(msg, `hello from ${socket.id}`); + } } }); - socket.emit("broadcast"); - await timeout(prom); - assert.equal(Object.values(msgs).length, sockets.length); - for (const msg of msgs) { - assert.deepStrictEqual(msg, `hello from ${socket.id}`); - } - } - return sockets; -} - -async function broadcastWithAck() { - const sockets = await spawn_sockets([3000, 3001, 3002], 10); - const expected = sockets.map((s) => `ack from ${s.id}`).sort(); - for (const socket of sockets) { - socket.on("broadcast_with_ack", (_data, ack) => { - ack(`ack from ${socket.id}`); + + it("should broadcast a packet sent from a socket to all other sockets and get an ack from each socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + const expected = sockets.map((s) => `ack from ${s.id}`).sort(); + for (const socket of sockets) { + socket.on("broadcast_with_ack", (_data, ack) => { + ack(`ack from ${socket.id}`); + }); + } + for (const socket of sockets) { + const res: string[] = await socket.emitWithAck("broadcast_with_ack"); + assert.deepStrictEqual(res.sort(), expected); + } }); - } - for (const socket of sockets) { - const res: string[] = await timeout( - socket.emitWithAck("broadcast_with_ack"), - ); - assert.deepStrictEqual(res.sort(), expected); - } - return sockets; -} - -async function disconnectSocket() { - const sockets = await spawn_sockets([3000, 3001, 3002], 10); - let cnt = 0; - const prom = new Promise((resolve) => { - for (const socket of sockets) { - socket.on("disconnect", () => { - cnt++; - if (cnt === sockets.length) resolve(null); + + it("should disconnect all sockets from a given one", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + let cnt = 0; + const prom = new Promise((resolve) => { + for (const socket of sockets) { + socket.on("disconnect", () => { + cnt++; + if (cnt === sockets.length) resolve(null); + }); + } }); - } + sockets[0].emit("disconnect_socket"); + await prom; + for (const socket of sockets) { + assert(!socket.connected); + } + }); + + it("should fetch all socket rooms from one socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + const expected = [ + "room1", + "room2", + "room4", + "room5", + ...sockets.map((s) => s.id), + ].sort(); + for (const socket of sockets) { + const rooms: string[] = await socket.emitWithAck("rooms"); + assert.deepStrictEqual(rooms.sort(), expected); + } + }); + + it("should fetch all sockets from one socket", async (ctx) => { + type SocketData = { id: string; ns: string }; + const sockets: Socket[] = (ctx as any).sockets; + const expected = sockets + .map((socket) => ({ + id: socket.id, + ns: "/", + })) + .sort((a, b) => a.id!.localeCompare(b.id!)); + + for (const socket of sockets) { + const data: SocketData[] = await socket.emitWithAck("fetch_sockets"); + const sorted = data + ?.map((data) => ({ id: data.id, ns: data.ns })) + ?.sort((a, b) => a.id.localeCompare(b.id)); + assert.deepStrictEqual(sorted, expected); + } + }); + + it("should make all sockets join a room from one socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + const expected = [ + "room1", + "room2", + "room4", + "room5", + ...sockets.map((s) => s.id), + ]; + await sockets[0].emitWithAck("join_room"); + const rooms: string[] = await sockets[0].emitWithAck("rooms"); + assert.deepStrictEqual(rooms.sort(), [...expected, "room7"].sort()); + }); + it("should make all sockets leave a room from one socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + const expected = [ + "room1", + "room2", + "room5", + ...sockets.map((s) => s.id), + ].sort(); + + await sockets[0].emitWithAck("leave_room"); + const rooms: string[] = await sockets[0].emitWithAck("rooms"); + assert.deepStrictEqual(rooms.sort(), expected); + }); + }); + describe("remote sockets actions", () => { + it("should emit from a remote socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + + for (let i = 0; i < sockets.length; i++) { + const socket_emit = sockets[i]; + const socket_rcv = sockets[(i + 1) % sockets.length]; + const prom = new Promise((resolve) => socket_rcv.once("emit", resolve)); + await socket_emit.emitWithAck("emit_from_remote_sock", socket_rcv.id); + assert.equal(await prom, `hello from ${socket_emit.id}`); + } + }); + + it("should emit with ack from a remote socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + + for (let i = 0; i < sockets.length; i++) { + const socket_emit = sockets[i]; + const socket_rcv = sockets[(i + 1) % sockets.length]; + const prom = new Promise<[string, (ack: any) => void]>((resolve) => + socket_rcv.once("emit", (msg, ack) => resolve([msg, ack])), + ); + const ack = socket_emit.emitWithAck( + "emit_with_ack_from_remote_sock", + socket_rcv.id, + ); + const [msg, ack_sender] = await prom; + assert.equal(msg, `hello from ${socket_emit.id}`); + ack_sender(`hi from ${socket_rcv.id}`); + assert.equal(await ack, `hi from ${socket_rcv.id}`); + } + }); + + it("should join a room from a remote socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + + for (let i = 0; i < sockets.length; i++) { + const socket_emit = sockets[i]; + const socket_rcv = sockets[(i + 1) % sockets.length]; + await socket_emit.emitWithAck( + "join_room_from_remote_sock", + socket_rcv.id, + ); + const rooms = await socket_emit.emitWithAck( + "get_rooms_remote_sock", + socket_rcv.id, + ); + assert(rooms.includes(`hello from ${socket_emit.id}`)); + } + }); + + it("should leave a room from a remote socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + + for (let i = 0; i < sockets.length; i++) { + const socket_emit = sockets[i]; + const socket_rcv = sockets[(i + 1) % sockets.length]; + const rooms_before = await socket_emit.emitWithAck( + "get_rooms_remote_sock", + socket_rcv.id, + ); + assert(rooms_before.includes("room4")); + + await socket_emit.emitWithAck( + "leave_room_from_remote_sock", + socket_rcv.id, + ); + const rooms = await socket_emit.emitWithAck( + "get_rooms_remote_sock", + socket_rcv.id, + ); + assert(!rooms.includes(`room4`)); + } + }); + + it("should disconnect a remote socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + + for (let i = 1; i < sockets.length; i++) { + const socket_emit = sockets[i]; + const socket_rcv = sockets[i - 1]; + const prom = new Promise((resolve) => + socket_rcv.once("disconnect", resolve), + ); + await socket_emit.emitWithAck("disconnect_remote_sock", socket_rcv.id); + await prom; + } + }); + + it("should get rooms from remote socket", async (ctx) => { + const sockets: Socket[] = (ctx as any).sockets; + const ROOMS = ["room1", "room2", "room4", "room5"]; + for (let i = 0; i < sockets.length; i++) { + const socket_emit = sockets[i]; + const socket_rcv = sockets[(i + 1) % sockets.length]; + const rooms = await socket_emit.emitWithAck( + "get_rooms_remote_sock", + socket_rcv.id, + ); + assert.deepEqual(rooms.sort(), [...ROOMS, socket_rcv.id].sort()); + } + }); }); - sockets[0].emit("disconnect_socket"); - await timeout(prom); - for (const socket of sockets) { - assert(!socket.connected); - } -} - -async function rooms() { - const sockets = await spawn_sockets([3000, 3001, 3002], 10); - const expected = [ - "room1", - "room2", - "room4", - "room5", - ...sockets.map((s) => s.id), - ].sort(); - for (const socket of sockets) { - const rooms: string[] = await timeout(socket.emitWithAck("rooms")); - assert.deepStrictEqual(rooms.sort(), expected); - } - return sockets; -} - -// * Spawn 10 sockets on 3 servers -// * Call a `fetch_sockets` event on each socket -// * Get the list of sockets and compare it to the expected list -async function fetchSockets() { - type SocketData = { id: string; ns: string }; - const sockets = await spawn_sockets([3000, 3001, 3002], 10); - const expected = sockets - .map((socket) => ({ - id: socket.id, - ns: "/", - })) - .sort((a, b) => a.id!.localeCompare(b.id!)); - - for (const socket of sockets) { - const data: SocketData[] = await timeout( - socket.emitWithAck("fetch_sockets"), - ); - const sorted = data - ?.map((data) => ({ id: data.id, ns: data.ns })) - ?.sort((a, b) => a.id.localeCompare(b.id)); - assert.deepStrictEqual(sorted, expected); - } - - return sockets; -} - -async function main() { - await spawn_servers([3000, 3001, 3002]); - await TEST(broadcast); - await TEST(broadcastWithAck); - await TEST(fetchSockets); - await TEST(disconnectSocket); - await TEST(rooms); - process.exit(); -} -main(); +}); diff --git a/e2e/adapter/fixture.ts b/e2e/adapter/fixture.ts index 3c055f2e..1dd1af98 100644 --- a/e2e/adapter/fixture.ts +++ b/e2e/adapter/fixture.ts @@ -1,58 +1,34 @@ import io, { Socket } from "socket.io-client"; import msgpackParser from "socket.io-msgpack-parser"; -import { ChildProcess, exec, spawn } from "child_process"; +import { ChildProcess, spawn } from "child_process"; import assert from "assert"; import { open } from "fs/promises"; -export async function timeout_recv( - fn: (resolve: (value: T) => void) => any, - duration = 500, -) { - return new Promise((resolve, reject) => { - fn(resolve); - setTimeout(() => reject("timeout"), duration); - }); -} -export async function timeout( - promise: Promise, - duration = 500, -): Promise { - return new Promise((resolve, reject) => { - setTimeout(() => reject("timeout"), duration); - promise.then(resolve, reject); - }); +export async function kill_servers(servers: ChildProcess[]) { + for (const server of servers) { + server.kill(); + } } export async function spawn_servers(ports: number[]) { const args = process.env.CMD!.split(" "); - const bin = args.shift(); - const servers: [ChildProcess, number][] = []; - const logs: Record = {}; + const bin = args.shift()!; + const servers: ChildProcess[] = []; for (const port of ports) { - exec(`kill $(lsof -t -i:${port})`); - console.log("spawning server on port", port); const file = (await open(`${port}.log`, "w")).createWriteStream(); console.log(`EXEC PORT=${port} ${bin} ${args.join(" ")}`); const server = spawn(bin, args, { - shell: true, + detached: false, + timeout: 10000, env: { ...process.env, PORT: port.toString(), }, }); - logs[server.pid] = ""; server.stdout.pipe(file); server.stderr.pipe(file); + servers.push(server); } - process.on("exit", () => { - for (const [server, port] of servers) { - console.log("killing", server.pid); - server.kill(); - exec(`kill $(lsof -t -i:${port})`); - } - }); - process.on("SIGINT", () => process.exit()); // catch ctrl-c - process.on("SIGTERM", () => process.exit()); // catch kill await new Promise((resolve) => setTimeout(resolve, 1000)); return servers; } @@ -60,29 +36,17 @@ export async function spawn_servers(ports: number[]) { // Spawn a number of distributed sockets on a list of ports export async function spawn_sockets(ports: number[], len: number) { let sockets: Socket[] = []; - const parser = process.env.CMD.includes("msgpack") ? msgpackParser : null; + const parser = process.env.CMD?.includes("msgpack") ? msgpackParser : null; for (let i = 0; i < len; i++) { const socket = io(`http://localhost:${ports[i % ports.length]}`, { parser, }); assert( - await timeout_recv((resolve) => - socket.on("connect", () => resolve(true)), + await new Promise((resolve) => + socket.once("connect", () => resolve(true)), ), ); sockets.push(socket); } return sockets; } -export async function TEST( - fn: () => Promise, -): Promise { - console.log(`RUN ${fn.name}`); - const sockets = await fn(); - if (sockets) { - for (const socket of sockets) { - socket.disconnect(); - } - } - console.log(`OK ${fn.name}`); -} diff --git a/e2e/adapter/package.json b/e2e/adapter/package.json index 9a3a1624..2ab5b5c9 100644 --- a/e2e/adapter/package.json +++ b/e2e/adapter/package.json @@ -4,7 +4,6 @@ "dependencies": { "@types/node": "^22", "socket.io-client": "^4.8.1", - "socket.io-msgpack-parser": "^3.0.2", - "ts-node": "^10.9.2" + "socket.io-msgpack-parser": "^3.0.2" } } diff --git a/e2e/adapter/src/lib.rs b/e2e/adapter/src/lib.rs index 3741ab69..9c7e4fe2 100644 --- a/e2e/adapter/src/lib.rs +++ b/e2e/adapter/src/lib.rs @@ -12,17 +12,33 @@ use futures_util::StreamExt; use socketioxide::{ adapter::Adapter, - extract::{AckSender, SocketRef}, + extract::{AckSender, Data, SocketRef}, + socket::Sid, SocketIo, }; pub async fn handler(s: SocketRef) { s.join(["room1", "room2", "room4", "room5"]); s.join(s.id); + + // "Broadcast" tests s.on("broadcast", broadcast); s.on("fetch_sockets", fetch_sockets); s.on("broadcast_with_ack", broadcast_with_ack); s.on("disconnect_socket", disconnect_socket); + s.on("join_room", join_room); + s.on("leave_room", leave_room); s.on("rooms", rooms); + + // Remote socket tests + s.on("emit_from_remote_sock", emit_from_remote_sock); + s.on( + "emit_with_ack_from_remote_sock", + emit_with_ack_from_remote_sock, + ); + s.on("join_room_from_remote_sock", join_room_from_remote_sock); + s.on("leave_room_from_remote_sock", leave_room_from_remote_sock); + s.on("get_rooms_remote_sock", get_rooms_remote_sock); + s.on("disconnect_remote_sock", disconnect_remote_sock); } async fn broadcast(io: SocketIo, s: SocketRef) { @@ -53,7 +69,92 @@ async fn fetch_sockets(io: SocketIo, ack: AckSender) { async fn disconnect_socket(io: SocketIo) { io.disconnect().await.unwrap(); } +async fn join_room(io: SocketIo, ack: AckSender) { + io.join("room7").await.unwrap(); + ack.send(&()).unwrap(); +} +async fn leave_room(io: SocketIo, ack: AckSender) { + io.leave("room4").await.unwrap(); + ack.send(&()).unwrap(); +} async fn rooms(io: SocketIo, ack: AckSender) { let rooms = io.rooms().await.unwrap(); ack.send(&rooms).unwrap(); } + +async fn emit_from_remote_sock( + io: SocketIo, + s: SocketRef, + Data(id): Data, + ack: AckSender, +) { + let socks = io.to(id).fetch_sockets().await.unwrap(); + assert_eq!(socks.len(), 1); + let sock = socks.first().unwrap(); + sock.emit("emit", &format!("hello from {}", s.id)) + .await + .unwrap(); + ack.send(&()).unwrap(); +} +async fn emit_with_ack_from_remote_sock( + io: SocketIo, + s: SocketRef, + Data(id): Data, + ack: AckSender, +) { + let socks = io.to(id).fetch_sockets().await.unwrap(); + assert_eq!(socks.len(), 1); + let sock = socks.first().unwrap(); + let res: String = sock + .emit_with_ack("emit", &format!("hello from {}", s.id)) + .await + .unwrap() + .await + .unwrap(); + ack.send(&res).unwrap(); +} +async fn join_room_from_remote_sock( + io: SocketIo, + s: SocketRef, + Data(id): Data, + ack: AckSender, +) { + let socks = io.to(id).fetch_sockets().await.unwrap(); + assert_eq!(socks.len(), 1); + let sock = socks.first().unwrap(); + sock.join(format!("hello from {}", s.id)).await.unwrap(); + ack.send(&()).unwrap(); +} +async fn leave_room_from_remote_sock( + io: SocketIo, + Data(id): Data, + ack: AckSender, +) { + let socks = io.to(id).fetch_sockets().await.unwrap(); + assert_eq!(socks.len(), 1); + let sock = socks.first().unwrap(); + sock.leave("room4").await.unwrap(); + ack.send(&()).unwrap(); +} +async fn disconnect_remote_sock( + io: SocketIo, + Data(id): Data, + ack: AckSender, +) { + let socks = io.to(id).fetch_sockets().await.unwrap(); + assert_eq!(socks.len(), 1); + let sock = socks.first().unwrap(); + sock.clone().disconnect().await.unwrap(); + ack.send(&()).unwrap(); +} +async fn get_rooms_remote_sock( + io: SocketIo, + Data(id): Data, + ack: AckSender, +) { + let socks = io.to(id).fetch_sockets().await.unwrap(); + assert_eq!(socks.len(), 1); + let sock = socks.first().unwrap(); + let rooms = sock.rooms().await.unwrap(); + ack.send(&rooms).unwrap(); +} diff --git a/e2e/engineioxide/test-suites/v3.ts b/e2e/engineioxide/test-suites/v3.ts index ed339dd7..8be791f1 100644 --- a/e2e/engineioxide/test-suites/v3.ts +++ b/e2e/engineioxide/test-suites/v3.ts @@ -417,7 +417,7 @@ describe("Engine.IO protocol", () => { it("closes the session upon ping timeout", async () => { const sid = await initLongPollingSession(); - await sleep(PING_INTERVAL + PING_TIMEOUT); + await sleep(PING_INTERVAL + PING_TIMEOUT + 100); const pushResponse = await fetch( `${POLLING_URL}/engine.io/?EIO=3&transport=polling&sid=${sid}`, diff --git a/e2e/socketioxide/test-suites/v4.ts b/e2e/socketioxide/test-suites/v4.ts index f3bd127f..c874d5c8 100644 --- a/e2e/socketioxide/test-suites/v4.ts +++ b/e2e/socketioxide/test-suites/v4.ts @@ -220,7 +220,7 @@ describe("Engine.IO protocol", () => { it("should close the session upon ping timeout", async () => { const sid = await initLongPollingSession(); - await sleep(PING_INTERVAL + PING_TIMEOUT); + await sleep(PING_INTERVAL + PING_TIMEOUT + 100); const pollResponse = await fetch( `${POLLING_URL}/socket.io/?EIO=3&transport=polling&sid=${sid}`,