Over the last few days I’ve been working on an actor library for Clojure built
on top of RabbitMQ and ZooKeeper. I’ve called this little piece of software
Jobim. The source code is available on GitHub.
External dependencies
Jobim depends on RabbitMQ 2.X for dispatching messages between JVM
nodes. RabbitMQ is a reliable, high-performance messaging solution that supports
the 0.9 version of the AMQP messaging protocol and is built on top of Erlang’s
OTP platform.
I’ve been using Rabbit in my day-to-day work for several months, and it is one
of the best alternatives you can find for building a message queue.
Jobim also has a dependency on Apache’s ZooKeeper. ZooKeeper is a really
impressive piece of software that, in the purest UNIX tradition, does only one
thing but does it in the best possible way. In this case ZooKeeper allows a set
of distributed processes to manage a shared tree of nodes and get notifications
whenever this tree is modified by other processes. This basic functionality
provides out-of-the-box support for using ZooKeeper as a powerful directory
service as well as a group membership service. It can also be extended to
provide a whole bunch of high-level coordination services like priority queues
or two-phase commit protocols.
Turning the JVM into a Node
Jobim actors need to be executed in a JVM that is executing a Node service. JVM
nodes are aware of the existence of other nodes, can exchange messages and
coordinate their activities.
A node service can be started in the JVM using a single function,
jobim/bootstrap-node. This function receives as a parameter a path to a
configuration file where the name of the node, as well as the connection
options for RabbitMQ and ZooKeeper, must be stated.
The following is a sample configuration file:
    ;; Default configuration for a node
    {:node-name "linux"

     ;; rabbit host, port, username, password and virtual host
     :rabbit-options [:host "192.168.1.35"]

     ;; zookeeper options:
     ;; servers: array of zk servers specified as host:port
     ;; timeout : session timeout
     ;; id : session id
     ;; password
     :zookeeper-options ["192.168.1.35:2181" {:timeout 3000}]}
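Since the configuration file is plain Clojure data, it can be read without any special tooling. The following is a minimal sketch of how such a file could be parsed, using only clojure.edn. The parse-config helper and the keys of the map it returns are hypothetical and are not part of Jobim; the configuration is inlined as a string to keep the sketch self-contained.

```clojure
(require '[clojure.edn :as edn])

;; The sample configuration from the text, inlined as a string.
;; A real node would read it from the file passed to bootstrap-node.
(def sample-config
  "{:node-name \"linux\"
    :rabbit-options [:host \"192.168.1.35\"]
    :zookeeper-options [\"192.168.1.35:2181\" {:timeout 3000}]}")

;; parse-config is a hypothetical helper, NOT part of Jobim.
(defn parse-config
  "Parses the configuration text and flattens it into a simple map."
  [text]
  (let [{:keys [node-name rabbit-options zookeeper-options]} (edn/read-string text)
        [zk-servers zk-opts] zookeeper-options]
    {:node-name  node-name
     :rabbit     (apply hash-map rabbit-options) ; [:host "h"] -> {:host "h"}
     :zk-servers zk-servers
     :zk-timeout (:timeout zk-opts)}))

(parse-config sample-config)
```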
Once we have started the node, we should be able to use the jobim/nodes
function to check all the available nodes and their identifiers:

    => (use 'jobim)
    nil
    => (bootstrap-node "node-config.clj")
    "6811651bd83e4d428359b419e7f76a75"
    => (nodes)
    {"osx" "5299491ea4184c02ad8c0fbc100c49f9", "linux" "6811651bd83e4d428359b419e7f76a75"}
Nodes are aware of changes in the list of available nodes. They are also
notified when a node shuts down or becomes unreachable due to a network
partition.
Creating an actor
A Jobim actor can be started using any Clojure function. This function can use
two special functions, send! and receive, to send messages to and receive
messages from other actors in the same node or in a different node.
In order to send a message, the actor needs to know the PID of the actor
receiving the message. This PID is passed as the first argument of the send!
function. The payload of the message is the second argument.
An actor can retrieve its own PID using the jobim/self function.
This is an implementation of a sample ping actor:
    (defn ping
      ([]
         (loop [continue true
                msg (receive)]
           (cond-match
            [#"exit" msg] (recur false msg)
            [#"exception" msg] (throw (Exception. (str "Ping actor with PID:" (self) "received exception")))
            [[?from ?data] msg] (do (send! from data)
                                    (recur true (receive)))))))

The implementation shown above uses the Matchure pattern-matching library to
provide a more “Erlang-like” experience.
To start the execution of a concurrent actor, the jobim/spawn function can be
used. It accepts a function or a string with the qualified name of a function.
The jobim/spawn function returns the PID of the newly created process or throws
an exception if the creation went wrong.

    => (def *pid* (spawn examples/ping))
    #'clojure.core/*pid*
    => *pid*
    "5299491ea4184c02ad8c0fbc100c49f9.1"
The REPL or any other thread can be transformed into an actor using the
spawn-in-repl function. We will use this function to send some messages to the
ping actor we just created:

    => (spawn-in-repl)
    "5299491ea4184c02ad8c0fbc100c49f9.2"
    => (send! *pid* [(self) 13123])
    ok
    => (receive)
    13123
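To make the thread-per-actor model more concrete, here is a toy sketch in plain Clojure that needs neither RabbitMQ nor ZooKeeper. It is not how Jobim is implemented: the "PID" is simply the actor's mailbox (a LinkedBlockingQueue), everything runs in a single JVM, and toy-spawn, toy-send!, toy-receive and toy-ping are hypothetical names chosen for the illustration.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Toy model only: a "PID" here is just the actor's mailbox.
(defn toy-spawn
  "Runs (f mailbox) on its own daemon thread and returns the mailbox."
  [f]
  (let [mailbox (LinkedBlockingQueue.)]
    (doto (Thread. (fn [] (f mailbox)))
      (.setDaemon true)
      (.start))
    mailbox))

(defn toy-send! [mailbox msg] (.put mailbox msg))

(defn toy-receive
  "Blocks the calling thread until a message is available."
  [mailbox]
  (.take mailbox))

;; Echoes [from data] messages back to `from` until it receives "exit".
(defn toy-ping [mailbox]
  (loop [msg (toy-receive mailbox)]
    (when-not (= "exit" msg)
      (let [[from data] msg]
        (toy-send! from data)
        (recur (toy-receive mailbox))))))

(def ping-box (toy-spawn toy-ping))
(def repl-box (LinkedBlockingQueue.)) ; plays the role of spawn-in-repl

(toy-send! ping-box [repl-box 13123])
(toy-receive repl-box) ; => 13123
```

Each toy actor blocks a whole thread while it waits in toy-receive, which is the cost that matters once the number of actors grows.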
Messages payload
Jobim uses the standard Java serialization mechanism to build the payload of
messages. This means that any object implementing java.io.Serializable can be
sent inside a message.
For instance, we can send Date objects to our ping actor:

    => (send! *pid* [(self) (java.util.Date.)])
    ok
    => (receive)
    #
It is possible to change the serialization and deserialization mechanism used
by a node by replacing the jobim.core/default-encode and
jobim.core/default-decode symbols with suitable functions.
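As an illustration of what such a pair of functions could look like, the sketch below round-trips a payload through standard Java serialization. The names encode-java and decode-java are hypothetical, and the assumed shape (object to byte array, byte array to object) is a guess at what a node would expect, not something taken from Jobim's source.

```clojure
(import '(java.io ByteArrayInputStream ByteArrayOutputStream
                  ObjectInputStream ObjectOutputStream))

;; Hypothetical names; the object->bytes / bytes->object shape is an assumption.
(defn encode-java
  "Serializes any java.io.Serializable object into a byte array."
  [obj]
  (let [buffer (ByteArrayOutputStream.)]
    (with-open [out (ObjectOutputStream. buffer)]
      (.writeObject out obj))
    (.toByteArray buffer)))

(defn decode-java
  "Rebuilds an object from a byte array produced by encode-java."
  [^bytes data]
  (with-open [in (ObjectInputStream. (ByteArrayInputStream. data))]
    (.readObject in)))

;; A payload similar to the one sent to the ping actor above.
(def payload ["5299491ea4184c02ad8c0fbc100c49f9.2" (java.util.Date. 0)])

(= payload (decode-java (encode-java payload))) ; => true
```

Clojure's persistent vectors and java.util.Date both implement java.io.Serializable, which is why the round trip preserves equality.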
Going distributed
The most basic building block for distributed computing in Jobim is the
jobim/rpc-call function. This function receives a node identifier, a string
containing the name of a function and an array of arguments, and tries to invoke
that function in the remote node. rpc-call returns immediately, without knowing
the result of the invocation in the remote node. If we want to retrieve the
result of the invocation, we can use the blocking variant,
jobim/rpc-blocking-call, which blocks until the result is returned or an
exception is thrown.
RPC functions in Jobim accept node identifiers. We can transform the name of a
node, perhaps already retrieved with the jobim/nodes function, into a node
identifier using the jobim/resolve-node-name function.
The following example is an RPC call to do a simple addition:

    => (nodes)
    {"osx" "5299491ea4184c02ad8c0fbc100c49f9", "linux" "6811651bd83e4d428359b419e7f76a75"}
    => (resolve-node-name "linux")
    "6811651bd83e4d428359b419e7f76a75"
    => (rpc-blocking-call (resolve-node-name "linux") "clojure.core/+" [1 2 3 4 5])
    15
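Passing the function as a string works because a qualified name can be turned back into a function on the receiving side. The sketch below shows one way the receiving node could do this with clojure.core alone; invoke-by-name is a hypothetical helper, and whether Jobim resolves names this way is an assumption.

```clojure
;; invoke-by-name is a hypothetical helper, NOT part of Jobim.
;; It assumes fn-name is fully qualified, e.g. "clojure.core/+".
(defn invoke-by-name
  "Resolves a namespace-qualified function name and applies it to args."
  [fn-name args]
  (let [sym (symbol fn-name)]
    (require (symbol (namespace sym))) ; make sure the namespace is loaded
    (if-let [f (resolve sym)]
      (apply f args)
      (throw (ex-info "Unknown function" {:name fn-name})))))

(invoke-by-name "clojure.core/+" [1 2 3 4 5]) ; => 15
```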
One especially important use of the RPC functions is to start new actors in
other nodes by invoking the jobim/spawn function remotely. If we use the
blocking variant of the RPC function, we will retrieve the PID of the remote
actor and we can start sending messages to it:

    => (def *pid* (rpc-blocking-call (resolve-node-name "linux") "jobim/spawn" ["jobim.examples.actors/ping"]))
    #'clojure.core/*pid*
    => *pid*
    "6811651bd83e4d428359b419e7f76a75.1"
    => (send! *pid* [(self) 345])
    nil
    => (receive)
    345
Publishing processes
As long as we have the PID of an actor, we will be able to exchange messages
with it. Besides, since the PID is just a string, we can pass PIDs inside
messages, allowing actors to be “mobile actors” in a Pi-Calculus sense.
Nevertheless, it is sometimes convenient to be able to look up an actor using
a constant reference that we know beforehand, for instance an alias, so we can
communicate with the actor without needing to know its PID.
Jobim supports this use case with the jobim/register-name function. Using this
function, we can provide a name for the PID of a process that will be globally
available to all nodes in the system.
Registered names can be queried using the jobim/registered-names function, in a
similar way to the jobim/nodes function for node names and node identifiers.
We can transform a registered name into an actor PID using the
jobim/resolve-name function, so we can pass it as an argument in jobim/send!
function calls.

    => (def *ping* (spawn examples/ping))
    #'clojure.core/*ping*
    => *ping*
    "5299491ea4184c02ad8c0fbc100c49f9.8"
    => (register-name "ping" *ping*)
    ok
    => (registered-names)
    {"ping" "5299491ea4184c02ad8c0fbc100c49f9.8"}
    => (resolve-name "ping")
    "5299491ea4184c02ad8c0fbc100c49f9.8"
    => (send! (resolve-name "ping") [(self) 1234])
    ok
    => (receive)
    1234
When things go wrong
Erlang systems have a very particular approach to error handling that consists
of not preventing failures but reacting quickly after a failure happens, most of
the time by restarting the failing component.
The basic mechanism behind this approach is the “linking” of processes. When two
Erlang processes are linked, any failure in one of the two processes will
produce a failure signal in the other process that, if not properly handled,
will cause the other process to fail.
Special processes, known as supervisors, take care of creating and linking to
child processes as well as handling exceptions in the children according to some
kind of recovery policy.
Distributed Erlang applications are usually arranged as trees of processes where
the process at a node handles the errors in the leaves of that node and, if it
is not able to recover from an error, dies and bubbles the error up to the next
level.
Jobim provides limited support for this style of error handling with the
jobim/link function. The link function receives the PID of an actor as an
argument and links both actors bidirectionally.
From this point on, any error in one actor, or the failure of the node where one
of the actors is running, will produce a special message signaling the error in
the other actor.

    => (self)
    "5299491ea4184c02ad8c0fbc100c49f9.1"
    => (def *pid* (spawn examples/ping))
    #'clojure.core/*pid*
    => (link *pid*)
    {"5299491ea4184c02ad8c0fbc100c49f9.1" ["5299491ea4184c02ad8c0fbc100c49f9.9"], "5299491ea4184c02ad8c0fbc100c49f9.9" ["5299491ea4184c02ad8c0fbc100c49f9.1"]}
    => ; the ping actor will throw an exception if it receives a message containing the "exception" string
    => (send! *pid* "exception")
    ok
    => (receive)
    {:signal :link-broken, :from "5299491ea4184c02ad8c0fbc100c49f9.9", :cause "class java.lang.Exception:Ping actor received exception"}
This means that linked processes in Jobim must explicitly look for this kind of
message and handle it, maybe by throwing an exception, to obtain a behaviour
similar to that of OTP applications.
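A small guard function is one way to do that check in a single place. The sketch below relies only on the shape of the :link-broken message shown in the transcript above; link-broken? and check-link! are hypothetical helpers, not functions provided by Jobim.

```clojure
;; Hypothetical helpers, NOT part of Jobim. They rely only on the message
;; shape {:signal :link-broken, :from ..., :cause ...} shown above.
(defn link-broken?
  "True when msg is a signal reporting that a linked actor failed."
  [msg]
  (and (map? msg) (= :link-broken (:signal msg))))

(defn check-link!
  "Returns msg unchanged, or throws if msg reports a broken link."
  [msg]
  (if (link-broken? msg)
    (throw (ex-info (str "Linked actor " (:from msg) " failed")
                    {:from (:from msg) :cause (:cause msg)}))
    msg))

(check-link! ["some-pid" 1234]) ; => ["some-pid" 1234]
```

Inside an actor, wrapping each received message with check-link! would make the actor die whenever a linked actor dies, which approximates the default Erlang behaviour described above.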
Evented actors
The actors introduced so far are executed in their own Java thread. This is a
huge problem, since a JVM will start throwing OutOfMemory exceptions after a few
thousand threads are created.
Erlang systems, on the other hand, can handle millions of concurrent actors in a
single node, using a preemptive scheduler that grants a fixed number of
reductions to each Erlang process being executed. Erlang processes are extremely
lightweight, so features like the linking of processes previously discussed can
be used across very large numbers of them.
A possible alternative for building systems with a large number of actors is to
use “evented actors”, so that a single Java thread can execute different actors.
This solution has been explored in Scala actors.
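The idea can be sketched in a few lines of plain Clojure: instead of blocking a thread per actor, each actor is reduced to a handler function, and one loop delivers queued messages to the right handler. This is a toy model under stated assumptions, not Jobim's implementation; the names toy-spawn-evented, evented-send! and run-events! are hypothetical, and the loop is meant to be run from a single thread.

```clojure
;; Toy model only: handlers keyed by a made-up PID, one shared event queue.
(def handlers (atom {}))
(def events (atom clojure.lang.PersistentQueue/EMPTY))

(defn toy-spawn-evented
  "Registers handler as the actor for pid. No thread is created."
  [pid handler]
  (swap! handlers assoc pid handler)
  pid)

(defn evented-send! [pid msg]
  (swap! events conj [pid msg]))

(defn run-events!
  "Drains the queue on the calling thread, dispatching each message
  to the handler registered for its PID."
  []
  (loop []
    (when-let [[pid msg] (peek @events)]
      (swap! events pop)
      (when-let [handler (get @handlers pid)]
        (handler msg))
      (recur))))

;; Three actors share the one thread that calls run-events!.
(def replies (atom []))
(doseq [n (range 3)]
  (toy-spawn-evented n (fn [msg] (swap! replies conj [n msg]))))
(doseq [n (range 3)]
  (evented-send! n :ping))
(run-events!)
@replies ; => [[0 :ping] [1 :ping] [2 :ping]]
```

The price of this design is that a handler must never block: while one handler runs, no other actor on that thread makes progress.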
Jobim evented actors rely on three special functions: jobim/react, which is
equivalent to the receive function of a regular actor; jobim/react-loop, which
creates a recursive evented actor; and jobim/spawn-evented, which creates a new
evented actor and returns its PID. This PID can be used with the regular
jobim/send! function to send messages to the evented actor.
The following is an evented implementation of the previously defined ping actor:
    (defn ping-evented
      ([]
         (let [name "test evented"]
           (react-loop
            (react
             (fn [msg]
               (cond-match
                [#"exit" msg] false
                [#"exception" msg] (throw (Exception. "Ping actor received exception"))
                [[?from ?data] msg] (send! from (str "actor " name " : " data)))))))))
Conclusions
Erlang is an incredible platform for building reliable distributed systems. An
actor library that supports distributed failure signals and tolerates network
partitions can be a nice addition to Clojure’s own concurrency mechanisms for
building distributed applications on the JVM.
It could also be mixed with the other distribution mechanisms available on the
JVM.
Jobim is just an experiment in how this kind of system could be built using two
beautiful pieces of software like RabbitMQ and ZooKeeper.