diff --git a/crates/rollup-boost/src/cli.rs b/crates/rollup-boost/src/cli.rs index c4a082af..ca1a63df 100644 --- a/crates/rollup-boost/src/cli.rs +++ b/crates/rollup-boost/src/cli.rs @@ -12,13 +12,16 @@ use crate::{ get_version, init_metrics, probe::ProbeLayer, }; -use crate::{FlashblocksService, RpcClient}; +use crate::{FlashblocksService, RpcClient, ShadowBuilderArgs}; #[derive(Clone, Debug, clap::Args)] pub struct RollupBoostLibArgs { #[clap(flatten)] pub builder: BuilderArgs, + #[clap(flatten)] + pub shadow_builder: Option, + #[clap(flatten)] pub l2_client: L2ClientArgs, @@ -117,6 +120,12 @@ impl RollupBoostServiceArgs { let builder_client_args: ClientArgs = self.lib.builder.clone().into(); let builder_http_client = builder_client_args.new_http_client(PayloadSource::Builder)?; + let shadow_builder_client_args: Option = + self.lib.shadow_builder.clone().map(Into::into); + let shadow_builder_http_client = shadow_builder_client_args + .map(|client| client.new_http_client(PayloadSource::Builder)) + .transpose()?; + let (probe_layer, probes) = ProbeLayer::new(); let (health_handle, rpc_module) = if self.lib.flashblocks.flashblocks { @@ -148,8 +157,9 @@ impl RollupBoostServiceArgs { tower::ServiceBuilder::new() .layer(probe_layer) .layer(ProxyLayer::new( - l2_http_client.clone(), - builder_http_client.clone(), + l2_http_client, + builder_http_client, + shadow_builder_http_client, )); let server = Server::builder() diff --git a/crates/rollup-boost/src/client/rpc.rs b/crates/rollup-boost/src/client/rpc.rs index 511eaaab..1478a262 100644 --- a/crates/rollup-boost/src/client/rpc.rs +++ b/crates/rollup-boost/src/client/rpc.rs @@ -25,9 +25,10 @@ use op_alloy_rpc_types_engine::{ use opentelemetry::trace::SpanKind; use paste::paste; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tracing::{info, instrument}; +use tracing::{debug, info, instrument}; use super::auth::Auth; @@ -111,6 +112,8 @@ pub struct RpcClient { auth_rpc: Uri, /// The source of the payload payload_source: PayloadSource, + /// Optional shadow builder client for async mirroring (fire-and-forget) + shadow: Option>, } impl RpcClient { @@ -136,9 +139,18 @@ impl RpcClient { auth_client, auth_rpc, payload_source, + shadow: None, }) } + /// Set the shadow builder client for async mirroring. + /// When set, all Engine API calls will be asynchronously forwarded to the shadow builder + /// in a fire-and-forget manner. Errors from the shadow builder are logged but not propagated. + pub fn with_shadow(mut self, shadow: RpcClient) -> Self { + self.shadow = Some(Arc::new(shadow)); + self + } + #[instrument( skip_all, err, @@ -156,6 +168,21 @@ impl RpcClient { fork_choice_state: ForkchoiceState, payload_attributes: Option, ) -> ClientResult { + // Mirror to shadow builder (fire-and-forget) + if let Some(shadow) = &self.shadow { + let shadow_client = shadow.auth_client.clone(); + let fork_choice_state = fork_choice_state; + let payload_attributes = payload_attributes.clone(); + tokio::spawn(async move { + if let Err(e) = shadow_client + .fork_choice_updated_v3(fork_choice_state, payload_attributes) + .await + { + debug!(target: "shadow_builder", error = %e, "shadow fork_choice_updated_v3 failed"); + } + }); + } + info!("Sending fork_choice_updated_v3 to {}", self.payload_source); let res = self .auth_client @@ -195,6 +222,16 @@ impl RpcClient { &self, payload_id: PayloadId, ) -> ClientResult { + // Mirror to shadow builder (fire-and-forget) + if let Some(shadow) = &self.shadow { + let shadow_client = shadow.auth_client.clone(); + tokio::spawn(async move { + if let Err(e) = shadow_client.get_payload_v3(payload_id).await { + debug!(target: "shadow_builder", error = %e, "shadow get_payload_v3 failed"); + } + }); + } + tracing::Span::current().record("payload_id", payload_id.to_string()); info!("Sending get_payload_v3 to {}", self.payload_source); Ok(self @@ -221,6 +258,21 @@ impl RpcClient { versioned_hashes: Vec, parent_beacon_block_root: B256, ) -> ClientResult { + // Mirror to shadow builder (fire-and-forget) + if let Some(shadow) = &self.shadow { + let shadow_client = shadow.auth_client.clone(); + let payload = payload.clone(); + let versioned_hashes = versioned_hashes.clone(); + tokio::spawn(async move { + if let Err(e) = shadow_client + .new_payload_v3(payload, versioned_hashes, parent_beacon_block_root) + .await + { + debug!(target: "shadow_builder", error = %e, "shadow new_payload_v3 failed"); + } + }); + } + info!("Sending new_payload_v3 to {}", self.payload_source); let res = self @@ -250,6 +302,16 @@ impl RpcClient { &self, payload_id: PayloadId, ) -> ClientResult { + // Mirror to shadow builder (fire-and-forget) + if let Some(shadow) = &self.shadow { + let shadow_client = shadow.auth_client.clone(); + tokio::spawn(async move { + if let Err(e) = shadow_client.get_payload_v4(payload_id).await { + debug!(target: "shadow_builder", error = %e, "shadow get_payload_v4 failed"); + } + }); + } + info!("Sending get_payload_v4 to {}", self.payload_source); Ok(self .auth_client @@ -291,6 +353,27 @@ impl RpcClient { parent_beacon_block_root: B256, execution_requests: Vec, ) -> ClientResult { + // Mirror to shadow builder (fire-and-forget) + if let Some(shadow) = &self.shadow { + let shadow_client = shadow.auth_client.clone(); + let payload = payload.clone(); + let versioned_hashes = versioned_hashes.clone(); + let execution_requests = execution_requests.clone(); + tokio::spawn(async move { + if let Err(e) = shadow_client + .new_payload_v4( + payload, + versioned_hashes, + parent_beacon_block_root, + execution_requests, + ) + .await + { + debug!(target: "shadow_builder", error = %e, "shadow new_payload_v4 failed"); + } + }); + } + info!("Sending new_payload_v4 to {}", self.payload_source); let res = self @@ -467,7 +550,11 @@ macro_rules! define_client_args { }; } -define_client_args!((BuilderArgs, builder), (L2ClientArgs, l2)); +define_client_args!( + (BuilderArgs, builder), + (L2ClientArgs, l2), + (ShadowBuilderArgs, shadow_builder) +); #[cfg(test)] pub mod tests { @@ -561,4 +648,212 @@ pub mod tests { server.start(module) } + + mod shadow_tests { + use super::*; + use alloy_primitives::{B256, U256}; + use alloy_rpc_types_engine::{ + ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, ForkchoiceState, + ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum, + }; + use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelopeV3; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + /// Spawn a mock Engine API server that tracks received requests + async fn spawn_mock_engine_server( + port: u16, + fcu_counter: Arc, + get_payload_counter: Arc, + new_payload_counter: Arc, + ) -> ServerHandle { + let secret = JwtSecret::from_hex(SECRET).unwrap(); + let addr = format!("127.0.0.1:{port}"); + let layer = AuthLayer::new(secret); + let middleware = tower::ServiceBuilder::new().layer(layer); + + let server = ServerBuilder::default() + .set_http_middleware(middleware) + .build(addr.parse::().unwrap()) + .await + .unwrap(); + + let mut module = RpcModule::new(()); + + // Register fork_choice_updated_v3 + let fcu_counter_clone = fcu_counter.clone(); + module + .register_method("engine_forkchoiceUpdatedV3", move |_params, _, _| { + fcu_counter_clone.fetch_add(1, Ordering::SeqCst); + Ok::<_, ErrorObjectOwned>(ForkchoiceUpdated::new(PayloadStatus::from_status( + PayloadStatusEnum::Valid, + ))) + }) + .unwrap(); + + // Register get_payload_v3 + let get_payload_counter_clone = get_payload_counter.clone(); + module + .register_method("engine_getPayloadV3", move |_params, _, _| { + get_payload_counter_clone.fetch_add(1, Ordering::SeqCst); + let response = r#"{"executionPayload":{"parentHash":"0xe927a1448525fb5d32cb50ee1408461a945ba6c39bd5cf5621407d500ecc8de9","feeRecipient":"0x0000000000000000000000000000000000000000","stateRoot":"0x10f8a0830000e8edef6d00cc727ff833f064b1950afd591ae41357f97e543119","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","prevRandao":"0xe0d8b4521a7da1582a713244ffb6a86aa1726932087386e2dc7973f43fc6cb24","blockNumber":"0x1","gasLimit":"0x2ffbd2","gasUsed":"0x0","timestamp":"0x1235","extraData":"0xd883010d00846765746888676f312e32312e30856c696e7578","baseFeePerGas":"0x342770c0","blockHash":"0x44d0fa5f2f73a938ebb96a2a21679eb8dea3e7b7dd8fd9f35aa756dda8bf0a8a","transactions":[],"withdrawals":[],"blobGasUsed":"0x0","excessBlobGas":"0x0"},"blockValue":"0x0","blobsBundle":{"commitments":[],"proofs":[],"blobs":[]},"shouldOverrideBuilder":false,"parentBeaconBlockRoot":"0xdead00000000000000000000000000000000000000000000000000000000beef"}"#; + let envelope: OpExecutionPayloadEnvelopeV3 = + serde_json::from_str(response).unwrap(); + Ok::<_, ErrorObjectOwned>(envelope) + }) + .unwrap(); + + // Register new_payload_v3 + let new_payload_counter_clone = new_payload_counter.clone(); + module + .register_method("engine_newPayloadV3", move |_params, _, _| { + new_payload_counter_clone.fetch_add(1, Ordering::SeqCst); + Ok::<_, ErrorObjectOwned>(PayloadStatus::from_status(PayloadStatusEnum::Valid)) + }) + .unwrap(); + + server.start(module) + } + + fn create_rpc_client(port: u16) -> RpcClient { + let secret = JwtSecret::from_hex(SECRET).unwrap(); + let uri = Uri::from_str(&format!("http://127.0.0.1:{port}")).unwrap(); + RpcClient::new(uri, secret, 5000, PayloadSource::Builder).unwrap() + } + + #[tokio::test] + async fn test_shadow_mirrors_engine_api_calls() { + // Setup primary and shadow servers + let primary_port = get_available_port(); + let shadow_port = get_available_port(); + + let primary_fcu = Arc::new(AtomicUsize::new(0)); + let primary_get = Arc::new(AtomicUsize::new(0)); + let primary_new = Arc::new(AtomicUsize::new(0)); + + let shadow_fcu = Arc::new(AtomicUsize::new(0)); + let shadow_get = Arc::new(AtomicUsize::new(0)); + let shadow_new = Arc::new(AtomicUsize::new(0)); + + let primary_server = spawn_mock_engine_server( + primary_port, + primary_fcu.clone(), + primary_get.clone(), + primary_new.clone(), + ) + .await; + + let shadow_server = spawn_mock_engine_server( + shadow_port, + shadow_fcu.clone(), + shadow_get.clone(), + shadow_new.clone(), + ) + .await; + + // Create primary client with shadow + let primary_client = create_rpc_client(primary_port); + let shadow_client = create_rpc_client(shadow_port); + let client = primary_client.with_shadow(shadow_client); + + // Test fork_choice_updated_v3 + let fcs = ForkchoiceState { + head_block_hash: B256::ZERO, + safe_block_hash: B256::ZERO, + finalized_block_hash: B256::ZERO, + }; + let result = client.fork_choice_updated_v3(fcs, None).await; + assert!(result.is_ok()); + + // Test get_payload_v3 + let payload_id = PayloadId::new([1, 2, 3, 4, 5, 6, 7, 8]); + let result = client.get_payload_v3(payload_id).await; + assert!(result.is_ok()); + + // Test new_payload_v3 + let payload = ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + base_fee_per_gas: U256::from(7u64), + block_number: 1, + block_hash: B256::ZERO, + logs_bloom: Default::default(), + extra_data: Default::default(), + gas_limit: 0x1c9c380, + gas_used: 0, + timestamp: 0, + fee_recipient: Default::default(), + parent_hash: Default::default(), + prev_randao: Default::default(), + receipts_root: Default::default(), + state_root: Default::default(), + transactions: vec![], + }, + withdrawals: vec![], + }, + blob_gas_used: 0, + excess_blob_gas: 0, + }; + let result = client.new_payload_v3(payload, vec![], B256::ZERO).await; + assert!(result.is_ok()); + + // Wait for async shadow calls to complete + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify both primary and shadow received all calls + assert_eq!(primary_fcu.load(Ordering::SeqCst), 1); + assert_eq!(shadow_fcu.load(Ordering::SeqCst), 1); + assert_eq!(primary_get.load(Ordering::SeqCst), 1); + assert_eq!(shadow_get.load(Ordering::SeqCst), 1); + assert_eq!(primary_new.load(Ordering::SeqCst), 1); + assert_eq!(shadow_new.load(Ordering::SeqCst), 1); + + primary_server.stop().unwrap(); + shadow_server.stop().unwrap(); + } + + #[tokio::test] + async fn test_shadow_error_does_not_affect_primary() { + // Only start primary server - shadow server will be unavailable + let primary_port = get_available_port(); + let shadow_port = get_available_port(); // No server on this port + + let primary_fcu = Arc::new(AtomicUsize::new(0)); + let primary_get = Arc::new(AtomicUsize::new(0)); + let primary_new = Arc::new(AtomicUsize::new(0)); + + let primary_server = spawn_mock_engine_server( + primary_port, + primary_fcu.clone(), + primary_get.clone(), + primary_new.clone(), + ) + .await; + + // Create client with shadow pointing to non-existent server + let primary_client = create_rpc_client(primary_port); + let shadow_client = create_rpc_client(shadow_port); + let client = primary_client.with_shadow(shadow_client); + + // Make call - should succeed even though shadow will fail + let fcs = ForkchoiceState { + head_block_hash: B256::ZERO, + safe_block_hash: B256::ZERO, + finalized_block_hash: B256::ZERO, + }; + + let result = client.fork_choice_updated_v3(fcs, None).await; + assert!( + result.is_ok(), + "Primary call should succeed despite shadow failure" + ); + + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify primary received the call + assert_eq!(primary_fcu.load(Ordering::SeqCst), 1); + + primary_server.stop().unwrap(); + } + } } diff --git a/crates/rollup-boost/src/proxy.rs b/crates/rollup-boost/src/proxy.rs index fc1e3c35..7f244222 100644 --- a/crates/rollup-boost/src/proxy.rs +++ b/crates/rollup-boost/src/proxy.rs @@ -6,7 +6,7 @@ use jsonrpsee::server::HttpBody; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; use tower::{Layer, Service}; -use tracing::info; +use tracing::{debug, info}; const ENGINE_METHOD: &str = "engine_"; @@ -24,13 +24,19 @@ const FORWARD_REQUESTS: [&str; 6] = [ pub struct ProxyLayer { l2_client: HttpClient, builder_client: HttpClient, + shadow_builder_client: Option, } impl ProxyLayer { - pub fn new(l2_client: HttpClient, builder_client: HttpClient) -> Self { + pub fn new( + l2_client: HttpClient, + builder_client: HttpClient, + shadow_builder_client: Option, + ) -> Self { ProxyLayer { l2_client, builder_client, + shadow_builder_client, } } } @@ -43,6 +49,7 @@ impl Layer for ProxyLayer { inner, l2_client: self.l2_client.clone(), builder_client: self.builder_client.clone(), + shadow_builder_client: self.shadow_builder_client.clone(), } } } @@ -52,6 +59,7 @@ pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, + shadow_builder_client: Option, } // Consider using `RpcServiceT` when https://github.com/paritytech/jsonrpsee/pull/1521 is merged @@ -108,11 +116,23 @@ where let method_clone = method.clone(); let buffered_clone = buffered.clone(); let mut builder_client = service.builder_client.clone(); - + debug!(target: "proxy::call", message = "forwarding request to builder", ?method); // Fire and forget the builder request tokio::spawn(async move { let _ = builder_client.forward(buffered_clone, method_clone).await; }); + // If the shadow builder is enabled, forward the request to the shadow builder + if let Some(shadow_builder_client) = service.shadow_builder_client.clone() { + let method_clone = method.clone(); + let buffered_clone = buffered.clone(); + let mut shadow_builder_client = shadow_builder_client.clone(); + debug!(target: "proxy::call", message = "proxying request to shadow builder", ?method); + tokio::spawn(async move { + let _ = shadow_builder_client + .forward(buffered_clone, method_clone) + .await; + }); + } } // Return the response from the L2 client @@ -196,6 +216,7 @@ mod tests { timeout: 1, } .new_http_client(PayloadSource::Builder)?, + None, )); let temp_listener = TcpListener::bind("127.0.0.1:0").await?; @@ -512,6 +533,7 @@ mod tests { } .new_http_client(PayloadSource::Builder) .unwrap(), + None, ); // Create a layered server @@ -825,6 +847,7 @@ mod tests { timeout: 200, } .new_http_client(PayloadSource::Builder)?, + None, ); // Start proxy server @@ -926,6 +949,7 @@ mod tests { timeout: 200, } .new_http_client(PayloadSource::Builder)?, + None, ); // Start proxy on dynamic port diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index ae7c7a56..5d22c1c5 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -83,7 +83,14 @@ impl RollupBoostServer { let builder_client_args: ClientArgs = rollup_boost_args.builder.into(); let l2_client = l2_client_args.new_rpc_client(PayloadSource::L2)?; - let builder_client = builder_client_args.new_rpc_client(PayloadSource::Builder)?; + let mut builder_client = builder_client_args.new_rpc_client(PayloadSource::Builder)?; + + // Add shadow builder if configured + if let Some(shadow_args) = rollup_boost_args.shadow_builder { + let shadow_client_args: ClientArgs = shadow_args.into(); + let shadow_client = shadow_client_args.new_rpc_client(PayloadSource::Builder)?; + builder_client = builder_client.with_shadow(shadow_client) + }; let flashblocks_args = rollup_boost_args.flashblocks; let inbound_url = flashblocks_args.flashblocks_builder_url; @@ -125,7 +132,14 @@ impl RollupBoostServer { let builder_client_args: ClientArgs = rollup_boost_args.builder.into(); let l2_client = l2_client_args.new_rpc_client(PayloadSource::L2)?; - let builder_client = builder_client_args.new_rpc_client(PayloadSource::Builder)?; + let mut builder_client = builder_client_args.new_rpc_client(PayloadSource::Builder)?; + + // Add shadow builder if configured + if let Some(shadow_args) = rollup_boost_args.shadow_builder { + let shadow_client_args: ClientArgs = shadow_args.into(); + let shadow_client = shadow_client_args.new_rpc_client(PayloadSource::Builder)?; + builder_client = builder_client.with_shadow(shadow_client) + }; Ok(RollupBoostServer::new( l2_client, @@ -909,7 +923,7 @@ pub mod tests { let http_middleware = tower::ServiceBuilder::new() .layer(probe_layer) - .layer(ProxyLayer::new(l2_http_client, builder_http_client)); + .layer(ProxyLayer::new(l2_http_client, builder_http_client, None)); let server = Server::builder() .set_http_middleware(http_middleware)