Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/P2P.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
┌─────────────────────────────────────────────────────────────────────┐
│ WAVS Operator Node │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Trigger │───▶│ Engine │───▶│ Submission Manager │ │
│ │ Manager │ │ (WASM exec) │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────┬───────────┘ │
│ │ │
│ ┌───────────▼───────────┐ │
│ │ Aggregator │ │
│ │ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ Quorum Queue │ │ │
│ │ │ (per eventId) │ │ │
│ │ └────────┬────────┘ │ │
│ │ │ │ │
│ │ ┌────────▼────────┐ │ │
│ ┌──────────────────────────────────┐ │ │ P2P Network │ │ │
│ │ Other WAVS Nodes │◀───┼──│ mDNS/Kademlia │ │ │
│ │ (receive/send signatures) │───▶│ │ + GossipSub │ │ │
│ └──────────────────────────────────┘ │ └────────┬────────┘ │ │
│ │ │ │ │
│ │ ┌────────▼────────┐ │ │
│ │ │ Submit │ │ │
│ │ │ (on quorum) │ │ │
│ │ └────────┬────────┘ │ │
│ └───────────┼───────────┘ │
│ │ │
└──────────────────────────────────────────────────────┼──────────────┘
┌────────────────┐
│ Blockchain │
└────────────────┘
1 change: 1 addition & 0 deletions packages/layer-tests/layer-tests.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ wavs_concurrency = true
middleware_concurrency = true
grouping = false
evm_middleware_type = "poa" # "eigenlayer" or "poa"
p2p = "kademlia" # "mdns" (local) or "kademlia" (remote)
# Run all tests
mode = "all"
# mode = { "isolated" = [{ evm = "timer_aggregator_reorg" }] }
Expand Down
10 changes: 10 additions & 0 deletions packages/layer-tests/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ use utils::{config::ConfigExt, test_utils::middleware::evm::EvmMiddlewareType};

use crate::e2e::{AnyService, CosmosService, CrossChainService, EvmService, TestMatrix};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum TestP2pMode {
#[default]
Mdns,
Kademlia,
}

