What would BSD sockets look like if they were to be designed today?
That’s the question ZeroMQ designers seem to be trying to answer when they started working in the project.
ZeroMQ is one of the most misleading names for a software product. If you come to ZeroMQ from AMQP or ActiveMQ backgrounds you may be confused for a short period of time when you don’t find some of the components you may take for granted (e.g. broker). The fact is that ZeroMQ is much more than a queuing
system, it is a generalized communication layer that allows communication between application threads, wherever they are located: in different nodes in a network, in different processes being executed in the same machine or in the same application.
In the latest days I’ve working in adding ZeroMQ as an alternative
communication mechanism to RabbitMQ for Jobim actors. This post is a brief
summary of my findings that I hope can be useful for anyone interested in using
ZeroMQ from Clojure.
IN the beginning there were sockets
ZeroMQ main abstraction are good old sockets, but a very different kind of sockets. Quoting ZeroMQ official documentation:
“a ØMQ socket is what you get when you take a normal TCP socket, inject it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombard it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex.”
Some of the feature of ZeroMQ sockets from traditional BSD sockets are the following:
- ZeroMQ sockets are asynchronous and data can be queued before being actually sent to the receivers
- ZeroMQ works with messages, packets of bytes that are sent from peer to peer
- ZeroMQ sockets can be used with different communications patterns beside traditional bidirectional communication between two peers: request-reply, push-pull, pub-sub semantics can be imposed on top of ZeroMQ sockets
- ZeroMQ sockets can be bound to a wide variety of transport mechanisms: TCP is available but also IPC for inter process communication as well as transports for in-process communication and multicast
- ZeroMQ sockets doest not require some of the traditional BSD sockets operations to work, for instance, a client socket can connect to an endpoint that has not been already bound and endpoints doest not require the use of the accept operation
Taking this into account, the first step when using ZeroMQ into a new application is deciding which combination of transport + communication pattern is going to be used.
If we want to communicate java threads in the same JVM using a publish-subscriber communication pattern the right pattern is inproc transport + pub-sub, processes communicating across the network conforming a pipeline of
data processing can use TCP transport and push-pull communication pattern, etc.
Installation and setup
Before taking a look at how ZeroMQ can be used inside a Clojure application, it is necessary to setup ZeroMQ as well as the required libraries. ZeroMQ is installed as a shared native library, so using ZeroMQ from java involves installing a JNI java wrapper.
Finally a Clojure library built on top of the JNI wrapper can be used from Clojure code to manipulate ZeroMQ sockets.
Installation of ZeroMQ is fairly straight:
$ wget http://www.zeromq.org/local--files/area:download/zeromq-2.0.9.tar.gz $ tar -zxvf zeromq-2.0.9.tar.gz $ cd zeromq-2.0.9/ $ ./configure CFLAGS="-m64" CXXFLAGS="-m64" LDFLAGS="-m64" && make $ sudo make install
In the previous snippet I’ve used the “-m64” flag because I’m building for a 64bits OSX architecture. Currently, OSX java version does not support 32 bits native libraries (you can check this issuing a “java -d32” command in the shell), so I need a X86_64 build of ZeroMQ in order to build the right JNI library.
The installation of this library is also really simple:
$ git clone http://github.com/zeromq/jzmq.git $ cd jzmq $ ./autogen.sh $ ./configure CFLAGS="-m64" CXXFLAGS="-m64" LDFLAGS="-m64" && make $ sudo make install
Once these libraries are installed, we are ready to use ZeroMQ from a Clojure application. The easiest way of using the libraries that have been built is adding Clojure/ZeroMQ library as well as the ZeroMQ java wrapper as a dependency to a Leiningen project and adding the path to the native JNI library to the project using the “:native-path” key.
:dependencies [[org.clojure/clojure "1.2.0"] [org.clojure/clojure-contrib "1.2.0"] ... [org.clojars.mikejs/clojure-zmq "2.0.7-SNAPSHOT"] [org.zmq/jzmq "2.0.6-SNAPSHOT"] ...] :native-path "/usr/local/lib"
jzmq library must be installed manually using Maven. Another possibility is
using the excellent native-deps plugin by David Nollen.
Once this setup is accomplished, you should be able to access ZeroMQ from Slime or from a regular REPL or application passing the “-Djava.library.path=/path/to/native/lib” option to the java binary.
The first step when using ZeroMQ is creating a
Context object. As I have mentioned all the operations in ZeroMQ are asynchronous. Internally, they are handled by OS level threads. The Context object initializes this pool of threads that will handle socket operations.
make-context is able to create the ZeroMQ context. It receives the number of threads that will be created in the pool. If you want to use ZeroMQ as communication mechanism for Java threads inside a single application, using the inproc transport,
0 must be passed as an argument.
user> (use 'org.zeromq.clojure) nil user>(def *ctx* (make-context 0)) #'user/*ctx*
The initialized context can be used to create sockets that will receive incoming messages or that will be used to send messages.
The following code creates a new socket with a pull/downstream communication pattern bound to a inproc transport layer. The string “inproc://test” identifies the end point for that socket.
The remaining code just creates a thread that loops in the incoming messages printing out the received message.
Notice how the delivered message is an array of bytes with the same data that will be sent from the client.
(future (let [s (make-socket *ctx* +upstream+)] (bind s "inproc://test") (loop [msg (recv s)] (println (str "Received: " (String. msg))) (recur (recv s)))))
The client counterpart is also easy. We just need to create the socket with the right transport (+downstream+) and connect it to the endpoint (“inproc://test”).
In the following piece of code we create some clients that will connect to the same endpoint, using the
connect and starts sending messages using the
(doseq [i (range 0 5)] (future (let [s (make-socket *ctx* +downstream+)] (connect s "inproc://test") (loop [c 0] (send- s (.getBytes (str "hey " i " " c))) (Thread/sleep (rand 5000)) (recur (inc c))))))
If everything goes fine, we should start getting the following output:
Received: hey 0 0 Received: hey 1 0 Received: hey 2 0 Received: hey 3 0 Received: hey 4 0 Received: hey 0 1 Received: hey 0 2 Received: hey 2 1 Received: hey 4 1 Received: hey 1 1 Received: hey 3 1 Received: hey 1 2 ...
Using the push/pull communication pattern we have created a n-1 fan-in message queue, selecting the inproc transport, we have used this communication layer to exchange data among threads in the same JVM.
We could have used a different transport layer to use the fan-in communication pattern to exchange data among threads in different JVMs. The only change required in the previous code to accomplish this task is changing the transport declaration in the
connect functions to a TCP transport, for instance:
(bind s "tcp://192.168.1.35:5555")
One of the main advantages of ZeroMQ is the support for different communications patterns. Each one of them encapsulate a behaviour that can be reused directly into your application to solve a specific communication need.
It is important to know all of them to understand when it is feasible to use one or another.
Each communication pattern is divided into two roles, that must be matched against each other for each couple of sockets. It is important not to sockets for different communication patterns since unexpected behaviour could arise.
allows a sequence of requests/replies to be exchanged between clients and servers. The outoging messages are load-balanced between all the available servers.
If the client reach the limit in the buffer of outgoing messages or any other exceptional condition, will block. The servers will drop messages.
Enables pub/sub pattern where the publish socket uses a fan-out pattern to send messages to all the subscribed clients. The client can subscribe to different publishers using the
set-socket-option function. Communication is unidirectional.
If the publisher reaches the limit in the outgoing buffer limit, it will start dropping messages.
Builds a pipeline of nodes where data is pushed to at leas one connected socket
that pulls data. If a push socket is connected to more than one outgoing pull socket, messages are load-balanced among them.
If the memory limit is reached, the push socket will block.
ZeroMQ is incredible powerful, is not only fast and efficient but it also simplifies enormously the design of the communication layer for any application.
It also makes possible to re-use the same communication pattern in different situations.
Nevertheless, ZeroMQ can be seen, in my opinion, as complementary to queues systems like RabbitMQ rather than a drop in replacement. As with any other software system, a careful look at your application requirements will make clear which communication mechanism suits better to the problem you are trying to
This article is just a really brief overview of what ZeroMQ has to offer. If you want to know more about ZeroMQ and the rest of features it offers like multi messages or devices I encourage you to dive into the great documentation available in the official ZeroMQ site.
One thought on “ZeroMQ and Clojure, a brief introduction”
Hi, this is alexis from the rabbitmq team. On comparing the two models, I would say that “complementary” is very accurate. Here are the (ongoing) results of some work we have done with Martin Sustrik, for people who want to use RabbitMQ and ZeroMQ together: http://wiki.github.com/rabbitmq/rmq-0mq/