Typesafe Activator

Spray and Websocket interfaces to actors

Spray and Websocket interfaces to actors

Alain Fagot Béarez
Source
April 18, 2015
akka spray tcp websocket scala

Some applications need to expose an API that will not only handle synchronous requests, but also open a websocket to asynchronously push new notifications to the visualization client and accept plain socket channels from other information feeders. This tutorial will explain how to implement an application that offers three different interfaces to its Akka actors: HTTP request, plain socket and websocket.

How to get "Spray and Websocket interfaces to actors" on your computer

There are several ways to get this template.

Option 1: Choose akka-spray-websocket in the Typesafe Activator UI.

Already have Typesafe Activator (get it here)? Launch the UI then search for akka-spray-websocket in the list of templates.

Option 2: Download the akka-spray-websocket project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-spray-websocket.

  1. Download the Template Bundle for "Spray and Websocket interfaces to actors"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue:

    Or from a command line:

     C:\Users\typesafe\akka-spray-websocket> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create a akka-spray-websocket project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-spray-websocket on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/cuali/SprayEasterEggs#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

Goal overview

Some applications need to expose an API that will not only handle synchronous requests, but also open a websocket to asynchronously push new notifications to the visualization client and accept plain socket channels from other information feeders. This tutorial will explain how to implement an application that offers three different interfaces to its Akka actors: HTTP request, plain socket and websocket.

As an illustration, we will mimic an Easter eggs hunt, with one map for the various participants to hide their own egg and another map to find the hidden eggs.

The application will eventually present a visual interface such as the following screenshot when run with the default configuration:

Splitting up the API into services

Required resources

In order to organize our Easter eggs hunt, a deeper thought into our application will reveal the need for many resources.

  1. In order to hide an egg, we will need
    • a page to select a position on a map;
    • a JavaScript to send the new position of the hidden egg to the server.
  2. In order to present the map to find the hidden eggs, we will need
    • a page to present the eggs on a map;
    • a JavaScript to receive from the server the positions of hidden eggs and plot them on the map;
    • a set of markers to represent the hidden eggs.
  3. In order to present a common visual identity, we will need
    • a basic CSS to style both previous pages;
    • a page to embed both previous pages.
  4. In order to handle the server side magic, we will need some components already disclosed on the previous step of this tutorial.

Each of these resources could be considered as a single service. However, we basically need only two services: one to hide one single egg and one to find where the eggs were deposited.

Organizing the services

As previously shown, some of the resources are independent of these services and would be handled as common stuff. This oversimplification leaves a hole since we lost the plain socket interface and the websocket interface, called from both map pages through JavaScript.

The API will then be split into five services:

  1. the SocketService opens its own port to listen for plain socket connections, distinct from the following HTTP-based services;
  2. the FindService defines the routes to static resources (index.html and find.js) for the map;
  3. the HideService defines the routes to static resources (index.html and hide.js) for the map, as well as an HTTP-based (REST-like) webservice;
  4. the RootService handles the static routes to common stuff, as well as the routing to the above two services;
  5. the WebSocketServer websockets server is a Spray service as the above four other services.

The ReactiveSystem application object is responsible for initializing the actor system, the websockets' server, the plain socket listener actor and the HTTP request listener actor.

Plain socket service

The SocketService opens its own port to listen for plain socket connections, distinct from the other HTTP-based services.

  IO(Tcp) ! Tcp.Bind(socketService, new InetSocketAddress(Configuration.host, Configuration.portTcp))

Each time a new connection is established, it is handled to a new instance of the SocketActor.

      sender ! Tcp.Register(context.actorOf(Props(classOf[SocketActor], sender)))