/// The fully parsed and validated config struct we use in the application
/// this is built up from the ConfigBuilder which can load from multiple sources (in order of preference):
///
Expand All @@ -19,6 +27,7 @@ pub struct TestConfig {
pub wavs_concurrency: bool,
pub grouping: bool,
pub middleware_type: EvmMiddlewareType,
pub p2p: TestP2pMode,
pub jaeger: Option<String>,
pub prometheus: Option<String>,
_log_levels: Vec<String>,
Expand Down Expand Up @@ -52,6 +61,7 @@ impl Default for TestConfig {
middleware_concurrency: false,
grouping: true,
middleware_type: EvmMiddlewareType::default(),
p2p: TestP2pMode::default(),
jaeger: None,
prometheus: None,
_data_dir: tempfile::tempdir().unwrap().keep(),
Expand Down
2 changes: 1 addition & 1 deletion packages/layer-tests/src/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn run(args: TestArgs, ctx: AppContext) {

let configs: Configs = config.into();

let handles = AppHandles::start(&ctx, &configs, metrics, configs.evm_middleware_type);
let handles = AppHandles::start(&ctx, &configs, metrics);
tracing::info!("Background processes started");

let clients = ctx
Expand Down
43 changes: 33 additions & 10 deletions packages/layer-tests/src/e2e/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use utils::{
use wavs::subsystems::aggregator::p2p::P2pConfig;
use wavs_types::{ChainConfigs, CosmosChainConfigBuilder, Credential, EvmChainConfigBuilder};

use crate::config::TestConfig;
use crate::config::{TestConfig, TestP2pMode};

use super::matrix::TestMatrix;

Expand All @@ -37,6 +37,7 @@ pub struct Configs {
pub wavs_concurrency: bool,
pub grouping: bool,
pub evm_middleware_type: EvmMiddlewareType,
pub p2p: TestP2pMode,
}

impl Configs {
Expand Down Expand Up @@ -146,7 +147,7 @@ impl TestMnemonics {
pub const MULTI_OPERATOR_COUNT: usize = 3;
/// Base port for WAVS HTTP servers
pub const WAVS_BASE_PORT: u32 = 8000;
/// Base port for P2P networking (separate from HTTP)
/// Base port for WAVS P2P servers
pub const P2P_BASE_PORT: u16 = 9000;

impl From<TestConfig> for Configs {
Expand Down Expand Up @@ -248,15 +249,36 @@ impl From<TestConfig> for Configs {
// Each operator gets a unique port
wavs_config.port = WAVS_BASE_PORT + operator_index as u32;

// Enable P2P for multi-operator tests with mDNS discovery
// Enable P2P for multi-operator tests
if num_operators > 1 {
wavs_config.p2p = P2pConfig::Local {
listen_port: P2P_BASE_PORT + operator_index as u16,
max_retry_duration_secs: None,
retry_interval_ms: None,
submission_ttl_secs: None,
max_catchup_submissions: None,
};
match test_config.p2p {
TestP2pMode::Kademlia => {
// Remote mode: Kademlia DHT discovery
// Operator 0 is the bootstrap server (empty bootstrap_nodes)
// Operators 1+ will have bootstrap_nodes set at runtime after operator 0 starts
wavs_config.p2p = P2pConfig::Remote {
listen_port: P2P_BASE_PORT + operator_index as u16,
bootstrap_nodes: vec![], // Set at runtime for operators 1+
max_retry_duration_secs: None,
retry_interval_ms: None,
submission_ttl_secs: None,
max_catchup_submissions: None,
cleanup_interval_secs: None,
kademlia_discovery_interval_secs: Some(2),
};
}
TestP2pMode::Mdns => {
// Local mode: mDNS discovery
wavs_config.p2p = P2pConfig::Local {
listen_port: P2P_BASE_PORT + operator_index as u16,
max_retry_duration_secs: None,
retry_interval_ms: None,
submission_ttl_secs: None,
max_catchup_submissions: None,
cleanup_interval_secs: None,
};
}
}
}

wavs_configs.push(wavs_config);
Expand Down Expand Up @@ -290,6 +312,7 @@ impl From<TestConfig> for Configs {
wavs_concurrency: test_config.wavs_concurrency,
grouping: test_config.grouping,
evm_middleware_type: test_config.middleware_type,
p2p: test_config.p2p,
}
}
}
172 changes: 139 additions & 33 deletions packages/layer-tests/src/e2e/handles.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod cosmos;
mod evm;

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

use cosmos::CosmosInstance;
use evm::EvmInstance;
Expand All @@ -10,12 +10,16 @@ use utils::{
telemetry::Metrics,
test_utils::middleware::{
cosmos::{CosmosMiddleware, CosmosMiddlewareKind},
evm::{EvmMiddleware, EvmMiddlewareType},
evm::EvmMiddleware,
},
};
use wavs::dispatcher::Dispatcher;
use wavs::subsystems::aggregator::p2p::P2pConfig;
use wavs_cli::clients::HttpClient;
use wavs_types::{ChainKey, ChainKeyNamespace};

use crate::config::TestP2pMode;

use super::config::Configs;

pub struct AppHandles {
Expand All @@ -30,12 +34,7 @@ pub struct AppHandles {
pub type CosmosMiddlewares = Arc<HashMap<ChainKey, CosmosMiddleware>>;

impl AppHandles {
pub fn start(
ctx: &AppContext,
configs: &Configs,
metrics: Metrics,
evm_middleware_type: EvmMiddlewareType,
) -> Self {
pub fn start(ctx: &AppContext, configs: &Configs, metrics: Metrics) -> Self {
let mut evm_chains = Vec::new();
let mut cosmos_chains = Vec::new();

Expand Down Expand Up @@ -72,36 +71,23 @@ impl AppHandles {
// Spawn one WAVS instance per operator
let mut wavs_handles = Vec::with_capacity(configs.num_operators());

for (operator_index, wavs_config) in configs.wavs_configs.iter().enumerate() {
// Each operator gets its own dispatcher and metrics
// Note: For now, we share the same metrics instance - in the future we may want
// to have separate metrics per operator
let dispatcher = Arc::new(Dispatcher::new(wavs_config, metrics.wavs.clone()).unwrap());

let wavs_handle = std::thread::spawn({
let dispatcher = dispatcher.clone();
let ctx = ctx.clone();
let config = wavs_config.clone();
let http_metrics = metrics.http.clone();

move || {
tracing::info!(
"Starting WAVS operator {} on port {}",
operator_index,
config.port
);
let health_status = wavs::health::SharedHealthStatus::new();
wavs::run_server(ctx, config, dispatcher, http_metrics, health_status);
}
});
// Check if we're using Remote P2P mode (Kademlia)

wavs_handles.push(wavs_handle);
if configs.p2p == TestP2pMode::Kademlia && configs.num_operators() > 1 {
// Remote mode: start operator 0 first, get bootstrap address, then start others
wavs_handles = Self::start_wavs_remote_mode(ctx, configs, &metrics);
} else {
// Local mode or single operator: start all at once
for (operator_index, wavs_config) in configs.wavs_configs.iter().enumerate() {
let handle = Self::spawn_wavs_operator(ctx, wavs_config, &metrics, operator_index);
wavs_handles.push(handle);
}
}

let evm_middleware = if evm_chains.is_empty() {
None
} else {
Some(EvmMiddleware::new(evm_middleware_type).unwrap())
Some(EvmMiddleware::new(configs.evm_middleware_type).unwrap())
};

Self {
Expand All @@ -118,7 +104,127 @@ impl AppHandles {
for handle in self.wavs_handles {
results.push(handle.join());
}

results
}

/// Spawn a single WAVS operator
fn spawn_wavs_operator(
ctx: &AppContext,
wavs_config: &wavs::config::Config,
metrics: &Metrics,
operator_index: usize,
) -> std::thread::JoinHandle<()> {
let dispatcher = Arc::new(Dispatcher::new(wavs_config, metrics.wavs.clone()).unwrap());

std::thread::spawn({
let dispatcher = dispatcher.clone();
let ctx = ctx.clone();
let config = wavs_config.clone();
let http_metrics = metrics.http.clone();

move || {
tracing::info!(
"Starting WAVS operator {} on port {}",
operator_index,
config.port
);
let health_status = wavs::health::SharedHealthStatus::new();
wavs::run_server(ctx, config, dispatcher, http_metrics, health_status);
}
})
}

/// Start WAVS operators in Remote P2P mode (Kademlia)
/// Operator 0 starts first as bootstrap server, others connect to it
fn start_wavs_remote_mode(
ctx: &AppContext,
configs: &Configs,
metrics: &Metrics,
) -> Vec<std::thread::JoinHandle<()>> {
let mut handles = Vec::with_capacity(configs.num_operators());

// Start operator 0 (bootstrap server)
let op0_config = &configs.wavs_configs[0];
tracing::info!("Starting operator 0 as bootstrap server");
handles.push(Self::spawn_wavs_operator(ctx, op0_config, metrics, 0));

// Wait for operator 0 to be ready and get its bootstrap address
let op0_url = format!("http://127.0.0.1:{}", op0_config.port);
let bootstrap_addr = ctx.rt.block_on(async {
let client = HttpClient::new(op0_url);

// Wait for the server to be ready
tokio::time::sleep(Duration::from_millis(500)).await;

// Poll until we get an address
for _ in 0..60 {
match client.get_p2p_status().await {
Ok(status) => {
// Prefer external_addresses, fall back to listen_addresses
let addr = status
.external_addresses
.first()
.or(status.listen_addresses.first())
.cloned();

if let Some(addr) = addr {
tracing::info!("Got bootstrap address from operator 0: {}", addr);
return Some(addr);
}
}
Err(e) => {
tracing::debug!("Waiting for operator 0 P2P status: {:?}", e);
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
tracing::error!("Failed to get bootstrap address from operator 0");
None
});

let bootstrap_addr =
bootstrap_addr.expect("Failed to get bootstrap address from operator 0");

// Start remaining operators with the bootstrap address
for (operator_index, wavs_config) in configs.wavs_configs.iter().enumerate().skip(1) {
// Clone and modify config to add bootstrap address
let mut config = wavs_config.clone();
if let P2pConfig::Remote {
listen_port,
bootstrap_nodes: _,
max_retry_duration_secs,
retry_interval_ms,
submission_ttl_secs,
max_catchup_submissions,
cleanup_interval_secs,
kademlia_discovery_interval_secs,
} = &config.p2p
{
config.p2p = P2pConfig::Remote {
listen_port: *listen_port,
bootstrap_nodes: vec![bootstrap_addr.clone()],
max_retry_duration_secs: *max_retry_duration_secs,
retry_interval_ms: *retry_interval_ms,
submission_ttl_secs: *submission_ttl_secs,
max_catchup_submissions: *max_catchup_submissions,
cleanup_interval_secs: *cleanup_interval_secs,
kademlia_discovery_interval_secs: *kademlia_discovery_interval_secs,
};
}

tracing::info!(
"Starting operator {} with bootstrap: {}",
operator_index,
bootstrap_addr
);
handles.push(Self::spawn_wavs_operator(
ctx,
&config,
metrics,
operator_index,
));
}

handles
}
}
1 change: 0 additions & 1 deletion packages/layer-tests/src/e2e/matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub enum EvmService {
TimerAggregator,
TimerAggregatorReorg,
GasPrice,
/// Multi-operator test that requires 2/3 quorum - expected to fail until P2P aggregation is implemented
MultiOperator,
}

Expand Down
2 changes: 1 addition & 1 deletion packages/layer-tests/src/e2e/test_definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub enum TestGroupId {
CosmosIntervalStartStop,
Backpressure,
AggregatorTimer,
P2pLocal,
P2p,
Other(usize),
}

Expand Down
Loading