ThriftMux Unit Test Roundtrip

I have decided to write this blog post mainly for my own reference. Today I was working on a Thrift service that I wanted to start and test within a unit test. All our services are using the Finagle RPC framework with ThriftMux as transport. I will show you how to find a free port on your machine to start the server on and then create a client to invoke that server. Given this Thrift IDL file.

using Scrooge or the scrooge-sbt-plugin, this is a ScalaTest that does exactly that.

Dynamic Per-Request Log Level

Looks like it took me three years to write something new for this blog. Kids and other cool stuff got in the way - you know the score! I also should say, that I haven't done anything in Java in the past three years (almost). So to be more accurate, I should rename the blog to but whatever - I just drop my Scala blog posts here anyways!

This week I implemented a great idea that a co-worker came up with. Have you ever been in a situation, where you have a running system with a bunch of microservices in production and suddenly something doesn't work as expected? In the world of Battlefield, examples could be that players cannot purchase Battlepacks on PS4 or that matchmaking stopped working. So how do you find the root cause of the problem? Right, you want to look at some logs to get additional information. Only problem is, that your log level in production is usually quite high, for instance WARN or even ERROR - otherwise the amount of logging would just be too much. Wouldn't it be great to alter the log level dynamically on a per-request basis? This would allow you to test in production using TRACE logging – for just your test user. Finagle contexts to the rescue!

Here at DICE we have build all our Scala backend services based on Twitters Finagle framework – which is similar to the Tokio Framework in Rust if you have used that. In a nutshell Finagle is a RPC framework on the JVM with build-in support for various transport protocols, load balancing, service discovery, backoff etc. One semi-hidden feature of Finagle is the broadcast context. Think of the broadcast context as a ThreadLocal that is send along with every request through an RPC graph - from microservice to microservice. Finagle itself uses this internally, for instance to send a unique trace id along with each request. In my implementation, I have used the broadcast context to allow for a per-request log level override. Let's get our hands dirty! The first thing you want to implement is a new Key that Finagle can send over the RPC graph.

Essentially each Key need to implement two methods marshal and unmarshal, so that Finagle knows how to convert the Key from and to a Byte Array. I am not sharing this code here, but if you want to see how to unit test your code, Finagle has an example. No that we have a class for the log level override defined, we need code to set the override into as well as code to read the override from the broadcast context.

In most system architectures you have one system on the outside of your cluster. Here at DICE we call this system the gateway and it is the only service that is accessible from the public internet. All requests arrive at the gateway and it is the root node in the RPC graph. In other words, the gateway calls other microservices, which might call other microservices and so on. The most logical choice to define a log level override would be inside a Finagle Filter. I haven't actually written the Filter yet but it would look similar to this.

You have to be very careful with the Filter as this code is executed for every request entering your system! Now that we have code to set a log level override into the broadcast context, let's actually use it somewhere. To make this a seamless as possible for the developers, it is helpful if all your microservices share the same logging setup. For instance do we use slf4j with logback and the LoggerContext is set up programmatically inside a trait that every microservice is using (btw. our services follow the Twitter Server template).

As you can already guess now, reading from the broadcast context and actually using the override is wrapped inside a logback TurboFilter. Logback consults the filter for every log event and you can use this to decide if something should be logged or not. The following filter reads from the broadcast context and then makes a decision based on a potential override.

Conclusion: you can use Finagles broadcast context to transport a log level override through an RPC graph. You need some service to set the override in the context. It is helpful if this system is on the outside of your architecture and preferably uses HTTP. With HTTP it is easy to write a Finagle Filter and base the override on the HTTP request, i.e. by looking at the HTTP headers. Finagle transports the override magically through your RPC call graph and any microservice can use the override for logging decisions. To make this as simple as possible encapsulate this decision logic in some code that is shared between all your microservices.

Two Scala Serialization Examples

In the last two days I’ve been looking into ways to serialize and deserialize some Scala objects. I tested a few suggestions that were mentioned on this post on Stackoverflow. As a reference for myself (and because sometimes it is hard to find good examples) I am adding two examples for Scala Pickling and Twitter Chill. Let’s have a basic SBT project first.

