Exploring Yahoo S4 with Clojure

S4 is a platform for processing unbounded streams of data introduced by Yahoo in November 2010.
S4's main concern is processing streams of data at a rate of thousands of events per second in a scalable, distributed way, while offering some guarantees in case of network partition.

The S4 programming model is similar to the actor model implemented by platforms like Erlang’s OTP. This model makes it easy to implement the processing of streams using a functional programming style.

In this post I will explore the main features and components of S4 using the Clojure programming language. At the same time, a Clojure library for building S4 applications, clj-s4, will be introduced.

The version of S4 used will be 0.2.1. S4 is still under heavy development, so some of the features discussed here could change in the near future.

Streams and events

S4 is a platform for the processing of streams of data. In the S4 context, a stream of data must be understood as a lazy sequence of events. Each of these events is an instance of a Java class implemented as a Java bean. Java beans are just containers for a set of fields, implementing standard methods for retrieving and modifying the state of the objects.

S4 streams have another important feature: the events in the stream have an associated key. This key can be composed of a single field or a list of fields in the Java bean object. S4 uses the key of the event much as the Hadoop MapReduce framework uses keys: to route data to the different processing units.

These event bean objects are the equivalent of the messages exchanged by processes in similar systems like Erlang’s OTP. In that system, the main structures used to store message data are plain tuples. The decision to use Java beans in S4 makes it easy to integrate S4 events with the Java code used in the rest of the application. It also offers a standard interface for manipulating the state of the objects. This is especially useful in S4 since it also uses the Spring application development framework for application configuration and setup. In Spring, state for Java beans can be easily injected from the configuration file, making it easier to build reusable, parametric Java components.

Nevertheless, Java beans are not the most common way of storing the state of a program in Clojure applications. Plain maps and types created with the deftype and defrecord macros are the preferred way to deal with state.

Unfortunately, the Java classes generated from Clojure types do not integrate easily into the S4 model: they don’t implement the standard bean interface and, as a result, cannot be easily configured in a Spring configuration file. Furthermore, the standard serialization mechanism used in S4 at the moment, the Kryo serialization library, cannot serialize most Clojure-generated classes and default types, since it is unable to deserialize fields marked as final and requires argument-less constructors that Clojure core types, like Keyword, do not implement. This even makes it impossible to generate a serializable bean using the gen-class macro, since this macro places the state into a final field.

clj-s4 solves this issue by introducing a new macro, def-s4-message, similar to defrecord, that generates a mutable Java bean for the provided field descriptions.

As an example, the following code declares a new event type, holding a number, that will be part of an S4 stream. clj-s4 messages can be built from a Clojure hash map, and they can be transformed back into immutable maps using the msg-to-map function:

(use 'clj-s4.core)

;; Defines a bean NumberTest with two fields. The first
;; one, :num, is declared to be an int field.
;; The second one, :foo, will be an Object field by
;; default.
(def-s4-message cljs4.NumberTest [[int :num] :foo])

(def *num* (cljs4.NumberTest. {:num 34 :foo "bar"}))

(.getNum *num*)
;; returns 34

(.setNum *num* 15)
(.getNum *num*)
;; returns 15

(msg-to-map *num*)
;; returns {:num 15 :foo "bar"}
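
To make the contrast with immutable Clojure data concrete, here is a minimal sketch of a mutable, bean-like type written in plain Clojure. It is not how def-s4-message is implemented; the names INumberBean, NumberBean and bean->map are hypothetical, and unlike a real Java bean this type has no argument-less constructor.

```clojure
;; Hypothetical getter/setter interface, mimicking the bean convention.
(definterface INumberBean
  (getNum [])
  (setNum [n])
  (getFoo [])
  (setFoo [v]))

;; The fields are declared mutable so the setters can change them in place.
(deftype NumberBean [^{:unsynchronized-mutable true} num
                     ^{:unsynchronized-mutable true} foo]
  INumberBean
  (getNum [this] num)
  (setNum [this n] (set! num n))
  (getFoo [this] foo)
  (setFoo [this v] (set! foo v)))

(defn bean->map
  "Copies the current state of the bean into an immutable map."
  [^INumberBean b]
  {:num (.getNum b) :foo (.getFoo b)})
```

The map returned by bean->map is a snapshot: later calls to the setters do not affect it.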

S4 currently supports the Avro serialization framework, which relies on the description of data structures in a neutral JSON notation, but clj-s4 does not offer any utilities to work with this library.

Adding new serialization mechanisms to S4 is also an easy task. It only consists of implementing a single interface, SerializerDeserializer, which defines two simple methods to serialize and deserialize objects, and changing a value in the Spring configuration file for the S4 cluster. Adding support for the standard Java serialization mechanism or for other schemes based on JSON, YAML or even Lisp s-expressions is trivial. The only problem with these serialization mechanisms is that their serialization formats are less efficient and could harm the performance of S4 as a high-performance stream processing platform.
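
As an illustration of the s-expression option, the following sketch serializes event maps to bytes and back using the Clojure printer and the EDN reader. The EventSerializer protocol and its method names are hypothetical stand-ins, not the actual S4 SerializerDeserializer interface, and clojure.edn assumes Clojure 1.5 or later.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical protocol standing in for a two-method serializer interface.
(defprotocol EventSerializer
  (serialize [this event] "Turns an event map into a byte array.")
  (deserialize [this raw] "Turns a byte array back into an event map."))

(def sexp-serializer
  (reify EventSerializer
    (serialize [_ event]
      ;; pr-str prints the map as a readable s-expression.
      (.getBytes ^String (pr-str event) "UTF-8"))
    (deserialize [_ raw]
      ;; edn/read-string only reads data; it never evaluates code.
      (edn/read-string (String. ^bytes raw "UTF-8")))))
```

A textual format like this is easy to debug, but it is bulkier and slower to parse than a binary format, which is the trade-off mentioned above.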

Inserting events into S4: adapters

S4 streams of events ultimately originate in the outside world. They can be periodically pulled from a web service, pushed using a web socket or read from a log file. They may also have different formats: JSON objects, XML documents or plain text lines.

Adapters are S4 components located at the boundaries of the S4 system. They are named that way because they interact with the original source of data, transform its data format into Java bean objects and insert them into the S4 system as a new stream of events.
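
The role of an adapter can be sketched in plain Clojure without any S4 machinery. In this sketch a LinkedBlockingQueue stands in for an S4 stream and event maps stand in for Java beans; line->event and run-adapter are hypothetical names used only for illustration.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Stand-in for an S4 stream: a queue that downstream consumers read from.
(def random-numbers (LinkedBlockingQueue.))

(defn line->event
  "Parses one raw text line such as \"42\" into an event map."
  [line]
  {:num (Long/parseLong (.trim ^String line))})

(defn run-adapter
  "Reads raw lines from the source, converts each one into an event
   and inserts it into the stream."
  [lines ^LinkedBlockingQueue stream]
  (doseq [line lines]
    (.put stream (line->event line))))

;; Feed three raw lines from an imaginary external source.
(run-adapter ["7" " 42 " "13"] random-numbers)
```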

Adapters are started or stopped independently from the rest of the S4 cluster but they use the same underlying communication infrastructure and configuration to be able to send streams of events to S4 Processing Nodes (PNs) where these events will be processed.

Implementing an adapter with clj-s4 involves two parts: the implementation of its functionality as a Clojure function, and the configuration of the adapter as a Spring bean.

The implementation of the adapter can be achieved using the def-s4-adapter macro.

The following sample code shows the implementation of an adapter that generates a stream of random numbers:

(in-ns 'randomnumbers.core)

(def-s4-adapter cljs4.RandomNumberAdapter [:stream]

   :init (fn [this args]
          (.start (Thread. this)))

   :run (fn [this]
          (loop [num 0]
            (generate-event this
                            (read-state this :stream)
                            (cljs4.Number. {:num num}))
            (Thread/sleep 3000)
            (recur (int (Math/floor (* (rand) 100)))))))

The def-s4-adapter macro receives as its first two arguments the name of the class where the implementation of the adapter will be stored and a vector of keys naming the fields of mutable state the adapter will have. In this case, the only state will be the :stream field.

The fields of state for the adapter can be read and manipulated using the functions read-state, write-state and alter-state.

The rest of the implementation is a map with two functions keyed :init and :run.
The :init function will be invoked when the adapter is instantiated by S4 and the :run function will be transformed into an implementation of the run method from the Runnable Java interface that this adapter will implement.

In order to insert a new event into an S4 stream so that it can be processed, the adapter can use the function generate-event.

generate-event receives as arguments the adapter, the name of the stream where the event will be inserted and the event.

In this example the name of the stream is stored in the state of the adapter, but this value is never initialized in the code of the adapter.
Values of this kind can be injected by Spring using the second component in the definition of the adapter: the wiring of the adapter as a Spring bean.

clj-s4 offers the wire-bean function to generate the required XML for Spring. It also offers the wire-adapters function to wrap the Spring bean definition as a new S4 adapter.

(in-ns 'randomnumbers.wiring)

(wire-adapters "RandomNumbersAdapter"
  (wire-bean
    {:id "randomNumbersGenerator"
     :class "cljs4.RandomNumberAdapter"
     :properties [{:name "stream" :value "RandomNumbers"}]}))

Additional properties can be added to the vector of :properties for the bean. clj-s4 will use the :id property of any bean to store it. For adapters, it will use the first argument provided as the identifier of the adapter.

This ID can be used to generate the XML for any bean, adapter or application using the s4-wiring Leiningen task.

The result of the execution of this adapter is a new stream of events, named RandomNumbers, that can be processed by S4 nodes.
The RandomNumbers stream will consist of cljs4.Number Java beans.

Processing S4 streams: Processing Elements

The computation implemented in an S4 application is executed by Processing Elements (PEs). PEs are the equivalent of actors in actor frameworks, or of Erlang processes.

PEs receive events from a certain stream or collection of streams, do some computation based on the received events and, optionally, output data to one or several output streams.

There are two main types of PEs in S4.
Keyless PEs do not have a key associated with the stream of events. As a consequence, they will receive any event in that stream regardless of its value.
Keyed PEs, on the other hand, have a key associated with the stream of events they process.
They will only receive events in the stream with the same value for the specified stream key.

PEs are also defined by an implementation and a Spring configuration. When an S4 processing node boots, it will instantiate a new PE object for each PE defined in the Spring configuration file. The PN will store this prototype instance associated with the configured stream and key for that PE.

When a new event is received at the PN from the S4 communication layer, the PN will inspect the table mapping streams and keys to PE prototypes and execute one of the following options:

  • If the PE is keyless and no PE has been associated with the stream yet, it will clone the PE prototype and pass the event to the clone regardless of the value of the event.
  • If the PE is keyless and a PE was already cloned, the new event will also be passed to the same PE instance.
  • If the event has an associated key, the PN will check whether a PE was cloned for this key value and is still available. If no PE was cloned, a new one will be cloned and the event will be passed to it.
  • If a PE was previously cloned for this key value and has not been discarded, it will be retrieved and the event will be passed to it to be processed.

This mechanism ensures that in a PN there will be a maximum of one PE for each keyless stream and as many PEs as different key values for keyed PEs.
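
The lookup-or-clone behaviour described above can be sketched with a plain Clojure table. Everything here is a hypothetical stand-in for illustration: a prototype is a map with a :make function, a PE instance is an atom holding a counter, and the stream names are made up. It says nothing about how S4 implements this internally.

```clojure
;; Prototypes per stream. :key-field is nil for a keyless PE.
(def prototypes
  {"Words"   {:key-field :content :make #(atom 0)}
   "Numbers" {:key-field nil      :make #(atom 0)}})

;; Table of live PE instances, indexed by [stream key-value].
(def instances (atom {}))

(defn pe-for
  "Returns the PE instance for an event, cloning the prototype on first use."
  [stream event]
  (let [{:keys [key-field make]} (prototypes stream)
        ;; Keyless PEs always share the slot [stream nil].
        slot [stream (when key-field (get event key-field))]]
    (or (get @instances slot)
        (get (swap! instances
                    (fn [table]
                      (if (contains? table slot)
                        table
                        (assoc table slot (make)))))
             slot))))

(defn deliver-event
  "Routes the event to its PE instance, which here just counts events."
  [stream event]
  (swap! (pe-for stream event) inc))
```

With this table, every event on the keyless "Numbers" stream reaches the same instance, while the "Words" stream gets one instance per distinct :content value.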

Periodically, PEs will be requested by the PN to output their current state. A PE can use this mechanism to persist its state, output the result of a computation to persistent media or for any other purpose. The frequency with which the PN invokes the output of a PE can be configured in the Spring wiring for the PE to depend on time, a certain number of invocations, etc.
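
A count-based output policy can be sketched as follows. This is only an illustration in plain Clojure: make-counting-pe is a hypothetical name, and the scheduling that the PN performs in S4 is folded into the event handler here for brevity.

```clojure
(defn make-counting-pe
  "Builds a PE-like map. The :process! function handles one event, and
   output-fn is called with the current state after every `every-n` events."
  [every-n output-fn]
  (let [state (atom {:seen 0 :total 0})]
    {:state state
     :process! (fn [event]
                 (let [s (swap! state #(-> %
                                           (update-in [:seen] inc)
                                           (update-in [:total] + (:num event))))]
                   ;; Emit the accumulated state every `every-n` events.
                   (when (zero? (mod (:seen s) every-n))
                     (output-fn s))))}))
```

For example, a PE built with `(make-counting-pe 2 println)` prints its running state after the second, fourth, sixth event and so on.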

PE functionality is defined in clj-s4 using two main functions, associated with the keys :process-event and :output. An additional function keyed as :init can also be implemented to provide additional initialization for the PE. The PE definition also specifies the class that will be generated for its bytecode and a vector of fields for its state:

(in-ns 'wordcount.core)

(def-s4-pe cljs4.Tokenizer [:tokenizer :posTagger :tokenizerModel :posTaggerModel :stream]
  :init (fn [this]
          (let [tokenizer-model (read-state this :tokenizerModel)
                pos-tagger-model (read-state this :posTaggerModel)
                tokenizer (make-tokenizer tokenizer-model)
                pos-tagger (make-pos-tagger pos-tagger-model)]
            (write-state this :tokenizer tokenizer)
            (write-state this :posTagger pos-tagger)))
  :process-event [[Object] (fn [this text]
                             (let [text (msg-to-map text)
                                   text-id (:textId text)
                                   content (:content text)
                                   tokenize (read-state this :tokenizer)
                                   pos-tag (read-state this :posTagger)
                                   products (pos-tag (tokenize  content))]
                               (doseq [[token pos] products]
                                 (dispatch-event this
                                                 (read-state this :stream)
                                                 (cljs4.Word. {:content token :pos pos :textId text-id})))))]
  :output (fn [this] :not-interested))

The preceding code defines a new PE that will be stored in the cljs4.Tokenizer class.
This PE will receive from the configuration file the locations of the models through the :tokenizerModel and :posTaggerModel properties. It will also receive the name of the output stream from the Spring configuration.
At initialization time it will create a new tokenizer and POS tagger using the provided models and will store them in the fields :tokenizer and :posTagger.

The :process-event function will be invoked whenever the PE receives a new event from some stream, according to its configuration. The result of its computation will be sent to the configured output stream using the dispatch-event function. In the previous example, the :output function does not output any value.

The wiring of a PE in clj-s4 can be accomplished using the wire-pe function:

(wire-pe {:id "tokenizer"
          :class "cljs4.Tokenizer"
          :keys  ["Sentences textId"]
          :properties
          [{:name "tokenizerModel" :value "/PATH/TO/models/en-token.bin"}
           {:name "posTaggerModel" :value "/PATH/TO/models/en-pos-maxent.bin"}
           {:name "dispatcher" :ref "wordsDispatcher"}
           {:name "stream" :value "Tokens"}]})

With this configuration, the PE is connected to the Sentences input stream and will output the POS-tagged tokens to the Tokens stream.
When using the wire-pe function, the values for the keys :id, :class and :keys are mandatory.

Events distribution in a S4 cluster

When an S4 adapter generates a new event in a stream or an S4 PE outputs an event, the S4 infrastructure must decide which PN in the S4 cluster the event is routed to for processing.

The component taking the event routing decisions is the dispatcher. When a component emits an event in a stream, the dispatcher retrieves a partitioner object configured for that stream. The partitioner configuration includes a hash key and a hash function for that key that, given the value of the hash key in the event object, always outputs the same cluster processing node. In this way, the choice of hash function determines the load balancing in the cluster. The configuration for each PN in the cluster can be retrieved at run time by other PNs using ZooKeeper, or from a local file in the non-distributed 'red-button' mode of operation.

clj-s4 allows the configuration of partitioners and dispatchers using the wire-partitioner and wire-dispatcher functions.

The following code sample shows how a dispatcher, using a single partitioner for the streams RandomNumbers, OddNumbers and EvenNumbers, can be wired in the application's Spring configuration.

The partitioner will use the num property of the event beans to dispatch the event to the right PN in the cluster.

(wire-partitioner {:id "mapPartitioner"
                   :stream-names ["RandomNumbers" "OddNumbers" "EvenNumbers"]
                   :hash-keys ["num"]})

(wire-dispatcher {:id "numbersDispatcher"
                  :partitioners ["mapPartitioner"]})
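
The idea behind a partitioner can be sketched in a few lines of plain Clojure. The function partition-for is hypothetical, and it uses Clojure's built-in hash rather than whatever hash function S4 is configured with; events are plain maps here instead of beans.

```clojure
(defn partition-for
  "Maps an event to a node index in [0, node-count) using the values
   of the configured hash keys. Other fields of the event are ignored."
  [hash-keys node-count event]
  (mod (hash (vec (map #(get event %) hash-keys))) node-count))
```

Two events with the same value for the hash keys always land on the same node, so a keyed PE on that node sees every event for its key. How evenly events spread over the nodes depends entirely on the hash function and on the distribution of key values.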

Deployment and application setup

In order to run a distributed S4 application, the application classes and dependent jars must be bundled in an application directory inside the S4 image directory, which also contains the configuration files for the application and for the adapters.

clj-s4 makes it possible to create the S4 application bundle from some special annotations in the Leiningen project file:

(defproject randomnumbers "1.0.0-SNAPSHOT"
  :description "a demo S4 app built using clj-s4"
  :dependencies [[org.clojure/clojure "1.2.0"]
                 [org.clojure/clojure-contrib "1.2.0"]
                 [clj-s4 ""]]
  :dev-dependencies [[clj-s4 "0.2.1-SNAPSHOT"]]
  :aot [randomnumbers.core]

  ;; configuration of a S4 application
  :s4-app {:name "RandomNumbers"
           :namespace "randomnumbers.core"
           :configuration ["randomnumbers.wiring" "RandomNumbers"]
           :adapters ["randomnumbers.wiring" "RandomNumbersAdapter"]})

The previous code defines an S4 application called RandomNumbers and declares the application configuration and adapter wiring files.

Wiring information and the implementation of the functionality are stored in different namespaces: randomnumbers.wiring and randomnumbers.core.

Using this information, the application can be generated using the lein s4-deploy Leiningen task.
If no argument is passed to the task, the application will be deployed into a subdirectory named after the value of the :name key in the project file, RandomNumbers in this case, inside a local s4 directory. If a path is passed as an argument, the application will be deployed into that target path.


S4 is a young platform, but it has the potential to offer a whole new range of possibilities when dealing with massive data applications. Stream or complex event processing frameworks have been restricted to certain application domains, but S4 can bring this computational paradigm to a wider audience of web developers already dealing with big data applications, making it possible to develop new kinds of features for these applications.

S4's design emphasizes reusability. The use of Spring makes it easy to build very generic building blocks that can later be configured for a specific use case. If S4 continues to mature and libraries of reusable PEs and adapters become available, S4 will become a very interesting development framework for distributed applications.

