Skip to content

Commit

Permalink
Change payload fn signature to return Option
Browse files Browse the repository at this point in the history
..instead of panicking
  • Loading branch information
jbesraa committed Jul 8, 2024
1 parent a1a489d commit 6b54746
Show file tree
Hide file tree
Showing 24 changed files with 97 additions and 60 deletions.
27 changes: 15 additions & 12 deletions benches/benches/src/sv2/criterion_sv2_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ fn client_sv2_setup_connection_serialize_deserialize(c: &mut Criterion) {
let mut dst = vec![0; size];
let _serialized = frame.serialize(&mut dst);
b.iter(|| {
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let payload = frame.payload();
let _ = AnyMessage::try_from((type_, payload)).unwrap();
if let Ok(mut frame) = StdFrame::from_bytes(black_box(dst.clone().into())) {
let msg_type = frame.header().msg_type().clone();
let payload = frame.payload().unwrap();
let _ = AnyMessage::try_from((msg_type, payload)).unwrap();
}
});
});
}
Expand Down Expand Up @@ -94,10 +95,11 @@ fn client_sv2_open_channel_serialize_deserialize(c: &mut Criterion) {
let mut dst = vec![0; size];
frame.serialize(&mut dst);
b.iter(|| {
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let payload = frame.payload();
black_box(AnyMessage::try_from((type_, payload)).unwrap());
if let Ok(mut frame) = StdFrame::from_bytes(black_box(dst.clone().into())) {
let msg_type = frame.header().msg_type().clone();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((msg_type, payload)).unwrap());
}
});
});
}
Expand Down Expand Up @@ -150,10 +152,11 @@ fn client_sv2_mining_message_submit_standard_serialize_deserialize(c: &mut Crite
"client_sv2_mining_message_submit_standard_serialize_deserialize",
|b| {
b.iter(|| {
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let payload = frame.payload();
black_box(AnyMessage::try_from((type_, payload)).unwrap());
if let Ok(mut frame) = StdFrame::from_bytes(black_box(dst.clone().into())) {
let msg_type = frame.header().msg_type().clone();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((msg_type, payload)).unwrap());
}
});
},
);
Expand Down
6 changes: 3 additions & 3 deletions benches/benches/src/sv2/iai_sv2_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn client_sv2_setup_connection_serialize_deserialize() {
frame.serialize(&mut dst);
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let payload = frame.payload();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((type_, payload)));
}

Expand Down Expand Up @@ -78,7 +78,7 @@ fn client_sv2_open_channel_serialize_deserialize() {
frame.serialize(&mut dst);
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let payload = frame.payload();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((type_, payload)));
}

