-
Notifications
You must be signed in to change notification settings - Fork 105
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
412 additions
and
389 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,45 +1,48 @@ | ||
use bytes::Bytes; | ||
use std::collections::vec_deque::VecDeque; | ||
use std::convert::TryFrom; | ||
use zeromq::ZmqMessage; | ||
#[cfg(test)] | ||
mod test { | ||
use bytes::Bytes; | ||
use std::collections::vec_deque::VecDeque; | ||
use std::convert::TryFrom; | ||
use zeromq::ZmqMessage; | ||
|
||
#[test] | ||
fn test_split_off() { | ||
let mut frames = VecDeque::with_capacity(5); | ||
frames.push_back(Bytes::from("id1")); | ||
frames.push_back(Bytes::from("id2")); | ||
frames.push_back(Bytes::from("")); | ||
frames.push_back(Bytes::from("data1")); | ||
frames.push_back(Bytes::from("data2")); | ||
let mut m = ZmqMessage::try_from(frames).unwrap(); | ||
let data = m.split_off(3); | ||
assert_eq!(m.len(), 3); | ||
assert_eq!(m.get(0), Some(&Bytes::from("id1"))); | ||
assert_eq!(m.get(1), Some(&Bytes::from("id2"))); | ||
assert_eq!(m.get(2), Some(&Bytes::from(""))); | ||
assert_eq!(data.len(), 2); | ||
assert_eq!(data.get(0), Some(&Bytes::from("data1"))); | ||
assert_eq!(data.get(1), Some(&Bytes::from("data2"))); | ||
} | ||
#[test] | ||
fn test_split_off() { | ||
let mut frames = VecDeque::with_capacity(5); | ||
frames.push_back(Bytes::from("id1")); | ||
frames.push_back(Bytes::from("id2")); | ||
frames.push_back(Bytes::from("")); | ||
frames.push_back(Bytes::from("data1")); | ||
frames.push_back(Bytes::from("data2")); | ||
let mut m = ZmqMessage::try_from(frames).unwrap(); | ||
let data = m.split_off(3); | ||
assert_eq!(m.len(), 3); | ||
assert_eq!(m.get(0), Some(&Bytes::from("id1"))); | ||
assert_eq!(m.get(1), Some(&Bytes::from("id2"))); | ||
assert_eq!(m.get(2), Some(&Bytes::from(""))); | ||
assert_eq!(data.len(), 2); | ||
assert_eq!(data.get(0), Some(&Bytes::from("data1"))); | ||
assert_eq!(data.get(1), Some(&Bytes::from("data2"))); | ||
} | ||
|
||
#[test] | ||
fn test_prepend() { | ||
let mut frames = VecDeque::with_capacity(2); | ||
frames.push_back(Bytes::from("data1")); | ||
frames.push_back(Bytes::from("data2")); | ||
let mut m = ZmqMessage::try_from(frames).unwrap(); | ||
#[test] | ||
fn test_prepend() { | ||
let mut frames = VecDeque::with_capacity(2); | ||
frames.push_back(Bytes::from("data1")); | ||
frames.push_back(Bytes::from("data2")); | ||
let mut m = ZmqMessage::try_from(frames).unwrap(); | ||
|
||
let mut envelope_frames = VecDeque::with_capacity(3); | ||
envelope_frames.push_back(Bytes::from("id1")); | ||
envelope_frames.push_back(Bytes::from("id2")); | ||
envelope_frames.push_back(Bytes::from("")); | ||
let envelope = ZmqMessage::try_from(envelope_frames).unwrap(); | ||
let mut envelope_frames = VecDeque::with_capacity(3); | ||
envelope_frames.push_back(Bytes::from("id1")); | ||
envelope_frames.push_back(Bytes::from("id2")); | ||
envelope_frames.push_back(Bytes::from("")); | ||
let envelope = ZmqMessage::try_from(envelope_frames).unwrap(); | ||
|
||
m.prepend(&envelope); | ||
assert_eq!(m.len(), 5); | ||
assert_eq!(m.get(0), Some(&Bytes::from("id1"))); | ||
assert_eq!(m.get(1), Some(&Bytes::from("id2"))); | ||
assert_eq!(m.get(2), Some(&Bytes::from(""))); | ||
assert_eq!(m.get(3), Some(&Bytes::from("data1"))); | ||
assert_eq!(m.get(4), Some(&Bytes::from("data2"))); | ||
m.prepend(&envelope); | ||
assert_eq!(m.len(), 5); | ||
assert_eq!(m.get(0), Some(&Bytes::from("id1"))); | ||
assert_eq!(m.get(1), Some(&Bytes::from("id2"))); | ||
assert_eq!(m.get(2), Some(&Bytes::from(""))); | ||
assert_eq!(m.get(3), Some(&Bytes::from("data1"))); | ||
assert_eq!(m.get(4), Some(&Bytes::from("data2"))); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,104 +1,107 @@ | ||
use zeromq::prelude::*; | ||
use zeromq::Endpoint; | ||
use zeromq::ZmqMessage; | ||
use zeromq::__async_rt as async_rt; | ||
#[cfg(test)] | ||
mod test { | ||
use zeromq::prelude::*; | ||
use zeromq::Endpoint; | ||
use zeromq::ZmqMessage; | ||
use zeromq::__async_rt as async_rt; | ||
|
||
use futures_channel::{mpsc, oneshot}; | ||
use futures_util::{SinkExt, StreamExt}; | ||
use std::time::Duration; | ||
use futures_channel::{mpsc, oneshot}; | ||
use futures_util::{SinkExt, StreamExt}; | ||
use std::time::Duration; | ||
|
||
#[async_rt::test] | ||
async fn test_pub_sub_sockets() { | ||
pretty_env_logger::try_init().ok(); | ||
#[async_rt::test] | ||
async fn test_pub_sub_sockets() { | ||
pretty_env_logger::try_init().ok(); | ||
|
||
async fn helper(bind_addr: &'static str) { | ||
// We will join on these at the end to determine if any tasks we spawned | ||
// panicked | ||
let mut task_handles = Vec::new(); | ||
let payload = chrono::Utc::now().to_rfc2822(); | ||
async fn helper(bind_addr: &'static str) { | ||
// We will join on these at the end to determine if any tasks we spawned | ||
// panicked | ||
let mut task_handles = Vec::new(); | ||
let payload = chrono::Utc::now().to_rfc2822(); | ||
|
||
let cloned_payload = payload.clone(); | ||
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>(); | ||
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>(); | ||
task_handles.push(async_rt::task::spawn(async move { | ||
let mut pub_socket = zeromq::PubSocket::new(); | ||
let bound_to = pub_socket | ||
.bind(bind_addr) | ||
.await | ||
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e)); | ||
has_bound_sender | ||
.send(bound_to) | ||
.expect("channel was dropped"); | ||
let cloned_payload = payload.clone(); | ||
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>(); | ||
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>(); | ||
task_handles.push(async_rt::task::spawn(async move { | ||
let mut pub_socket = zeromq::PubSocket::new(); | ||
let bound_to = pub_socket | ||
.bind(bind_addr) | ||
.await | ||
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e)); | ||
has_bound_sender | ||
.send(bound_to) | ||
.expect("channel was dropped"); | ||
|
||
loop { | ||
if let Ok(Some(_)) = server_stop.try_recv() { | ||
break; | ||
loop { | ||
if let Ok(Some(_)) = server_stop.try_recv() { | ||
break; | ||
} | ||
|
||
let s: String = cloned_payload.clone(); | ||
let m = ZmqMessage::from(s); | ||
pub_socket.send(m).await.expect("Failed to send"); | ||
async_rt::task::sleep(Duration::from_millis(1)).await; | ||
} | ||
|
||
let s: String = cloned_payload.clone(); | ||
let m = ZmqMessage::from(s); | ||
pub_socket.send(m).await.expect("Failed to send"); | ||
async_rt::task::sleep(Duration::from_millis(1)).await; | ||
let errs = pub_socket.close().await; | ||
if !errs.is_empty() { | ||
panic!("Could not unbind socket: {:?}", errs); | ||
} | ||
})); | ||
// Block until the pub has finished binding | ||
// TODO: ZMQ sockets should not care about this sort of ordering. | ||
// See https://github.com/zeromq/zmq.rs/issues/73 | ||
let bound_addr = has_bound.await.expect("channel was cancelled"); | ||
if let Endpoint::Tcp(_host, port) = bound_addr.clone() { | ||
assert_ne!(port, 0); | ||
} | ||
|
||
let errs = pub_socket.close().await; | ||
if !errs.is_empty() { | ||
panic!("Could not unbind socket: {:?}", errs); | ||
} | ||
})); | ||
// Block until the pub has finished binding | ||
// TODO: ZMQ sockets should not care about this sort of ordering. | ||
// See https://github.com/zeromq/zmq.rs/issues/73 | ||
let bound_addr = has_bound.await.expect("channel was cancelled"); | ||
if let Endpoint::Tcp(_host, port) = bound_addr.clone() { | ||
assert_ne!(port, 0); | ||
} | ||
let (sub_results_sender, sub_results) = mpsc::channel(100); | ||
for _ in 0..10 { | ||
let mut cloned_sub_sender = sub_results_sender.clone(); | ||
let cloned_payload = payload.clone(); | ||
let cloned_bound_addr = bound_addr.to_string(); | ||
task_handles.push(async_rt::task::spawn(async move { | ||
let mut sub_socket = zeromq::SubSocket::new(); | ||
sub_socket | ||
.connect(&cloned_bound_addr) | ||
.await | ||
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr)); | ||
|
||
let (sub_results_sender, sub_results) = mpsc::channel(100); | ||
for _ in 0..10 { | ||
let mut cloned_sub_sender = sub_results_sender.clone(); | ||
let cloned_payload = payload.clone(); | ||
let cloned_bound_addr = bound_addr.to_string(); | ||
task_handles.push(async_rt::task::spawn(async move { | ||
let mut sub_socket = zeromq::SubSocket::new(); | ||
sub_socket | ||
.connect(&cloned_bound_addr) | ||
.await | ||
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr)); | ||
sub_socket.subscribe("").await.expect("Failed to subscribe"); | ||
|
||
sub_socket.subscribe("").await.expect("Failed to subscribe"); | ||
async_rt::task::sleep(std::time::Duration::from_millis(500)).await; | ||
|
||
async_rt::task::sleep(std::time::Duration::from_millis(500)).await; | ||
for _ in 0..10 { | ||
let recv_message = sub_socket.recv().await.unwrap(); | ||
let recv_payload = | ||
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap(); | ||
assert_eq!(cloned_payload, recv_payload); | ||
cloned_sub_sender.send(()).await.unwrap(); | ||
} | ||
})); | ||
} | ||
drop(sub_results_sender); | ||
let res_vec: Vec<()> = sub_results.collect().await; | ||
assert_eq!(100, res_vec.len()); | ||
|
||
for _ in 0..10 { | ||
let recv_message = sub_socket.recv().await.unwrap(); | ||
let recv_payload = | ||
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap(); | ||
assert_eq!(cloned_payload, recv_payload); | ||
cloned_sub_sender.send(()).await.unwrap(); | ||
} | ||
})); | ||
server_stop_sender.send(()).unwrap(); | ||
for t in task_handles { | ||
t.await.expect("Task failed unexpectedly!"); | ||
} | ||
} | ||
drop(sub_results_sender); | ||
let res_vec: Vec<()> = sub_results.collect().await; | ||
assert_eq!(100, res_vec.len()); | ||
|
||
server_stop_sender.send(()).unwrap(); | ||
for t in task_handles { | ||
t.await.expect("Task failed unexpectedly!"); | ||
} | ||
let addrs = vec![ | ||
"tcp://localhost:0", | ||
"tcp://127.0.0.1:0", | ||
"tcp://[::1]:0", | ||
"tcp://127.0.0.1:0", | ||
"tcp://localhost:0", | ||
"tcp://127.0.0.1:0", | ||
"tcp://[::1]:0", | ||
"ipc://asdf.sock", | ||
"ipc://anothersocket-asdf", | ||
]; | ||
futures_util::future::join_all(addrs.into_iter().map(helper)).await; | ||
} | ||
|
||
let addrs = vec![ | ||
"tcp://localhost:0", | ||
"tcp://127.0.0.1:0", | ||
"tcp://[::1]:0", | ||
"tcp://127.0.0.1:0", | ||
"tcp://localhost:0", | ||
"tcp://127.0.0.1:0", | ||
"tcp://[::1]:0", | ||
"ipc://asdf.sock", | ||
"ipc://anothersocket-asdf", | ||
]; | ||
futures_util::future::join_all(addrs.into_iter().map(helper)).await; | ||
} |
Oops, something went wrong.