@@ -556,16 +556,74 @@ int zmq::socket_base_t::bind (const char *addr_)
556556 return rc;
557557 }
558558
559- if (protocol == " pgm" || protocol == " epgm" || protocol == " norm" || protocol == " udp " ) {
559+ if (protocol == " pgm" || protocol == " epgm" || protocol == " norm" ) {
560560 // For convenience's sake, bind can be used interchangeable with
561- // connect for PGM, EPGM, NORM and UDP transports.
561+ // connect for PGM, EPGM, NORM transports.
562562 EXIT_MUTEX ();
563563 rc = connect (addr_);
564564 if (rc != -1 )
565565 options.connected = true ;
566566 return rc;
567567 }
568568
569+ if (protocol == " udp" ) {
570+ if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
571+ errno = ENOCOMPATPROTO;
572+ EXIT_MUTEX ();
573+ return -1 ;
574+ }
575+
576+ // Choose the I/O thread to run the session in.
577+ io_thread_t *io_thread = choose_io_thread (options.affinity );
578+ if (!io_thread) {
579+ errno = EMTHREAD;
580+ EXIT_MUTEX ();
581+ return -1 ;
582+ }
583+
584+ address_t *paddr = new (std::nothrow) address_t (protocol, address, this ->get_ctx ());
585+ alloc_assert (paddr);
586+
587+ paddr->resolved .udp_addr = new (std::nothrow) udp_address_t ();
588+ alloc_assert (paddr->resolved .udp_addr );
589+ rc = paddr->resolved .udp_addr ->resolve (address.c_str (), true );
590+ if (rc != 0 ) {
591+ LIBZMQ_DELETE (paddr);
592+ EXIT_MUTEX ();
593+ return -1 ;
594+ }
595+
596+ session_base_t *session = session_base_t::create (io_thread, true , this ,
597+ options, paddr);
598+ errno_assert (session);
599+
600+ pipe_t *newpipe = NULL ;
601+
602+ // Create a bi-directional pipe.
603+ object_t *parents [2 ] = {this , session};
604+ pipe_t *new_pipes [2 ] = {NULL , NULL };
605+
606+ int hwms [2 ] = {options.sndhwm , options.rcvhwm };
607+ bool conflates [2 ] = {false , false };
608+ rc = pipepair (parents, new_pipes, hwms, conflates);
609+ errno_assert (rc == 0 );
610+
611+ // Attach local end of the pipe to the socket object.
612+ attach_pipe (new_pipes [0 ], true );
613+ newpipe = new_pipes [0 ];
614+
615+ // Attach remote end of the pipe to the session object later on.
616+ session->attach_pipe (new_pipes [1 ]);
617+
618+ // Save last endpoint URI
619+ paddr->to_string (last_endpoint);
620+
621+ add_endpoint (addr_, (own_t *) session, newpipe);
622+
623+ EXIT_MUTEX ();
624+ return 0 ;
625+ }
626+
569627 // Remaining transports require to be run in an I/O thread, so at this
570628 // point we'll choose one.
571629 io_thread_t *io_thread = choose_io_thread (options.affinity );
@@ -881,9 +939,15 @@ int zmq::socket_base_t::connect (const char *addr_)
881939#endif
882940
883941if (protocol == " udp" ) {
942+ if (options.type != ZMQ_RADIO) {
943+ errno = ENOCOMPATPROTO;
944+ EXIT_MUTEX ();
945+ return -1 ;
946+ }
947+
884948 paddr->resolved .udp_addr = new (std::nothrow) udp_address_t ();
885949 alloc_assert (paddr->resolved .udp_addr );
886- rc = paddr->resolved .udp_addr ->resolve (address.c_str (), (options. type == ZMQ_DISH || options. type == ZMQ_DGRAM) );
950+ rc = paddr->resolved .udp_addr ->resolve (address.c_str (), false );
887951 if (rc != 0 ) {
888952 LIBZMQ_DELETE (paddr);
889953 EXIT_MUTEX ();
@@ -1284,7 +1348,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
12841348int zmq::socket_base_t::close ()
12851349{
12861350 ENTER_MUTEX ();
1287-
1351+
12881352 // Remove all existing signalers for thread safe sockets
12891353 if (thread_safe)
12901354 ((mailbox_safe_t *)mailbox)->clear_signalers ();
0 commit comments