25 #ifndef __ZMQ_HPP_INCLUDED__
26 #define __ZMQ_HPP_INCLUDED__
28 #if __cplusplus >= 201103L
30 #define ZMQ_NOTHROW noexcept
31 #define ZMQ_EXPLICIT explicit
54 #if (defined(__GNUC__) && (__GNUC__ > 4 || \
55 (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \
56 defined(__GXX_EXPERIMENTAL_CXX0X__))
57 #define ZMQ_HAS_RVALUE_REFS
58 #define ZMQ_DELETED_FUNCTION = delete
59 #elif defined(__clang__)
60 #if __has_feature(cxx_rvalue_references)
61 #define ZMQ_HAS_RVALUE_REFS
64 #if __has_feature(cxx_deleted_functions)
65 #define ZMQ_DELETED_FUNCTION = delete
67 #define ZMQ_DELETED_FUNCTION
69 #elif defined(_MSC_VER) && (_MSC_VER >= 1600)
70 #define ZMQ_HAS_RVALUE_REFS
71 #define ZMQ_DELETED_FUNCTION
73 #define ZMQ_DELETED_FUNCTION
76 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
77 #define ZMQ_NEW_MONITOR_EVENT_LAYOUT
80 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
81 #define ZMQ_HAS_PROXY_STEERABLE
90 #if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0)
91 # define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags)
98 # define ZMQ_ASSERT(expression) assert(expression)
100 # define ZMQ_ASSERT(expression) (void)(expression)
115 virtual const char *
what ()
const throw ()
117 return zmq_strerror (errnum);
130 inline int poll (zmq_pollitem_t
const* items_,
size_t nitems_,
long timeout_ = -1)
132 int rc = zmq_poll (const_cast<zmq_pollitem_t*>(items_), static_cast<int>(nitems_), timeout_);
138 inline int poll(zmq_pollitem_t
const* items,
size_t nitems)
140 return poll(items, nitems, -1);
144 inline int poll(zmq_pollitem_t
const* items,
size_t nitems, std::chrono::milliseconds timeout)
146 return poll(items, nitems, timeout.count() );
149 inline int poll(std::vector<zmq_pollitem_t>
const& items, std::chrono::milliseconds timeout)
151 return poll(items.data(), items.size(), timeout.count() );
154 inline int poll(std::vector<zmq_pollitem_t>
const& items,
long timeout_ = -1)
156 return poll(items.data(), items.size(), timeout_);
162 inline void proxy (
void *frontend,
void *backend,
void *capture)
164 int rc = zmq_proxy (frontend, backend, capture);
169 #ifdef ZMQ_HAS_PROXY_STEERABLE
170 inline void proxy_steerable (
void *frontend,
void *backend,
void *capture,
void *control)
172 int rc = zmq_proxy_steerable (frontend, backend, capture, control);
178 inline void version (
int *major_,
int *minor_,
int *patch_)
180 zmq_version (major_, minor_, patch_);
184 inline std::tuple<int, int, int>
version()
186 std::tuple<int, int, int> v;
187 zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v) );
200 int rc = zmq_msg_init (&msg);
207 int rc = zmq_msg_init_size (&msg, size_);
215 typedef typename std::iterator_traits<I>::difference_type size_type;
216 typedef typename std::iterator_traits<I>::value_type value_t;
218 size_type
const size_ = std::distance(first, last)*
sizeof(value_t);
219 int const rc = zmq_msg_init_size (&msg, size_);
222 value_t* dest = data<value_t>();
223 while (first != last)
232 int rc = zmq_msg_init_size (&msg, size_);
235 memcpy(
data(), data_, size_);
238 inline message_t (
void *data_,
size_t size_, free_fn *ffn_,
241 int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
246 #ifdef ZMQ_HAS_RVALUE_REFS
249 int rc = zmq_msg_init (&rhs.msg);
256 std::swap (msg, rhs.msg);
263 int rc = zmq_msg_close (&msg);
269 int rc = zmq_msg_close (&msg);
272 rc = zmq_msg_init (&msg);
279 int rc = zmq_msg_close (&msg);
282 rc = zmq_msg_init_size (&msg, size_);
287 inline void rebuild (
const void *data_,
size_t size_)
289 int rc = zmq_msg_close (&msg);
292 rc = zmq_msg_init_size (&msg, size_);
295 memcpy(
data(), data_, size_);
298 inline void rebuild (
void *data_,
size_t size_, free_fn *ffn_,
301 int rc = zmq_msg_close (&msg);
304 rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
311 int rc = zmq_msg_move (&msg, const_cast<zmq_msg_t*>(&(msg_->msg)));
318 int rc = zmq_msg_copy (&msg, const_cast<zmq_msg_t*>(&(msg_->msg)));
325 int rc = zmq_msg_more (const_cast<zmq_msg_t*>(&msg) );
331 return zmq_msg_data (&msg);
336 return zmq_msg_data (const_cast<zmq_msg_t*>(&msg));
341 return zmq_msg_size (const_cast<zmq_msg_t*>(&msg));
346 return static_cast<T*
>(
data() );
351 return static_cast<T const*
>(
data() );
362 void operator = (const
message_t&) ZMQ_DELETED_FUNCTION;
372 ptr = zmq_ctx_new ();
378 inline explicit context_t (
int io_threads_,
int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT)
380 ptr = zmq_ctx_new ();
384 int rc = zmq_ctx_set (ptr, ZMQ_IO_THREADS, io_threads_);
387 rc = zmq_ctx_set (ptr, ZMQ_MAX_SOCKETS, max_sockets_);
391 #ifdef ZMQ_HAS_RVALUE_REFS
392 inline context_t (context_t &&rhs)
ZMQ_NOTHROW : ptr (rhs.ptr)
396 inline context_t &operator = (context_t &&rhs)
ZMQ_NOTHROW
398 std::swap (ptr, rhs.ptr);
405 int rc = zmq_ctx_destroy (ptr);
411 int rc = zmq_ctx_destroy (ptr);
431 context_t (
const context_t&) ZMQ_DELETED_FUNCTION;
432 void operator = (const context_t&) ZMQ_DELETED_FUNCTION;
436 enum class socket_type: int
448 #if ZMQ_VERSION_MAJOR < 4
463 init(context_, type_);
467 inline socket_t(context_t& context_, socket_type type_)
469 init(context_, static_cast<int>(type_));
473 #ifdef ZMQ_HAS_RVALUE_REFS
480 std::swap(ptr, rhs.ptr);
505 int rc = zmq_close (ptr);
510 template<
typename T>
void setsockopt(
int option_, T
const& optval)
512 setsockopt(option_, &optval,
sizeof(T) );
518 int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
524 size_t *optvallen_)
const
526 int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
534 size_t optlen =
sizeof(T);
535 getsockopt(option_, &optval, &optlen );
539 inline void bind(std::string
const& addr)
544 inline void bind (
const char *addr_)
546 int rc = zmq_bind (ptr, addr_);
551 inline void unbind(std::string
const& addr)
553 unbind(addr.c_str());
558 int rc = zmq_unbind (ptr, addr_);
565 connect(addr.c_str());
570 int rc = zmq_connect (ptr, addr_);
577 disconnect(addr.c_str());
582 int rc = zmq_disconnect (ptr, addr_);
592 inline size_t send (
const void *buf_,
size_t len_,
int flags_ = 0)
594 int nbytes = zmq_send (ptr, buf_, len_, flags_);
596 return (
size_t) nbytes;
597 if (zmq_errno () == EAGAIN)
604 int nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_);
607 if (zmq_errno () == EAGAIN)
612 template<
typename I>
bool send(I first, I last,
int flags_=0)
615 return send(msg, flags_);
618 #ifdef ZMQ_HAS_RVALUE_REFS
619 inline bool send (
message_t &&msg_,
int flags_ = 0)
621 return send(msg_, flags_);
625 inline size_t recv (
void *buf_,
size_t len_,
int flags_ = 0)
627 int nbytes = zmq_recv (ptr, buf_, len_, flags_);
629 return (
size_t) nbytes;
630 if (zmq_errno () == EAGAIN)
637 int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
640 if (zmq_errno () == EAGAIN)
646 inline void init(context_t& context_,
int type_)
648 ctxptr = context_.ptr;
649 ptr = zmq_socket (context_.ptr, type_ );
658 void operator = (const
socket_t&) ZMQ_DELETED_FUNCTION;
669 monitor(socket, addr.c_str(), events);
674 int rc = zmq_socket_monitor(socket.ptr, addr_, events);
678 socketPtr = socket.ptr;
679 void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
682 rc = zmq_connect (s, addr_);
685 on_monitor_started();
689 zmq_msg_init (&eventMsg);
690 rc = zmq_msg_recv (&eventMsg, s, 0);
691 if (rc == -1 && zmq_errno() == ETERM)
694 #if ZMQ_VERSION_MAJOR >= 4
695 const char*
data =
static_cast<const char*
>(zmq_msg_data(&eventMsg));
697 memcpy(&msgEvent.
event, data,
sizeof(uint16_t)); data +=
sizeof(uint16_t);
698 memcpy(&msgEvent.
value, data,
sizeof(int32_t));
704 #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
706 zmq_msg_init (&addrMsg);
707 rc = zmq_msg_recv (&addrMsg, s, 0);
708 if (rc == -1 && zmq_errno() == ETERM)
711 const char* str =
static_cast<const char*
>(zmq_msg_data (&addrMsg));
712 std::string address(str, str + zmq_msg_size(&addrMsg));
713 zmq_msg_close (&addrMsg);
716 std::string address =
event->data.connected.addr;
719 #ifdef ZMQ_EVENT_MONITOR_STOPPED
720 if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
724 switch (event->event) {
725 case ZMQ_EVENT_CONNECTED:
726 on_event_connected(*event, address.c_str());
728 case ZMQ_EVENT_CONNECT_DELAYED:
729 on_event_connect_delayed(*event, address.c_str());
731 case ZMQ_EVENT_CONNECT_RETRIED:
732 on_event_connect_retried(*event, address.c_str());
734 case ZMQ_EVENT_LISTENING:
735 on_event_listening(*event, address.c_str());
737 case ZMQ_EVENT_BIND_FAILED:
738 on_event_bind_failed(*event, address.c_str());
740 case ZMQ_EVENT_ACCEPTED:
741 on_event_accepted(*event, address.c_str());
743 case ZMQ_EVENT_ACCEPT_FAILED:
744 on_event_accept_failed(*event, address.c_str());
746 case ZMQ_EVENT_CLOSED:
747 on_event_closed(*event, address.c_str());
749 case ZMQ_EVENT_CLOSE_FAILED:
750 on_event_close_failed(*event, address.c_str());
752 case ZMQ_EVENT_DISCONNECTED:
753 on_event_disconnected(*event, address.c_str());
756 on_event_unknown(*event, address.c_str());
759 zmq_msg_close (&eventMsg);
765 #ifdef ZMQ_EVENT_MONITOR_STOPPED
769 zmq_socket_monitor(socketPtr, NULL, 0);
void monitor(socket_t &socket, const char *addr_, int events=ZMQ_EVENT_ALL)
context_t(int io_threads_, int max_sockets_=ZMQ_MAX_SOCKETS_DFLT)
#define ZMQ_DELETED_FUNCTION
void monitor(socket_t &socket, std::string const &addr, int events=ZMQ_EVENT_ALL)
zmq_pollitem_t pollitem_t
virtual void on_event_connect_delayed(const zmq_event_t &event_, const char *addr_)
void connect(const char *addr_)
T getsockopt(int option_) const
virtual void on_event_closed(const zmq_event_t &event_, const char *addr_)
virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_)
virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_)
void rebuild(const void *data_, size_t size_)
message_t(const void *data_, size_t size_)
size_t send(const void *buf_, size_t len_, int flags_=0)
void move(message_t const *msg_)
void copy(message_t const *msg_)
T const * data() const ZMQ_NOTHROW
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
void proxy_steerable(void *frontend, void *backend, void *capture, void *control)
void * data() ZMQ_NOTHROW
bool send(I first, I last, int flags_=0)
void proxy(void *frontend, void *backend, void *capture)
size_t recv(void *buf_, size_t len_, int flags_=0)
void bind(std::string const &addr)
void disconnect(const char *addr_)
const void * data() const ZMQ_NOTHROW
void unbind(const char *addr_)
virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_)
bool send(message_t &msg_, int flags_=0)
virtual const char * what() const
#define ZMQ_ASSERT(expression)
bool recv(message_t *msg_, int flags_=0)
virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_)
socket_t(context_t &context_, int type_)
void connect(std::string const &addr)
void rebuild(size_t size_)
message_t(I first, I last)
virtual void on_event_listening(const zmq_event_t &event_, const char *addr_)
void version(int *major_, int *minor_, int *patch_)
void setsockopt(int option_, const void *optval_, size_t optvallen_)
virtual void on_event_connected(const zmq_event_t &event_, const char *addr_)
size_t size() const ZMQ_NOTHROW
message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_=NULL)
virtual void on_monitor_started()
virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_)
virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_)
bool connected() const ZMQ_NOTHROW
void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_=NULL)
void unbind(std::string const &addr)
int poll(zmq_pollitem_t const *items_, size_t nitems_, long timeout_=-1)
void disconnect(std::string const &addr)
void setsockopt(int option_, T const &optval)
virtual void on_event_connect_retried(const zmq_event_t &event_, const char *addr_)
void bind(const char *addr_)
bool more() const ZMQ_NOTHROW