Blog

August 17, 2011

Akka: The Future is here!

August 17, 2011

by Viktor Klang

Futures are a neat way to use multiple cores without having to worry about managing specific threads or tasks. If you have some work that you want to get done while you get on with something else, then you simply create one or more Future(s) and give them the work to do in the form of a function that returns a result. Later you can test the Future for completion and pick up the result. An elegant way to get work done that you need in the Future!

Akka, part of the Typesafe Stack, has had a Future implementation for a long time. It’s so useful we decided to put a major effort into enhancing the capability, making it faster, more scalable and more intuitive to use. Key parts of the underpinning code used locks and blocking to ensure the integrity of the multi-threaded execution. These do not scale well in multi processor environments, as processors must wait while contentions are resolved. So we spent a lot of time to create a not-so-obvious non-blocking alternative. With the new implementation, things scale better and execution time is faster.

Comprehensions are the way we Scala programmers tend to think about doing things on collections. It is one of the function styles that leads to concise and easy to read code. We re-thought the Futures library to facilitate using Comprehensions and the more typical Scala “foreach” pattern, You can now handle sequences in an intuitive non-blocking way. We have found it a whole lot more powerful way to express our intentions.

Now let’s take a detailed look how the changes look at the code level:

  1. On akka.dispatch.Futures there are new, Java API methods named "future" to use Callables that are executed on another thread to produce the result in a Future. This means that they are non-blocking and you have the option to specify which Dispatcher will execute your callable.
  2. Futures.awaitAll has been deprecated in favor of: futures.foreach(_.await)
  3. Futures.awaitOne has been deprecated in favor of: firstCompletedOf(futures).await. Because firstCompletedOf is non-blocking, and await is blocking, it's better if this is not encouraged by the API
  4. Futures.awaitMap has been deprecated in favor of: futures map { f => fun(f.await) }. This is because Future now supports a non-blocking map, which wasn't the case before, so you'd have to await before mapping
  5. There is now a Java API version of Futures.reduce.
  6. New! The Futures.sequence method takes a Traversable[Future[T]] and non-blockingly returns a Future[Traversable[T]].
  7. New! The Futures.traverse method transforms a Traversable[A] to a Future[Traversable[B]] using a provided function from A to Future[B]. This is a great way of performing "map" in parallel.
  8. Future has now transformed into being isomorphic to dataflow variables. In recognition of this, we've added the key methods of DataFlowVariable to Future. This includes:
    apply:
    val f: Future[Int] = ...
    val i = f() // Logically f.await.resultOrException.get
        
    <<:
    val f: CompletableFuture[Int] = ... // Write side of Future, compare with the concept of Promise, or DataFlowVariable
    f << 5 // Sets the value 5 into f, since CompletableFutures can only be written once, they act like dataFlowVariables
    f << otherF // You can also set the value to the value of another Future, this will be done when that Future is completed
                // same behavior as CompletableFuture.completeWith(f: Future[...])
    The above two methods are intended for usage within the Future.flow method, like this:
    import Future.flow
    def add(a: Future[Int], b: Future[Int]): Future[Int] = flow { a() + b() }
    This uses Continuation Passing Style with Delimited Continuations under the hood, to be able to write code that looks like it's blocking but it reality it isn't.
  9. New! Future now sports a couple of new methods:
    get: //Warning! Blocking
      val f: Future[Int] = ...
      f.get // Semantically f.await.resultOrException.get but for use in a non "flow" context
        
    value:
      val f: Future[Int] = ...
      val v: Option[Either[Throwable, Int]] = f.value // The current value of the Future, None if no value, and Left(error) or Right(result) otherwise
    
    onResult:
      val f: Future[Any] = ...
      f onResult {
        case "foo" => doSomething
        case 6 => doSomethingElse
        case SomeRegex(param) => doSomethingOther
        case _ => doAnything
      } // Applies the specified partial function to the result of the future when it is completed with a result
    
    recover:
      val f: Future[Any] = ...
      val result = f recover {
        case n: NumberFormatException => 0
      } // Returns a new future that when the first future has been completed with an exception, will contain the transformed result"     
    
    onException:
      val f: Future[Any] = ...
      f onException {
        case npe: NullPointerExcep => doSomething
        case 6 => doSomethingElse
        case SomeRegex(param) => doSomethingOther
        case _ => doAnything
      } // Applies the specified partial function to the result of the future when it is completed with an exception
    
    onTimeout:
      val f: Future[Any] = ...
      f onTimeout {
         future => doSomethingWhenTimeout(future)
      }
  10. New! Future is now fully monadic so it can be used in for-comprehensions; all methods: map, flatMap, filter and foreach are non-blocking

These are exciting times. With Typesafe and Akka, the Future is here!

comments powered by Disqus
Browse Recent Blog Posts