Skip to content

Commit

Permalink
Merge branch 'feature/bip155'
Browse files Browse the repository at this point in the history
  • Loading branch information
Kixunil committed Jul 10, 2024
2 parents 245f9dd + b991124 commit 79de95c
Show file tree
Hide file tree
Showing 8 changed files with 441 additions and 354 deletions.
429 changes: 263 additions & 166 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 11 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = [
description = "Finer-grained permission management for bitcoind."
edition = "2018"
name = "btc-rpc-proxy"
version = "0.3.0"
version = "0.3.1"

[lib]
name = "btc_rpc_proxy"
Expand All @@ -26,26 +26,28 @@ debug_logs = ["slog/max_level_debug"]
systemd = ["systemd_socket/enable_systemd"]

[dependencies]
anyhow = "1.0.34"
anyhow = "1.0.38"
async-channel = "1.5.1"
base32 = "0.4.0"
base64 = "0.13.0"
bitcoin = { version = "0.25.2", features = ["use-serde"] }
bitcoin = { version = "0.26.0", features = ["use-serde"] }
configure_me = { version = "0.4.0" }
futures = "0.3.8"
futures = "0.3.12"
hex = "0.4.2"
http = "0.2.1"
hyper = "0.13.9"
itertools = "0.9.0"
http = "0.2.3"
hyper = { version = "0.14.4", features = ["client", "server", "stream", "http2", "http1", "tcp"] }
itertools = "0.10.0"
lazy_static = "1.4.0"
linear-map = { version = "1.2.0", features = ["serde_impl"] }
serde = { version = "1.0.117", features = ["derive"] }
serde_json = "1.0.59"
slog = "2.7.0"
slog-async = "2.5.0"
slog-async = "2.6.0"
slog-term = "2.6.0"
socks = "0.3.3"
tokio = { version = "0.2.22", features = ["full"] }
tokio = { version = "1.0.2", features = ["full"] }
tokio-compat-02 = "0.2"
tokio_02 = { version = "0.2.22", package = "tokio", features = ["stream"] }
thiserror = "1.0.22"
systemd_socket = { version = "0.1.1", default-features = false, features = ["tokio_0_2"] }

Expand Down
96 changes: 68 additions & 28 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::SystemTime;
use anyhow::{anyhow, Error};
use futures::{channel::mpsc, StreamExt, TryStreamExt};
use hyper::{
body::Bytes,
body::to_bytes,
client::{Client, HttpConnector},
header::{HeaderValue, AUTHORIZATION, CONTENT_LENGTH},
Body, Method, Request, Response, StatusCode, Uri,
Expand Down Expand Up @@ -284,7 +284,7 @@ impl RpcClient {
message: "internal server error".to_owned(),
status: Some(StatusCode::INTERNAL_SERVER_ERROR),
});
},
}
};

