Building a Resque-like job processing system for JVM apps with RabbitMQ

RabbitMQ is a complex beast.
It’s flexible and powerful, but also hard to fully grasp and master.
Many different use cases and usage patterns can be built on top of it, but missteps and design errors are commonplace in the first attempts at coding a particular solution.
In this article I will discuss the design and implementation of Palermo, a system implementing one of the possible usage patterns that can be built using RabbitMQ as the underlying queuing mechanism: a batch job processing system.

By batch job processing system we refer to a mechanism for the automated execution of jobs by workers that extract those jobs from different queues. Clients enqueue new jobs into these queues, and each job is eventually delivered to a worker that executes it. If a job fails during execution, the worker thread places the failed job into a special queue where it can be retried or inspected for debugging.
The main inspiration when building Palermo has been Resque, a job processing system built by GitHub using Ruby and Redis.
Resque makes it possible for users of the system to define queues with different names, to enqueue jobs matching the name of a Ruby class in the system with certain input arguments, and to start worker processes on different machines that will consume the jobs, instantiate the Ruby class and perform the job with the provided input arguments. If the job execution fails, the job will be routed to a special ‘failed’ job queue where it can be retried or removed. Resque workers are well integrated with the underlying operating system and can handle incoming signals, which gives system operators a way to control the execution of jobs. Resque also provides a web interface where the state of the jobs and workers can be inspected and where certain actions, like re-enqueuing a job or clearing the jobs in a queue, can be performed.
From a developer’s point of view, the main advantage of Resque is the extreme ease of use of the whole system. Defining and enqueuing jobs requires just a few lines of Ruby code and the system performs in a consistent and robust way.

The aim of Palermo is to build a job processing system as easy to use and robust as Resque but for JVM languages and using RabbitMQ as the underlying queuing mechanism instead of Redis.

From a formal point of view, the queuing mechanism of a job processing system can be defined as a tuple space, a kind of persistent associative memory where some processes enqueue jobs by writing tuples of the form:


write(queue_name, job_type, input_argument)

Worker processes remove tuples from memory, one at a time, by using a predicate matching the queue name:


read(queue_name, ?, ?)

The only additional constraint that must be added to this formulation of the job processing queuing mechanism as a tuple space is that the read function from the distributed memory must adhere to first in, first out semantics. RabbitMQ queues can be viewed as this kind of memory with FIFO semantics. The name of the queue serves as the first argument passed to the `read` predicate to extract the next matching tuple stored in memory.
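
As an illustration of these semantics, the following is a minimal in-memory sketch of such a tuple space in Java. It is not Palermo code: the class `FifoTupleSpace` and all of its members are hypothetical names chosen for this example, and a plain `ArrayDeque` per queue name stands in for a RabbitMQ queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical in-memory stand-in for the tuple space; not Palermo code. */
final class FifoTupleSpace {
    /** A tuple of the form (queue_name, job_type, input_argument). */
    static final class JobTuple {
        final String queueName;
        final String jobType;
        final String inputArgument;

        JobTuple(String queueName, String jobType, String inputArgument) {
            this.queueName = queueName;
            this.jobType = jobType;
            this.inputArgument = inputArgument;
        }
    }

    // One FIFO queue of tuples per queue name.
    private final Map<String, Queue<JobTuple>> tuples = new HashMap<>();

    /** write(queue_name, job_type, input_argument) */
    synchronized void write(String queueName, String jobType, String inputArgument) {
        tuples.computeIfAbsent(queueName, name -> new ArrayDeque<>())
              .add(new JobTuple(queueName, jobType, inputArgument));
    }

    /** read(queue_name, ?, ?): removes and returns the oldest matching tuple, if any. */
    synchronized Optional<JobTuple> read(String queueName) {
        Queue<JobTuple> queue = tuples.get(queueName);
        return queue == null ? Optional.empty() : Optional.ofNullable(queue.poll());
    }
}
```

Reading from a queue returns the tuples in the order in which they were written, and reading from an empty or unknown queue returns an empty `Optional`.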

[Figure: palermo_space]

In order to get the desired tuple space semantics for reading we need to deal with some of the RabbitMQ internals and configure the following options:

– Persistence of queues
– Persistence of messages
– Quality of service
– Message acknowledgments

Since the memory must be persistent, we need to add persistence to both the queues and the messages managed by RabbitMQ. To do that, queues must be declared in RabbitMQ as `durable` and messages must be published as `persistent`. In this way, even if the RabbitMQ broker goes down, the queues that have been created and their pending messages will survive a restart.

When more than one worker is connected to the same queue, RabbitMQ will round-robin messages between all the available workers. The main problem is that, by default, RabbitMQ will deliver a message to the next worker as soon as the message arrives at the queue, no matter whether that worker is already processing a different message. If one job takes a long time to process, incoming messages can pile up in the local buffer of the busy worker while the other workers starve. We can deal with this situation using two features of RabbitMQ: message acknowledgments and quality of service/pre-fetch counts.

First of all, the worker can be required to notify RabbitMQ when it has processed a message by sending an explicit acknowledgment. Messages will not be removed from the queue until they are acknowledged. If the worker dies without acknowledging a message, the message will be automatically re-enqueued by RabbitMQ.

At the same time, the quality of service (qos) configuration can be used to tell RabbitMQ the maximum number of unacknowledged messages that can be sent to a particular worker. If we set this value, known as the pre-fetch count, to 1, no other message will be sent to the worker until it has acknowledged the previous one.
In this way, a fair dispatch of the jobs between the workers can be achieved.
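
The effect of both settings can be seen in a small simulation. The sketch below is a simplified, hypothetical model of how one queue hands messages to its consumers: the `DispatchModel` class and all of its names are assumptions made for this example, not RabbitMQ or Palermo code, and a pre-fetch count of 0 is used here to mean "no limit". With the RabbitMQ Java client, the equivalent configuration would be expressed with `Channel.basicQos(1)`, a consumer registered with automatic acknowledgments disabled, and a call to `Channel.basicAck` once the job is done.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of one queue's dispatch logic; not RabbitMQ or Palermo code. */
final class DispatchModel {
    /** A consumer with a cap on unacknowledged messages (0 means no cap). */
    static final class Worker {
        final String name;
        final int prefetchCount;
        final List<String> unacked = new ArrayList<>();

        Worker(String name, int prefetchCount) {
            this.name = name;
            this.prefetchCount = prefetchCount;
        }

        boolean canReceive() {
            return prefetchCount == 0 || unacked.size() < prefetchCount;
        }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    private final List<Worker> workers = new ArrayList<>();
    private int next = 0; // round-robin cursor

    Worker subscribe(String name, int prefetchCount) {
        Worker worker = new Worker(name, prefetchCount);
        workers.add(worker);
        dispatch();
        return worker;
    }

    void publish(String message) {
        ready.addLast(message);
        dispatch();
    }

    /** The worker confirms a message; only now is it gone for good. */
    void ack(Worker worker, String message) {
        if (worker.unacked.remove(message)) {
            dispatch();
        }
    }

    /** The worker died: its unacknowledged messages go back to the head of the queue. */
    void crash(Worker worker) {
        workers.remove(worker);
        for (int i = worker.unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(worker.unacked.get(i));
        }
        worker.unacked.clear();
        next = 0;
        dispatch();
    }

    int readyCount() {
        return ready.size();
    }

    // Hand ready messages to workers in round-robin order, skipping full ones.
    private void dispatch() {
        while (!ready.isEmpty() && !workers.isEmpty()) {
            Worker target = null;
            for (int i = 0; i < workers.size(); i++) {
                Worker candidate = workers.get((next + i) % workers.size());
                if (candidate.canReceive()) {
                    target = candidate;
                    next = (next + i + 1) % workers.size();
                    break;
                }
            }
            if (target == null) {
                return; // every worker is at its pre-fetch limit
            }
            target.unacked.add(ready.removeFirst());
        }
    }
}
```

With two workers and no limit, four jobs are split two and two as soon as they are published, so the third job waits behind the first one even if the second worker is idle. With a pre-fetch count of 1, the third and fourth jobs stay in the queue and go to whichever worker acknowledges first.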

[Figure: palermo_rabbit]

We still need to find a way to implement the write operation into the memory. One important aspect of RabbitMQ that makes it different from other queuing systems is the strong separation of concerns between publishers and consumers. In the RabbitMQ architecture this is achieved by introducing an exchange between the publishers and the queues. Publishers don’t know the final destination of their messages; they only need the name of an exchange and a routing key, which can be seen as the address the message is being sent to.
RabbitMQ supports different exchange types with different semantics, which affect the way a message addressed to an exchange will be delivered to the actual mailboxes (queues) where the consumers receive their messages.
In our case, the first argument of the write function is the queue name. The queue name is also the argument to the predicate used by the workers to extract the next job that needs to be processed from the memory.
The approach followed by Palermo is to use a direct exchange. With this type of exchange, a message is delivered to the queues whose binding key exactly matches the routing key of the message, so the name of the queue can be used as the routing key.
If a message is sent to a RabbitMQ exchange and no queue is bound to the exchange with a matching key, the message will be discarded or sent to an associated alternate exchange. To avoid this situation, every time a publisher enqueues a new job in Palermo, it declares the queue matching the routing key before sending the job data. In this way we can be sure that the message will not be discarded even if no worker is waiting for jobs yet. Re-declaring an existing queue with the same options is a valid operation in RabbitMQ and has no effect.
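
The declare-before-publish rule can be sketched with a small model of a direct exchange. Everything in the block below is hypothetical and written only for illustration (`DirectExchangeModel` is not RabbitMQ or Palermo code), and it assumes that every queue is bound under its own name. With the RabbitMQ Java client, the two steps of `enqueue` would correspond to `Channel.queueDeclare` followed by `Channel.basicPublish`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical model of a direct exchange; not RabbitMQ or Palermo code. */
final class DirectExchangeModel {
    // Queues keyed by name; each queue is assumed to be bound under its own name.
    private final Map<String, Queue<String>> queues = new HashMap<>();
    private int dropped = 0;

    /** Idempotent: declaring a queue that already exists changes nothing. */
    void declareQueue(String name) {
        queues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    /** Routes by exact match of the routing key; unroutable messages are dropped. */
    void publish(String routingKey, String message) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            dropped++;
        } else {
            queue.add(message);
        }
    }

    /** The publisher side described above: declare the queue first, then publish. */
    void enqueue(String queueName, String message) {
        declareQueue(queueName);
        publish(queueName, message);
    }

    int size(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    int droppedCount() {
        return dropped;
    }
}
```

Publishing directly to a routing key with no queue behind it loses the message, while `enqueue` never does, no matter how many times the same queue is declared.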

Using the previously described setup for RabbitMQ we can build the core of a job processing system. However, there are certain features, like the management of failed jobs or the serialization of messages, that cannot be addressed with RabbitMQ features alone. In Palermo these features have been implemented as an additional application software layer that can be distributed as a Java library.

The first issue is how to handle failed jobs. We have seen how, since we are using explicit worker acknowledgments, RabbitMQ will handle fatal failures in the worker execution or timeout issues. This mechanism could also be used to handle any kind of exceptional condition in the processing logic of the job, but our desired functionality requires that a failed message be sent to a special queue of failed jobs where it can be inspected, removed and, if appropriate, re-enqueued. In Palermo this has been accomplished by wrapping the execution of the job in a generic Java try/catch block and piping the failed message to the failed queue. Information about the job error, the retry count and the original queue is added to the headers of the message metadata that are sent along with the message to RabbitMQ.
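
A minimal sketch of this wrapping, under stated assumptions, could look like the following. None of the names are Palermo’s: the `FailureHandlingSketch` class, the `Job` interface and the header keys `error`, `original-queue` and `retry-count` are all hypothetical, and an in-memory list stands in for the failed queue in RabbitMQ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the failure handling; names are not Palermo's. */
final class FailureHandlingSketch {
    /** A message body plus the headers that travel with it. */
    static final class Message {
        final String body;
        final Map<String, Object> headers;

        Message(String body, Map<String, Object> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    /** The job logic a worker runs for each message. */
    interface Job {
        void process(String body) throws Exception;
    }

    // Stand-in for the special queue of failed jobs.
    final List<Message> failedQueue = new ArrayList<>();

    /**
     * Runs the job inside a generic try/catch. On any exception a copy of the
     * message, annotated with failure headers, is placed on the failed queue.
     * Either way the worker could then acknowledge the original delivery.
     */
    boolean runJob(String queueName, Message message, Job job) {
        try {
            job.process(message.body);
            return true;
        } catch (Exception error) {
            Map<String, Object> headers = new HashMap<>(message.headers);
            headers.put("error", error.toString());
            headers.put("original-queue", queueName);
            headers.putIfAbsent("retry-count", 0);
            failedQueue.add(new Message(message.body, headers));
            return false;
        }
    }

    /** Takes the oldest failed message and prepares it for its original queue. */
    Message retryOldest() {
        Message failed = failedQueue.remove(0);
        Map<String, Object> headers = new HashMap<>(failed.headers);
        headers.put("retry-count", (Integer) headers.get("retry-count") + 1);
        headers.remove("error");
        return new Message(failed.body, headers);
    }
}
```

Because the original queue travels in the headers, whoever retries the job does not need to know where it came from: the `original-queue` header of the message returned by `retryOldest` can be used directly as the routing key.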

The serialization problem has been addressed by defining a job message with header information about the Java class for the job, the serialized arguments for the job and the serialization type. A short preview of the arguments as a human-readable string is also sent along with the message. Palermo supports plugging different serializers into the system and includes a JSON-based serializer, but the default serialization mechanism is JBoss Serialization. Only the arguments for the job are serialized and sent to the worker; the class with the bytecode for the job must still be available in the worker class path for the job to be correctly executed.
Palermo workers are just generic means of execution of the actual job logic encapsulated inside the definition of the job classes.
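
To make the shape of such a message concrete, here is a hedged sketch of a job envelope and of the worker side that executes it. It is not Palermo’s actual format or API: `JobEnvelopeSketch`, `Serializer`, `Job` and `Envelope` are hypothetical names, and plain `java.io` serialization is used as a stand-in for JBoss Serialization so that the example has no external dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a job message; names and layout are not Palermo's. */
final class JobEnvelopeSketch {
    /** Pluggable serialization strategy, identified by a type name. */
    interface Serializer {
        String type();
        byte[] write(Serializable arguments);
        Object read(byte[] bytes);
    }

    /** Stand-in serializer based on java.io (the text names JBoss Serialization). */
    static final class JavaSerializer implements Serializer {
        public String type() { return "java"; }

        public byte[] write(Serializable arguments) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(arguments);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        public Object read(byte[] bytes) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Contract for job classes, which must be on the worker class path. */
    interface Job {
        Object process(Object arguments);
    }

    /** Example job used only to exercise the sketch. */
    static final class UpperCaseJob implements Job {
        public Object process(Object arguments) {
            return arguments.toString().toUpperCase();
        }
    }

    /** Header information plus the serialized arguments. */
    static final class Envelope {
        final String jobClass;
        final String serializationType;
        final String preview;
        final byte[] arguments;

        Envelope(String jobClass, String serializationType, String preview, byte[] arguments) {
            this.jobClass = jobClass;
            this.serializationType = serializationType;
            this.preview = preview;
            this.arguments = arguments;
        }
    }

    /** Publisher side: only the class name and the arguments are sent, never bytecode. */
    static Envelope pack(Class<? extends Job> jobClass, Serializable arguments, Serializer serializer) {
        return new Envelope(jobClass.getName(), serializer.type(),
                String.valueOf(arguments), serializer.write(arguments));
    }

    /** Worker side: instantiate the named class and hand it the decoded arguments. */
    static Object execute(Envelope envelope, Serializer serializer) {
        try {
            Job job = (Job) Class.forName(envelope.jobClass).getDeclaredConstructor().newInstance();
            return job.process(serializer.read(envelope.arguments));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Job class not usable on this worker: " + envelope.jobClass, e);
        }
    }
}
```

The `execute` method fails with an exception when the class named in the envelope cannot be loaded, which is the situation described above when the job class is missing from the worker class path.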

The whole logic of Palermo has been implemented in the Clojure programming language, but thanks to Clojure’s Java interop features it can be used from Java code or any other JVM-based language. Since Palermo worker threads run on the JVM, they are isolated from the underlying operating system. Integration with the OS in the way Resque workers achieve it is hard, but a certain degree of integration has been attempted where possible, e.g. using process identifiers for the identity tags of the RabbitMQ consumers associated with Palermo workers. A command line interface has also been implemented so new workers can be started easily by scripts and users.

A final component of the solution is a web interface for a running Palermo system. It’s just a simple web application that uses the introspection features coded in the Palermo library to provide an easy-to-understand view of how the Palermo system is performing. This interface is a copy of Resque’s web interface and can be used as a replacement for RabbitMQ’s generic interface for managing exchanges, queues and workers.
All the functionality provided by the web interface can also be used programmatically by any Java code through the Palermo library.

[Figures: screenshots ss1, ss3]

Taking all of the above into consideration, we can think of Palermo as just a thin layer of functionality built on top of a particular setup of RabbitMQ and wrapped as a library that provides a reusable implementation of the job processing usage pattern. The same approach can potentially be applied to other usage patterns built with RabbitMQ as the underlying engine, saving the time spent writing code that is not application specific and dealing with the setup and the rest of the complexity introduced by RabbitMQ-specific configuration.