diff --git a/crates/delegator/src/api.rs b/crates/delegator/src/api.rs index 17b008a..bb4124c 100644 --- a/crates/delegator/src/api.rs +++ b/crates/delegator/src/api.rs @@ -60,6 +60,7 @@ pub struct JobEventsRequest { #[derive(Debug, Serialize)] #[serde(tag = "type", content = "data")] pub enum JobEventsResponse { + Propagated, BidReceived(String), Delegated(String), Finished(Vec), @@ -70,7 +71,7 @@ pub async fn job_events_handler( Query(input): Query, ) -> Sse>> { let stream = stream! { - let job_key = kad::RecordKey::new( + let job_key = kad::RecordKey::new( &hex::decode(input.job_key) .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))? ); @@ -81,6 +82,7 @@ pub async fn job_events_handler( yield Event::default() .json_data( match event { + DelegatorEvent::Propagated => { JobEventsResponse::Propagated }, DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id.to_base58()) }, DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id.to_base58()) }, DelegatorEvent::Finished(data) => { JobEventsResponse::Finished(data) }, diff --git a/crates/delegator/src/delegator.rs b/crates/delegator/src/delegator.rs index cc081f6..d32e978 100644 --- a/crates/delegator/src/delegator.rs +++ b/crates/delegator/src/delegator.rs @@ -10,13 +10,13 @@ use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tokio::{sync::mpsc::Sender, task::JoinHandle}; use tokio_stream::StreamExt; -use tracing::{error, info}; +use tracing::{error, info, warn}; use zetina_common::graceful_shutdown::shutdown_signal; use zetina_common::hash; use zetina_common::job::{Job, JobBid, JobData}; use zetina_common::job_witness::JobWitness; use zetina_common::process::Process; -use zetina_peer::swarm::{ +use zetina_peer::{ DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic, }; @@ -91,7 +91,8 @@ impl Delegator { info!("Propagated job: {} for bidding", hex::encode(&key)); let (process, bid_tx) = BidQueue::run(key.to_owned()); job_bid_scheduler.push(process); - job_hash_store.insert(key, bid_tx); + job_hash_store.insert(key.to_owned(), bid_tx); + events_tx.send((key, DelegatorEvent::Propagated))?; }, kad::QueryResult::GetRecord(Ok( kad::GetRecordOk::FoundRecord(kad::PeerRecord { @@ -111,16 +112,34 @@ impl Delegator { _ => {} } } - Some(Ok((job_key, bids))) = job_bid_scheduler.next() => { - let bid = bids.first_key_value().unwrap(); - let price = *bid.0; - let identity = *bid.1.first().unwrap(); - info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity); - gossipsub_tx.send(GossipsubMessage { - topic: Topic::Delegation.into(), - data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))? - }).await?; - events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?; + Some(Ok((job_key, mut bids))) = job_bid_scheduler.next() => { + if let Some((price, identities)) = bids.pop_first() { + if identities.is_empty() { + warn!("Job {} did not receive any bids", hex::encode(&job_key)); + } else { + for identity in identities { + let result = Box::pin(async { + gossipsub_tx.send(GossipsubMessage { + topic: Topic::Delegation.into(), + data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid { + identity, + job_key: job_key.clone(), + price, + }))?, + }).await?; + + events_tx.send((job_key.clone(), DelegatorEvent::Delegated(identity)))?; + info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), &identity); + Ok::<(), Error>(()) + }).await; + + match result { + Ok(_) => break, // Break after successful delegation + Err(err) => error!(?err, "Failed to delegate job {}", hex::encode(&job_key)), + } + } + } + } } _ = shutdown_signal() => { break @@ -147,6 +166,7 @@ impl Drop for Delegator { #[derive(Debug, Clone)] pub enum DelegatorEvent { + Propagated, BidReceived(PeerId), Delegated(PeerId), Finished(Vec), diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index fe275f5..314b579 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -24,7 +24,7 @@ use tower_http::{ }; use tracing_subscriber::EnvFilter; use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData}; -use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner}; +use zetina_peer::{GossipsubMessage, KademliaMessage, SwarmRunner}; #[derive(Parser)] struct Cli { diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 31fa25e..f1a6fd8 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -14,7 +14,7 @@ use zetina_common::{ graceful_shutdown::shutdown_signal, job::JobBid, job_trace::JobTrace, job_witness::JobWitness, process::Process, }; -use zetina_peer::swarm::{ +use zetina_peer::{ DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic, }; use zetina_prover::{ diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 8fc35c2..2700095 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -11,7 +11,7 @@ use tokio::{net::TcpListener, sync::mpsc}; use tower_http::{timeout::TimeoutLayer, trace::TraceLayer}; use tracing_subscriber::EnvFilter; use zetina_common::graceful_shutdown::shutdown_signal; -use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner}; +use zetina_peer::{GossipsubMessage, KademliaMessage, SwarmRunner}; use zetina_prover::stone_prover::StoneProver; use zetina_runner::cairo_runner::CairoRunner; diff --git a/crates/peer/src/lib.rs b/crates/peer/src/lib.rs index 899c10d..743c199 100644 --- a/crates/peer/src/lib.rs +++ b/crates/peer/src/lib.rs @@ -1 +1,323 @@ -pub mod swarm; +use async_stream::stream; +use futures::stream::Stream; +use libp2p::futures::StreamExt; +use libp2p::gossipsub::{self, IdentTopic, PublishError, TopicHash}; +use libp2p::identity::Keypair; +use libp2p::kad::store::{MemoryStore, MemoryStoreConfig}; +use libp2p::kad::{Config, Mode}; +use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent}; +use libp2p::{kad, noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder}; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; +use std::time::Duration; +use tokio::sync::mpsc; +use tracing::{debug, error, info}; +use zetina_common::graceful_shutdown::shutdown_signal; +use zetina_common::job::{Job, JobBid}; + +#[derive(NetworkBehaviour)] +pub struct PeerBehaviour { + gossipsub: gossipsub::Behaviour, + kademlia: kad::Behaviour, +} + +pub struct SwarmRunner { + pub swarm: Swarm, + pub listen_multiaddr: Multiaddr, + pub dial_multiaddrs: Vec, + pub p2p_keypair: Keypair, + pub p2p_multiaddr: Multiaddr, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Topic { + Networking, + Market, + Delegation, +} + +impl Topic { + pub fn as_str(&self) -> &'static str { + match self { + Topic::Networking => "networking", + Topic::Market => "market", + Topic::Delegation => "delegation", + } + } +} + +impl From for TopicHash { + fn from(value: Topic) -> Self { + IdentTopic::from(value).into() + } +} + +impl From for IdentTopic { + fn from(value: Topic) -> Self { + IdentTopic::new(value.as_str()) + } +} + +#[derive(Debug)] +pub struct GossipsubMessage { + pub topic: IdentTopic, + pub data: Vec, +} + +#[derive(Debug)] +pub enum KademliaMessage { + GET(kad::RecordKey), + PUT((kad::RecordKey, Vec)), +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum NetworkingMessage { + Multiaddr(Multiaddr), +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum MarketMessage { + Job(Job), + JobBidPropagation(kad::RecordKey), + JobBid(JobBid), +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum DelegationMessage { + Delegate(JobBid), + Finished(kad::RecordKey, kad::RecordKey), +} + +impl SwarmRunner { + pub fn new( + listen_multiaddr: Multiaddr, + dial_multiaddrs: Vec, + p2p_keypair: Keypair, + p2p_multiaddr: Multiaddr, + ) -> Result> { + let mut config = Config::default(); + config.set_max_packet_size(1024 * 1024 * 100); + config.set_query_timeout(Duration::from_secs(60)); + let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair.to_owned()) + .with_tokio() + .with_tcp( + tcp::Config::default().port_reuse(true), + noise::Config::new, + yamux::Config::default, + )? + .with_quic() + .with_behaviour(|p2p_keypair| PeerBehaviour { + kademlia: kad::Behaviour::with_config( + p2p_keypair.public().to_peer_id(), + MemoryStore::with_config( + p2p_keypair.public().to_peer_id(), + MemoryStoreConfig { + max_value_bytes: 1024 * 1024 * 100, + ..Default::default() + }, + ), + config, + ), + gossipsub: Self::init_gossip(p2p_keypair).unwrap(), + })? + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) + .build(); + + swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?; + swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Market.as_str()))?; + swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Delegation.as_str()))?; + swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); + // swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?; + swarm.listen_on(listen_multiaddr.to_owned())?; + + dial_multiaddrs.iter().try_for_each(|addr| swarm.dial(addr.clone()))?; + + Ok(SwarmRunner { swarm, listen_multiaddr, dial_multiaddrs, p2p_keypair, p2p_multiaddr }) + } + + fn init_gossip( + p2p_local_keypair: &Keypair, + ) -> Result> { + let message_authenticity = + gossipsub::MessageAuthenticity::Signed(p2p_local_keypair.clone()); + + let config = gossipsub::ConfigBuilder::default() + .heartbeat_interval(Duration::from_secs(10)) + .validation_mode(gossipsub::ValidationMode::Strict) + .max_transmit_size(usize::MAX) + .build()?; + + Ok(gossipsub::Behaviour::new(message_authenticity, config)?) + } + + pub fn run( + mut self, + mut gossipsub_message: mpsc::Receiver, + mut kademlia_message: mpsc::Receiver, + ) -> Pin + Send>> { + let stream = stream! { + loop { + tokio::select! { + Some(message) = gossipsub_message.recv() => { + debug!{"Sending gossipsub_message: topic {}, data {}", message.topic, hex::encode(&message.data)}; + if let Err(e) = self.swarm + .behaviour_mut() + .gossipsub + .publish(message.topic, message.data) + { + error!("Gossipsub error: {e:?}"); + } + }, + Some(message) = kademlia_message.recv() => { + debug!{"Sending kademlia_message: {:?}", message}; + match message { + KademliaMessage::GET(key) => { + self.swarm.behaviour_mut().kademlia.get_record(kad::RecordKey::new(&key)); + }, + KademliaMessage::PUT((key, data)) => { + let record = kad::Record { + key: kad::RecordKey::new(&key), + value: data, + publisher: None, + expires: None, + }; + if let Err(e) = self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) { + error!("Kademlia error: {e:?}"); + } + }, + } + }, + event = self.swarm.select_next_some() => match event { + SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { + peer_id, topic + })) => { + if topic == Topic::Networking.into() { + let mut publish = || { + self.swarm.behaviour_mut().gossipsub.publish( + Topic::Networking, + serde_json::to_vec(&NetworkingMessage::Multiaddr(self.p2p_multiaddr.to_owned()))? + )?; + Ok::<(), Error>(()) + }; + + if let Err(error) = publish() { + error!("Dial error: {:?}", error); + } + } + + yield PeerBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { + peer_id, topic + }); + }, + SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { + propagation_source, + message_id, + message, + })) => { + if message.topic == Topic::Networking.into() { + match serde_json::from_slice::(&message.data) { + Ok(NetworkingMessage::Multiaddr(addr)) => { + if let Err(error) = self.swarm.dial(addr) { + error!{"Dial error: {:?}", error}; + } + } + Err(error) => { + error!{"Deserialization error: {:?}", error}; + } + } + } + + yield PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { + propagation_source, + message_id, + message, + }); + } + SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, endpoint, .. } => { + info!{"Connection established: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established}; + self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + self.swarm.behaviour_mut().kademlia.add_address(&peer_id, endpoint.get_remote_address().to_owned()); + } + SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, endpoint, .. } => { + info!{"Connection closed: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established}; + if num_established == 0 { + self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, endpoint.get_remote_address()); + if let Err(err) = self.swarm.dial(endpoint.get_remote_address().to_owned()) { + error!("Failed to re-dial peer: {err:?}"); + } + } + } + SwarmEvent::Behaviour(PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, result, stats, step })) => { + match result { + kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => { + for peer in providers { + info!("Peer {peer:?} provides key {}", hex::encode(&key)); + } + } + kad::QueryResult::GetProviders(Err(err)) => { + error!("Failed to get providers: {err:?}"); + } + kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {peer, record}))) => { + info!("Successfully got record {}", hex::encode(&record.key)); + + yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, + result: kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {peer, record}))), + stats, step }) + } + kad::QueryResult::GetRecord(Ok(_)) => {} + kad::QueryResult::GetRecord(Err(err)) => { + error!("Failed to get record: {err:?}"); + } + kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => { + info!("Successfully put record {}", hex::encode(&key)); + + yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, + result: kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })), + stats, step }) + } + kad::QueryResult::PutRecord(Err(err)) => { + error!("Failed to put record: {err:?}"); + } + kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => { + info!("Successfully put provider record {}", hex::encode(&key)); + } + kad::QueryResult::StartProviding(Err(err)) => { + error!("Failed to put provider record: {err:?}"); + } + event => { + debug!("Unhandled event: {:?}", event); + } + } + } + SwarmEvent::Behaviour(event) => { + yield event; + } + event => { + debug!("Unhandled event: {:?}", event); + } + }, + _ = shutdown_signal() => { + break + } + else => break + } + } + }; + Box::pin(stream) + } +} + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Serde error")] + Serde(#[from] serde_json::Error), + + #[error("Dial error")] + Dial(#[from] DialError), + + #[error("Publish error")] + Publish(#[from] PublishError), +} diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs deleted file mode 100644 index c4b7272..0000000 --- a/crates/peer/src/swarm.rs +++ /dev/null @@ -1,313 +0,0 @@ -use async_stream::stream; -use futures::stream::Stream; -use libp2p::futures::StreamExt; -use libp2p::gossipsub::{self, IdentTopic, TopicHash}; -use libp2p::identity::Keypair; -use libp2p::kad::store::{MemoryStore, MemoryStoreConfig}; -use libp2p::kad::{Config, Mode}; -use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent}; -use libp2p::{kad, noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder}; -use serde::{Deserialize, Serialize}; -use std::pin::Pin; -use std::time::Duration; -use tokio::sync::mpsc; -use tracing::{debug, error, info}; -use zetina_common::graceful_shutdown::shutdown_signal; -use zetina_common::job::{Job, JobBid}; - -#[derive(NetworkBehaviour)] -pub struct PeerBehaviour { - gossipsub: gossipsub::Behaviour, - kademlia: kad::Behaviour, -} - -pub struct SwarmRunner { - pub swarm: Swarm, - pub listen_multiaddr: Multiaddr, - pub dial_multiaddrs: Vec, - pub p2p_keypair: Keypair, - pub p2p_multiaddr: Multiaddr, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Topic { - Networking, - Market, - Delegation, -} - -impl Topic { - pub fn as_str(&self) -> &'static str { - match self { - Topic::Networking => "networking", - Topic::Market => "market", - Topic::Delegation => "delegation", - } - } -} - -impl From for TopicHash { - fn from(value: Topic) -> Self { - IdentTopic::from(value).into() - } -} - -impl From for IdentTopic { - fn from(value: Topic) -> Self { - IdentTopic::new(value.as_str()) - } -} - -#[derive(Debug)] -pub struct GossipsubMessage { - pub topic: IdentTopic, - pub data: Vec, -} - -#[derive(Debug)] -pub enum KademliaMessage { - GET(kad::RecordKey), - PUT((kad::RecordKey, Vec)), -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum NetworkingMessage { - Multiaddr(Multiaddr), -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum MarketMessage { - Job(Job), - JobBidPropagation(kad::RecordKey), - JobBid(JobBid), -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum DelegationMessage { - Delegate(JobBid), - Finished(kad::RecordKey, kad::RecordKey), -} - -impl SwarmRunner { - pub fn new( - listen_multiaddr: Multiaddr, - dial_multiaddrs: Vec, - p2p_keypair: Keypair, - p2p_multiaddr: Multiaddr, - ) -> Result> { - let mut config = Config::default(); - config.set_max_packet_size(1024 * 1024 * 100); - config.set_query_timeout(Duration::from_secs(60)); - let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair.to_owned()) - .with_tokio() - .with_tcp( - tcp::Config::default().port_reuse(true), - noise::Config::new, - yamux::Config::default, - )? - .with_quic() - .with_behaviour(|p2p_keypair| PeerBehaviour { - kademlia: kad::Behaviour::with_config( - p2p_keypair.public().to_peer_id(), - MemoryStore::with_config( - p2p_keypair.public().to_peer_id(), - MemoryStoreConfig { - max_value_bytes: 1024 * 1024 * 100, - ..Default::default() - }, - ), - config, - ), - gossipsub: Self::init_gossip(p2p_keypair).unwrap(), - })? - .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) - .build(); - - swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?; - swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Market.as_str()))?; - swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Delegation.as_str()))?; - swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); - // swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?; - swarm.listen_on(listen_multiaddr.to_owned())?; - - dial_multiaddrs.iter().try_for_each(|addr| swarm.dial(addr.clone()))?; - - Ok(SwarmRunner { swarm, listen_multiaddr, dial_multiaddrs, p2p_keypair, p2p_multiaddr }) - } - - fn init_gossip( - p2p_local_keypair: &Keypair, - ) -> Result> { - let message_authenticity = - gossipsub::MessageAuthenticity::Signed(p2p_local_keypair.clone()); - - let config = gossipsub::ConfigBuilder::default() - .heartbeat_interval(Duration::from_secs(10)) - .validation_mode(gossipsub::ValidationMode::Strict) - .max_transmit_size(usize::MAX) - .build()?; - - Ok(gossipsub::Behaviour::new(message_authenticity, config)?) - } - - pub fn run( - mut self, - mut gossipsub_message: mpsc::Receiver, - mut kademlia_message: mpsc::Receiver, - ) -> Pin + Send>> { - let stream = stream! { - loop { - tokio::select! { - Some(message) = gossipsub_message.recv() => { - debug!{"Sending gossipsub_message: topic {}, data {}", message.topic, hex::encode(&message.data)}; - if let Err(e) = self.swarm - .behaviour_mut() - .gossipsub - .publish(message.topic, message.data) - { - error!("Gossipsub error: {e:?}"); - } - }, - Some(message) = kademlia_message.recv() => { - debug!{"Sending kademlia_message: {:?}", message}; - match message { - KademliaMessage::GET(key) => { - self.swarm.behaviour_mut().kademlia.get_record(kad::RecordKey::new(&key)); - }, - KademliaMessage::PUT((key, data)) => { - let record = kad::Record { - key: kad::RecordKey::new(&key), - value: data, - publisher: None, - expires: None, - }; - if let Err(e) = self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) { - error!("Kademlia error: {e:?}"); - } - }, - } - }, - event = self.swarm.select_next_some() => match event { - SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { - peer_id, topic - })) => { - if topic == Topic::Networking.into() { - self.swarm.behaviour_mut().gossipsub.publish( - Topic::Networking, - serde_json::to_vec(&NetworkingMessage::Multiaddr(self.p2p_multiaddr.to_owned())).unwrap() - ).unwrap(); - } - - yield PeerBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { - peer_id, topic - }); - }, - SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { - propagation_source, - message_id, - message, - })) => { - if message.topic == Topic::Networking.into() { - match serde_json::from_slice::(&message.data) { - Ok(NetworkingMessage::Multiaddr(addr)) => { - if let Err(error) = self.swarm.dial(addr) { - error!{"Dial error: {:?}", error}; - } - } - Err(error) => { - error!{"Deserialization error: {:?}", error}; - } - } - } - - yield PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { - propagation_source, - message_id, - message, - }); - } - SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, endpoint, .. } => { - info!{"Connection established: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established}; - self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); - self.swarm.behaviour_mut().kademlia.add_address(&peer_id, endpoint.get_remote_address().to_owned()); - } - SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, endpoint, .. } => { - info!{"Connection closed: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established}; - if num_established == 0 { - self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); - self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, endpoint.get_remote_address()); - if let Err(err) = self.swarm.dial(endpoint.get_remote_address().to_owned()) { - error!("Failed to re-dial peer: {err:?}"); - } - } - } - SwarmEvent::Behaviour(PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, result, stats, step })) => { - match result { - kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => { - for peer in providers { - info!("Peer {peer:?} provides key {}", hex::encode(&key)); - } - } - kad::QueryResult::GetProviders(Err(err)) => { - error!("Failed to get providers: {err:?}"); - } - kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {peer, record}))) => { - info!("Successfully got record {}", hex::encode(&record.key)); - - yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, - result: kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {peer, record}))), - stats, step }) - } - kad::QueryResult::GetRecord(Ok(_)) => {} - kad::QueryResult::GetRecord(Err(err)) => { - error!("Failed to get record: {err:?}"); - } - kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => { - info!("Successfully put record {}", hex::encode(&key)); - - yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, - result: kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })), - stats, step }) - } - kad::QueryResult::PutRecord(Err(err)) => { - error!("Failed to put record: {err:?}"); - } - kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => { - info!("Successfully put provider record {}", hex::encode(&key)); - } - kad::QueryResult::StartProviding(Err(err)) => { - error!("Failed to put provider record: {err:?}"); - } - event => { - debug!("Unhandled event: {:?}", event); - } - } - } - SwarmEvent::Behaviour(event) => { - yield event; - } - event => { - debug!("Unhandled event: {:?}", event); - } - }, - _ = shutdown_signal() => { - break - } - else => break - } - } - }; - Box::pin(stream) - } -} - -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum Error { - #[error("Serde error")] - Serde(#[from] serde_json::Error), - - #[error("Dial error")] - Dial(#[from] DialError), -} diff --git a/dashboard/src/app/api.ts b/dashboard/src/app/api.ts index afd2363..37eca0b 100644 --- a/dashboard/src/app/api.ts +++ b/dashboard/src/app/api.ts @@ -30,7 +30,8 @@ export const JobEventsResponse = z.object({ type: z .literal("Finished") .or(z.literal("Delegated")) - .or(z.literal("BidReceived")), + .or(z.literal("BidReceived")) + .or(z.literal("Propagated")), data: z.any(), }); export type JobEventsResponse = z.infer; diff --git a/dashboard/src/app/page.tsx b/dashboard/src/app/page.tsx index 110a3dc..d388351 100644 --- a/dashboard/src/app/page.tsx +++ b/dashboard/src/app/page.tsx @@ -25,7 +25,8 @@ import { WorkerMessage, WorkerResponse } from "@/utils/types"; import { matchCommitment, matchLayout } from "@/utils/loadModule"; const steps = [ - "Job propagated to network", + "Job sent", + "Job propagated", "Job bidding", "Job delegated", "Proof received", @@ -182,7 +183,7 @@ export default function Home() { const data: DelegateResponse = DelegateResponse.parse( await response.json(), ); - addLog(`Job ${data.job_key} sent to the p2p network`); + addLog(`Job ${data.job_key} sent to delegator`); setActiveStep(1); setIsProcessing(data.job_key); @@ -191,22 +192,28 @@ export default function Home() { `job_key=${data.job_key.toString()}`, async (event) => { let job_event = JobEventsResponse.parse(event); + if (job_event.type == "Propagated") { + addLog( + `Job ${data.job_key} propagated to network DHT and gossip topics`, + ); + setActiveStep(2); + } if (job_event.type == "BidReceived") { let peer_id = PeerId.parse(job_event.data); addLog( `Recived bid for job ${data.job_key} from peer ${peer_id}`, ); - setActiveStep(2); + setActiveStep(3); } if (job_event.type == "Delegated") { let peer_id = PeerId.parse(job_event.data); addLog(`Job ${data.job_key} delegated to peer ${peer_id}`); - setActiveStep(3); + setActiveStep(4); } if (job_event.type == "Finished") { let proof = Proof.parse(job_event.data); addLog(`Job ${data.job_key} proof received`); - setActiveStep(4); + setActiveStep(5); setDownloadBlob([ new Blob([new Uint8Array(proof)]), `${data.job_key}_proof.json`,