The actor can then handle each received data message to analyze it and send further messages into the actor system.

  val coords = "(-?\\d+\\.\\d+) (-?\\d+\\.\\d+)".r
  override def receive = {
    case Tcp.Received(data) =>
      data.utf8String.trim match {
        case coords(lng, lat) =>
          marker ! MarkerActor.Move(lng, lat)
        case msg => log.info(msg)
      }

Websocket service

The WebSocketServer websockets' server is a Spray service using the WebSocket for spray-can despite its lacking the ease of use of the RFC6455 basic methods.

The WebSocketServer allows for registration of several actors reacting each one to its own resource URL, in much the same way as the routes are usually defined with Spray.

  val wsService = system.actorOf(Props(new RootService[WebSocketServer](wsroutes)), "wss")
  lazy val wsroutes = logRequest(showReq _) {
    new FindService(find).wsroute ~
    new HideService(hide).wsroute ~
    complete(NotFound)
  }
  IO(UHttp) ! Http.Bind(wsService, Configuration.host, Configuration.portWs)

As the WebSocketServer registers at most one actor for each resource URL, it can forward to this actor the incoming web socket's TextFrame as Message, web socket's CloseFrame and HTTP's Http.ConnectionClosed as Close as well as HTTP's Http.ErrorClosed as Error, thus getting closer to the RFC6455 methods through the use of ReactiveServerMessage.

  // this is the actor's behavior after the WebSocket handshaking resulted in an upgraded request 
  override def businessLogic = {
    case TextFrame(message) =>
      ping
      handler ! WebSocket.Message(this, message.utf8String)
// ...
    case Http.ErrorClosed(cause) =>
      handler ! WebSocket.Error(this, cause)
    case CloseFrame(status, reason) =>
      handler ! WebSocket.Close(this, status.code, reason)
    case Http.Closed =>
      handler ! WebSocket.Close(this, StatusCode.NormalClose.code, "")
// ...
  }

Note that there is no way to force the registered handler actor to correctly handle the four messages defining the server-side protocol in the WebSocket companion object.

object WebSocket {
  sealed trait WebSocketMessage
  case class Open(ws : WebSocket) extends WebSocketMessage
  case class Message(ws : WebSocket, msg : String) extends WebSocketMessage
  case class Close(ws : WebSocket, code : Int, reason : String) extends WebSocketMessage
  case class Error(ws : WebSocket, reason : String) extends WebSocketMessage
}

HTTP services

Let's explore the ReactiveApi trait, which defines the REST endpoints. In keeping with the structure exposed above, each endpoint has been kept in its own class. The ReactiveApi trait constructs the classes for these endpoints and then concatenates their respective routes.

trait ReactiveApi extends RouteConcatenation with StaticRoute with AbstractSystem {
  this : MainActors =>
  private def showReq(req : HttpRequest) = LogEntry(req.uri, InfoLevel)

  val rootService = system.actorOf(Props(new RootService[BasicRouteActor](routes)), "routes")
  lazy val routes = logRequest(showReq _) {
    new FindService(find).route ~
    new HideService(hide).route ~
    staticRoute
  }
  val wsService = system.actorOf(Props(new RootService[WebSocketServer](wsroutes)), "wss")
  lazy val wsroutes = logRequest(showReq _) {
    new FindService(find).wsroute ~
    new HideService(hide).wsroute ~
    complete(NotFound)
  }
  val socketService = system.actorOf(Props[SocketService], "tcp")
}

The RootService then routes the incoming HTTP requests accordingly.

class RootService[RA <: RouteActor](val route : Route)(implicit tag : ClassTag[RA]) extends HttpServiceActor with ActorLogging {
  override def receive = {
    case connected : Http.Connected =>
      // implement the "per-request actor" pattern
      sender ! Http.Register(context.actorOf(Props(tag.runtimeClass, sender, route)))
    case whatever => log.debug("RootService got some {}", whatever)
  }
}
trait RouteActor extends HttpServiceActor {
  def connection : ActorRef
  def route : Route
}
private[api] class BasicRouteActor(val connection : ActorRef, val route : Route) extends RouteActor {
  override def receive = runRoute(route)
}

Since the FindService is rather simple, let's study the HideService implementation instead.

class HideService(val hide : ActorRef)(implicit system : ActorSystem) extends Directives with ApplicationJsonFormats {
  private implicit val moveFormat = jsonFormat2(MarkerActor.Move)
  lazy val route =
    pathPrefix("hide") {
      val dir = "hide/"
      pathEndOrSingleSlash {
        get {
          getFromResource(dir + "index.html")
        } ~
        post {
          handleWith {
            move : MarkerActor.Move =>
              hide ! move
              "hidden"
          }
        }
      } ~
      path("ws") {
        requestUri { uri =>
          val wsUri = uri.withPort(Configuration.portWs)
          system.log.debug("redirect {} to {}", uri, wsUri)
          redirect(wsUri, StatusCodes.PermanentRedirect)
        }
      } ~
      getFromResourceDirectory(dir)
    }
  lazy val wsroute = 
    pathPrefix("hide") {
      path("ws") {
        implicit ctx =>
          ctx.responder ! WebSocket.Register(ctx.request, hide, true)
      }
    }
}

This service will handle all requests under the /hide path prefix. For the /hide/ws path, it will redirect the request to the websocket port. A special treatment is reserved to the /hide/ path. For all other URL, it will try to serve the named resource from the hide resources directory.

Depending on the HTTP method used to access the /hide/ path, either the index.html is served for a GET or, for a POST, the body of the request is unmarshalled from the JSON representation of a MarkerActor.Move object and sent forward to the hide actor.

Testing

Testing such an architecture involves testing every distinct service and the routes which lead to the static pages. Since this is the simplest task, we will start with these tests, before extending to the more complex cases for services out of the reach of Spray routing capabilities.

Testing the routes

Testing the static routes mainly consists in sending a request to the routes defined in the ReactiveApi and checking the response status and content are as expected.

There is no need to start any real HTTP server, since the HTTP request objects are simply created and directly handled to the routes.

class ReactiveApiSpec extends Specification with Specs2RouteTest with MainActors with ReactiveApi {
  def actorRefFactory = system
  "Reactive API" should {
    "return the correct page for GET requests to the pages' path" in {
      Get() ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain ("Easter Eggs")
      }
      Get("/") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain ("Easter Eggs")
      }
      Get("/find") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain ("-- Find")
        responseAs[String] must contain ("find.js")
        responseAs[String] must contain (">Find<")
      }
      Get("/hide") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain ("-- Hide")
        responseAs[String] must contain ("hide.js")
        responseAs[String] must contain (">Hide<")
      }
    }
    "return the javascripts for GET requests to the js files" in {
      Get("/find/find.js") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain ("function")
      }
      Get("/hide/hide.js") ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must contain ("function")
      }
    }
    "return the image for GET requests to the markers files" in {
      Get("/markers/marker0.png") ~> routes ~> check {
        status === StatusCodes.OK
        mediaType must be (MediaTypes.`image/png`)
      }
      // ...
      Get("/markers/markerC.png") ~> routes ~> check {
        status === StatusCodes.OK
        mediaType must be (MediaTypes.`image/png`)
      }
    }
    "redirect the GET requests to websocket paths" in {
      Get("/find/ws") ~> routes ~> check {
        status === StatusCodes.PermanentRedirect
      }
      Get("/hide/ws") ~> routes ~> check {
        status === StatusCodes.PermanentRedirect
      }
    }
    "leave GET requests to unknown paths unhandled" in {
      Get("/play") ~> routes ~> check {
        status === StatusCodes.NotFound
      }
    }
  }
}

