diff --git a/sim-rs/src/events.rs b/sim-rs/src/events.rs index 3b86d832..9e27ffed 100644 --- a/sim-rs/src/events.rs +++ b/sim-rs/src/events.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, fs, path::PathBuf}; +use std::{collections::BTreeMap, fs, path::PathBuf, sync::Arc}; use anyhow::Result; use serde::Serialize; @@ -31,7 +31,7 @@ pub struct Block { pub slot: u64, pub publisher: NodeId, pub conflicts: Vec, - pub transactions: Vec, + pub transactions: Vec>, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/sim-rs/src/sim.rs b/sim-rs/src/sim.rs index bb4c9a95..7d33de45 100644 --- a/sim-rs/src/sim.rs +++ b/sim-rs/src/sim.rs @@ -30,7 +30,8 @@ pub struct Simulation { next_slot: u64, next_tx_id: u64, event_queue: BinaryHeap, - unpublished_txs: VecDeque, + unpublished_txs: VecDeque>, + txs: BTreeMap>, } impl Simulation { @@ -71,6 +72,7 @@ impl Simulation { next_tx_id: 0, event_queue: BinaryHeap::new(), unpublished_txs: VecDeque::new(), + txs: BTreeMap::new(), }; sim.queue_event(SimulationEvent::NewSlot, Duration::ZERO); sim.queue_event(SimulationEvent::NewTransaction, Duration::ZERO); @@ -83,12 +85,22 @@ impl Simulation { while let Some(event) = self.next_event().await { match event { SimulationEvent::NewSlot => self.run_slot_lottery(&tracker)?, - SimulationEvent::NewTransaction => self.generate_tx(&tracker), + SimulationEvent::NewTransaction => self.generate_tx(&tracker)?, SimulationEvent::NetworkMessage { from, to, msg } => { let Some(target) = self.nodes.get_mut(&to) else { bail!("unrecognized message target {to}"); }; match msg { + SimulationMessage::AnnounceTx(id) => { + target.receive_announce_tx(from, id)?; + } + SimulationMessage::RequestTx(id) => { + let tx = self.txs.get(&id).expect("unexpected missing tx").clone(); + target.receive_request_tx(from, tx)?; + } + SimulationMessage::Tx(tx) => { + target.receive_tx(from, tx)?; + } SimulationMessage::RollForward(slot) => { target.receive_roll_forward(from, slot)?; } @@ -196,16 +208,17 @@ impl Simulation { Ok(()) } - fn generate_tx(&mut self, tracker: &EventTracker) { + fn generate_tx(&mut self, tracker: &EventTracker) -> Result<()> { let id = self.next_tx_id; let bytes = self .config .max_tx_size .min(self.config.transaction_size_bytes.sample(&mut self.rng) as u64); - let tx = Transaction { id, bytes }; + let tx = Arc::new(Transaction { id, bytes }); tracker.track_transaction(&tx); - self.unpublished_txs.push_back(tx); + self.unpublished_txs.push_back(tx.clone()); + self.txs.insert(id, tx); self.next_tx_id += 1; let ms_until_tx = self.config.transaction_frequency_ms.sample(&mut self.rng) as u64; @@ -213,6 +226,19 @@ impl Simulation { SimulationEvent::NewTransaction, Duration::from_millis(ms_until_tx), ); + + // any node could be the first to see a transaction + let publisher_id = self.choose_random_node(); + let publisher = self + .nodes + .get_mut(&publisher_id) + .expect("chose nonexistent node"); + publisher.propagate_tx(id) + } + + fn choose_random_node(&mut self) -> NodeId { + let index = self.rng.gen_range(0..self.nodes.len()); + NodeId::from_usize(index) } } @@ -225,6 +251,7 @@ struct Node { peer_heads: BTreeMap, blocks_seen: BTreeSet, blocks: BTreeMap>, + txs_seen: BTreeSet, } impl Node { @@ -250,9 +277,18 @@ impl Node { peer_heads: BTreeMap::new(), blocks_seen: BTreeSet::new(), blocks: BTreeMap::new(), + txs_seen: BTreeSet::new(), } } + fn propagate_tx(&mut self, id: u64) -> Result<()> { + for peer in &self.peers { + self.msg_sink + .send_to(*peer, SimulationMessage::AnnounceTx(id))?; + } + Ok(()) + } + fn publish_block(&mut self, block: Arc) -> Result<()> { for peer in &self.peers { if !self.peer_heads.get(peer).is_some_and(|&s| s >= block.slot) { @@ -265,6 +301,29 @@ impl Node { Ok(()) } + fn receive_announce_tx(&mut self, from: NodeId, id: u64) -> Result<()> { + if self.txs_seen.insert(id) { + self.msg_sink + .send_to(from, SimulationMessage::RequestTx(id))?; + } + Ok(()) + } + + fn receive_request_tx(&mut self, from: NodeId, tx: Arc) -> Result<()> { + self.msg_sink.send_to(from, SimulationMessage::Tx(tx)) + } + + fn receive_tx(&mut self, from: NodeId, tx: Arc) -> Result<()> { + for peer in &self.peers { + if *peer == from { + continue; + } + self.msg_sink + .send_to(*peer, SimulationMessage::AnnounceTx(tx.id))?; + } + Ok(()) + } + fn receive_roll_forward(&mut self, from: NodeId, slot: u64) -> Result<()> { if self.blocks_seen.insert(slot) { self.msg_sink @@ -353,6 +412,9 @@ enum SimulationEvent { #[derive(Clone)] enum SimulationMessage { + AnnounceTx(u64), + RequestTx(u64), + Tx(Arc), RollForward(u64), RequestBlock(u64), Block(Arc), @@ -361,6 +423,9 @@ enum SimulationMessage { impl HasBytesSize for SimulationMessage { fn bytes_size(&self) -> u64 { match self { + Self::AnnounceTx(_) => 8, + Self::RequestTx(_) => 8, + Self::Tx(tx) => tx.bytes, Self::RollForward(_) => 8, Self::RequestBlock(_) => 8, Self::Block(block) => block.transactions.iter().map(|t| t.bytes).sum(),