Blog

April 9, 2014

Reactive Programming Patterns in Akka using Java 8

@kipsigman
April 9, 2014
Akka
Java8

With the recent release of Java 8 we’ve been very excited to show how to use the new language features when building Reactive applications. I previously detailed how to use Java 8 with the Play Framework and this is a follow-up post introducing Akka with Java 8.

Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on the JVM. It uses the Actor model as an abstraction for concurrency. I’m going to demonstrate some of the basic programming patterns for Akka actors using Java 8 Lambda Expressions.

The basic work unit in Akka is the actor. An actor is a container for state and behavior and can create and supervise child actors. Actors interact with each other through asynchronous message passing and can only process one message at a time. This model protects an actor’s internal state, makes it thread safe, and implements event-driven behavior which won’t block other actors.

Here’s a sample Actor definition taken from the Typesafe Activator template Hello Akka (Java 8):

public class Greeter extends AbstractActor {

   String greeting = "";

   public Greeter() {

       receive(ReceiveBuilder.

           match(WhoToGreet.class, message -> greeting = "hello, " + message.who).
            match(Greet.class, message -> sender().tell(new Greeting(greeting), self())).
            build());
    }
}

The actor’s initial behavior is defined by using a ReceiveBuilder to install a function which can match incoming messages and execute related behavior. Note that behavior for each message is defined using a Lambda Expression.

It’s common for an actor to delegate work to a child actor which can help provide resiliency and responsiveness. For example:

public class HandoffActor extends AbstractActor {{
  receive(ReceiveBuilder.matchEquals("hello", s -> {
    ActorRef worker = 
      context().actorOf(Props.create(AbstractActor.class, 
        () -> new AbstractActor() {{
          receive(ReceiveBuilder.
            match(String.class, s2 -> {
              // some long running or dangerous computation...
              sender().tell(s2.toUpperCase(), self());
              self().tell(PoisonPill.getInstance(), self());
            }).build());
        }}));
    worker.forward(s, context());
  }).build());
}}

When this actor receives a message it spawns a child actor to do the actual work and forwards the message to this worker. The worker performs some work, sends a message to the sender, and then destroys itself with a PoisonPill. Because the message was forwarded, the worker’s call to sender() is a reference to the original message sender rather than its parent.

Actors can manage exceptions thrown by their children by defining a SupervisorStrategy. Here’s an example from the Activator template Akka Supervision in Java with Lambdas:

private SupervisorStrategy strategy = new OneForOneStrategy(false, DeciderBuilder.
      match(FlakinessException.class, e -> SupervisorStrategy.restart()).
      match(ArithmeticException.class, e -> SupervisorStrategy.stop()).
      match(Throwable.class, e -> SupervisorStrategy.stop()).build());
    
  @Override
  public SupervisorStrategy supervisorStrategy() {
      return strategy;
  }

The strategy defines a set of exception types that may be thrown by child actors and maps them to actions to take, such as restarting the child. The pattern of DeciderBuilder is similar to ReceiveBuilder and uses lambda expressions in the same manner.

Actors can swap to new behavior at runtime and revert to previous behaviors. This is an example from the Akka documentation for Actors with Lambda Support:

public class Swapper extends AbstractLoggingActor {
  public Swapper() {
    receive(ReceiveBuilder.
      matchEquals(Swap, s -> {
          log().info("I am an F.B.I. agent!");
          context().become(ReceiveBuilder.
                  matchEquals(Swap, x -> {
                    log().info("I know, isn’t it wild?");
                    context().unbecome(); // go back to previous behavior
                  }).build(), false); // push on top and keep old behavior
      }).build()
    );
  }
}

The first message this actor receives will cause it to log “I am an F.B.I. agent!” and then install new behavior using context().become(). The next message will use the new behavior which will log “I know, itn’t it wild?” and then revert to the previous behavior using context().unbecome(). This ability for an actor to store and change behavior is considered a powerful abstraction for managing state and lifecycle.

The last thing I’d like to demonstrate is how to define an actor as a Finite State Machine (FSM) by using one of Akka’s actor extensions. Here’s an example from the Akka FSM in Java with Lambdas template:

/**
   * Some states the chopstick can be in
   */
  public static enum CS {
    Available,
    Taken
  }

  /**
   * Some state container for the chopstick
   */
  public static final class TakenBy {
    public final ActorRef hakker;
    public TakenBy(ActorRef hakker){
      this.hakker = hakker;
    }
  }

  /*
  * A chopstick is an actor, it can be taken, and put back
  */
  public static class Chopstick extends AbstractLoggingFSM {
    {
    // A chopstick begins its existence as available and taken by no one
    startWith(CS.Available, new TakenBy(context().system().deadLetters()));

    // When a chopstick is available, it can be taken by a some hakker
    when(CS.Available,
      matchEventEquals(Take, (take, data) ->
        goTo(CS.Taken).using(new TakenBy(sender())).replying(new Taken(self()))));

    // When a chopstick is taken by a hakker
    // It will refuse to be taken by other hakkers
    // But the owning hakker can put it back
    when(CS.Taken,
      matchEventEquals(Take, (take, data) ->
        stay().replying(new Busy(self()))).
      event((event, data) -> (event == Put) && (data.hakker == sender()), (event, data) ->
        goTo(CS.Available).using(new TakenBy(context().system().deadLetters()))));

    // Initialze the chopstick
    initialize();
    }
  }

 

A Chopstick can be in one of two states, Available or Taken, and stores data of type TakenBy. The starting state and data is defined by startWith() and behavior for each state is defined by a call to when(). By defining the states and transitions this actor models a chopstick which can be taken and returned by other actors. If you’re interested in learning more about how to use FSM actors, see the Akka docs for FSM (Java with Lambda Support).

This covers a basic introduction to Akka actors with Java 8 programming patterns. To learn more, install Java 8, download Typesafe Activator, and start playing with these templates:

 

 

comments powered by Disqus
Browse Recent Blog Posts