Replies: 2 comments
-
Custom gRPC Server TransportI got close. On the server side, it's basically working. I do Server::builder()
.add_service(...)
.serve_with_incoming(subscription) The subscription is a type is a stream of adapters pub struct DispatcherSubscription {
pub next: Receiver<Result<(SendStream, RecvStream), quinn::ConnectionError>>,
}
impl Stream for DispatcherSubscription {
type Item = Result<Adapter, quinn::ConnectionError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(result)) = self.get_mut().next.poll_recv(cx) {
match result {
Ok((tx, rx)) => Poll::Ready(Some(Ok(Adapter { tx, rx }))),
Err(e) => Poll::Ready(Some(Err(e))),
}
} else {
Poll::Pending
}
}
} Where the adapter is just a wrapper around both the read and write handles of a bi-directional quic stream. The wrapper then implements pub struct Adapter {
pub tx: SendStream,
pub rx: RecvStream,
}
impl AsyncRead for Adapter {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
AsyncRead::poll_read(Pin::new(&mut self.get_mut().rx), cx, buf)
}
}
... Custom gRPC Client TransportThe gRPC client is proving to be much more difficult. I'm basically stuck in a catch 22. The main problem is that the returned async fn create_grpc_client(conn: &quinn::Connection) -> ClientGreeterClient<Channel> {
let channel = Endpoint::try_from("http://[::]:50051".parse::<Uri>().unwrap())
.unwrap()
.connect_with_connector(service_fn(move |_| async move {
let (send, recv) = conn.open_bi().await.unwrap();
Ok::<Adapter, quinn::ConnectionError>(Adapter { tx: send, rx: recv })
}))
.await
.unwrap();
ClientGreeterClient::new(channel)
} I can't move the quic connection to the closure either because the closure is error[E0525]: expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
--> src/server.rs:54:44
|
54 | .connect_with_connector(service_fn(move |_| async move {
| ---------------------- ^^^^^^^^ this closure implements `FnOnce`, not `FnMut`
| |
| the requirement to implement `FnMut` derives from here
55 | let (send, recv) = conn.open_bi().await.unwrap();
| ---- closure is `FnOnce` because it moves the variable `conn` out of its environment So at this point I'm not sure what to do about the client-side of the gRPC custom transport. All I really want to do is to provide an |
Beta Was this translation helpful? Give feedback.
-
Looks like trpc supports this exact thing |
Beta Was this translation helpful? Give feedback.
-
Can I create a gRPC server using a custom transport? The go-grpc library supports this by accepting a
net.Listener
which is an interface that I can implement for any sort of transport that I need.I'm trying to run a server->client gRPC server (server calls client RPCs) over an existing quic connection.
Beta Was this translation helpful? Give feedback.
All reactions