Typesafe Activator

Spray and Websocket interfaces to actors

cuali
Source
April 14, 2014
akka spray tcp websocket

This template implements three different interfaces to its Akka actors: HTTP request, plain socket and websocket.

How to get "Spray and Websocket interfaces to actors" on your computer

There are several ways to get this template.

Option 1: Choose akka-spray-websocket in the Typesafe Activator UI.

If you already have Typesafe Activator, launch the UI and search for akka-spray-websocket in the list of templates.

Option 2: Download the akka-spray-websocket project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-spray-websocket.

  1. Download the Template Bundle for "Spray and Websocket interfaces to actors"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open". If prompted with a warning, click to continue.

    Or from a command line:

     C:\Users\typesafe\akka-spray-websocket> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create an akka-spray-websocket project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-spray-websocket on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/cuali/SprayEasterEggs#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

The Goal

Some applications need to expose an API that will not only handle synchronous requests, but also open a websocket to asynchronously push new notifications to the visualization client and accept plain socket channels from other information feeders. This tutorial will explain how to implement an application that offers three different interfaces to its Akka actors: HTTP request, plain socket and websocket.

As an illustration, we will mimic an Easter egg hunt, with one map where the participants hide their own eggs and another map where the hidden eggs can be found.

The application will eventually present a visual interface such as the following screenshot:

Splitting up the API into services

Required resources

A closer look at the application shows that organizing our Easter egg hunt requires several resources.

  1. In order to hide an egg, we will need
    • a page to select a position on a map;
    • a JavaScript to send the new position of the hidden egg to the server.
  2. In order to present the map to find the hidden eggs, we will need
    • a page to present the eggs on a map;
    • a JavaScript to receive from the server the positions of hidden eggs and plot them on the map;
    • a set of markers to represent the hidden eggs.
  3. In order to present a common visual identity, we will need
    • a basic CSS to style both previous pages;
    • a page to embed both previous pages.
  4. In order to handle the server-side magic, we will need some components already introduced in the previous step of this tutorial.

Each of these resources could be considered as a single service. However, we basically need only two services: one to hide one single egg and one to find where the eggs were deposited.

Organizing the services

As shown above, some of the resources are independent of these two services and are handled as common resources. This simplification leaves a gap, however: it omits the plain socket interface and the websocket interface called from both map pages through JavaScript.

The API will then be split into five services:

  1. the SocketService opens its own port to listen for plain socket connections, distinct from the HTTP-based services that follow;
  2. the FindService defines the routes to the static resources (index.html and find.js) of the find map;
  3. the HideService defines the routes to the static resources (index.html and hide.js) of the hide map, as well as an HTTP-based (REST-like) webservice;
  4. the RootService handles the static routes to the common resources, as well as the routing to the two services above;
  5. the ReactiveServer websocket server, which is not (yet) a Spray service like the four services above.

The ReactiveSystem application object is responsible for initializing the actor system, the websockets' server, the plain socket listener actor and the HTTP request listener actor.

Plain socket service

The SocketService opens its own port to listen for plain socket connections, distinct from the other HTTP-based services.

  IO(Tcp) ! Tcp.Bind(socketService, new InetSocketAddress(Configuration.host, Configuration.portTcp))

Each time a new connection is established, it is handed over to a new instance of the SocketActor.

      sender ! Tcp.Register(context.actorOf(Props(classOf[SocketActor], sender)))

The actor can then handle each received data message to analyze it and send further messages into the actor system.

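  // Two decimal numbers separated by a single space: longitude, then latitude.
  val coords = "(-?\\d+\\.\\d+) (-?\\d+\\.\\d+)".r
  override def receive = {
    case Tcp.Received(data) =>
      data.utf8String.trim match {
        case coords(lng, lat) =>
          // A well-formed position is forwarded to the marker actor.
          marker ! MarkerActor.Move(lng, lat)
        // Anything else is only logged.
        case msg => log.info(msg)
      }
  }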
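
The parsing step can be tried in isolation. The following is a minimal sketch that reuses the pattern from the excerpt above; the CoordsSketch object and its parse helper are hypothetical names introduced only for this illustration and are not part of the template.

```scala
object CoordsSketch {
  // Same pattern as in the SocketActor excerpt: two decimal numbers and one space.
  private val coords = "(-?\\d+\\.\\d+) (-?\\d+\\.\\d+)".r

  // Returns the captured (longitude, latitude) strings, or None when the
  // trimmed line does not match the pattern as a whole.
  def parse(line: String): Option[(String, String)] =
    line.trim match {
      case coords(lng, lat) => Some((lng, lat))
      case _                => None
    }

  def main(args: Array[String]): Unit = {
    println(parse("2.1523721 41.4140567\n")) // trailing newline is trimmed first
    println(parse("2 41"))                   // no decimal part, so no match
    println(parse("hello"))                  // free text, so no match
  }
}
```

Note that the pattern requires a decimal point in both numbers, so integer coordinates would end up in the logging branch of the actor.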

Websocket service

The ReactiveServer websocket server is not (yet) a Spray service, since the SprayWebSockets library does not yet offer the ease of use of the basic RFC 6455 methods.

The ReactiveServer lets several actors register, each one reacting to its own resource URL.

  private val rs = new ReactiveServer(Configuration.portWs)
  rs.forResource("/find/ws", Some(find))
  rs.forResource("/hide/ws", Some(hide))
  rs.start

As the ReactiveServer registers at most one actor for each resource URL, it can forward to this actor the arguments to its onMessage, onOpen, onClose and onError methods, wrapped as ReactiveServerMessage.

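  // A mutable map is required, since forResource updates it in place with += and -=.
  private val reactors = scala.collection.mutable.Map[String, ActorRef]()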
  final def forResource(descriptor : String, reactor : Option[ActorRef]) {
    reactor match {
      case Some(actor) => reactors += ((descriptor, actor))
      case None => reactors -= descriptor
    }
  }
  final override def onMessage(ws : WebSocket, msg : String) {
    if (null != ws) {
      reactors.get(ws.getResourceDescriptor) match {
        case Some(actor) => actor ! ReactiveServer.Message(ws, msg)
        case None => ws.close(CloseFrame.REFUSE)
      }
    }
  }
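
The register, unregister and dispatch behaviour described above can be sketched without Akka or a websocket library. In the sketch below, a plain function stands in for the ActorRef and a Boolean result stands in for closing the socket; both are assumptions made to keep the example dependency-free. The use of TrieMap is also a choice of this sketch, on the assumption that the server callbacks may run on a different thread than the registration.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer

object RegistrySketch {
  // Stand-in for an ActorRef (hypothetical, to avoid any dependency).
  type Handler = String => Unit

  // Thread-safe map from the standard library.
  private val reactors = TrieMap.empty[String, Handler]

  // Some(handler) registers or replaces the handler; None unregisters it.
  def forResource(descriptor: String, reactor: Option[Handler]): Unit =
    reactor match {
      case Some(handler) => reactors.update(descriptor, handler)
      case None          => reactors.remove(descriptor); ()
    }

  // Returns false when nothing is registered, where the original closes the socket.
  def dispatch(descriptor: String, msg: String): Boolean =
    reactors.get(descriptor) match {
      case Some(handler) => handler(msg); true
      case None          => false
    }

  def main(args: Array[String]): Unit = {
    val seen = ListBuffer.empty[String]
    forResource("/find/ws", Some((m: String) => { seen += m; () }))
    println(dispatch("/find/ws", "hello")) // a handler is registered for this URL
    println(dispatch("/play/ws", "hello")) // nothing is registered for this URL
    forResource("/find/ws", None)
    println(dispatch("/find/ws", "again")) // the handler was just unregistered
    println(seen.toList)
  }
}
```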

Note that there is no way to force the registered actor to correctly handle the four messages defining the protocol in the ReactiveServer companion object.

object ReactiveServer {
  sealed trait ReactiveServerMessage
  case class Message(ws : WebSocket, msg : String)
    extends ReactiveServerMessage
  case class Open(ws : WebSocket, hs : ClientHandshake)
    extends ReactiveServerMessage
  case class Close(ws : WebSocket, code : Int, reason : String, external : Boolean)
    extends ReactiveServerMessage
  case class Error(ws : WebSocket, ex : Exception)
    extends ReactiveServerMessage
}
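
An actor's receive block accepts messages of any type, so the compiler cannot check that all four cases are covered. One possible mitigation, sketched here under simplifying assumptions, is to delegate to a function typed on the sealed trait, for which the compiler does report a missing case. The case classes below are reduced stand-ins (they carry no WebSocket or handshake object), and describe is a hypothetical helper name.

```scala
object ProtocolSketch {
  // Simplified stand-ins for the four protocol messages.
  sealed trait ReactiveServerMessage
  final case class Message(msg: String) extends ReactiveServerMessage
  final case class Open(resource: String) extends ReactiveServerMessage
  final case class Close(code: Int, reason: String, external: Boolean)
    extends ReactiveServerMessage
  final case class Error(ex: Exception) extends ReactiveServerMessage

  // Because the trait is sealed, removing one case here produces a
  // "match may not be exhaustive" compiler warning.
  def describe(m: ReactiveServerMessage): String = m match {
    case Message(msg)           => s"message: $msg"
    case Open(resource)         => s"open: $resource"
    case Close(code, reason, _) => s"close: $code $reason"
    case Error(ex)              => s"error: ${ex.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Open("/find/ws")))
    println(describe(Message("2.1523721 41.4140567")))
    println(describe(Close(1000, "bye", external = true)))
    println(describe(Error(new RuntimeException("boom"))))
  }
}
```

In an actor, the receive block could then contain a single typed case that forwards every ReactiveServerMessage to such a function.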

HTTP services

Let's explore the ReactiveApi trait, which defines the REST endpoints. In keeping with the structure described above, each endpoint lives in its own class. The ReactiveApi trait instantiates these classes and then concatenates their respective routes.

trait ReactiveApi extends RouteConcatenation with StaticRoute with AbstractSystem {
  this: MainActors =>

  val rootService = system.actorOf(Props(classOf[RootService], routes))
  val socketService = system.actorOf(Props[SocketService])

  lazy val routes = logRequest(showReq _) {
    new FindService(find).route ~
    new HideService(hide).route ~
    staticRoute
  }
  private def showReq(req : HttpRequest) = LogEntry(req.uri, InfoLevel)
}

The RootService then routes the incoming HTTP requests accordingly.

class RootService(route : Route) extends Actor with HttpService with ActorLogging {
  implicit def actorRefFactory = context
  def receive = runRoute(route)
}

Since the FindService is rather simple, let's study the HideService implementation.

class HideService(hide : ActorRef)(implicit system : ActorSystem) extends Directives with ApplicationJsonFormats {
  private implicit val moveFormat = jsonFormat2(MarkerActor.Move)
  lazy val route =
    pathPrefix("hide") {
      val dir = "hide/"
      pathEndOrSingleSlash {
        get {
          getFromResource(dir + "index.html")
        } ~
        post {
          handleWith {
            move : MarkerActor.Move => 
              hide ! move
              "hidden"
          }
        }
      } ~
      path("ws") {
        requestUri { uri =>
          val wsUri = uri.withPort(Configuration.portWs)
          redirect(wsUri, StatusCodes.PermanentRedirect)
        }
      } ~
      getFromResourceDirectory(dir)
    }
}

This service handles all requests under the /hide path prefix. Requests to the /hide/ws path are redirected to the websocket port. The /hide/ path itself receives special treatment. For all other URLs, the service tries to serve the named resource from the hide resources directory.

Depending on the HTTP method used to access the /hide/ path, the service either serves index.html (for a GET) or unmarshals the request body from the JSON representation of a MarkerActor.Move object and forwards it to the hide actor (for a POST).
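
The fall-through between the alternatives joined with ~ can be illustrated with a toy model. This is not Spray code: the Request case class, the Route type alias and the concat helper are hypothetical stand-ins, and the response strings only name what the real route would do. A route returning None models a rejection, which lets the next alternative try the request.

```scala
object HideRouteSketch {
  final case class Request(method: String, path: String)

  // Toy route: Some(response) when the request is handled, None when rejected.
  type Route = Request => Option[String]

  // Try the left route first and fall through to the right one on rejection.
  def concat(left: Route, right: Route): Route =
    req => left(req).orElse(right(req))

  // The path end, with or without trailing slash: GET serves the page, POST hides an egg.
  val root: Route = {
    case Request("GET", "/hide" | "/hide/")  => Some("index.html")
    case Request("POST", "/hide" | "/hide/") => Some("hidden")
    case _                                   => None
  }

  // The websocket path is redirected to the websocket port.
  val ws: Route = {
    case Request(_, "/hide/ws") => Some("redirect to websocket port")
    case _                      => None
  }

  // Everything else under the prefix is looked up as a static resource.
  val staticFiles: Route = {
    case Request("GET", path) if path.startsWith("/hide/") =>
      Some("resource " + path.stripPrefix("/hide/"))
    case _ => None
  }

  // Same order as in the HideService: path end, then ws, then resources.
  val route: Route = concat(concat(root, ws), staticFiles)

  def main(args: Array[String]): Unit = {
    println(route(Request("GET", "/hide/")))
    println(route(Request("POST", "/hide")))
    println(route(Request("GET", "/hide/ws")))
    println(route(Request("GET", "/hide/hide.js")))
    println(route(Request("GET", "/play")))
  }
}
```

The order matters in this model: if the static resource alternative came before the ws alternative, a GET to /hide/ws would be treated as a resource lookup instead of a redirect.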

Testing

Testing such an architecture involves testing every distinct service as well as the routes that lead to the static pages. Since the routes are the simplest to test, we will start with them, before moving on to the more complex cases of services that lie beyond the reach of Spray's routing capabilities.

Testing the routes

Testing the static routes consists mainly of sending a request to the routes defined in the ReactiveApi and checking that the response status and content are as expected.

There is no need to start a real HTTP server, since the HTTP request objects are simply created and handed directly to the routes.

class ReactiveApiSpec extends Specification with Specs2RouteTest with MainActors with ReactiveApi {
  def actorRefFactory = system
  "Reactive API" should {
    "return the correct page for GET requests to the pages' path" in {
      Get() ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain("Easter Eggs")
      }
      Get("/") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain("Easter Eggs")
      }
      Get("/find") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain("-- Find")
        responseAs[String] must contain("find.js")
        responseAs[String] must contain("href='/find/'>Reload")
      }
      Get("/hide") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain("-- Hide")
        responseAs[String] must contain("hide.js")
        responseAs[String] must contain("href='/hide/'>Reload")
      }
    }
    // many other tests omitted
    "leave GET requests to unknown paths unhandled" in {
      Get("/play") ~> routes ~> check {
        status === StatusCodes.NotFound
      }
    }
  }
}

Testing the services

Since the ReactiveApi is available without instantiating the whole system, we can write tests that exercise just that API.

Building HTTP requests and passing them to the routes is very similar to the case of static resources. The main differences are the need to build the entity body or to add credential headers before handing the request object to the routes.

@RunWith(classOf[JUnitRunner])
class HideServiceSpec extends Specification with Directives with Specs2RouteTest with MainActors with ReactiveApi {
  def actorRefFactory = system
  
  "Reactive API" should {
    "hide bunny" in {
      Post("/hide").withEntity(HttpEntity(MediaTypes.`application/json`, """
          {"longitude" : "-38.4798", "latitude" : "-3.8093"}
          """)) ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must === ("hidden")
      }
    }
  }
}

Testing the websocket service

In order to test the websocket service, we need to start the server from within the test.

class WebSocketTest extends FunSuite with MainActors with ReactiveApi {
  implicit lazy val system = ActorSystem("reactive-system")
  sys.addShutdownHook({system.shutdown})
  test("websocket connection") {
    val rs = new ReactiveServer(Configuration.portWs)
    rs.forResource("/find/ws", Some(find))
    rs.forResource("/hide/ws", Some(hide))
    rs.start

Then we can open one websocket to each service.

    var wsmsg = ""
    val wsf = new WebSocketClient(URI.create(s"ws://localhost:${Configuration.portWs}/find/ws")) {
      override def onMessage(msg : String) {
        wsmsg = msg
      }
      override def onOpen(hs : ServerHandshake) {}
      override def onClose(code : Int, reason : String, intentional : Boolean) {}
      override def onError(ex : Exception) {println(ex.getMessage)}
    }
    wsf.connect
    val wsh = new WebSocketClient(URI.create(s"ws://localhost:${Configuration.portWs}/hide/ws")) {
      override def onMessage(msg : String) {}
      override def onOpen(hs : ServerHandshake) {}
      override def onClose(code : Int, reason : String, intentional : Boolean) {}
      override def onError(ex : Exception) {}
    }
    wsh.connect

Only then can we actually run the tests, sending some messages through the websocket linked to the hide service and checking for the correct answer on the websocket linked to the find service.

    wsh.send("2.1523721 41.4140567")
    Thread.sleep(1000L)
    val first = """\{"move":\{"id":"[-0-9a-f]+","idx":"0","longitude":2\.1523721,"latitude":41\.4140567\}\}""".r
    assert(None != first.findFirstIn(wsmsg))
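
The assertion relies on a regular expression rather than string equality because the id of the marker is not known in advance. The sketch below applies the same pattern to a sample message; the sample text, and in particular its id value, is invented for this illustration, and MoveMessageSketch and matches are hypothetical names.

```scala
object MoveMessageSketch {
  // Same pattern as in the test: braces and dots are escaped, the id is left open.
  val first =
    """\{"move":\{"id":"[-0-9a-f]+","idx":"0","longitude":2\.1523721,"latitude":41\.4140567\}\}""".r

  // True when the pattern occurs somewhere in the received text.
  def matches(wsmsg: String): Boolean = first.findFirstIn(wsmsg).isDefined

  def main(args: Array[String]): Unit = {
    // Invented sample; a real id would be generated by the application.
    val sample =
      """{"move":{"id":"3f2a-9c","idx":"0","longitude":2.1523721,"latitude":41.4140567}}"""
    println(matches(sample)) // the sample fits the expected shape
    println(matches(""))     // the initial empty wsmsg does not
  }
}
```

Since wsmsg starts out as an empty string, the assertion also fails when no message arrives within the waiting time.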

Testing the plain socket service

Testing the plain socket service is very similar, but instead of registering the websocket listener actor for the hide service, we start the plain socket service.

class PlainSocketTest extends FunSuite with MainActors with ReactiveApi {
  implicit lazy val system = ActorSystem("reactive-system")
  sys.addShutdownHook({system.shutdown})
  test("pure socket connection") {
    val rs = new ReactiveServer(Configuration.portWs)
    rs.forResource("/find/ws", Some(find))
    rs.start
    IO(Tcp) ! Tcp.Bind(socketService, new InetSocketAddress(Configuration.host, Configuration.portTcp))

The tests will then be performed by sending some messages through a socket to our hide service and checking for the correct answer from the websocket linked to the find service.

    val conn = new Socket
    conn.connect(new InetSocketAddress("localhost", Configuration.portTcp))
    conn.getOutputStream.write("2.1523721 41.4140567\n".getBytes)
    conn.getOutputStream.flush
    Thread.sleep(1000L)
    val first = """\{"move":\{"id":"[-0-9a-f]+","idx":"C","longitude":2\.1523721,"latitude":41\.4140567\}\}""".r
    assert(None != first.findFirstIn(wsmsg))
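
Both tests wait a fixed second before checking the received message. As a possible alternative, not used by the template, the check can be polled until it succeeds or a timeout expires, which usually finishes sooner and tolerates a slower machine. The awaitCond helper below is a hypothetical name, and the AtomicReference is an assumption of this sketch to make the value written by another thread safely visible.

```scala
import java.util.concurrent.atomic.AtomicReference

object AwaitSketch {
  // Polls `condition` every `stepMillis` until it holds or `timeoutMillis` elapses.
  // Returns the last observed value of the condition.
  def awaitCond(timeoutMillis: Long, stepMillis: Long = 50L)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(stepMillis)
      ok = condition
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    val wsmsg = new AtomicReference[String]("")
    // Simulates a websocket callback that delivers a message from another thread.
    val feeder = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(100L)
        wsmsg.set("""{"move":{}}""")
      }
    })
    feeder.start()
    println(awaitCond(1000L)(wsmsg.get.nonEmpty))
    feeder.join()
  }
}
```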

Summary

Next Steps

For more elaborate architectures, see the following Activator templates:
