Skip to content

Commit 08777a9

Browse files
committed
fix(rep): handle all cases of recv() and error handling
1 parent 644926e commit 08777a9

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

src/rep.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ impl SocketRecv for RepSocket {
149149
match self.fair_queue.next().await {
150150
Some((peer_id, Ok(message))) => match message {
151151
Message::Message(mut m) => {
152-
assert!(m.len() > 1);
152+
if m.len() < 2 {
153+
return Err(ZmqError::Other("Invalid message format"));
154+
}
153155
let mut at = 1;
154156
for (index, frame) in m.iter().enumerate() {
155157
if frame.is_empty() {
@@ -163,9 +165,15 @@ impl SocketRecv for RepSocket {
163165
self.current_request = Some(peer_id);
164166
return Ok(data);
165167
}
166-
_ => todo!(),
168+
Message::Greeting(_) | Message::Command(_) => {
169+
// Ignore non-message frames. REP sockets should only process actual messages.
170+
continue;
171+
}
167172
},
168-
Some((_peer_id, _)) => todo!(),
173+
Some((peer_id, Err(e))) => {
174+
self.backend.peer_disconnected(&peer_id);
175+
return Err(e.into());
176+
}
169177
None => return Err(ZmqError::NoMessage),
170178
};
171179
}

0 commit comments

Comments
 (0)