Since I work with the Battlefield franchise let’s create some domain classes that we are going to serialize and deserialize.

The first candidate will be Scala Pickling. The following code pickles a List of 3000 random WeaponAccessory instances.

Unfortunately the code doesn't even compile properly. Scala Pickling uses Macros and advanced Scala compile features. Trying to compile Pickling.scala fails during compilation. Also people are encouraged to depend on a SNAPSHOT version which means you are always depending on the latest patches. When I wrote this blog post I hit this issue. Verdict: scala-pickling is very easy to use and works great for very simple stuff. As soon as your object graph gets a bit more complicated you will hit weird errors. Another problem is the lack of a non-SNAPSHOT version.

The seconds test candidate was Twitter Chill which is based on Kryo. chill-scala adds some Scala specific extensions. Your SBT project should depend on chill directly, which contains the code in chill-scala (which isn’t published separately). Even though they don’t have Scala examples in their Github documentation and I got some cryptic errors first when doing stuff wrong - I have to say this is an awesome library that works great! Also the authors reply fast on Twitter. Verdict: highly recommended!

SBT and faster RPM packaging

We do a lot of Scala coding nowadays and I am trying to introduce SBT as build tool to all our new Scala projects. When we deploy these applications to Amazon EC2 nodes, we use Chef Solo and the Instance User Data feature to install an RPM file. We don’t use custom AMI’s. The RPM file is hosted in S3 and made available as package via this yum plugin. Each time we build our project via our continuous integration server (Bamboo), a new RPM package is created and uploaded to S3.

It became more and more of a problem that building that particular application in Bamboo took a long time. The build plan ran for more than 10 minutes. So yesterday I spent some time to make it build a bit faster.

First of all I have to say it is pretty lame that the SBT plugin is broken in Bamboo since version 4.4.3 and no one from Atlassian is interested in fixing it since August 2013! I tried to fix the Bamboo plugin myself but Atlassian has some non-public Maven repositories so I couldn’t even build it. Given that the top four Java/Scala build tools are Ant, Maven, Gradle and SBT you could also say that Bamboo is somewhat 25% broken currently. Anyway a workaround is to use the Script Task in a Job and run SBT, which is what we do currently.

When I looked at our build there were basically two steps which took a long time. First we were creating a big one-jar (also called uber-jar sometimes). This is a single jar file that contains all compiled classes from all dependencies as well as our own classes. To create the uber-jar we used the sbt-assembly plugin which can run for a bit if you have a lot of dependencies. But actually you don’t need to have a single big jar file as you can add an entire directory to the Java classpath when starting an application. So I switched to a plugin called sbt-pack which dumps the jar files of all managed dependencies into a folder under target along with your project jar. This folder is then used later when building the RPM. Not using the sbt-assembly plugin to create a single uber-jar already saved us about 2 minutes of buildtime.

The second change was addressing creation of the actual RPM package. Previously we were using SBT native packager to assemble the RPM file. Unfortunately it was also not running very fast. Another big issue in Bamboo was that the sbt-native-packager logs some stuff on Std Error. This failed the build because Bamboo is scanning the build log for errors. (Our hack around this issue was to write a SBT task that logs 250 lines of “Build Successful” into the Bamboo log - what a mess). Today the RPM is build using fpm. On your Bamboo server you need to install fpm which is a Ruby Gem (gem install fpm). Then install Python and the fabric library.

And here is how we use fabric and fpm. In the root of your Scala project create a folder called build. Inside this folder store the following file:

You probably want to adapt projectname, packagename and the fpm settings to match your own project. To invoke the script during a build create a Script task in Bamboo that executes: fab -f build/ build. When the Script is executed from Bamboo it is looking for a file called version.txt in the build folder. The file version.txt need to be created upfront via SBT to propagate the project version to the Python script. This is what the custom rpmPrepare task does.

