chore: refactor node.rs into multiple modules #52

Merged · 3 commits · Nov 14, 2024
2 changes: 1 addition & 1 deletion src/api/root.rs
@@ -1,6 +1,6 @@
use crate::{
errors::BlockfrostError,
node::{NodeConnPool, SyncProgress},
node::{pool::NodeConnPool, SyncProgress},
};
use axum::{response::IntoResponse, Extension, Json};
use serde::Serialize;
2 changes: 1 addition & 1 deletion src/api/tx/submit.rs
@@ -1,4 +1,4 @@
use crate::{common::validate_content_type, errors::BlockfrostError, node::NodeConnPool};
use crate::{common::validate_content_type, errors::BlockfrostError, node::pool::NodeConnPool};
use axum::{http::HeaderMap, response::IntoResponse, Extension, Json};

pub async fn route(
5 changes: 3 additions & 2 deletions src/main.rs
@@ -23,6 +23,7 @@ use errors::BlockfrostError;
use icebreakers_api::IcebreakersAPI;
use middlewares::errors::error_middleware;
use middlewares::metrics::track_http_metrics;
use node::pool;
use std::sync::Arc;
use tokio::sync::RwLock;
use tower::ServiceBuilder;
@@ -48,7 +49,7 @@ async fn main() -> Result<(), AppError> {

let max_node_connections = 8;

let node_conn_pool = node::NodeConnPool::new(
let node_conn_pool = pool::NodeConnPool::new(
max_node_connections,
&config.node_socket_path,
config.network_magic,
@@ -86,7 +87,7 @@ async fn main() -> Result<(), AppError> {
Ok(())
}

async fn node_health_check_task(node: node::NodeConnPool) {
async fn node_health_check_task(node: pool::NodeConnPool) {
loop {
// It’s enough to borrow a working connection from the pool, because the connection is health-checked as part of being borrowed.
let health = node.get().await.map(drop).inspect_err(|err| {
348 changes: 2 additions & 346 deletions src/node.rs
@@ -1,264 +1,5 @@
use crate::cbor::haskell_types::TxValidationError;
use crate::errors::{AppError, BlockfrostError};
use chrono::{Duration, TimeZone, Utc};
use deadpool::managed::Object;
use metrics::gauge;
use pallas_codec::minicbor::{display, Decoder};
use pallas_crypto::hash::Hasher;
use pallas_network::miniprotocols::localtxsubmission::{EraTx, Response};
use pallas_network::multiplexer::Error;
use pallas_network::{miniprotocols, miniprotocols::localstate};
use pallas_traverse::wellknown;
use std::boxed::Box;
use std::pin::Pin;
use tracing::{error, info, warn};

/// This represents a pool of Node2Client connections to a single `cardano-node`.
///
/// It can be safely cloned to multiple threads, while still sharing the same
/// set of underlying connections to the node.
#[derive(Clone)]
pub struct NodeConnPool {
underlying: deadpool::managed::Pool<NodeConnPoolManager>,
}

/// Our wrapper around [`pallas_network::facades::NodeClient`]. As long as you
/// only talk to the node through this wrapper, you won’t run into deadlocks,
/// inconsistent connection state, etc.
pub struct NodeConn {
/// Note: this is an [`Option`] *only* to satisfy the borrow checker. It’s
/// *always* [`Some`]. See [`NodeConnPoolManager::recycle`] for an
/// explanation.
underlying: Option<pallas_network::facades::NodeClient>,
}

impl NodeConnPool {
/// Creates a new pool of [`NodeConn`] connections.
pub fn new(
pool_max_size: usize,
socket_path: &str,
network_magic: u64,
) -> Result<Self, AppError> {
let manager = NodeConnPoolManager {
network_magic,
socket_path: socket_path.to_string(),
};
let underlying = deadpool::managed::Pool::builder(manager)
.max_size(pool_max_size)
.build()
.map_err(|err| AppError::Node(err.to_string()))?;
Ok(Self { underlying })
}

/// Borrows a single [`NodeConn`] connection from the pool.
pub async fn get(&self) -> Result<Object<NodeConnPoolManager>, AppError> {
self.underlying
.get()
.await
.map_err(|err| AppError::Node(format!("NodeConnPool: {}", err)))
}
}
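
// Illustrative usage sketch, not part of the original file: create one pool at
// startup and borrow a connection from it wherever a node round-trip is needed.
// The socket path and network magic below are made-up values.
async fn example_usage() -> Result<(), AppError> {
    let pool = NodeConnPool::new(8, "/tmp/node.socket", 764824073)?;
    // `get()` hands out a pooled, health-checked `NodeConn`; dropping the
    // returned object puts the connection back into the pool.
    let conn = pool.get().await?;
    drop(conn);
    Ok(())
}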

impl NodeConn {
/// Submits a transaction to the connected Cardano node.
pub async fn submit_transaction(&mut self, tx: String) -> Result<String, BlockfrostError> {
let tx = hex::decode(tx).map_err(|e| BlockfrostError::custom_400(e.to_string()))?;
let txid = hex::encode(Hasher::<256>::hash_cbor(&tx));

let era_tx = EraTx(6, tx);

// Connect to the node
let submission_client = self.underlying.as_mut().unwrap().submission();

// Submit the transaction
match submission_client.submit_tx(era_tx).await {
Ok(Response::Accepted) => {
info!("Transaction accepted by the node {}", txid);
Ok(txid)
}
Ok(Response::Rejected(reason)) => {
let reason = reason.0;

let msg_res = Self::try_decode_error(&reason);

let error_message = format!("Transaction rejected with reason: {:?}", msg_res);

warn!(error_message);

Err(BlockfrostError::custom_400(error_message))
}
Err(e) => {
let error_message = format!("Error during transaction submission: {:?}", e);

Err(BlockfrostError::custom_400(error_message))
}
}
}

/// We always have to release the [`localstate::GenericClient`], even on errors,
/// otherwise `cardano-node` stalls. If you use this function, it’s handled for you.
async fn with_statequery<A, F>(&mut self, action: F) -> Result<A, BlockfrostError>
where
F: for<'a> FnOnce(
&'a mut localstate::GenericClient,
) -> Pin<
Box<dyn std::future::Future<Output = Result<A, BlockfrostError>> + 'a + Sync + Send>,
>,
{
// Acquire the client
let client = self.underlying.as_mut().unwrap().statequery();
client.acquire(None).await?;

// Run the action and ensure the client is released afterwards
let result = action(client).await;

// Always release the client, even if action fails
if let Err(e) = client.send_release().await {
warn!("Failed to release client: {:?}", e);
}

result
}
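
// Illustrative sketch, not part of the original file: the minimal way to run a
// single local-state query through `with_statequery`, which handles the
// acquire/release dance for us.
async fn example_current_slot(&mut self) -> Result<u64, BlockfrostError> {
    self.with_statequery(|client: &mut localstate::GenericClient| {
        Box::pin(async {
            let chain_point = localstate::queries_v16::get_chain_point(client).await?;
            Ok(chain_point.slot_or_default())
        })
    })
    .await
}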

/// Pings the node, e.g. to see if the connection is still alive.
pub async fn ping(&mut self) -> Result<(), BlockfrostError> {
// FIXME: we should be able to use `miniprotocols::keepalive`
// (cardano-cli does), but for some reason it’s not added to
// `NodeClient`? Let’s try to acquire a local state client instead:

self.with_statequery(|_| Box::pin(async { Ok(()) })).await
}

/// Reports the sync progress of the node.
pub async fn sync_progress(&mut self) -> Result<SyncProgress, BlockfrostError> {
self.with_statequery(|generic_client: &mut localstate::GenericClient| {
Box::pin(async {
Review comment (Member): @ginnun I didn't look closely enough! But look here. The changes aren't the same in the new file. Here it's after #39, and there before?

let current_era = localstate::queries_v16::get_current_era(generic_client).await?;

let epoch =
localstate::queries_v16::get_block_epoch_number(generic_client, current_era)
.await?;

let geneses =
localstate::queries_v16::get_genesis_config(generic_client, current_era)
.await?;
let genesis = geneses.first().ok_or_else(|| {
BlockfrostError::internal_server_error(
"Expected at least one genesis".to_string(),
)
})?;

let system_start =
localstate::queries_v16::get_system_start(generic_client).await?;
let chain_point = localstate::queries_v16::get_chain_point(generic_client).await?;
let slot = chain_point.slot_or_default();

// FIXME: this is debatable, because it won’t work for custom networks; we should rather
// get this information by calling `Ouroboros.Consensus.HardFork.History.Qry.slotToWallclock`
// like both cardano-cli (through cardano-api) and Ogmios do, but it’s not implemented
// in pallas_network yet.
let wellknown_genesis = wellknown::GenesisValues::from_magic(
genesis.network_magic.into(),
)
.ok_or_else(|| {
BlockfrostError::internal_server_error(format!(
"Only well-known networks are supported (unsupported network magic: {})",
genesis.network_magic
))
})?;

let year: i32 = system_start.year.try_into().map_err(|e| {
BlockfrostError::internal_server_error(format!("Failed to convert year: {}", e))
})?;

let base_date = Utc
.with_ymd_and_hms(year, 1, 1, 0, 0, 0)
.single()
.ok_or_else(|| {
BlockfrostError::internal_server_error("Invalid base date".to_string())
})?;

let days = Duration::days((system_start.day_of_year - 1).into());

let nanoseconds: i64 = (system_start.picoseconds_of_day / 1_000)
.try_into()
.map_err(|e| {
BlockfrostError::internal_server_error(format!(
"Failed to convert picoseconds: {}",
e
))
})?;

let duration_ns = Duration::nanoseconds(nanoseconds);

let utc_start = base_date + days + duration_ns;

let slot_time_secs: i64 = wellknown_genesis
.slot_to_wallclock(slot)
.try_into()
.map_err(|e| {
BlockfrostError::internal_server_error(format!(
"Failed to convert slot time: {}",
e
))
})?;

let utc_slot = Utc
.timestamp_opt(slot_time_secs, 0)
.single()
.ok_or_else(|| {
BlockfrostError::internal_server_error("Invalid slot timestamp".to_string())
})?;

let utc_now = Utc::now();

let utc_slot_capped = std::cmp::min(utc_now, utc_slot);

let tolerance = 60; // [s]
let percentage = if (utc_now - utc_slot_capped).num_seconds() < tolerance {
1.0
} else {
let network_duration = (utc_now - utc_start).num_seconds() as f64;
let duration_up_to_slot = (utc_slot_capped - utc_start).num_seconds() as f64;
duration_up_to_slot / network_duration
};

let block = match chain_point {
miniprotocols::Point::Origin => String::new(),
miniprotocols::Point::Specific(_, block) => hex::encode(&block),
};

Ok(SyncProgress {
percentage,
era: current_era,
epoch,
slot,
block,
})
})
})
.await
}
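
// Worked example of the progress formula above, with made-up numbers: if the
// network started 1,000,000 s before `utc_now`, the capped tip slot maps to
// 900,000 s after `utc_start`, and the tip is more than 60 s behind now, then
// percentage = 900_000.0 / 1_000_000.0 = 0.9, i.e. the node is 90% synced.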

fn try_decode_error(buffer: &[u8]) -> Result<Option<TxValidationError>, Error> {
let maybe_error = Decoder::new(buffer).decode();

match maybe_error {
Ok(error) => Ok(Some(error)),
Err(err) => {
let buffer_display = display(buffer);
warn!(
"Failed to decode error: {:?}, buffer: {}",
err, buffer_display
);

// A decoding failure here is not a node error but a missing or incorrect
// implementation on our side, i.e. a bug in our code, not a bug in the node.
// It should not affect the program flow, but it should be logged and reported.
Err(Error::Decoding(err.to_string()))
}
}
}
}
pub mod connection;
pub mod pool;

#[derive(serde::Serialize)]
pub struct SyncProgress {
@@ -268,88 +9,3 @@ pub struct SyncProgress {
slot: u64,
block: String,
}
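
// Illustrative note, not part of the original file: with `#[derive(serde::Serialize)]`,
// the struct above (percentage, era, epoch, slot, block, as built in `sync_progress`)
// serializes to JSON roughly like this (made-up values):
// {"percentage":0.9987,"era":6,"epoch":512,"slot":140000000,"block":"89abcdef"}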

pub struct NodeConnPoolManager {
network_magic: u64,
socket_path: String,
}

impl deadpool::managed::Manager for NodeConnPoolManager {
type Type = NodeConn;
type Error = AppError;

async fn create(&self) -> Result<NodeConn, AppError> {
// TODO: maybe use `ExponentialBackoff` from `tokio-retry`, to have at
// least _some_ debouncing between requests, if the node is down?
match pallas_network::facades::NodeClient::connect(&self.socket_path, self.network_magic)
.await
{
Ok(conn) => {
info!(
"N2C connection to node was successfully established at socket: {}",
self.socket_path
);
gauge!("cardano_node_connections").increment(1);
Ok(NodeConn {
underlying: Some(conn),
})
}
Err(err) => {
error!(
"Failed to connect to a N2C node socket: {}: {:?}",
self.socket_path, err
);
Err(AppError::Node(err.to_string()))
}
}
}

/// Pallas decided to make the
/// [`pallas_network::facades::NodeClient::abort`] take ownership of `self`.
/// That’s why we need our [`NodeConn::underlying`] to be an [`Option`],
/// because in here we only get a mutable reference. If the connection is
/// broken, we have to call `abort`, because it joins certain multiplexer
/// threads. Otherwise, it’s a resource leak.
async fn recycle(
&self,
conn: &mut NodeConn,
metrics: &deadpool::managed::Metrics,
) -> deadpool::managed::RecycleResult<AppError> {
// Check if the connection is still viable
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => {
error!(
"N2C connection no longer viable: {}, {}, {:?}",
self.socket_path, err, metrics
);
// Take ownership of the `NodeClient` from Pallas
let owned = conn.underlying.take().unwrap();
// This is the only moment when `underlying` becomes `None`. But
// it will never be used again.
gauge!("cardano_node_connections").decrement(1);
// Now call `abort` to clean up their resources:
owned.abort().await;
// And scrap the connection from the pool:
Err(deadpool::managed::RecycleError::Backend(AppError::Node(
err.to_string(),
)))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_try_decode_error() {
let buffer = [
130, 2, 129, 130, 6, 130, 130, 1, 130, 0, 131, 6, 27, 0, 0, 0, 2, 54, 42, 119, 48, 27,
0, 0, 0, 2, 83, 185, 193, 29, 130, 1, 130, 0, 131, 5, 26, 0, 2, 139, 253, 24, 173,
];
let error = NodeConn::try_decode_error(&buffer).unwrap();

assert!(error.is_some());
}
}
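
For orientation, a hedged sketch of where the deleted items presumably land after this refactor (the new src/node/connection.rs and src/node/pool.rs files are not shown in this view, so the placement below is an assumption based on the import changes in src/api/root.rs, src/api/tx/submit.rs and src/main.rs):

src/
  node.rs           (keeps `pub mod connection;`, `pub mod pool;` and `SyncProgress`)
  node/
    connection.rs   (presumably `NodeConn`: submit_transaction, with_statequery, ping, sync_progress, try_decode_error)
    pool.rs         (presumably `NodeConnPool` and the deadpool `NodeConnPoolManager`)

Call sites reach the pool through the new path, e.g. `use crate::node::pool::NodeConnPool;`, as seen in src/api/root.rs and src/api/tx/submit.rs above.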