Skip to content

Commit c681966

Browse files
committed
refactor
1 parent ac686a2 commit c681966

File tree

13 files changed

+184
-179
lines changed

13 files changed

+184
-179
lines changed

CMakeLists.txt

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ set(pkg_config_names_private "")
7474

7575
option(WITH_OPENPGM "Build with support for OpenPGM" OFF)
7676
option(WITH_VMCI "Build with support for VMware VMCI socket" OFF)
77+
option(WITH_RDMA "Build with RDMA support" OFF)
7778

7879
if(APPLE)
7980
option(ZMQ_BUILD_FRAMEWORK "Build as OS X framework" OFF)
@@ -634,7 +635,6 @@ set(cxx-sources
634635
epoll.cpp
635636
err.cpp
636637
fq.cpp
637-
ib_res.cpp
638638
io_object.cpp
639639
io_thread.cpp
640640
ip.cpp
@@ -685,10 +685,6 @@ set(cxx-sources
685685
socks_connecter.cpp
686686
stream.cpp
687687
stream_engine.cpp
688-
rdma_address.cpp
689-
rdma_connecter.cpp
690-
rdma_engine.cpp
691-
rdma_listener.cpp
692688
sub.cpp
693689
tcp.cpp
694690
tcp_address.cpp
@@ -757,7 +753,6 @@ set(cxx-sources
757753
i_engine.hpp
758754
i_mailbox.hpp
759755
i_poll_events.hpp
760-
ib_conf.hpp
761756
ib_res.hpp
762757
io_object.hpp
763758
io_thread.hpp
@@ -804,10 +799,6 @@ set(cxx-sources
804799
random.hpp
805800
raw_decoder.hpp
806801
raw_encoder.hpp
807-
rdma_address.hpp
808-
rdma_connecter.hpp
809-
rdma_engine.hpp
810-
rdma_listener.hpp
811802
reaper.hpp
812803
rep.hpp
813804
req.hpp
@@ -918,6 +909,14 @@ if(ZMQ_HAVE_TIPC)
918909
list(APPEND cxx-sources tipc_address.cpp tipc_connecter.cpp tipc_listener.cpp)
919910
endif()
920911

912+
if(WITH_RDMA)
913+
add_definitions(-DZMQ_HAVE_RDMA)
914+
list(APPEND cxx-sources rdma_address.hpp
915+
rdma_connecter.hpp
916+
rdma_engine.hpp
917+
rdma_listener.hpp rdma_address.cpp rdma_connecter.cpp rdma_engine.cpp rdma_listener.cpp)
918+
endif()
919+
921920
#-----------------------------------------------------------------------------
922921
# source generators
923922

include/zmq.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ ZMQ_EXPORT void zmq_version (int *major_, int *minor_, int *patch_);
219219
#define ZMQ_THREAD_AFFINITY_CPU_ADD 7
220220
#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
221221
#define ZMQ_THREAD_NAME_PREFIX 9
222-
#define ZMQ_ENABLE_RDMA 10
222+
#define ZMQ_IB_NUM_QPS 10
223+
#define ZMQ_IB_BUF_SIZE 11
223224

224225
/* Default for new contexts */
225226
#define ZMQ_IO_THREADS_DFLT 1

src/address.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@ int zmq::address_t::to_string (std::string &addr_) const
8383
{
8484
if (protocol == protocol_name::tcp && resolved.tcp_addr)
8585
return resolved.tcp_addr->to_string (addr_);
86+
#ifdef ZMQ_HAVE_RDMA
8687
if (protocol == protocol_name::rdma && resolved.rdma_addr)
8788
return resolved.rdma_addr->to_string(addr_);
89+
#endif
8890
if (protocol == protocol_name::udp && resolved.udp_addr)
8991
return resolved.udp_addr->to_string (addr_);
9092
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \

src/address.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
#define __ZMQ_ADDRESS_HPP_INCLUDED__
3232

3333
#include <string>
34-
#include "rdma_address.hpp"
3534

3635
namespace zmq
3736
{
3837
class ctx_t;
3938
class tcp_address_t;
4039
class udp_address_t;
40+
#ifdef ZMQ_HAVE_RDMA
41+
class rdma_address_t;
42+
#endif
4143
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
4244
class ipc_address_t;
4345
#endif
@@ -84,7 +86,9 @@ struct address_t
8486
{
8587
void *dummy;
8688
tcp_address_t *tcp_addr;
89+
#ifdef ZMQ_HAVE_RDMA
8790
rdma_address_t *rdma_addr;
91+
#endif
8892
udp_address_t *udp_addr;
8993
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
9094
&& !defined ZMQ_HAVE_VXWORKS

src/ctx.cpp

Lines changed: 128 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ zmq::ctx_t::ctx_t () :
8585
_vmci_fd = -1;
8686
_vmci_family = -1;
8787
#endif
88-
88+
#ifdef ZMQ_HAVE_RDMA
89+
_ib_num_qps = 1;
90+
_ib_buf_size = 1024 * 1024;
91+
#endif
8992
// Initialise crypto library, if needed.
9093
zmq::random_open ();
9194
}
@@ -219,6 +222,9 @@ int zmq::ctx_t::shutdown ()
219222
_sockets[i]->stop ();
220223
if (_sockets.empty ())
221224
_reaper->stop ();
225+
#ifdef ZMQ_HAVE_RDMA
226+
close_ib();
227+
#endif
222228
}
223229

224230
return 0;
@@ -246,8 +252,14 @@ int zmq::ctx_t::set (int option_, int optval_)
246252
} else if (option_ == ZMQ_ZERO_COPY_RECV && optval_ >= 0) {
247253
scoped_lock_t locker(_opt_sync);
248254
_zero_copy = (optval_ != 0);
249-
} else if (option_ == ZMQ_ENABLE_RDMA && optval_ >= 0) {
250-
_rdma_enabled = (optval_ != 0);
255+
#ifdef ZMQ_HAVE_RDMA
256+
} else if (option_ == ZMQ_IB_NUM_QPS && optval_ >= 0) {
257+
scoped_lock_t locker(_opt_sync);
258+
_ib_num_qps = optval_;
259+
} else if (option_ == ZMQ_IB_BUF_SIZE && optval_ >= 0) {
260+
scoped_lock_t locker(_opt_sync);
261+
_ib_buf_size = optval_;
262+
#endif
251263
} else {
252264
rc = thread_ctx_t::set (option_, optval_);
253265
}
@@ -271,13 +283,16 @@ int zmq::ctx_t::get (int option_)
271283
rc = _max_msgsz;
272284
else if (option_ == ZMQ_MSG_T_SIZE)
273285
rc = sizeof (zmq_msg_t);
274-
else if (option_ == ZMQ_ZERO_COPY_RECV) {
286+
else if (option_ == ZMQ_ZERO_COPY_RECV)
275287
rc = _zero_copy;
276-
} else if (option_ == ZMQ_ENABLE_RDMA) {
277-
rc = _rdma_enabled;
278-
} else {
288+
#ifdef ZMQ_HAVE_RDMA
289+
else if (option_ == ZMQ_IB_NUM_QPS)
290+
rc = _ib_num_qps;
291+
else if (option_ == ZMQ_IB_BUF_SIZE)
292+
rc = _ib_buf_size;
293+
#endif
294+
else
279295
rc = thread_ctx_t::get (option_);
280-
}
281296
return rc;
282297
}
283298

@@ -340,8 +355,10 @@ bool zmq::ctx_t::start ()
340355
_empty_slots.push_back (i);
341356
}
342357

358+
#ifdef ZMQ_HAVE_RDMA
343359
// RDMA: init ib resources
344-
setup_ib(_ib_res, _max_sockets, _max_msgsz);
360+
setup_ib();
361+
#endif
345362

346363
_starting = false;
347364
return true;
@@ -708,3 +725,105 @@ int zmq::ctx_t::get_vmci_socket_family ()
708725
// is a global variable. Thus, even sockets created in different contexts have
709726
// unique IDs.
710727
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
728+
729+
#ifdef ZMQ_HAVE_RDMA
730+
#include <infiniband/verbs.h>
731+
732+
int zmq::ctx_t::setup_ib() {
733+
ibv_device **dev_list = NULL;
734+
memset(&_ib_res, 0, sizeof(ib_res_t));
735+
736+
_ib_res.num_qps = get(ZMQ_IB_NUM_QPS);
737+
738+
dev_list =ibv_get_device_list(NULL);
739+
assert(dev_list != NULL);
740+
741+
_ib_res.ctx = ibv_open_device(*dev_list);
742+
assert(_ib_res.ctx != NULL);
743+
744+
_ib_res.pd = ibv_alloc_pd(_ib_res.ctx);
745+
assert(_ib_res.pd != NULL);
746+
747+
int ret = ibv_query_port(_ib_res.ctx, 1, &_ib_res.port_attr);
748+
assert(ret == 0);
749+
750+
_ib_res.ib_buf_size = get(ZMQ_IB_BUF_SIZE);
751+
posix_memalign((void**)_ib_res.ib_buf, 4096, _ib_res.ib_buf_size);
752+
assert(_ib_res.ib_buf != NULL);
753+
754+
_ib_res.mr = ibv_reg_mr (_ib_res.pd, (void*)_ib_res.ib_buf,
755+
_ib_res.ib_buf_size,
756+
IBV_ACCESS_LOCAL_WRITE|
757+
IBV_ACCESS_REMOTE_READ|
758+
IBV_ACCESS_REMOTE_WRITE);
759+
assert(_ib_res.mr != NULL);
760+
761+
ret = ibv_query_device(_ib_res.ctx, &_ib_res.dev_attr);
762+
assert(ret == 0);
763+
764+
_ib_res.cq = ibv_create_cq(_ib_res.ctx, _ib_res.dev_attr.max_cqe,
765+
NULL, NULL, 0);
766+
assert(_ib_res.cq != NULL);
767+
768+
struct ibv_srq_init_attr srq_init_attr;
769+
770+
srq_init_attr.attr.max_wr = _ib_res.dev_attr.max_srq_wr;
771+
srq_init_attr.attr.max_sge = 1;
772+
773+
774+
_ib_res.srq = ibv_create_srq(_ib_res.pd, &srq_init_attr);
775+
776+
struct ibv_qp_init_attr qp_init_attr;
777+
778+
qp_init_attr.send_cq = _ib_res.cq;
779+
qp_init_attr.recv_cq = _ib_res.cq;
780+
qp_init_attr.srq = _ib_res.srq;
781+
qp_init_attr.cap.max_send_wr = _ib_res.dev_attr.max_qp_wr;
782+
qp_init_attr.cap.max_recv_wr = _ib_res.dev_attr.max_qp_wr;
783+
qp_init_attr.cap.max_send_sge = 1;
784+
qp_init_attr.cap.max_recv_sge = 1;
785+
qp_init_attr.qp_type = IBV_QPT_RC;
786+
787+
_ib_res.qp = (struct ibv_qp **) calloc (_ib_res.num_qps,
788+
sizeof(struct ibv_qp *));
789+
assert(_ib_res.qp != NULL);
790+
791+
for (int i = 0; i < _ib_res.num_qps; i++) {
792+
_ib_res.qp[i] = ibv_create_qp (_ib_res.pd, & qp_init_attr);
793+
assert(_ib_res.qp[i] != NULL);
794+
}
795+
796+
ibv_free_device_list (dev_list);
797+
return 0;
798+
}
799+
800+
void zmq::ctx_t::close_ib() {
801+
if (_ib_res.qp != NULL) {
802+
for (int i = 0; i < _ib_res.num_qps; i++) {
803+
if (_ib_res.qp[i] != NULL) {
804+
ibv_destroy_qp(_ib_res.qp[i]);
805+
}
806+
}
807+
free(_ib_res.qp);
808+
}
809+
810+
if (_ib_res.srq != NULL)
811+
ibv_destroy_srq(_ib_res.srq);
812+
if (_ib_res.cq != NULL) {
813+
ibv_destroy_cq (_ib_res.cq);
814+
}
815+
if (_ib_res.mr != NULL) {
816+
ibv_dereg_mr (_ib_res.mr);
817+
}
818+
819+
if (_ib_res.pd != NULL) {
820+
ibv_dealloc_pd (_ib_res.pd);
821+
}
822+
if (_ib_res.ctx != NULL) {
823+
ibv_close_device (_ib_res.ctx);
824+
}
825+
if (_ib_res.ib_buf != NULL) {
826+
free (_ib_res.ib_buf);
827+
}
828+
}
829+
#endif

src/ctx.hpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
#include "options.hpp"
4444
#include "atomic_counter.hpp"
4545
#include "thread.hpp"
46+
#ifdef ZMQ_HAVE_RDMA
4647
#include "ib_res.hpp"
48+
#endif
4749

4850
namespace zmq
4951
{
@@ -238,6 +240,11 @@ class ctx_t : public thread_ctx_t
238240
// Should we use zero copy message decoding in this context?
239241
bool _zero_copy;
240242

243+
#ifdef ZMQ_HAVE_RDMA
244+
int _ib_num_qps;
245+
int _ib_buf_size; // will it overflow?
246+
#endif
247+
241248
ctx_t (const ctx_t &);
242249
const ctx_t &operator= (const ctx_t &);
243250

@@ -262,9 +269,13 @@ class ctx_t : public thread_ctx_t
262269
mutex_t _vmci_sync;
263270
#endif
264271

272+
#ifdef ZMQ_HAVE_RDMA
265273
// RDMA: Infiniband related resources
266274
ib_res_t _ib_res;
267-
bool _rdma_enabled;
275+
int setup_ib();
276+
void close_ib();
277+
#endif
278+
268279
};
269280
}
270281

src/ib_conf.hpp

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)