Skip to content

Commit

Permalink
Merge pull request #155 from Totodore/fix-hyper-util
Browse files Browse the repository at this point in the history
fix: implement manually `tokio_io` module for `hyper v1`
  • Loading branch information
Totodore authored Nov 12, 2023
2 parents 6980df6 + 8f24525 commit 596442e
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 25 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.7.0
## socketioxide
* Fix [#154](https://github.com/Totodore/socketioxide/issues/154), build was broken when using the `hyper-v1` feature flag because of `hyper-util` dependencie which is not published on crates.io.

# 0.7.0
## socketioxide
* The `extensions` field on sockets has been moved to a separate optional feature flag named `extensions`
Expand Down
31 changes: 15 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ hyper-v1 = { package = "hyper", version = "1.0.0-rc.4", features = [
] }

[workspace.package]
version = "0.7.0"
version = "0.7.1"
edition = "2021"
rust-version = "1.67.0"
authors = ["Théodore Prévot <"]
Expand Down
3 changes: 1 addition & 2 deletions engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ unicode-segmentation = { version = "1.10.1", optional = true }
# Hyper v1.0
hyper-v1 = { workspace = true, optional = true }
http-body-v1 = { workspace = true, optional = true }
hyper-util = { git = "https://github.com/hyperium/hyper-util.git", optional = true, version = "0" }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "parking_lot"] }
Expand All @@ -57,4 +56,4 @@ hyper = { version = "0.14.25", features = [
v3 = ["memchr", "unicode-segmentation"]
test-utils = []
tracing = ["dep:tracing"]
hyper-v1 = ["dep:hyper-v1", "dep:http-body-v1", "dep:hyper-util"]
hyper-v1 = ["dep:hyper-v1", "dep:http-body-v1"]
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ use crate::{
DisconnectReason, Socket,
};

#[cfg(feature = "hyper-v1")]
mod tokio_io;

/// Create a response for websocket upgrade
fn ws_response<B>(ws_key: &HeaderValue) -> Result<Response<ResponseBody<B>>, http::Error> {
let derived = derive_accept_key(ws_key.as_bytes());
Expand Down Expand Up @@ -82,11 +85,7 @@ pub fn new_req<R: Send + 'static, B, H: EngineIoHandler>(
#[cfg(feature = "hyper-v1")]
let res = if hyper_v1 {
// Wraps the hyper-v1 upgrade so it implement `AsyncRead` and `AsyncWrite`
Either::Left(
hyper_v1::upgrade::on(req)
.await
.map(hyper_util::rt::TokioIo::new),
)
Either::Left(hyper_v1::upgrade::on(req).await.map(tokio_io::TokioIo::new))
} else {
Either::Right(hyper::upgrade::on(req).await)
};
Expand Down
163 changes: 163 additions & 0 deletions engineioxide/src/transport/ws/tokio_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#![allow(dead_code)]
//! This mod is a copy of https://github.com/hyperium/hyper-util/blob/master/src/rt/tokio_io.rs
//! Because the hyper-util is not yet published to crates.io
//!
//! It is an adapter from hyper v1 IO traits to tokio's IO traits.
use std::{
pin::Pin,
task::{Context, Poll},
};

/// A wrapping implementing hyper IO traits for a type that
/// implements Tokio's IO traits.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct TokioIo<T> {
#[pin]
inner: T,
}

impl<T> TokioIo<T> {
/// Wrap a type implementing Tokio's IO traits.
pub fn new(inner: T) -> Self {
Self { inner }
}

/// Borrow the inner type.
pub fn inner(&self) -> &T {
&self.inner
}

/// Consume this wrapper and get the inner type.
pub fn into_inner(self) -> T {
self.inner
}
}

impl<T> hyper_v1::rt::Read for TokioIo<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper_v1::rt::ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};

unsafe {
buf.advance(n);
}
Poll::Ready(Ok(()))
}
}

impl<T> hyper_v1::rt::Write for TokioIo<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}

fn is_write_vectored(&self) -> bool {
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
}
}

impl<T> tokio::io::AsyncRead for TokioIo<T>
where
T: hyper_v1::rt::Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
tbuf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
//let init = tbuf.initialized().len();
let filled = tbuf.filled().len();
let sub_filled = unsafe {
let mut buf = hyper_v1::rt::ReadBuf::uninit(tbuf.unfilled_mut());

match hyper_v1::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
Poll::Ready(Ok(())) => buf.filled().len(),
other => return other,
}
};

let n_filled = filled + sub_filled;
// At least sub_filled bytes had to have been initialized.
let n_init = sub_filled;
unsafe {
tbuf.assume_init(n_init);
tbuf.set_filled(n_filled);
}

Poll::Ready(Ok(()))
}
}

impl<T> tokio::io::AsyncWrite for TokioIo<T>
where
T: hyper_v1::rt::Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
hyper_v1::rt::Write::poll_write(self.project().inner, cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
hyper_v1::rt::Write::poll_flush(self.project().inner, cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper_v1::rt::Write::poll_shutdown(self.project().inner, cx)
}

fn is_write_vectored(&self) -> bool {
hyper_v1::rt::Write::is_write_vectored(&self.inner)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
hyper_v1::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
}
}
2 changes: 1 addition & 1 deletion socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "../README.md"


[dependencies]
engineioxide = { path = "../engineioxide", version = "0.7.0" }
engineioxide = { path = "../engineioxide", version = "0.7.1" }
futures.workspace = true
tokio = { workspace = true, features = ["rt"] }
serde.workspace = true
Expand Down

0 comments on commit 596442e

Please sign in to comment.