Reactive Maps

Typesafe
August 12, 2014
akka playframework scala reactive

This template and tutorial show how the Typesafe Reactive Platform can be used to implement scalable, resilient, responsive, event-driven apps. The application shows the location of every user currently connected on a map, updated in real time. Akka clustering with distributed pub-sub allows it to scale horizontally; we tested it with 50,000 simulated concurrent users, each sending a position update every second, on 10 nodes on Google Compute Engine. The in-context tutorial guides you through the design and code of the app, and then gently introduces some of the more detailed topics, with instructions for adding new features.

How to get "Reactive Maps" on your computer

There are several ways to get this template.

Option 1: Choose reactive-maps in the Typesafe Activator UI.

Already have Typesafe Activator (get it here)? Launch the UI, then search for reactive-maps in the list of templates.

Option 2: Download the reactive-maps project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for reactive-maps.

  1. Download the Template Bundle for "Reactive Maps"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue.

    Or from a command line:

     C:\Users\typesafe\reactive-maps> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create a reactive-maps project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME reactive-maps on the command line.
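
For example, to create a project named my-reactive-maps (the project name here is just an example):

 activator new my-reactive-maps reactive-maps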

Option 4: View the template source

The creator of this template maintains it at https://github.com/typesafehub/ReactiveMaps#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.


The world is going reactive

Not long ago, response times measured in seconds were considered acceptable. Browser refreshes were the norm in web applications. Systems would go down for hours of maintenance, or even be rebooted nightly, and this was OK because people only expected the systems to be up during business hours. Applications didn't have to scale because they didn't have big user bases. And the modest complexity requirements put on web applications meant that typical requests could easily be handled by a thread-per-request model.

Things are changing, though. People expect web applications to react instantly. They expect them to be up all the time, even as applications move into the cloud, where failures are not exceptional but rather the norm, so applications need to react to failure. Load on a web application can peak unpredictably, at many orders of magnitude greater than normal, so applications need to react to load and scale out. The complexity of business requirements means that in order to respond quickly to requests, things must be processed in parallel, reacting to events rather than waiting, so as to utilise resources as efficiently as possible.

This application is an example of how to implement the tenets of the Reactive Manifesto.

It uses Play, combined with the latest in client side technologies, to implement a reactive user interface. It uses Akka to provide horizontally scalable and resilient message passing and data management.

Browse the app

Before jumping into the code, let's see the app in action. Go to the Run tab, and start the application if it's not already started. Then visit it at: http://localhost:9000.

You will be presented with a screen asking for your email address. After entering it and submitting, you should see a map, and you should be able to find yourself on that map (this may take a short amount of time due to the way data flows through summary regions in the system; the further you zoom out, the less realtime the app gets).

If you zoom in on North Carolina, you should see some bots walking around. These bots simulate other users; the data used to generate their paths is taken from hiking trail data grabbed from HikeWNC.

System Overview

The system can be broadly divided into three parts. The first part is the client side app, written in CoffeeScript, which runs in the browser. The second part is the web front end, a Play application that serves incoming web requests. The third part is the Akka backend, which manages the distribution of data across backend nodes, and the publishing and subscribing of events.

In the demo you're seeing now, the Play web front end and the Akka backend are running as one application, but in a production scenario they would be run separately, allowing fine-grained control of resources between the front end and the backend.

System Overview - Client

The client talks to the web front end using WebSockets:

All the communication above is fire and forget: after sending a user-moved event, the client doesn't need anything in response; after sending a viewing-area message, the client might get many messages, or maybe none at all, depending on whether there are any users in that area; and after the server sends position updates, it expects nothing in return from the client.

This differs from many traditional client-server applications, where clients make a request and expect a response. In a reactive application, much of the communication will not be request/response based, because reactive applications are architected so that data flows to consumers as it becomes available, and consumers of the data react to it - they don't ask for it.

For this reason, WebSockets make a perfect transport for client-server communication in a reactive application: they allow events to be passed with low overhead, without the need to wait for a response, and they facilitate reacting to events from the server.

System Overview - Backend

Before explaining the backend interface, we need a short lesson in geo-based systems. A naive way to build the reactive maps application would be to send all data from all users to every connected user. This might work if there are only 10 users connected, or maybe even 100. At 1000, each user would be downloading megabytes of updates per second - it is not going to scale.

To manage this, we break the earth up into regions. There are many different ways to do this, but in our app we're using the simplest to understand: we flatten the map out into a rectangle, and then divide it into many smaller rectangles. How many rectangles is configurable, but we have defaulted this to 16 million. Because the earth is not a rectangle but a sphere, these rectangles don't all cover the same area: at the equator each one is a few kilometres wide, while at the poles each rectangle is only a few metres wide. But each rectangle is a constant number of degrees of longitude wide and degrees of latitude high, so transforming latitude and longitude coordinates to regions is a straightforward calculation.
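
To make that concrete, here is a minimal sketch in Scala of what such a coordinate-to-region transformation could look like. The names and grid size are illustrative assumptions, not the template's actual GeoFunctions code:

object RegionGrid {
  // Assumption: 4096 x 4096 regions, i.e. roughly the 16 million mentioned above
  val regionsPerAxis = 4096

  // Map a latitude/longitude to integer grid coordinates. Longitude spans
  // 360 degrees and latitude 180, and each region is a constant number of
  // degrees wide and high, so the mapping is simple arithmetic.
  def regionForPoint(lat: Double, lng: Double): (Int, Int) = {
    val x = math.floor((lng + 180) / 360 * regionsPerAxis).toInt
    val y = math.floor((lat + 90) / 180 * regionsPerAxis).toInt
    (x min (regionsPerAxis - 1), y min (regionsPerAxis - 1))
  }
}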

The web front end talks to the backend using Akka clustering:

Actors in an Akka cluster may talk to each other without knowing whether they are on the same node or on different nodes. In the above diagram, when a frontend node receives a position update from the client, the region responsible for that position may be on the same node, or it may be on a different node. The web frontend doesn't need to worry; all it needs to know is which region to send to, and Akka will work out how to get the message to the right node.

Akka distributed PubSub messaging is used to publish location updates to the frontend. When the web frontend gets a new viewing area from the client, it works out which regions cover that viewing area, and then subscribes to updates from each of those regions. Whether those regions are on the same node or on different nodes is transparent; Akka ensures that the right messages get to the right subscribers on the right nodes. When a region has a new update to publish, it pushes it to the PubSub manager, which then pushes the message to the right subscribers on the right nodes.

Finally, regions get summarised into summary regions, and these summary regions are used so that clients viewing large areas at once aren't consuming too much data. Lowest-level regions and lower-level summary regions send updates to their next higher-level summary region, which aggregates and publishes the information. When the client requests a viewing area that contains too many regions, it subscribes instead to updates from summary regions.

The code - Client side

Now that we've got a broad overview of the system architecture, let's start looking at the code. We'll start off by tracing the code through from what happens when a user's GPS-enabled device sends an update.

The entry point to this event flow is in gps.coffee. This file contains a class for handling the GPS integration of the app. It uses the HTML5 Geolocation API to watch for location updates from the browser.

The first thing you'll find in this, and most other CoffeeScript files in this app, is a call to define. This is a RequireJS call, used to define a module. RequireJS allows JavaScript to be developed in a modular way, which is important for rich client side apps that heavily use JavaScript, like this one. At the bottom of the file you can see a return statement returning the Gps class that we've declared; this means anything that imports our module will get that class back.

The bulk of the code in this file actually deals with ensuring that neither too few nor too many location updates are sent to the server: it ensures that a location update is sent at least every 10 seconds, but no more frequently than every 2 seconds. The most pertinent code for now, though, is the navigator.geolocation.watchPosition(...) call, the HTML5 Geolocation API call to watch for GPS updates, and the @ws.send(...) call, which sends a user-moved event as JSON through the WebSocket, with the user's current position.

The position field of this event is formatted using the GeoJSON standard, which you'll soon see is used throughout the application.
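
For reference, a user-moved event on the wire looks roughly like this (the exact field names are defined in the template's source; note that GeoJSON orders coordinates longitude first):

{
  "event": "user-moved",
  "position": {
    "type": "Point",
    "coordinates": [151.2, -33.9]
  }
}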

So we can now see how location updates are obtained from the browser. But where does the WebSocket that they're sent down come from? You'll see that the constructor of the Gps class accepts the WebSocket as a parameter. This constructor is called from the mainPage.coffee module. In this module, you can see that the define call declares a few dependencies, one being the ./gps module that we just looked at.

Scroll down to the connect method, and you'll see the following code:

@ws = new WebSocket($("meta[name='websocketurl']").attr("content") + email)

This is the code that creates the WebSocket, and a few lines below that, in the onopen callback, you can see where we are passing the WebSocket to the Gps constructor. The URL for the WebSocket is coming from a meta element in the page named websocketurl.

Open main.scala.html. This is the template where that meta element is defined. The content is a call to Play's reverse router. Play has a configuration file called routes; this file contains all the configuration for how incoming requests are routed to their corresponding actions in Play. In addition to providing this forward routing, Play also generates a reverse router that code such as this template can call, and it will return the URL that can be used to reach that route. This means that your path information is kept in one place - your routes file - and everything else in your application can depend on it.

In the routes file, you can see that the /stream/:email path is routed to controllers.Application.stream, so the reverse router call @routes.Application.stream("").webSocketURL() will return us that path.
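
Based on that description, the relevant line in the routes file looks something like this:

GET     /stream/:email        controllers.Application.stream(email)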

You can read more about routing in Play Framework here.

So now we've seen how the client side portion of the app locates and connects to the WebSocket, and how it uses that WebSocket to push location updates to the server.

The code - Web Frontend

In the routes file, we saw how the WebSocket route was defined, and how it gets routed to the controllers.Application.stream method. Let's open that class now, Application.scala.

Looking at the stream method, the first thing to notice is that it is declared to be a WebSocket action that works with ClientEvent messages. These messages are defined in ClientConnection.scala; we can see our three types of messages there: UserMoved, ViewingArea and UserPositions.

Below the declaration of the message types, we can see formats for serialising these events to and from JSON, and for formatting the WebSocket frames. We won't go into too much detail here, you can read more about Play's JSON support here.

You can see back in Application.scala that we have told Play to use an actor to handle the WebSocket. This means our deserialized ClientEvent messages are sent to this actor, and when this actor sends ClientEvent messages to the passed-in upstream actor, those messages will be serialized and sent over the WebSocket to the client.

Back in ClientConnection.scala, beneath the event message types, you will find the actual actor that handles the client connection. The receive method shows the handling of the different message types that this actor will receive. We'll focus on just one of these message types.

Each time a UserMoved event is received, it's translated to a UserPosition object and sent to the RegionManagerClient. This class is responsible for sending user position updates to the right node for the region that the position lives in. You can see in that class that the first thing it does is look up the regionId; it then creates an UpdateUserPosition message and sends that message to a router.

But how does that router get it to the right node? The configuration for that router can be found in application.conf. Scrolling down to the configuration in the akka section, you'll see this:

/regionManagerClient/router {
  router = consistent-hashing
  nr-of-instances = 1000
  cluster {
    enabled = on
    routees-path = "/user/regionManager"
    allow-local-routees = on
    use-role = "backend-region"
  }
}

The routing to the node responsible for a region is done with a cluster-aware consistent hashing router. The region identifier is used as the key for the consistent hashing. This means that updates for a region are routed to the backend node responsible for that region. When the number of nodes in the cluster changes, the responsibility for a region may change. In this application the state of a region doesn't have to be migrated when this happens: updates for some regions are routed to a new backend node, and old data will expire. For a short period the region points (counts of users) might be slightly inaccurate, but that is acceptable for this application.

The hash key used to route messages is specified via the ConsistentHashable interface; you can see that the UpdateUserPosition message implements this interface and defines the hash key to be the region ID that the update is for.
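
As a hedged sketch of the shape of such a message (the actual definition, and its real types, live in the template's source):

import akka.routing.ConsistentHashingRouter.ConsistentHashable

// Illustrative stand-in for the template's real user position type
case class UserPosition(userId: String, lat: Double, lng: Double)

case class UpdateUserPosition(regionId: String, position: UserPosition)
    extends ConsistentHashable {
  // The consistent hashing router hashes this key, so all updates for the
  // same region are routed to the same routee, and therefore the same node.
  override def consistentHashKey: Any = regionId
}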

If you're interested in learning the full details of Akka routing and how to configure it, you can read about Routing and Cluster Aware Routers in the Akka documentation.

The code - Backend

We've seen how the web frontend receives GPS user position events and then routes them to the right backend node in the Akka cluster. Now let's find out what happens with these events when they reach the backend node.

In the configuration for the router that we saw before, we could see this config item defined:

routees-path = "/user/regionManager"

/user is the namespace for all user defined actors (as opposed to actors defined by the system itself), so this says that the messages get sent to a user defined actor called regionManager, which is implemented by the RegionManager class.

The region manager is responsible for managing all the regions that belong on that node. If it gets some data for a region and an actor for that region doesn't exist yet, it creates one. Once it has ensured that an actor exists for the region, it sends the user position to that region.

The actor class that represents a region is called Region. This class has a map called activeUsers, and when it receives a user position, it adds that user's position to the map.
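
The essence of that bookkeeping, as a hedged sketch reusing the illustrative UserPosition type from the earlier sketch (the template's real Region also expires stale data and publishes updates, as described elsewhere in this tutorial):

import akka.actor.Actor

class Region(regionId: String) extends Actor {
  // Last known position of each user currently active in this region
  var activeUsers = Map.empty[String, UserPosition]

  def receive = {
    case p: UserPosition =>
      activeUsers += (p.userId -> p)
  }
}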

The code - Subscriptions between frontend and backend

A client displays a section of the map, which is decorated with live markers of other users in that area. How are those user positions published to the client?

When the user zooms or changes map position the client sends a ViewingArea event to the server, which ends up in PositionSubscriber via the controller. The PositionSubscriber works out which regions cover that viewing area, and then subscribes to updates from each of those regions.

The published updates of user positions come from the backend Region actors. The thing that ties the publisher and subscriber together is the named topic, which in this case is the region id.

In a similar way, the PositionSubscriber may decide to subscribe to summary regions, in which case the published region points come from the SummaryRegion actors.

The publish/subscribe mechanism in Akka is a registry of subscribers that is replicated to the members of the cluster. There is no central hub or broker. When a message is published to a named topic, it is sent to the nodes that have subscribers for that topic, and then delivered to all subscribing actors on each of those nodes. The message is sent over the wire only once per node that has at least one subscriber of the topic. The decoupling of publisher and subscriber makes it easy to add and remove nodes in the cluster as needed.
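
In code, the mediator actor provided by Akka's distributed pub/sub extension (in the contrib module, as of Akka 2.3) is all that ties the two sides together. A minimal sketch, with the published message type assumed:

import akka.actor.Actor
import akka.contrib.pattern.{DistributedPubSubExtension, DistributedPubSubMediator}

class ExampleSubscriber(regionId: String) extends Actor {
  val mediator = DistributedPubSubExtension(context.system).mediator
  // Subscribe to the topic named after the region id
  mediator ! DistributedPubSubMediator.Subscribe(regionId, self)

  def receive = {
    case ack: DistributedPubSubMediator.SubscribeAck => // subscription confirmed
    case update                                      => // react to a published region update
  }
}

// A publisher (such as a Region actor) publishes to the same topic with:
//   mediator ! DistributedPubSubMediator.Publish(regionId, update)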

Changes of subscribers are disseminated in a scalable way to other nodes with a gossip protocol. The registry is eventually consistent, i.e. changes are not immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds.

The code - Summary

At this stage of the tutorial, we've seen:

  • How the browser gets the user's position from their GPS-enabled device
  • How the browser sends the user's position to the web front end via WebSockets
  • How the WebSocket is routed to the corresponding action
  • How the WebSocket action sends the user's position to an Akka router
  • How the Akka router routes the user's position to the correct node for the region in the Akka cluster
  • How the node receives the user's position and sends it to the right actor that manages that position
  • How the backend nodes publish updates to frontend subscribers

And now for something completely different.

Add more nodes

So far you have been running the application in one single JVM, hosting both the frontend and the backend. Let's try adding more backend and frontend nodes to the cluster.

Open a terminal and change directory to the root directory of the reactive-maps application. Start a backend node with the following command (on one line):


    <path to activator dir>/activator 
      -Dakka.remote.netty.tcp.port=0 
      -Dakka.cluster.roles.1=backend-region 
      "run-main backend.Main"
    

This runs the backend.Main class and overrides the configuration to bind Akka remoting to a random available port and use the "backend-region" cluster role for this node.

If you take a look at the log in Run, you can see that the new node joined the cluster. The new node knows how to join the cluster because the first node, running on port 2552, is configured as the initial contact point in the 'seed-nodes' property in application.conf. You can read more about Akka Clustering in the documentation.
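
That property looks something like this (the actor system name shown here is an assumption):

akka.cluster.seed-nodes = [
  "akka.tcp://application@127.0.0.1:2552"
]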

You can repeat the command in new terminal windows to add more backend nodes.

You can also add more simulated users with the following command (on one line):


    <path to activator dir>/activator 
      -Dakka.remote.netty.tcp.port=0 
      -Dakka.cluster.roles.1=frontend 
      -DreactiveMaps.bots.totalNumberOfBots=500 
      "run-main backend.Main"		
	

The following command (on one line) will start another frontend node listening on HTTP port 9001:


    <path to activator dir>/activator 
      -Dhttp.port=9001
      -Dakka.remote.netty.tcp.port=0
      -Dakka.cluster.roles.1=frontend 
      run
    

Try the added frontend in a new browser window: http://localhost:9001

Adding a new feature

Now that we've had a detailed look at some of the system, let's try to add a new feature. Until now, our view of the data has been region based - all data is associated with and stored in an actor for a region. This allows us to shard regions over multiple nodes, giving efficient access to the data by node.

We're going to add functionality that is user based. We'll use exactly the same methods for scaling as for the region based data, so we can see how to build such a system from scratch.

We'll start off with a simple implementation that only works when there is one node, implementing first the backend, then the client side. Then we'll demonstrate how this implementation can be scaled out to shard the data across many nodes. Finally we'll show some techniques for ensuring data consistency when nodes are introduced into or removed from the cluster.

The new feature that we'll add is tracking the distance that a user has travelled. We'll make the client fetch this data when a user clicks on a marker.

Handling the maths

The first thing we need to do is add a function for calculating the distance between two points. There are many formulas that can be used to do this, but a simple general-purpose one that suits our purposes is the haversine formula.

Let's create a new method called distanceBetweenPoints in GeoFunctions:

  def distanceBetweenPoints(pointA: LatLng, pointB: LatLng): Double = {
    import Math._
    // Setup the inputs to the formula
    val R = 6371009d // average radius of the earth in metres
    val dLat = toRadians(pointB.lat - pointA.lat)
    val dLng = toRadians(pointB.lng - pointA.lng)
    val latA = toRadians(pointA.lat)
    val latB = toRadians(pointB.lat)

    // The actual haversine formula. a and c are well known value names in the formula.
    val a = sin(dLat / 2) * sin(dLat / 2) +
      sin(dLng / 2) * sin(dLng / 2) * cos(latA) * cos(latB)
    val c = 2 * atan2(sqrt(a), sqrt(1 - a))
    val distance = R * c

    distance
  }

User meta data register

The first thing we need to implement is a user meta data register. As a first implementation, we'll write one actor that stores all user meta data; we'll call it UserMetaData. Create a new file called app/backend/UserMetaData.scala now, and start off by adding the following:

package backend

import akka.actor.{Props, Actor}
import play.extras.geojson.LatLng

object UserMetaData {
  case class GetUser(id: String)
  case class User(id: String, distance: Double)
  case class UpdateUserPosition(id: String, position: LatLng)

  val props = Props[UserMetaData]
}

Some of the imports are unused for now, but the important thing to see here is the message types we've defined. This actor will receive UpdateUserPosition messages to update the user's position, and it will receive GetUser messages and send back User messages to the sender.

Now implement the actor itself:

class UserMetaData extends Actor {

  import UserMetaData._

  val settings = Settings(context.system)

  var users = Map.empty[String, (LatLng, Double)]

  // The handlers for GetUser and UpdateUserPosition are added in the steps
  // below; until then, this actor simply ignores all messages.
  def receive = Actor.emptyBehavior
}

Our actor depends on the settings (which provide the GeoFunctions class we edited earlier), and has a map from the userId to a tuple of the last position the user was seen at and the distance they've travelled. We'll now implement handling of the GetUser message in the receive method:

    case GetUser(id) =>
      users.get(id) match {
        case Some((_, distance)) => sender ! User(id, distance)
        case None => sender ! User(id, 0)
      }

You can see that if the user wasn't found, we just return 0. Now implement handling the UpdateUserPosition message:

    case UpdateUserPosition(id, position) =>
      val distance = users.get(id) match {
        case Some((lastPosition, lastDistance)) =>
          lastDistance + settings.GeoFunctions.distanceBetweenPoints(lastPosition, position)
        case None => 0
      }

      users += (id -> (position, distance))

You can see here we're using the distanceBetweenPoints method we implemented earlier, updating the distance if we have a last position to compare it to, and updating the map with the new user data.

Send user position updates

Now we need to implement the code that sends the user position updates to this actor. This is a little tedious because there are two things that will do this: the web client and the bots. We'll start with the web client.

On the web frontend, the actors are initialised by a Play plugin called Actors. In this class, add a new lazy val to initialise the user meta data actor:

  private lazy val userMetaData = system.actorOf(UserMetaData.props, "userMetaData")

Now provide access to this in the Actors object in the same way that regionManagerClient is made accessible:

  def userMetaData(implicit app: Application) = actors.userMetaData

You may need to add an import for the backend.UserMetaData class.

The class that will ultimately use this actor is the ClientConnection.scala actor. Open it, and modify the constructor of ClientConnection to accept it as a parameter:

class ClientConnection(email: String, upstream: ActorRef, regionManagerClient: ActorRef,
  userMetaData: ActorRef) extends Actor {

Now when the client sends a UserMoved message through the web socket, in addition to sending a message to the regionManagerClient, we also want to update the user meta data:

import backend.UserMetaData.UpdateUserPosition
      userMetaData ! UpdateUserPosition(email, point.coordinates)

Since we added a constructor parameter to the ClientConnection actor, we also need to update its props method on the companion object to also accept and pass the user meta data actor:

  def props(email: String, upstream: ActorRef, regionManagerClient: ActorRef,
      userMetaData: ActorRef): Props = {
    Props(new ClientConnection(email, upstream, regionManagerClient, userMetaData))
  }

And finally, we need to pass the actor in to the place that uses this, the Application controller, using the userMetaData method on Actors that we just wrote:

  def stream(email: String) = WebSocket.acceptWithActor[ClientEvent, ClientEvent] { _ => upstream =>
    ClientConnection.props(email, upstream, Actors.regionManagerClient, Actors.userMetaData)
  }

The web front end is set to go, but the bots also need to be updated. As with ClientConnection, add a constructor parameter for the user meta data to GeoJsonBot.scala:

class GeoJsonBot(trail: LineString[LatLng], offset: (Double, Double), userId: String,
  regionManagerClient: ActorRef, userMetaData: ActorRef) extends Actor {

And immediately after the bot sends a position update to the regionManagerClient, make it also send one to userMetaData:

import backend.UserMetaData.UpdateUserPosition
      userMetaData ! UpdateUserPosition(userId, userPos.position)

Update the props method:

  def props(trail: LineString[LatLng], offset: (Double, Double), userId: String, regionManagerClient: ActorRef,
      userMetaData: ActorRef): Props =
    Props(classOf[GeoJsonBot], trail, offset, userId, regionManagerClient, userMetaData)

This is called by BotManager; modify its props method, constructor parameter, and the call that creates the bot:

  def props(regionManagerClient: ActorRef, userMetaData: ActorRef, data: Seq[URL]): Props =
    Props(classOf[BotManager], regionManagerClient, userMetaData, data)

class BotManager(regionManagerClient: ActorRef, userMetaData: ActorRef, data: Seq[URL]) extends Actor {

                    context.actorOf(GeoJsonBot.props(route, offset, userId, regionManagerClient, userMetaData))

BotManager is initialised in two places. The first is in the web front end, by Play in the Actors class, so update that to pass the userMetaData:

      system.actorOf(BotManager.props(regionManagerClient, userMetaData, findUrls(1)))

And finally, we need to update the Main class, which is used when running a non-Play node in the cluster. Initialise the UserMetaData actor after creating the RegionManagerClient:

      val userMetaData = system.actorOf(UserMetaData.props, "userMetaData")

Now pass the actor to the BotManager props method:

      system.actorOf(BotManager.props(regionManagerClient, userMetaData, findUrls(1)))

Exposing user data to the web

The user's distance is going to be requested by the user on an as-needed basis. Since the action is triggered by the user, and the user expects a response, it makes sense in this case to use a simple HTTP request to get the data. So we're going to write a Play action to get the user meta data.

Create a new Scala controller called controllers.UserController:

package controllers

import akka.pattern.{AskTimeoutException, ask}
import akka.util.Timeout

import play.api.mvc._
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.json.Json
import play.api.Play.current

import scala.concurrent.duration._

import actors.Actors
import backend.UserMetaData._

object UserController extends Controller {

}

There are quite a number of imports here, but don't be put off! The first thing we will do is define the return format for the user meta data. We're going to return JSON, so we need something to convert instances of User into JSON. We can do this by implementing a JSON Writes for User. Play's JSON API provides a handy macro that does this for you at compile time, so you can have type safe JSON converters with a minimum of code. Create this writes instance in the UserController object:

implicit val userWrites = Json.writes[User]

Since it is declared to be implicit, it will be used whenever we call a method that requires an implicit writes for User, which the Json.toJson method does in the next block of code that we'll write:

  def get(id: String) = Action.async {
    implicit val timeout = Timeout(2.seconds)

    (Actors.userMetaData ? GetUser(id))
      .mapTo[User]
      .map { user =>
        Ok(Json.toJson(user))
      } recover {
        case _: AskTimeoutException => NotFound
      }
  }

Here we have declared the action itself. It's an asynchronous action, meaning it returns a future. The action also takes a parameter, the id of the user. We'll see later how that parameter is passed to the action.

As a first step we have defined a timeout. In the code below that, we use the ask pattern, represented by the ? operator, to ask the user meta data actor for the user. Akka will not let you ask for something without specifying a timeout; the ? operator takes an implicit timeout, so we've defined that to be two seconds.

Having asked the actor for a user, we get back a future, and the first thing we do is map it to the type User. Then we map that to something that will generate our response: an Ok response, with the body being the user serialised to JSON.

Finally, we also want to recover from a timeout. We assume that if it timed out, it means the user could not be found.

The last thing we need to do on the server side is declare how requests will be routed to this action. We do this in the routes file:

GET        /user/:email         controllers.UserController.get(email)

You can see that we are defining a dynamic route with an email parameter, signalled by the colon before the parameter name. Then we invoke the action we just created, passing that email parameter as the id of the user.
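
Once the application is running, you can exercise the new endpoint directly. Assuming a user id of bob@example.com, a request and response would look something like this:

$ curl http://localhost:9000/user/bob%40example.com
{"id":"bob@example.com","distance":1234.5}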

Consuming user data on the client side

Now that we've got the server side ready, we can write the client side code to consume the new action we just created. Let's start by creating a new module, app/assets/javascripts/services/userInfo.coffee, that will make the AJAX call. Although this module will be very simple, it's best practice to split the making of AJAX calls out from the business logic of your code, so you can easily mock and test.

define ["jquery"], ->
  {
    get: (email) ->
      $.getJSON("/user/" + email)
  }

Our userInfo module depends on jquery, and simply provides one method, get, which calls the action. jQuery's getJSON method returns a promise of the JSON, so we can consume it by attaching then or done callbacks to the returned promise.

Now, in marker.coffee, we want to use this service to look up the user info, so we will add the userInfo module as a dependency. If you're familiar with RequireJS, you might notice that we're not using a relative path name here; this is because we are using path aliases that we configure elsewhere. This also makes mocking dependencies simpler, as it decouples modules from their implementation.

define ["leaflet", "markerRenderer", "userInfo"], (Leaflet, renderer, userInfo) ->

In the constructor of the Marker class, after attaching the popup to the marker, we want to bind to the click event of the marker so that we can update the popup with the user's distance each time the user clicks on it:

        @marker.bindPopup(renderer.renderPopup(userId))

        @marker.on "click", =>
          userInfo.get(userId).done (user) =>
            @marker.getPopup()
              .setContent(renderer.renderPopup(userId, user.distance)).update()

And now we want to handle that additional distance parameter that we've passed to renderPopup, in markerRenderer.coffee:

  renderPopup: (userId, distance) ->
    popup = "<p><img src='http://www.gravatar.com/avatar/" +
      md5(userId.toLowerCase()) + "'/></p><p>" + escapeHtml(userId) + "</p>"
    if (distance)
      popup + "<p>Travelled: " + Math.floor(distance) + "m</p>"
    else
      popup

Finally, since we've defined the new userInfo module, we need to declare what the path for that module is. This can be done in main.coffee:

    userInfo: "./services/userInfo"

And now we should be good to go. Refresh your browser, and try clicking on a marker to see if the distance is rendered. If you're looking at the bots walking around North Carolina, you might see that they are moving at hundreds of metres per second - these bots are designed to provide interesting data, not necessarily realistic data.

Client side testing

After making the previous changes to the client side logic, we've left some of the client side tests in a failing state. Let's fix them. Start by going to the Test tab and running the tests; you should see that some of them are failing.

We're using mocha to write the tests, in combination with Squire.js to mock out RequireJS dependencies. We've also been very careful to design our client side code in such a way that the DOM manipulation code, and any code doing AJAX or WebSockets or using any other browser-based APIs, is separated from the business logic. This allows us to comprehensively test the important code.

Open MarkerSpec.coffee. This is where our failing tests are. Since we've added a new user info service that marker.coffee depends on, we need to create a mocked version of it:

class MockUserInfo
  users: {}
  get: (userId) ->
    new MockPromise({
      distance: @users[userId]
    })

Now, to test that the marker correctly passes the looked-up user distance to the marker renderer, we'll modify the MockMarkerRenderer.renderPopup method to "render" it:

  renderPopup: (userId, distance) ->
    if distance
      userId + ":" + distance
    else
      "Popup " + userId

Now let's modify the test setup code to instantiate and wire in the mock user info service that we created:

    # Create mocks
    leaflet = new MockLeaflet()
    renderer = new MockMarkerRenderer()
    userInfo = new MockUserInfo()

    # Mockout require js environment
    new Squire()
    .mock("markerRenderer", renderer)
    .mock("leaflet", leaflet)
    .mock("userInfo", userInfo)
    .require ["javascripts/map/marker"], (Marker) ->
        test({
          leaflet: leaflet,
          userInfo: userInfo,
          renderer: renderer,
        }, Marker)
        done()

Now try running the tests again; they should pass. Finally, let's add a new test verifying that the popup is updated with the distance when the marker is clicked:

  it "should update the popup with the current distance when clicked", testMarker (deps, Marker) ->
    marker = new Marker(new MockMap(), single, new LatLng(10, 20))
    deps.userInfo.users["userid"] = 50
    marker.marker.onClick()
    assert.equal("userid:50", marker.marker.popup.content)

Run the tests to ensure the new test also passes.

Scaling out

So now we've implemented something that works on a single node. However, this application has been designed to scale to millions of users on hundreds of nodes - a feature that only works on a single node will not suffice.

The system should also be resilient to system crashes and restarts. To address these issues we will make it persistent and distribute the actors over the backend nodes in the cluster.

Make UserMetaData event sourced

The UserMetaData actor that we developed previously holds the data for all users, which is obviously not scalable. Instead, let's rewrite the actor to represent the data for one single user. We will use Akka Persistence to make it durable.

package backend

import java.net.URLDecoder
import scala.concurrent.duration._
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.contrib.pattern.ShardRegion
import akka.persistence.EventsourcedProcessor
import play.extras.geojson.LatLng

object UserMetaData {
  case class GetUser(id: String)
  case class User(id: String, distance: Double)
  case class UpdateUserPosition(id: String, position: LatLng)

  val props = Props[UserMetaData]

  sealed trait Event
  case class FirstObservation(position: LatLng) extends Event
  case class Moved(to: LatLng, distance: Double) extends Event

  private case class State(position: Option[LatLng], distance: Double) {
    def updated(evt: Event): State = evt match {
      case Moved(to, d)          => copy(position = Some(to), distance = distance + d)
      case FirstObservation(pos) => copy(position = Some(pos))
    }
  }
}

class UserMetaData extends EventsourcedProcessor {

  import UserMetaData._

  val settings = Settings(context.system)
  val userId = URLDecoder.decode(self.path.name, "utf-8")
  private var state = State(None, 0.0)

  // passivate the entity when no activity
  context.setReceiveTimeout(30.seconds)

  override def receiveRecover: Receive = {
    case evt: Event => state = state.updated(evt)
  }

  override def receiveCommand: Receive = {
    case _: GetUser =>
      sender() ! User(userId, state.distance)

    case UpdateUserPosition(_, position) =>
      state match {
        case State(Some(lastPosition), _) =>
          val d = settings.GeoFunctions.distanceBetweenPoints(lastPosition, position)
          persist(Moved(position, d)) { evt =>
            state = state.updated(evt)
          }
        case State(None, _) =>
          persist(FirstObservation(position)) { evt =>
            state = state.updated(evt)
          }
      }
      
    case ReceiveTimeout => 
      context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
  }
}

Akka Persistence takes an event sourced approach and stores the changes that build up an actor's current state. In this case the current position and the total distance are stored via the events FirstObservation and Moved.

It is recommended to encapsulate the state in an immutable class, as illustrated by the UserMetaData.State class, which knows how to create a new State instance when applying the changes represented by domain events. It is important that the state updates are free from side effects, because they are applied again when the actor is recovered from the persisted events; see receiveRecover.

Add sharding

Akka Cluster Sharding is useful when you need to distribute actors across several nodes in the cluster and want to be able to interact with them using their logical identifier, without having to care about their physical location in the cluster, which might also change over time.

To use the UserMetaData actor with cluster sharding, we must be able to extract the identifier from the messages and define a hash function for the identifier. The hash function is used to group actors into shards, which can run on different nodes. These functions can be defined in the companion object like this:

  sealed trait Command {
    def id: String
  }
  case class GetUser(id: String) extends Command
  case class User(id: String, distance: Double) extends Command
  case class UpdateUserPosition(id: String, position: LatLng) extends Command

  val idExtractor: ShardRegion.IdExtractor = {
    case cmd: Command => (cmd.id, cmd)
  }

  val shardResolver: ShardRegion.ShardResolver = msg => msg match {
    case cmd: Command => (math.abs(cmd.id.hashCode) % 100).toString
  }

  val shardName: String = "UserMetaData"

To make the UserMetaData actors sharded in the cluster, we need to register them with the ClusterSharding extension. This must be done on all nodes in the cluster, but we do it slightly differently on nodes with the frontend role compared to backend nodes, because the actors must only be created on the backend nodes, and only proxied from frontend nodes. Add the following in Actors.scala:

import akka.contrib.pattern.ClusterSharding
    if (Cluster(system).selfRoles.exists(r => r.startsWith("backend"))) {
      system.actorOf(RegionManager.props(), "regionManager")
      
      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = Some(UserMetaData.props),
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
    } else {
      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = None,
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
    }

  private lazy val userMetaData = ClusterSharding(system).shardRegion(UserMetaData.shardName)

And corresponding in Main.scala:

import akka.contrib.pattern.ClusterSharding

      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = Some(UserMetaData.props),
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)

      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = None,
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
      val userMetaData = ClusterSharding(system).shardRegion(UserMetaData.shardName)
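
Note that callers are unaffected by this change: the userMetaData reference is now a shard region (or a proxy to it), and messages are addressed by user id exactly as before. For example (a hedged usage sketch):

// The shard region extracts the id from the message (via idExtractor),
// resolves the shard (via shardResolver), and forwards the message to the
// entry actor on whichever backend node currently hosts that shard.
userMetaData ! UserMetaData.GetUser("bob@example.com")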

Set up the journal

When using Akka Persistence in a cluster, we need a journal that is replicated or accessible from all nodes. In this sample we will use a shared LevelDB journal running on the node with port 2552. This is a single point of failure and should not be used in production; a real system would use a distributed journal.

Add the configuration for the journal (inside the akka section of application.conf):


  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.local.dir = "target/snapshots"
  }

Add the file app/backend/SharedJournalHelper.scala with the following content:

package backend

import scala.concurrent.duration._
import akka.actor.ActorIdentity
import akka.actor.ActorPath
import akka.actor.ActorSystem
import akka.actor.Identify
import akka.actor.Props
import akka.pattern.ask
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.util.Timeout
import akka.cluster.Cluster

object SharedJournalHelper {

  def startupSharedJournal(system: ActorSystem): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    val storePort = 2552
    if (Cluster(system).selfAddress.port.get == storePort)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(10.seconds)
    val storePath = ActorPath.fromString(s"akka.tcp://${system.name}@127.0.0.1:$storePort/user/store")
    val f = (system.actorSelection(storePath) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", storePath)
        system.shutdown()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", storePath)
        system.shutdown()
    }
  }

}

Initialise the shared journal in Actors.scala:


    // This will not be needed with a distributed journal
    SharedJournalHelper.startupSharedJournal(system)

And corresponding in Main.scala:

    // This will not be needed with a distributed journal
    SharedJournalHelper.startupSharedJournal(system)

Run it again

We have now added scalability and resilience to the new user meta data feature. Try it again, by refreshing your browser at http://localhost:9000.

The files of the shared journal are saved in the target directory, and when you restart the application the state is recovered. You can clean the state with:


<path to activator dir>/activator clean

Learn more about reactive design

Essential characteristics of a reactive application:

  • react to events - the event-driven nature enables the following qualities
  • react to load - focus on scalability by avoiding contention on shared resources
  • react to failure - build resilient systems with the ability to recover at all levels
  • react to users - honor response time guarantees regardless of load

Read more about how to build reactive applications in the Reactive Manifesto.
