Skip to content

Commit

Permalink
Simulate latency/congestion due to TX propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Oct 9, 2024
1 parent 449b594 commit d9b2463
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
4 changes: 2 additions & 2 deletions sim-rs/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, fs, path::PathBuf};
use std::{collections::BTreeMap, fs, path::PathBuf, sync::Arc};

use anyhow::Result;
use serde::Serialize;
Expand Down Expand Up @@ -31,7 +31,7 @@ pub struct Block {
pub slot: u64,
pub publisher: NodeId,
pub conflicts: Vec<NodeId>,
pub transactions: Vec<Transaction>,
pub transactions: Vec<Arc<Transaction>>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
75 changes: 70 additions & 5 deletions sim-rs/src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub struct Simulation {
next_slot: u64,
next_tx_id: u64,
event_queue: BinaryHeap<FutureEvent>,
unpublished_txs: VecDeque<Transaction>,
unpublished_txs: VecDeque<Arc<Transaction>>,
txs: BTreeMap<u64, Arc<Transaction>>,
}

impl Simulation {
Expand Down Expand Up @@ -71,6 +72,7 @@ impl Simulation {
next_tx_id: 0,
event_queue: BinaryHeap::new(),
unpublished_txs: VecDeque::new(),
txs: BTreeMap::new(),
};
sim.queue_event(SimulationEvent::NewSlot, Duration::ZERO);
sim.queue_event(SimulationEvent::NewTransaction, Duration::ZERO);
Expand All @@ -83,12 +85,22 @@ impl Simulation {
while let Some(event) = self.next_event().await {
match event {
SimulationEvent::NewSlot => self.run_slot_lottery(&tracker)?,
SimulationEvent::NewTransaction => self.generate_tx(&tracker),
SimulationEvent::NewTransaction => self.generate_tx(&tracker)?,
SimulationEvent::NetworkMessage { from, to, msg } => {
let Some(target) = self.nodes.get_mut(&to) else {
bail!("unrecognized message target {to}");
};
match msg {
SimulationMessage::AnnounceTx(id) => {
target.receive_announce_tx(from, id)?;
}
SimulationMessage::RequestTx(id) => {
let tx = self.txs.get(&id).expect("unexpected missing tx").clone();
target.receive_request_tx(from, tx)?;
}
SimulationMessage::Tx(tx) => {
target.receive_tx(from, tx)?;
}
SimulationMessage::RollForward(slot) => {
target.receive_roll_forward(from, slot)?;
}
Expand Down Expand Up @@ -196,23 +208,37 @@ impl Simulation {
Ok(())
}

fn generate_tx(&mut self, tracker: &EventTracker) {
fn generate_tx(&mut self, tracker: &EventTracker) -> Result<()> {
let id = self.next_tx_id;
let bytes = self
.config
.max_tx_size
.min(self.config.transaction_size_bytes.sample(&mut self.rng) as u64);
let tx = Transaction { id, bytes };
let tx = Arc::new(Transaction { id, bytes });

tracker.track_transaction(&tx);
self.unpublished_txs.push_back(tx);
self.unpublished_txs.push_back(tx.clone());
self.txs.insert(id, tx);

self.next_tx_id += 1;
let ms_until_tx = self.config.transaction_frequency_ms.sample(&mut self.rng) as u64;
self.queue_event(
SimulationEvent::NewTransaction,
Duration::from_millis(ms_until_tx),
);

// any node could be the first to see a transaction
let publisher_id = self.choose_random_node();
let publisher = self
.nodes
.get_mut(&publisher_id)
.expect("chose nonexistent node");
publisher.propagate_tx(id)
}

fn choose_random_node(&mut self) -> NodeId {
let index = self.rng.gen_range(0..self.nodes.len());
NodeId::from_usize(index)
}
}

Expand All @@ -225,6 +251,7 @@ struct Node {
peer_heads: BTreeMap<NodeId, u64>,
blocks_seen: BTreeSet<u64>,
blocks: BTreeMap<u64, Arc<Block>>,
txs_seen: BTreeSet<u64>,
}

impl Node {
Expand All @@ -250,9 +277,18 @@ impl Node {
peer_heads: BTreeMap::new(),
blocks_seen: BTreeSet::new(),
blocks: BTreeMap::new(),
txs_seen: BTreeSet::new(),
}
}

fn propagate_tx(&mut self, id: u64) -> Result<()> {
for peer in &self.peers {
self.msg_sink
.send_to(*peer, SimulationMessage::AnnounceTx(id))?;
}
Ok(())
}

fn publish_block(&mut self, block: Arc<Block>) -> Result<()> {
for peer in &self.peers {
if !self.peer_heads.get(peer).is_some_and(|&s| s >= block.slot) {
Expand All @@ -265,6 +301,29 @@ impl Node {
Ok(())
}

fn receive_announce_tx(&mut self, from: NodeId, id: u64) -> Result<()> {
if self.txs_seen.insert(id) {
self.msg_sink
.send_to(from, SimulationMessage::RequestTx(id))?;
}
Ok(())
}

fn receive_request_tx(&mut self, from: NodeId, tx: Arc<Transaction>) -> Result<()> {
self.msg_sink.send_to(from, SimulationMessage::Tx(tx))
}

fn receive_tx(&mut self, from: NodeId, tx: Arc<Transaction>) -> Result<()> {
for peer in &self.peers {
if *peer == from {
continue;
}
self.msg_sink
.send_to(*peer, SimulationMessage::AnnounceTx(tx.id))?;
}
Ok(())
}

fn receive_roll_forward(&mut self, from: NodeId, slot: u64) -> Result<()> {
if self.blocks_seen.insert(slot) {
self.msg_sink
Expand Down Expand Up @@ -353,6 +412,9 @@ enum SimulationEvent {

#[derive(Clone)]
enum SimulationMessage {
AnnounceTx(u64),
RequestTx(u64),
Tx(Arc<Transaction>),
RollForward(u64),
RequestBlock(u64),
Block(Arc<Block>),
Expand All @@ -361,6 +423,9 @@ enum SimulationMessage {
impl HasBytesSize for SimulationMessage {
fn bytes_size(&self) -> u64 {
match self {
Self::AnnounceTx(_) => 8,
Self::RequestTx(_) => 8,
Self::Tx(tx) => tx.bytes,
Self::RollForward(_) => 8,
Self::RequestBlock(_) => 8,
Self::Block(block) => block.transactions.iter().map(|t| t.bytes).sum(),
Expand Down

0 comments on commit d9b2463

Please sign in to comment.