Testing the services

As the availability of the ReactiveApi does not depend on the system instantiation, this enables to write tests that exercise just that API.

Building HTTP request and passing them to the routes is very similar to the case for static resources. The main differences may be the necessity to build the entity body or to add credentials headers, before handing the request object to the routes.

@RunWith(classOf[JUnitRunner])
class HideServiceSpec extends Specification with Directives with Specs2RouteTest with MainActors with ReactiveApi {
  def actorRefFactory = system

  "Reactive API" should {
    "hide bunny" in {
      Post("/hide").withEntity(HttpEntity(MediaTypes.`application/json`, """
          {"longitude" : "-38.4798", "latitude" : "-3.8093"}
          """)) ~> routes ~> check {
        status === StatusCodes.OK
        responseAs[String] must === ("hidden")
      }
    }
  }
}

Testing the websocket service

In order to test the websocket service, we need to start the server from within the test.

@RunWith(classOf[JUnitRunner])
class WebSocketTest extends FunSuite with MainActors with ReactiveApi {
  implicit lazy val system = ActorSystem("reactive-socket-WebSocketTest")
  sys.addShutdownHook({ system.shutdown })
  test("websocket connection") {
    val wss = system.actorOf(Props(new RootService[WebSocketServer](new FindService(find).wsroute ~ new HideService(hide).wsroute)), "wswss")
    IO(UHttp) ! Http.Bind(wss, Configuration.host, Configuration.portWs)
    Thread.sleep(2000L) // wait for all servers to be cleanly started

Then we can start one websocket client to each service.

    var wsmsg = ""
    val wsf = system.actorOf(Props(new TestingWebSocketClient {
      override def businessLogic = {
        case WebSocket.Release => close
        case TextFrame(msg) => wsmsg = msg.utf8String
        case whatever => // ignore
      }
    }))
    wsf ! WebSocket.Connect(Configuration.host, Configuration.portWs, "/find/ws")
    val wsh = system.actorOf(Props(new TestingWebSocketClient {
      override def businessLogic = {
        case WebSocket.Send(message) =>
          log.info("Client sending message {}", message)
          send(message)
        case WebSocket.Release => close
        case whatever => // ignore
      }
    }))
    wsh ! WebSocket.Connect(Configuration.host, Configuration.portWs, "/hide/ws")

And only then we can really perform the tests, sending some messages through the websocket linked to the hide service and checking for the correct answer from the websocket linked to the find service.

    wsh ! WebSocket.Send("2.1523721 41.4140567")
    Thread.sleep(1000L)
    val first = """\{"move":\{"id":"[-0-9a-f]+","idx":"0","longitude":2\.1523721,"latitude":41\.4140567\}\}""".r
    assert(None != first.findFirstIn(wsmsg))

Testing the plain socket service

Testing the plain socket service will be very similar, but instead of starting the websocket listener actor for the hide service, we will start the plain socket service.

@RunWith(classOf[JUnitRunner])
class PlainSocketTest extends FunSuite with MainActors with ReactiveApi {
  implicit lazy val system = ActorSystem("reactive-socket-PlainSocketTest")
  sys.addShutdownHook({ system.shutdown })
  test("pure socket connection") {
    val wss = system.actorOf(Props(new RootService[WebSocketServer](new FindService(find).wsroute)), "pswss")
    IO(UHttp) ! Http.Bind(wss, Configuration.host, Configuration.portWs)
    IO(Tcp) ! Tcp.Bind(socketService, new InetSocketAddress(Configuration.host, Configuration.portTcp))
    Thread.sleep(2000L) // wait for all servers to be cleanly started

The tests will then be performed by sending some messages through a socket to our hide service and checking for the correct answer from the websocket linked to the find service.

    val conn = new Socket
    conn.connect(new InetSocketAddress("localhost", Configuration.portTcp))
    conn.getOutputStream.write("2.1523721 41.4140567\n".getBytes)
    conn.getOutputStream.flush
    Thread.sleep(1000L)
    val first = """\{"move":\{"id":"[-0-9a-f]+","idx":"C","longitude":2\.1523721,"latitude":41\.4140567\}\}""".r
    assert(None != first.findFirstIn(wsmsg))

Summary

This tutorial exposed a solution to implement an application that offers three different kind of interfaces to its Akka actors: HTTP request, plain socket and websocket.

Testing such an architecture involves testing every distinct service and the routes which lead to the static resources. Strategies to organize the tests have been presented.

comments powered by Disqus