Expand Down Expand Up @@ -128,7 +128,7 @@ fn client_sv2_mining_message_submit_standard_serialize_deserialize() {
frame.serialize(&mut dst);
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let payload = frame.payload();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((type_, payload)));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/interop-cpp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ mod main_ {
stream.read_exact(buffer).unwrap();
if let Ok(mut f) = decoder.next_frame() {
let msg_type = f.get_header().unwrap().msg_type();
let payload = f.payload();
let payload = f.payload().unwrap();
let message: Sv2Message = (msg_type, payload).try_into().unwrap();
match message {
Sv2Message::SetupConnection(_) => panic!(),
Expand Down
4 changes: 2 additions & 2 deletions examples/ping-pong-with-noise/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Node {
) -> Message<'static> {
match self.expected {
Expected::Ping => {
let ping: Result<Ping, _> = from_bytes(frame.payload());
let ping: Result<Ping, _> = from_bytes(frame.payload().unwrap());
match ping {
Ok(ping) => {
println!("Node {} received:", self.name);
Expand All @@ -118,7 +118,7 @@ impl Node {
}
}
Expected::Pong => {
let pong: Result<Pong, _> = from_bytes(frame.payload());
let pong: Result<Pong, _> = from_bytes(frame.payload().unwrap());
match pong {
Ok(pong) => {
println!("Node {} received:", self.name);
Expand Down
4 changes: 2 additions & 2 deletions examples/ping-pong-without-noise/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Node {
) -> Message<'static> {
match self.expected {
Expected::Ping => {
let ping: Result<Ping, _> = from_bytes(frame.payload());
let ping: Result<Ping, _> = from_bytes(frame.payload().unwrap());
match ping {
Ok(ping) => {
println!("Node {} received:", self.name);
Expand All @@ -107,7 +107,7 @@ impl Node {
}
}
Expected::Pong => {
let pong: Result<Pong, _> = from_bytes(frame.payload());
let pong: Result<Pong, _> = from_bytes(frame.payload().unwrap());
match pong {
Ok(pong) => {
println!("Node {} received:", self.name);
Expand Down
6 changes: 3 additions & 3 deletions protocols/v2/framing-sv2/src/framing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ impl<T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Sv2Frame<T, B> {
/// This function is only intended as a fast way to get a reference to an
/// already serialized payload. If the frame has not yet been
/// serialized, this function should never be used (it will panic).
pub fn payload(&mut self) -> &mut [u8] {
pub fn payload(&mut self) -> Option<&mut [u8]> {
match self {
Sv2Frame::Raw { serialized, .. } => &mut serialized.as_mut()[Header::SIZE..],
Sv2Frame::Payload { .. } => panic!("fixme"),
Sv2Frame::Raw { serialized, .. } => Some(&mut serialized.as_mut()[Header::SIZE..]),
Sv2Frame::Payload { .. } => None,
}
}

Expand Down
27 changes: 15 additions & 12 deletions protocols/v2/sv2-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,10 @@ pub extern "C" fn next_frame(decoder: *mut DecoderWrapper) -> CResult<CSv2Messag
Some(header) => header.msg_type(),
None => return CResult::Err(Sv2Error::InvalidSv2Frame),
};
let payload = f.payload();
let payload = match f.payload() {
Some(payload) => payload,
None => return CResult::Err(Sv2Error::InvalidSv2Frame),
};
let len = payload.len();
let ptr = payload.as_mut_ptr();
let payload = unsafe { std::slice::from_raw_parts_mut(ptr, len) };
Expand Down Expand Up @@ -761,7 +764,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::CoinbaseOutputDataSize(m) => m,
Expand Down Expand Up @@ -813,7 +816,7 @@ mod tests {

// Extract payload of the frame which is the NewTemplate message
let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::NewTemplate(m) => m,
Expand Down Expand Up @@ -861,7 +864,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::RequestTransactionData(m) => m,
Expand Down Expand Up @@ -911,7 +914,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::RequestTransactionDataError(m) => m,
Expand Down Expand Up @@ -961,7 +964,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::RequestTransactionDataSuccess(m) => m,
Expand Down Expand Up @@ -1006,7 +1009,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::SetNewPrevHash(m) => m,
Expand Down Expand Up @@ -1051,7 +1054,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::SubmitSolution(m) => m,
Expand Down Expand Up @@ -1109,7 +1112,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::ChannelEndpointChanged(m) => m,
Expand Down Expand Up @@ -1145,7 +1148,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::SetupConnection(m) => m,
Expand Down Expand Up @@ -1194,7 +1197,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::SetupConnectionError(m) => m,
Expand Down Expand Up @@ -1243,7 +1246,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let payload = decoded.payload();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Sv2Message::SetupConnectionSuccess(m) => m,
Expand Down
4 changes: 2 additions & 2 deletions roles/jd-client/src/lib/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl DownstreamMiningNode {
/// Parse the received message and relay it to the right upstream
pub async fn next(self_mutex: &Arc<Mutex<Self>>, mut incoming: StdFrame) {
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();

let routing_logic = roles_logic_sv2::routing_logic::MiningRoutingLogic::None;

Expand Down Expand Up @@ -698,7 +698,7 @@ pub async fn listen_for_downstream_mining(

let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();
let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None;
let node = Arc::new(Mutex::new(node));
if let Some(upstream) = upstream {
Expand Down
2 changes: 1 addition & 1 deletion roles/jd-client/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl JobDeclarator {
loop {
let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();
let next_message_to_send =
ParseServerJobDeclarationMessages::handle_message_job_declaration(
self_mutex.clone(),
Expand Down
2 changes: 1 addition & 1 deletion roles/jd-client/src/lib/job_declarator/setup_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl SetupConnectionHandler {
let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap();

let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();
ParseUpstreamCommonMessages::handle_message_common(
Arc::new(Mutex::new(SetupConnectionHandler {})),
message_type,
Expand Down
2 changes: 1 addition & 1 deletion roles/jd-client/src/lib/template_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl TemplateRx {
let mut frame: StdFrame =
handle_result!(tx_status.clone(), received.try_into());
let message_type = frame.get_header().unwrap().msg_type();
let payload = frame.payload();
let payload = frame.payload().expect("No payload set");

let next_message_to_send =
ParseServerTemplateDistributionMessages::handle_message_template_distribution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl SetupConnectionHandler {
.try_into()
.expect("Failed to parse incoming SetupConnectionResponse");
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();
ParseUpstreamCommonMessages::handle_message_common(
Arc::new(Mutex::new(SetupConnectionHandler {})),
message_type,
Expand Down
9 changes: 7 additions & 2 deletions roles/jd-client/src/lib/upstream_sv2/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,12 @@ impl Upstream {
return Err(framing_sv2::Error::ExpectedHandshakeFrame.into());
};
// Gets the message payload
let payload = incoming.payload();
let payload = match incoming.payload() {
Some(payload) => payload,
None => {
return Err(framing_sv2::Error::ExpectedHandshakeFrame.into());
}
};

// Handle the incoming message (should be either `SetupConnectionSuccess` or
// `SetupConnectionError`)
Expand Down Expand Up @@ -333,7 +338,7 @@ impl Upstream {

let message_type = handle_result!(tx_status, message_type).msg_type();

let payload = incoming.payload();
let payload = incoming.payload().expect("Payload not found");

// Since this is not communicating with an SV2 proxy, but instead a custom SV1
// proxy where the routing logic is handled via the `Upstream`'s communication
Expand Down
10 changes: 9 additions & 1 deletion roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,15 @@ impl JobDeclaratorDownstream {
.ok_or_else(|| JdsError::Custom(String::from("No header set")));
let header = handle_result!(tx_status, header);
let message_type = header.msg_type();
let payload = frame.payload();
let payload = match frame.payload() {
Some(p) => p,
None => {
handle_result!(
tx_status,
Err(JdsError::Custom("No payload set".to_string()))
)
}
};
let next_message_to_send =
ParseClientJobDeclarationMessages::handle_message_job_declaration(
self_mutex.clone(),
Expand Down
4 changes: 2 additions & 2 deletions roles/mining-proxy/src/lib/downstream_mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl DownstreamMiningNode {
/// Parse the received message and relay it to the right upstream
pub async fn next(self_mutex: Arc<Mutex<Self>>, mut incoming: StdFrame) {
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();

let routing_logic = super::get_routing_logic();

Expand Down Expand Up @@ -500,7 +500,7 @@ pub async fn listen_for_downstream_mining(address: SocketAddr) {
task::spawn(async move {
let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

Expand Down
6 changes: 3 additions & 3 deletions roles/mining-proxy/src/lib/upstream_mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl UpstreamMiningNode {
.unwrap();

let message_type = response.get_header().unwrap().msg_type();
let payload = response.payload();
let payload = response.payload().unwrap();
match (message_type, payload).try_into() {
Ok(CommonMessages::SetupConnectionSuccess(_)) => {
let receiver = self_mutex
Expand Down Expand Up @@ -578,7 +578,7 @@ impl UpstreamMiningNode {

pub async fn next(self_mutex: Arc<Mutex<Self>>, mut incoming: StdFrame) {
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let payload = incoming.payload().unwrap();

let routing_logic = super::get_routing_logic();

Expand Down Expand Up @@ -616,7 +616,7 @@ impl UpstreamMiningNode {
.unwrap();

let message_type = response.get_header().unwrap().msg_type();
let payload = response.payload();
let payload = response.payload().unwrap();
match (message_type, payload).try_into() {
Ok(CommonMessages::SetupConnectionSuccess(m)) => {
let receiver = self_mutex
Expand Down
5 changes: 4 additions & 1 deletion roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ impl Downstream {
.get_header()
.ok_or_else(|| PoolError::Custom(String::from("No header set")))?
.msg_type();
let payload = incoming.payload();
let payload = match incoming.payload() {
Some(p) => p,
None => return Err(PoolError::Custom(String::from("No payload set"))),
};
debug!(
"Received downstream message type: {:?}, payload: {:?}",
message_type, payload
Expand Down
Loading

0 comments on commit 6b54746

Please sign in to comment.