Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/optimism/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ reth-rpc-eth-api.workspace = true
reth-rpc-eth-types.workspace = true
reth-errors.workspace = true
reth-storage-api.workspace = true
reth-tasks.workspace = true

# alloy
alloy-eips = { workspace = true, features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions crates/optimism/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use ws::{WsConnect, WsFlashBlockStream};
mod payload;
mod sequence;
mod service;
mod worker;
mod ws;

/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s.
Expand Down
297 changes: 149 additions & 148 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::BlockNumberOrTag;
use crate::{
sequence::FlashBlockSequence,
worker::{BuildArgs, FlashBlockBuilder},
ExecutionPayloadBaseV1, FlashBlock,
};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
use futures_util::{FutureExt, Stream, StreamExt};
use reth_chain_state::{
CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock,
};
use reth_errors::RethError;
use reth_evm::{
execute::{BlockBuilder, BlockBuilderOutcome},
ConfigureEvm,
use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions};
use reth_evm::ConfigureEvm;
use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use reth_revm::cached::CachedReads;
use reth_rpc_eth_types::PendingBlock;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskExecutor;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
task::{ready, Context, Poll},
time::Instant,
};
use tokio::pin;
use tokio::{pin, sync::oneshot};
use tracing::{debug, trace, warn};

/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
Expand All @@ -37,9 +36,10 @@ pub struct FlashBlockService<
current: Option<PendingBlock<N>>,
blocks: FlashBlockSequence<N::SignedTx>,
rebuild: bool,
evm_config: EvmConfig,
provider: Provider,
builder: FlashBlockBuilder<EvmConfig, Provider>,
canon_receiver: CanonStateNotifications<N>,
spawner: TaskExecutor,
job: Option<BuildJob<N>>,
/// Cached state reads for the current block.
/// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when
/// fb received on top of the same block. Avoid redundant I/O across multiple executions
Expand All @@ -50,28 +50,33 @@ pub struct FlashBlockService<
impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
+ Clone
+ 'static,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
Header = HeaderTy<N>,
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
> + Unpin
+ Clone
+ 'static,
{
/// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self {
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
Self {
rx,
current: None,
blocks: FlashBlockSequence::new(),
evm_config,
canon_receiver: provider.subscribe_to_canonical_state(),
provider,
cached_state: None,
builder: FlashBlockBuilder::new(evm_config, provider),
rebuild: false,
spawner,
job: None,
cached_state: None,
}
}

Expand All @@ -88,86 +93,35 @@ where
warn!("Flashblock service has stopped");
}

/// Returns the cached reads at the given head hash.
/// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier.
///
/// Returns a new cache instance if this is new `head` hash.
fn cached_reads(&mut self, head: B256) -> CachedReads {
if let Some((tracked, cache)) = self.cached_state.take() {
if tracked == head {
return cache
}
}

// instantiate a new cache instance
CachedReads::default()
}

/// Updates the cached reads at the given head hash
fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) {
self.cached_state = Some((head, cached_reads));
}

/// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier.
///
/// Returns None if the flashblock doesn't attach to the latest header.
fn execute(&mut self) -> eyre::Result<Option<PendingBlock<N>>> {
trace!("Attempting new flashblock");

let latest = self
.provider
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let latest_hash = latest.hash();

let Some(attrs) = self.blocks.payload_base() else {
trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base");
return Ok(None)
/// Returns `None` if the flashblock have no `base` or the base is not a child block of latest.
fn build_args(
&mut self,
) -> Option<BuildArgs<impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>>> {
let Some(base) = self.blocks.payload_base() else {
trace!(
flashblock_number = ?self.blocks.block_number(),
count = %self.blocks.count(),
"Missing flashblock payload base"
);

return None
};

if attrs.parent_hash != latest_hash {
trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
// doesn't attach to the latest block
return Ok(None)
}

let state_provider = self.provider.history_by_block_hash(latest.hash())?;

let mut request_cache = self.cached_reads(latest_hash);
let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
let mut state = State::builder().with_database(cached_db).with_bundle_update().build();

let mut builder = self
.evm_config
.builder_for_next_block(&mut state, &latest, attrs.into())
.map_err(RethError::other)?;

builder.apply_pre_execution_changes()?;

for tx in self.blocks.ready_transactions() {
let _gas_used = builder.execute_transaction(tx)?;
// attempt an initial consecutive check
if let Some(latest) = self.builder.provider().latest_header().ok().flatten() {
if latest.hash() != base.parent_hash {
trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
return None;
}
}

let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
builder.finish(NoopProvider::default())?;

let execution_outcome = ExecutionOutcome::new(
state.take_bundle(),
vec![execution_result.receipts],
block.number(),
vec![execution_result.requests],
);

// update cached reads
self.update_cached_reads(latest_hash, request_cache);

Ok(Some(PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
recovered_block: block.into(),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
},
)))
Some(BuildArgs {
base,
transactions: self.blocks.ready_transactions().collect::<Vec<_>>(),
cached_state: self.cached_state.take(),
})
}

/// Takes out `current` [`PendingBlock`] if `state` is not preceding it.
Expand All @@ -180,72 +134,119 @@ where
impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
+ Clone
+ 'static,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
Header = HeaderTy<N>,
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
> + Unpin
+ Clone
+ 'static,
{
type Item = eyre::Result<Option<PendingBlock<N>>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// consume new flashblocks while they're ready
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
match result {
Ok(flashblock) => match this.blocks.insert(flashblock) {
Ok(_) => this.rebuild = true,
Err(err) => debug!(%err, "Failed to prepare flashblock"),
},
Err(err) => return Poll::Ready(Some(Err(err))),
loop {
// drive pending build job to completion
let result = match this.job.as_mut() {
Some((now, rx)) => {
let result = ready!(rx.poll_unpin(cx));
result.ok().map(|res| (*now, res))
}
None => None,
};
// reset job
this.job.take();

if let Some((now, result)) = result {
match result {
Ok(Some((new_pending, cached_reads))) => {
// built a new pending block
this.current = Some(new_pending.clone());
// cache reads
this.cached_state = Some((new_pending.parent_hash(), cached_reads));
this.rebuild = false;

trace!(
parent_hash = %new_pending.block().parent_hash(),
block_number = new_pending.block().number(),
flash_blocks = this.blocks.count(),
elapsed = ?now.elapsed(),
"Built new block with flashblocks"
);

return Poll::Ready(Some(Ok(Some(new_pending))));
}
Ok(None) => {
// nothing to do because tracked flashblock doesn't attach to latest
}
Err(err) => {
// we can ignore this error
debug!(%err, "failed to execute flashblock");
}
}
}
}

if let Poll::Ready(Ok(state)) = {
let fut = this.canon_receiver.recv();
pin!(fut);
fut.poll_unpin(cx)
} {
if let Some(current) = this.on_new_tip(state) {
trace!(
parent_hash = %current.block().parent_hash(),
block_number = current.block().number(),
"Clearing current flashblock on new canonical block"
);

return Poll::Ready(Some(Ok(None)))
// consume new flashblocks while they're ready
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
match result {
Ok(flashblock) => match this.blocks.insert(flashblock) {
Ok(_) => this.rebuild = true,
Err(err) => debug!(%err, "Failed to prepare flashblock"),
},
Err(err) => return Poll::Ready(Some(Err(err))),
}
}
}

if !this.rebuild && this.current.is_some() {
return Poll::Pending
}

let now = Instant::now();
// try to build a block on top of latest
match this.execute() {
Ok(Some(new_pending)) => {
// built a new pending block
this.current = Some(new_pending.clone());
this.rebuild = false;
trace!(parent_hash=%new_pending.block().parent_hash(), block_number=new_pending.block().number(), flash_blocks=this.blocks.count(), elapsed=?now.elapsed(), "Built new block with flashblocks");
return Poll::Ready(Some(Ok(Some(new_pending))));
// update on new head block
if let Poll::Ready(Ok(state)) = {
let fut = this.canon_receiver.recv();
pin!(fut);
fut.poll_unpin(cx)
} {
if let Some(current) = this.on_new_tip(state) {
trace!(
parent_hash = %current.block().parent_hash(),
block_number = current.block().number(),
"Clearing current flashblock on new canonical block"
);

return Poll::Ready(Some(Ok(None)))
}
}
Ok(None) => {
// nothing to do because tracked flashblock doesn't attach to latest

if !this.rebuild && this.current.is_some() {
return Poll::Pending
}
Err(err) => {
// we can ignore this error
debug!(%err, "failed to execute flashblock");

// try to build a block on top of latest
if let Some(args) = this.build_args() {
let now = Instant::now();

let (tx, rx) = oneshot::channel();
let builder = this.builder.clone();

this.spawner.spawn_blocking(async move {
let _ = tx.send(builder.execute(args));
});
this.job.replace((now, rx));

// continue and poll the spawned job
continue
}
}

Poll::Pending
return Poll::Pending
}
}
}

type BuildJob<N> =
(Instant, oneshot::Receiver<eyre::Result<Option<(PendingBlock<N>, CachedReads)>>>);
Loading