The rpmPrepare task reuses a SettingKey called branchName which contains the name of the branch in Github. The name of the RPM package will contain the branch name, so that you can build multiple branches of the same project in Bamboo in parallel without having to worry about version clashes. The branchName Setting in SBT is retrieved via either a system property or an environment variable called “branchName”. This variable is set from Bamboo. Each build plan in Bamboo is made of individual tasks and for a task you can set individual environment variables. So just add -DbranchName=${} and Bamboo will feed in the Github branch name into the task.

So after running the Python script you will have the RPM file in the WORK_DIR folder. For running Java command-line applications we use Supervisor. Here is an example how to invoke a Main class given that the RPM installs your project in /opt/projectname.

Publishing from SBT to Nexus

I am pretty new to SBT. Yesterday, for the first time, we wanted to publish the jar artifact of an in-house utility library into our private Nexus repository. This is an internal Nexus repository which we use mostly in Java projects build with Maven. While the task of publishing an artifact from SBT is well documented, it was not working right away. We hit some problems. Some answers to these problems we found on Stackoverflow, but some things we needed to figure out ourselves.

To prepare your build in SBT basically do these things. Add values for the publishTo Setting and the credentials Task. I recommend using a credentials file not under version control for obvious reasons. The first thing you want to verify is that you are using the correct “realm” value, which can be either a property in the credentials file or the first argument to the constructor of the Credentials class. Use curl to figure out the correct value as explained here. Send a POST to the Nexus repository which you want to publish to without any authentication arguments. For us this was the call.

Look for the WWW-Authenticate header and use the realm value. I think the default is “Sonatype Nexus Repository Manager”.

This was a step in the right direction but we still got the following error in SBT:

Not super useful but more info is actually available in the Nexus logfiles. Make sure you set the loglevel to DEBUG via the Nexus admin GUI first, then tail nexus.log while you try to publish from SBT. Here is some output in nexus.log, basically saying that SBT did not sent a value for username and password as part of the Basic Authentication.

And I was using the following build.sbt file:

After running a few tests, I figured out that the second argument to the sbt.Credentials class should only be the host and must not include the port – doh! After fixing this, everything works just fine. Another thing you want to check via the Nexus admin GUI is the Access Settings of your repository. For “Deployment Policy” we have set it to “Allow Redeploy”.

Dynamic Type System Trouble

This week I really learned to appreciate my Java compiler. I learned it the hard way – by not using it. In the last game that we released (Battlefield 4) I have implemented a feature for our players which suggests 3 game items to progress on, i.e. a weapon to unlock, an assignment that should be finished etc. Our internal name for this feature is “Suggestions”. A player would not only see these 3 items but also see his own progress towards reaching the suggestion goal of each item. The code that calculates the 3 items has become quite complex since there are a lot of different item types that we can pick from and we need to match each player individually. The code is written in Python, my favorite language at this point, which uses a dynamic type system.

The “Suggestions” feature was tested thoroughly and worked quite well in production. I implemented some additional functionality on top. Players now also had the opportunity to manually pick individual items so they could see their progress in the game and on our companion website Battlelog. Unfortunately after a few weeks players complained about strange problems. These players would see completely random items being suggested to them – even with the progression totally being off. In some cases, players got items suggested that they had completed or unlocked. These errors happened completely random. Not able to reproduce in any of our test systems. But it was happening mostly to players that played the game a lot. So I started to investigate.

No unit test was broken and also a long code review did not surface any problems. Fortunately we have very short release cycles. So I added some additional logging to this functionality, which was released to production earlier this week. This finally got me something! I could see that in some rare cases the function, which calculates the suggested items of a player, returns not just 3 but more: 4, 5, 6 sometimes 9 items! I am posting you a ridiculous simplified version of the code below. Try to spot the problem.

I should also tell you, that an instance of the SuggestionService is shared. The Service is used in an application which uses gevent. There are many Greenlets (lightweight Threads) which call the suggest method simultaneously. Ring ring – multithreading issue! The problem is in Line 10, where two parentheses are missing. Instead of creating an instance of the ProgressSuggestions class every time the suggest method is called, the code gets a reference to the ProgressSuggestions class and assigns it to a variable called progress. Then, on the first invocation, it dynamically adds a suggestions class field to that class. Something that would neither be possible nor compile in a statically typed language like Java. All Greenlets modify the same class instance, so player’s suggestions can overwrite each other. The simple fix is to create an instance of the ProgressSuggestions class as it was intended. I am surprised that this bug could live so long. In a real multithreaded application this would have affected much more players. Greenlets are only semi parallel. They must yield at a bad time to trigger this problem. Here is the correct version.