let response = client
Expand All @@ -299,10 +299,7 @@ impl RpcClient {
)
.await
.map_err(Error::from)?;
let body: Bytes =
tokio::stream::StreamExt::collect::<Result<Bytes, _>>(response.into_body())
.await
.map_err(Error::from)?;
let body = to_bytes(response.into_body()).await.map_err(Error::from)?;
let forwarded_res: Vec<RpcResponse<GenericRpcMethod>> =
serde_json::from_slice(body.as_ref())?;
Ok(idxs.into_iter().zip(forwarded_res).collect())
Expand Down Expand Up @@ -341,13 +338,21 @@ impl RpcClient {
)
.await?;
let status = response.status();
let body: Bytes =
tokio::stream::StreamExt::collect::<Result<Bytes, _>>(response.into_body()).await?;
let mut rpc_response: RpcResponse<T> = serde_json::from_slice(&body)
.map_err(|serde_error| {
let body = to_bytes(response.into_body()).await?;
let mut rpc_response: RpcResponse<T> =
serde_json::from_slice(&body).map_err(|serde_error| {
match std::str::from_utf8(&body) {
Ok(body) => ClientError::ParseResponseUtf8 { method: req.method.as_str().to_owned(), status: status, body: body.to_owned(), serde_error },
Err(error) => ClientError::ResponseNotUtf8 { method: req.method.as_str().to_owned(), status: status, utf8_error: error, },
Ok(body) => ClientError::ParseResponseUtf8 {
method: req.method.as_str().to_owned(),
status,
body: body.to_owned(),
serde_error,
},
Err(error) => ClientError::ResponseNotUtf8 {
method: req.method.as_str().to_owned(),
status,
utf8_error: error,
},
}
})?;
if let Some(ref mut error) = rpc_response.error {
Expand All @@ -364,13 +369,25 @@ pub enum ClientError {
#[error("failed to load authentication data")]
LoadAuth(#[from] AuthLoadError),
#[error("hyper failed to process HTTP request")]
Hyper(#[from] hyper::error::Error),
Hyper(#[from] hyper::Error),
#[error("invalid HTTP request")]
Http(#[from] http::Error),
#[error("HTTP response (status: {status}) to method {method} can't be parsed as json, body: {body}")]
ParseResponseUtf8 { method: String, status: http::status::StatusCode, body: String, #[source] serde_error: serde_json::Error },
#[error(
"HTTP response (status: {status}) to method {method} can't be parsed as json, body: {body}"
)]
ParseResponseUtf8 {
method: String,
status: http::status::StatusCode,
body: String,
#[source]
serde_error: serde_json::Error,
},
#[error("HTTP response (status: {status}) to method {method} is not UTF-8")]
ResponseNotUtf8 { method: String, status: http::status::StatusCode, utf8_error: std::str::Utf8Error, },
ResponseNotUtf8 {
method: String,
status: http::status::StatusCode,
utf8_error: std::str::Utf8Error,
},
}

impl From<ClientError> for RpcError {
Expand Down Expand Up @@ -426,13 +443,18 @@ impl AuthSource {
}

async fn load_from_file(path: &PathBuf) -> Result<String, AuthLoadError> {
tokio::fs::read_to_string(path).await.map(|mut cookie| {
if cookie.ends_with('\n') {
cookie.pop();
}
base64::encode(cookie)
})
.map_err(|error| AuthLoadError::Read { path: path.to_owned(), error, })
tokio::fs::read_to_string(path)
.await
.map(|mut cookie| {
if cookie.ends_with('\n') {
cookie.pop();
}
base64::encode(cookie)
})
.map_err(|error| AuthLoadError::Read {
path: path.to_owned(),
error,
})
}

pub async fn try_load(&self) -> Result<HeaderValue, AuthLoadError> {
Expand All @@ -445,9 +467,15 @@ impl AuthSource {
let cache = cached.read().await.clone();
let modified = tokio::fs::metadata(&path)
.await
.map_err(|error| AuthLoadError::Metadata { path: path.to_owned(), error, })?
.map_err(|error| AuthLoadError::Metadata {
path: path.to_owned(),
error,
})?
.modified()
.map_err(|error| AuthLoadError::Modified { path: path.to_owned(), error, })?;
.map_err(|error| AuthLoadError::Modified {
path: path.to_owned(),
error,
})?;
match cache {
Some(cache) if modified == cache.0 => Ok(cache.1.clone()),
_ => {
Expand All @@ -466,11 +494,23 @@ impl AuthSource {
#[derive(Debug, thiserror::Error)]
pub enum AuthLoadError {
#[error("failed to get metadata of file {path}")]
Metadata { path: PathBuf, #[source] error: std::io::Error, },
Metadata {
path: PathBuf,
#[source]
error: std::io::Error,
},
#[error("failed to get modification time of file {path}")]
Modified { path: PathBuf, #[source] error: std::io::Error, },
Modified {
path: PathBuf,
#[source]
error: std::io::Error,
},
#[error("failed to read file {path}")]
Read { path: PathBuf, #[source] error: std::io::Error, },
Read {
path: PathBuf,
#[source]
error: std::io::Error,
},
#[error("invalid header value")]
HeaderValue(#[from] http::header::InvalidHeaderValue),
}
80 changes: 36 additions & 44 deletions src/fetch_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use bitcoin::{
consensus::{Decodable, Encodable},
hash_types::BlockHash,
network::{
address::Address,
constants::{Network::Bitcoin, ServiceFlags},
message::{NetworkMessage, RawNetworkMessage},
message_blockdata::Inventory,
Expand All @@ -21,18 +20,20 @@ use bitcoin::{
use futures::FutureExt;
use socks::Socks5Stream;

use crate::client::{RpcClient, RpcError, ClientError, RpcRequest, MISC_ERROR_CODE, PRUNE_ERROR_MESSAGE};
use crate::client::{
ClientError, RpcClient, RpcError, RpcRequest, MISC_ERROR_CODE, PRUNE_ERROR_MESSAGE,
};
use crate::rpc_methods::{GetBlock, GetBlockParams, GetPeerInfo, PeerAddressError};
use crate::state::{State, TorState};

type VersionMessageProducer = Box<dyn Fn(Address) -> RawNetworkMessage + Send + Sync>;
type VersionMessageProducer = Box<dyn Fn() -> RawNetworkMessage + Send + Sync>;

lazy_static::lazy_static! {
static ref VER_ACK: RawNetworkMessage = RawNetworkMessage {
magic: Bitcoin.magic(),
payload: NetworkMessage::Verack,
};
static ref VERSION_MESSAGE: VersionMessageProducer = Box::new(|addr| {
static ref VERSION_MESSAGE: VersionMessageProducer = Box::new(|| {
use std::time::SystemTime;
RawNetworkMessage {
magic: Bitcoin.magic(),
Expand All @@ -42,9 +43,8 @@ lazy_static::lazy_static! {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
addr,
Address::new(&([127, 0, 0, 1], 8332).into(), ServiceFlags::NONE),
// This is OK because RPC proxy doesn't listen on P2P
bitcoin::network::Address::new(&([127, 0, 0, 1], 8332).into(), ServiceFlags::NONE),
bitcoin::network::Address::new(&([127, 0, 0, 1], 8332).into(), ServiceFlags::NONE),
0,
format!("BTC RPC Proxy v{}", env!("CARGO_PKG_VERSION")),
0,
Expand All @@ -70,6 +70,9 @@ impl Peers {
.map(|f| f.elapsed() > max_peer_age)
.unwrap_or(true)
}
pub fn is_empty(&self) -> bool {
self.peers.is_empty()
}
pub async fn updated(client: &RpcClient) -> Result<Self, PeerUpdateError> {
Ok(Self {
peers: client
Expand All @@ -83,8 +86,8 @@ impl Peers {
.into_iter()
.filter(|p| !p.inbound)
.filter(|p| p.servicesnames.contains("NETWORK"))
.map(|p| p.into_address().map(Peer::new))
.collect::<Result<_, _>>()?,
.map(|p| Peer::new(Arc::new(p.addr)))
.collect(),
fetched: Some(Instant::now()),
})
}
Expand Down Expand Up @@ -148,40 +151,22 @@ impl Write for BitcoinPeerConnection {
}
}
impl BitcoinPeerConnection {
pub async fn connect(state: Arc<State>, addr: Address) -> Result<Self, Error> {
pub async fn connect(state: Arc<State>, mut addr: Arc<String>) -> Result<Self, Error> {
if !addr.contains(":") {
addr = Arc::new(format!("{}:8333", &*addr));
}
tokio::time::timeout(
state.peer_timeout,
tokio::task::spawn_blocking(move || {
let mut stream = match (addr.socket_addr(), &state.tor) {
(Ok(addr), Some(TorState { only: false, .. })) | (Ok(addr), None) => {
BitcoinPeerConnection::ClearNet(TcpStream::connect(addr)?)
let mut stream = match &state.tor {
Some(TorState { only, proxy })
if *only || addr.split(":").next().unwrap().ends_with(".onion") =>
{
BitcoinPeerConnection::Tor(Socks5Stream::connect(proxy, &**addr)?)
}
(Ok(addr), Some(tor)) => {
BitcoinPeerConnection::Tor(Socks5Stream::connect(tor.proxy, addr)?)
}
(Err(_), Some(tor)) => BitcoinPeerConnection::Tor(Socks5Stream::connect(
tor.proxy,
(
format!(
"{}.onion",
base32::encode(
base32::Alphabet::RFC4648 { padding: false },
&addr
.address
.iter()
.map(|n| *n)
.flat_map(|n| u16::to_be_bytes(n).to_vec())
.collect::<Vec<_>>()
)
.to_lowercase()
)
.as_str(),
addr.port,
),
)?),
(Err(e), None) => return Err(e.into()),
_ => BitcoinPeerConnection::ClearNet(TcpStream::connect(&*addr)?),
};
VERSION_MESSAGE(addr).consensus_encode(&mut stream)?;
VERSION_MESSAGE().consensus_encode(&mut stream)?;
stream.flush()?;
let _ =
bitcoin::network::message::RawNetworkMessage::consensus_decode(&mut stream)?; // version
Expand All @@ -198,12 +183,12 @@ impl BitcoinPeerConnection {
}

pub struct Peer {
addr: Address,
addr: Arc<String>,
send: mpmc::Sender<BitcoinPeerConnection>,
recv: mpmc::Receiver<BitcoinPeerConnection>,
}
impl Peer {
pub fn new(addr: Address) -> Self {
pub fn new(addr: Arc<String>) -> Self {
let (send, recv) = mpmc::bounded(1);
Peer { addr, send, recv }
}
Expand All @@ -222,7 +207,7 @@ impl std::fmt::Debug for Peer {
}

pub struct PeerHandle {
addr: Address,
addr: Arc<String>,
conn: Option<BitcoinPeerConnection>,
send: mpmc::Sender<BitcoinPeerConnection>,
}
Expand Down Expand Up @@ -384,10 +369,17 @@ async fn fetch_block_from_peers(
}
futures::future::ready(())
});
let b = futures::select! {
b = recv.next().fuse() => b,
_ = runner.boxed().fuse() => None,
let mut blk_future = recv.next().fuse();
let mut b = futures::select! {
b = &mut blk_future => b,
_ = runner.boxed().fuse() => None
};
if b.is_none() {
b = match futures::poll!(blk_future) {
std::task::Poll::Ready(Some(b)) => Some(b),
_ => None,
};
}
b
}

Expand Down
Loading

0 comments on commit 79de95c

Please sign in to comment.