Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test redis adapter #439

Merged
merged 9 commits into from
Jan 16, 2025
8 changes: 5 additions & 3 deletions .github/workflows/github-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ jobs:
run: |
cd e2e/engineioxide/test-suites && npm install && cd ../../..
cargo build -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r
cargo run -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r > server.txt & node --test-reporter=spec --experimental-strip-types e2e/engineioxide/test-suites/${{ matrix.engineio-version }}.ts > client.txt
cargo run -p engineioxide-e2e --bin engineioxide-e2e -F ${{ matrix.engineio-version }} -r > server.txt && sleep 1 & \
node --test-reporter=spec --experimental-strip-types e2e/engineioxide/test-suites/${{ matrix.engineio-version }}.ts > client.txt
- name: Server output
if: always()
run: cat server.txt
Expand Down Expand Up @@ -233,7 +234,8 @@ jobs:
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
cd e2e/socketioxide/test-suites && npm install && cd ../../..
cargo build -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r
cargo run -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r > server.txt & node --test-reporter=spec --experimental-strip-types e2e/socketioxide/test-suites/${{ matrix.socketio-version }}.ts > client.txt
cargo run -p socketioxide-e2e --bin socketioxide-e2e -F $VERSION,$PARSER -r > server.txt && sleep 1 & \
node --test-reporter=spec --experimental-strip-types e2e/socketioxide/test-suites/${{ matrix.socketio-version }}.ts > client.txt
- name: Server output
if: always()
run: cat server.txt
Expand Down Expand Up @@ -274,7 +276,7 @@ jobs:
PARSER=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f2 -s)
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
cargo build -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" node --experimental-strip-types client.ts
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" node --experimental-strip-types --test-reporter=spec client.ts
- name: Server output
if: always()
run: cat e2e/adapter/*.log
Expand Down
25 changes: 21 additions & 4 deletions crates/socketioxide-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {
timeout: Option<Duration>,
) -> Result<Self::AckStream, Self::Error> {
if is_local_op(self.uid, &opts) {
tracing::debug!(?opts, "broadcast with ack is local");
let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout);
let stream = AckStream::new_local(local);
return Ok(stream);
Expand Down Expand Up @@ -588,7 +589,9 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {

// First get the remote stream because redis might send
// the responses before subscription is done.
let stream = self.get_res::<()>(req_id, PACKET_IDX).await?;
let stream = self
.get_res::<()>(req_id, PACKET_IDX, opts.server_id)
.await?;
self.send_req(req, opts.server_id).await?;
let local = self.local.rooms(opts);
let rooms = stream
Expand Down Expand Up @@ -641,7 +644,9 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {
let req_id = req.id;
// First get the remote stream because redis might send
// the responses before subscription is done.
let remote = self.get_res::<RemoteSocketData>(req_id, PACKET_IDX).await?;
let remote = self
.get_res::<RemoteSocketData>(req_id, PACKET_IDX, opts.server_id)
.await?;

self.send_req(req, opts.server_id).await?;
let local = self.local.fetch_sockets(opts);
Expand Down Expand Up @@ -909,8 +914,14 @@ impl<E: SocketEmitter, R: Driver> CustomRedisAdapter<E, R> {
&self,
req_id: Sid,
response_idx: u8,
target_uid: Option<Uid>,
) -> Result<impl Stream<Item = Response<D>>, Error<R>> {
let remote_serv_cnt = self.server_count().await?.saturating_sub(1) as usize;
// Check for specific target node
let remote_serv_cnt = if target_uid.is_none() {
self.server_count().await?.saturating_sub(1) as usize
} else {
1
};
let (tx, rx) = mpsc::channel(std::cmp::max(remote_serv_cnt, 1));
self.responses.lock().unwrap().insert(req_id, tx);
let stream = MessageStream::new(rx)
Expand Down Expand Up @@ -946,11 +957,17 @@ impl<E: SocketEmitter, R: Driver> CustomRedisAdapter<E, R> {
/// sent to the current server.
#[inline]
fn is_local_op(uid: Uid, opts: &BroadcastOptions) -> bool {
opts.has_flag(BroadcastFlags::Local)
if opts.has_flag(BroadcastFlags::Local)
|| (!opts.has_flag(BroadcastFlags::Broadcast)
&& opts.server_id == Some(uid)
&& opts.rooms.is_empty()
&& opts.sid.is_some())
{
tracing::debug!(?opts, "operation is local");
true
} else {
false
}
}

/// Checks if the namespace path is valid
Expand Down
9 changes: 7 additions & 2 deletions crates/socketioxide-redis/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,18 @@ where
S: Stream<Item = AckStreamItem<E>> + FusedStream,
{
type Item = AckStreamItem<E>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project().local.poll_next(cx) {
Poll::Pending | Poll::Ready(None) => self.poll_remote(cx),
Poll::Pending => match self.poll_remote(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Pending,
},
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => self.poll_remote(cx),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (lower, upper) = self.local.size_hint();
(lower, upper.map(|upper| upper + self.total_ack_cnt))
Expand Down
4 changes: 1 addition & 3 deletions crates/socketioxide/src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,7 @@ impl<T: DeserializeOwned, A: Adapter> Future for AckStream<T, A> {
match self.project().inner.poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(map_ack_response(v.1, parser)),
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
unreachable!("stream should at least yield 1 value")
}
Poll::Ready(None) => Poll::Ready(Err(AckError::Timeout)),
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/socketioxide/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex, OnceLock, RwLock};

use bytes::Bytes;
Expand Down Expand Up @@ -212,7 +213,6 @@ impl<A: Adapter> Client<A> {
}
}

#[derive(Debug)]
pub struct SocketData<A: Adapter> {
pub parser_state: ParserState,
/// Channel used to notify the socket that it has been connected to a namespace for v5
Expand All @@ -230,6 +230,14 @@ impl<A: Adapter> Default for SocketData<A> {
}
}
}
impl<A: Adapter> fmt::Debug for SocketData<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SocketData")
.field("parser_state", &self.parser_state)
.field("connect_recv_tx", &self.connect_recv_tx)
.finish()
}
}

impl<A: Adapter> EngineIoHandler for Client<A> {
type Data = SocketData<A>;
Expand Down
8 changes: 6 additions & 2 deletions crates/socketioxide/src/extract/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::convert::Infallible;
use std::fmt;
use std::sync::Arc;

use crate::{
Expand All @@ -14,7 +15,6 @@ use socketioxide_core::{errors::SocketError, packet::Packet, parser::Parse, Valu
///
/// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default,
/// make sure to have a handler that is [generic over the adapter type](crate#adapters).
#[derive(Debug)]
pub struct SocketRef<A: Adapter = LocalAdapter>(Arc<Socket<A>>);

impl<A: Adapter> FromConnectParts<A> for SocketRef<A> {
Expand Down Expand Up @@ -75,13 +75,17 @@ impl<A: Adapter> SocketRef<A> {
self.0.disconnect()
}
}
impl<A: Adapter> fmt::Debug for SocketRef<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}

/// An Extractor to send an ack response corresponding to the current event.
/// If the client sent a normal message without expecting an ack, the ack callback will do nothing.
///
/// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default,
/// make sure to have a handler that is [generic over the adapter type](crate#adapters).
#[derive(Debug)]
pub struct AckSender<A: Adapter = LocalAdapter> {
socket: Arc<Socket<A>>,
ack_id: Option<i64>,
Expand Down
22 changes: 17 additions & 5 deletions crates/socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
borrow::Cow,
collections::HashMap,
fmt::{self, Debug},
future::Future,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc, Mutex, RwLock,
Expand Down Expand Up @@ -130,7 +131,6 @@ impl<'a> PermitExt<'a> for Permit<'a> {

/// A RemoteSocket is a [`Socket`] that is remotely connected on another server.
/// It implements a subset of the [`Socket`] API.
#[derive(Clone)]
pub struct RemoteSocket<A> {
adapter: Arc<A>,
parser: Parser,
Expand Down Expand Up @@ -203,16 +203,19 @@ impl<A: Adapter> RemoteSocket<A> {
///
/// See [`Socket::join`] for more info.
#[inline]
pub async fn join(&self, rooms: impl RoomParam) -> Result<(), A::Error> {
self.adapter.add_sockets(self.get_opts(), rooms).await
pub fn join(&self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + '_ {
self.adapter.add_sockets(self.get_opts(), rooms)
}

/// # Remove the remote socket from the specified room(s).
///
/// See [`Socket::leave`] for more info.
#[inline]
pub async fn leave(&self, rooms: impl RoomParam) -> Result<(), A::Error> {
self.adapter.del_sockets(self.get_opts(), rooms).await
pub fn leave(
&self,
rooms: impl RoomParam,
) -> impl Future<Output = Result<(), A::Error>> + Send + '_ {
self.adapter.del_sockets(self.get_opts(), rooms)
}

/// # Disconnect the remote socket from the current namespace,
Expand All @@ -238,6 +241,15 @@ impl<A> fmt::Debug for RemoteSocket<A> {
.finish()
}
}
impl<A> Clone for RemoteSocket<A> {
fn clone(&self) -> Self {
Self {
adapter: self.adapter.clone(),
parser: self.parser,
data: self.data.clone(),
}
}
}

/// A error that can occur when emitting a message to a remote socket.
#[derive(Debug, thiserror::Error)]
Expand Down
Loading
Loading