
Message Passing Concurrency in Clojure using Kilim

Some slides from my talk about Clojure and Kilim.
You can get the code and the examples on GitHub.

Monte Carlo integration with Clojure and Mahout

Monte Carlo simulations can be used to approximate the value of different mathematical models where computing an analytical solution is intractable, or impossible because of the uncertainty in some variables.

Conceptually, Monte Carlo simulations are simple.
They are based on the repeated computation of the model, where the values for the random variables are drawn from some known probability distribution.
Finally, the results of all the simulations are averaged to obtain the approximated value.

The two main requirements to run a Monte Carlo simulation, provided you already have a mathematical model of the problem, are a good random number generator and some tool to model the different probability distributions. The Apache Mahout project has implemented these tools. It includes a powerful pseudo-random number generator that passes all the DieHard randomness tests. It also includes Java classes that make it easy to work with different probability distributions.

This post is a small example of how to use these classes from Clojure as a convenient scripting language to work with Mahout. The example implemented is extracted from this excellent introductory book published by Springer.
It consists of computing the integral of the following function:

  (defn h
    ([x] (pow (+ (cos (* 50 x)) (sin (* 20 x))) 2)))

We can use Incanter to visualize this function:

  (use 'incanter.core)
  (use 'incanter.charts)

  (view  (function-plot h 0 1))

The integral can be computed analytically, obtaining the value 0.965.

The Monte Carlo approach is based on drawing iid values from a uniform probability distribution U(0,1) and approximating the value with the function:

  (def *solution* (/ (reduce + *randoms*) *trials*))

Where *randoms* is the collection of randomly generated iid variables and *trials* is the number of variables generated.

In order to generate these values we can use the org.apache.mahout.math.jet.random.Uniform Mahout class:

  (import 'org.apache.mahout.math.jet.random.Uniform)
  
  (def *unif* (Uniform. (double 0.0)
                        (double 1.0)
                        (mod (.getTime (java.util.Date.)) 10000)))

  (def *trials* 100000)

  (def *randoms* (take *trials* (repeatedly #(h (.nextDouble *unif*)))))

To check the generated values, we can plot them using Incanter:

  (view (bar-chart (range 0 *trials*) *randoms* ))

The computed value is 0.9651838354718588, very close to the actual solution of 0.965.

We can also compute the running averages and plot them to see how the algorithm converges to the solution:

  ;; element i holds the average of the first i samples
  (def *averages* (vec (map-indexed (fn [i sum] (if (zero? i) 0 (/ sum i)))
                                    (reductions + 0 *randoms*))))

  (view  (function-plot (fn [x] (nth *averages* (Math/round (float x)))) 0 *trials*))

Monte Carlo simulations are a simple yet very powerful idea that can be applied to lots of different situations. If you want to learn more about Monte Carlo methods, these are some useful resources:

Visualizing Mahout’s output with Clojure and Incanter

Some Clojure code to visualize clusters built using the Apache Mahout implementation of the K-Means clustering algorithm.

The code retrieves the output of the algorithm (clustered-points and centroids) from HDFS, builds a Clojure friendly representation of the output (a map and a couple of lazy-seqs) and finally uses Incanter’s wrapper around JFreeChart to visualize the results.
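As a rough sketch of that retrieval step, the following code reads the clustered points from HDFS using Hadoop’s SequenceFile reader and Mahout’s WeightedVectorWritable. The Hadoop and Mahout classes are the standard ones for Mahout’s k-means output at the time, but the helper name and the returned shape are illustrative assumptions; the actual mahout-vis.core implementation may differ:

(import '[org.apache.hadoop.conf Configuration]
        '[org.apache.hadoop.fs FileSystem Path]
        '[org.apache.hadoop.io IntWritable SequenceFile$Reader]
        '[org.apache.mahout.clustering WeightedVectorWritable])

;; Reads the clusteredPoints SequenceFile and returns a vector of
;; [cluster-id mahout-vector] pairs
(defn read-clustered-points [^Configuration conf ^String path]
  (let [fs     (FileSystem/get conf)
        reader (SequenceFile$Reader. fs (Path. path) conf)]
    (try
      (loop [acc []]
        (let [k (IntWritable.) v (WeightedVectorWritable.)]
          (if (.next reader k v)
            (recur (conj acc [(.get k) (.getVector v)]))
            acc)))
      (finally (.close reader)))))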

A sample execution using the output generated by the example from Mahout’s documentation:

(use 'mahout-vis.core)

(bootstrap! "/Users/antonio/Development/tmp/hadoop-0.20.2/
conf/core-site.xml")

(def *results* 
  (k-means-output "output/clusters-9/part-r-00000" 
                  "output/clusteredPoints/part-m-00000"))

(visualize-plots 
  (compute-comps *results* [5 10] [15 50] 
                           {:display-centroids true}))

The output of the previous code is four frames displaying the clusters for the components with indices 5, 10, 15 and 50 of the input data.

Other visualizations can be generated interactively from Clojure’s REPL. Another example of how Clojure can provide an interactive and powerful interface to complex Java systems.

Extending MongoDB with custom commands

MongoDB is one of the fastest alternatives to store and retrieve data. Mongo stores information organized as schema-less documents. Queries and data objects are expressed as JSON objects encoded using a “binary” serialization instead of plain text for improved performance.

There are two main mechanisms to extract information from MongoDB. A find command wrapping a small ad-hoc query language including conditionals, sorting, etc. and a mapReduce command for batch processing.

When non-standard retrieval of data is required, the mapReduce command is the only option MongoDB offers. In a recent project I’ve been working on, I had to select documents stored in Mongo whose Levenshtein distance from a query term was below a certain threshold, functionality similar to the one offered by the fuzzystrmatch module in PostgreSQL. The task can be accomplished using Mongo’s mapReduce command, but the performance of the queries was not optimal.

As an experiment, I started reading the code of MongoDB to see if there was an easy way to implement this functionality directly in the database. What I found is that Mongo’s code is really modular and easy to extend.
The outcome has been a new command that implements the functionality with a big improvement in performance, as the following table shows:

implementation                    mapReduce   native
levenshtein 0.7 (0 matches)       1.941 s     0.077 s
levenshtein 0.4 (21 matches)      2.691 s     0.091 s
levenshtein 0.1 (22,478 matches)  22.857 s    7.962 s

The collection being queried in this test contains 100,000 documents with random strings of text between 30 and 100 characters.

The code for the new command can be found on GitHub. The files in this commit contain all the code required to implement the command.

The following is a small summary of the steps required to extend MongoDB to support this kind of queries.

Retrieving MongoDB’s code and building process

MongoDB’s code is available on GitHub. Once the code has been retrieved, the next step is to build the database and all the additional functionality, like the JavaScript shell.
Mongo uses SCons as the build infrastructure. SCons is itself built using Python, so this is a dependency that must be installed in your system before you can build Mongo.

To build the whole system, a single command is enough:

$scons .

The task can take quite a long time, but after building the whole system, SCons does a great job of rebuilding only the modified sources.
Different parts of the system can also be built as independent targets:

# builds only the db
$scons mongod
# builds only the JS shell
$scons mongo

Creating a new DB command

Mongo’s core functionality can be found in the db directory of the source distribution. It includes the implementation of Mongo’s RESTful API, indexes/BTree support, standard Mongo queries and also the list of commands that can be issued to the database, e.g. mapReduce.
Adding a new command to the list means implementing a new C++ class with the functionality of the command and registering a name for this command in a map of command names to command classes.

If we take a look at db/commands.cpp, we will find the function used by the server frontend to look up the command it has to execute:

    map<string,Command*> * Command::_commands;
    ...

    Command* Command::findCommand( const string& name ) {
        map<string,Command*>::iterator i = _commands->find( name );
        if ( i == _commands->end() )
            return 0;
        return i->second;
    }

All commands implement the abstract mongo::Command class. The subclass must implement some functions in order for the command to be executed. The most important function is mongo::Command::run, defined in db/commands.h:

  // db/commands.h line 50
   virtual bool run(const string& db, BSONObj& cmdObj, 
                    string& errmsg, BSONObjBuilder& result, 
                    bool fromRepl) = 0;

The base Command class also provides a base constructor that will automatically register the command in the commands map when invoked in the subclass. For example, the implementation of the mapReduce command registers itself for execution invoking the base constructor:

/**
 * This class represents a map/reduce command executed on a single server
 */
class MapReduceCommand : public Command {
  public:
     MapReduceCommand() : Command("mapReduce", false, "mapreduce") {}

Retrieving the arguments for the command
The query retrieved from the client is encoded as a BSON object and passed as the second argument to the run function.
There is a whole suite of functions to manipulate BSON objects defined in MongoDB. They can be found in bson/bsonobj.h and bson/bsonelement.h.
In this fragment of code from the mapReduce command implementation the out parameter of the query is handled. The BSON object is stored in the variable cmdObj:

if ( cmdObj["out"].type() == String ) {
    finalShort = cmdObj["out"].String();
    outType = REPLACE;
}
else if ( cmdObj["out"].type() == Object ) {
    BSONObj o = cmdObj["out"].embeddedObject();

    BSONElement e = o.firstElement();
    string t = e.fieldName();

    if ( t == "normal" || t == "replace" ) {
        outType = REPLACE;
        finalShort = e.String();
    }
    else if ( t == "merge" ) {
        outType = MERGE;
        finalShort = e.String();
    }
    else if ( t == "reduce" ) {
        outType = REDUCE;
        finalShort = e.String();
    }
    else if ( t == "inline" ) {
        outType = INMEMORY;
    }
    else {
        uasserted( 13522 , str::stream() << "unknown out specifier [" << t << "]" );
    }

    if (o.hasElement("db")) {
        outDB = o["db"].String();
    }
}

Obtaining a cursor
To implement the desired functionality it is usually necessary to traverse the collection of Mongo documents stored in the DB. Mongo implements this functionality using cursors.
Cursors can be obtained using a factory function called bestGuessCursor that receives as parameters a unique namespace for the command and a description of a DB query.
The cursor is returned as a Boost smart pointer, so we don’t have to deal with the deallocation of the resources consumed by the pointer. A possible template for a function using a collection pointer could be:

// run function
bool run(...) {
  
  // get the cursor
  shared_ptr<Cursor> temp = bestGuessCursor( ns, BSONQuery, BSONObj() );
  auto_ptr<ClientCursor> cursor( new ClientCursor( timeoutOpts , temp , ns ) );

  // main loop
  while ( cursor->ok() ) {

    // get current document
    BSONObj o = cursor->current();

    ... logic ...

    cursor->advance(); 
  }
}

Building the output

The result of the command must also be returned as a BSON object. To build this object, a reference to a BSONObjBuilder object is passed as an argument to the run function. The logic of the function can use functions like append to add values to the resulting BSON object. If the values of this object must also be BSON objects, additional BSONObjBuilder instances can be created. Once the object has been built, it can be retrieved from the builder by calling the obj function.

The run function must also signal if the execution of the command has been successful returning a boolean value.

Adding support for the command in the shell
In order to use the command we have implemented, we can add support for the command in the Mongo JS shell. A good location for the JS code invoking the command is shell/collection.js.
The function must build the JSON object that will later be received as a parameter in the command implementation at the server. The only requirement for this JSON object is that the first property of the object must have the same name as the string used to register the command in the DB. The value for that property must be the short name of the collection. The rest of the properties are optional. The command can be executed using the this._db.runCommand function from the current collection object.

As an example, this is the implementation of the custom levenshtein command:

DBCollection.prototype.levenshtein = function( sourceTerm , field, threshold, opts ){
    var c = { levenshtein : this._shortName , sourceTerm : sourceTerm , field : field, threshold : threshold };
    opts = opts || {"level":"word"};

    if(!opts["level"] || opts["level"] === "word") {
        c["word"] = true;
        c["sentence"] = false;
    } else {
        c["word"] = false;
        c["sentence"] = true;    
    }

    c["separators"] = (opts["separators"]||".,:; ");

    if(opts["limit"]) {
        c["limit"] = opts["limit"];
    }
    if(opts["outputField"]) {
        c["outputField"] = opts["outputField"];
    }

    var raw = this._db.runCommand( c );
    if ( ! raw.ok ){
        __mrerror__ = raw;
        throw "levenshtein matches failed:" + tojson(raw);
    }

    return tojson(raw);

}

Adding support in a driver
One problem with extending MongoDB this way is that we must add support for the new command in all the layers between our client code and the DB. We have already added support to the JS shell, but many applications access MongoDB through some kind of driver interfacing with the DB server.

In my case, I was using Somnium’s congomongo Clojure library. This means adding support in two different layers: the low-level Java driver and the Clojure wrapper library.
Fortunately, the design of the library and the Java driver makes it possible to add support for the command entirely in client code, without further modification of the library sources. Congomongo’s coerce function also makes it very easy to transform data structures to and from Clojure’s native data types and BSON objects. An example implementation can be found in this GitHub gist.
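The rough idea, sketched below with hedged assumptions (this is not the code from the gist, and obtaining the com.mongodb.DB instance is left to the caller), is to coerce a Clojure map into a DBObject, hand it to the Java driver’s DB.command method, and coerce the result back:

(use '[somnium.congomongo.coerce :only [coerce]])

;; db is a com.mongodb.DB instance taken from the driver/congomongo connection
(defn levenshtein-query [^com.mongodb.DB db collection source-term field threshold]
  (let [cmd (coerce {:levenshtein (name collection)   ;; first key: the name used to register the command
                     :sourceTerm  source-term
                     :field       (name field)
                     :threshold   threshold}
                    [:clojure :mongo])]
    (coerce (.command db cmd) [:mongo :clojure])))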

Chasing Erlang: profiling a Clojure library

This is a short summary of my efforts profiling and benchmarking Jobim, the actors library for Clojure I’ve been working on for the last few months. I have very little experience profiling Clojure applications, so I thought a brief summary of the process might be interesting for other people in the same situation.

How to measure performance?

The first problem I found was to come up with a good way of testing the actual performance of the library. The performance of complex systems is really hard to test. The number of moving parts makes it difficult to spot the bottlenecks, and wrong conclusions can be drawn easily.
Fortunately, in my case I found an easy solution for the problem: I used the same test suite built to benchmark Termite Scheme. Termite is a variant of Scheme built for distributed computing, modelled after the Erlang programming model and built on the Gambit Scheme system.
The Termite paper has a full section discussing different performance tests and showing the results for Termite and Erlang. Furthermore, the implementation of the tests can be found in Termite’s source code.

Because of the similarities between the projects, implementing these benchmark tests in Clojure/Jobim provides a good starting point to test the performance of the library.

Finding tools

To measure performance, System/nanoTime can be good enough. On the other hand, to profile the code when performance is bad, you really need a good profiler. Unfortunately, there is no good Clojure profiler as far as I know. Commercial Java profilers like YourKit are a good option, but they can also be expensive. A good free alternative to these profilers is VisualVM. The profiler is included in the Java distribution and it’s really easy to use. Using VisualVM with Clojure is just a matter of selecting the REPL process from the profiler user interface and starting to gather data.

Adding type hints

A first step to increase the performance of a Clojure application is to add type hints to the code of your functions. Clojure is a dynamic language, but that’s not the case for Java, the host language for Clojure. In order to invoke Java methods, Clojure needs to find the type of the Java objects passed as arguments to these methods or returned from them. Java reflection is the mechanism that makes it possible to look up this information at run-time. The use of reflection has a double impact on performance: it adds processing time and it consumes additional memory creating reflection objects. The use of reflection is usually not a problem in regular applications, where memory is freed by the garbage collector and the performance penalty is not significant. Nevertheless, in some other cases the use of the reflection mechanism can become a problem.

The following images show a profiler session for the execution of the Termite ring test with a ring of 40k actors. The non-deterministic execution of the garbage collector has been disabled with the -Xincgc JVM parameter.

The screenshots show how the execution of the test allocates a huge amount of memory as java.lang.reflect.Method objects. These objects are released by the GC once it is manually executed, but in a regular execution, the GC thread might have impacted the outcome of the test very negatively.

In order to avoid these situations, Clojure can notify us when it will use reflection to resolve a method call if we set the *warn-on-reflection* var to true. With this modification, the list of conflicting calls will be shown when the source code is compiled. Using this information, we can add type hints with the correct type for the call to the metadata of the problematic symbols. Clojure will use this information instead of using reflection to discover the right type for the objects.
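A minimal illustration, not taken from Jobim’s sources, of the warning and of the fix:

(set! *warn-on-reflection* true)

;; Reflection warning: the type of msg is unknown, so .length is resolved at run-time
(defn payload-size [msg]
  (.length msg))

;; With the ^String hint the compiler emits a direct method call, no reflection involved
(defn payload-size-hinted [^String msg]
  (.length msg))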

The following figures show another run of the same test after having added the required type hints. We can see how the use of memory has decreased and the java.lang.reflect.Method objects have disappeared.

In the case of Jobim, the number of type hints required to resolve all the problematic calls was really small: around 15 type hints were enough. Nevertheless, some calls involving protocols were harder to solve. The use of VisualVM was a great help to check that no reflection objects were being created.

Object serialization
One of the main performance problems in tests involving the exchange of messages among actors in different nodes is the serialization and deserialization of messages. Using standard Java serialization, the time spent serializing messages could reach 20% of the total in some tests. Besides, serialization of some Clojure objects, like lambda functions, is not supported.

Lacking a single solution for the problem, different serialization mechanisms have been implemented as plugins for Jobim. The following table shows some performance results for a test consisting of encoding and decoding a Java HashMap with long lists of integers as values:

Serialization mechanism   Time encoding+decoding
JSON serialization        5.566 ms
Java serialization        4.419 ms
JBoss serialization       1.437 ms
Kryo serialization        0.573 ms

The results show how standard Java serialization is almost as slow as JSON serialization. Kryo offers very good performance, with the drawback of not supporting Clojure core data types. Finally, the JBoss serialization library offers much better performance than standard Java serialization with a compatible interface and the same degree of support for Java types, including Clojure core data types.

The lack of support for Clojure data structures makes Kryo a valid option only for certain cases where actors restrict the kind of messages they exchange so that they can be serialized by Kryo. JBoss serialization is a good default option and has been used in the rest of the tests. The Jobim data types used by the library in the exchange of messages between nodes have been restricted to standard Java types so they can be used with any serialization mechanism.

Results

The following table shows the results I have obtained profiling the current version of Jobim. The results for Erlang have been measured using the same hardware where the Clojure version was tested.

test                        Erlang     Jobim threaded   Jobim evented
fib(534)                    585 ms     378 ms           -
Spawn                       6.92 us    49 us            27 us
Send                        0.2 us     120 us           16 us
ring 40k/0                  3 us       -                83 us
ring 40k/4                  5 us       -                166 us
ping-pong internal (0)      0.7 us     148 us           25 us
ping-pong internal (1000)   16 us      152 us           29 us
ping-pong same node (0)     58 us      14333 us         5027 us
ping-pong same node (1000)  182 us     25921 us         6303 us
ping-pong remote (0)        3267 us    18481 us         7022 us
ping-pong remote (1000)     16556 us   38730 us         8400 us

The ping-pong tests were executed using a Jobim configuration consisting of the ZooKeeper plugin for the coordination service, the TCP plugin for the messaging service and the JBoss plugin for the serialization service. The rest of the tests were run with the localnode plugin for coordination, messaging and serialization.

Some conclusions:

  • Clojure is faster than Erlang in the base case chosen, computing fibonacci of a small number with a naive implementation.
  • Actor model primitives (spawn, send, receive) are orders of magnitude faster in Erlang than when implemented as library functions in Jobim.
  • The number of processes/actors that can be created in a single node is far lower in Jobim than in Erlang. In a machine where the Erlang version of the ring test ran without problems with 1M processes, the JVM was struggling with ~50k evented actors. Threaded actors are even less scalable.
  • Erlang seems to have some issues with the size of network messages, while Jobim shows less variation in the response times when the size of the messages varies.
  • Evented actors offer far superior performance to the threaded implementation in the tests.
  • In tests involving actual network communication, Jobim gets closer to Erlang and it even outperforms Erlang in one case.

With these results, it is probably not a good idea to write Erlang-in-Clojure applications using Jobim. Nevertheless, the library is starting to be performant enough to be used as a distribution mechanism for Clojure applications using actor semantics and some of Erlang’s features like actor linking or behaviours.

Barber shop problem again

Today there has been some talk about concurrency in Clojure vs. Scala. Most of the debate was based on an excellent article, Scala vs Clojure – Round 2: Concurrency!, by Lau B. Jensen.

The article discusses the classic “barber shop” problem and offers two implementations: one in Clojure, using Clojure concurrency primitives, and another one in Scala using Scala’s implementation of actors.

Since I’ve been working for the last few months on Jobim, an actor library for Clojure, I thought it would be interesting to offer an additional implementation using actors in Clojure. The code can be found here.

The implementation of the actors is pretty straightforward. It is interesting to compare it with the Scala version from the previous article.

The shop actor:

(defn do-shop
  "Shop actor"
  ([max-seats max-clients]
     (let [seats (atom [])
           max-clients (atom max-clients)]
       (loop [[msg pid] (receive)]
         (condp = msg
             ;; allocate a seat for the client
             :get-seat (if (< (count @seats) max-seats)
                         (do (swap! seats conj pid)
                             (send! pid :seated))
                         (send! pid :no-space))
             ;; barber asks for a client
             :next-client (let [pid (first @seats)]
                            (swap! seats rest)
                            (send! (resolve-name "the-barber") pid))
             ;; A client is leaving the shop
             :exiting (swap! max-clients dec))
         (if (> @max-clients 0)
           (recur (receive))
           (notify-shop "Closing barber shop"))))))

The customer:

;; Customer
(defn customer
  "Customer actor"
  ([id]
     (spawn
      ;; enters the shop and ask for a seat
      #(loop []
         (println (str "Customer id " id "..."))
         (let [seated (do (send! (resolve-name "barber-shop") [:get-seat (self)])
                          (receive))]
           ;; we have a seat, wait to have a hair cut
           (if (= seated :seated)
             (let [_ (send! (resolve-name "the-barber") :hey)
                    ;; Awaking the barber
                   msg (receive)]
               ;; barber starting to do his thing
               (when (= msg :cutting)
                 (notify-shop (str "Customer " id " getting his hair cut")))
               ;; barber finished
               (let [msg (receive)]
                 (when (= msg :finished)
                   (notify-shop (str "Customer " id " got his hair cut")))
                 ;; leaving the shop
                 (send! (resolve-name "barber-shop") 
                           [:exiting (self)])
                 (notify-shop (str "Customer " id " leaving the shop"))))
             ;; No space available in the shop, exiting
             (do
               (notify-shop (str "Customer " id " could not get his hair cut"))
               (Thread/sleep (rand 5000))
               (recur))))))))

And finally the barber:

;; Barber
(defn barber
  "Barber actor"
  ([]
     (let [pid (spawn
                #(loop [msg (receive)] ;; wait for a customer
                   (when (= msg :hey) ;; awaken!
                     ;; ask for the customer to come in
                     (send! (resolve-name "barber-shop") [:next-client (self)])
                     ;; ignore the customers trying to awake us
                     (let [client-pid (receive (fn [msg] (not (= :hey msg))))] 
                       ;; starting to cut
                       (send! client-pid :cutting)
                       (Thread/sleep (rand 4000))
                       ;; finished cutting
                       (send! client-pid :finished)))
                   (recur (receive))))]
       (register-name "the-barber" pid))))

Additionally, the implementation includes the use of a behaviour. Behaviours are a concept taken from Erlang that refers to a piece of functionality in a distributed system encapsulated so it can be easily reused.

In this case, since the customers can be executed in any node, if we wanted to collect the messages produced by all the customers, we could not use the traditional mechanism of printing to the standard output: the messages would appear all across the cluster. Using an event manager and an event handler, we can collect messages from all the customers in the cluster in a central location.
Behaviours in Jobim are implemented using Clojure’s protocols. In this case:

;; Message logger

(def-event-handler BarberLogger
  (init [this initial-state] initial-state)
  (handle-event [this event state]
                (println (str "*** " event)))
  (terminate [this state] (println "BarberLogger exiting")))

(defn notify-shop [msg]
  (notify (resolve-name "barber-manager") msg))

More information about the event manager behaviour can be found in the Erlang manual.

One of the fundamental advantages of using a platform like Erlang is that you can run the same application in a single node or in a set of nodes with minimal changes. In this example, there are a couple of functions to start the application in a single node or in multiple nodes. The only difference is the way the customer actors are initiated:

(defn run-barber-shop
  "Runs the barber shop problem in a single node"
  ([num-seats num-clients]
     (start-event-manager "barber-manager")
     (add-handler (resolve-name "barber-manager")
                  [name jobim.examples.barber.BarberLogger]
                  nil)
     (shop num-seats num-clients)
     (barber)
     (doseq [i (range 0 num-clients)]
       (customer i))))

(defn run-barber-shop-distributed
  "Runs the barber shop problem distributed in a set of nodes"
  ([num-seats num-clients nodes]
     (start-event-manager "barber-manager")
     (add-handler (resolve-name "barber-manager")
                  [name jobim.examples.barber.BarberLogger]
                  nil)
     (shop num-seats num-clients)
     (barber)
     (loop [ids (range 0 num-clients)
            nodes (cycle nodes)]
       (if (empty? ids)
         (println (str "*** All clients spawned"))
         (do (rpc-blocking-call (resolve-node-name (first nodes))
                                "jobim.examples.barber/customer"
                                [(first ids)])
             (recur (rest ids)
                    (rest nodes)))))))

The result of a test run can be seen in this video:

As you can see, mixing actors in Clojure is just another possibility and it makes a really good alternative when you want to build systems spanning more than a single node.

Embedding V8 in a Clojure SPARQL library

Using JavaScript in Java applications is a common practice. The easiest path to achieve this integration is to use the excellent Rhino library from the Mozilla Foundation. This library is an implementation of JavaScript in Java, so interop from both sides is really easy.
Nevertheless, when I needed to re-use a JavaScript implementation of a SPARQL 1.1 parser I had been working on in a different Clojure project, I decided to try a different approach and use Google’s V8 JavaScript engine. These are the results of this small experiment.

The V8 project has some instructions about how to embed the engine into a C application. V8 exposes JavaScript objects to C code through ‘handles’ referencing these objects. Objects can be created and released from C, but instead of manually tracking each object individually, they can be grouped into ‘handle scopes’. This mechanism simplifies the management of JavaScript object memory, since all the objects associated with a scope can be garbage-collected by V8 once the scope is deleted. Additionally, in the same way that a C object can be created in the heap or on the stack, V8 object handles can also be persistent or transient.

V8 objects are created and manipulated in a certain ‘execution context’. These contexts are persistent between function calls, and a scope to declare or manipulate additional JS objects can always be retrieved from the context object. C data structures and functions can also be used in JS code using ‘templates’ that wrap the C object in the JS execution context.

As an example, the following code shows the constructor of a C++ class that loads the JS parser code from a file, initializes a persistent execution context, and evaluates the JS script in that context. As a result the JS parsing function will be available into that V8 execution context.

SparqlParser::SparqlParser() {

  char script[500000];

  // load the parser js code
  std::ifstream fin;
  fin.open("sparql_parser.js", std::ios::in);
  fin.read(script,500000);
  fin.close();

  // The context where the parser will be executed
  context = Context::New();
  HandleScope handle_scope;
  Context::Scope context_scope(context);

  // compiles the parser function
  Handle<Script> parserScript = Script::Compile(String::New(script));
  parserScript->Run();

};

The parse method of the same class just builds the invocation of the parse JS function, retrieves a new scope in the persistent execution context and executes the actual JS function invocation:

std::string SparqlParser::parse(std::string query) {
  Context::Scope context_scope(context);
  HandleScope handle_scope;

  // executes the parser function call on the global object
  std::string query_string = "sparql_query('";
  query_string = query_string.append(query);
  query_string = query_string.append("');");

  Handle<Script> runnerScript = Script::Compile(String::New(query_string.c_str()));
  Handle<Value> result = runnerScript->Run();

  if (!result.IsEmpty() && !result->IsUndefined()) {
    String::Utf8Value str(result);
    std::string toReturn = std::string(ToCString(str));

    return toReturn;
  } else {
    std::cout << "Something went wrong";
    return NULL;
  }
}

The destructor of the class just releases the persistent execution context.

To be able to use this code from a Clojure function, a mechanism to execute native code from Java must be used. The main alternatives are the Java Native Interface (JNI) and the newer Java Native Access (JNA). JNA offers a simpler and cleaner integration with Java code, and is well supported in Clojure and Leiningen thanks to projects like clojure-jna and clj-native. Unfortunately, JNA is oriented towards plain C APIs, and using C++ code like V8 requires writing a C interface layer on top of the C++ classes. Taking this into account, I decided to write a JNI interface, consisting of a Java class that wraps the C++ code using the JNI automatically generated C interface code.

This is the Java wrapper class:

package sparqltest;

public class SparqlParser {
     static {
         System.loadLibrary("SparqlParserWrapper");
     }

    private native void init();
    private native String parse_query(String query);

    private static SparqlParser parser;

    private SparqlParser() {
        init();
    }

    public String _parse(String query){
        return parse_query(query);
    }

    public static String parse(String query) {
        if(parser == null) {
            parser = new SparqlParser();
        }
        return parser._parse(query);
    }
 }

And this is the generated C interface, with the usual conversion between wrapped Java types and C types:

#include <jni.h>
#include <stdio.h>
#include "sparql_parser.h"
#include "sparqltest_SparqlParser.h"

SparqlParser parser;

JNIEXPORT void JNICALL Java_sparqltest_SparqlParser_init (JNIEnv *env , jobject obj) {
  parser = SparqlParser();
}

JNIEXPORT jstring JNICALL Java_sparqltest_SparqlParser_parse_1query (JNIEnv *env, jobject obj, jstring javaQuery) {

  const char *query = env->GetStringUTFChars(javaQuery, 0);
  std::string result = parser.parse(std::string(query));

  jstring javaResult = env->NewStringUTF(result.c_str());

  return javaResult;
}

After compiling and packaging the Java wrapper in a JAR file, we are ready to test the code from Clojure. Before that, the C++ code must have been compiled into a library that will be loaded by the JNI framework. The library must be located somewhere in the load path of JNI, as well as the V8 library if we have decided to compile it as a shared library. This path can be configured in the Leiningen project specification using the :native-path keyword (equivalent to using the -Djava.library.path argument in the Java invocation).
Since the C code reads the JS parser from disk, the file containing the parser must also be located in the same directory where we are running the application.
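A minimal project.clj sketch showing where that setting would go (version numbers and the native directory name are assumptions):

(defproject clj-sparql-test "0.0.1-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.2.0"]
                 [org.clojure/clojure-contrib "1.2.0"]]
  ;; directory containing libSparqlParserWrapper and the V8 shared library,
  ;; equivalent to passing -Djava.library.path to the JVM
  :native-path "native")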

If all the paths are OK and JNI can find the native libraries, the JS parser can be invoked without problems from Clojure code:

(ns clj-sparql-test.core
  (:import sparqltest.SparqlParser)
  (:use [clojure.contrib.json :only [read-json]]))

(defn parse-sparql [query]
  (read-json (SparqlParser/parse query)))

A sample invocation:

  user> (use 'clj-sparql-test.core)
  nil
  user> (parse-sparql "SELECT * { ?s ?p ?o }")
  [{:token "prologue",
    :base "",
    :prefixes []}
   {:kind "select",
    :token "executableunit",
    :dataset [],
    :projection [{:token "variable", :kind "*"}],
    :pattern {:token "groupgraphpattern",
              :patterns [{:token "basicgraphpattern",
                          :triplesContext 
                                [{:subject {:token "var", :value "s"},
                                  :predicate {:token "var", :value "p"},
                                  :object {:token "var", :value "o"}}]}],
                          :filters []}}]

The code for this experiment, along with more details about building the different components, can be found here.

OCR with Clojure, Tesseract and OpenCV

Last week I’ve been playing a little bit with computer vision and OCR. My ultimate goal was to digitize the pile of bills I’ve stored since I moved to the UK and try to make some sense of the figures.

All my knowledge about computer vision was some previous experience with OpenCV. This wonderful library contains tons of algorithms for image transformation, besides utility functions to work with capture devices and basic UI widgets.

Research on open source OCR software led me to Tesseract, a robust OCR engine originally developed by HP and now maintained by Google.

After writing some software for pre-processing bill pictures taken with an iPhone 4 camera using OpenCV and then attempting text capture with Tesseract, I had built a small C++ library that I intended to wrap using JNA, so I could experiment easily from the Clojure REPL with different ways of preprocessing the image.

At this point I found a Clojure library called Vision, by Nurullah Akkaya, that wraps a good part of OpenCV in Clojure. Thanks to Vision I could just concentrate on wrapping the OCR parts, exchanging pointers between both libraries in Clojure using JNA.

The Clojure wrapper for Tesseract can be found on GitHub with the instructions to install it.

Using the library is quite straightforward:

(use 'vision.core)
(use 'clj-tesseract.core)

;; create an API instance
(def *api* (make-tesseract "/tmp"))

We can then load an image to attempt OCR, or just grab one from the webcam using OpenCV/Vision:

;; load an image
(def *img* (load-image "numbers_pre.tiff" :grayscale))

;; Webcam capture
(def *capture* (capture-from-cam 0))
(def *frame* (query-frame *capture*))

Once we have the image, we can try to capture text using the capture function.

(println (capture *api* *img*))
BASICS UM1 MILK £0.49
BASICS UHI MILK £0.49
CRAMULA15u SUGAR £0.98
3 BALANCE Bus £1.96
CASH £1.96
CHANGE £0.00

(end *api*)

The capture function applies some very basic preprocessing, like converting the image to grayscale and applying a threshold, before handing the image to Tesseract.

Custom preprocessing can be applied using the capture-no-preproces function.

The result was OK for my use case, although not perfect, but thanks to the power of OpenCV many clever ways of preprocessing the image can be tried to improve text recognition. Furthermore, Tesseract is fully trainable, so another interesting way of improving results for certain applications is to create a new language for Tesseract and then train the OCR software for that language. Creating a new language can be a little bit tricky, but there is some software that can help create the required files. Custom languages can be used in clj-tesseract, passing the name of the language as an argument to the make-tesseract function.
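For instance, assuming a custom language named "receipts" had been trained and installed, it would presumably be loaded with an extra argument (the exact signature is an assumption based on the description above):

;; hypothetical custom language, loaded alongside the tessdata path
(def *api-receipts* (make-tesseract "/tmp" "receipts"))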

Translating SPARQL queries into SQL using R2RML

RDF is the data model of the semantic web as it has been conceived by the W3C. This data model is extremely general. Because RDF has been modelled after the web and uses web technologies, like URIs, it is also very easy to link different RDF graphs using the same mechanism that makes it possible to link web pages. RDF is also finally becoming a mainstream technology. Right now, there is already a huge amount of information available as inter-linked RDF graphs, for example as part of the Linked Data initiative.

Another advantage of having information available in the web as RDF graphs is that this information, once it has been retrieved, can be easily queried using the SPARQL W3C recommendation.

SPARQL can be used to query data stored using the RDF data model in the same way SQL can be used to query data stored using the relational data model. In fact, SPARQL could be used as an API to retrieve data from a RESTful web service where each web service URI is attached to an RDF graph or subgraph.

The question from an infrastructure point of view is what software should be used to store RDF graphs and to provide a SPARQL interface to these data.

Special purpose data stores for RDF have been available for a long time: triple stores like Jena, Mulgara, AllegroGraph or Virtuoso are possible solutions. It is also possible to use graph databases supporting the RDF model, like Neo4j.

Unfortunately, deploying new infrastructure often supposes a real barrier for the adoption of semantic technologies in the development of web systems. This is especially true when most present web applications are already built using the relational model, and migrating these data to a triple store solution, or adopting a hybrid solution, are not valid options.

In these situations, recent W3C proposals like R2RML can provide an easy way to introduce semantic technologies in web applications using existing relational solutions and data.

R2RML provides a standard way to lift relational data to the RDF data model. This way, web application developers can expose their information as linked data with important gains for web clients, for example, the ability to link the application data to other data providers using URIs and the possibility to use standard vocabularies in the description of the data, without any modification in the underlying mechanism to store these data.

However, if a web client consuming the generated RDF wants to use SPARQL to query the data, it must first retrieve all the data from the R2RML mapped web service and save it locally in some kind of SPARQL enabled store. This problem would disappear if the web service could offer a SPARQL endpoint to the clients so they could retrieve only the fragment of the whole RDF graph they were interested in.

To accomplish this task it would be necessary to translate the SPARQL query into a semantics-preserving SQL query that could be executed against the relational data mapped by the R2RML description.

The efficient translation of SPARQL into SQL is an active field of research in academia and industry. In fact, a number of triple stores are built as a layer on top of a relational solution. Support for SPARQL in these RDF stores supposes the translation of the SPARQL query into a SQL query that can be executed against a certain relational schema.

Some foundational papers in the field include “A Relational Algebra for SPARQL” by Richard Cyganiak, which translates the semantics of SPARQL as finally defined by the W3C into relational algebra semantics, and “Semantics preserving SPARQL-to-SQL translation” by Chebotko, Lu and Fotouhi, which introduces an algorithm to translate SPARQL queries to SQL queries.

This latter paper is especially interesting because the translation mechanism is parametric on the underlying relational schema. This makes it possible to adapt their translation mechanism to any relational database using a couple of mapping functions, alpha and beta, that map a triple pattern of the SPARQL query to a table, and a triple pattern plus a position in the triple to a column in the database.

Provided that R2RML offers a generic mechanism for the description of relational databases, in order to support SPARQL queries over any R2RML-mapped RDF graph, we just need an algorithm that receives as input the R2RML mapping and builds the mapping functions required by Chebotko et al.’s algorithm.

The most straightforward way to accomplish that is using the R2RML mapping to generate a virtual table with a single relation containing only subject, predicate and object. The mapping for this table is trivial. A possible implementation of this algorithm can be found in the following Clojure code.

The code basically generates a SQL query where, for each triple-map and its included term-maps in the R2RML mapping, a SQL SELECT subquery with the right selection and projections is generated. The final table is the UNION of all the generated subqueries.

For the example mapping included in the R2RML specification:

(def test-spec
     [{:logical-table   "select concat('_:Department',deptno) AS 
                         deptid, deptno, dname, loc from Dept"
       :class           "xyz:dept"
       :table-graph-iri "xyz:DeptGraph"
       :subject-map     {:column "deptid"}
       :property-object-map [{:property "dept:deptno"
                              :column   "deptno"
                              :datatype "xsd:positiveInteger"}
                             {:property "dept:name"
                              :column   "dname"}
                             {:property "dept:location"
                              :column   "loc"}
                             {:property       "dept:COMPANY"
                              :constant-value "XYZ Corporation"}]}])

The generated SQL query looks like this:

user> (build-sql-query test-spec)
"SELECT  deptid AS subject,  'dept:deptno' AS predicate,  
deptno AS object,'xyz:DeptGraph' AS graph FROM (select 
concat('_:Department',deptno) AS deptid, deptno, dname, 
loc from Dept) AS TBL0 UNION SELECT  deptid AS subject,
'dept:name' AS predicate,  dname AS object,  'xyz:DeptGraph' 
AS graph FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION SELECT  deptid AS 
subject,  'dept:location' AS predicate,  loc AS object,  
'xyz:DeptGraph' AS graph FROM (select concat('_:Department'
,deptno) AS deptid, deptno, dname, loc from Dept) AS TBL0 
UNION SELECT  deptid AS subject,'dept:COMPANY' AS predicate,  
'XYZ Corporation' AS object,  'xyz:DeptGraph' AS graph FROM 
(select concat('_:Department',deptno) AS deptid, deptno, dname, 
loc from Dept) AS TBL0"

Using this query, a view can be generated or it can be directly used to define the functions alfa and beta required by the SQL generation algorithm:

(defn alfa
  ([triples-map]
     (let [query (build-sql-query triples-map)]
       (fn [triple-pattern] (str "(" query ") AS " (table-alias))))))

(defn beta
  ([triples-map]
     (let [query (build-sql-query triples-map)]
       (fn [triple-pattern pos]
         (condp = pos
             :subject "subject"
             :predicate "predicate"
             :object "object"
             (throw (Exception. 
               (str "Unknown position for a triple " (name pos)))))))))

A sample implementation of the SQL generation algorithm can be found in this Clojure file.

With this implementation, a parsed SPARQL query, here represented using Lisp S-expressions, can be translated into the equivalent SQL query:

(trans
  (SELECT [:s :p] (tp :s "dept:name" :p))
  test-spec)

is translated into the following semantics-preserving SQL query:

"SELECT DISTINCT s,p FROM (SELECT DISTINCT subject AS s, 
predicate AS 'dept:name', object AS p FROM 
(SELECT  deptid AS subject,  'dept:deptno' AS predicate,  
deptno AS object,  'xyz:DeptGraph' AS graph 
FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION 
SELECT  deptid AS subject,  'dept:name' AS predicate, 
dname AS object,  'xyz:DeptGraph' AS graph 
FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION 
SELECT  deptid AS subject,  'dept:location' AS predicate,
loc AS object,  'xyz:DeptGraph' AS graph 
FROM (select concat('_:Department',deptno) AS deptid, 
deptno, dname, loc from Dept) AS TBL0 UNION 
SELECT  deptid AS subject,  'dept:COMPANY' AS predicate,
'XYZ Corporation' AS object,  'xyz:DeptGraph' 
AS graph FROM (select concat('_:Department',deptno) AS 
deptid, deptno, dname, loc from Dept) AS TBL0) 
AS TBL83347e804edd4b0ca4dddcdac0b14e97 WHERE True AND 
predicate='dept:name') TBL7c7476ba56624ecd992969c37129a6b4;"

The mechanism is completely generic so more complex queries like:

(trans
 (SELECT [:s :c] (FILTER (And (Eq :s 1) (Eq :p 2))
                  (UNION
                   (AND (tp :s "dept:name" :p)
                        (tp :s "dept:COMPANY" :c)
                        (tp :s "something" :d))
                   (tp "_:1" :p :c))))
 test-spec)

could also be translated.

The generated SQL code is far from being optimal in this sample implementation, but it is another glimpse of the possibilities R2RML introduces to boost the adoption of semantic technologies in common everyday web development.

This implementation does not take into account the use of datatypes and language tags in RDF literals or the possibility of selecting from different graphs in the query. Neither does it generate any additional triples due to any entailment regime.

It should be possible to improve the translation mechanism using the existing research in SPARQL-to-SQL translation, for example, “A Complete Translation from SPARQL into Efficient SQL” by Elliott, Cheng et al., or to find better translation alternatives.

Another option could be extending the R2RML vocabulary with optional terms that could be used to give hints to a translation mechanism so it can provide more efficient translations for more specific database schemas.

Exploring Yahoo S4 with Clojure

S4 is a platform for processing unbounded streams of data introduced by Yahoo in November 2010.
S4’s main concern is the processing of streams of data at a rate of thousands of events per second in a scalable, distributed way, offering some guarantees in case of network partition.

S4’s programming model is similar to the actors model implemented in languages like Erlang’s OTP platform. This model makes it easy to implement the processing of streams using a functional programming style.

In this post I will explore the main features and components of S4 using the Clojure programming language. At the same time, a Clojure library for building S4 applications, clj-s4, will be introduced.

The version of S4 used will be 0.2.1. S4 is still under heavy development, so some of the features discussed here could change in the near future.

Streams and events

S4 is a platform for the processing of streams of data. In the S4 context, a stream of data must be understood as a lazy sequence of events. Each of these events will be an instance of a Java object implemented as a Java bean. Java beans are just containers for a set of fields, implementing standard methods for retrieving and modifying the state of the objects.

S4 streams have another important feature: the events in the stream have an associated key. This key can be composed of a single field or a list of fields in the Java bean object. The key of the event is used by S4 similarly to the way Hadoop map-reduce framework uses keys to route data to the different processing units.

Each of these event bean objects is the equivalent of a message exchanged by processes in similar systems like Erlang’s OTP, where the main structure used to store message data is the plain tuple. The decision to use Java beans in S4 makes it easy to integrate S4 events with the Java code used in the rest of the application. It also offers a standard interface for manipulating the state of the objects. This is especially useful in S4, since it also uses the Spring application development framework for application configuration and setup. In Spring, state for Java beans can easily be injected from the configuration file, making it easier to build reusable parametric Java components.

Nevertheless, the use of Java beans is not the most common way of storing the state of a program in Clojure applications. Plain maps and types created with the deftype and defrecord macros are the preferred way to deal with state.

Unfortunately, when using Clojure types, the resulting Java classes will not integrate easily into the S4 model, since they don’t implement the standard bean interface and, as a result, cannot be easily configured in a Spring configuration file. Furthermore, the standard serialization mechanism used in S4 at the moment, the Kryo serialization library, cannot serialize most Clojure generated classes and default types, since it is unable to deserialize fields marked as final and requires argument-less constructors that Clojure core types, like Keyword, do not implement. In fact, it is even impossible to generate a serializable bean using the gen-class macro, since this macro places the state in a final field.

Clj-s4 solves this issue introducing a new macro, def-s4-message, similar to defrecord, that generates a mutable Java bean for the provided field descriptions.

As an example, the following code declares a new event to store a number that will be part of an S4 stream. Clj-s4 messages can be built from a Clojure hash map and they can be transformed back into immutable maps using the msg-to-map function.

(use 'clj-s4.core)

;; Defines a bean NumberTest with two fields, the first
;; one, :num, is declared to be an int field.
;; The second one, :foo, will be by default an Object
;; field
(def-s4-message cljs4.NumberTest [[int :num] :foo])

(def *num* (cljs4.NumberTest. {:num 34 :foo "bar"}))

(.getNum *num*)
;; returns 34

(.setNum *num* 15)
(.getNum *num*)
;; returns 15

(msg-to-map *num*)
;; returns {:num 15 :foo "bar"}

S4 currently has support for the Avro serialization framework, which relies on the description of data structures using a neutral JSON notation, but clj-s4 does not offer any utils to work with this library.

Adding new serialization mechanisms to S4 is also an easy task. It consists only of implementing a single interface, SerializerDeserializer, defining two simple methods to serialize and deserialize objects, and changing a value in the Spring configuration file for the S4 cluster. Adding support for the standard Java serialization mechanism or other schemes based on JSON, YAML or even Lisp s-expressions is trivial. The only problem with these serialization mechanisms is that their serialization format is less efficient and could harm the performance of S4 as a high performance stream processing platform.
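As a rough sketch of what such a plugin could look like from Clojure (the interface package and the exact method signatures are assumptions based on the description above), a version backed by standard Java serialization could be written with reify:

(import '[java.io ByteArrayInputStream ByteArrayOutputStream
                  ObjectInputStream ObjectOutputStream])

(defn java-serializer
  "Builds a SerializerDeserializer backed by plain Java serialization."
  []
  (reify io.s4.serialize.SerializerDeserializer
    (serialize [_ obj]
      (let [bos (ByteArrayOutputStream.)]
        (with-open [oos (ObjectOutputStream. bos)]
          (.writeObject oos obj))
        (.toByteArray bos)))
    (deserialize [_ raw-bytes]
      (with-open [ois (ObjectInputStream. (ByteArrayInputStream. raw-bytes))]
        (.readObject ois)))))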

Inserting events into S4: adapters

S4 streams of events ultimately originate in the outside world. They can be periodically pulled from a web service, pushed using a web socket or read from a log file. They may also have different formats: JSON objects, XML documents or plain text lines.

Adapters are S4 components located at the boundaries of the S4 system. They are named that way because they interact with the original source of data, transform its data format into Java bean objects and insert these into the S4 system as a new stream of events.

Adapters are started or stopped independently from the rest of the S4 cluster but they use the same underlying communication infrastructure and configuration to be able to send streams of events to S4 Processing Nodes (PNs) where these events will be processed.

Adapters are implemented with clj-s4 in two different places: the implementation of their functionality as a Clojure function, and the configuration of the adapter as a Spring bean.

The implementation of the adapter can be achieved using the def-s4-adapter macro.

The following sample code shows the implementation of an adapter that generates a stream of random numbers:

(in-ns 'randomnumbers.core)

(def-s4-adapter cljs4.RandomNumberAdapter [:stream]

   :init (fn [this args]
          (.start (Thread. this)))

   :run (fn [this]
          (loop [num 0]
            (generate-event this
                            (read-state this :stream)
                            (cljs4.Number. {:num num}))
            (Thread/sleep 3000)
            (recur (int (Math/floor (* (rand) 100)))))))

The def-s4-adapter macro receives as its first two arguments the name of the class where the implementation of the adapter will be stored and a vector of keys naming the fields of immutable state the adapter will have. In this case, the only state will be the :stream field.

The fields of state for the adapter can be read and manipulated using the functions read-state, write-state and alter-state.

The rest of the implementation is a map with two functions keyed :init and :run.
The :init function will be invoked when the adapter is instantiated by S4 and the :run function will be transformed into an implementation of the run method from the Runnable Java interface that this adapter will implement.

In order to insert a new event into an S4 stream so that it can be processed, the adapter can use the generate-event function.

generate-event receives as arguments the adapter, the name of the stream where the event will be inserted and the event.

In this example the value of the stream name is stored in the state of the adapter, but this value is never initialized in the code of the adapter.
This kind of value can be injected by Spring using the second component in the definition of the adapter: the wiring of the adapter as a Spring bean.

clj-s4 offers the wire-bean function to generate the required XML for Spring. It also offers the wire-adapters function to wrap the Spring bean definition as a new S4 adapter.

(in-ns 'randomnumbers.wiring)

(wire-adapters "RandomNumbersAdapter"

 (wire-bean
  {:id "randomNumbersGenerator"
   :class "cljs4.RandomNumberAdapter"
   :properties [{:name "stream" :value "RandomNumbers"}]}))

Additional properties can be added to the :properties array for the bean. clj-s4 uses the :id property of any bean to store it. For adapters, the first argument provided to wire-adapters is used as the identifier of the adapter.

This ID can be used to generate the XML for any bean, adapter or application using the s4-wiring Leiningen task.

The result of running this adapter is a new stream of events, named RandomNumbers, that can be processed by S4 nodes.
The RandomNumbers stream will consist of cljs4.Number Java beans.

Processing S4 streams: Processing Elements

The computation implemented in an S4 application is executed by Processing Elements (PEs). PEs are the equivalent of actors in actor frameworks or of Erlang processes.

PEs receive events from a certain stream or collection of streams, do some computation based on the received events and, optionally, output data in one or several output streams.

There are two main types of PEs in S4.
Keyless PEs do not have a key associated with the stream events. As a consequence, they will receive every event in the stream regardless of its value.
Keyed PEs, on the other hand, have a key associated with the stream of events they process.
They will only receive the events in the stream that carry a specific value for that key.

PEs are also defined by an implementation and a Spring configuration. When an S4 processing node boots, it instantiates a new PE object for each PE defined in the Spring configuration file. The PN stores this prototype instance associated with the stream and key configured for that PE.

When a new event arrives at the PN from the S4 communication layer, the PN inspects its table mapping streams and keys to PE prototypes and executes one of the following options:

  • If the PE is keyless and no PE instance has been associated with the stream yet, the PN clones the PE prototype and passes the event to the new instance, regardless of the value of the event.
  • If the PE is keyless and an instance was already cloned, the new event is also passed to that same instance.
  • If the event has an associated key, the PN checks whether a PE has already been cloned for this key value and is still available. If no such PE exists, a new one is cloned and the event is passed to it.
  • If a PE for that key value was previously cloned and has not been discarded, it is retrieved and the event is passed to it to be processed.

This mechanism ensures that in a PN there will be at most one PE instance for each keyless stream and as many PE instances as different key values for keyed streams, as the sketch below illustrates.
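
The following is only a rough Clojure sketch of that lookup-and-clone logic, written to make the cases above concrete; the prototype table and instance cache are modelled as plain Clojure data, and none of this is S4's or clj-s4's actual implementation:

;; Hypothetical sketch of the PN dispatch mechanism described above.
(def pe-prototypes {"Sentences" {:pe-class "cljs4.Tokenizer" :state {}}})

(def pe-instances (atom {}))   ;; [stream key-value] -> cloned PE instance

(defn pe-for-event
  "Returns the PE instance that should process an event arriving on the
   given stream. Keyless streams use a nil key-value, so they get at most
   one instance; keyed streams get one instance per key value."
  [stream key-value]
  (let [lookup-key [stream key-value]]
    (or (get @pe-instances lookup-key)
        ;; no instance yet: 'clone' the prototype and remember it
        (let [clone (get pe-prototypes stream)]
          (swap! pe-instances assoc lookup-key clone)
          clone))))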

Periodically, the PN asks each PE to output its current state. A PE can use this mechanism to persist its state, write the result of a computation to a persistent medium, or anything else its design requires. How often the PN invokes a PE's output can be configured in the Spring wiring for that PE, so that it depends on elapsed time, a certain number of invocations, etc.

A PE's functionality is defined in clj-s4 using two main functions, associated with the keys :process-event and :output. An additional function keyed as :init can also be implemented to provide extra
initialization for the PE. The PE can also specify the class that will be generated for its bytecode and a vector of fields for its state:

(in-ns 'wordcount.core)

(def-s4-pe cljs4.Tokenizer [:tokenizer :posTagger :tokenizerModel :posTaggerModel :stream]
  :init (fn [this]
          (let [tokenizer-model (read-state this :tokenizerModel)
                pos-tagger-model (read-state this :posTaggerModel)
                tokenizer (make-tokenizer tokenizer-model)
                pos-tagger (make-pos-tagger pos-tagger-model)]
            (write-state this :tokenizer tokenizer)
            (write-state this :posTagger pos-tagger)))
  :process-event [[Object] (fn [this text]
                             (let [text (msg-to-map text)
                                   text-id (:textId text)
                                   content (:content text)
                                   tokenize (read-state this :tokenizer)
                                   pos-tag (read-state this :posTagger)
                                   products (pos-tag (tokenize  content))]
                               (doseq [[token pos] products]
                                 (dispatch-event this
                                                 (read-state this :stream)
                                                 (cljs4.Word. {:content token :pos pos :textId text-id})))))]
  :output (fn [this] :not-interested))

The preceding code defines a new PE that will be stored in the cljs4.Tokenizer class.
This PE will receive from the Spring configuration file the values with the location of the models for the :tokenizerModel and :posTaggerModel properties, as well as the name of the output stream.
At initialization time it creates a new tokenizer and POS tagger using the provided models and stores them in the :tokenizer and :posTagger fields.

The :process-event function is invoked whenever the PE receives a new event from a stream, according to its configuration. The result of its computation is emitted to the configured output stream using the dispatch-event function. The :output function, in turn, is the periodic output callback described earlier; in the previous example it does not output any value.
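
A PE that did want to use the periodic output mechanism could do so through its :output function. The following is just a hypothetical sketch built with the same constructs shown above; the cljs4.EventCounter class and the :count field are made up for illustration:

(def-s4-pe cljs4.EventCounter [:count]
  ;; start the counter when the PE is instantiated
  :init (fn [this] (write-state this :count 0))
  ;; count every event received on the configured stream
  :process-event [[Object] (fn [this event]
                             (write-state this :count
                                          (inc (or (read-state this :count) 0))))]
  ;; invoked periodically by the PN; here it just reports the current count
  :output (fn [this]
            (println "events seen so far:" (read-state this :count))))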

The wiring of a PE in clj-s4 can be accomplished using the wire-pe function:

(wire-pe {:id "tokenizer"
          :class "cljs4.Tokenizer"
          :keys  ["Sentences textId"]
          :properties
          [{:name "tokenizerModel" :value "/PATH/TO/models/en-token.bin"}
           {:name "posTaggerModel" :value "/PATH/TO/models/en-pos-maxent.bin"}
           {:name "dispatcher" :ref "wordsDispatcher"}
           {:name "stream" :value "Tokens"}]})

With this configuration, the PE is connected to the input Sentences stream and will output the pos-tagged tokens to the Tokens stream.
When using the wire-pe function, values for the :id, :class and :keys keys are mandatory.
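
As another hypothetical example tying the pieces together, a keyless PE for the RandomNumbers application sketched earlier could split the incoming numbers into the OddNumbers and EvenNumbers streams used below. The cljs4.NumberSplitter class and its field names are illustrative only, not part of the original application:

(def-s4-pe cljs4.NumberSplitter [:oddStream :evenStream]
  :process-event [[Object] (fn [this number]
                             (let [num (:num (msg-to-map number))
                                   ;; choose the output stream injected by Spring
                                   stream (if (odd? num)
                                            (read-state this :oddStream)
                                            (read-state this :evenStream))]
                               (dispatch-event this stream
                                               (cljs4.Number. {:num num}))))]
  :output (fn [this] :not-interested))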

Event distribution in an S4 cluster

When an S4 adapter generates a new event in a stream, or an S4 PE outputs an event, the S4 infrastructure must decide to which PN in the cluster the event will be routed for processing.

The component making these routing decisions is the dispatcher. When a component emits an event in a stream, the dispatcher retrieves the partitioner object configured for that stream. The partitioner configuration includes a hash key and a hash function that, given the value of that key in the event object, always outputs the same processing node in the cluster. In this way, the choice of hash function determines the load balancing in the cluster. The configuration of each PN in the cluster can be retrieved at run time by the other PNs using ZooKeeper, or from a local file in the 'red button' non-distributed mode of operation.
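
Conceptually, the partitioner is just a deterministic mapping from the value of the configured hash key to one of the processing nodes. A trivial Clojure sketch of the idea, not S4's actual hash function, would be:

;; Illustrative only: the same key value always maps to the same node,
;; so how the hash spreads values determines the cluster's load balancing.
(defn node-for-key [hash-key-value number-of-nodes]
  (mod (hash hash-key-value) number-of-nodes))

(node-for-key 42 4)   ;; a given :num value always lands on the same PN
(node-for-key 42 4)   ;; same node again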

clj-s4 allows the configuration of partitioners and dispatchers using the wire-partitioner and wire-dispatcher functions.

The following code sample shows how a dispatcher, using a single partitioner for the streams RandomNumbers, OddNumbers and EvenNumbers, can be wired in the application's Spring configuration.

The partitioner will use the num property of the events beans to dispatch the event to the right PN in the cluster.

(wire-partitioner {:id "mapPartitioner"
                   :stream-names ["RandomNumbers" "OddNumbers" "EvenNumbers"]
                   :hash-keys ["num"]})

(wire-dispatcher {:id "numbersDispatcher"
                  :partitioners ["mapPartitioner"]})

Deployment and application setup

In order to run a distributed S4 application, the application classes and dependent jars must be bundled in an application directory inside the S4 image directory, together with the configuration files for the application and for the adapters.

clj-s4 makes it possible to create the S4 application bundle from some special annotations in the Leiningen project file:

(defproject randomnumbers "1.0.0-SNAPSHOT"
  :description "a demo S4 app built using clj-s4"
  :dependencies [[org.clojure/clojure "1.2.0"]
                 [org.clojure/clojure-contrib "1.2.0"]
                 [clj-s4 "0.2.1.1-SNAPSHOT"]]
  :dev-dependencies [[clj-s4 "0.2.1-SNAPSHOT"]]
  :aot [randomnumbers.core]

  ;; configuration of a S4 application
  :s4-app {:name "RandomNumbers"
           :namespace "randomnumbers.core"
           :configuration ["randomnumbers.wiring" "RandomNumbers"]
           :adapters ["randomnumbers.wiring" "RandomNumbersAdapter"]})

The previous code defines an S4 application called RandomNumbers and specifies the application configuration and adapter wiring files.

Wiring information and the implementation of the functionality are stored in different namespaces: randomnumbers.wiring and randomnumbers.core.

Using this information, the application bundle can be generated with the lein s4-deploy Leiningen task.
If no argument is passed to the task, the application is deployed into a subdirectory named after the value of the :name key in the project file, RandomNumbers in this case, inside a local s4 directory. If a path is passed as an argument, the application is deployed into that target path.

Conclusions

S4 is a young platform, but it has the potential to offer a whole new range of possibilities when dealing with massive data applications. Stream and complex event processing frameworks have been restricted to certain application domains, but S4 can bring this computational paradigm to a wider audience of web developers already dealing with big data applications, making it possible to develop new kinds of features for these applications.

S4's design emphasizes re-usability. The use of Spring makes it easy to build very generic building blocks that can later be configured for a specific use case. If S4 continues to mature and libraries of re-usable PEs and adapters become available, S4 will become a very interesting development framework for distributed applications.