Akka and parameterized mutable Actor state

After doing just Python for almost one year, I am back on the JVM with some recent Scala projects. In one of the projects I had the chance to try Akka for the first time – which is an amazing library. In one of my Actors, and I think this is quite a common use case, I needed to run some initialization logic based on the Actor's constructor arguments. During construction, the Actor would initialize an object that was expensive to create. This object would then be re-used in the receive method of the Actor.

I knew that Actor instances were shared, i.e. multiple calls to the receive method would be done on the same Actor object. So being new to Akka, I was afraid of having shared mutable state within my Actor and I was looking for a better way to do the initialization, other than just having a mutable field. This is when I found out about the FSM (Finite State Machine) trait. It is a perfect way to model initialization. I created two States for my Actor (if you want to do initialization in multiple Actors it’s a good idea to keep the common states, data holders and initialization messages in a separate object)

Individual states and data holders are then created in the individual Actors. The parent (supervisor) would then create the Actor and send an Initialize message, which would in turn create the expensive object. The Actor would then move itself to the next state and be ready to receive further messages.

While this is a very nice way to model initialization, one big problem became apparent – restarts. As soon as my Actor failed with an exception in the Initialized state, the parent Actor would restart it in the New state. This made the Actor pretty much unusable. One potential solution to this is probably the lifecycle methods. I could have overridden the postRestart method in my Actor, where I have access to the constructor arguments, to send an initialization message to myself. But against my gut feeling I decided to use a mutable field instead.

As I learned later, even though multiple threads share the same Actor instance, Akka guarantees that only a single thread will handle a message in the receive method at a time (also called The Actor subsequent processing rule). So now I set the mutable field to a None (Option type) and on the first message that arrives the field is initialized properly to a Some. This works fine but throws up some interesting questions. Since Akka is using Dispatchers (thread pools), subsequent messages in an Actor are most likely handled by different threads. In Java, changes to fields of shared objects done in one thread are not always visible to other threads (unless the field is volatile, the modification is done in a synchronized code section or in a section guarded by a Lock). Apparently this is not a problem for Akka.

In layman’s terms this means that changes to internal fields of the actor are visible when the next message is processed by that actor and you don’t need to make the fields volatile.

Unfortunately it is not further explained how Akka achieves this. The visibility problem DOES exist in Akka if Actors contain fields that are modified when receiving a message (e.g. an immutable field holding an ArrayBuffer whose elements are added and removed in the receive method). In that case, how does Akka make sure that those changes are seen in other threads when the next message arrives? In my application at least, I had one issue which seemed to be a visibility problem. Unfortunately, so far I have not been able to isolate and reproduce this problem in a unit test :( What I have so far (some parts need to be added).

I still have to fill the gap and do the HTTP POST. What I have seen is a print indicating that a smaller batch has been pushed out – which can ultimately only be a visibility issue. My guess is that the culprit is either my asynchronous POST using the Dispatch library or the way clear() is implemented in the ArrayBuffer class. I am investigating further. For now, this change got rid of the problem for me.

Generating REST docs with Scala and Finatra

More than 2 years ago I wrote a blog post about Enunciate - a tool which helps you generate nice documentation for your REST API if you use Java and JAX-RS. I like documentation that exists very close to the code and is created and updated while you implement the main functionality. This kind of documentation has also been recommended in the Pragmatic Programmer book.

I have not been using JAX-RS and Servlet in a while. We are currently implementing most of our REST APIs on top of Finagle, an RPC system created in the Twitter software forge that runs on Netty. While it is possible to use Finagle directly together with Scala path matching for the routes, I could not find a clever way to get self-updating documentation close to the code. Fortunately there is another Twitter project called Finatra, which puts a Sinatra/Flask-like web framework on top of Finagle. Finatra will not only make it easier to define Resources and Routes but also help you with the documentation.

Here is how you typically define a route in Finatra:

For the documentation itself I am using Swagger, which can generate HTML from annotations. Swagger already comes with a bunch of useful annotations. Unfortunately some annotations, like a @Path equivalent, were missing, so I was forced to use some JSR-311 (JAX-RS) annotations instead, even though we are not using JAX-RS for the API. Here is the evolution of the Finatra controller from above with the Swagger and JSR-311 annotations added. As you can see it was necessary to move the routes from the constructor into separate methods that can be annotated. This makes the Scala code a bit uglier and harder to read, especially if you have a lot of annotations in place. But hey, you will love the outcome.

The final step is to generate the documentation during our Maven build. We are using the maven-swagger-plugin for that. I even copied and customized the strapdown.html.mustache from the plugin into our project, so that we could tweak the generated documentation and use another Twitter Bootstrap theme instead.

The outcome will be a generated docs.html file in the target folder of your build. The docs.html will contain autoreplaced.com as path - which was specified in the maven-swagger-plugin. I normally replace “autoreplaced.com” with JavaScript (something that can easily be done if you use your own Mustache template). Also it is nice to have Finatra render the docs.html file.

Embedded Cassandra with Python and Java

Testing is important. When developing applications based on Cassandra and Java, you have a lot of options that help you test your code during development. Unfortunately, with Python the situation is not as good. For instance there is no straightforward solution to start an embedded Cassandra server from Python that your unit tests (or rather integration tests) can communicate with. The good news is that starting a Java process from Python code is dead easy. Using this hybrid approach, we can easily write unit tests that run against Cassandra from Python.

First of all I took the existing cassandra-unit library and tweaked it. Normally when starting embedded Cassandra via cassandra-unit, you specify a configuration file (cassandra.yaml) and optionally a temporary directory. Cassandra-unit would then load the file as a classpath resource and copy the contents to a new file in the specified temp directory (default target/embeddedCassandra). Hence the file has to be on the classpath and the path has to be relative. I thought it would be much nicer if you could pass an absolute path instead and the configuration file was used directly from where it was located. That way we can later modify the Cassandra config file from Python the way we want and also put it in its final location. So the first thing you want to do is to clone the tweaked embedded-cassandra-starter code from git and create a jar artifact by running (yes you need to use Maven)

The outcome of this will be a jar file that you can put into your Python project, e.g. under resources/cassandra. The next thing you need is a vanilla Cassandra configuration file (cassandra.yaml) in your project. We also have that one checked in along with the jar file, e.g. under resources/cassandra, and we called it cassandra.yaml.template.

We use the .template extension because the file contains two placeholders ({{CASSANDRA_DIR}} and {{CASSANDRA_PORT}}) which will be replaced later. A great co-worker of mine then wrote a Python class called EmbeddedCassandra. In its __init__ method this class will find an available port and create a random directory in the system’s temporary directory (let’s call it the work directory for now). EmbeddedCassandra also has a start and a stop method. The start method will copy the configuration template file into the work directory and replace the two placeholders mentioned above. Finally it will start a new Java process using the subprocess module. It will basically invoke the jar file that we built earlier in the same way as you would from the command line (Example can be found here). The stop method in EmbeddedCassandra will bring down the process and do some cleanup.
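The preparation steps described above could look roughly like the following. This is only a minimal sketch and not the actual EmbeddedCassandra class: the function names find_free_port, render_config and prepare_work_dir are made up for illustration, and launching the Java process is left out on purpose because the exact command line depends on the jar.

```python
import os
import socket
import tempfile


def find_free_port():
    """Ask the operating system for a currently unused TCP port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
        return sock.getsockname()[1]
    finally:
        sock.close()


def render_config(template_text, work_dir, port):
    """Replace the two placeholders used in cassandra.yaml.template."""
    return (template_text
            .replace("{{CASSANDRA_DIR}}", work_dir)
            .replace("{{CASSANDRA_PORT}}", str(port)))


def prepare_work_dir(template_text):
    """Create a work directory and write the rendered cassandra.yaml into it.

    Hypothetical helper: returns (work_dir, port, config_path).
    """
    work_dir = tempfile.mkdtemp(prefix="embedded-cassandra-")
    port = find_free_port()
    config_path = os.path.join(work_dir, "cassandra.yaml")
    with open(config_path, "w") as config_file:
        config_file.write(render_config(template_text, work_dir, port))
    return work_dir, port, config_path
```

A start method would then hand config_path to the Java process via subprocess, and a stop method would terminate the process and remove work_dir again (for example with shutil.rmtree).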

All of this is now wrapped into an EmbeddedCassandraTestCase class, which acts as a base class for unit tests that want to test against Cassandra. This class invokes start and stop in its setUp and tearDown methods.
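The shape of such a base class can be sketched as follows. To keep the sketch runnable without Java or Cassandra, it uses a hypothetical FakeServer stand-in with the same start and stop methods; the class names EmbeddedServerTestCase and ExampleTest and the server_factory attribute are assumptions for illustration, not the original code.

```python
import unittest


class FakeServer(object):
    """Hypothetical stand-in for EmbeddedCassandra so the sketch runs anywhere."""

    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False


class EmbeddedServerTestCase(unittest.TestCase):
    """Base class that gives every test method a freshly started server."""

    # Assumption: swap in the real EmbeddedCassandra class here.
    server_factory = FakeServer

    def setUp(self):
        self.server = self.server_factory()
        self.server.start()

    def tearDown(self):
        self.server.stop()


class ExampleTest(EmbeddedServerTestCase):
    def test_server_is_running(self):
        # The server was started by setUp before this method runs.
        self.assertTrue(self.server.running)


def run_example():
    """Run ExampleTest programmatically and return the unittest result."""
    suite = unittest.TestLoader().loadTestsFromTestCase(ExampleTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Starting one server per test method keeps tests isolated but is slow with a real Cassandra process; starting it once per class in setUpClass and tearDownClass would be a reasonable alternative.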

So now you are able to write some nice Python unittests (rather integration tests) against Cassandra, for instance using the great Pycassa library. Here is a simple example.

Investigating Cassandra Heap

We are working on a new application which will use Apache Cassandra. Yesterday a co-worker sent me the following warning, which we kept seeing in the logs every now and then on several nodes. I was asked if this was something to worry about.

WARN [ScheduledTasks:1] 2013-01-07 12:14:10,865 GCInspector.java (line 145) Heap is 0.8336618755935529 full. You may need to reduce memtable and/or cache sizes. Cassandra will now flush up to the two largest memtables to free up memory. Adjust flush_largest_memtables_at threshold in cassandra.yaml if you don't want Cassandra to do this automatically.

The warning is a bit misleading as you will see in a bit - but hey, using 83% of your JVM heap memory should always ring at least some alarm bells. Since I haven’t used Cassandra that much, I needed to investigate how it uses its heap memory. We are using Datastax Community Edition 1.1.x, so the first place to look for more information was OpsCenter. But it didn’t give me much information about the heap. Next I logged into one cluster node via SSH to see if I could get some stats via JMX, as I suspected a big cache to be the problem. For the first time I used jmxterm instead of commandline-jmxclient. So to get some numbers for Cassandra’s key and row cache via JMX you can do this:

Obviously we were running the defaults for the 2 caches. The key cache was very small and the row cache not even enabled. By default Cassandra 1.1 assigns 5% of the JVM heap memory to the key cache, though never more than 100 MB. As a next step I wanted to find out how the heap memory was actually used. So I ran jmap -heap `pgrep java` as explained here. Make sure you have only 1 java process running; otherwise pass the pid to jmap manually. Note: doing a heap dump to file wasn't such a great idea. It stopped after about 20 minutes. At this point the dump file was 2.7 GB big and the node had already left the cluster.

Apparently 2.8 GB of our 4 GB heap were used in the old generation (also called concurrent mark and sweep generation if a CMS GC algorithm is in use). The old generation contains objects that have survived a couple of collections in the various stages of the young generation. After reading this blog post about Cassandra GC tuning and this description from Oracle, I was thinking that the old generation might be filled because the JVM never did a major collection. Apparently if -XX:CMSInitiatingOccupancyFraction is not changed via the JAVA_OPTS, a major collection would only be issued at approximately 92% of usage. So if Cassandra was flushing the largest memtable every time at 0.75, i.e. 75% usage (the default value for flush_largest_memtables_at in cassandra.yaml), it would free heap memory, thereby preventing a concurrent major collection.
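To make the numbers in this reasoning concrete, here is a small arithmetic sketch. It treats all fractions as fractions of the whole 4 GB heap, as the paragraph above does; strictly speaking the CMS initiating occupancy applies to the old generation only, so take this as a rough illustration. The helper names are made up.

```python
HEAP_GB = 4.0  # heap size mentioned in the post

# (label, fraction) pairs taken from the text and the log line above.
THRESHOLDS = [
    ("flush_largest_memtables_at", 0.75),
    ("heap usage reported in the warning", 0.8336618755935529),
    ("approximate default CMS trigger", 0.92),
]


def heap_threshold_gb(heap_gb, fraction):
    """Absolute heap usage in GB at which a fractional threshold is reached."""
    return heap_gb * fraction


def describe(heap_gb=HEAP_GB):
    """Return one formatted line per threshold."""
    return ["%-36s %.2f GB" % (label, heap_threshold_gb(heap_gb, fraction))
            for label, fraction in THRESHOLDS]


if __name__ == "__main__":
    for line in describe():
        print(line)
```

For a 4 GB heap this gives 3.00 GB for the flush threshold, 3.33 GB for the usage in the warning and 3.68 GB for the CMS trigger, so the emergency flush kicks in well before a major collection would.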

Then however I realized that we were still running with the default value for memtable_total_space_in_mb, which is the only setting for memtables since Cassandra 1.0. The default is to use a maximum of 1/3 of the JVM heap. So something else was eating up the heap memory, not memtables, and Cassandra flushing the largest memtables at 75% seems kind of desperate in our scenario. With caching and memtables not being the culprits, what else was left? It turned out that the bloom filter was getting very big for the amount of data and the number of nodes we have. Our test cluster has 6 nodes and the total data size is around 400 GB. Cassandra uses a bloom filter in front of its SSTables to check if a row exists before it does disk IO. This is an extra layer that, if tuned properly, can make Cassandra access to column families more efficient because disk IO is slow. A bloom filter is a probabilistic data structure. It can give you false positives, meaning it will tell you a record exists in an SSTable when it does not. It will however never tell you a record does not exist when it actually does (false negative).
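The false positive versus false negative behaviour is easy to see in a toy implementation. The following is a minimal sketch of a generic bloom filter, not Cassandra's implementation; the class name TinyBloomFilter, the salted SHA-256 hashing and the sizes in demo are all assumptions chosen for illustration.

```python
import hashlib


class TinyBloomFilter(object):
    """Toy bloom filter: a bit array plus several salted hash functions."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions by salting a single hash function.
        for i in range(self.num_hashes):
            data = ("%d:%s" % (i, key)).encode("utf-8")
            yield int(hashlib.sha256(data).hexdigest(), 16) % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # False means "definitely absent", True means "possibly present".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


def demo(num_keys=1000):
    """Return (false_negatives, false_positives) for a small experiment."""
    bloom = TinyBloomFilter(num_bits=8 * num_keys, num_hashes=5)
    present = ["row-%d" % i for i in range(num_keys)]
    absent = ["missing-%d" % i for i in range(num_keys)]
    for key in present:
        bloom.add(key)
    false_negatives = sum(1 for key in present if not bloom.might_contain(key))
    false_positives = sum(1 for key in absent if bloom.might_contain(key))
    return false_negatives, false_positives
```

Calling demo() always yields zero false negatives, because every bit set by add is still set when the same key is checked. A small number of the absent keys will usually be reported as possibly present; in Cassandra's case each of those would cost an unnecessary disk read.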

The false positives ratio can be tuned using the bloom_filter_fp_chance parameter in cassandra.yaml. We were running the default of 0.1 for this parameter, which I think corresponds to a 10% chance of a false positive. The value can be anything between 0 and 1. Well, nothing is free: a better bloom filter (a lower false positive chance) increases the size of the data structure.
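How much bigger? For an optimally configured bloom filter the textbook formula is -ln(p) / (ln 2)^2 bits per key for a false positive chance p. The sketch below applies that formula; it is a generic estimate and an assumption on my side that it approximates what Cassandra allocates, and the function names are made up.

```python
import math


def bloom_bits_per_key(fp_chance):
    """Bits per key of an optimally sized bloom filter for a given fp chance."""
    if not 0.0 < fp_chance < 1.0:
        raise ValueError("fp_chance must be strictly between 0 and 1")
    return -math.log(fp_chance) / (math.log(2) ** 2)


def bloom_size_mb(num_keys, fp_chance):
    """Estimated bloom filter size in MB for num_keys row keys."""
    total_bits = num_keys * bloom_bits_per_key(fp_chance)
    return total_bits / 8.0 / (1024 * 1024)


if __name__ == "__main__":
    # The key count is a made-up number, used only to show the trend.
    for chance in (0.1, 0.01, 0.001):
        print("fp_chance=%s: %.1f bits/key, %.0f MB for 1e9 keys"
              % (chance, bloom_bits_per_key(chance), bloom_size_mb(10 ** 9, chance)))
```

Going from 0.1 to 0.01 doubles the size (roughly 4.8 versus 9.6 bits per key), since the size grows with the logarithm of 1/p. With a large number of row keys per node that difference quickly adds up to hundreds of megabytes of heap.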

The bloom filter is defined per column family. So one way to bring down the size of a bloom filter in Cassandra is to evaluate your column families. Column families which are not getting a lot of read requests should be fine without an effective bloom filter. Another possibility is to add more nodes to the cluster, so that each node maintains less data, which also brings down the size of the bloom filter. Finally, here is some good news for Cassandra 1.2 (still waiting for the Datastax release for 1.2). The bloom filter can run off-heap since Cassandra 1.2. For this to work you need to enable Java Native Access (JNA), which isn’t done by default when installing Cassandra (even when installing from the Debian packages, from what I heard). Running the bloom filter off-heap will solve your immediate heap problems. As far as I know it is not recommended to run Cassandra with more than 8 GB of heap memory. However, you still need to tune your bloom filter with regard to data size, number of nodes and false positives ratio. Otherwise you might run out of system memory. Finally, tuning the CMS garbage collection is also useful. I think we will set it up to be incremental.