Akka and parameterized mutable Actor state

After doing just Python for almost one year, I am back on the JVM with some recent Scala projects. In one of the projects I had the chance to try Akka for the first time – which is an amazing library. In one of my Actors, and I think this is quite a common use case, I needed to run some initialization logic based on the Actors constructor arguments. During construction, the Actor would initialize an object that was expensive to create. This object would then be re-used in the receive method of the Actor.

I knew that Actor instances were shared, i.e. multiple calls to the receive method would be done on the same Actor object. So being new to Akka, I was afraid of having shared mutable state within my Actor and I was researching for a better way to do the initialization, other than just having a mutable field. This is when I found out about the FSM (Finite State Machine) trait. It is a perfect way to model initialization. I created two States for my Actor (if you want to do initialization in multiple Actors it’s a good idea to keep the common states, data holders and initialization messages in a separate object)

Individual states and data holder are then created in the individual Actors. The parent (supervisor) would then create the Actor, send an Initialize message which would in turn create the expensive object. The actor would then move itself to the next state and be ready to receive the further messages.

While this is a very nice way to model initialization one big problem became apparent – restarts. As soon as my Actor failed with an exception in the Initialized state, the parent Actor would restart it in the New state. This made the Actor pretty much unusable. One potential solution to this are probably the lifecycle methods. I could have overwritten the postRestart method in my Actor, where I have access to the constructor arguments, to send an initialization message to myself. But instead and against my gut feeling I decided to use a mutable field instead.

As I learned later, even though multiple threads share the same Actor instance, Akka guarantees that only a single thread will handle a message in the receive method at a time (also called The Actor subsequent processing rule). So now I set the mutable field to a None (Option type) and on the first message that arrives the field is initialized properly to a Some. This works fine but throws up some interesting questions. Since Akka is using Dispatchers (thread pools), subsequent messages in an Actor are most likely handled by different threads. In Java, changes to fields of shared objects done in one thread are not always visible to other threads (unless the field is volatile, the modification is done in a synchronized code section or in a section guarded by a Lock). Apparently this is not a problem for Akka.

In layman’s terms this means that changes to internal fields of the actor are visible when the next message is processed by that actor and you don’t need to make the fields volatile.

Unfortunately it is not further explained how Akka achieves this. The visibility problem DOES exist in Akka - if Actor's contain fields that are modified when receiving a message (i.e. some immutable field of ArrayBuffer where elements are added and removed in the receive method). In that case, how does Akka make sure that those changes are seen in other threads when the next message arrives? In my application at least, I had one issue which seemed to be a visibility problem. Unfortunately until now I wasn’t able to isolate and reproduce this problem in a unit test :( What I have so far (some parts need to be added).

Have to fill the gap and do the HTTP POST. What I have seen is a print indicating that a smaller batch has been pushed out – which can ultimately only be a visibility issue. My guess it that the culprit is either my asynchronous POST using the Dispatch library or the way clear() is implemented in the ArrayBuffer class. Further investigating. For now, this change got rid of the problem for me.

Generating REST docs with Scala and Finatra

More than 2 years ago I wrote a Blog post about Enunciate - a tool which helps you to generate a nice documentation for your REST API if you use Java and JAX-RS. I like documentation that exists very close to the code and is created and updated while you implement the main functionality. This kind of documentation has also been recommended in the Pragmatic Programmer book.

I have not been using JAX-RS and Servlet in a while. We are currently implementing most of our REST API’s on top of Finagle, a RPC System created in the Twitter software forge that runs on Netty. While it is possible to use Finagle directly together with Scala path matching for the routes, I could not find a clever way for self-updating documentation close to the code. Fortunately there is another Twitter project called Finatra, which puts a Sinatra-Flask alike web framework on top of Finagle. Finatra will not only make it easier to define Resources and Routes but also help you with the documentation.

