Skip to content

Commit

Permalink
test(adapter): improve test-suite and fix bugs (#439)
Browse files Browse the repository at this point in the history
* test(adapter): improve the adapter test-suite to cover a lot more cases.

* fix(redis): the ack stream was closed if the remote part was closed without checking the local one.
  
* fix(redis): the expected response count when waiting for responses was set to server count even when sending the request to a single server leading to waiting for the timeout before getting the response.

* fix(socketio): issue on `RemoteSocket` struct, the `impl RoomParam` lifetime was bound to the async function for `leave` and `join` fns. Therefore it was impossible to use `&'static str` whereas `RoomParam` is implemented for `&'static str`.

* fix(socketio): manual implementation for some `Clone` and `Debug` to avoid automatic bound on generic adapter.
  • Loading branch information
Totodore authored Jan 16, 2025
1 parent 1f57e8f commit 39ab18a
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 180 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/github-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ jobs:
run: |
cd e2e/engineioxide/test-suites && npm install && cd ../../..
cargo build -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r
cargo run -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r > server.txt & node --test-reporter=spec --experimental-strip-types e2e/engineioxide/test-suites/${{ matrix.engineio-version }}.ts > client.txt
cargo run -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r > server.txt && sleep 1 & \
node --test-reporter=spec --experimental-strip-types e2e/engineioxide/test-suites/${{ matrix.engineio-version }}.ts > client.txt
- name: Server output
if: always()
run: cat server.txt
Expand Down Expand Up @@ -233,7 +234,8 @@ jobs:
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
cd e2e/socketioxide/test-suites && npm install && cd ../../..
cargo build -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r
cargo run -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r > server.txt & node --test-reporter=spec --experimental-strip-types e2e/socketioxide/test-suites/${{ matrix.socketio-version }}.ts > client.txt
cargo run -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r > server.txt && sleep 1 & \
node --test-reporter=spec --experimental-strip-types e2e/socketioxide/test-suites/${{ matrix.socketio-version }}.ts > client.txt
- name: Server output
if: always()
run: cat server.txt
Expand Down Expand Up @@ -274,7 +276,7 @@ jobs:
PARSER=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f2 -s)
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
cargo build -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" node --experimental-strip-types client.ts
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" node --experimental-strip-types --test-reporter=spec client.ts
- name: Server output
if: always()
run: cat e2e/adapter/*.log
Expand Down
25 changes: 21 additions & 4 deletions crates/socketioxide-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {
timeout: Option<Duration>,
) -> Result<Self::AckStream, Self::Error> {
if is_local_op(self.uid, &opts) {
tracing::debug!(?opts, "broadcast with ack is local");
let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout);
let stream = AckStream::new_local(local);
return Ok(stream);
Expand Down Expand Up @@ -588,7 +589,9 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {

// First get the remote stream because redis might send
// the responses before subscription is done.
let stream = self.get_res::<()>(req_id, PACKET_IDX).await?;
let stream = self
.get_res::<()>(req_id, PACKET_IDX, opts.server_id)
.await?;
self.send_req(req, opts.server_id).await?;
let local = self.local.rooms(opts);
let rooms = stream
Expand Down Expand Up @@ -641,7 +644,9 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {
let req_id = req.id;
// First get the remote stream because redis might send
// the responses before subscription is done.
let remote = self.get_res::<RemoteSocketData>(req_id, PACKET_IDX).await?;
let remote = self
.get_res::<RemoteSocketData>(req_id, PACKET_IDX, opts.server_id)
.await?;

self.send_req(req, opts.server_id).await?;
let local = self.local.fetch_sockets(opts);
Expand Down Expand Up @@ -909,8 +914,14 @@ impl<E: SocketEmitter, R: Driver> CustomRedisAdapter<E, R> {
&self,
req_id: Sid,
response_idx: u8,
target_uid: Option<Uid>,
) -> Result<impl Stream<Item = Response<D>>, Error<R>> {
let remote_serv_cnt = self.server_count().await?.saturating_sub(1) as usize;
// Check for specific target node
let remote_serv_cnt = if target_uid.is_none() {
self.server_count().await?.saturating_sub(1) as usize
} else {
1
};
let (tx, rx) = mpsc::channel(std::cmp::max(remote_serv_cnt, 1));
self.responses.lock().unwrap().insert(req_id, tx);
let stream = MessageStream::new(rx)
Expand Down Expand Up @@ -946,11 +957,17 @@ impl<E: SocketEmitter, R: Driver> CustomRedisAdapter<E, R> {
/// sent to the current server.
#[inline]
fn is_local_op(uid: Uid, opts: &BroadcastOptions) -> bool {
opts.has_flag(BroadcastFlags::Local)
if opts.has_flag(BroadcastFlags::Local)
|| (!opts.has_flag(BroadcastFlags::Broadcast)
&& opts.server_id == Some(uid)
&& opts.rooms.is_empty()
&& opts.sid.is_some())
{
tracing::debug!(?opts, "operation is local");
true
} else {
false
}
}

/// Checks if the namespace path is valid
Expand Down
9 changes: 7 additions & 2 deletions crates/socketioxide-redis/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,18 @@ where
S: Stream<Item = AckStreamItem<E>> + FusedStream,
{
type Item = AckStreamItem<E>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project().local.poll_next(cx) {
Poll::Pending | Poll::Ready(None) => self.poll_remote(cx),
Poll::Pending => match self.poll_remote(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Pending,
},
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => self.poll_remote(cx),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (lower, upper) = self.local.size_hint();
(lower, upper.map(|upper| upper + self.total_ack_cnt))
Expand Down
4 changes: 1 addition & 3 deletions crates/socketioxide/src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,7 @@ impl<T: DeserializeOwned, A: Adapter> Future for AckStream<T, A> {
match self.project().inner.poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(map_ack_response(v.1, parser)),
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
unreachable!("stream should at least yield 1 value")
}
Poll::Ready(None) => Poll::Ready(Err(AckError::Timeout)),
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/socketioxide/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex, OnceLock, RwLock};

use bytes::Bytes;
Expand Down Expand Up @@ -212,7 +213,6 @@ impl<A: Adapter> Client<A> {
}
}

#[derive(Debug)]
pub struct SocketData<A: Adapter> {
pub parser_state: ParserState,
/// Channel used to notify the socket that it has been connected to a namespace for v5
Expand All @@ -230,6 +230,14 @@ impl<A: Adapter> Default for SocketData<A> {
}
}
}
impl<A: Adapter> fmt::Debug for SocketData<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SocketData")
.field("parser_state", &self.parser_state)
.field("connect_recv_tx", &self.connect_recv_tx)
.finish()
}
}

impl<A: Adapter> EngineIoHandler for Client<A> {
type Data = SocketData<A>;
Expand Down
8 changes: 6 additions & 2 deletions crates/socketioxide/src/extract/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::convert::Infallible;
use std::fmt;
use std::sync::Arc;

use crate::{
Expand All @@ -14,7 +15,6 @@ use socketioxide_core::{errors::SocketError, packet::Packet, parser::Parse, Valu
///
/// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default,
/// make sure to have a handler that is [generic over the adapter type](crate#adapters).
#[derive(Debug)]
pub struct SocketRef<A: Adapter = LocalAdapter>(Arc<Socket<A>>);

impl<A: Adapter> FromConnectParts<A> for SocketRef<A> {
Expand Down Expand Up @@ -75,13 +75,17 @@ impl<A: Adapter> SocketRef<A> {
self.0.disconnect()
}
}
impl<A: Adapter> fmt::Debug for SocketRef<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}

/// An Extractor to send an ack response corresponding to the current event.
/// If the client sent a normal message without expecting an ack, the ack callback will do nothing.
///
/// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default,
/// make sure to have a handler that is [generic over the adapter type](crate#adapters).
#[derive(Debug)]
pub struct AckSender<A: Adapter = LocalAdapter> {
socket: Arc<Socket<A>>,
ack_id: Option<i64>,
Expand Down
22 changes: 17 additions & 5 deletions crates/socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
borrow::Cow,
collections::HashMap,
fmt::{self, Debug},
future::Future,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc, Mutex, RwLock,
Expand Down Expand Up @@ -130,7 +131,6 @@ impl<'a> PermitExt<'a> for Permit<'a> {

/// A RemoteSocket is a [`Socket`] that is remotely connected on another server.
/// It implements a subset of the [`Socket`] API.
#[derive(Clone)]
pub struct RemoteSocket<A> {
adapter: Arc<A>,
parser: Parser,
Expand Down Expand Up @@ -203,16 +203,19 @@ impl<A: Adapter> RemoteSocket<A> {
///
/// See [`Socket::join`] for more info.
#[inline]
pub async fn join(&self, rooms: impl RoomParam) -> Result<(), A::Error> {
self.adapter.add_sockets(self.get_opts(), rooms).await
pub fn join(&self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + '_ {
self.adapter.add_sockets(self.get_opts(), rooms)
}

/// # Remove the remote socket from the specified room(s).
///
/// See [`Socket::leave`] for more info.
#[inline]
pub async fn leave(&self, rooms: impl RoomParam) -> Result<(), A::Error> {
self.adapter.del_sockets(self.get_opts(), rooms).await
pub fn leave(
&self,
rooms: impl RoomParam,
) -> impl Future<Output = Result<(), A::Error>> + Send + '_ {
self.adapter.del_sockets(self.get_opts(), rooms)
}

/// # Disconnect the remote socket from the current namespace,
Expand All @@ -238,6 +241,15 @@ impl<A> fmt::Debug for RemoteSocket<A> {
.finish()
}
}
impl<A> Clone for RemoteSocket<A> {
fn clone(&self) -> Self {
Self {
adapter: self.adapter.clone(),
parser: self.parser,
data: self.data.clone(),
}
}
}

/// A error that can occur when emitting a message to a remote socket.
#[derive(Debug, thiserror::Error)]
Expand Down
Loading

0 comments on commit 39ab18a

Please sign in to comment.