Skip to content

Commit

Permalink
return exception response on load shed info requests
Browse files Browse the repository at this point in the history
  • Loading branch information
tzemanovic committed Nov 11, 2023
1 parent bc0352e commit a41ab4b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 30 deletions.
44 changes: 29 additions & 15 deletions src/v034/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::{

use crate::BoxError;
use tendermint::abci::MethodKind;
use tendermint::v0_34::abci::response::Exception;
use tendermint::v0_34::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
Expand Down Expand Up @@ -202,8 +203,6 @@ where
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
S::Future: Send + 'static,
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(
mut self,
read: impl AsyncReadExt + std::marker::Unpin,
Expand Down Expand Up @@ -232,49 +231,64 @@ where
};
let request = Request::try_from(proto)?;
tracing::debug!(?request, "new request");
match request.kind() {
let kind = request.kind();
match &kind {
MethodKind::Consensus => {
let request = request.try_into().expect("checked kind");
let response = self.consensus.ready().await?.call(request);
// Need to box here for type erasure
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Mempool => {
let request = request.try_into().expect("checked kind");
let response = self.mempool.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Snapshot => {
let request = request.try_into().expect("checked kind");
let response = self.snapshot.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Info => {
let request = request.try_into().expect("checked kind");
let response = self.info.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Flush => {
// Instead of propagating Flush requests to the application,
// handle them here by awaiting all pending responses.
tracing::debug!(responses.len = responses.len(), "flushing responses");
while let Some(response) = responses.next().await {
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
while let Some((response, kind)) = responses.next().await {
let response = match response {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
tracing::debug!(?response, "flushing response");
response_sink.send(response?.into()).await?;
response_sink.send(response.into()).await?;
}
// Now we need to tell Tendermint we've flushed responses
response_sink.send(Response::Flush.into()).await?;
}
}
}
rsp = responses.next(), if !responses.is_empty() => {
let response = rsp.expect("didn't poll when responses was empty");
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
let (rsp, kind) = rsp.expect("didn't poll when responses was empty");
let response = match rsp {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
tracing::debug!(?response, "sending response");
response_sink.send(response?.into()).await?;
response_sink.send(response.into()).await?;
}
}
}
Expand Down
44 changes: 29 additions & 15 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::path::Path;
#[cfg(target_family = "unix")]
use tokio::net::UnixListener;

use tendermint::v0_37::abci::response::Exception;
use tendermint::v0_37::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
Expand Down Expand Up @@ -204,8 +205,6 @@ where
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
S::Future: Send + 'static,
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(
mut self,
read: impl AsyncReadExt + std::marker::Unpin,
Expand Down Expand Up @@ -234,49 +233,64 @@ where
};
let request = Request::try_from(proto)?;
tracing::debug!(?request, "new request");
match request.kind() {
let kind = request.kind();
match &kind {
MethodKind::Consensus => {
let request = request.try_into().expect("checked kind");
let response = self.consensus.ready().await?.call(request);
// Need to box here for type erasure
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Mempool => {
let request = request.try_into().expect("checked kind");
let response = self.mempool.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Snapshot => {
let request = request.try_into().expect("checked kind");
let response = self.snapshot.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Info => {
let request = request.try_into().expect("checked kind");
let response = self.info.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Flush => {
// Instead of propagating Flush requests to the application,
// handle them here by awaiting all pending responses.
tracing::debug!(responses.len = responses.len(), "flushing responses");
while let Some(response) = responses.next().await {
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
while let Some((response, kind)) = responses.next().await {
let response = match response {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
tracing::debug!(?response, "flushing response");
response_sink.send(response?.into()).await?;
response_sink.send(response.into()).await?;
}
// Now we need to tell Tendermint we've flushed responses
response_sink.send(Response::Flush.into()).await?;
}
}
}
rsp = responses.next(), if !responses.is_empty() => {
let response = rsp.expect("didn't poll when responses was empty");
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
let (rsp, kind) = rsp.expect("didn't poll when responses was empty");
let response = match rsp {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
tracing::debug!(?response, "sending response");
response_sink.send(response?.into()).await?;
response_sink.send(response.into()).await?;
}
}
}
Expand Down

0 comments on commit a41ab4b

Please sign in to comment.