Skip to content

Commit

Permalink
feat: filter confirmed transactions from the block (#31)
Browse files Browse the repository at this point in the history
* feat: filter confirmed transactions from the block

* add tracing

* rm mut self

* fix: imports

* update log levels

* update: don't connect signer to rollup provider

* fix: no need to clone

* comments

* fix: use dummy rpcs
  • Loading branch information
anna-carroll authored Dec 12, 2024
1 parent dfa90a0 commit 32a6998
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 38 deletions.
9 changes: 5 additions & 4 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ async fn main() -> eyre::Result<()> {
let span = tracing::info_span!("zenith-builder");

let config = BuilderConfig::load_from_env()?.clone();
let provider = config.connect_provider().await?;
let host_provider = config.connect_host_provider().await?;
let ru_provider = config.connect_ru_provider().await?;
let authenticator = Authenticator::new(&config);

PrometheusBuilder::new().install().expect("failed to install prometheus exporter");

tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider");

let sequencer_signer = config.connect_sequencer_signer().await?;
let zenith = config.connect_zenith(provider.clone());
let zenith = config.connect_zenith(host_provider.clone());

let builder = BlockBuilder::new(&config, authenticator.clone());
let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider);
let submit = SubmitTask {
authenticator: authenticator.clone(),
provider,
host_provider,
zenith,
client: reqwest::Client::new(),
sequencer_signer,
Expand Down
30 changes: 27 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use zenith_types::Zenith;
const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID";
const RU_CHAIN_ID: &str = "RU_CHAIN_ID";
const HOST_RPC_URL: &str = "HOST_RPC_URL";
const ROLLUP_RPC_URL: &str = "ROLLUP_RPC_URL";
const TX_BROADCAST_URLS: &str = "TX_BROADCAST_URLS";
const ZENITH_ADDRESS: &str = "ZENITH_ADDRESS";
const QUINCEY_URL: &str = "QUINCEY_URL";
Expand Down Expand Up @@ -44,6 +45,8 @@ pub struct BuilderConfig {
pub ru_chain_id: u64,
/// URL for Host RPC node.
pub host_rpc_url: Cow<'static, str>,
/// URL for the Rollup RPC node.
pub ru_rpc_url: Cow<'static, str>,
/// Additional RPC URLs to which to broadcast transactions.
pub tx_broadcast_urls: Vec<Cow<'static, str>>,
/// address of the Zenith contract on Host.
Expand Down Expand Up @@ -116,7 +119,7 @@ impl ConfigError {
}
}

/// Provider type used by this transaction
/// Provider type used to read & write.
pub type Provider = FillProvider<
JoinFill<
JoinFill<
Expand All @@ -130,6 +133,17 @@ pub type Provider = FillProvider<
Ethereum,
>;

/// Provider type used to read-only.
pub type WalletlessProvider = FillProvider<
JoinFill<
Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
>,
RootProvider<BoxTransport>,
BoxTransport,
Ethereum,
>;

pub type ZenithInstance = Zenith::ZenithInstance<BoxTransport, Provider>;

impl BuilderConfig {
Expand All @@ -139,6 +153,7 @@ impl BuilderConfig {
host_chain_id: load_u64(HOST_CHAIN_ID)?,
ru_chain_id: load_u64(RU_CHAIN_ID)?,
host_rpc_url: load_url(HOST_RPC_URL)?,
ru_rpc_url: load_url(ROLLUP_RPC_URL)?,
tx_broadcast_urls: env::var(TX_BROADCAST_URLS)
.unwrap_or_default()
.split(',')
Expand Down Expand Up @@ -180,8 +195,17 @@ impl BuilderConfig {
}
}

/// Connect to the provider using the configuration.
pub async fn connect_provider(&self) -> Result<Provider, ConfigError> {
/// Connect to the Rollup rpc provider.
pub async fn connect_ru_provider(&self) -> Result<WalletlessProvider, ConfigError> {
ProviderBuilder::new()
.with_recommended_fillers()
.on_builtin(&self.ru_rpc_url)
.await
.map_err(Into::into)
}

/// Connect to the Host rpc provider.
pub async fn connect_host_provider(&self) -> Result<Provider, ConfigError> {
let builder_signer = self.connect_builder_signer().await?;
ProviderBuilder::new()
.with_recommended_fillers()
Expand Down
72 changes: 54 additions & 18 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::{BuilderConfig, WalletlessProvider};
use alloy::providers::Provider;
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
eips::eip2718::Decodable2718,
Expand All @@ -10,19 +15,13 @@ use tokio::{sync::mpsc, task::JoinHandle};
use tracing::Instrument;
use zenith_types::{encode_txns, Alloy2718Coder};

use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::BuilderConfig;

/// Ethereum's slot time in seconds.
pub const ETHEREUM_SLOT_TIME: u64 = 12;

#[derive(Debug, Default, Clone)]
/// A block in progress.
pub struct InProgressBlock {
transactions: Vec<TxEnvelope>,

raw_encoding: OnceLock<Bytes>,
hash: OnceLock<B256>,
}
Expand Down Expand Up @@ -57,15 +56,22 @@ impl InProgressBlock {

/// Ingest a transaction into the in-progress block. Fails
pub fn ingest_tx(&mut self, tx: &TxEnvelope) {
tracing::info!(hash = %tx.tx_hash(), "ingesting tx");
tracing::trace!(hash = %tx.tx_hash(), "ingesting tx");
self.unseal();
self.transactions.push(tx.clone());
}

/// Remove a transaction from the in-progress block.
pub fn remove_tx(&mut self, tx: &TxEnvelope) {
tracing::trace!(hash = %tx.tx_hash(), "removing tx");
self.unseal();
self.transactions.retain(|t| t.tx_hash() != tx.tx_hash());
}

/// Ingest a bundle into the in-progress block.
/// Ignores Signed Orders for now.
pub fn ingest_bundle(&mut self, bundle: Bundle) {
tracing::info!(bundle = %bundle.id, "ingesting bundle");
tracing::trace!(bundle = %bundle.id, "ingesting bundle");

let txs = bundle
.bundle
Expand Down Expand Up @@ -113,26 +119,32 @@ impl InProgressBlock {
/// BlockBuilder is a task that periodically builds a block then sends it for signing and submission.
pub struct BlockBuilder {
pub config: BuilderConfig,
pub ru_provider: WalletlessProvider,
pub tx_poller: TxPoller,
pub bundle_poller: BundlePoller,
}

impl BlockBuilder {
// create a new block builder with the given config.
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
pub fn new(
config: &BuilderConfig,
authenticator: Authenticator,
ru_provider: WalletlessProvider,
) -> Self {
Self {
config: config.clone(),
ru_provider,
tx_poller: TxPoller::new(config),
bundle_poller: BundlePoller::new(config, authenticator),
}
}

async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) {
tracing::info!("query transactions from cache");
tracing::trace!("query transactions from cache");
let txns = self.tx_poller.check_tx_cache().await;
match txns {
Ok(txns) => {
tracing::info!("got transactions response");
tracing::trace!("got transactions response");
for txn in txns.into_iter() {
in_progress.ingest_tx(&txn);
}
Expand All @@ -145,11 +157,11 @@ impl BlockBuilder {
}

async fn _get_bundles(&mut self, in_progress: &mut InProgressBlock) {
tracing::info!("query bundles from cache");
tracing::trace!("query bundles from cache");
let bundles = self.bundle_poller.check_bundle_cache().await;
match bundles {
Ok(bundles) => {
tracing::info!("got bundles response");
tracing::trace!("got bundles response");
for bundle in bundles {
in_progress.ingest_bundle(bundle);
}
Expand All @@ -161,15 +173,36 @@ impl BlockBuilder {
self.bundle_poller.evict();
}

async fn filter_transactions(&self, in_progress: &mut InProgressBlock) {
// query the rollup node to see which transaction(s) have been included
let mut confirmed_transactions = Vec::new();
for transaction in in_progress.transactions.iter() {
let tx = self
.ru_provider
.get_transaction_by_hash(*transaction.tx_hash())
.await
.expect("failed to get receipt");
if tx.is_some() {
confirmed_transactions.push(transaction.clone());
}
}
tracing::trace!(confirmed = confirmed_transactions.len(), "found confirmed transactions");

// remove already-confirmed transactions
for transaction in confirmed_transactions {
in_progress.remove_tx(&transaction);
}
}

// calculate the duration in seconds until the beginning of the next block slot.
fn secs_to_next_slot(&mut self) -> u64 {
fn secs_to_next_slot(&self) -> u64 {
let curr_timestamp: u64 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let current_slot_time = (curr_timestamp - self.config.chain_offset) % ETHEREUM_SLOT_TIME;
(ETHEREUM_SLOT_TIME - current_slot_time) % ETHEREUM_SLOT_TIME
}

// add a buffer to the beginning of the block slot.
fn secs_to_next_target(&mut self) -> u64 {
fn secs_to_next_target(&self) -> u64 {
self.secs_to_next_slot() + self.config.target_slot_time
}

Expand All @@ -190,16 +223,19 @@ impl BlockBuilder {
// TODO: Implement bundle ingestion #later
// self.get_bundles(&mut in_progress).await;

// Filter confirmed transactions from the block
self.filter_transactions(&mut in_progress).await;

// submit the block if it has transactions
if !in_progress.is_empty() {
tracing::info!(txns = in_progress.len(), "sending block to submit task");
tracing::debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);
if outbound.send(in_progress_block).is_err() {
tracing::debug!("downstream task gone");
tracing::error!("downstream task gone");
break;
}
} else {
tracing::info!("no transactions, skipping block submission");
tracing::debug!("no transactions, skipping block submission");
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ mod tests {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
host_rpc_url: "http://rpc.holesky.signet.sh".into(),
host_rpc_url: "host-rpc.example.com".into(),
ru_rpc_url: "ru-rpc.example.com".into(),
zenith_address: Address::default(),
quincey_url: "http://localhost:8080".into(),
builder_port: 8080,
Expand Down
20 changes: 10 additions & 10 deletions src/tasks/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum ControlFlow {
/// Submits sidecars in ethereum txns to mainnet ethereum
pub struct SubmitTask {
/// Ethereum Provider
pub provider: Provider,
pub host_provider: Provider,
/// Zenith
pub zenith: ZenithInstance,
/// Reqwest
Expand Down Expand Up @@ -127,7 +127,7 @@ impl SubmitTask {
}

async fn next_host_block_height(&self) -> eyre::Result<u64> {
let result = self.provider.get_block_number().await?;
let result = self.host_provider.get_block_number().await?;
let next = result.checked_add(1).ok_or_else(|| eyre!("next host block height overflow"))?;
Ok(next)
}
Expand All @@ -152,12 +152,12 @@ impl SubmitTask {

let tx = self
.build_blob_tx(header, v, r, s, in_progress)?
.with_from(self.provider.default_signer_address())
.with_from(self.host_provider.default_signer_address())
.with_to(self.config.zenith_address)
.with_gas_limit(1_000_000);

if let Err(TransportError::ErrorResp(e)) =
self.provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await
self.host_provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await
{
error!(
code = e.code,
Expand Down Expand Up @@ -186,16 +186,16 @@ impl SubmitTask {
"sending transaction to network"
);

let SendableTx::Envelope(tx) = self.provider.fill(tx).await? else {
let SendableTx::Envelope(tx) = self.host_provider.fill(tx).await? else {
bail!("failed to fill transaction")
};

// Send the tx via the primary provider
let fut = spawn_provider_send!(&self.provider, &tx);
// Send the tx via the primary host_provider
let fut = spawn_provider_send!(&self.host_provider, &tx);

// Spawn send_tx futures for all additional broadcast providers
for provider in self.config.connect_additional_broadcast().await? {
spawn_provider_send!(&provider, &tx);
// Spawn send_tx futures for all additional broadcast host_providers
for host_provider in self.config.connect_additional_broadcast().await? {
spawn_provider_send!(&host_provider, &tx);
}

// question mark unwraps join error, which would be an internal panic
Expand Down
3 changes: 2 additions & 1 deletion tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ mod tests {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
host_rpc_url: "http://rpc.holesky.signet.sh".into(),
host_rpc_url: "host-rpc.example.com".into(),
ru_rpc_url: "ru-rpc.example.com".into(),
zenith_address: Address::default(),
quincey_url: "http://localhost:8080".into(),
builder_port: 8080,
Expand Down
3 changes: 2 additions & 1 deletion tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ mod tests {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
host_rpc_url: "http://rpc.holesky.signet.sh".into(),
host_rpc_url: "host-rpc.example.com".into(),
ru_rpc_url: "ru-rpc.example.com".into(),
tx_broadcast_urls: vec!["http://localhost:9000".into()],
zenith_address: Address::default(),
quincey_url: "http://localhost:8080".into(),
Expand Down

0 comments on commit 32a6998

Please sign in to comment.