Building a Resque-like job processing system for JVM apps with RabbitMQ

RabbitMQ is a complex beast.
It’s flexible and powerful, but also hard to fully grasp and master.
Many different use cases and usage patterns can be built on top of this powerful piece of software, but missteps and design errors in the first attempts at coding a particular solution are also commonplace.
In this article I will discuss the design and implementation of Palermo, a system implementing one of the possible usage patterns that can be built using RabbitMQ as the underlying queuing mechanism: a batch job processing system.

By batch job processing system we refer to a mechanism for the automated execution of jobs by workers that extract jobs from different queues. Clients can enqueue new jobs into these queues that will eventually be delivered to the workers that will execute the job. If one job fails in its execution, the worker thread will place the failed job into a special queue where it can be retried or inspected for error debugging.
The main inspiration when building Palermo has been Resque, a job processing system built by GitHub using Ruby and Redis.
Resque makes it possible for users of the system to define queues with different names, to enqueue jobs matching the name of a Ruby class in the system with certain input arguments, and to start worker processes on different machines that will consume the jobs, instantiate the Ruby class and perform the job with the provided input arguments. If the job execution fails, the job will be routed to a special ‘failed’ job queue where it can be retried or removed. Resque workers are well integrated with the underlying operating system, being able to deal with incoming signals as a way for system operators to control the execution of jobs. Resque also provides a web interface where the state of the jobs and workers can be inspected and certain actions, like re-enqueuing a job or clearing the jobs in a queue, can be performed.
From a developer’s point of view, the main advantage of Resque is the extreme ease of use of the whole system. Defining and enqueuing jobs requires just a few lines of Ruby code and the system performs in a consistent and robust way.

The aim of Palermo is to build a job processing system as easy to use and robust as Resque but for JVM languages and using RabbitMQ as the underlying queuing mechanism instead of Redis.

From a formal point of view, the queuing mechanism of a job processing system can be defined as a tuple space, a kind of persistent associative memory where some processes enqueue jobs by writing tuples with the form:


write(queue_name, job_type, input_argument)

Worker processes remove tuples from memory, one at a time, by using a predicate matching the queue name:


read(queue_name, ?, ?)

The only additional constraint that must be added to this formulation of the job processing queuing mechanism as a tuple space is that the read function from the distributed memory must adhere to first-in, first-out (FIFO) semantics. RabbitMQ queues can be viewed as this kind of memory with FIFO semantics. The name of the queue serves as the first argument passed to the `read` predicate to extract the next matching tuple stored in memory.
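
The tuple space formulation can be sketched in a few lines of Python. This is only an in-memory illustration of the `write`/`read` semantics described above, not Palermo code: the `TupleSpace` class and the queue and job names are hypothetical, and persistence and distribution are left out.

```python
from collections import deque


class TupleSpace:
    """In-memory stand-in for the tuple space: one FIFO per queue name."""

    def __init__(self):
        self._queues = {}

    def write(self, queue_name, job_type, input_argument):
        # Append at the tail so that older tuples are always read first.
        self._queues.setdefault(queue_name, deque()).append(
            (queue_name, job_type, input_argument))

    def read(self, queue_name):
        # Plays the role of read(queue_name, ?, ?): remove and return the
        # oldest tuple whose first field matches, or None if there is none.
        queue = self._queues.get(queue_name)
        return queue.popleft() if queue else None


space = TupleSpace()
space.write("emails", "SendWelcome", {"user": 1})
space.write("reports", "BuildReport", {"month": 3})
space.write("emails", "SendReminder", {"user": 2})

first = space.read("emails")
second = space.read("emails")
```

Reading from `"emails"` returns the two email jobs in the order they were written and never touches the tuple stored under `"reports"`.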

In order to get the desired tuple space semantics for reading we need to deal with some of the RabbitMQ internals and configure the following options:

- Persistence of queues
- Persistence of messages
- Quality of service
- Message acknowledgments

Since the memory we need to use must be persistent we need to add persistence to the queues and messages managed by RabbitMQ. In order to do that, queues must be declared in RabbitMQ as `durable` and messages as `persistent`. In this way even if the RabbitMQ broker goes down, information about the queues that have been created and the pending messages will survive a restart.

When more than one worker is connected to the same queue, RabbitMQ will round-robin messages between all the available workers. The main problem is that RabbitMQ will deliver a message to the next worker as soon as the message arrives at the queue, no matter whether the worker is already processing a different message. If one job takes a long time to process, incoming messages can pile up in the local buffer of the busy worker while the other workers starve. We can deal with this situation using two features of RabbitMQ: message acknowledgments and quality of service/pre-fetch counts.

First of all, the worker can be required to notify RabbitMQ when it has processed the message with an explicit acknowledgment. Messages will not be removed from the queue until they are acknowledged. If the worker dies without acknowledging a message, it will be automatically re-enqueued by RabbitMQ.

At the same time, quality of service (QoS) configuration can be used to tell RabbitMQ the maximum number of unacknowledged messages that can be sent to a particular worker. If we set this value, known as the pre-fetch count, to 1, no other message will be sent from the queue to that worker until the last message has been acknowledged by the worker process.
In this way, a fair dispatch of the jobs between the workers can be achieved.
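
The effect of the pre-fetch count can be illustrated with a small simulation in Python. It does not talk to RabbitMQ: it assumes that all jobs are already waiting in the queue, that their durations are known, and it compares plain round-robin delivery with delivery limited to one unacknowledged job per worker. The function names and the job durations are made up for the example.

```python
import heapq


def round_robin_makespan(durations, workers):
    # Without a pre-fetch limit every job is pushed to the next worker in
    # turn, regardless of how much work that worker already has buffered.
    busy_until = [0] * workers
    for index, duration in enumerate(durations):
        busy_until[index % workers] += duration
    return max(busy_until)


def prefetch_one_makespan(durations, workers):
    # With a pre-fetch count of 1 a job is only handed to a worker that has
    # acknowledged its previous job, i.e. the worker that becomes free first.
    free_at = [0] * workers
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + duration)
    return max(free_at)


jobs = [10, 1, 10, 1, 10, 1]  # alternating long and short jobs
rr = round_robin_makespan(jobs, 2)
fair = prefetch_one_makespan(jobs, 2)
```

With these durations round-robin sends every long job to the same worker, so the total time is dominated by that worker, while the pre-fetch variant lets the idle worker pick up part of the long jobs.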

We still need to find a way to implement the write operation into the memory. One important aspect of RabbitMQ that makes it different from other queuing systems is the strong separation of concerns between publishers and consumers. In the RabbitMQ architecture this is achieved by the introduction of an exchange between the queues and the publishers. Publishers don’t know about the final destination of their messages; they only need the name of an exchange and a routing key that can be seen as the address the message is being sent to.
RabbitMQ supports different semantics for different exchange types, which affect the way a message with an address matching an exchange will be delivered to the actual mailboxes (queues) where the consumers are receiving the messages.
In our case, the first argument of the write function is the queue name. The queue name is also the argument to the predicate used by the workers to extract the next job that needs to be processed from the memory.
The approach followed by Palermo is to use a direct exchange. In this type of exchange the routing key has to match exactly the binding key of the queue where the message will be delivered, which in our case is the name of the queue.
If a message is sent to a RabbitMQ exchange and no queue is attached to the exchange, the message will be discarded or sent to an associated alternate exchange. To avoid this situation, every time a publisher enqueues a new job in Palermo, it declares the queue matching the routing key before sending the job data. In this way we can be sure that the message will not be discarded if no worker is awaiting jobs. Re-declaring an existing queue with the same options is a valid operation in RabbitMQ and has no effect.

Using the previously described setup for RabbitMQ we can build the core of a job processing system. However, there are certain features, like the management of failed jobs or the serialization of messages, that cannot be addressed by resorting to RabbitMQ features alone. In Palermo these features have been implemented as an additional application software layer that can be distributed as a Java library.

The first issue is how to handle failed jobs. We have seen how, since we are using explicit worker acknowledgments, RabbitMQ will handle fatal failures in the worker execution or timeout issues. This mechanism could also be used to handle any kind of exceptional condition in the job processing logic, but our desired functionality requires that the failed message be sent to a special queue of failed jobs where it can be inspected, removed and, if appropriate, re-enqueued. This functionality has been accomplished in Palermo by wrapping the execution of the job in a generic Java try/catch block and piping the failed message to the failed queue, adding some information about the job error, retry count and original queue to the headers of the message metadata that are sent along with the message to RabbitMQ.
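
The idea of wrapping the job and re-routing failures can be sketched as follows. This is a Python illustration under stated assumptions, not Palermo’s actual Java/Clojure code: the header names, the `"failed"` queue name, the message layout and the `publish` callback are all hypothetical.

```python
import traceback

FAILED_QUEUE = "failed"  # hypothetical name for the queue of failed jobs


def run_job(job, message, publish):
    """Run job(*args); on any error forward the message to the failed queue.

    `publish(queue_name, message)` stands in for whatever sends a message
    to the broker. Returns True when the job finished without errors.
    """
    try:
        job(*message["args"])
        return True
    except Exception as error:
        headers = dict(message.get("headers", {}))
        headers["exception"] = repr(error)
        headers["stack_trace"] = traceback.format_exc()
        headers["original_queue"] = message["queue"]
        # Keep the current retry count; a retry would increment it.
        headers.setdefault("retries", 0)
        publish(FAILED_QUEUE, {**message, "headers": headers})
        return False


published = []


def divide(a, b):
    return a / b


ok = run_job(divide, {"queue": "math", "args": [1, 0]},
             lambda queue, msg: published.append((queue, msg)))
```

After the call, `published` holds a single entry addressed to the failed queue whose headers record the exception, the stack trace and the queue the job originally came from, which is enough to inspect the failure or to re-enqueue the job later.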

The serialization problem has been addressed by defining a job message with header information about the Java class for the job, the serialized arguments for the job and the serialization type. A small preview of the arguments as a human-readable string is also sent along with the message. Palermo supports plugging and unplugging different serializers and includes a JSON-based serializer, but the default serialization mechanism is JBoss Serialization. Only the arguments for the job are serialized and sent to the worker; the class with the bytecode for the job must still be available in the worker class path for the job to be executed correctly.
Palermo workers are just generic means of execution of the actual job logic encapsulated inside the definition of the job classes.
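
A possible shape for such a job message, with pluggable serializers, is sketched below in Python. The header names and the registry are assumptions made for the example, and `pickle` is used only as a stand-in for a binary serializer such as JBoss Serialization.

```python
import json
import pickle

# Pluggable serializers: name -> (encode to bytes, decode from bytes).
SERIALIZERS = {
    "json": (lambda value: json.dumps(value).encode("utf-8"),
             lambda data: json.loads(data.decode("utf-8"))),
    "pickle": (pickle.dumps, pickle.loads),
}


def encode_job(job_class, arguments, serialization="json"):
    encode, _ = SERIALIZERS[serialization]
    return {
        "headers": {
            "job-class": job_class,             # class the worker must load
            "serialization": serialization,     # how to decode the body
            "preview": repr(arguments)[:80],    # human-readable summary
        },
        "body": encode(arguments),              # only arguments are sent
    }


def decode_job(message):
    _, decode = SERIALIZERS[message["headers"]["serialization"]]
    return message["headers"]["job-class"], decode(message["body"])


message = encode_job("com.example.SendEmail", {"to": "a@b.c"})
job_class, arguments = decode_job(message)
```

Because the serialization type travels in the headers, a worker can decode messages produced with different serializers without any prior agreement with the publisher, as long as the job class named in the header is available to it.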

The whole logic of Palermo has been implemented in the Clojure programming language, but thanks to Clojure’s Java interop features, it can be used from Java code or any other JVM-based language. Since Palermo worker threads run on the JVM, they are isolated from the underlying operating system. Integration with the OS in the way Resque workers do is hard to achieve, but a certain degree of integration has been attempted when possible, e.g. using process identifiers for the identity tags of the RabbitMQ consumers associated with Palermo workers. A command line interface has also been implemented so new workers can be started easily by scripts and users.

A final component of the solution is a web interface for a running Palermo system. It’s just a simple web application that uses the introspection features coded in the Palermo library to provide an easy-to-understand view of how the Palermo system is performing. This interface is a copy of the Resque web interface and can be used as a replacement for RabbitMQ’s generic interface for managing exchanges, queues and workers.
All the functionality provided by the web interface can also be used programmatically by any Java code through the Palermo library.

Taking all of the above into consideration, we can think about Palermo as just a thin layer of functionality built on top of a particular setup of RabbitMQ and wrapped as a library that can be used as a reusable implementation of the job processing usage pattern. The same approach can potentially be applied to other usage patterns that can be built using RabbitMQ as the underlying engine, saving time otherwise spent writing non-application-specific code and dealing with the setup and the rest of the complexity introduced by RabbitMQ-specific configuration.

Message Passing Concurrency in Clojure using Kilim

Some slides from my talk about Clojure and Kilim.
You can get the code and the examples on GitHub.

Monte Carlo integration with Clojure and Mahout

Monte Carlo simulations can be used to approximate the value of different mathematical models where computing an analytical solution is an intractable problem or it is impossible to find because of the uncertainty in some variables.

Conceptually Monte Carlo simulations are simple.
They are based on the repeated computation of the model, where the values for the random variables are drawn from some known probability distribution.
Finally, the results of all the simulations are averaged to obtain the approximated value.

The two main requirements to run a Monte Carlo simulation, provided you already have a mathematical model of the problem, are a good random number generator and some tool to model the different probability distributions. The Apache Mahout project has implemented these tools. It includes a powerful pseudo-random number generator that passes all the DieHard randomness tests. It also includes Java classes that make it easy to work with different probability distributions.

This post is a small example of how to use these classes from Clojure as a convenient scripting language to work with Mahout. The example implemented is extracted from this excellent introductory book published by Springer.
It consists of computing the integral for the following function:

  (use 'incanter.core) ; provides pow, cos and sin

  (defn h
    ([x] (pow (+ (cos (* 50 x)) (sin (* 20 x))) 2)))

We can use Incanter to visualize this function:

  (use 'incanter.core)
  (use 'incanter.charts)

  (view  (function-plot h 0 1))

The integral over the interval [0, 1] can be computed analytically, obtaining the value 0.965.

The Monte Carlo approach is based on drawing iid variables from a uniform probability distribution U(0,1), evaluating the function at each of them, and approximating the value of the integral with the average:

  (def *solution* (/ (reduce + *randoms*) *trials*))

Where *randoms* is the collection of values of h evaluated at the randomly generated iid variables and *trials* is the number of variables generated.

In order to generate these values we can use the org.apache.mahout.math.jet.random.Uniform Mahout class:

  (import 'org.apache.mahout.math.jet.random.Uniform)
  
  (def *unif* (Uniform. (double 0.0)
                        (double 1.0)
                        (mod (.getTime (java.util.Date.)) 10000)))

  (def *trials* 100000)

  (def *randoms* (take *trials* (repeatedly #(h (.nextDouble *unif*)))))

To check the generated values, we can plot them using Incanter:

  (view (bar-chart (range 0 *trials*) *randoms* ))

The computed value is 0.9651838354718588, very close to the actual solution of 0.965.
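
For readers without Mahout and Incanter at hand, the same estimate can be reproduced with a minimal Python sketch. It uses the generator from Python’s standard library instead of Mahout’s `Uniform` class, so it illustrates the method rather than translating the code above; the function name `monte_carlo_integral` and the seed are arbitrary choices.

```python
import math
import random


def h(x):
    return (math.cos(50 * x) + math.sin(20 * x)) ** 2


def monte_carlo_integral(f, trials, seed=None):
    # Average f over `trials` points drawn uniformly from [0, 1).
    rng = random.Random(seed)
    return sum(f(rng.random()) for _ in range(trials)) / trials


estimate = monte_carlo_integral(h, 100_000, seed=42)
```

With 100,000 samples the estimate should land within about a hundredth of the analytical value 0.965; the exact digits depend on the seed.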

We can also compute the running averages and plot them to see how the algorithm converges to the solution:

  ;; running mean after each sample: (x1 + ... + xk) / k
  (def *averages* (vec (map / (reductions + *randoms*) (iterate inc 1))))

  ;; clamp the index so that x = *trials* stays inside the vector
  (view (function-plot (fn [x] (nth *averages* (min (dec *trials*) (int x)))) 0 *trials*))

Monte Carlo simulations are a simple yet very powerful idea that can be applied to a lot of different situations. If you want to learn more about Monte Carlo methods, these are some useful resources:

Visualizing Mahout’s output with Clojure and Incanter

Some Clojure code to visualize clusters built using Apache Mahout’s implementation of the K-Means clustering algorithm.

The code retrieves the output of the algorithm (clustered-points and centroids) from HDFS, builds a Clojure friendly representation of the output (a map and a couple of lazy-seqs) and finally uses Incanter’s wrapper around JFreeChart to visualize the results.

A sample execution using the output generated by the example from Mahout’s documentation:

(use 'mahout-vis.core)

(bootstrap! "/Users/antonio/Development/tmp/hadoop-0.20.2/conf/core-site.xml")

(def *results* 
  (k-means-output "output/clusters-9/part-r-00000" 
                  "output/clusteredPoints/part-m-00000"))

(visualize-plots 
  (compute-comps *results* [5 10] [15 50] 
                           {:display-centroids true}))

The output of the previous code is four frames displaying the clusters for the components with indices 5, 10, 15 and 50 of the input data.

Other visualizations can be generated interactively from Clojure’s REPL. Another example of how Clojure can provide an interactive and powerful interface to complex Java systems.

Extending MongoDB with custom commands

MongoDB is one of the fastest alternatives to store and retrieve data. Mongo stores information organized as schema-less documents. Queries and data objects are expressed as JSON objects encoded using a “binary” serialization instead of plain text for improved performance.

There are two main mechanisms to extract information from MongoDB: a find command wrapping a small ad-hoc query language including conditionals, sorting, etc., and a mapReduce command for batch processing.

When non-standard retrieval of data is required, the mapReduce command is the only option MongoDB offers. In a recent project I’ve been working on, I had to select documents stored in Mongo whose Levenshtein distance from a query term was below a certain threshold, a functionality similar to the one offered by the fuzzystrmatch module in PostgreSQL. The task can be accomplished using Mongo’s mapReduce command, but the performance of the queries was not optimal.
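
As a reminder of what has to be computed for every document, here is a minimal Python sketch of the Levenshtein distance and of a threshold filter over a list of documents. It is not the code of the MongoDB command: the `fuzzy_match` helper, the plain integer threshold and the sample documents are assumptions made for the illustration.

```python
def levenshtein(a, b):
    """Edit distance computed by dynamic programming, one row at a time."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(previous[j] + 1,          # deletion
                               current[j - 1] + 1,       # insertion
                               previous[j - 1] + cost))  # substitution
        previous = current
    return previous[-1]


def fuzzy_match(documents, field, term, threshold):
    # Keep the documents whose field is closer to `term` than `threshold`.
    return [doc for doc in documents
            if levenshtein(doc[field], term) < threshold]


docs = [{"name": "kitten"}, {"name": "mitten"}, {"name": "sitting"}]
matches = fuzzy_match(docs, "name", "kitten", 2)
```

Each comparison costs time proportional to the product of the two string lengths, and it has to be repeated for every document in the collection, which is why the place where this loop runs matters so much for performance.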

As an experiment, I started reading the code of MongoDB to see if there was an easy way to implement this functionality directly in the database. What I found is that Mongo’s code is really modular and easy to extend.
The outcome has been a new command that implements the functionality with a big improvement in performance, as the following table shows:

implementation                      mapReduce   native
levenshtein 0.7 (0 matches)         1.941 s     0.077 s
levenshtein 0.4 (21 matches)        2.691 s     0.091 s
levenshtein 0.1 (22,478 matches)    22.857 s    7.962 s

The collection being queried in this test contains 100,000 documents, each holding a random string of text between 30 and 100 characters long.

The code for the new command can be found at Github. The files in this commit contain all the code required to implement the command.

The following is a small summary of the steps required to extend MongoDB to support this kind of queries.

Retrieving MongoDB’s code and building process

MongoDB’s code is available on GitHub. Once the code has been retrieved, the next step is to build the database and all the additional functionality, like the JavaScript shell.
Mongo uses SCons as the build infrastructure. SCons is itself built using Python, so this is a dependency that must be installed in your system before you can build Mongo.

To build the whole system, a single command is enough:

$ scons .

The task can take quite a long time, but after building the whole system, SCons does a great job of rebuilding only the modified sources.
Different parts of the system can also be built as independent targets:

# builds only the db
$ scons mongod
# builds only the JS shell
$ scons mongo

Creating a new DB command

Mongo’s core functionality can be found in the db directory of the source distribution. It includes the implementation of Mongo’s RESTful API, indexes/BTree support, Mongo’s standard queries and also the list of commands that can be issued to the database, e.g. mapReduce.
Adding a new command to the list means implementing a new C++ class with the functionality of the command and registering a name for this command in a map of command-names to command classes.

If we take a look at db/commands.cpp we will find the function used by the server frontend to look up the command it has to execute:

    map<string,Command*> * Command::_commands;
   ... 

    Command* Command::findCommand( const string& name ) {
        map<string,Command*>::iterator i = _commands->find( name );
        if ( i == _commands->end() )
            return 0;
        return i->second;
    }

All commands implement the abstract mongo::Command class. The subclass must implement some functions in order for the command to be executed. The most important function is mongo::Command::run, defined in db/commands.h:

  // db/commands.h line 50
   virtual bool run(const string& db, BSONObj& cmdObj, 
                    string& errmsg, BSONObjBuilder& result, 
                    bool fromRepl) = 0;

The base Command class also provides a base constructor that will automatically register the command in the commands map when invoked in the subclass. For example, the implementation of the mapReduce command registers itself for execution invoking the base constructor:

/**
 * This class represents a map/reduce command executed on a single server
 */
class MapReduceCommand : public Command {
  public:
     MapReduceCommand() : Command("mapReduce", false, "mapreduce") {}
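
The self-registration pattern used by the base constructor can be summarized with a short Python sketch. It mirrors the idea only: the class layout, the simplified `run` signature and the `EchoCommand` example are hypothetical and much simpler than MongoDB’s C++ classes.

```python
class Command:
    _commands = {}  # command name -> command instance

    def __init__(self, name):
        self.name = name
        # The base constructor registers every command that is created.
        Command._commands[name] = self

    @classmethod
    def find_command(cls, name):
        # Returns None for unknown names, like the C++ version returns 0.
        return cls._commands.get(name)

    def run(self, db, cmd_obj, result):
        raise NotImplementedError


class EchoCommand(Command):  # hypothetical command used for illustration
    def __init__(self):
        super().__init__("echo")

    def run(self, db, cmd_obj, result):
        result["echo"] = cmd_obj["echo"]
        return True  # signals that the command succeeded


EchoCommand()  # creating one instance is enough to register the command

result = {}
ok = Command.find_command("echo").run("test", {"echo": "hi"}, result)
```

The dispatcher never needs to know the concrete command classes: it only looks up a name in the registry and calls `run` on whatever it finds.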

Retrieving the arguments for the command
The query retrieved from the client is encoded as a BSON object and passed as the second argument to the run function.
There is a whole suite of functions to manipulate BSON objects defined in MongoDB. They can be found in bson/bsonobj.h and bson/bsonelement.h.
In this fragment of code from the mapReduce command implementation the out parameter of the query is handled. The BSON object is stored in the variable cmdObj:

if ( cmdObj["out"].type() == String ) {
    finalShort = cmdObj["out"].String();
    outType = REPLACE;
}
else if ( cmdObj["out"].type() == Object ) {
    BSONObj o = cmdObj["out"].embeddedObject();

    BSONElement e = o.firstElement();
    string t = e.fieldName();

    if ( t == "normal" || t == "replace" ) {
        outType = REPLACE;
        finalShort = e.String();
    }
    else if ( t == "merge" ) {
        outType = MERGE;
        finalShort = e.String();
    }
    else if ( t == "reduce" ) {
        outType = REDUCE;
        finalShort = e.String();
    }
    else if ( t == "inline" ) {
        outType = INMEMORY;
    }
    else {
        uasserted( 13522 , str::stream() << "unknown out specifier [" << t << "]" );
    }

    if (o.hasElement("db")) {
        outDB = o["db"].String();
    }
}

Obtaining a cursor
To implement the desired functionality it is usually necessary to traverse the collection of Mongo documents stored in the DB. Mongo implements this functionality using cursors.
Cursors can be obtained using a factory function called bestGuessCursor that receives as parameters a unique namespace for the command and a description of a DB query.
The cursor is returned as a Boost smart pointer so we don’t have to deal with the deallocation of the resources consumed by the pointer. A possible template for a function using a cursor could be:

// run function
bool run(...) {
  
  // get the cursor
  shared_ptr<Cursor> temp = bestGuessCursor( ns, BSONQuery, BSONObj() );        
  auto_ptr<ClientCursor> cursor( new ClientCursor( timeoutOpts , temp , ns ) );

  // main loop
  while ( cursor->ok() ) {

    // get current document
    BSONObj o = cursor->current();

    ... logic ...

    cursor->advance(); 
  }
}

Building the output

The result of the command must also be returned as a BSON object. To build this object, a reference to a BSONObjBuilder object is passed as an argument to the run function. The logic of the function can use functions like append to add values to the resulting BSON object. If the values of this object must also be BSON objects, additional BSONObjBuilder instances can be created. Once the object has been built, it can be retrieved from the builder by calling the obj function.

The run function must also signal if the execution of the command has been successful returning a boolean value.

Adding support for the command in the shell
In order to use the command we have implemented, we can add support for the command in the Mongo JS shell. A good location for the JS code invoking the command is shell/collection.js.
The function must build the JSON object that will later be received as a parameter by the command implementation on the server. The only requirement for this JSON object is that the first property of the object must have the same name as the string used to register the command in the DB. The value for that property must be the short name of the collection. The rest of the properties are optional. The command can be executed using the this._db.runCommand function from the present collection object.

As an example, this is the implementation of the custom levenshtein command:

DBCollection.prototype.levenshtein = function( sourceTerm , field, threshold, opts ){
    var c = { levenshtein : this._shortName , sourceTerm : sourceTerm , field : field, threshold : threshold };
    opts = opts || {"level":"word"};

    if(!opts["level"] || opts["level"] === "word") {
        c["word"] = true;
        c["sentence"] = false;
    } else {
        c["word"] = false;
        c["sentence"] = true;    
    }

    c["separators"] = (opts["separators"]||".,:; ");

    if(opts["limit"]) {
        c["limit"] = opts["limit"];
    }
    if(opts["outputField"]) {
        c["outputField"] = opts["outputField"];
    }

    var raw = this._db.runCommand( c );
    if ( ! raw.ok ){
        __mrerror__ = raw;
        throw "levenshtein matches failed:" + tojson(raw);
    }

    return tojson(raw);

}

Adding support in a driver
One problem of extending MongoDB this way is that we must add support for the new command in all the layers between our client code and the DB. We have already added support to the JS shell but many applications access MongoDB through some kind of driver interfacing with the DB server.

In my case, I was using Somnium’s congomongo Clojure library. This means adding support in two different layers, the low-level Java driver and the Clojure wrapper library.
Fortunately, the design of the library and the Java driver make it possible to add support for the command entirely in client code without further modification of the library sources. Congomongo’s coerce function also makes it very easy to transform data structures between Clojure’s native data types and BSON objects. An example implementation can be found in this GitHub gist.

Chasing Erlang: profiling a Clojure library

This is a short summary of my efforts profiling and benchmarking Jobim, the actors library for Clojure I’ve been working on for the last few months. I have very little experience profiling Clojure applications, so I thought a brief summary of the process may be interesting for other people in the same situation.

How to measure performance?

The first problem I found was coming up with a good way of testing the actual performance of the library. Performance of complex systems is really hard to test. The number of moving parts makes it difficult to spot the bottlenecks, and wrong conclusions can be drawn easily.
Fortunately, in my case I found an easy solution for the problem. I used the same test suite built to benchmark Termite Scheme. Termite is a variant of Scheme built for distributed computing and modelled after Erlang’s programming model on the Gambit Scheme system.
The Termite paper has a full section discussing different performance tests and showing the results for Termite and Erlang. Furthermore, the implementation of the tests can be found in Termite’s source code.

Because of the similarities between the projects, implementing these benchmark tests in Clojure/Jobim provides a good starting point to test the performance of the library.

Finding tools

To measure performance, System/nanoTime can be good enough. On the other hand, to profile the code when performance is bad, you really need a good profiler. Unfortunately, there is no good Clojure-specific profiler as far as I know. Commercial Java profilers like YourKit are a good option but they can also be expensive. A good free alternative to these profilers is VisualVM. The profiler is included in the Java distribution and it’s really easy to use. Using VisualVM with Clojure is just a matter of selecting the REPL process from the profiler user interface and starting to gather data.

Adding type hints

A first step to increase the performance of a Clojure application is to add type hints to the code of your functions. Clojure is a dynamic language, but that’s not the case for Java, the host language for Clojure. In order to invoke Java methods, Clojure needs to find the type of the Java objects passed as arguments to these methods or returned from them. Java reflection is the mechanism that makes it possible to look up this information at run time. The use of reflection has a double impact on performance: it adds processing time and it consumes additional memory creating reflection objects. The use of reflection is usually not a problem in regular applications: memory is freed by the garbage collector and the performance penalty is not significant. Nevertheless, in some other cases the use of the reflection mechanism can become a problem.

The following images show a profiler session for the execution of the Termite ring test with a ring of 40k actors. The non-deterministic execution of the garbage collector has been disabled with the -Xincgc JVM parameter.

The screenshots show how the execution of the test allocates a huge amount of memory as java.lang.reflect.Method objects. These objects are released by the GC once it is manually executed but in a regular execution, the execution of the GC thread might have impacted very negatively the outcome of the test.

In order to avoid these situations, Clojure can notify us when it will use reflection to resolve a method call if we set the *warn-on-reflection* var to true. With this modification, the list of conflicting calls will be shown when the source code is being compiled. Using this information we can add type hints with the correct type for the call to the metadata of the problematic symbols. Clojure will use this information instead of using reflection to discover the right type for the objects.

The following figures show another run of the same test after having added the required type hints. We can see how the use of memory has decreased and the java.lang.reflect.Method objects have disappeared.

In the case of Jobim, the number of type hints required to resolve all the problematic calls was really small. Around 15 type hints were enough. Nevertheless, some calls involving protocols were harder to solve. The use of VisualVM was a great help to check that no reflection objects were being created.

Object serialization
One of the main performance problems in tests involving the exchange of messages among actors in different nodes is the serialization and deserialization of messages. Using standard Java serialization the amount of measured time for some tests spent in the serialization of messages could reach 20% of the total. Besides, serialization of some Clojure objects like lambda functions is not supported.

Lacking a single solution for the problem, different serialization services have been implemented as plugins for Jobim. The following table shows some performance results for a test consisting of encoding and decoding a Java HashMap with long lists of integers as values:

Serialization mechanism    Time encoding+decoding
JSON serialization         5.566 ms
Java serialization         4.419 ms
JBoss serialization        1.437 ms
Kryo serialization         0.573 ms

The result shows how the standard Java serialization is almost as slow as JSON serialization. Kryo offers a very good performance with the drawback of not supporting Clojure core data types. Finally JBoss serialization library offers a much better performance than the standard Java serialization with a compatible interface and the same degree of support for Java types, including Clojure core data types.

The lack of support for Clojure data structures makes Kryo a valid option only for certain cases where actors restrict the kind of messages they exchange so they can be serialized by Kryo. JBoss serialization is a good default option and has been used in the rest of the tests. The Jobim data types used by the library in the exchange of messages between nodes have been restricted to standard Java types so they can be used with any serialization mechanism.

Results

The following table shows the results I have obtained profiling the current version of Jobim. The results for Erlang have been measured using the same hardware where the Clojure version was tested.

test                          Erlang      Jobim threaded   Jobim evented
fib(534)                      585 ms      378 ms           -
Spawn                         6.92 us     49 us            27 us
Send                          0.2 us      120 us           16 us
ring 40k/0                    3 us        -                83 us
ring 40k/4                    5 us        -                166 us
ping-pong internal (0)        0.7 us      148 us           25 us
ping-pong internal (1000)     16 us       152 us           29 us
ping-pong same node (0)       58 us       14333 us         5027 us
ping-pong same node (1000)    182 us      25921 us         6303 us
ping-pong remote (0)          3267 us     18481 us         7022 us
ping-pong remote (1000)       16556 us    38730 us         8400 us

Ping-pong tests were executed using a Jobim configuration consisting of the ZooKeeper plugin for the coordination service, the TCP plugin for the messaging service and the JBoss plugin for the serialization service. The rest of the tests were run with the localnode plugin for coordination, messaging and serialization.

Some conclusions:

  • Clojure is faster than Erlang in the base case chosen: computing Fibonacci of a small number with a naive implementation.
  • The actor model primitives (spawn, send, receive) are orders of magnitude faster in Erlang than when implemented as library functions in Jobim.
  • The number of processes/actors that can be created in a single node is far lower in Jobim than in Erlang. On a machine where the Erlang version of the ring test ran without problems with 1M processes, the JVM was struggling with ~50k evented actors. Threaded actors are even less scalable.
  • Erlang seems to have some issues with the size of network messages; Jobim shows less variation in response times when the message size varies.
  • Evented actors perform far better than the threaded implementation in the tests.
  • In tests involving actual network communication, Jobim gets closer to Erlang and even outperforms it in one case (ping-pong remote (1000), with evented actors).

With these results, it is probably not a good idea to write Erlang-in-Clojure applications using Jobim. Nevertheless, the library is starting to be performant enough to be used as a distribution mechanism for Clojure applications using actor semantics and some Erlang features like actor linking or behaviours.
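As a reference for the base case mentioned in the conclusions, a naive Fibonacci in Clojure usually looks like the sketch below. This is not necessarily the exact code that was profiled, the argument 25 is an arbitrary choice of mine, and time-ms is a hypothetical helper added only to show how a single run can be timed.

```clojure
(defn fib
  "Naive doubly recursive Fibonacci: exponential time, no memoization."
  [n]
  (if (< n 2)
    n
    (+ (fib (dec n)) (fib (- n 2)))))

(defn time-ms
  "Calls the zero-argument function f once and returns the elapsed
   wall-clock time in milliseconds."
  [f]
  (let [start (System/nanoTime)]
    (f)
    (/ (- (System/nanoTime) start) 1e6)))

(println "fib(25) took" (time-ms #(fib 25)) "ms")
```

A test like this exercises only function calls and arithmetic, so it says nothing about the cost of the actor primitives themselves.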

Barber shop problem again

Today there has been some talk about concurrency in Clojure vs Scala. Most of the debate was based on an excellent article, Scala vs Clojure – Round 2: Concurrency!, by Lau B. Jensen.

The article discusses the classic “barber shop” problem and offers two implementations: one in Clojure, using Clojure's concurrency primitives, and another one in Scala, using Scala's implementation of actors.

Since I’ve been working for the last few months on Jobim, an actor library for Clojure, I thought it would be interesting to offer an additional implementation using actors in Clojure. The code can be found here.

The implementation of the actors is pretty straightforward. It is interesting to compare it with the Scala version from the previous article.

The shop actor:

(defn do-shop
  "Shop actor"
  ([max-seats max-clients]
     (let [seats (atom [])
           max-clients (atom max-clients)]
       (loop [[msg pid] (receive)]
         (condp = msg
             ;; allocate a seat for the client
             :get-seat (if (< (count @seats) max-seats)
                         (do (swap! seats conj pid)
                             (send! pid :seated))
                         (send! pid :no-space))
             ;; barber asks for a client
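             :next-client (let [pid (first @seats)]
                            ;; keep seats a vector so that conj keeps appending
                            ;; at the end (FIFO); rest alone returns a seq
                            (swap! seats #(vec (rest %)))
                            (send! (resolve-name "the-barber") pid))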
             ;; A client is leaving the shop
             :exiting (swap! max-clients dec))
         (if (> @max-clients 0)
           (recur (receive))
           (notify-shop "Closing barber shop"))))))

The customer:

;; Customer
(defn customer
  "Customer actor"
  ([id]
     (spawn
      ;; enters the shop and ask for a seat
      #(loop []
         (println (str "Customer id " id "..."))
         (let [seated (do (send! (resolve-name "barber-shop") [:get-seat (self)])
                          (receive))]
           ;; we have a seat, wait to have a hair cut
           (if (= seated :seated)
             ;; wake up the barber and wait for his answer
             (let [_ (send! (resolve-name "the-barber") :hey)
                   msg (receive)]
               ;; barber starting to do his thing
               (when (= msg :cutting)
                 (notify-shop (str "Customer " id " getting his hair cut")))
               ;; barber finished
               (let [msg (receive)]
                 (when (= msg :finished)
                   (notify-shop (str "Customer " id " got his hair cut")))
                 ;; leaving the shop
                 (send! (resolve-name "barber-shop")
                        [:exiting (self)])
                 (notify-shop (str "Customer " id " leaving the shop"))))
             ;; No space available in the shop, wait a while and try again
             (do
               (notify-shop (str "Customer " id " could not get his hair cut"))
               ;; Thread/sleep takes milliseconds as a long, rand returns a double
               (Thread/sleep (long (rand 5000)))
               (recur))))))))

And finally the barber:

;; Barber
(defn barber
  "Barber actor"
  ([]
     (let [pid (spawn
                #(loop [msg (receive)] ;; wait for a customer
                   (when (= msg :hey) ;; awaken!
                     ;; ask for the customer to come in
                     (send! (resolve-name "barber-shop") [:next-client (self)])
                     ;; ignore the customers trying to awake us
                     (let [client-pid (receive (fn [msg] (not= :hey msg)))]
                       ;; starting to cut
                       (send! client-pid :cutting)
                       ;; Thread/sleep takes milliseconds as a long, rand returns a double
                       (Thread/sleep (long (rand 4000)))
                       ;; finished cutting
                       (send! client-pid :finished)))
                   (recur (receive))))]
       (register-name "the-barber" pid))))

Additionally, the implementation includes the use of a behaviour. A behaviour is a concept taken from Erlang that refers to a piece of functionality in a distributed system that is encapsulated so it can be easily reused.

In this case, since the customers can be executed in any node, we cannot collect the messages they produce with the traditional mechanism of printing to the standard output: the messages would appear all across the cluster. Using an event manager and an event handler, we can collect the messages from all the customers in the cluster in a central location.

Behaviours in Jobim are implemented using Clojure’s protocols. In this case:

;; Message logger

(def-event-handler BarberLogger
  (init [this initial-state] initial-state)
  (handle-event [this event state]
                (println (str "*** " event)))
  (terminate [this state] (println "BarberLogger exiting")))

(defn notify-shop [msg]
  (notify (resolve-name "barber-manager") msg))

More information about the event manager behaviour can be found in the Erlang manual.
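To make the idea of a protocol-based event handler more concrete without depending on Jobim, here is a small stand-alone sketch. The EventHandler protocol, the CollectingLogger record and the run-events function are all hypothetical names of mine; what def-event-handler really expands to may well be different. Unlike BarberLogger, this handler accumulates the messages in its state instead of printing them, so that the result can be inspected.

```clojure
;; Hypothetical protocol mirroring the three callbacks used above.
;; This is NOT Jobim's actual definition.
(defprotocol EventHandler
  (init [this initial-state])
  (handle-event [this event state])
  (terminate [this state]))

(defrecord CollectingLogger []
  EventHandler
  (init [_ initial-state] initial-state)
  ;; returns the new state: the old one plus the formatted event
  (handle-event [_ event state] (conj state (str "*** " event)))
  (terminate [_ state] state))

(defn run-events
  "Stand-in for an event manager: feeds the events to the handler one by
   one, threading the handler state through every call."
  [handler initial-state events]
  (let [final-state (reduce (fn [state event]
                              (handle-event handler event state))
                            (init handler initial-state)
                            events)]
    (terminate handler final-state)))

(run-events (->CollectingLogger) []
            ["Customer 1 getting his hair cut"
             "Customer 1 leaving the shop"])
;; => ["*** Customer 1 getting his hair cut" "*** Customer 1 leaving the shop"]
```

In a real event manager the events would arrive as messages from actors running in any node, but the callback contract stays the same.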

One of the fundamental advantages of using a platform like Erlang is that you can run the same application in a single node or in a set of nodes with minimal changes. In this example, there are a couple of functions to start the application in a single node or in multiple nodes. The only difference is the way the customer actors are started:

(defn run-barber-shop
  "Runs the barber shop problem in a single node"
  ([num-seats num-clients]
     (start-event-manager "barber-manager")
     (add-handler (resolve-name "barber-manager")
                  [name jobim.examples.barber.BarberLogger]
                  nil)
     (shop num-seats num-clients)
     (barber)
     (doseq [i (range 0 num-clients)]
       (customer i))))

(defn run-barber-shop-distributed
  "Runs the barber shop problem distributed in a set of nodes"
  ([num-seats num-clients nodes]
     (start-event-manager "barber-manager")
     (add-handler (resolve-name "barber-manager")
                  [name jobim.examples.barber.BarberLogger]
                  nil)
     (shop num-seats num-clients)
     (barber)
     (loop [ids (range 0 num-clients)
            nodes (cycle nodes)]
       (if (empty? ids)
         (println "*** All clients spawned")
         (do (rpc-blocking-call (resolve-node-name (first nodes))
                                "jobim.examples.barber/customer"
                                [(first ids)])
             (recur (rest ids)
                    (rest nodes)))))))
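The loop in the distributed version walks the client ids and an endless cycle of node names in parallel, so the customers end up spread round-robin over the nodes. The same pairing can be seen in isolation with plain sequences; assign-round-robin and the node names below are hypothetical and only illustrate the distribution, nothing is spawned.

```clojure
(defn assign-round-robin
  "Pairs every client id with a node name, cycling through the nodes.
   Returns a sequence of [client-id node-name] vectors."
  [num-clients nodes]
  (map vector (range num-clients) (cycle nodes)))

(assign-round-robin 5 ["node-a" "node-b"])
;; => ([0 "node-a"] [1 "node-b"] [2 "node-a"] [3 "node-b"] [4 "node-a"])
```

Because the node names are cycled, the number of clients does not need to be a multiple of the number of nodes.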

The result of a test run can be seen in this video:

As you can see, using actors in Clojure is just another possibility, and it is a really good alternative when you want to build systems spanning more than a single node.