Akka IO over TCP, Spray and MySQL Reactive

vngrs
September 25, 2013
scala akka spray spray-can spray-client mysql-async postgresql-mysql-async

Illustrates a reactive application which uses Akka IO for TCP connections, Spray-client for making non-blocking API calls and postgresql/mysql-async for reactive SQL operations.

How to get "Akka IO over TCP, Spray and MySQL Reactive" on your computer

There are several ways to get this template.

Option 1: Choose tcp-async in the Typesafe Activator UI.

Already have Typesafe Activator (get it here)? Launch the UI then search for tcp-async in the list of templates.

Option 2: Download the tcp-async project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for tcp-async.

  1. Download the Template Bundle for "Akka IO over TCP, Spray and MySQL Reactive"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue:

    Or from a command line:

     C:\Users\typesafe\tcp-async> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create a tcp-async project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME tcp-async on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/vngrs/tcp-async.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Introduction

tcp-async, as the name implies, handles TCP connections reactively using Akka I/O. It also makes reactive API calls using spray-client, and non-blocking SQL DB operations using postgresql/mysql-async.

Thanks to Akka I/O, tcp-async can serve a large number of requests concurrently. The whole application is designed to be reactive from top to bottom.

A few request handlers are already provided for you. The basic echo handler is the simplest; the others write incoming data to a MySQL database or make an API call.

All of these handlers are reactive and are built on open source libraries that are introduced later.

Let's start with the server.

Waiting For Clients: Server

A Server is an actor that waits for connections, which can be made over HTTP, TCP or UDP. You should implement a Server actor for the connection type you wish to use. tcp-async comes with a built-in TcpServer.

Each Server actor binds to an InetSocketAddress. In this case, TcpServer listens for incoming TCP connections with the following binding:

IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress(appHostName, appPort))

Just below that, you can see the receive function:

override def receive = {
    case Tcp.CommandFailed(_: Tcp.Bind) => context stop self

    case Tcp.Connected(remote, local) =>
        val handler = context.actorOf(handlerProps.props(sender))
        sender ! Tcp.Register(handler)
}

Once the bind request is sent, the actor receives either a CommandFailed message, if there was an error binding to the given InetSocketAddress, or, after a successful bind, a Connected message each time a client requests a connection. (Akka also replies to a successful bind with a Bound message, which this receive function does not handle.) Every connection must have its own handler so that incoming data is processed without blocking the server. This is done by creating a new handler actor and registering it with the connection by sending a Register message.

The key point is that the connection with the sender (the client) is not managed by the Server actor itself. Every connection is handled by its own Handler actor instance. The Server actor is therefore free to accept new connections, and indeed, a simple load test shows that tcp-async can easily handle 10K requests.

Remember, 10K actors don't mean 10K threads.
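The same point can be observed without Akka. The following is a minimal sketch that substitutes plain Scala Futures on a fixed thread pool for actors, so it is an analogy rather than tcp-async code: many small tasks complete while only a handful of threads ever run them. The object name ManyTasksFewThreads and the pool size are assumptions chosen for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo object; not part of the tcp-async template.
object ManyTasksFewThreads {
  // Runs `tasks` small units of work on a pool of `threads` threads and
  // returns (number of completed tasks, number of distinct threads used).
  def run(tasks: Int, threads: Int): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // Thread-safe set that records the name of every thread that ran a task.
    val seen = ConcurrentHashMap.newKeySet[String]()
    try {
      val work = Future.sequence((1 to tasks).map { i =>
        Future {
          seen.add(Thread.currentThread().getName)
          i
        }
      })
      val done = Await.result(work, 30.seconds).size
      (done, seen.size())
    } finally pool.shutdown()
  }
}
```

Calling ManyTasksFewThreads.run(10000, 4) completes all 10,000 tasks while the second element of the result never exceeds 4. Akka's dispatcher schedules actors onto a shared pool in a comparable way.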

Now let's move on to handlers.

Deciding What To Do: Handler

A Handler is an actor that deals with the client connection and decides what to do with incoming requests.

A Handler takes the connection actor (the sender of the Connected message) as a parameter. All communication with the remote client goes through this actor.

abstract class Handler(val connection: ActorRef) extends Actor {
    // ...
}

Looking further into the code, you will see that there are six different messages that can be consumed. Five of them are related to the way the connection is closed, and the last one carries the incoming data. A quick summary of the first five:

    PeerClosed      => Closed from remote endpoint
    ErrorClosed     => Closed because of an error
    Closed          => Closed as a response to Close message
    ConfirmedClosed => Closed as a response to ConfirmedClose message
    Aborted         => Closed as a response to Abort message

You can read more about the ways to close a TCP connection, and find a fuller explanation of each message, in the Akka I/O documentation.

Now for the more exciting part, handling incoming data: look at the Received message. A Received message contains a ByteString, as you might expect, and it's up to the handler to give that ByteString a meaning. For the purposes of this tutorial, we decode the ByteString into a String using UTF-8, trim any whitespace, and call the received function with the result. Here is how it's done:

def receive: Receive = {
  case Received(data) =>
    data.utf8String.trim match {
      // ...
      case str => received(str)
    }
  // ...
}
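To make the decoding step concrete, here is a small standalone sketch that uses only the Scala standard library. It works on a raw byte array instead of Akka's ByteString, and the Action type and the case-insensitive close keywords are assumptions made for illustration; the template's Handler may match its keywords differently.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

// Hypothetical stand-in for the decoding step of a Handler.
object DecodeSketch {
  // What the handler should do with one incoming message.
  sealed trait Action
  case object CloseRequested extends Action
  case object ConfirmedCloseRequested extends Action
  case object AbortRequested extends Action
  final case class Payload(text: String) extends Action

  // Decodes the bytes as UTF-8, trims surrounding whitespace (including the
  // line ending that telnet appends), then checks for a closing keyword.
  def decode(bytes: Array[Byte]): Action = {
    val text = new String(bytes, UTF_8).trim
    text.toLowerCase(Locale.ROOT) match {
      case "close"          => CloseRequested
      case "confirmedclose" => ConfirmedCloseRequested
      case "abort"          => AbortRequested
      case _                => Payload(text) // everything else goes to received(...)
    }
  }
}
```

In the real Handler, the Payload case corresponds to calling received(str), and the other cases correspond to sending a closing command to the connection actor.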

Concrete handler classes should implement the received function and process the incoming data as they please. Let's look at a simple example, an echoing handler.

Repeat After Me: EchoHandler

EchoHandler, as you can guess, echoes each incoming message.

This is achieved by sending a Write message to the connection actor. A Write message requires a ByteString to send over the connection, and EchoHandler simply wraps the incoming data in a ByteString.

def received(data: String) = connection ! Write(ByteString(data + "\n"))

Since the abstract Handler does the bulk of the work for us, EchoHandler is very simple.

Running EchoHandler

Hey, you must be wondering how to run and test EchoHandler now. Don't worry, it's already done for you. Go ahead and check MainWithEchoHandler. You will see that it simply creates an ActorSystem for the Server and Handler actors to live in, then fires up a TcpServer actor, passing in EchoHandlerProps as a parameter. You can run MainWithEchoHandler now: just go to Run, and make sure you pick the correct main class from the list.

Once it's up and running, all you have to do is connect and send some data. Here is an example session using telnet:

#Connect on default host and port
telnet localhost 9999
telnet> Hello World!
Hello World!
telnet> close
Connection closed by foreign host.

You should try it on your own now. Keep in mind that you can have as many connections as you wish, so try opening more than one telnet session. You may also want to try the different keywords for closing (Abort, Close, ConfirmedClose). Come back once you are done!

Let's move on to a more complex example, shall we?

Ground Control To Major Tom: ApiHandler

ApiHandler makes API calls to the Google Elevation API, which simply returns the elevation at the given coordinates.

Just like in EchoHandler, the received function is implemented to handle incoming data, which is now assumed to be coordinates. It builds a URL from the API URL and the incoming coordinates, makes a non-blocking HTTP request, and writes the API response back to the connection.

You may have already noticed that the Api object is used for making the HTTP request. The Api object is a wrapper for spray-client, in which we create an HttpRequest instance via the createHttpRequest function and use that instance in a pipeline to do the actual request in a reactive fashion.

Note that sendAndReceive produces an anonymous function of type (HttpRequest) => Future[HttpResponse], which you can use to make your HTTP request.
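The shape of such a pipeline can be sketched with the standard library alone. In the sketch below, plain strings stand in for spray's HttpRequest and HttpResponse, the Send type plays the role of the (HttpRequest) => Future[HttpResponse] function, and the base URL and all names are assumptions for illustration rather than the template's Api object.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical stand-in for the Api object; no real HTTP call is made here.
object ApiPipelineSketch {
  // Stand-in for HttpRequest => Future[HttpResponse], with strings as messages.
  type Send = String => Future[String]

  // Assumed base URL; the template reads its own value from configuration.
  val baseUrl = "http://maps.googleapis.com/maps/api/elevation/json"

  private def isNumber(s: String): Boolean = Try(s.toDouble).isSuccess

  // Builds the request URL from "latitude,longitude" input, or reports why it cannot.
  def buildUrl(coordinates: String): Either[String, String] =
    coordinates.split(",").map(_.trim) match {
      case Array(lat, lng) if isNumber(lat) && isNumber(lng) =>
        Right(s"$baseUrl?locations=$lat,$lng")
      case _ =>
        Left(s"expected latitude,longitude but got: $coordinates")
    }

  // Chains URL building, the asynchronous send, and formatting of the reply.
  // The caller gets a Future back immediately; no thread waits for the response.
  def elevation(send: Send)(coordinates: String)(implicit ec: ExecutionContext): Future[String] =
    buildUrl(coordinates) match {
      case Right(url)  => send(url).map(body => body.trim + "\n")
      case Left(error) => Future.successful(error + "\n")
    }
}
```

Passing the send function in as a parameter keeps the sketch testable with a stub; in the template that role is filled by the spray-client pipeline.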

Running ApiHandler

Well, you are experienced now, aren't you? Go ahead and run MainWithApiHandler and see it in action. Here is a sample session to guide you. Just make sure you enter the coordinates in latitude,longitude form:

#Connect on default host and port
telnet localhost 9999
telnet> 28.976,41.014
{
   "results" : [
      {
         "elevation" : 752.8604736328125,
         "location" : {
            "lat" : 28.976,
            "lng" : 41.014
         },
         "resolution" : 152.7032318115234
      }
   ],
   "status" : "OK"
}

telnet> close
Connection closed by foreign host.

Hey, there is one more handler before we wrap up. So far you have seen how to handle incoming requests reactively, and how to make API calls to remote services reactively. I know it's already delicious, but how about adding some reactive DB operations on top as icing? Just follow the lead...

Remember, Remember: DbHandler

DbHandler writes each incoming message to a MySQL database and returns all the data in the database to the sender.

Note: In order to use DbHandler, you must have a MySQL database and a valid database configuration for it. The configuration details are covered later in this tutorial.

postgresql/mysql-async is used as the MySQL driver.

postgresql/mysql-async

postgresql/mysql-async is an async, Netty-based database driver for MySQL and PostgreSQL, written in Scala.

Under the db folder, there are two objects, Pool and DB.

Pool defines a connection pool for the database and keeps a certain number of connections ready to use. If the pool has available connections, it executes the given queries immediately; otherwise the queries are pushed into a queue to be executed later. Pool throws a PoolExhaustedException whenever the queue is full (the queue size is defined in application.conf). Connections are returned to the pool after execution is done.

DB is used to execute queries through Pool.

def execute(query: String, values: Any*): Future[QueryResult] = {
  if (values.size > 0)
    pool.sendPreparedStatement(query, values)
  else
    pool.sendQuery(query)
}

If there are values to be set, the query is assumed to be a prepared statement and is executed with the given values bound. Otherwise, the query is executed on the database as is, which means there is no parameter binding and no prepared statement.

If you only need Seq[RowData], you can use fetch:

def fetch(query: String, values: Any*): Future[Option[Seq[RowData]]] =
  execute(query, values: _*).map(_.rows)

fetch is used as part of DbHandler, where the latest state of the table is sent back through the connection.
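The values: _* in fetch matters. Inside a method, a varargs parameter is an ordinary Seq, and forwarding it without : _* would pass the whole Seq as one single value. The sketch below shows the difference with stand-in types; Prepared, Plain and fetchWrong are hypothetical names, and no database is involved.

```scala
// Hypothetical model of the execute/fetch pair; it records what would be sent.
object VarargsSketch {
  sealed trait Sent
  final case class Prepared(query: String, values: Seq[Any]) extends Sent
  final case class Plain(query: String) extends Sent

  // Same branching as DB.execute: bound values mean a prepared statement.
  def execute(query: String, values: Any*): Sent =
    if (values.nonEmpty) Prepared(query, values) else Plain(query)

  // Correct forwarding: ": _*" expands the Seq back into individual arguments.
  def fetch(query: String, values: Any*): Sent =
    execute(query, values: _*)

  // Incorrect forwarding: the Seq itself becomes the one and only argument,
  // so even a call without values is treated as a prepared statement.
  def fetchWrong(query: String, values: Any*): Sent =
    execute(query, values)
}
```

With this model, fetch("SELECT 1") yields Plain("SELECT 1"), whereas fetchWrong("SELECT 1") yields a Prepared value whose single bound value is an empty Seq.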

Running DbHandler

This one is a bit different. First of all, it requires a MySQL server. Then, the SQL statements in setup.sql.conf should be executed to create the database and table that this handler uses. Finally, application.conf should be updated to reflect your MySQL settings.

If you are all set, just run MainWithDbHandler and see it in action. Here is a sample session to guide you:

#Connect on default host and port
telnet localhost 9999
telnet> testing
values in db are:
testing--Sun Sep 22 23:04:54 UTC 2013
telnet> Hello World!
values in db are:
testing--Sun Sep 22 23:04:54 UTC 2013
Hello World!--Sun Sep 22 23:05:03 UTC 2013
telnet> close
Connection closed by foreign host.

There you have it. All your messages are recorded and listed back to you with dates appended, and all of this happens in a reactive way, without blocking any thread.

The rest of this tutorial tells you a bit more about the configuration of this application and lists some next steps for you to enjoy. Oh, by the way, if you want to remove all traces of the messages you sent, just execute the SQL in destroy.sql.conf.

Configuration

The configuration parameters of the application can be found in application.conf. Given the size of this application, having a single configuration file helps keep it concise. The configuration is organized hierarchically into four parts: tcp-async, app, api and db.

When the application starts, the configuration is loaded into the Conf object. It's always good practice to load configuration parameters into the Conf object and read them through it: the values can be validated and replaced with defaults in case of an illegal parameter value, and the application does not have to read the .conf file every time. For more information, please see Typesafe Config.
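The validate-once, fall-back-to-defaults idea can be sketched as follows. This is not the template's Conf object: a plain Map stands in for the parsed application.conf, the key names are hypothetical, and the defaults (localhost, port 9999) are taken from the telnet examples above.

```scala
import scala.util.Try

// Hypothetical sketch of a Conf-style holder; values are read and validated once.
object ConfSketch {
  final class Conf(raw: Map[String, String]) {
    // Parses an integer setting and falls back to the default when the value
    // is missing, not a number, or rejected by the validity check.
    private def intOrDefault(key: String, default: Int)(valid: Int => Boolean): Int =
      raw.get(key)
        .flatMap(v => Try(v.trim.toInt).toOption)
        .filter(valid)
        .getOrElse(default)

    // Host name to bind to; blank values count as missing.
    val appHostName: String =
      raw.get("app.hostname").map(_.trim).filter(_.nonEmpty).getOrElse("localhost")

    // TCP port to bind to; must be in the valid port range.
    val appPort: Int =
      intOrDefault("app.port", 9999)(p => p > 0 && p < 65536)
  }
}
```

For example, new ConfSketch.Conf(Map("app.port" -> "70000")).appPort falls back to 9999 because 70000 is outside the valid port range. The rest of the application then reads the already validated fields instead of touching the raw settings again.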

From Now On

Thanks for following this tutorial. It was just a quick glance at the reactive technologies built around Scala. Dive deeper through the links below for more excitement.

spray-client has useful features such as authentication and marshalling/unmarshalling to and from your custom types.

If you need a more basic HTTP server/client module, you can look at spray-can, which spray-client also uses under the hood.

postgresql/mysql-async has many more features than are used in this tutorial. Take a look at it.

You can also handle HTTP or UDP connections with Akka I/O.

You can define new Servers, Handlers or merge them according to your needs. May the reactive be with you.