Here is a how you typically define a route in Finatra:

For the documentation itself I am using Swagger, which can generate HTML from annotations. Swagger already comes with a bunch of useful annotations. Unfortunately some annotations like a @Path equivalent was missing, so I was forced to use some JSR-311 (JAX-RS) instead, even though we are not using JAX-RS for the API. Here is the evolution of the Finatra controller from above with the Swagger and JSR-311 annotations added. As you can see it was necessary to move the routes from the constructor into separate methods that can be annotated. This makes the Scala code a bit uglier and harder to read, especially if you have a lot of annotations in place. But hey, you will love the outcome.

The final step is to generate the documentation during our Maven build. We are using the maven-swagger-plugin for that. I even copied and customized the strapdown.html.mustache from the plugin into our project, so that we could tweak the generated documentation and use another Twitter Bootstrap theme instead.

The outcome will be a generated docs.html file in the target folder of your build. The docs.html will contain as path - which was specified in the maven-swagger-plugin. I normally replace “” with JavaScript (something that can easily be done if you use your own Mustache template). Also it is nice to have Finatra render the docs.html file.

Embedded Cassandra with Python and Java

Testing is important. When developing applications based on Cassandra and Java, you have a lot of options that help you testing your code during development. Unfortunately when using Python it is not as great. For instance there is no straight-forward solution to start an embedded Cassandra server from Python, that your unitests (or rather integration tests) can communicate with. The good news is, starting a Java process from Python code is dead easy. Using this hybrid-approach, we can easily write Cassandra integrated unittests under Python.

First of all I took the existing cassandra-unit library and tweaked it. Normally when starting embedded Cassandra via cassandra-unit, you specify a configuration file (cassandra.yaml) and optionally a temporary directory. Cassandra-unit would then load the file as a classpath resource and copy the contents to a new file into the specified temp directory (default target/embeddedCassandra). Hence the file has to be on the classpath and the path has to be relative. I thought it was much nicer if you could send instead an absolute path and the configuration file was used directly from where it was located. So from Python we could later modify the Cassandra config file the way we wanted and also put it in the final location. So the first thing you want to do is to clone the tweaked embedded-cassandra-starter code from git and create a jar artifact by running (yes you need to use Maven)

The outcome of this will be a jar file that you can put into your Python project, i.e. under resources/cassandra. The next thing you need is a vanilla Cassandra configuration file (cassandra.yaml) in your project. We also have that one checked in along with the jar file, i.e. under resource/cassandra and we called it cassandra.yaml.template

We use the .template extension because the file contains two placeholders ({{CASSANDRA_DIR}} and {{CASSANDRA_PORT}}) which will be replaced later. A great co-worker of mine then wrote a Python class called EmbeddedCassandra. This class will in its __init__ method find an available port and create a random directory in the systems temporary directory (let’s call it work directory for now). EmbeddedCassandra also has a start and a stop method. The start method will copy the configuration template file into the work directory and replace the two placeholders mentioned above. Finally it will start a new Java process using using the subprocess module. It will basically invoke the jar file that we build earlier in the same way as you would from the command-line (Example can be found here). The stop method in EmbeddedCassandra will bring down the process and do some cleanup.

All of this is now wrapped into a EmbeddedCassandraTestCase class, which acts as a base class for unit tests that want to test against Cassandra. This class invokes start and stop in its setup and tearDown method.

So now you are able to write some nice Python unittests (rather integration tests) against Cassandra, for instance using the great Pycassa library. Here is a simple example.

Investigating Cassandra Heap

We are working on a new application which will use Apache Cassandra. Yesterday a co-worker sent me the following warning, which we kept seeing in the logs every now and then on several nodes. I was asked if this was something to worry about.

WARN [ScheduledTasks:1] 2013-01-07 12:14:10,865 (line 145) Heap is 0.8336618755935529 full. You may need to reduce memtable and/or cache sizes. Cassandra will now flush up to the two largest memtables to free up memory. Adjust flush_largest_memtables_at threshold in cassandra.yaml if you don't want Cassandra to do this automatically.

