Thomas Rodgers Blog Mostly about programming things

Integrating Boost Asio with ZeroMQ

Or what I have been doing since CppCon2014

I write a lot of code that uses Boost Asio. I also spend far more of my professional life integrating C++ code with Java via JNI than any right-thinking individual should be required to. I have long been a fan of ZeroMQ, and earlier this year I decided to look into it as a means of replacing JNI for interacting with C++. The extensive binding ecosystem for ZeroMQ means that the resulting systems would be easy to use from many other languages (Python being an increasingly important one in the world of finance).

One catch: I use Asio extensively for reactive I/O. Asio, as part of the io_service type, runs its own reactor (based on epoll on Linux, kqueue on OS X/BSD). ZeroMQ exposes its own polling machinery via zmq_poll. So, how to integrate these two polling mechanisms? A quick search at the time turned up several responses similar to this StackOverflow post, so I decided to roll my own.

After a couple of early prototypes and extensive digging around deep in the bowels of Asio, I had a version working by early summer, at which point I received a mail from the CppCon program committee telling me, yes, you really do have to finish that talk you proposed, so I shelved the Asio + ZeroMQ effort. After CppCon I returned to the effort and discovered several issues with the initial implementation. I was also uncomfortable with the degree of coupling with Asio internals.

Fortunately, I had a chance to discuss the implementation with Chris Kohlhoff (the author of Asio) at the Fall C++ Standards Committee meeting. The result of the subsequent rewrite is AZMQ, which is now part of the ZeroMQ organization on GitHub and is licensed under the same terms as Boost.

The basics of integrating a ZeroMQ socket into the Asio io_service.

A ZeroMQ socket is constructed by calling zmq_socket. This call returns a void* which is then passed to subsequent ZeroMQ APIs. Closing a socket is achieved by calling the appropriately named zmq_close API. This seems like a reasonable candidate for some RAII love, and std::unique_ptr<> provides most of what we need -

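#include <zmq.h>
#include <boost/assert.hpp>             // BOOST_ASSERT_MSG
#include <boost/system/error_code.hpp>  // system::error_code
#include <memory>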
namespace detail {
    struct socket_close {
        void operator()(void* socket) {
            int v = 0;
            auto rc = zmq_setsockopt(socket, ZMQ_LINGER, &v, sizeof(int));
            BOOST_ASSERT_MSG(rc == 0, "set linger=0 on shutdown");
            zmq_close(socket);
        }
    };
    using socket_type = std::unique_ptr<void, socket_close>;

    socket_type create_socket(context_type context,
                              int type,
                              system::error_code & ec) {
        BOOST_ASSERT_MSG(context, "Invalid context");
        auto res = zmq_socket(context.get(), type);
        if (!res) {
            ec = make_error_code();
            return socket_type();
        }
        return socket_type(res);
    }
} // namespace detail

The type, detail::socket_close, forces the socket linger time to 0 (works around a ZeroMQ issue where shutdown can hang indefinitely) and then closes the socket.
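The ownership behaviour this relies on can be checked in isolation. The following is a minimal sketch, not AZMQ code: the `fake` namespace and its functions are hypothetical stand-ins for zmq_socket and zmq_close, so that the example builds without libzmq. It shows that a std::unique_ptr<void, Deleter> runs the deleter exactly once when the owning object leaves scope.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for zmq_socket/zmq_close; they only count handles.
namespace fake {
    inline int& open_count() { static int n = 0; return n; }
    inline void* open_handle() { ++open_count(); return new int(42); }
    inline void close_handle(void* h) {
        --open_count();
        delete static_cast<int*>(h);
    }
} // namespace fake

// Same shape as detail::socket_close: a stateless deleter taking void*.
struct handle_close {
    void operator()(void* h) const { fake::close_handle(h); }
};
using handle_type = std::unique_ptr<void, handle_close>;

// Opens a handle in an inner scope and returns how many handles are
// still open once that scope has exited.
inline int open_handles_after_scope() {
    {
        handle_type h(fake::open_handle());
        assert(fake::open_count() == 1);
    } // handle_close runs here
    return fake::open_count();
}
```

Because std::unique_ptr never invokes its deleter on an empty pointer, a deleter written this way does not need its own null check.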

In the implementation of socket_close we see an example of setting an option on a ZeroMQ socket. We would like to be able to do this sort of thing generically, the way Asio handles socket options for the socket types that it defines. This allows us to define strongly typed options rather than relying on the C way of doing things.

A generic option type follows this contract -

#include <type_traits>
#include <cstddef>   // size_t
#include <utility>   // std::move
namespace opt {
    template<typename T, int N>
    struct base {
        using static_name = std::integral_constant<int, N>;
        using value_t = T;
        T value_;

        base() = default;
        base(T v) : value_(std::move(v)) { }

        int name() const { return N; }
        const void* data() const { return reinterpret_cast<const void*>(&value_); }
        void* data() { return reinterpret_cast<void*>(&value_); }
        size_t size() const { return sizeof(T); }

        void set(T value) { value_ = value; }
        T value() const { return value_; }
    };
} // namespace opt

Generic set/get operations on detail::socket_type can then be implemented as -

#include <zmq.h>
namespace detail {
    template<typename Option>
    system::error_code set_option(socket_type & socket,
                                  Option const& opt,
                                  system::error_code & ec) {
        auto rc = zmq_setsockopt(socket.get(), opt.name(), opt.data(), opt.size());
        if (rc < 0)
            ec = make_error_code();
        return ec;
    }

    template<typename Option>
    system::error_code get_option(socket_type & socket,
                                  Option & opt,
                                  system::error_code & ec) {
        BOOST_ASSERT_MSG(socket, "invalid socket");
        size_t size = opt.size();
        auto rc = zmq_getsockopt(socket.get(), opt.name(), opt.data(), &size);
        if (rc < 0)
            ec = make_error_code();
        return ec;
    }
} // namespace detail
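To see how the option contract and the generic set/get functions fit together, here is a small sketch that round-trips a typed option through an in-memory stand-in for a socket. Everything outside the name()/data()/size()/value() contract is an assumption made for illustration: `fake_socket`, the bool return values, and the option ids 1 and 2 are hypothetical, and real code would use the ZMQ_* constants from zmq.h with zmq_setsockopt/zmq_getsockopt.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <map>
#include <utility>
#include <vector>

namespace opt {
    // Trimmed copy of the option contract described above.
    template<typename T, int N>
    struct base {
        T value_{};
        base() = default;
        base(T v) : value_(std::move(v)) { }
        int name() const { return N; }
        const void* data() const { return &value_; }
        void* data() { return &value_; }
        std::size_t size() const { return sizeof(T); }
        T value() const { return value_; }
    };

    // Placeholder ids, not the real ZMQ_* values.
    using linger  = base<int, 1>;
    using rcv_hwm = base<int, 2>;
} // namespace opt

// Hypothetical stand-in for a socket: stores raw option bytes by id.
struct fake_socket {
    std::map<int, std::vector<unsigned char>> options;
};

template<typename Option>
bool set_option(fake_socket& s, Option const& o) {
    auto p = static_cast<const unsigned char*>(o.data());
    s.options[o.name()].assign(p, p + o.size());
    return true;
}

template<typename Option>
bool get_option(fake_socket const& s, Option& o) {
    auto it = s.options.find(o.name());
    if (it == s.options.end() || it->second.size() != o.size())
        return false; // unknown option or size mismatch
    std::memcpy(o.data(), it->second.data(), o.size());
    return true;
}
```

The caller never touches a void* or a size: `set_option(s, opt::linger(250))` followed by `get_option(s, l)` on an `opt::linger l` yields `l.value() == 250`, and mixing up option types becomes a compile-time concern rather than a runtime one.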

One option that we care a lot about, for the purposes of integrating into the Asio io_service, is ZMQ_FD. This returns a file descriptor that becomes readable whenever data is available to be either read from or written to the ZeroMQ socket. This is not really an option that a user of our library should care about, so we can provide a specific accessor for it -

#include <boost/asio/io_service.hpp>
#include <zmq.h>
namespace detail {
    using native_handle_type = int;
    native_handle_type get_native_handle(asio::io_service & io_service,
                                         socket_type & socket,
                                         boost::system::error_code & ec) {
        BOOST_ASSERT_MSG(socket, "invalid socket");
        native_handle_type handle = 0;
        auto size = sizeof(native_handle_type);
        auto rc = zmq_getsockopt(socket.get(), ZMQ_FD, &handle, &size);
        if (rc < 0)
            ec = make_error_code();
        return handle;
    }
} // namespace detail

Unfortunately, this raw file descriptor is insufficient in and of itself because there is no direct way to insert an FD into the Asio reactor. There is, however, the POSIX stream_descriptor type, which allows us to interface an arbitrary FD with the Asio reactor -

#include <boost/asio/io_service.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <zmq.h>
#include <memory>
namespace detail {
    using native_handle_type = asio::posix::stream_descriptor::native_handle_type;
    struct stream_descriptor_close {
        void operator()(asio::posix::stream_descriptor* sd) {
            if (sd)
                // need to release underlying FD before deleting the stream_descriptor
                sd->release();
            delete sd;
        }
    };
    using stream_descriptor = std::unique_ptr<asio::posix::stream_descriptor,
                                              stream_descriptor_close>;

    stream_descriptor get_stream_descriptor(asio::io_service & io_service,
                                            socket_type & socket,
                                            system::error_code & ec) {
        BOOST_ASSERT_MSG(socket, "invalid socket");
        native_handle_type handle = 0;
        auto size = sizeof(native_handle_type);
        stream_descriptor res;
        auto rc = zmq_getsockopt(socket.get(), ZMQ_FD, &handle, &size);
        if (rc < 0)
            ec = make_error_code();
        else
            res.reset(new asio::posix::stream_descriptor(io_service, handle));
        return res;
    }
} // namespace detail

This creates a posix::stream_descriptor associated with an Asio io_service that wraps the ZMQ_FD associated with a given socket. This can now be used to register an async_read_some operation that will call us back when the ZeroMQ socket is ready to send or receive at least one message -

#include <boost/asio/buffer.hpp>
#include <utility>

namespace detail {
    struct reactor_handler {
        stream_descriptor & sd_;
        socket_type & socket_;

        void operator()(system::error_code const& ec, size_t) {
            // ...
        }

        static void schedule(socket_type & socket, stream_descriptor & sd) {
            // members are declared sd_ first, then socket_
            reactor_handler handler{ sd, socket };
            // wait for readiness only; null_buffers means no bytes are read
            sd->async_read_some(boost::asio::null_buffers(), std::move(handler));
        }
    };
} // namespace detail

Note the use of null_buffers here. No data is actually read by this call and null_buffers allows us to state that intention explicitly. Of course, this only tells us that we may send or receive at least one message, but not which operation is actually ready. The ZMQ_EVENTS option allows us to retrieve a bit mask indicating which operations are ready on the socket.
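The idea of waiting for readiness without consuming anything can be illustrated without Asio or ZeroMQ. The sketch below is an analogy only, and POSIX-specific: a plain pipe stands in for the descriptor returned by ZMQ_FD, poll() with a zero timeout stands in for the reactor, and the function names are hypothetical. It checks that asking "is this readable?" leaves the pending data untouched.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Reports whether fd is readable right now, without reading from it.
inline bool is_readable(int fd) {
    pollfd p{};
    p.fd = fd;
    p.events = POLLIN;
    return ::poll(&p, 1, 0) == 1 && (p.revents & POLLIN) != 0;
}

// Writes one byte into a pipe, checks readiness twice, then reads the
// byte back. Returns true if the readiness checks consumed nothing.
inline bool readiness_does_not_consume() {
    int fds[2];
    if (::pipe(fds) != 0)
        return false;
    const char out = 'x';
    bool ok = !is_readable(fds[0]);            // nothing written yet
    ok = ok && ::write(fds[1], &out, 1) == 1;
    ok = ok && is_readable(fds[0]);            // data is pending
    ok = ok && is_readable(fds[0]);            // still pending
    char in = 0;
    ok = ok && ::read(fds[0], &in, 1) == 1 && in == 'x';
    ok = ok && !is_readable(fds[0]);           // drained by the read
    ::close(fds[0]);
    ::close(fds[1]);
    return ok;
}
```

The analogy stops at the readiness check: with a ZeroMQ socket the descriptor is never read by application code at all, and the actual state has to be fetched separately through ZMQ_EVENTS.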

Like ZMQ_FD, the ZMQ_EVENTS option is not of much general use to client code, so we provide a specific accessor -

#include <boost/asio/io_service.hpp>
#include <zmq.h>
namespace detail {
    int get_events(socket_type & socket,
                   system::error_code & ec) {
        BOOST_ASSERT_MSG(socket, "invalid socket");
        int evs = 0;
        size_t size = sizeof(evs);
        auto rc = zmq_getsockopt(socket.get(), ZMQ_EVENTS, &evs, &size);
        if (rc < 0)
            ec = make_error_code();
        return evs;
    }
} // namespace detail

With this function in place, we can revisit the reactor_handler call operator -

#include <boost/asio/io_service.hpp>
#include <zmq.h>
namespace detail {
    struct reactor_handler {
        stream_descriptor & sd_;
        socket_type & socket_;

        void operator()(system::error_code const& e, size_t) const {
            boost::system::error_code ec{ e };
            int evs = 0;
            if (!ec)
                evs = get_events(socket_, ec);

            if (ec)
                return;

            // braces matter: a comment alone is not a statement
            if (evs & ZMQ_POLLIN) {
                // ... receive one message
            }

            if (evs & ZMQ_POLLOUT) {
                // ... send one message
            }

            // re-arm the wait; schedule() is the static member shown earlier
            schedule(socket_, sd_);
        }
    };
} // namespace detail
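The event-mask dispatch in the handler can be exercised on its own. This is a minimal sketch under stated assumptions: `poll_in` and `poll_out` are local constants standing in for ZMQ_POLLIN and ZMQ_POLLOUT so that no libzmq header is needed, and `counters` and `dispatch` are hypothetical names that merely record which branches ran.

```cpp
#include <cassert>

// Local stand-ins for ZMQ_POLLIN / ZMQ_POLLOUT.
enum : int { poll_in = 1, poll_out = 2 };

// Records which operations the dispatcher decided were ready.
struct counters {
    int received = 0;
    int sent = 0;
};

// Both bits are tested independently: a single wakeup may report that
// the socket is ready to receive, ready to send, both, or neither.
inline void dispatch(int evs, counters& c) {
    if (evs & poll_in) {
        ++c.received;   // placeholder for "receive one message"
    }
    if (evs & poll_out) {
        ++c.sent;       // placeholder for "send one message"
    }
}
```

Calling `dispatch(poll_in | poll_out, c)` bumps both counters, while `dispatch(0, c)` changes nothing. The second case is worth handling quietly, since the mask is only known after the wakeup has already happened.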

Next time...

The next installment will dig into the details of how Asio schedules async operations and completion handlers, and how to apply that knowledge to the implementation of reactor_handler.

About the second part of my Fusion wire protocols post

I am still working, off and on, on a representative case for a more complicated framing example, but it is not yet ready to discuss.

On the other hand, you could use AZMQ and let ZeroMQ deal with framing. As part of this series of posts, I will also present an implementation of zproto that leverages the details of the Fusion wire protocol technique alongside existing Boost support for state machines.