Typesafe Activator

CEP with Akka and Esper or Streams

CEP with Akka and Esper or Streams

fsauer65
Source
May 10, 2014
akka scala esper cep scalavro union-type reactivestreams akkastreams scaladays2014

An example showing two ways to use the Esper Complex Event Processing (CEP) engine with Akka as well as a implementation of the same algorithm using Akka Streams

How to get "CEP with Akka and Esper or Streams" on your computer

There are several ways to get this template.

Option 1: Choose akka-with-esper in the Typesafe Activator UI.

Already have Typesafe Activator (get it here)? Launch the UI then search for akka-with-esper in the list of templates.

Option 2: Download the akka-with-esper project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-with-esper.

  1. Download the Template Bundle for "CEP with Akka and Esper or Streams"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue:

    Or from a command line:

     C:\Users\typesafe\akka-with-esper> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create a akka-with-esper project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-with-esper on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/fsauer65/akka-esper-integration#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

Introduction

Esper is a popular open source Complex Event Processing (CEP) engine. The ideas of CEP are a good fit with the Akka actor model and this code explores these ideas. It is also described in a series of blog posts on franklysauer.com. The code is all available on github and you can use it any way you see fit.

The next 2 sections explain how Esper can be wrapped up in a few scala traits (by no means meant to be a comprehensive scala API to esper!), and the 2 sections after that show how you can use those traits to embed esper in either an Akka actor or an akka event bus.

This is not meant to be an EPL tutorial. For details on EPL please read the EPL reference. The sample EPL used in this code implements a simple and totally fake trading algorithm. It generates a Buy iff the moving average of a given window length is higher than the oldest price in that window (in other words, it is trending upwards). To do so, it inserts incoming Price events into two streams, a 'Delayer' and an 'Averager'. These two streams are then joined to generate the trades.

The Price events inserted into the CEP engine are instances of a scala Price case class. This works because its fields are annotated with @BeanProperty and it gets registered as an Esper Event type.

Wrapping Esper

The EsperEngine trait encapsulates the bare minimum code required to get a functional esper engine up and running. It is written as a trait so that it can be used both with an Actor and an Event Bus as shown in the next two sections.

Note that epService and epRuntime are declared lazy. This is needed to ensure they are not instantiated until after all the event types are registered with the esperConfig. When using this trait it is critical that you register all the required esper event types BEFORE creating any EPL statements using createEPL.

Creating EPL statements is done with createEPL which takes the statement text and a function that is invoked when the statement fires and produces events. The parameter type to the callback is EsperEvent. Here is an Example:


createEPL(s"""
        insert into Averages
        select symbol,avg(price) as price
        from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol
        """) { evt => println(s"Got a ${evt.evtType}: ${evt.underlying}")

}

Esper Modules

In the previous section we saw how to register single EPL statements using createEPL. Esper also supports the concept of modules that can contain multiple statements, import other modules, etc. See Esper Module Syntax for all the details.

The EsperModule trait wraps this idea. You can mix-in this trait into anything that also mixes in the EsperEngine trait which is enforced by the self type.

Use installModule to deploy an esper module into the engine. Since a module may contain many EPL statements and we are probably not interested in many of the intermediary events being passed from stream to stream, only statements annotated with a @Name annotation will get a listener installed on them. The eventType of the resulting EsperEvent is the value of this @Name annotation. The following shows an example with a module defined in an embedded string. Modules can also be loaded from various other sources.


     installModule(s"""
        module SimpleAverageTrader;

        insert rstream into Delayed
        select rstream symbol,price
        from Price.std:groupwin(symbol).win:length(${windowSize-1});

        insert into Averages
        select symbol,avg(price) as price
        from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol;

        @Name("Buy")
        insert into Buy
        select p.symbol, p.price, $orderSize as amount
        from Price.std:unique(symbol) p
        join Delayed.std:unique(symbol) d on d.symbol = p.symbol
        join Averages a unidirectional on a.symbol = p.symbol
        where a.price > d.price;
        """) { evt => // will only get called when the Buy rule fires
           println(s"Got a ${evt.evtType}: ${evt.underlying}")
     }
    

CEP in an Akka Actor

In this section we will explore embedding Esper inside an actor.

This EsperActor will accept messages to register esper event types, install EPL statement or entire modules, and finally to start processing events.

All the messages are defined in the companion object to the esper actor class. On receipt of the StartProcessing message the actor uses become to change its behavior so that all subsequent messages are inserted into the now initialized and started esper engine.

The DeployStatement message contains an Option[ActorRef]. This actor ref is used as the receiver for EsperEvent messages that are sent when the EPL statement invokes its update listener. If you don't require a callback simply use None.

The DeployModule message is very similar but it deploys an entire esper module and since those contain multiple statements, the listeners are a map that maps event types to actor refs. If you want to send the events produced by a statement with annotation @Name("Foo") to actor ref Bar, use Map("Foo"->Bar).

The EsperActorExample shows how to use this actor using individual EPL statements. The EsperActorModuleExample demonstrates the use of the DeployModule message.

Note that there is a lot of room for improvement here, especially with respect to error handling. We should probably use a supervisor to deal with exceptions, especially during the initialization, where we can encounter bad EPL or unregistered event types, etc.

CEP in an Akka Event Bus

An alternative way to incorporate esper in an actor system is by using it as a message classifier in an Eventbus. This is implemented in the EsperClassification trait, which extends the standard akka eventbus LookupClassification trait. This trait defines an abstract type EsperEvents that must be defined in a concrete implementation of an event bus using this EsperClassification trait.

To make things a bit more interesting, it makes use of the Union type from the scalavro library, which in turn is based on this amazing type sorcery...

This allows us to define multiple event types as acceptable to be posted on the bus. This is type checked at compile time, and they could be any combination of primitives, case classes, collections, etc. Any calls to broadcast with type outside the Union will result in a compile time error; for example, calling EsperModuleEventBus.broadcast("Hello there!") in the EsperEventBusExample will result in:

        Cannot prove that com.gensler.scalavro.util.Union.not[com.gensler.scalavro.util.Union.not[String]] <:< experiments.esperakka.EsperModuleEventBus.EsperEvents.
        EsperModuleEventBus.broadcast("Hello there!")
        ^
    
All events put on the bus using broadcast are inserted into the esper engine and the bus - which has registered itself as the update listener with the EPL statements - will publish all the EsperEvents coming out of the esper engine with the event type as the topic. It does so using the mechanisms of its parent LookupClassification trait, so any actor that has subscribed to the event type will receive these events. The examples show how to subscribe. There are two example bus implementations defined in the example. One example uses the epl method to install individual EPL statements, the other mixes in the ExampleEsperModule trait to deploy an entire esper module into the bus. Defining a bus this way is so compact it can be shown right here:

    object EsperModuleEventBus extends ActorEventBus with EsperClassification with ExampleEsperModule {
      type EsperEvents = union[Price] #or [Sell] #or [Buy]
      override def esperEventTypes = new Union[EsperEvents]
    }
    
The override def esperEventTypes = new Union[EsperEvents] will always be exactly same, but unfortunately we can't pull it up because of a technicality of TypeTags...

CEP using Akka Streams

As a final example let's look at how to implement this same algorithm without Esper but rather using the recently announced Akka Streams.

StreamCEP is the akka stream implementation of the algorithm. It demonstrates the Flow DSL and some of its primitives like groupBy, grouped , filter and map to implement the same rule implemented earlier using esper.

For a step-by-step walk-through see my blog

comments powered by Disqus