Skip to content

Commit

Permalink
Adds bundle support to block builder (#8)
Browse files Browse the repository at this point in the history
* WIP: adding bundle fetcher support

* WIP: separating auth out into authenticator

* WIP: fixing the cannot drop a runtime in a blocking context error

* WIP: Fixes tokio runtime drop bug

* wip: cleanup and remove imports

* chore: fix dep error

* chore: mark bundle poller test as it

* feat: spawn bundle poller and handle exits

* feat: ingest bundle transactions, drop bundle on failure

This implements ingesting bundle transactions naively. It does not take into account the bundle "rules", so to say, which would make it a bit more complex. If a transaction fails to decode, the bundle is considered "invalid", and dropped.

* chore: clippy

* chore: comments

* chore: clarify comment on signed orders

* chore: dedup oauth, do not use spawn blocking

* chore: remove unneeded async

* chore: clean up authenticator

* feat: track seen bundles to dedup

* feat(`oauth`): caching, shareable authenticator (#19)

* feat: caching, shareable authenticator

* fix: actually spawn builder task lmao

* chore: remove unnecesary mut selfs

---------

Co-authored-by: evalir <[email protected]>
  • Loading branch information
dylanlott and Evalir authored Nov 12, 2024
1 parent d444978 commit 012b9c2
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 53 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ aws-sdk-kms = "1.15.0"
hex = { package = "const-hex", version = "1", default-features = false, features = [
"alloc",
] }

signet-types = { git = "ssh://[email protected]/init4tech/signet-node.git" }

serde = { version = "1.0.197", features = ["derive"] }
tracing = "0.1.40"

Expand All @@ -41,7 +44,7 @@ openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.11.24", features = ["blocking", "json"] }
ruint = "1.12.1"
serde_json = "1.0"
thiserror = "1.0.58"
thiserror = "1.0.68"
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
tracing-subscriber = "0.3.18"

Expand Down
24 changes: 18 additions & 6 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use builder::config::BuilderConfig;
use builder::service::serve_builder_with_span;
use builder::tasks::bundler::BundlePoller;
use builder::tasks::oauth::Authenticator;
use builder::tasks::tx_poller::TxPoller;

use tokio::select;
Expand All @@ -11,8 +13,9 @@ async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::try_init().unwrap();
let span = tracing::info_span!("zenith-builder");

let config = BuilderConfig::load_from_env()?;
let config = BuilderConfig::load_from_env()?.clone();
let provider = config.connect_provider().await?;
let authenticator = Authenticator::new(&config);

tracing::debug!(
rpc_url = config.host_rpc_url.as_ref(),
Expand All @@ -23,23 +26,26 @@ async fn main() -> eyre::Result<()> {
let zenith = config.connect_zenith(provider.clone());

let port = config.builder_port;

let tx_poller = TxPoller::new(&config);
let bundle_poller = BundlePoller::new(&config, authenticator.clone()).await;
let builder = builder::tasks::block::BlockBuilder::new(&config);

let submit = builder::tasks::submit::SubmitTask {
authenticator: authenticator.clone(),
provider,
zenith,
client: reqwest::Client::new(),
sequencer_signer,
config,
config: config.clone(),
};

let authenticator_jh = authenticator.spawn();
let (submit_channel, submit_jh) = submit.spawn();
let (build_channel, build_jh) = builder.spawn(submit_channel);
let tx_poller_jh = tx_poller.spawn(build_channel.clone());
let (tx_channel, bundle_channel, build_jh) = builder.spawn(submit_channel);
let tx_poller_jh = tx_poller.spawn(tx_channel.clone());
let bundle_poller_jh = bundle_poller.spawn(bundle_channel);

let server = serve_builder_with_span(build_channel, ([0, 0, 0, 0], port), span);
let server = serve_builder_with_span(tx_channel, ([0, 0, 0, 0], port), span);

select! {
_ = submit_jh => {
Expand All @@ -54,6 +60,12 @@ async fn main() -> eyre::Result<()> {
_ = tx_poller_jh => {
tracing::info!("tx_poller finished");
}
_ = bundle_poller_jh => {
tracing::info!("bundle_poller finished");
}
_ = authenticator_jh => {
tracing::info!("authenticator finished");
}
}

tracing::info!("shutting down");
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const BUILDER_REWARDS_ADDRESS: &str = "BUILDER_REWARDS_ADDRESS";
const ROLLUP_BLOCK_GAS_LIMIT: &str = "ROLLUP_BLOCK_GAS_LIMIT";
const TX_POOL_URL: &str = "TX_POOL_URL";
const TX_POOL_POLL_INTERVAL: &str = "TX_POOL_POLL_INTERVAL";
const AUTH_TOKEN_REFRESH_INTERVAL: &str = "AUTH_TOKEN_REFRESH_INTERVAL";
const TX_POOL_CACHE_DURATION: &str = "TX_POOL_CACHE_DURATION";
const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID";
const OAUTH_CLIENT_SECRET: &str = "OAUTH_CLIENT_SECRET";
Expand Down Expand Up @@ -82,6 +83,8 @@ pub struct BuilderConfig {
pub oauth_token_url: String,
/// OAuth audience for the builder.
pub oauth_audience: String,
/// The oauth token refresh interval in seconds.
pub oauth_token_refresh_interval: u64,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -159,6 +162,7 @@ impl BuilderConfig {
oauth_authenticate_url: load_string(OAUTH_AUTHENTICATE_URL)?,
oauth_token_url: load_string(OAUTH_TOKEN_URL)?,
oauth_audience: load_string(OAUTH_AUDIENCE)?,
oauth_token_refresh_interval: load_u64(AUTH_TOKEN_REFRESH_INTERVAL)?,
})
}

Expand Down
1 change: 0 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub async fn ingest_raw_handler(
) -> Result<Response, AppError> {
let body = body.strip_prefix("0x").unwrap_or(&body);
let buf = hex::decode(body).map_err(AppError::bad_req)?;

let envelope = TxEnvelope::decode_2718(&mut buf.as_slice()).map_err(AppError::bad_req)?;

ingest_handler(State(state), Json(envelope)).await
Expand Down
57 changes: 50 additions & 7 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use alloy::consensus::{SidecarBuilder, SidecarCoder, TxEnvelope};
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
eips::eip2718::Decodable2718,
};
use alloy_primitives::{keccak256, Bytes, B256};
use alloy_rlp::Buf;
use std::{sync::OnceLock, time::Duration};
use tokio::{select, sync::mpsc, task::JoinHandle};
use tracing::Instrument;
use zenith_types::{encode_txns, Alloy2718Coder};

use crate::config::BuilderConfig;

use super::bundler::Bundle;

#[derive(Debug, Default, Clone)]
/// A block in progress.
pub struct InProgressBlock {
Expand Down Expand Up @@ -57,6 +63,29 @@ impl InProgressBlock {
self.transactions.push(tx.clone());
}

/// Ingest a bundle into the in-progress block.
/// Ignores Signed Orders for now.
pub fn ingest_bundle(&mut self, bundle: Bundle) {
tracing::info!(bundle = %bundle.id, "ingesting bundle");

let txs = bundle
.bundle
.bundle
.txs
.into_iter()
.map(|tx| TxEnvelope::decode_2718(&mut tx.chunk()))
.collect::<Result<Vec<_>, _>>();

if let Ok(txs) = txs {
self.unseal();
// extend the transactions with the decoded transactions.
// As this builder does not provide bundles landing "top of block", its fine to just extend.
self.transactions.extend(txs);
} else {
tracing::error!("failed to decode bundle. dropping");
}
}

/// Encode the in-progress block
fn encode_raw(&self) -> &Bytes {
self.seal();
Expand Down Expand Up @@ -102,10 +131,15 @@ impl BlockBuilder {
pub fn spawn(
self,
outbound: mpsc::UnboundedSender<InProgressBlock>,
) -> (mpsc::UnboundedSender<TxEnvelope>, JoinHandle<()>) {
) -> (
mpsc::UnboundedSender<TxEnvelope>,
mpsc::UnboundedSender<Bundle>,
JoinHandle<()>,
) {
let mut in_progress = InProgressBlock::default();

let (sender, mut inbound) = mpsc::unbounded_channel();
let (tx_sender, mut tx_inbound) = mpsc::unbounded_channel();
let (bundle_sender, mut bundle_inbound) = mpsc::unbounded_channel();

let mut sleep = Box::pin(tokio::time::sleep(Duration::from_secs(
self.incoming_transactions_buffer,
Expand All @@ -131,9 +165,18 @@ impl BlockBuilder {
// irrespective of whether we have any blocks to build.
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer));
}
item_res = inbound.recv() => {
match item_res {
Some(item) => in_progress.ingest_tx(&item),
tx_resp = tx_inbound.recv() => {
match tx_resp {
Some(tx) => in_progress.ingest_tx(&tx),
None => {
tracing::debug!("upstream task gone");
break
}
}
}
bundle_resp = bundle_inbound.recv() => {
match bundle_resp {
Some(bundle) => in_progress.ingest_bundle(bundle),
None => {
tracing::debug!("upstream task gone");
break
Expand All @@ -146,6 +189,6 @@ impl BlockBuilder {
.in_current_span(),
);

(sender, handle)
(tx_sender, bundle_sender, handle)
}
}
127 changes: 127 additions & 0 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//! Bundler service responsible for polling and submitting bundles to the in-progress block.
use std::time::{Duration, Instant};

pub use crate::config::BuilderConfig;
use alloy_primitives::map::HashMap;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use signet_types::SignetEthBundle;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::debug;

use oauth2::TokenResponse;

use super::oauth::Authenticator;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
pub id: String,
pub bundle: SignetEthBundle,
}

/// Response from the tx-pool containing a list of bundles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxPoolBundleResponse {
pub bundles: Vec<Bundle>,
}

pub struct BundlePoller {
pub config: BuilderConfig,
pub authenticator: Authenticator,
pub seen_uuids: HashMap<String, Instant>,
}

/// Implements a poller for the block builder to pull bundles from the tx cache.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub async fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self {
config: config.clone(),
authenticator,
seen_uuids: HashMap::new(),
}
}

/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let mut unique: Vec<Bundle> = Vec::new();

let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
let token = self.authenticator.fetch_oauth_token().await?;

// Add the token to the request headers
let result = reqwest::Client::new()
.get(bundle_url)
.bearer_auth(token.access_token().secret())
.send()
.await?
.error_for_status()?;

let body = result.bytes().await?;
let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;

bundles.bundles.iter().for_each(|bundle| {
self.check_seen_bundles(bundle.clone(), &mut unique);
});

Ok(unique)
}

/// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
// add to the set of unique bundles
unique.push(bundle.clone());
Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
});
}

/// Evicts expired bundles from the cache.
fn evict(&mut self) {
let expired_keys: Vec<String> = self
.seen_uuids
.iter()
.filter_map(|(key, expiry)| {
if expiry.elapsed().is_zero() {
Some(key.clone())
} else {
None
}
})
.collect();

for key in expired_keys {
self.seen_uuids.remove(&key);
}
}

pub fn spawn(mut self, bundle_channel: mpsc::UnboundedSender<Bundle>) -> JoinHandle<()> {
let handle: JoinHandle<()> = tokio::spawn(async move {
loop {
let bundle_channel = bundle_channel.clone();
let bundles = self.check_bundle_cache().await;

match bundles {
Ok(bundles) => {
for bundle in bundles {
let result = bundle_channel.send(bundle);
if result.is_err() {
tracing::debug!("bundle_channel failed to send bundle");
}
}
}
Err(err) => {
debug!(?err, "error fetching bundles from tx-pool");
}
}

// evict expired bundles once every loop
self.evict();

tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await;
}
});

handle
}
}
2 changes: 2 additions & 0 deletions src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod block;
pub mod bundler;
pub mod oauth;
pub mod submit;
pub mod tx_poller;
Loading

0 comments on commit 012b9c2

Please sign in to comment.