The warning is a bit misleading as you will see in a bit - but hey, using 83% of your JVM heap memory should always ring at least some alarm bells. Since I haven’t used Cassandra that much, I needed to investigate how it uses its heap memory. We are using Datastax Community Edition 1.1.x, so the first place to look for more information was Opscenter. Bu it didn’t give me much information about the heap. Next I went into one cluster node via SSH to see if I could get out some stats via JMX, as I was suspecting a big cache to be the problem. For the first time I used jmxterm instead of commandline-jmxclient. So to get some numbers for Cassandras key and row cache via JMX you can do this:

Obviously we were running defaults for the 2 caches. The key cache was very small and the row cache not even enabled. By default Cassandra 1.1 assigns 5% of the JVM heap memory to the key cache, though never more than 100 MB. As next step I wanted to find out how the heap memory was actually used. So I ran jmap -heap `pgrep java` as explained here. Make sure you have only 1 java process running otherwise feed in the pid manually to jmap. Note: doing a heap dump to file wasn't such a great idea. It stopped after about 20 minutes. At this point the dump file was 2.7 GB big and the node had already left the cluster.

Apparently 2.8 GB of our 4 GB heap were used in the old generation (also called concurrent mark and sweep generation if a CMS GC algorithm is in use). The old generation contains objects that have survived a couple of collections in the various stages of the young generation. After reading this blog post about Cassandra GC tuning and this description from Oracle, I was thinking that the old generation might be filled because the JVM never did a major collection. Apparently if –XX: CMSInitiatingOccupancyFraction is not changed via the JAVA_OPTS, a major collection would only be issued at approximately 92% of usage. So if Cassandra was flushing the largest memtable every time at 0.75 percent (default value for flush_largest_memtables_at in cassandra.yaml) it would free heap memory therefore preventing a concurrent major collection.

Then however I realized that we were still running with the default value for memtable_total_space_in_mb, which is the only setting for memtables since Cassandra 1.0. The default is to use a maximum of 1/3 of the JVM heap. So something else was eating up the heap memory, not memtables. So Cassandra dropping the largest memtable at 75% seems kind of desperate in our scenario. So with caching and memtables not being the culprits, what else was left? It turned out the bloom filter for the amount of data and the number of nodes we have, was getting very big. Our test cluster has 6 nodes and the total data size is around 400 GB. Cassandra uses a bloom filter in front of its SSTables to check if a row exists before it does disk IO. This is an extra layer that, if tuned properly, can make Cassandra access to column families more efficient because disk IO is slow. A bloom filter is a probabilistic data structure. It can give you false positives, meaning it will tell you a record exists in an SSTable but it does not. It will however never tell you a record does not exist while it exists in reality (false negative).

The false positives ratio can be tuned using the bloom_filter_fp_chance parameter in cassandra.yaml. We were running default of 0.1 for this parameter, which I think accounts for a 10% chance of a false positive. The value can be anything between 0 and 1. Well nothing is for free and having a better bloom filter increases the size of the data structure.

The bloom filter is defined per column family. So one way to bring down the size of a bloom filter in Cassandra, is to evaluate your column families. Column families which are not getting a lot of read requests should be fine without a effective bloom filter. Another possibility is to add more nodes to the cluster, so that each node maintains less data therefore bringing also down the size of the bloom filter. Finally here is some good news for Cassandra 1.2 (still waiting for the Datastax release for 1.2). The bloom filter can run off-heap since Cassandra 1.2. For this to work you need to enable Java Native Access (JNA), which isn’t done by default when installing Cassandra (even when installing from the Debian packages from what I heard). Running the bloom filter off-heap will solve your immediate heap problems. As far as I know it is not recommended to run Cassandra with more than 8GB of heap memory. However you still need to tune your bloom filter in regards to data size, number of nodes and false positives ratio. Otherwise you might run out of system memory. Finally also tuning the CMS garbage collection is useful. I think we will set it up to be incremental.