October 27, 2011

Non-blocking Message Flow with Akka Actors

by Patrik Nordwall

Akka is an event-driven middleware framework, implemented in Scala, for building high-performance, reliable distributed applications. Akka decouples business logic from low-level mechanisms such as threads, locks and non-blocking IO. Your Scala or Java program logic lives in lightweight actor objects which send and receive messages. With Akka, you can easily configure how actors will be created, destroyed, scheduled, and restarted upon failure.

This article will give you a deeper understanding of how Akka actors are related to threads. It illustrates how poorly your application can utilize resources if you use actors in a blocking fashion, and what to do instead.

The example we will use is a translation service that returns two things in response to an HTTP request:

  • translated text
  • word count

The translation takes 100 ms to perform, and so does the word count.

We are only allowed to use 8 threads for the actors. That is indeed a low limit, but in practice there is always some upper limit on the number of threads you can use.

The application must be able to handle 4 concurrent client requests with an average response time of less than 110 ms.

We use Akka’s support for asynchronous HTTP requests, which means that we don’t block the incoming request thread.

To handle 4 concurrent requests we must create at least 4 frontend actors and use a dispatcher with 4 threads.

Let’s start with the wrong way of doing it and then change to a better solution.

object Frontend {

  val frontendDispatcher = Dispatchers
    .newExecutorBasedEventDrivenDispatcher("frontend-dispatcher")
    .setCorePoolSize(4)
    .build

  def loadBalanced(poolSize: Int, actor: => ActorRef): ActorRef = {
    val workers = Vector.fill(poolSize)(actor.start())
    Routing.loadBalancerActor(CyclicIterator(workers)).start()
  }

  class WebEndpoint(root: ActorRef) extends Actor with Endpoint {
    self.dispatcher = frontendDispatcher

    val translate = loadBalanced(10, actorOf[TranslateHandler])

    override def preStart() = {
      root ! Endpoint.Attach(hook, provide)
    }

    def receive = handleHttpRequest
    def hook(uri: String) = true
    def provide(uri: String) = translate
  }

Above we have created 10 instances of the TranslateHandler actor, managed by a round-robin router. The TranslateHandler looks like this:

class TranslateHandler extends Actor {
    self.dispatcher = frontendDispatcher

    def receive = {
      case get: Get =>
        val text = get.request.getParameter("text")
        // Blocks this actor's thread until the reply arrives or the ask times out
        (translationService ? TranslationRequest(text))
          .as[TranslationResponse] match {
            case Some(TranslationResponse(translated, words)) =>
              get.OK("Translated %s words to: %s".format(words, translated))
            case None =>
              get.Timeout("Timeout")
          }
    }
  }

It asks the translationService to perform the translation and count the words. It blocks until the result is available, and then completes the HTTP request with OK.

For the backend TranslationService we also define a dispatcher and a pool of service instances:

val backendDispatcher = Dispatchers
    .newExecutorBasedEventDrivenDispatcher("backend-dispatcher")
    .setCorePoolSize(4)
    .build

  val translationService = loadBalanced(10, actorOf[TranslationService])

The TranslationService and the workers look like this:

class TranslationService extends Actor {
    self.dispatcher = backendDispatcher

    val translator = actorOf[Translator].start()
    val counter = actorOf[Counter].start()

    def receive = {
      case TranslationRequest(text) =>
        val future1 = (translator ? text)
        val future2 = (counter ? text)

        val result1 = future1.as[String]
        val result2 = future2.as[Int]

        for (translatedText <- result1; words <- result2) {
          self.reply(TranslationResponse(translatedText, words))
        }
    }
  }

  class Translator extends Actor {
    self.dispatcher = backendDispatcher

    def receive = {
      case x: String =>
        // simulate some work
        Thread.sleep(100)
        val result = x.toUpperCase
        self.reply(result)
    }
  }

  class Counter extends Actor {
    self.dispatcher = backendDispatcher

    def receive = {
      case x: String =>
        // simulate some work
        Thread.sleep(100)
        val result = x.split(" ").length
        self.reply(result)
    }
  }

In the receive method of the TranslationService you can see how the text translation and the word count are done in parallel.

Running this with JMeter and one client thread, the result looks fine: an average response time of 103 ms. When increasing to 2 client threads, the response time goes up to 193 ms. What is wrong? The blocking consumes threads, and we only have a few. In this case each request occupies 4 threads: the blocked TranslateHandler, the blocked TranslationService, the Translator and the Counter.
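One way to read the "4 threads per request" figure is as simple thread accounting against the two dispatchers. The sketch below is plain Scala arithmetic, not a measurement; the object name and the per-request breakdown are assumptions derived from the code shown so far.

```scala
// Back-of-the-envelope thread accounting for the blocking design.
// The breakdown is an interpretation of the example code, not a measurement.
object BlockingThreadBudget {
  val frontendPoolSize = 4 // core pool size of frontend-dispatcher
  val backendPoolSize  = 4 // core pool size of backend-dispatcher

  // Threads held for the roughly 100 ms that one request takes:
  val frontendThreadsPerRequest = 1 // TranslateHandler blocked waiting for the reply
  val backendThreadsPerRequest  = 3 // blocked TranslationService + sleeping Translator + sleeping Counter

  val threadsPerRequest = frontendThreadsPerRequest + backendThreadsPerRequest

  // Requests each pool can serve fully in parallel (integer division).
  val frontendCapacity = frontendPoolSize / frontendThreadsPerRequest
  val backendCapacity  = backendPoolSize / backendThreadsPerRequest

  // The pool with the smaller capacity is the bottleneck.
  val parallelRequests = math.min(frontendCapacity, backendCapacity)

  def main(args: Array[String]): Unit = {
    println(s"threads per request: $threadsPerRequest")          // threads per request: 4
    println(s"requests fully in parallel: $parallelRequests")    // requests fully in parallel: 1
  }
}
```

Under these assumptions the backend pool can serve only one request fully in parallel, which is consistent with the response time nearly doubling as soon as a second client is added.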

Obviously we can do better. Let’s change the TranslationService to this:

def receive = {
      case TranslationRequest(text) =>
        for (replyTo <- self.sender) {
          val aggregator = actorOf(new Aggregator(replyTo)).start()
          translator.tell(text, aggregator)
          counter.tell(text, aggregator)
        }
    }

It creates a new actor, Aggregator, which is used as the sender for the one-way tell to translator and counter, so their replies go to the Aggregator. This will not block at all. The Aggregator looks like this, and doesn’t occupy a thread until a response from one of the workers arrives.

class Aggregator(replyTo: ActorRef) extends Actor {
    self.dispatcher = backendDispatcher
    self.lifeCycle = Supervision.Temporary
    self.receiveTimeout = Some(1000)

    var textResult: Option[String] = None
    var lengthResult: Option[Int] = None

    def receive = {
      case text: String =>
        textResult = Some(text)
        replyWhenDone()
      case length: Int =>
        lengthResult = Some(length)
        replyWhenDone()
      case ReceiveTimeout =>
        self.stop()
    }

    def replyWhenDone(): Unit = {
      for (text <- textResult; length <- lengthResult) {
        replyTo.tell(TranslationResponse(text, length))
        self.stop()
      }
    }
  }
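The state handling in the Aggregator can be tried out without Akka. The following is a minimal plain-Scala model of it; `AggregatorModel`, `onMessage` and the `reply` callback are illustrative names and not part of the Akka API, and the `done` flag merely stands in for `self.stop()`.

```scala
// Plain-Scala model of the Aggregator's state handling (no Akka involved).
// AggregatorModel, onMessage and reply are hypothetical names for illustration.
final case class TranslationResponse(text: String, words: Int)

final class AggregatorModel(reply: TranslationResponse => Unit) {
  private var textResult: Option[String] = None
  private var lengthResult: Option[Int] = None
  private var done = false // stands in for self.stop()

  // Mirrors receive: store the partial result, then reply if both are present.
  def onMessage(message: Any): Unit = message match {
    case text: String =>
      textResult = Some(text)
      replyWhenDone()
    case length: Int =>
      lengthResult = Some(length)
      replyWhenDone()
    case _ =>
      () // ignore anything else in this model
  }

  private def replyWhenDone(): Unit =
    if (!done) {
      // The body runs only when both Options are defined.
      for (text <- textResult; length <- lengthResult) {
        done = true
        reply(TranslationResponse(text, length))
      }
    }
}

object AggregatorDemo {
  def main(args: Array[String]): Unit = {
    val aggregator = new AggregatorModel(response => println(response))
    aggregator.onMessage(2)             // word count arrives first: nothing is sent yet
    aggregator.onMessage("HELLO WORLD") // prints TranslationResponse(HELLO WORLD,2)
  }
}
```

The order in which the two partial results arrive does not matter, and no thread waits between them; the reply is sent from whichever message handler completes the pair.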

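The same thing can be done in the frontend. As an additional optimization, the frontend and backend can share the same dispatcher with 8 threads. Each request then only occupies threads while the Translator and the Counter do their work, so 4 concurrent requests need 4 × 2 = 8 threads, which is exactly our limit. With these changes we can run JMeter with 4 client threads and the average response time is 102 ms.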

Another solution is to use Akka futures, but that is another story.
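To give a rough idea of what the future-based approach looks like, here is a minimal sketch that uses the Scala standard library's `scala.concurrent.Future` as a stand-in. It is not the Akka futures API of the version used in this article, which differs in its details; the object and method names are assumptions made for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Sketch only: standard-library futures stand in for Akka futures here.
object FutureComposition {
  import ExecutionContext.Implicits.global

  final case class TranslationResponse(text: String, words: Int)

  // Simulated workers, mirroring Translator and Counter above.
  def translate(text: String): Future[String] =
    Future { Thread.sleep(100); text.toUpperCase }

  def count(text: String): Future[Int] =
    Future { Thread.sleep(100); text.split(" ").length }

  def handle(text: String): Future[TranslationResponse] = {
    // Start both futures before composing them so they run in parallel.
    val translated = translate(text)
    val words = count(text)
    // The for-comprehension registers callbacks; no thread blocks here.
    for (t <- translated; w <- words) yield TranslationResponse(t, w)
  }

  def main(args: Array[String]): Unit = {
    // Blocking with Await only at the very edge of the demo, to print the result.
    println(Await.result(handle("hello akka actors"), 2.seconds))
    // prints TranslationResponse(HELLO AKKA ACTORS,3)
  }
}
```

As with the Aggregator, the two results are combined by a callback rather than by a waiting thread.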

The conclusion: be careful with blocking operations, and don’t be afraid to create temporary actors that continue the message flow.

Full source code for the example above is available at https://github.com/patriknw/akka-playground/tree/master/web-to-backend.  

For more about Akka, visit the Typesafe web site and the Akka community project site.
