OCR with Clojure, Tesseract and OpenCV

Last week I played around a little with computer vision and OCR. My ultimate goal was to digitize the pile of bills I have accumulated since I moved to the UK and try to make some sense of the figures.

All my knowledge about computer vision came from some previous experience with OpenCV. This wonderful library packs tons of algorithms for image transformation, along with utility functions for working with capture devices and basic UI widgets.

Research on open source OCR software led me to Tesseract, a robust OCR engine originally developed by HP and now maintained by Google.

After writing some software with OpenCV to pre-process bill pictures taken with an iPhone 4 camera, and then attempting text capture with Tesseract, I had built a small C++ library that I intended to wrap using JNA, so I could experiment easily from the Clojure REPL with different ways of preprocessing the image.

At this point I found a Clojure library called Vision, by Nurullah Akkaya, that wraps a good part of OpenCV. Thanks to Vision I could concentrate on wrapping the OCR parts, exchanging pointers between both libraries in Clojure using JNA.

The Clojure wrapper for Tesseract can be found on GitHub, together with installation instructions.

Using the library is quite straightforward:

(use 'vision.core)
(use 'clj-tesseract.core)

;; create an API instance
(def *api* (make-tesseract "/tmp"))

We can then load an image to attempt OCR on, or just grab a frame from the webcam using OpenCV/Vision:

;; load an image
(def *img* (load-image "numbers_pre.tiff" :grayscale))

;; Webcam capture
(def *capture* (capture-from-cam 0))
(def *frame* (query-frame *capture*))

Once we have the image, we can try to capture text using the capture function.

(println (capture *api* *img*))
BASICS UM1 MILK £0.49
BASICS UHI MILK £0.49
CRAMULA15u SUGAR £0.98
3 BALANCE Bus £1.96
CASH £1.96
CHANGE £0.00

(end *api*)

The capture function applies some very basic preprocessing, like converting the image to grayscale and applying a threshold. [The original post showed the input bill image and its preprocessed version here.]

Custom preprocessing can be applied using the capture-no-preprocess function.
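For example, a minimal custom pipeline might look like the following sketch, assuming Vision's convert-color and threshold wrappers (their exact signatures should be checked against the Vision documentation):

;; Hypothetical custom preprocessing: grayscale conversion plus a hard
;; threshold before handing the image to Tesseract. convert-color and
;; threshold are assumed Vision wrappers over the OpenCV primitives.
(let [gray   (convert-color *frame* :bgr-gray)
      binary (threshold gray 150 255 :binary)]
  (println (capture-no-preprocess *api* binary)))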

The result was OK for my use case, although not perfect, but thanks to the power of OpenCV many clever ways of preprocessing the image can be tried to improve text recognition. Furthermore, Tesseract is fully trainable, so another interesting way of improving results for a certain application is to create a new language for Tesseract and then train the engine for that language. Creating a new language can be a little tricky, but there is software that helps create the required files. Custom languages can be used in clj-tesseract by passing the name of the language as an argument to the make-tesseract function.
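For example, assuming a custom language pack named "receipts" has been installed in the Tesseract data directory (the language name here is hypothetical, and the exact signature of make-tesseract should be checked against the clj-tesseract README):

;; "receipts" is a hypothetical custom language trained on till receipts
(def *api-receipts* (make-tesseract "/tmp" "receipts"))
(println (capture *api-receipts* *img*))
(end *api-receipts*)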

Translating SPARQL queries into SQL using R2RML

RDF is the data model of the semantic web as conceived by the W3C. This data model is extremely general. Because RDF has been modelled after the web, using web technologies like URIs, it is also very easy to link different RDF graphs using the same mechanism that makes it possible to link web pages. RDF is also finally becoming a mainstream technology. There is already a huge amount of information available as inter-linked RDF graphs, for example as part of the Linked Data initiative.

Another advantage of having information available on the web as RDF graphs is that this information, once retrieved, can be easily queried using the SPARQL W3C recommendation.

SPARQL can be used to query data stored using the RDF data model in the same way SQL can be used to query data stored using the relational data model. In fact, SPARQL could be used as an API to retrieve data from a RESTful web service where each web service URI is attached to an RDF graph or subgraph.

The question from an infrastructure point of view is what software should be used to store RDF graphs and to provide a SPARQL interface to these data.

Special purpose data stores for RDF have been available for a long time: triple stores like Jena, Mulgara, AllegroGraph or Virtuoso are possible solutions. It is also possible to use graph databases supporting the RDF model, like Neo4j.

Unfortunately, deploying new infrastructure often poses a real barrier to the adoption of semantic technologies in the development of web systems. This is especially true when most current web applications are already built using the relational model, and migrating the data to a triple store, or adopting a hybrid solution, are not valid options.

In these situations, recent W3C proposals like R2RML can provide an easy way to introduce semantic technologies into web applications using existing relational solutions and data.

R2RML provides a standard way to lift relational data to the RDF data model. This way, web application developers can expose their information as linked data with important gains for web clients, for example the ability to link the application data to other data providers using URIs, and the possibility of using standard vocabularies in the description of the data, without any modification to the underlying mechanism used to store the data.

However, if a web client consuming the generated RDF wants to use SPARQL to query the data, it must first retrieve all the data from the R2RML-mapped web service and save it locally in some kind of SPARQL-enabled store. This problem would disappear if the web service offered a SPARQL endpoint so clients could retrieve only the fragment of the whole RDF graph they are interested in.

To accomplish this, it would be necessary to translate the SPARQL query into a semantics-preserving SQL query that could be executed against the relational data mapped by the R2RML description.

The efficient translation of SPARQL into SQL is an active field of research in academia and industry. In fact, a number of triple stores are built as a layer on top of a relational back end. Support for SPARQL in these RDF stores requires translating the SPARQL query into a SQL query that can be executed against a certain relational schema.

Some foundational papers in the field include “A Relational Algebra for SPARQL” by Richard Cyganiak, which translates the semantics of SPARQL, as finally defined by the W3C, into relational algebra semantics, and “Semantics Preserving SPARQL-to-SQL Translation” by Chebotko, Lu and Fotouhi, which introduces an algorithm to translate SPARQL queries into SQL queries.

This latter paper is especially interesting because the translation mechanism is parametric on the underlying relational schema. This makes it possible to adapt the translation to any relational database using a couple of mapping functions, alpha and beta: alpha maps a triple pattern of the SPARQL query to a table in the database, and beta maps a triple pattern and a position in the triple (subject, predicate or object) to a column.

Provided that R2RML offers a generic mechanism for the description of relational databases, in order to support SPARQL queries over any R2RML-mapped RDF graph, we just need an algorithm that receives the R2RML mapping as input and builds the mapping functions required by Chebotko et al.'s algorithm.

The most straightforward way to accomplish this is to use the R2RML mapping to generate a virtual table: a single relation with only subject, predicate, object and graph columns. The mapping functions for this table are trivial. A possible implementation of this algorithm is described below.

The code basically generates a SQL query: for each triples-map and its included term-maps in the R2RML mapping, a SQL SELECT subquery is generated with the right selections and projections. The final table is the UNION of all the generated subqueries.
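Since the original implementation is only linked, here is a minimal sketch of what such a query builder could look like, assuming the mapping format of the test-spec shown below (an illustration, not the actual code from the linked file):

(require '[clojure.string :as string])

;; One SELECT subquery per term-map, UNIONed together. :datatype is
;; ignored in this sketch; constant values are emitted as quoted literals.
(defn- term-map->select
  ([logical-table graph-iri subject-col {:keys [property column constant-value]}]
     (str "SELECT " subject-col " AS subject, "
          "'" property "' AS predicate, "
          (or column (str "'" constant-value "'")) " AS object, "
          "'" graph-iri "' AS graph "
          "FROM (" logical-table ") AS TBL0")))

(defn build-sql-query
  ([triples-maps]
     (string/join " UNION "
                  (for [{:keys [logical-table table-graph-iri
                                subject-map property-object-map]} triples-maps
                        term-map property-object-map]
                    (term-map->select logical-table table-graph-iri
                                      (:column subject-map) term-map)))))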

For the example mapping included in the R2RML specification:

(def test-spec
     [{:logical-table   "select concat('_:Department',deptno) AS deptid, deptno, dname, loc from Dept"
       :class           "xyz:dept"
       :table-graph-iri "xyz:DeptGraph"
       :subject-map     {:column "deptid"}
       :property-object-map [{:property "dept:deptno"
                              :column   "deptno"
                              :datatype "xsd:positiveInteger"}
                             {:property "dept:name"
                              :column   "dname"}
                             {:property "dept:location"
                              :column   "loc"}
                             {:property       "dept:COMPANY"
                              :constant-value "XYZ Corporation"}]}])

The generated SQL query looks like this:

user> (build-sql-query test-spec)
"SELECT  deptid AS subject,  'dept:deptno' AS predicate,  
deptno AS object,'xyz:DeptGraph' AS graph FROM (select 
concat('_:Department',deptno) AS deptid, deptno, dname, 
loc from Dept) AS TBL0 UNION SELECT  deptid AS subject,
'dept:name' AS predicate,  dname AS object,  'xyz:DeptGraph' 
AS graph FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION SELECT  deptid AS 
subject,  'dept:location' AS predicate,  loc AS object,  
'xyz:DeptGraph' AS graph FROM (select concat('_:Department'
,deptno) AS deptid, deptno, dname, loc from Dept) AS TBL0 
UNION SELECT  deptid AS subject,'dept:COMPANY' AS predicate,  
'XYZ Corporation' AS object,  'xyz:DeptGraph' AS graph FROM 
(select concat('_:Department',deptno) AS deptid, deptno, dname, 
loc from Dept) AS TBL0"

Using this query, a view can be generated, or the query can be used directly to define the alfa and beta functions required by the SQL generation algorithm:

(defn alfa
  ([triples-map]
     (let [query (build-sql-query triples-map)]
       (fn [triple-pattern] (str "(" query ") AS " (table-alias))))))

(defn beta
  ([triples-map]
     (let [query (build-sql-query triples-map)]
       (fn [triple-pattern pos]
         (condp = pos
             :subject "subject"
             :predicate "predicate"
             :object "object"
             (throw (Exception. 
               (str "Unknown position for a triple " (name pos)))))))))

A sample implementation of the SQL generation algorithm can be found in this Clojure file.

With this implementation, a parsed SPARQL query, here represented using Lisp S-expressions, can be translated into the equivalent SQL query:

(trans
  (SELECT [:s :p] (tp :s "dept:name" :p))
  test-spec)

The result is the following semantics-preserving SQL query:

"SELECT DISTINCT s,p FROM (SELECT DISTINCT subject AS s, 
predicate AS 'dept:name', object AS p FROM 
(SELECT  deptid AS subject,  'dept:deptno' AS predicate,  
deptno AS object,  'xyz:DeptGraph' AS graph 
FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION 
SELECT  deptid AS subject,  'dept:name' AS predicate, 
dname AS object,  'xyz:DeptGraph' AS graph 
FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION 
SELECT  deptid AS subject,  'dept:location' AS predicate,
loc AS object,  'xyz:DeptGraph' AS graph 
FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION 
SELECT  deptid AS subject,  'dept:COMPANY' AS predicate,
'XYZ Corporation' AS object,  'xyz:DeptGraph' 
AS graph FROM (select concat('_:Department',deptno) AS 
deptid, deptno, dname, loc from Dept) AS TBL0) 
AS TBL83347e804edd4b0ca4dddcdac0b14e97 WHERE True AND 
predicate='dept:name') TBL7c7476ba56624ecd992969c37129a6b4;"

The mechanism is completely generic, so more complex queries like:

(trans
 (SELECT [:s :c] (FILTER (And (Eq :s 1) (Eq :p 2))
                  (UNION
                   (AND (tp :s "dept:name" :p)
                        (tp :s "dept:COMPANY" :c)
                        (tp :s "something" :d))
                   (tp "_:1" :p :c))))
 test-spec)

can also be translated.

The generated SQL code is far from optimal in this sample implementation, but it offers another glimpse of the possibilities R2RML opens up for boosting the adoption of semantic technologies in everyday web development.

This implementation does not take into account the use of datatypes and language tags in RDF literals, or the possibility of selecting from different graphs in the query. Nor does it generate any additional triples due to an entailment regime.

It should be possible to improve the translation mechanism using the existing research on SPARQL-to-SQL translation, for example “A Complete Translation from SPARQL into Efficient SQL” by Elliott, Cheng et al., or to find better translation alternatives.

Another option could be extending the R2RML vocabulary with optional terms that give hints to a translation mechanism, so it can produce more efficient translations for specific database schemas.

Exploring Yahoo S4 with Clojure

S4 is a platform for processing unbounded streams of data, introduced by Yahoo in November 2010. S4's main concern is the processing of streams of data at a rate of thousands of events per second in a scalable, distributed way, offering some guarantees in case of network partition.

The S4 programming model is similar to the actor model implemented in systems like Erlang's OTP platform. This model makes it easy to implement stream processing using a functional programming style.

In this post I will explore the main features and components of S4 using the Clojure programming language. At the same time, a Clojure library for building S4 applications, clj-s4, will be introduced.

The version of S4 used here is 0.2.1. S4 is still under heavy development, so some of the features discussed here could change in the near future.

Streams and events

S4 is a platform for the processing of streams of data. In the S4 context, a stream of data must be understood as a lazy sequence of events. Each of these events is an instance of a Java object implemented as a Java bean. Java beans are just containers for a set of fields, implementing standard methods for retrieving and modifying the state of the objects.

S4 streams have another important feature: the events in a stream have an associated key. This key can be composed of a single field or a list of fields in the Java bean object. The key of the event is used by S4 in a similar way to how the Hadoop MapReduce framework uses keys to route data to the different processing units.

Each of these event bean objects is the equivalent of a message exchanged by processes in similar systems like Erlang's OTP, where the main structure used to store message data is the plain tuple. The decision to use Java beans in S4 makes it easy to integrate S4 events with the Java code used in the rest of an application. It also offers a standard interface for manipulating the state of the objects. This is especially useful in S4 since it also uses the Spring application framework for application configuration and setup. In Spring, the state of Java beans can be easily injected from the configuration file, making it easier to build reusable, parametric Java components.

Nevertheless, the use of Java beans is not the most common way of storing the state of a program in Clojure applications. Plain maps and types created with the deftype and defrecord macros are the preferred way to deal with state.

Unfortunately, when using Clojure types, the resulting Java classes will not integrate easily into the S4 model, since they do not implement the standard bean interface and, as a result, cannot be easily configured in a Spring configuration file. Furthermore, the standard serialization mechanism currently used in S4, the Kryo serialization library, cannot serialize most Clojure-generated classes and default types: it is unable to deserialize fields marked as final, and it requires argument-less constructors that Clojure core types, like Keyword, do not implement. This even makes it impossible to generate a serializable bean using the gen-class macro, since this macro places the state in a final field.

clj-s4 solves this issue by introducing a new macro, def-s4-message, similar to defrecord, that generates a mutable Java bean for the provided field descriptions.

As an example, the following code declares a new event to store a number that will be part of an S4 stream. clj-s4 messages can be built from a Clojure hash map, and they can be transformed back into immutable maps using the msg-to-map function.

(use 'clj-s4.core)

;; Defines a bean NumberTest with two fields; the first
;; one, :num, is declared to be an int field.
;; The second one, :foo, will be an Object field
;; by default.
(def-s4-message cljs4.NumberTest [[int :num] :foo])

(def *num* (cljs4.NumberTest. {:num 34 :foo "bar"}))

(.getNum *num*)
;; returns 34

(.setNum *num* 15)
(.getNum *num*)
;; returns 15

(msg-to-map *num*)
;; returns {:num 15 :foo "bar"}

S4 currently supports the Avro serialization framework, which relies on describing data structures using a neutral JSON notation, but clj-s4 does not offer any utilities to work with this library.

Adding new serialization mechanisms to S4 is also an easy task. It consists only of implementing a single interface, SerializerDeserializer, which defines two simple methods to serialize and deserialize objects, and changing a value in the Spring configuration file for the S4 cluster. Adding support for the standard Java serialization mechanism, or for other schemes based on JSON, YAML or even Lisp S-expressions, is trivial. The only problem with these serialization mechanisms is that their formats are less efficient, which could harm the performance of S4 as a high performance stream processing platform.
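As an illustration, a serializer backed by plain Java serialization might look roughly like the following sketch. The interface package and method signatures are assumptions derived from the description above and should be checked against the S4 sources:

(import '[java.io ByteArrayOutputStream ObjectOutputStream
                  ByteArrayInputStream ObjectInputStream])

;; Sketch only: io.s4.serialize.SerializerDeserializer and its method
;; signatures are assumed, not verified against the S4 0.2.1 sources.
(def java-serializer
  (reify io.s4.serialize.SerializerDeserializer
    (serialize [this obj]
      (let [bos (ByteArrayOutputStream.)]
        (with-open [oos (ObjectOutputStream. bos)]
          (.writeObject oos obj))
        (.toByteArray bos)))
    (deserialize [this bytes]
      (with-open [ois (ObjectInputStream. (ByteArrayInputStream. bytes))]
        (.readObject ois)))))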

Inserting events into S4: adapters

S4 streams of events ultimately originate in the outside world. They may be periodically pulled from a web service, pushed using a web socket, or read from a log file. They may also arrive in different formats: JSON objects, XML documents or plain text lines.

Adapters are S4 components located at the boundaries of the S4 system. They are named that way because they interact with the original source of data, transform the incoming data into Java bean objects and insert them into the S4 system as a new stream of events.

Adapters are started and stopped independently of the rest of the S4 cluster, but they use the same underlying communication infrastructure and configuration to send streams of events to the S4 Processing Nodes (PNs) where these events will be processed.

Adapters are implemented with clj-s4 in two parts: the implementation of their functionality as Clojure functions, and the configuration of the adapter as a Spring bean.

The implementation of the adapter can be achieved using the def-s4-adapter macro.

The following sample code shows the implementation of an adapter that generates a stream of random numbers:

(in-ns 'randomnumbers.core)

(def-s4-adapter cljs4.RandomNumberAdapter [:stream]

   :init (fn [this args]
          (.start (Thread. this)))

   :run (fn [this]
          (loop [num 0]
            (generate-event this
                            (read-state this :stream)
                            (cljs4.Number. {:num num}))
            (Thread/sleep 3000)
            (recur (int (Math/floor (* (rand) 100)))))))

The def-s4-adapter macro receives as its first two arguments the name of the class where the implementation of the adapter will be stored, and a vector of keywords naming the adapter's state fields. In this case, the only state field is :stream.

The fields of state for the adapter can be read and manipulated using the functions read-state, write-state and alter-state.

The rest of the implementation is a map with two functions keyed :init and :run. The :init function is invoked when the adapter is instantiated by S4, and the :run function is transformed into an implementation of the run method from the Runnable Java interface that the adapter class will implement.

In order to insert a new event into an S4 stream, the adapter can use the generate-event function.

generate-event receives as arguments the adapter, the name of the stream where the event will be inserted, and the event itself.

In this example, the name of the stream is stored in the state of the adapter, but this value is never initialized in the adapter's code. These values can be injected by Spring using the second component in the definition of the adapter: the wiring of the adapter as a Spring bean.

clj-s4 offers the wire-bean function to generate the required XML for Spring. It also offers the wire-adapters function to wrap the Spring bean definition as a new S4 adapter.

(in-ns 'randomnumbers.wiring)

(wire-adapters "RandomNumbersAdapter"

 (wire-bean
  {:id "randomNumbersGenerator"
   :class "cljs4.RandomNumberAdapter"
   :properties [{:name "stream" :value "RandomNumbers"}]}))

Additional properties can be added to the :properties array of the bean. clj-s4 uses the :id property of any bean to store it. For adapters, the first argument provided is used as the identifier of the adapter.

This ID can be used to generate the XML for any bean, adapter or application using the s4-wiring Leiningen task.

The result of the execution of this adapter is a new stream of events, named RandomNumbers, that can be processed by S4 nodes. The RandomNumbers stream will consist of cljs4.Number Java beans.

Processing S4 streams: Processing Elements

The computation implemented in an S4 application is executed by Processing Elements (PEs). PEs are the equivalent of actors in actor frameworks, or of Erlang processes.

PEs receive events from a certain stream or collection of streams, do some computation based on the received events and, optionally, output data into one or several output streams.

There are two main types of PEs in S4. Keyless PEs have no key associated with the stream events; as a consequence, they receive every event in the stream regardless of its value. Keyed PEs, on the other hand, have a key associated with the stream of events they process, and will only receive the events in the stream with a matching value for the specified stream key.

PEs are also defined by an implementation and a Spring configuration. When an S4 processing node boots, it instantiates a new PE object for each PE defined in the Spring configuration file. The PN stores this prototype instance associated with the configured stream and key for that PE.

When a new event is received at the PN from the S4 communication layer, the PN inspects the table of streams and keys to PE prototypes and takes one of the following actions:

  • If the stream is keyless and no PE has been associated with it yet, the PN clones the PE prototype and passes the event to the clone, regardless of the event's value.
  • If the stream is keyless and a PE has already been cloned, the new event is also passed to that same PE instance.
  • If the event has an associated key, the PN checks whether a PE was already cloned for this key value and is still available. If no PE was cloned, a new one is cloned and the event passed to it.
  • If a PE was previously cloned for this key value and has not been discarded, it is retrieved and the event passed to it for processing.

This mechanism ensures that in a PN there will be at most one PE per keyless stream, and as many PEs as there are distinct key values for keyed streams.
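A rough sketch of this dispatch logic, purely illustrative (clone-prototype and process-event are hypothetical helpers, not the actual S4 API):

(declare clone-prototype process-event) ; hypothetical helpers

(defn route-event
  "Looks up (or clones) the PE for this stream/key and passes it the event."
  ([pe-table prototype event key-value]
     (let [lookup (or key-value :keyless)]      ; keyless streams share one slot
       (if-let [pe (get @pe-table lookup)]
         (process-event pe event)               ; reuse the existing clone
         (let [pe (clone-prototype prototype)]  ; first event for this key
           (swap! pe-table assoc lookup pe)
           (process-event pe event))))))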

Periodically, PEs are requested by the PN to output their current state. A PE can use this mechanism to persist its state, output the result of a computation to persistent storage, or whatever else its design requires. How often the PN invokes a PE's output can be configured in the Spring wiring for the PE, depending on time, a certain number of invocations, etc.

PE functionality is defined in clj-s4 using two main functions, associated with the keys :process-event and :output. An additional function keyed as :init can also be implemented to provide extra initialization for the PE. The PE can also specify the class that will be generated for its bytecode and an array of fields for its state:

(in-ns 'wordcount.core)

(def-s4-pe cljs4.Tokenizer [:tokenizer :posTagger :tokenizerModel :posTaggerModel :stream]
  :init (fn [this]
          (let [tokenizer-model (read-state this :tokenizerModel)
                pos-tagger-model (read-state this :posTaggerModel)
                tokenizer (make-tokenizer tokenizer-model)
                pos-tagger (make-pos-tagger pos-tagger-model)]
            (write-state this :tokenizer tokenizer)
            (write-state this :posTagger pos-tagger)))
  :process-event [[Object] (fn [this text]
                             (let [text (msg-to-map text)
                                   text-id (:textId text)
                                   content (:content text)
                                   tokenize (read-state this :tokenizer)
                                   pos-tag (read-state this :posTagger)
                                   products (pos-tag (tokenize  content))]
                               (doseq [[token pos] products]
                                 (dispatch-event this
                                                 (read-state this :stream)
                                                 (cljs4.Word. {:content token :pos pos :textId text-id})))))]
  :output (fn [this] :not-interested))

The preceding code defines a new PE that will be stored in the cljs4.Tokenizer class. This PE receives from the Spring configuration file the locations for the :tokenizerModel and :posTaggerModel properties, as well as the value for the output stream. At initialization time it creates a new tokenizer and POS tagger using the provided models and stores them in the :tokenizer and :posTagger fields.

The :process-event function will be invoked whenever the PE receives a new event from some stream, according to its configuration. The result of its computation is sent to the configured output stream using the dispatch-event function. In this example, the :output function does not output any value.

The wiring of a PE in clj-s4 can be accomplished using the wire-pe function:

(wire-pe {:id "tokenizer"
          :class "cljs4.Tokenizer"
          :keys  ["Sentences textId"]
          :properties
          [{:name "tokenizerModel" :value "/PATH/TO/models/en-token.bin"}
           {:name "posTaggerModel" :value "/PATH/TO/models/en-pos-maxent.bin"}
           {:name "dispatcher" :ref "wordsDispatcher"}
           {:name "stream" :value "Tokens"}]})

With this configuration, the PE is connected to the input Sentences stream and will output the POS-tagged tokens to the Tokens stream. When using the wire-pe function, the values for the keys :id, :class and :keys are mandatory.

Event distribution in an S4 cluster

When an S4 adapter generates a new event in a stream, or an S4 PE outputs an event, the S4 infrastructure must decide which PN in the S4 cluster the event is routed to for processing.

The component making the event routing decisions is the dispatcher. When a component emits an event in a stream, the dispatcher retrieves a partitioner object configured for that stream. The partitioner configuration includes a hash key and a hash function that, given the value of the hash key in the event object, always outputs the same cluster processing node. In this way, the choice of the hash function determines the load balancing in the cluster. The configuration for each PN in the cluster can be retrieved at run time by other PNs using ZooKeeper, or from a local file in the 'red button', non-distributed mode of operation.

clj-s4 allows the configuration of partitioners and dispatchers using the wire-partitioner and wire-dispatcher functions.

The following code sample shows how a dispatcher, using a single partitioner for the streams RandomNumbers, OddNumbers and EvenNumbers, can be wired in the application's Spring configuration.

The partitioner will use the num property of the event beans to dispatch each event to the right PN in the cluster.

(wire-partitioner {:id "mapPartitioner"
                    :stream-names ["RandomNumbers" "OddNumbers" "EvenNumbers"]
                    :hash-keys ["num"]})

 (wire-dispatcher {:id "numbersDispatcher"
                   :partitioners ["mapPartitioner"]})

Deployment and application setup

In order to run a distributed S4 application, the application classes and dependent jars must be bundled in an application directory inside the S4 image directory, which also contains the configuration files for the application and for the adapters.

clj-s4 makes it possible to create the S4 application bundle from some special annotations in the Leiningen project file:

(defproject randomnumbers "1.0.0-SNAPSHOT"
  :description "a demo S4 app built using clj-s4"
  :dependencies [[org.clojure/clojure "1.2.0"]
                 [org.clojure/clojure-contrib "1.2.0"]
                 [clj-s4 "0.2.1.1-SNAPSHOT"]]
  :dev-dependencies [[clj-s4 "0.2.1-SNAPSHOT"]]
  :aot [randomnumbers.core]

  ;; configuration of a S4 application
  :s4-app {:name "RandomNumbers"
           :namespace "randomnumbers.core"
           :configuration ["randomnumbers.wiring" "RandomNumbers"]
           :adapters ["randomnumbers.wiring" "RandomNumbersAdapter"]})

The previous code defines an S4 application called RandomNumbers and specifies the application configuration and adapter wiring files.

Wiring information and the implementation of the functionality are stored in different namespaces: randomnumbers.wiring and randomnumbers.core.

Using this information, the application bundle can be generated with the lein s4-deploy Leiningen task. If no argument is passed to the task, the application is deployed into a subdirectory named after the value of the :name key in the project file, RandomNumbers in this case, inside a local s4 directory. If a path is passed as an argument, the application is deployed into that target path.
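For example, assuming the project file above (the target path is illustrative):

$ lein s4-deploy                  # deploys into s4/RandomNumbers
$ lein s4-deploy /opt/s4-image/s4-apps/RandomNumbers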

Conclusions

S4 is a young platform, but it has the potential to offer a whole new range of possibilities when dealing with massive data applications. Stream and complex event processing frameworks have been restricted to certain application domains, but S4 can bring this computational paradigm to a wider audience of web developers already dealing with big data applications, making it possible to develop new kinds of features for these applications.

The S4 design emphasizes reusability. The use of Spring makes it easy to build very generic building blocks that can later be configured for a specific use case. If S4 continues to mature and collections of reusable PEs and adapters become available, S4 will become a very interesting development framework for distributed applications.

Exposing databases as linked data using Clojure, Compojure and R2RML

The web is increasingly becoming a web of data.

Web APIs are ubiquitous nowadays. Transferring application data between web servers and complex JavaScript applications executed inside web browsers, mobile phones, tablets, etc. is one of the main problems engineers face in modern web development.

The current state of affairs in the building of web APIs is far from ideal. Web applications read data from a mixture of relational and non-relational data sources and transform it into some transport format, usually JSON. These data are then retrieved by web clients, using some kind of more or less RESTful API, to be processed in the client application. Some cutting edge frameworks for building JavaScript applications, like SproutCore, offer support for a data store in the web client where JSON-encoded data can be saved and manipulated using their own query language.

Developers not relying on such frameworks must make up their own schemes to process the retrieved API data.

This approach to web API design has many drawbacks. For example, it is hard to mix JSON objects from different applications describing the same kind of information. There is no standard way to link resources inside JSON objects. It is also difficult to write reusable generic libraries to handle this kind of data, or even to write framework code to handle the discovery, querying and manipulation of web data APIs, as the above-mentioned example of SproutCore shows.

Some of these problems could be solved using technologies developed as part of the semantic web initiative over the past decade. People have started referring to this pragmatic approach to the semantic web with a new title: Linked Data. The pragmatic approach here means putting less emphasis on the inference and ontological layers of the semantic web, and focusing on offering a simple way to expose data on the web, linking resources across different web applications and data sources.

Many interesting technologies are being developed under the Linked Data moniker or are commonly associated with it, RDFa for instance. Another of these technologies is R2RML: RDB to RDF Mapping Language.

R2RML describes a standard vocabulary to lift relational data to an RDF graph, providing a standard mapping for the relational data. This RDF graph can be serialized to some transport format: RDFa, Turtle, XML, and then retrieved by the client. The client can store the triples in the graph locally and use the standard SPARQL query language to retrieve the information. Data from different applications using the same vocabularies (FOAF, GoodRelations) can be easily mixed and manipulated by the client in the same triple store. Furthermore, links to other resources can be inserted inside the RDF graph, leading to the discovery of additional information.


clj-r2rml is a small library implementing R2RML in the Clojure programming language that can be used inside web applications.

Let’s see an example of how R2RML could be used in a web application. Imagine we have the following database table:

+--------+-----------+----------+
| deptno | dname     | loc      |
+--------+-----------+----------+
|     10 | APPSERVER | NEW YORK |
|     11 | APPSERVER | BOSTON   |
+--------+-----------+----------+

We can describe a mapping for this table with the following RDF graph, expressed in Turtle syntax:


    a rr:TriplesMap;
    rr:logicalTable "
       Select ('_:Department' || deptno) AS deptid
            , deptno
            , dname
            , loc
         from dept
       ";
    rr:class xyz:dept;
    rr:tableGraphIRI xyz:DeptGraph;
    rr:subjectMap [ a rr:BlankNodeMap; 
                          rr:column "deptid" ];
    rr:propertyObjectMap [ rr:property dept:deptno; rr:column "deptno"; rr:datatype xsd:positiveInteger ];
    rr:propertyObjectMap [ rr:property dept:name; rr:column "dname" ];
    rr:propertyObjectMap [ rr:property dept:location; rr:column "loc" ];
    rr:propertyObjectMap [ rr:property dept:COMPANY; rr:constantValue "XYZ Corporation" ];
.

The mapping takes a logical table, produced by the SELECT SQL query, and generates triples for the columns and values of the table according to the rules in the R2RML specification.

Using clj-r2rml, this mapping can be expressed as a Clojure map:

{:logical-table   "select concat('_:Department',deptno) AS deptid, deptno, dname, loc from Dept"
       :class           "xyz:dept"
       :table-graph-iri "xyz:DeptGraph"
       :subject-map     {:column "deptid"}
       :property-object-map [{:property "dept:deptno"
                              :column   "deptno"
                              :datatype "xsd:positiveInteger"}
                             {:property "dept:name"
                              :column   "dname"}
                             {:property "dept:location"
                              :column   "loc"}
                             {:property       "dept:COMPANY"
                              :constant-value "XYZ Corporation"}]}

The mapping data will be used by clj-r2rml to generate the correct RDF graph. This graph can later be exposed behind a RESTful interface using Compojure:

(ns clj-r2rml-test.core
  (:use compojure.core)
  (:use clj-r2rml.core)
  (:require [compojure.route :as route]))


(def *db-spec*
     {:classname   "com.mysql.jdbc.Driver"
      :subprotocol "mysql"
      :user        "root"
      :password    ""
      :subname     "//localhost:3306/rdftests"})

(def *context* (make-context *db-spec* {}))

(defn- build-query
  ([id]
     (if (nil? id)
       "select concat('_:Department',deptno) AS deptid, deptno, dname, loc from Dept"
       (str "select concat('_:Department',deptno) AS deptid, deptno, dname, loc from Dept "
            "where deptno='" id "'"))))

(def *additional-ns*
     {"dept" "http://test-clj-r2rml#"
      "xyz"  "http://test-clj-r2rml-xyz#"})

(defn- mapping
  ([] (mapping nil))
  ([id]
     [{:logical-table   (build-query id)
       :class           "xyz:dept"
       :table-graph-iri "xyz:DeptGraph"
       :subject-map     {:column "deptid"}
       :property-object-map [{:property "dept:deptno"
                              :column   "deptno"
                              :datatype "xsd:positiveInteger"}
                             {:property "dept:name"
                              :column   "dname"}
                             {:property "dept:location"
                              :column   "loc"}
                             {:property       "dept:COMPANY"
                              :constant-value "XYZ Corporation"}]}]))

(defroutes example
  (GET "/departments/:id" [id]
       (let [triples (:results (run-mapping (mapping id) *context* *additional-ns*))]
         {:status 200
          :headers {"Content-Type" "text/turtle"}
          :body (to-rdf *additional-ns* triples)}))

  (GET "/departments" []
       (let [triples (:results (run-mapping (mapping) *context* *additional-ns*))]
         {:status 200
          :headers {"Content-Type" "text/turtle"}
          :body (to-rdf *additional-ns* triples)}))

  (route/not-found "Page not found"))

We can run this web application directly from the REPL:

REPL started; server listening on localhost:2178.
user=> (use 'clj-r2rml-test.core)      
nil
user=> (use 'ring.adapter.jetty)       
nil
user=> (run-jetty example {:port 8080})
2010-12-09 21:10:13.060::INFO:  Logging to STDERR via org.mortbay.log.StdErrLog
2010-12-09 21:10:13.061::INFO:  jetty-6.1.14
2010-12-09 21:10:13.078::INFO:  Started SocketConnector@0.0.0.0:8080

And retrieve the RDF graph for all the departments:

$ curl -X GET http://localhost:8080/departments
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . 
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . 
@prefix rdf:  <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . 
@prefix dept: <http://test-clj-r2rml#> . 
@prefix xyz:  <http://test-clj-r2rml-xyz#> . 
_:Department10 dept:deptno "10"^^xsd:positiveInteger .
_:Department10 dept:name "APPSERVER" .
_:Department10 dept:location "NEW YORK" .
_:Department10 dept:COMPANY "XYZ Corporation" .
_:Department11 dept:deptno "11"^^xsd:positiveInteger .
_:Department11 dept:name "APPSERVER" .
_:Department11 dept:location "BOSTON" .
_:Department11 dept:COMPANY "XYZ Corporation" .
_:Department10 rdf:type xyz:dept .
_:Department11 rdf:type xyz:dept .

Or for a single department:

$ curl -X GET http://localhost:8080/departments/10
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . 
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . 
@prefix rdf:  <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . 
@prefix dept: <http://test-clj-r2rml#> . 
@prefix xyz:  <http://test-clj-r2rml-xyz#> . 
_:Department10 dept:deptno "10"^^xsd:positiveInteger .
_:Department10 dept:name "APPSERVER" .
_:Department10 dept:location "NEW YORK" .
_:Department10 dept:COMPANY "XYZ Corporation" .
_:Department10 rdf:type xyz:dept .

clj-r2rml is just a tentative implementation of the recommendation, with the only goal of studying the standard, but if you take a look at the list of planned implementations of R2RML, it will not take long until we see robust implementations (the presence of Alex Miller on the list will hopefully mean a production-ready Clojure/Java implementation in the future).

R2RML, together with other projects like JSON-LD, the development of new libraries to work effectively with semantic technologies in browsers and on mobile devices, and the growing interest in linked data, will hopefully mean a new push for the semantic web and, maybe, better ways of building web APIs.

hiccup-rdfa: semantic markup for Clojure web apps

RDFa is one of the best specifications ever recommended by the W3C. It offers a pragmatic approach to an incredibly powerful but intimidating technology like RDF, and it does so by allowing people to build on their experience with previous technologies like microformats.

Last week I built hiccup-rdfa, a very small Clojure library that makes it easy to use RDFa in hiccup templates rendered by a Ring Clojure application, for example a Compojure web app.

Let’s start with a very minimalistic Compojure application:

(ns hiccuprdfatest.core
  (:use [compojure core]
        [compojure response]
        [ring.adapter jetty]
        [hiccup.core]
        [hiccup.page-helpers]))

(def *geeks*
     [{:name "Pablo"  :nick "pablete"
       :tweet-home "http://twitter.com/pablete"}
      {:name "Mauro"  :nick "malditogeek"
       :tweet-home "http://twitter.com/malditogeek"}
      {:name "Javier" :nick "jneira"
       :tweet-home "http://twitter.com/jneira"}
      {:name "Joel"   :nick "joeerl"
       :tweet-home "http://twitter.com/joeerl"}])

(defn geeks-template
  ([geeks]
     (xhtml-tag :en
       [:head
        [:title "rdfa test"]]
       [:body
        [:h1 "Some geeks"]
        [:div {:id "geeks"}
         (map (fn [geek]
                [:div {:class "geek" :id (:nick geek)}
                 [:p {:class "name"} (:name geek)]
                 [:p {:class "nick"} (:nick geek)]
                 [:a {:class "tweet-home"
                      :href (:tweet-home geek)}
                  (:tweet-home geek)]])
                    geeks)]])))

(defroutes rdfa-test
  (GET "/geeks" request
       (html (geeks-template *geeks*))))

(run-jetty (var rdfa-test) {:port 8081})

This application just exposes a URL showing a listing of people with some links. It uses hiccup to build the markup. The markup tries to be an example of semantic markup: it exposes information about the semantics of the data rather than the visual rendering of the page.

Can we provide better semantic markup for this trivial sample web app? We could think about adding some kind of microformat but, instead, I will show how to use RDFa to accomplish this task. Furthermore, using RDFa will open some interesting possibilities for our sample web application.

The first thing to do is to look for a vocabulary suitable for describing the semantics of our data. A very good candidate is Friend of a Friend (FOAF), which includes terms for describing people and their relationships. After choosing the vocabulary, we have to embed it in the HTML markup of our application.

The following example shows how we can modify the previous web application to embed the FOAF semantics in the markup:

(use 'hiccup-rdfa.core)
(use 'hiccup-rdfa.vocabularies)

(register-vocabulary foaf)

(defn rdfa-geeks-template
  ([geeks]
     (xhtml-rdfa-tag :en
       [:head
        [:title "rdfa test"]]
       [:body
         [:h1 "Some geeks"]
         [:div {:id "geeks"}
          (map (fn [geek]
                 (foaf-Person (:tweet-home geek)
                    {:id (:nick geek)
                     :class "geek"
                     :tag :div}
                    (foaf-name (:name geek)
                               {:class "name" :tag :p})
                    (foaf-nick (:nick geek)
                               {:class "nick" :tag :p})
                    (link-foaf-homepage-to 
                                           (:tweet-home geek)
                                           {:class "tweet-home"
                                            :tag :a}
                                           (:tweet-home geek))))
             geeks)]])))

(defroutes rdfa-test
  (GET "/geeks" request
       (html (rdfa-geeks-template *geeks*))))

The rdfa-geeks-template function is a variation of geeks-template using RDFa. hiccup-rdfa first needs the description of a vocabulary; it already includes a FOAF vocabulary, so we can use it directly by calling the register-vocabulary function.

After calling register-vocabulary, all the classes and properties (RDFS and OWL) in the vocabulary are transformed into a collection of functions that can be used inside the hiccup template. In this example, as a side effect of the call to register-vocabulary with the FOAF vocabulary description, the functions foaf-Person, foaf-name, foaf-nick and link-foaf-homepage-to used in the template have been generated. HTML attributes can be passed to the generated functions as a map. This map can also include a :tag value with the kind of tag to use instead of the default span. link-[property]-to functions are also generated; they can be used to generate links (by default) or anchors to the URL of the resource linked by an RDF property.
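For one entry, the generated functions should expand into hiccup data roughly like the following sketch (the exact RDFa attribute names emitted depend on hiccup-rdfa's implementation):

;; Hypothetical expansion for the "pablete" entry; the RDFa attributes
;; (:about, :typeof, :property, :rel) are assumed, not verified.
[:div {:id "pablete" :class "geek"
       :about "http://twitter.com/pablete" :typeof "foaf:Person"}
 [:p {:class "name" :property "foaf:name"} "Pablo"]
 [:p {:class "nick" :property "foaf:nick"} "pablete"]
 [:a {:class "tweet-home" :rel "foaf:homepage"
      :href "http://twitter.com/pablete"}
  "http://twitter.com/pablete"]]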

The expected output of this template can be seen in a browser (the original post included a screenshot here), and viewing the page source we can see how the semantics of the information contained in the page are explicitly stated. Using RDFa is an excellent way of achieving semantic markup.

The use of RDFa has another important implication. Thanks to RDFa, a web page can be automatically parsed as data by an HTTP agent supporting RDF, turning an HTML web page into a real “web service”.

We can use a service like the W3C RDFa Distiller, feeding it the source code of the page so that it extracts the RDF triples embedded in it:

@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xhv: <http://www.w3.org/1999/xhtml/vocab#> .
@prefix xml: <http://www.w3.org/XML/1998/namespace> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .


<http://twitter.com/jneira> a foaf:Person ;
     foaf:homepage "http://twitter.com/jneira"@en ;
     foaf:name "Javier"@en ;
     foaf:nick "jneira"@en . 

<http://twitter.com/joeerl> a foaf:Person ;
     foaf:homepage "http://twitter.com/joeerl"@en ;
     foaf:name "Joel"@en ;
     foaf:nick "joeerl"@en . 

<http://twitter.com/malditogeek> a foaf:Person ;
     foaf:homepage "http://twitter.com/malditogeek"@en ;
     foaf:name "Mauro"@en ;
     foaf:nick "malditogeek"@en . 

<http://twitter.com/pablete> a foaf:Person ;
     foaf:homepage "http://twitter.com/pablete"@en ;
     foaf:name "Pablo"@en ;
     foaf:nick "pablete"@en . 

In the same way, we can use an RDF library, like Plaza for Clojure, to retrieve the data contained in the HTML page:

plaza.core=> (use 'plaza.rdf.core)                                                
nil

plaza.core=> (use 'plaza.rdf.implementations.jena)                                
nil

plaza.core=> (init-jena-framework)                                
nil

plaza.core=> (def *geeks* (document-to-model 
                         "http://localhost:8081/geeks" 
                         :html))
#'plaza.core/*geeks*

plaza.core=> (alter-root-rdf-ns "http://xmlns.com/foaf/0.1/")
"http://xmlns.com/foaf/0.1/"

plaza.core=> (use 'plaza.rdf.sparql)
nil

plaza.core=>(map (fn [[s p o]] 
                          {:uri (str s) 
                           :name (literal-value o)}) 
                       (flatten-1 (model-pattern-apply 
                                         *geeks* 
                                         [[?s :name ?o]])))
({:uri "http://twitter.com/joeerl", :name "Joel"} 
 {:uri "http://twitter.com/jneira", :name "Javier"} 
 {:uri "http://twitter.com/malditogeek", :name "Mauro"} 
 {:uri "http://twitter.com/pablete", :name "Pablo"})

If you find another vocabulary you want to use for the semantic description of your data, you can use it in hiccup-rdfa by importing the vocabulary from the URL of the OWL or RDFS description of its terms. For example, this function call will import the SIOC vocabulary:

(make-vocabulary-from-url "http://rdfs.org/sioc/ns"
                          :xml
                          "http://rdfs.org/sioc/ns#"
                          "sioc")

You can add hiccup-rdfa to your Leiningen project with the following Clojars reference:

[hiccup-rdfa "1.0.0-SNAPSHOT"]

Generic servers and supervisors in Jobim, a Clojure actors library

The concepts of a “supervisor” or a “generic server” are immediately familiar to anyone who has worked on an Erlang application. The main idea behind these concepts is to factor common patterns in distributed Erlang applications into higher-level abstractions so they can be easily reused. These abstractions are called behaviours, and Erlang's OTP includes a really useful bunch of them.

A “generic server” is just an abstraction of a regular Erlang process with some scaffolding to maintain state, handle synchronous and asynchronous messages, and handle the ordered termination of the process in case of error.

A “supervisor” is a more interesting concept. It is a process with three main goals:

  • Starting other processes
  • Monitoring their correct execution
  • Trying to recover from child failures

Designers can choose different ways of recovering from child errors. These recovery options are called recovery policies and can have different values: one-for-all (if a child process fails, the remaining processes are terminated and then all of them restarted), one-for-one (only the failing process is restarted), etc. A maximum number of restarts in a certain period of time can also be specified for the supervisor. If this limit is reached, the supervisor itself fails instead of getting trapped in a loop of failures and restarts.

A supervisor can have other supervisors as its children. This is a central idea in Erlang applications, which are usually designed as a tree with different levels of supervisors. If a child at some level fails, the supervisor at that level tries to recover from the failure. If that is impossible, the supervisor itself fails, and the supervisor one level up tries to recover from the error. If the top-level supervisor fails, the application crashes in an ordered manner. This “fail early and let some other component of the system recover from the error” approach is a common design principle in Erlang systems.

Jobim is modeled to a great extent after the Erlang OTP platform. It makes sense to add support for components like generic servers and supervisors to Jobim, since the same issues present in Erlang applications are also important for Jobim applications.

Support for generic servers can be found in the jobim.behaviours.server namespace. Generic servers in Jobim must implement the jobim.behaviours.server.Server protocol, defined as:

;; Server protocol

(defprotocol Server
  (init [this initial-value] 
      "Invoked when the server starts execution")
  (handle-call [this request from state] 
       "Invoked when a new synchronous request 
        arrives at the server")
  (handle-cast [this request state] 
        "Invoked when a new asynchronous request 
         arrives at the server")
  (handle-info [this request state] 
        "Handles messages distinct to events")
  (terminate [this state] 
        "Clean up code invoked when the server is 
         going to be terminated externally"))

The main functions are handle-call and handle-cast, which are invoked by the server process when it receives a synchronous or asynchronous message, respectively. The following snippet shows a sample implementation of a generic server, used in the supervisor example included with Jobim:

(ns jobim.examples.supervisor
  (:use [jobim])
  (:require [jobim.behaviours.server :as gen-server]
            [jobim.behaviours.supervisor :as supervisor]))

;; Test server

(gen-server/def-server ExceptionTestServer

  (init [this _] [])

  (handle-call [this request from state]
               (println (str "RECEIVED " request " PID " (self) " ALIVE"))
               (gen-server/reply :alive []))

  (handle-cast [this request state] (if (= request :exception)
                                      (throw (Exception. (str "I DIED! " (self))))
                                      (gen-server/noreply state)))

  (handle-info [this request state] (gen-server/noreply state))

  (terminate [this state] (println (str "CALLING TO TERMINATE " (self)))))

The test server just replies with :alive when it receives a synchronous message, and throws an exception when it receives an asynchronous message with the value :exception. The reply and noreply functions are used to send a potential reply for the message and to pass along the new state of the server. The implementation of the terminate function shows that the server just prints a message when it is about to be terminated.

Once a generic server is implemented, it is good practice to wrap the interaction with the server in wrapper functions defining a public interface for the process. Some of these functions are defined in the same namespace:

;; Public interface

(defn make-test-client
  ([n] (gen-server/start (str "test-client-" n)  (ExceptionTestServer.) [])))

(defn ping-client
  ([n] (gen-server/send-call! (resolve-name (str "test-client-" n)) :ping)))

(defn trigger-exception
  ([n] (gen-server/send-cast! (resolve-name (str "test-client-" n)) :exception)))

The make-test-client function starts a new test server and registers its name with a prefixed index. The ping-client and trigger-exception functions send messages to check that the server is alive and to provoke a remote exception.

We can test the defined server from the REPL by starting a new Jobim node and calling these functions:

user> (use 'jobim)
nil

user> (use 'jobim.examples.supervisor)
nil

user> (bootstrap-node "test-node.clj")

 ** Jobim node started ** 


 - node-name: remote-test
 - id: 82534b8d74ce434d9164a59cf361c7df
 - messaging-type :rabbitmq
 - messaging-args {:host "localhost"}
 - zookeeper-args ["localhost:2181" {:timeout 3000}]


:ok

user> (spawn-in-repl)
"82534b8d74ce434d9164a59cf361c7df.1"

user> (make-test-client 0)
"82534b8d74ce434d9164a59cf361c7df.2"

user> (registered-names)
{"test-client-0" "82534b8d74ce434d9164a59cf361c7df.2"}

user> (ping-client 0)
:alive

user> (trigger-exception 0)
:ok

user> (registered-names)
{}

The previous REPL transcript starts a single Jobim node that uses RabbitMQ as the transport mechanism, spawns a Jobim process associated with the REPL thread, and starts a test server with index 0. Then different messages are sent to the process with the send-call! and send-cast! functions. It can also be seen how the server ends execution after the :exception asynchronous message is sent and no longer appears as a registered process.

A look at the transcript log shows some more information about the interaction with the server:

RECEIVED :hi PID 82534b8d74ce434d9164a59cf361c7df.2 ALIVE
ERROR jobim.core - *** process 82534b8d74ce434d9164a59cf361c7df.2 
died with message : I DIED! 82534b8d74ce434d9164a59cf361c7df.2 ]
...

If we would like to be notified when the test client throws an exception, we can link the actor associated with the REPL thread to the newly created server. For example:

user> (def *client-pid* (make-test-client 0))
#'user/*client-pid*
user> (link *client-pid*)
{"82534b8d74ce434d9164a59cf361c7df.1" ["82534b8d74ce434d9164a59cf361c7df.4"], 
 "82534b8d74ce434d9164a59cf361c7df.4" ["82534b8d74ce434d9164a59cf361c7df.1"]}

user> (trigger-exception 0)
:ok

user> (receive)
{:signal :link-broken, 
 :from "82534b8d74ce434d9164a59cf361c7df.4", 
 :cause "class java.lang.Exception:I DIED! 82534b8d74ce434d9164a59cf361c7df.4"}

This ability to create a bidirectional link between actors is used to define the supervisor actor. A supervisor can be created with the jobim.behaviours.supervisor/start function. This function receives a supervisor specification with the restart strategy, the maximum number of restarts, the period in which to check the maximum number of restarts, and a list of child process specifications. Jobim's supervisor example defines the following function to create a new supervisor:

(defn start-supervisor
  ([] (start-supervisor :one-for-one))
  ([restart-strategy]
     (supervisor/start
      (supervisor/supervisor-specification
       restart-strategy                 ; restart strategy
       1                                ; one restart max...
       20000                            ; ...within a 20000 ms period
       ; Children specifications
       [(supervisor/child-specification
         "test-client-1"
         "jobim.examples.supervisor/make-test-client"
         [1])
        (supervisor/child-specification
         "test-client-2"
         "jobim.examples.supervisor/make-test-client"
         [2])
        (supervisor/child-specification
         "test-client-3"
         "jobim.examples.supervisor/make-test-client"
         [3])]))))

The function creates three different children with indexes 1, 2 and 3 and receives the restart strategy as a parameter. The possible restart strategies are :one-for-one, :one-for-all and :rest-for-one. We will review all of them.

one-for-one

The simplest restart strategy: if a child process dies, only that process is restarted.

one-for-one strategy from Erlang's official documentation

The following transcript shows how this strategy works:

user> (use 'jobim)
nil

user> (use 'jobim.examples.supervisor)
nil

user> (bootstrap-node "test-node.clj")

 ** Jobim node started ** 


 - node-name: remote-test
 - id: ca6377017189452a90814d95edcac79b
 - messaging-type :rabbitmq
 - messaging-args {:host "localhost"}
 - zookeeper-args ["localhost:2181" {:timeout 3000}]


:ok

user> (spawn-in-repl)
"ca6377017189452a90814d95edcac79b.1"

user> (start-supervisor :one-for-one)
"ca6377017189452a90814d95edcac79b.2"

user> (registered-names)
{"test-client-3" "ca6377017189452a90814d95edcac79b.5", 
 "test-client-1" "ca6377017189452a90814d95edcac79b.3", 
 "test-client-2" "ca6377017189452a90814d95edcac79b.4"}

user> (trigger-exception 2)
:ok

user> (registered-names)
{"test-client-3" "ca6377017189452a90814d95edcac79b.5", 
 "test-client-1" "ca6377017189452a90814d95edcac79b.3", 
 "test-client-2" "ca6377017189452a90814d95edcac79b.6"}

We can see how the PID for the actor identified with test-client-2 has changed once the supervisor restarted it. The other processes are not affected at all.

one-for-all

In this restart strategy, if a child process dies, all the remaining actors are terminated and then all the processes are restarted again.

one-for-all restart strategy from Erlang's official documentation

The following transcript shows an example of how this strategy works:

user> (use 'jobim)
nil

user> (use 'jobim.examples.supervisor)
nil

user> (bootstrap-node "test-node.clj")

 ** Jobim node started ** 


 - node-name: remote-test
 - id: a1c47e33320149938dbde4564b4a4199
 - messaging-type :rabbitmq
 - messaging-args {:host "localhost"}
 - zookeeper-args ["localhost:2181" {:timeout 3000}]


:ok

user> (spawn-in-repl)
"a1c47e33320149938dbde4564b4a4199.1"

user> (start-supervisor :one-for-all)
"a1c47e33320149938dbde4564b4a4199.2"

user> (registered-names)
{"test-client-3" "a1c47e33320149938dbde4564b4a4199.5", 
 "test-client-1" "a1c47e33320149938dbde4564b4a4199.3", 
 "test-client-2" "a1c47e33320149938dbde4564b4a4199.4"}

user> (trigger-exception 2)
:ok

user> (registered-names)
{"test-client-3" "a1c47e33320149938dbde4564b4a4199.8", 
 "test-client-1" "a1c47e33320149938dbde4564b4a4199.6", 
 "test-client-2" "a1c47e33320149938dbde4564b4a4199.7"}

We can see how all the registered actors’ PIDs have changed once the supervisor restarted them after the exception was triggered in one of them. If we check the transcript log, we can see how the terminate function for the restarted actors has been invoked:

ERROR jobim.core - *** process a1c47e33320149938dbde4564b4a4199.4 
died with message : I DIED! a1c47e33320149938dbde4564b4a4199.4 [...]
CALLING TO TERMINATE a1c47e33320149938dbde4564b4a4199.5
CALLING TO TERMINATE a1c47e33320149938dbde4564b4a4199.3

rest-for-one

The last strategy is :rest-for-one, where only the actors defined after the failing actor in the supervisor specification are terminated and then, together with the failing one, restarted.

A supervisor can itself fail if too many child actors fail in the period of time specified in the supervisor definition. In the start-supervisor function the limit is set to one restarted process every 20 seconds. We can force the failure of the supervisor by restarting more than one process in that time period. When the supervisor is going to terminate, it first terminates all the child actors.

user> (use 'jobim)
nil

user> (use 'jobim.examples.supervisor)
nil

user> (bootstrap-node "test-node.clj")

 ** Jobim node started ** 


 - node-name: remote-test
 - id: 39df41a4d4224e80818e7f33061a5ca2
 - messaging-type :rabbitmq
 - messaging-args {:host "localhost"}
 - zookeeper-args ["localhost:2181" {:timeout 3000}]


:ok

user> (spawn-in-repl)
"39df41a4d4224e80818e7f33061a5ca2.1"

user> (start-supervisor :one-for-one)
"39df41a4d4224e80818e7f33061a5ca2.2"

user> (registered-names)
{"test-client-3" "39df41a4d4224e80818e7f33061a5ca2.5", 
 "test-client-1" "39df41a4d4224e80818e7f33061a5ca2.3", 
 "test-client-2" "39df41a4d4224e80818e7f33061a5ca2.4"}

user> (trigger-exception 2)
:ok

user> (registered-names)
{"test-client-3" "39df41a4d4224e80818e7f33061a5ca2.5", 
 "test-client-1" "39df41a4d4224e80818e7f33061a5ca2.3", 
 "test-client-2" "39df41a4d4224e80818e7f33061a5ca2.6"}

user> (trigger-exception 2)
:ok

user> (registered-names)
{}

If we check the transcript log, we can see how an exception is thrown by the supervisor after terminating all the child processes:

ERROR jobim.core - *** process 39df41a4d4224e80818e7f33061a5ca2.4 
died with message : I DIED! 39df41a4d4224e80818e7f33061a5ca2.4 [...]
ERROR jobim.core - *** process 39df41a4d4224e80818e7f33061a5ca2.6 
died with message : I DIED! 39df41a4d4224e80818e7f33061a5ca2.6 [...]
MAX RESTARTS REACHED
CALLING TO TERMINATE 39df41a4d4224e80818e7f33061a5ca2.7
CALLING TO TERMINATE 39df41a4d4224e80818e7f33061a5ca2.5
CALLING TO TERMINATE 39df41a4d4224e80818e7f33061a5ca2.3
ERROR jobim.core - *** process 39df41a4d4224e80818e7f33061a5ca2.2 
died with message : Max numer of restarts reached in supervisor: 
39df41a4d4224e80818e7f33061a5ca2.2 [...]

Conclusions

Supervisors, generic servers and other components implemented in Jobim (FSM, event managers, generic TCP servers) are important pieces for building distributed applications using actors.

The current implementation is still immature, but it already shows how the interface of the basic building blocks for Erlang applications can be implemented in Clojure. Nevertheless, the underlying JVM where Clojure code is usually executed poses important difficulties when translating the behaviour of the Erlang VM. Actors must collaborate with the supervisor to terminate their execution, since it is impossible to force the termination of a thread; this problem shows up, for instance, with blocking operations.
Still, the building blocks already implemented are enough to start building test applications, and they make it possible to define new abstractions on top of them at the level of the application being executed in a set of nodes.

The small-case semantic web puzzle

The Semantic Web is already here, but it is somehow different from the original vision that was already present in Tim Berners-Lee’s Design Issues notes on Web Architecture.

It is a humbler, fragmented, but also more pragmatic semantic web. These are some of the pieces composing this “small-case” semantic web, as I am trying to put them together in a chapter for a future book about RESTful design:

A collection of loosely coupled, sometimes redundant technologies and specifications built on top of the HTTP protocol that grant access to an increasingly large web of linked data:

The Linking Open Data cloud diagram by Richard Cyganiak (CC)

Finite State Machines in Clojure with Jobim

When you are designing a distributed system, as in any other software system, you soon start noticing the same patterns happening again and again. Any software tool whose goal is to ease the development of distributed systems should offer abstraction mechanisms so these patterns can be factored into reusable components.

One of these common patterns is the finite-state machine. FSMs can be easily implemented using reactive actors that store a certain internal state, mutated as they receive incoming messages, following the rules described in a set of transition functions.

Erlang’s OTP platform offers an easy way of writing processes acting as FSMs
with the gen_fsm behaviour.

A similar mechanism that makes it easy to create actors implementing FSMs has been added to the Jobim Clojure library; it can be found in the jobim.behaviours.fsm namespace.

The FSM protocol

The abstraction of a FSM in Jobim is defined in the jobim.behaviours.fsm.FSM protocol. To define a new FSM actor you will need to implement this protocol.

(defprotocol FSM
  (init [this initial-message]
    "Returns the initial state of the FSM")
  (next-transition [this current-state state-data  message]
    "Defines which transtion will be applied provided the
     current state and the incoming message")
  (handle-info [this current-state state-data message]
    "Handles messages distinct to events")
  (terminate [this state-name state-data]
    "Clean up code when the FSM is going to be terminated
     externally"))

The main functions in the protocol are init and next-transition.
The best way to understand this protocol is to take a look at a concrete example.

The lock automaton

The FSM we are going to describe is a simple lock mechanism. It will be initialized with a secret combination of numbers and the locked state.
Other agents can interact with the lock by pushing numbers. The automaton will track the sequence of numbers and will switch to the open state if the correct sequence of numbers is introduced.

Each time a new number is pushed, the automaton will check if the sequence so far is a partial match of the whole combination. If that’s the case, it will keep the sequence so far in its inner state. Otherwise, it will reset the sequence of pushed numbers.

The following sequence diagram gives an informal description of such a state machine.

The implementation of the automaton in Jobim requires implementing the FSM protocol. The jobim.behaviours.fsm/def-fsm macro can be used to ease the definition of the FSM.

(def-fsm Lock
  (init [this code] [:locked {:so-far [] :code code}])
  (next-transition [this state-name state-data message]
                   (let [[topic _] message]
                     (condp = [state-name topic]
                       [:locked :button] handle-button
                       [:open   :lock]  handle-lock
                       action-ignore)))
  (handle-info [this current-state current-data message]
               (do
                 (cond-match
                  [[?from :state] message] (send! from current-state))
                 (action-next-state current-state current-data)))
  (terminate [this state-name state-data] :ignore))

The init function is invoked whenever the actor starts execution. It receives an initial message and must return the initial state name and the initial state data for the FSM.

In the case of the lock FSM, it receives the initial code combination and returns the state :locked and the initial state data {:so-far [] :code code}.

The function next-transition describes the transition rules for the FSM.
It receives the current state name, the current state data and the received message and must return the function that will implement the next transition for the FSM. In this case we just look at the current state of the automaton and the topic of the incoming message:

 [:locked :button] handle-button
 [:open   :lock]  handle-lock
 action-ignore

If the current state is locked and a button is pushed, the handle-button function will be invoked. If the current state is open and a :lock message is received, the handle-lock function will be invoked.
In any other case, the jobim.behaviours.fsm/action-ignore function will be invoked, which just returns the current state and state data as the next state and state data of the FSM, effectively discarding the message.

The transitions of the FSM are encoded in the handle-button and handle-lock functions. State transition functions receive as arguments the current state name and data as well as the incoming message and must return the next state name and data. Sample implementations for the lock FSM can be seen in the following piece of code:

(defn- partial-match?
  ([combination so-far]
     (if (empty? so-far) true
         (if (= (first combination) (first so-far))
           (recur (rest combination) (rest so-far))
           false))))

(defn- handle-button
  ([current-state current-data message]
     (let [[_ code] message
           so-far (conj (:so-far current-data) code)]
       (if (= (:code current-data) so-far)
         (action-next-state :open (assoc current-data :so-far so-far))
         (if (partial-match? (:code current-data) so-far)
           (action-next-state :locked (assoc current-data :so-far so-far))
           (action-next-state :locked (assoc current-data :so-far [])))))))

(defn- handle-lock
  ([current-state current-data message]
     (action-next-state :locked (assoc current-data :so-far []))))
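
To make the matching rule concrete, these are the values the private partial-match? helper above produces for a few illustrative inputs:

;; Illustrative evaluations of partial-match? (values chosen for a [1 2 3] code):
;; (partial-match? [1 2 3] [])    ;=> true  (nothing pushed yet)
;; (partial-match? [1 2 3] [1 2]) ;=> true  (prefix of the combination)
;; (partial-match? [1 2 3] [2])   ;=> false (wrong first digit)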

The utility function jobim.behaviours.fsm/action-next-state can be used to build the vector with the name and data for the next state.
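
Conceptually, action-next-state just pairs the state name with the state data, something like the following sketch (not the library's actual definition, which may carry extra bookkeeping):

;; Conceptual sketch only: the next state as a [name data] pair,
;; mirroring the shape that init returns.
(defn action-next-state
  ([state-name state-data]
     [state-name state-data]))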

Sometimes the actor implementing the FSM must deal with messages not belonging to any transition. For instance, in the lock FSM it would be nice to be able to send a message to the FSM and retrieve the current state of the automaton, but this message is not a regular event that can trigger a state change in the automaton. The handle-info function can be implemented for the FSM actor to handle any kind of message, as any other actor would.

Once the FSM protocol is implemented, a new actor for the definition can be started using the jobim.behaviours.fsm/start and jobim.behaviours.fsm/start-evented functions. The first one will start a dedicated-thread actor and the second an evented actor.

The start function can receive a new object with the type of the defined FSM protocol or a string with the qualified name of the protocol implementation. This latter variant is useful to start the FSM actor in a remote node.

The start function can also receive a string as the first argument and will register the FSM actor globally with that name, so it can be retrieved from any node.
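
Putting these variants together, the lock FSM could be started in any of the following ways (a sketch: the argument shapes are inferred from the descriptions above, and the name "lock-1" is purely illustrative):

;; from an instance of the protocol implementation
(start (jobim.examples.fsm.Lock.) [1 2 3])
;; from the qualified name, handy to start the actor in a remote node
(start "jobim.examples.fsm.Lock" [1 2 3])
;; registering the actor globally under the name "lock-1"
(start "lock-1" (jobim.examples.fsm.Lock.) [1 2 3])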

To send an event to the FSM actor, the jobim.behaviours.fsm/send-event! function can be used instead of the regular jobim/send! function. The latter can be used to send messages that will be handled by the handle-info function of the FSM protocol.

To make the use of the automaton easy for any client, a public interface for the FSM can be defined wrapping the jobim.behaviours.fsm function calls:

;; public interface

(defn make-lock
  ([combination]
     (start (jobim.examples.fsm.Lock.) combination)))

(defn make-lock-evented
  ([combination]
     (start-evented (jobim.examples.fsm.Lock.) combination)))

(defn push-button
  ([fsm number]
     (send-event! fsm [:button number])))

(defn lock
  ([fsm]
     (send-event! fsm [:lock :ignore])))

(defn state
  ([fsm]
     (send! fsm [(self) :state])
     (receive)))

A client can use this interface to interact with the FSM without even knowing the underlying implementation:

user> (use 'jobim)
nil
user> (bootstrap-node "node-config.clj")

 ** Jobim node started **


 - node-name: osx
 - id: 6bdcd512ab1a4df79d060692d008fe72
 - messaging-type :rabbitmq
 - messaging-args {:host "172.21.1.237"}
 - zookeeper-args ["172.21.1.237:2181" {:timeout 3000}]


:ok
user> (spawn-in-repl)
"6bdcd512ab1a4df79d060692d008fe72.1"
user> (use 'jobim.examples.fsm)
nil
user> (def *lock* (make-lock [1 2 3]))
#'user/*lock*
user> (state *lock*)
:locked
user> (push-button *lock* 1)
:ok
user> (push-button *lock* 2)
:ok
user> (push-button *lock* 3)
:ok
user> (state *lock*)
:open
user> (lock *lock*)
:ok
user> (state *lock*)
:locked
user> (push-button *lock* 2)
:ok
user> (push-button *lock* 4)
:ok
user> (state *lock*)
:locked

The implementation of the actor can be found in the jobim.examples.fsm.clj file of the Jobim source code. An alternative implementation of this very same actor can be found in the Erlang documentation.

ZeroMQ and Clojure, a brief introduction

What would BSD sockets look like if they were to be designed today?

That’s the question the ZeroMQ designers seem to have been trying to answer when they started working on the project.

ZeroMQ is one of the most misleading names for a software product. If you come to ZeroMQ from an AMQP or ActiveMQ background you may be confused for a short period of time when you don’t find some of the components you take for granted (e.g. a broker). The fact is that ZeroMQ is much more than a queuing system: it is a generalized communication layer that allows communication between application threads, wherever they are located: in different nodes of a network, in different processes being executed on the same machine or in the same application.

In the last few days I’ve been working on adding ZeroMQ as an alternative
communication mechanism to RabbitMQ for Jobim actors. This post is a brief
summary of my findings that I hope can be useful for anyone interested in using
ZeroMQ from Clojure.

In the beginning there were sockets

ZeroMQ’s main abstraction is the good old socket, but a very different kind of socket. Quoting the official ZeroMQ documentation:

“a ØMQ socket is what you get when you take a normal TCP socket, inject it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombard it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex.”

Some of the features distinguishing ZeroMQ sockets from traditional BSD sockets are the following:

  • ZeroMQ sockets are asynchronous and data can be queued before being actually sent to the receivers
  • ZeroMQ works with messages, packets of bytes that are sent from peer to peer
  • ZeroMQ sockets can be used with different communication patterns besides traditional bidirectional communication between two peers: request-reply, push-pull and pub-sub semantics can be imposed on top of ZeroMQ sockets
  • ZeroMQ sockets can be bound to a wide variety of transport mechanisms: TCP is available, but also IPC for inter-process communication as well as transports for in-process communication and multicast
  • ZeroMQ sockets do not require some of the traditional BSD socket operations to work; for instance, a client socket can connect to an endpoint that has not been bound yet, and endpoints do not require the use of the accept operation

Taking this into account, the first step when introducing ZeroMQ into a new application is deciding which combination of transport + communication pattern is going to be used.

If we want to communicate Java threads in the same JVM using a publish-subscribe communication pattern, the right combination is the inproc transport + pub-sub; processes communicating across the network forming a data-processing pipeline can use the TCP transport and the push-pull communication pattern, etc.

Installation and setup

Before taking a look at how ZeroMQ can be used inside a Clojure application, it is necessary to set up ZeroMQ as well as the required libraries. ZeroMQ is installed as a shared native library, so using ZeroMQ from Java involves installing a JNI Java wrapper.
Finally, a Clojure library built on top of the JNI wrapper can be used from Clojure code to manipulate ZeroMQ sockets.

Installation of ZeroMQ is fairly straightforward:

$ wget http://www.zeromq.org/local--files/area:download/zeromq-2.0.9.tar.gz
$ tar -zxvf zeromq-2.0.9.tar.gz
$ cd zeromq-2.0.9/
$ ./configure CFLAGS="-m64" CXXFLAGS="-m64" LDFLAGS="-m64" && make
$ sudo make install

In the previous snippet I’ve used the “-m64” flag because I’m building for a 64-bit OS X architecture. Currently, the OS X Java version does not support 32-bit native libraries (you can check this issuing a “java -d32” command in the shell), so I need an x86_64 build of ZeroMQ in order to build the right JNI library.

The installation of the jzmq JNI wrapper is also really simple:

$ git clone http://github.com/zeromq/jzmq.git
$ cd jzmq
$ ./autogen.sh
$ ./configure CFLAGS="-m64" CXXFLAGS="-m64" LDFLAGS="-m64" && make
$ sudo make install

Once these libraries are installed, we are ready to use ZeroMQ from a Clojure application. The easiest way of using the libraries that have been built is adding the Clojure/ZeroMQ library as well as the ZeroMQ Java wrapper as dependencies to a Leiningen project and adding the path to the native JNI library to the project using the “:native-path” key.

  :dependencies [[org.clojure/clojure "1.2.0"]
                 [org.clojure/clojure-contrib "1.2.0"]
                 ...
                 [org.clojars.mikejs/clojure-zmq "2.0.7-SNAPSHOT"]
                 [org.zmq/jzmq "2.0.6-SNAPSHOT"]
                 ...] 

:native-path "/usr/local/lib"

The jzmq library must be installed manually using Maven. Another possibility is
using the excellent native-deps plugin by David Nolen.
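
For reference, a jar produced by the jzmq build can be installed into the local Maven repository with something along these lines (the jar path is an assumption; adjust it to wherever your build left zmq.jar):

$ mvn install:install-file -Dfile=/usr/local/share/java/zmq.jar \
      -DgroupId=org.zmq -DartifactId=jzmq \
      -Dversion=2.0.6-SNAPSHOT -Dpackaging=jar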

Once this setup is accomplished, you should be able to access ZeroMQ from Slime or from a regular REPL or application passing the “-Djava.library.path=/path/to/native/lib” option to the java binary.

ZeroMQ usage

The first step when using ZeroMQ is creating a Context object. As I have mentioned, all the operations in ZeroMQ are asynchronous. Internally, they are handled by OS-level threads. The Context object initializes the pool of threads that will handle socket operations.

The make-context function creates the ZeroMQ context. It receives the number of threads that will be created in the pool. If you want to use ZeroMQ as a communication mechanism for Java threads inside a single application, using the inproc transport, 0 must be passed as an argument.

user> (use 'org.zeromq.clojure)
nil
user>(def *ctx* (make-context 0))
#'user/*ctx*

The initialized context can be used to create sockets that will receive incoming messages or that will be used to send messages.

The following code creates a new socket with a pull (upstream) communication pattern bound to an inproc transport layer. The string “inproc://test” identifies the endpoint for that socket.

The remaining code just creates a thread that loops over the incoming messages, printing out each received message.

Notice how the delivered message is an array of bytes with the same data that will be sent from the client.

(future (let [s (make-socket *ctx* +upstream+)]
             (bind s "inproc://test")
             (loop [msg (recv s)]
               (println (str "Received: " (String. msg)))
               (recur (recv s)))))

The client counterpart is also easy. We just need to create the socket with the matching pattern (+downstream+, i.e. push) and connect it to the endpoint (“inproc://test”).

In the following piece of code we create some clients that connect to the same endpoint using the connect function and start sending messages using the send- function:

(doseq [i (range 0 5)]
  (future (let [s (make-socket *ctx* +downstream+)]
               (connect s "inproc://test")
               (loop [c 0]
                 (send- s (.getBytes (str "hey " i " " c)))
                 (Thread/sleep (rand 5000))
                 (recur (inc c))))))

If everything goes fine, we should start getting the following output:

Received: hey 0 0
Received: hey 1 0
Received: hey 2 0
Received: hey 3 0
Received: hey 4 0
Received: hey 0 1
Received: hey 0 2
Received: hey 2 1
Received: hey 4 1
Received: hey 1 1
Received: hey 3 1
Received: hey 1 2
...

Using the push/pull communication pattern we have created an n-to-1 fan-in message queue; selecting the inproc transport, we have used this communication layer to exchange data among threads in the same JVM.

We could have used a different transport layer to apply the same fan-in communication pattern to exchange data among threads in different JVMs. The only change required in the previous code to accomplish this is changing the transport declaration in the bind and connect functions to a TCP transport, for instance:

(bind s "tcp://192.168.1.35:5555")
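
and, on the client side, the matching change in the connect call:

(connect s "tcp://192.168.1.35:5555")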

Communication patterns

One of the main advantages of ZeroMQ is the support for different communication patterns. Each one of them encapsulates a behaviour that can be reused directly in your application to solve a specific communication need.

It is important to know all of them to understand when it is feasible to use one or another.

Each communication pattern is divided into two roles that must be matched against each other for each pair of sockets. It is important not to mix sockets from different communication patterns, since unexpected behaviour could arise.

request/reply

Allows a sequence of requests and replies to be exchanged between clients and servers. Outgoing messages are load-balanced among all the available servers.
If the client reaches the limit of the outgoing message buffer, or any other exceptional condition, it will block. The servers will drop messages.
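
A minimal request/reply sketch, reusing the *ctx* context created earlier and assuming the Clojure wrapper exposes +req+ and +rep+ socket constants analogous to the +upstream+ and +downstream+ constants used above:

;; reply (server) side: echo every request back to its sender
(future (let [s (make-socket *ctx* +rep+)]
             (bind s "inproc://echo")
             (loop [msg (recv s)]
               (send- s msg)
               (recur (recv s)))))

;; request (client) side: send a request, block until the reply arrives
(let [s (make-socket *ctx* +req+)]
  (connect s "inproc://echo")
  (send- s (.getBytes "ping"))
  (println (String. (recv s))))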

publish/subscribe

Enables the pub/sub pattern, where the publish socket uses a fan-out pattern to send messages to all the subscribed clients. A client can subscribe to different publishers using the set-socket-option function. Communication is unidirectional.

If the publisher reaches the limit of the outgoing buffer, it will start dropping messages.
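
A publish/subscribe sketch along the same lines; the +pub+, +sub+ and +subscribe+ constants are assumptions about the wrapper, and the exact arguments of set-socket-option may differ:

;; publisher: fan-out a tick message every second
(future (let [s (make-socket *ctx* +pub+)]
             (bind s "inproc://feed")
             (loop [n 0]
               (send- s (.getBytes (str "tick " n)))
               (Thread/sleep 1000)
               (recur (inc n)))))

;; subscriber: an empty subscription prefix receives every message
(future (let [s (make-socket *ctx* +sub+)]
             (connect s "inproc://feed")
             (set-socket-option s +subscribe+ (.getBytes ""))
             (loop [msg (recv s)]
               (println (String. msg))
               (recur (recv s)))))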

push/pull

Builds a pipeline of nodes where data is pushed to at least one connected socket that pulls data. If a push socket is connected to more than one pull socket, messages are load-balanced among them.

If the memory limit is reached, the push socket will block.

Conclusions

ZeroMQ is incredibly powerful: it is not only fast and efficient, but it also enormously simplifies the design of the communication layer of any application.
It also makes it possible to reuse the same communication pattern in different situations.

Nevertheless, ZeroMQ can be seen, in my opinion, as complementary to queuing systems like RabbitMQ rather than a drop-in replacement. As with any other software system, a careful look at your application requirements will make clear which communication mechanism suits best the problem you are trying to solve.

This article is just a really brief overview of what ZeroMQ has to offer. If you want to know more about ZeroMQ and the rest of the features it offers, like multipart messages or devices, I encourage you to dive into the great documentation available on the official ZeroMQ site.

Jobim: an actors library for Clojure

In the last few days I’ve been working on an actors library for Clojure built on
top of RabbitMQ and ZooKeeper. I’ve called this little piece of software
Jobim. The source code is available on github.

External dependencies

Jobim depends on RabbitMQ 2.X for dispatching messages between JVM
nodes. RabbitMQ is a reliable, high-performance messaging solution, supporting the
0.9 version of the AMQP messaging protocol, built on top of Erlang’s OTP
platform.
I’ve been using Rabbit in my day-to-day work for several months and it is one of
the best alternatives you can find to build a message queue.

Jobim also has a dependency on Apache’s ZooKeeper. ZooKeeper is a really
impressive piece of software that, in the purest UNIX tradition, does only one
thing but does it in the best possible way. In this case, ZooKeeper allows a set
of distributed processes to manage a shared tree of nodes and get notifications
whenever this tree is modified by other processes. This basic functionality
provides out-of-the-box support to use ZooKeeper as a powerful directory service
as well as a group membership service. It can also be extended to provide a whole
bunch of high-level coordination services like priority queues or two-phase
commit protocols.

Turning the JVM into a Node

Jobim actors need to be executed in a JVM that is executing a Node service. JVM
nodes are aware of the existence of other nodes, can exchange messages and
coordinate their activities.

A node service can be started in the JVM using a single function:
jobim/bootstrap-node. This function receives as a parameter a path
to a configuration file where the name of the node, as well as the connection
options for RabbitMQ and ZooKeeper, must be stated.

The following is a sample configuration file:

;; Default configuration for a node

{:node-name "linux"

 ;; rabbit host, port, username, password and virtual host
 :rabbit-options [:host "192.168.1.35"]

 ;; zookeeper options:
 ;; servers: array of zk servers specified as host:port
 ;; timeout : session timeout
 ;; id : session id
 ;; password
 :zookeeper-options ["192.168.1.35:2181" {:timeout 3000}]}

Once we have started the node, we should be able to use the
jobim/nodes function to check all the available nodes and their
identifiers:

=> (use 'jobim)
nil
=> (bootstrap-node "node-config.clj")
"6811651bd83e4d428359b419e7f76a75"
=> (nodes)
{"osx" "5299491ea4184c02ad8c0fbc100c49f9",
 "linux" "6811651bd83e4d428359b419e7f76a75"}

Nodes are aware of changes in the list of available nodes. They are also notified about
shutdown of nodes or about nodes not being reachable due to network partitions.

Creating an actor

A Jobim actor can be started using any Clojure function. This function can use
two special functions: send! and receive to send and
receive messages from other actors in the same node or in a different node.
In order to send a message, the actor needs to know the PID of the actor
receiving the message. This PID is passed as the first argument of the
send! function. The payload of the message is the second argument.

An actor can retrieve its own PID using the jobim/self function.

This is an implementation of a sample ping actor:

(defn ping
  ([]
     (loop [continue true
            msg (receive)]
       ;; stop looping once an exit message has been seen
       (when continue
         (cond-match
          [#"exit" msg]
            (recur false msg)
          [#"exception" msg]
            (throw (Exception.
              (str "Ping actor with PID: "
                   (self)
                   " received exception")))
          [[?from ?data] msg]
            (do (send! from data)
                (recur true (receive))))))))

The implementation shown above uses the Matchure pattern-matching library to
provide a more “Erlang-like” experience.

To start the execution of a concurrent actor, the jobim/spawn
function can be used. It accepts a function or a string with the qualified
name of a function.

The jobim/spawn function returns the PID of the newly created
process or throws an exception if the creation went wrong.

=> (def *pid* (spawn examples/ping))
#'clojure.core/*pid*
=> *pid*
"5299491ea4184c02ad8c0fbc100c49f9.1"

The REPL or any other thread can be transformed into an actor using the
spawn-in-repl function. We will use this function to send some
messages to the ping actor we just created:

=> (spawn-in-repl)
"5299491ea4184c02ad8c0fbc100c49f9.2"
=> (send! *pid* [(self) 13123])
ok
=> (receive)
13123

Message payloads

Jobim uses the standard Java serialization mechanism to build the payload of the
messages. This means that any object implementing
java.io.Serializable can be sent inside a message.

For instance, we can send Date objects to our ping actor:

=> (send! *pid* [(self) (java.util.Date.)])
ok
=> (receive)
#<Date ...>

It is possible to change the serialization and deserialization mechanisms used by
a node by altering the jobim.core/default-encode and
jobim.core/default-decode symbols with suitable functions.
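
For instance, a minimal sketch of swapping the codec, assuming both symbols are vars holding one-argument functions; my-encode and my-decode are hypothetical helpers standing in for a real codec:

;; my-encode / my-decode are hypothetical: replace them with functions
;; that turn a message into bytes and back
(alter-var-root #'jobim.core/default-encode
                (constantly (fn [msg] (my-encode msg))))
(alter-var-root #'jobim.core/default-decode
                (constantly (fn [data] (my-decode data))))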

Going distributed

The most basic building block for distributed computing in Jobim is the
jobim/rpc-call function. This function receives a node identifier, a
string containing a function name and an array of arguments, and tries the
invocation of that function in the remote node. rpc-call returns immediately
without knowing the result of the invocation in the remote node. If we want to
retrieve the result of the invocation we can use the blocking variant,
jobim/rpc-blocking-call, which blocks until the result is returned or
an exception is thrown.

RPC functions in Jobim accept node identifiers; we can transform the name of a
node, perhaps retrieved with the jobim/nodes function, into a
node identifier using the jobim/resolve-node-name function.

The following example is an RPC call to do a simple addition:

=> (nodes)
{"osx" "5299491ea4184c02ad8c0fbc100c49f9", 
 "linux" "6811651bd83e4d428359b419e7f76a75"}
=> (resolve-node-name "linux")
"6811651bd83e4d428359b419e7f76a75"
=> (rpc-blocking-call (resolve-node-name "linux") 
                      "clojure.core/+" [1 2 3 4 5])
15

One especially important use of the RPC functions is to start new actors in other
nodes by invoking the jobim/spawn function remotely. If we use the
blocking variant of the RPC function, we will retrieve the PID of the remote
actor and we can then start sending messages to it:

=> (def *pid* (rpc-blocking-call (resolve-node-name "linux") 
                                 "jobim/spawn" 
                                 ["jobim.examples.actors/ping"]))
#'clojure.core/*pid*
=> *pid*
"6811651bd83e4d428359b419e7f76a75.1"
=> (send! *pid* [(self) 345])
nil
=> (receive)
345

Publishing processes

As long as we have the PID of an actor, we will be able to exchange messages
with it. Besides, since the PID is just a string, we can pass PIDs inside
messages, allowing actors to be “mobile actors” in a Pi-Calculus sense.

Nevertheless, it is sometimes convenient to be able to query for an actor using
a constant reference we know beforehand, for instance an alias, so we can
communicate with the actor without needing to know its PID.

Jobim supports this use case with the jobim/register-name
function. Using this function, we can provide a name for the PID of a process
that will be globally available to all nodes in the system.

Registered names can be queried using the jobim/registered-names
function, in a similar way to the jobim/nodes function for node
names and node identifiers.

We can transform a registered name into an actor PID using the
jobim/resolve-name function, so we can pass it as an argument in
jobim/send! function calls.

=> (def *ping* (spawn examples/ping))
#'clojure.core/*ping*
=> *ping*
"5299491ea4184c02ad8c0fbc100c49f9.8"
=> (register-name "ping" *ping*)
ok
=> (registered-names)
{"ping" "5299491ea4184c02ad8c0fbc100c49f9.8"}
=> (resolve-name "ping")
"5299491ea4184c02ad8c0fbc100c49f9.8"
=> (send! (resolve-name "ping") [(self) 1234])
ok
=> (receive)
1234

When things go wrong

Erlang systems have a very particular approach to error handling that consists of
not preventing failures but reacting quickly after a failure happens, most of
the time by restarting the failing component.

The basic mechanism behind this approach is the “linking” of processes. When two
Erlang processes are linked, any failure in one of the two processes will produce
a failure signal in the other process that, if not properly handled, will cause
that process to fail as well.

Special processes, known as supervisors, take care of creating and linking to
child processes as well as handling exceptions in the children according to some
kind of recovery policy.

Distributed Erlang applications are usually arranged as trees of processes where
the process at a node handles the errors in the leaves of that node and, if it is
not able to recover from an error, dies and bubbles the error up to the upper level.

Jobim provides limited support for this style of error handling with the
jobim/link function. The link function receives the PID of an actor
as an argument and links both actors bidirectionally.

From this point on, any error in one actor, or the shutdown of the node where one
of the actors is running, will produce a special message signaling the error in
the other actor.

=> (self)
"5299491ea4184c02ad8c0fbc100c49f9.1"
=> (def *pid* (spawn examples/ping))
#'clojure.core/*pid*
=> (link *pid*)
{"5299491ea4184c02ad8c0fbc100c49f9.1" 
 ["5299491ea4184c02ad8c0fbc100c49f9.9"],
"5299491ea4184c02ad8c0fbc100c49f9.9" 
 ["5299491ea4184c02ad8c0fbc100c49f9.1"]}
=> ; the ping actor will throw an exception if it receives a message containing the "exception" string
=> (send! *pid* "exception")
ok
=> (receive)
{:signal :link-broken, 
 :from "5299491ea4184c02ad8c0fbc100c49f9.9", 
 :cause "class java.lang.Exception:Ping actor received exception"}

This means that linked processes in Jobim must explicitly look for this kind of
message and handle it, maybe throwing an exception, to obtain a behaviour
similar to OTP applications.

Evented actors

The actors introduced so far are executed in their own Java thread. This is a
huge problem, since a JVM will start throwing OutOfMemory exceptions
after a few thousand threads are created.

On the other hand, Erlang systems can handle millions of concurrent actors in a single node, using
a preemptive scheduler that applies a fixed number of reductions to each Erlang
process being executed. This means that Erlang processes are extremely
lightweight and can benefit from features like the linking of processes
previously discussed.

A possible alternative for building systems using a large number of actors is to
use “evented actors”, so a single Java thread can execute different actors. This
solution has been explored in Scala actors.

Jobim evented actors rely on three special functions: jobim/react, which is
equivalent to the receive function of a regular actor;
jobim/react-loop, which creates a recursive evented actor; and
jobim/spawn-evented, which creates a new evented actor returning its
PID. This PID can be used with the regular jobim/send! function to
send messages to the evented actor.

The following is an evented implementation of the previously defined ping actor:

(defn ping-evented
  ([]
     (let [name "test evented"]
       (react-loop
        (react
         (fn [msg]
           (cond-match
            [#"exit" msg]       
              false
            [#"exception" msg]  
              (throw (Exception. "Ping actor received exception"))
            [[?from ?data] msg] 
              (send! from (str "actor " name " : " data)))))))))
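
Assuming spawn-evented, like spawn, accepts a function, a quick way to exercise the evented ping actor from an actor-enabled REPL is the following (the result string follows the format built in the code above):

(def *pid-evented* (spawn-evented ping-evented))
(send! *pid-evented* [(self) 42])
(receive) ;; should evaluate to "actor test evented : 42"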

Conclusions

Erlang is an incredible platform for building distributed, reliable
systems. An actors library providing distributed failure signals and tolerance
to network partitions can be a nice addition to Clojure’s own concurrency
mechanisms for building distributed applications on the JVM.
It could also be mixed with the different distribution mechanisms available on
the JVM.

Jobim is just an experiment on how this kind of system could be built using two
beautiful pieces of software like RabbitMQ and ZooKeeper.