From 97d15edc39f31ea52e6c69a81290463ff8b53290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 19 Jan 2025 23:17:16 +0100 Subject: [PATCH] Revert "test(engineio): mock http/ws connections (#444)" This reverts commit 88dc4df2b5c22757d7ad6030bb0c5b376368c1d0. --- crates/engineioxide/src/service/mod.rs | 23 --- crates/engineioxide/src/transport/ws.rs | 2 +- .../engineioxide/tests/disconnect_reason.rs | 42 ++-- crates/engineioxide/tests/fixture.rs | 179 +++++++----------- 4 files changed, 86 insertions(+), 160 deletions(-) diff --git a/crates/engineioxide/src/service/mod.rs b/crates/engineioxide/src/service/mod.rs index af9b6a27..261fbb1b 100644 --- a/crates/engineioxide/src/service/mod.rs +++ b/crates/engineioxide/src/service/mod.rs @@ -159,29 +159,6 @@ where } } -#[cfg(feature = "__test_harness")] -#[doc(hidden)] -impl EngineIoService -where - H: EngineIoHandler, -{ - /// Create a new engine.io over websocket through a raw stream. - /// Mostly used for testing. - pub fn ws_init( - &self, - conn: S, - protocol: ProtocolVersion, - sid: Option, - req_data: http::request::Parts, - ) -> impl std::future::Future> - where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, - { - let engine = self.engine.clone(); - crate::transport::ws::on_init(engine, conn, protocol, sid, req_data) - } -} - /// A MakeService that always returns a clone of the [`EngineIoService`] it was created with. pub struct MakeEngineIoService { svc: EngineIoService, diff --git a/crates/engineioxide/src/transport/ws.rs b/crates/engineioxide/src/transport/ws.rs index a7acdcf7..f5a13978 100644 --- a/crates/engineioxide/src/transport/ws.rs +++ b/crates/engineioxide/src/transport/ws.rs @@ -100,7 +100,7 @@ pub fn new_req( /// Sends an open packet if it is not an upgrade from a polling request /// /// Read packets from the websocket and handle them, it will block until the connection is closed -pub async fn on_init( +async fn on_init( engine: Arc>, conn: S, protocol: ProtocolVersion, diff --git a/crates/engineioxide/tests/disconnect_reason.rs b/crates/engineioxide/tests/disconnect_reason.rs index 394d66b1..b29ab819 100644 --- a/crates/engineioxide/tests/disconnect_reason.rs +++ b/crates/engineioxide/tests/disconnect_reason.rs @@ -18,10 +18,10 @@ use tokio::sync::mpsc; mod fixture; -use fixture::{create_server, create_ws_connection, send_req}; +use fixture::{create_server, send_req}; use tokio_tungstenite::tungstenite::Message; -use crate::fixture::create_polling_connection; +use crate::fixture::{create_polling_connection, create_ws_connection}; #[derive(Debug, Clone)] struct MyHandler { @@ -53,8 +53,8 @@ impl EngineIoHandler for MyHandler { #[tokio::test] pub async fn polling_heartbeat_timeout() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - create_polling_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 1234).await; + create_polling_connection(1234).await; let data = tokio::time::timeout(Duration::from_millis(500), rx.recv()) .await @@ -67,8 +67,8 @@ pub async fn polling_heartbeat_timeout() { #[tokio::test] pub async fn ws_heartbeat_timeout() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - let _stream = create_ws_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 12344).await; + let _stream = create_ws_connection(12344).await; let data = tokio::time::timeout(Duration::from_millis(500), rx.recv()) .await @@ -81,11 +81,11 @@ pub async fn ws_heartbeat_timeout() { #[tokio::test] pub async fn polling_transport_closed() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - let sid = create_polling_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 1235).await; + let sid = create_polling_connection(1235).await; send_req( - &mut svc, + 1235, format!("transport=polling&sid={sid}"), http::Method::POST, Some("1".into()), @@ -103,8 +103,8 @@ pub async fn polling_transport_closed() { #[tokio::test] pub async fn ws_transport_closed() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - let mut stream = create_ws_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 12345).await; + let mut stream = create_ws_connection(12345).await; stream.send(Message::Text("1".into())).await.unwrap(); @@ -119,10 +119,10 @@ pub async fn ws_transport_closed() { #[tokio::test] pub async fn multiple_http_polling() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - let sid = create_polling_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 1236).await; + let sid = create_polling_connection(1236).await; send_req( - &mut svc, + 1236, format!("transport=polling&sid={sid}"), http::Method::GET, None, @@ -131,13 +131,13 @@ pub async fn multiple_http_polling() { tokio::spawn(futures_util::future::join_all(vec![ send_req( - &mut svc, + 1236, format!("transport=polling&sid={sid}"), http::Method::GET, None, ), send_req( - &mut svc, + 1236, format!("transport=polling&sid={sid}"), http::Method::GET, None, @@ -155,10 +155,10 @@ pub async fn multiple_http_polling() { #[tokio::test] pub async fn polling_packet_parsing() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - let sid = create_polling_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 1237).await; + let sid = create_polling_connection(1237).await; send_req( - &mut svc, + 1237, format!("transport=polling&sid={sid}"), http::Method::POST, Some("aizdunazidaubdiz".into()), @@ -176,8 +176,8 @@ pub async fn polling_packet_parsing() { #[tokio::test] pub async fn ws_packet_parsing() { let (disconnect_tx, mut rx) = mpsc::channel(10); - let mut svc = create_server(MyHandler { disconnect_tx }).await; - let mut stream = create_ws_connection(&mut svc).await; + create_server(MyHandler { disconnect_tx }, 12347).await; + let mut stream = create_ws_connection(12347).await; stream .send(Message::Text("aizdunazidaubdiz".into())) .await diff --git a/crates/engineioxide/tests/fixture.rs b/crates/engineioxide/tests/fixture.rs index 08bd6fd1..42b9a25c 100644 --- a/crates/engineioxide/tests/fixture.rs +++ b/crates/engineioxide/tests/fixture.rs @@ -1,30 +1,21 @@ use std::{ collections::VecDeque, - future::Future, - io, - pin::Pin, + net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc, - task::{Context, Poll}, time::Duration, }; -use bytes::{BufMut, Bytes}; -use engineioxide::{ - config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, sid::Sid, - ProtocolVersion, -}; +use engineioxide::{config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService}; use http::Request; use http_body_util::{BodyExt, Either, Empty, Full}; -use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - sync::mpsc, -}; -use tokio_tungstenite::{ - tungstenite::{handshake::client::generate_key, protocol::Role}, - WebSocketStream, +use hyper::server::conn::http1; +use hyper_util::{ + client::legacy::Client, + rt::{TokioExecutor, TokioIo}, }; -use tower_service::Service; +use serde::{Deserialize, Serialize}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; /// An OpenPacket is used to initiate a connection #[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)] @@ -38,12 +29,12 @@ struct OpenPacket { } /// Params should be in the form of `key1=value1&key2=value2` -pub fn send_req( - svc: &mut EngineIoService, +pub async fn send_req( + port: u16, params: String, method: http::Method, body: Option, -) -> impl Future + 'static { +) -> String { let body = match body { Some(b) => Either::Left(Full::new(VecDeque::from(b.into_bytes()))), None => Either::Right(Empty::>::new()), @@ -51,30 +42,28 @@ pub fn send_req( let req = Request::builder() .method(method) - .uri(format!("http://127.0.0.1/engine.io/?EIO=4&{}", params)) + .uri(format!( + "http://127.0.0.1:{port}/engine.io/?EIO=4&{}", + params + )) .body(body) .unwrap(); - let res = svc.call(req); - async move { - let body = res - .await - .unwrap() - .body_mut() - .collect() - .await - .unwrap() - .to_bytes(); - String::from_utf8(body.to_vec()) - .unwrap() - .chars() - .skip(1) - .collect() - } + let mut res = Client::builder(TokioExecutor::new()) + .build_http() + .request(req) + .await + .unwrap(); + let body = res.body_mut().collect().await.unwrap().to_bytes(); + String::from_utf8(body.to_vec()) + .unwrap() + .chars() + .skip(1) + .collect() } -pub async fn create_polling_connection(svc: &mut EngineIoService) -> String { +pub async fn create_polling_connection(port: u16) -> String { let body = send_req( - svc, + port, "transport=polling".to_string(), http::Method::GET, None, @@ -83,88 +72,48 @@ pub async fn create_polling_connection(svc: &mut EngineIoSer let open_packet: OpenPacket = serde_json::from_str(&body).unwrap(); open_packet.sid } -pub async fn create_ws_connection( - svc: &mut EngineIoService, -) -> WebSocketStream { - new_ws_mock_conn(svc, ProtocolVersion::V4, None).await -} - -pub struct StreamImpl(mpsc::UnboundedSender, mpsc::UnboundedReceiver); - -impl AsyncRead for StreamImpl { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - self.1.poll_recv(cx).map(|e| { - if let Some(e) = e { - buf.put(e); - } - Ok(()) - }) - } -} -impl AsyncWrite for StreamImpl { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let len = buf.len(); - self.0.send(Bytes::copy_from_slice(buf)).unwrap(); - Poll::Ready(Ok(len)) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - self.1.close(); - Poll::Ready(Ok(())) - } -} -async fn new_ws_mock_conn( - svc: &mut EngineIoService, - protocol: ProtocolVersion, - sid: Option, -) -> WebSocketStream { - let (tx, rx) = mpsc::unbounded_channel(); - let (tx1, rx1) = mpsc::unbounded_channel(); - - let parts = Request::builder() - .method("GET") - .header("Host", "127.0.0.1") - .header("Connection", "Upgrade") - .header("Upgrade", "websocket") - .header("Sec-WebSocket-Version", "13") - .header("Sec-WebSocket-Key", generate_key()) - .uri("ws://127.0.0.1/engine.io/?EIO=4&transport=websocket") - .body(http_body_util::Empty::::new()) - .unwrap() - .into_parts() - .0; - - tokio::spawn(svc.ws_init(StreamImpl(tx, rx1), protocol, sid, parts)); - - tokio_tungstenite::WebSocketStream::from_raw_socket( - StreamImpl(tx1, rx), - Role::Client, - Default::default(), - ) +pub async fn create_ws_connection(port: u16) -> WebSocketStream> { + tokio_tungstenite::connect_async(format!( + "ws://127.0.0.1:{port}/engine.io/?EIO=4&transport=websocket" + )) .await + .unwrap() + .0 } -pub async fn create_server(handler: H) -> EngineIoService { +pub async fn create_server(handler: H, port: u16) { let config = EngineIoConfig::builder() .ping_interval(Duration::from_millis(300)) .ping_timeout(Duration::from_millis(200)) .max_payload(1e6 as u64) .build(); - EngineIoService::with_config(Arc::new(handler), config) + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + + let svc = EngineIoService::with_config(Arc::new(handler), config); + + let listener = TcpListener::bind(&addr).await.unwrap(); + tokio::spawn(async move { + // We start a loop to continuously accept incoming connections + loop { + let (stream, _) = listener.accept().await.unwrap(); + + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(stream); + let svc = svc.clone(); + + // Spawn a tokio task to serve multiple connections concurrently + tokio::task::spawn(async move { + // Finally, we bind the incoming connection to our `hello` service + if let Err(err) = http1::Builder::new() + .serve_connection(io, svc) + .with_upgrades() + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } + }); }