Skip to content

Commit

Permalink
Fix full block reconstruction (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Feb 1, 2023
1 parent f370f0e commit ea37a3f
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "solana-geyser-grpc"
version = "0.5.0+solana.1.14.13"
version = "0.5.1+solana.1.14.13"
authors = ["Triton One"]
edition = "2021"

Expand Down
14 changes: 6 additions & 8 deletions src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,24 +151,22 @@ pub struct MessageBlock {
pub transactions: Vec<MessageTransactionInfo>,
}

impl<'a> From<(&'a ReplicaBlockInfoV2<'a>, Vec<MessageTransactionInfo>)> for MessageBlock {
fn from(
(blockinfo, transactions): (&'a ReplicaBlockInfoV2<'a>, Vec<MessageTransactionInfo>),
) -> Self {
impl<'a> From<(MessageBlockMeta, Vec<MessageTransactionInfo>)> for MessageBlock {
fn from((blockinfo, transactions): (MessageBlockMeta, Vec<MessageTransactionInfo>)) -> Self {
Self {
parent_slot: blockinfo.parent_slot,
slot: blockinfo.slot,
blockhash: blockinfo.blockhash.to_string(),
parent_blockhash: blockinfo.parent_blockhash.to_string(),
rewards: blockinfo.rewards.into(),
blockhash: blockinfo.blockhash,
parent_blockhash: blockinfo.parent_blockhash,
rewards: blockinfo.rewards,
block_time: blockinfo.block_time,
block_height: blockinfo.block_height,
transactions,
}
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MessageBlockMeta {
pub parent_slot: u64,
pub slot: u64,
Expand Down
108 changes: 54 additions & 54 deletions src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use {
crate::{
config::Config,
grpc::{GrpcService, Message, MessageTransaction, MessageTransactionInfo},
grpc::{
GrpcService, Message, MessageBlockMeta, MessageTransaction, MessageTransactionInfo,
},
prom::{self, PrometheusService},
},
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
},
std::time::Duration,
std::{collections::BTreeMap, time::Duration},
tokio::{
runtime::Runtime,
sync::{mpsc, oneshot},
Expand All @@ -24,7 +26,20 @@ pub struct PluginInner {
grpc_channel: mpsc::UnboundedSender<Message>,
grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService,
transactions: Option<(u64, Vec<MessageTransactionInfo>)>,
transactions: BTreeMap<u64, (Option<MessageBlockMeta>, Vec<MessageTransactionInfo>)>,
}

impl PluginInner {
fn try_send_full_block(&mut self, slot: u64) {
if matches!(
self.transactions.get(&slot),
Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len()
) {
let (block_meta, transactions) = self.transactions.remove(&slot).expect("checked");
let message = Message::Block((block_meta.expect("checked"), transactions).into());
let _ = self.grpc_channel.send(message);
}
}
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -75,7 +90,7 @@ impl GeyserPlugin for Plugin {
grpc_channel,
grpc_shutdown_tx,
prometheus,
transactions: None,
transactions: BTreeMap::new(),
});

Ok(())
Expand Down Expand Up @@ -131,9 +146,30 @@ impl GeyserPlugin for Plugin {
}

self.with_inner(|inner| {
// Remove outdated records
if status == SlotStatus::Rooted {
loop {
match inner.transactions.keys().next().cloned() {
// Block was dropped, not in chain
Some(kslot) if kslot < slot => {
inner.transactions.remove(&kslot);
}
// Maybe log error
Some(kslot) if kslot == slot => {
if let Some((Some(_), vec)) = inner.transactions.remove(&kslot) {
prom::INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
}
}
_ => break,
}
}
}

let message = Message::Slot((slot, parent, status).into());
let _ = inner.grpc_channel.send(message);
prom::update_slot_status(slot, status);

Ok(())
})
}
Expand All @@ -152,23 +188,11 @@ impl GeyserPlugin for Plugin {
};

let msg_tx: MessageTransaction = (transaction, slot).into();
match &mut inner.transactions {
Some((current_slot, transactions)) if *current_slot == slot => {
transactions.push(msg_tx.transaction.clone());
}
Some((current_slot, _)) => {
prom::block_transactions::inc_tx();
let msg = format!(
"got tx from block {}, while current block is {}",
slot, current_slot
);
error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
}
None => {
inner.transactions = Some((slot, vec![msg_tx.transaction.clone()]));
}
}

// Collect Transactions for full block message
let tx = msg_tx.transaction.clone();
inner.transactions.entry(slot).or_default().1.push(tx);
inner.try_send_full_block(slot);

let message = Message::Transaction(msg_tx);
let _ = inner.grpc_channel.send(message);
Expand All @@ -189,40 +213,16 @@ impl GeyserPlugin for Plugin {
ReplicaBlockInfoVersions::V0_0_2(info) => info,
};

let transactions = match inner.transactions.take() {
Some((slot, _transactions)) if slot != blockinfo.slot => Err(format!(
"invalid transactions for block {}, expected block {}",
blockinfo.slot, slot
)),
Some((_slot, transactions))
if transactions.len() != blockinfo.executed_transaction_count as usize =>
{
Err(format!(
"invalid count of transactions for block {}, collected {}, expected {}",
blockinfo.slot,
transactions.len(),
blockinfo.executed_transaction_count
))
}
Some((_slot, transactions)) => Ok(transactions),
None if blockinfo.executed_transaction_count == 0 => Ok(vec![]),
None => Err(format!("no transactions for block {}", blockinfo.slot)),
};
let block_meta: MessageBlockMeta = (blockinfo).into();

match transactions {
Ok(transactions) => {
let message = Message::Block((blockinfo, transactions).into());
let _ = inner.grpc_channel.send(message);
let message = Message::BlockMeta((blockinfo).into());
let _ = inner.grpc_channel.send(message);
Ok(())
}
Err(msg) => {
prom::block_transactions::inc_block();
error!("{msg}");
Err(GeyserPluginError::Custom(msg.into()))
}
}
// Save block meta for full block message
inner.transactions.entry(block_meta.slot).or_default().0 = Some(block_meta.clone());
inner.try_send_full_block(block_meta.slot);

let message = Message::BlockMeta(block_meta);
let _ = inner.grpc_channel.send(message);

Ok(())
})
}

Expand Down
28 changes: 4 additions & 24 deletions src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ lazy_static::lazy_static! {
"connections_total", "Total number of connections to GRPC service"
).unwrap();

pub static ref BLOCK_TRANSACTIONS: IntGaugeVec = IntGaugeVec::new(
Opts::new("block_transactions", "Invalid transactions for block metadata"),
&["topic"]
pub static ref INVALID_FULL_BLOCKS: IntGauge = IntGauge::new(
"invalid_full_blocks_total", "Total number of fails on constructin full blocks"
).unwrap();
}

Expand All @@ -55,7 +54,7 @@ impl PrometheusService {
register!(VERSION);
register!(SLOT_STATUS);
register!(CONNECTIONS_TOTAL);
register!(BLOCK_TRANSACTIONS);
register!(INVALID_FULL_BLOCKS);

VERSION
.with_label_values(&[
Expand All @@ -66,8 +65,6 @@ impl PrometheusService {
VERSION_INFO.version,
])
.inc();

block_transactions::install();
});

let (shutdown_signal, shutdown) = oneshot::channel();
Expand Down Expand Up @@ -120,24 +117,7 @@ pub fn update_slot_status(slot: u64, status: SlotStatus) {
.with_label_values(&[match status {
SlotStatus::Processed => "processed",
SlotStatus::Confirmed => "confirmed",
SlotStatus::Rooted => "rooted",
SlotStatus::Rooted => "finalized",
}])
.set(slot as i64);
}

pub mod block_transactions {
use super::BLOCK_TRANSACTIONS;

pub(super) fn install() {
BLOCK_TRANSACTIONS.with_label_values(&["block"]).set(0);
BLOCK_TRANSACTIONS.with_label_values(&["tx"]).set(0);
}

pub fn inc_block() {
BLOCK_TRANSACTIONS.with_label_values(&["block"]).inc();
}

pub fn inc_tx() {
BLOCK_TRANSACTIONS.with_label_values(&["tx"]).inc();
}
}

0 comments on commit ea37a3f

Please sign in to comment.