Best language features in Kotlin (for me)

I have been doing Java and Scala for many years. Recently my team switched to Kotlin and everybody is enjoying it so far. In this blog post I want to share a personal selection of language features that I really like.

Smart casts on polymorphic collections


Kotlin has a super concise and safe way to narrow a polymorphic list down to a specific subtype using the as? safe-cast operator.

interface Animal {
    val name: String
}

data class Monkey(override val name: String) : Animal
data class Fish(override val name: String) : Animal

val animals: List<Animal> = listOf(Monkey("gorilla"), Fish("trout"), Monkey("orangutan"))
val monkeys: List<Monkey> = animals.mapNotNull { it as? Monkey }


Logging return values


There are situations where we want to log the return value of a function before actually returning it. Usually this is done by storing the return value in a variable, writing the log statement and finally returning that variable. Kotlin makes this a bit simpler with the .also extension function for side effects. Using .also, we don't have to introduce this artificial variable.

class Service(private val dependency: Dependency) {
    private val logger = KotlinLogging.logger {}

    fun doSomething(userId: String): Int {
        return dependency.evaluate(userId = userId).also { result ->
            logger.trace { "UserId '$userId' was evaluated to: $result" }
        }
    }
}


... more to come

DynamoDBLocal and UnsatisfiedLinkError in Gradle

This week I started to work on a new project using DynamoDB. As always, I'd like to write some integration tests to verify my datastore integration works as intended. I know one cheap way to test DynamoDB is using containers and LocalStack. However, I decided to go even simpler and give DynamoDBLocal a spin. This is just a library for your tests to depend on, and it is super easy to integrate into Gradle or Maven projects.

Unfortunately using DynamoDBLocal is not as straightforward as it sounds. Relatively soon you might hit an UnsatisfiedLinkError complaining that the native SQLite (sqlite4java) library cannot be loaded.
To fix this in a Gradle build, we modified our test task to copy some native binaries around and to make a system property available to the tests.

dependencies {
    ...
    testCompile "com.amazonaws:DynamoDBLocal:1.11.477"
}

task copyDependencies(type: Copy) {
    from(configurations.compile + configurations.testCompile) {
        include '*.dll'
        include '*.dylib'
        include '*.so'
    }
    into 'build/libs'
}

test {
    dependsOn copyDependencies
    systemProperty 'sqlite4java.library.path', 'build/libs'
    useJUnitPlatform()
}

One last pitfall might be your IntelliJ IDE. Delete all existing test run configurations and make sure to run your tests using the Gradle runner.

ThriftMux Unit Test Roundtrip

I have decided to write this blog post mainly for my own reference. Today I was working on a Thrift service that I wanted to start and test within a unit test. All our services are using the Finagle RPC framework with ThriftMux as transport. I will show you how to find a free port on your machine to start the server on and then create a client to invoke that server. Given this Thrift IDL file:

namespace com.github.reikje

service MicroService {
    string request()
}


Using Scrooge or the scrooge-sbt-plugin to generate the bindings, here is a ScalaTest that does exactly that.

package com.github.reikje

import java.net.{InetAddress, InetSocketAddress}

import com.twitter.finagle.{Address, Name, ThriftMux, param}
import com.twitter.util.{Await, Future}
import org.scalatest.{Matchers, WordSpec}

class RoundtripSpec extends WordSpec with Matchers {
  "Microservice" should {
    "respond to requests" in {
      val service = new MicroServiceImpl("foo")
      val server = ThriftMux.server
        .configured(param.Label("zuul-thrift"))
        .serveIface(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), service)

      val client = ThriftMux.client.newIface[MicroService.FutureIface](
        Name.bound(Address(server.boundAddress.asInstanceOf[InetSocketAddress])), "microservice-client"
      )

      val response = Await.result(client.request())
      response shouldBe "foo"
    }
  }
}

class MicroServiceImpl(response: String) extends MicroService.FutureIface {
  override def request(): Future[String] = Future.value(response)
}

Dynamic Per-Request Log Level

Looks like it took me three years to write something new for this blog. Kids and other cool stuff got in the way - you know the score! I also should say that I haven't done anything in Java in the past three years (almost). So to be more accurate, I should rename the blog to jvmsplitter.blogspot.com, but whatever - I just drop my Scala blog posts here anyways!

This week I implemented a great idea that a co-worker came up with. Have you ever been in a situation where you have a running system with a bunch of microservices in production and suddenly something doesn't work as expected? In the world of Battlefield, examples could be that players cannot purchase Battlepacks on PS4 or that matchmaking stopped working. So how do you find the root cause of the problem? Right, you want to look at some logs to get additional information. The only problem is that your log level in production is usually quite high, for instance WARN or even ERROR - otherwise the amount of logging would just be too much. Wouldn't it be great to alter the log level dynamically on a per-request basis? This would allow you to test in production using TRACE logging – for just your test user. Finagle contexts to the rescue!

Here at DICE we have built all our Scala backend services on top of Twitter's Finagle framework – which is similar to the Tokio framework in Rust, if you have used that. In a nutshell, Finagle is an RPC framework for the JVM with built-in support for various transport protocols, load balancing, service discovery, backoff etc. One semi-hidden feature of Finagle is the broadcast context. Think of the broadcast context as a ThreadLocal that is sent along with every request through an RPC graph - from microservice to microservice. Finagle itself uses this internally, for instance to send a unique trace id along with each request. In my implementation, I have used the broadcast context to allow for a per-request log level override. Let's get our hands dirty! The first thing you want to implement is a new Key that Finagle can send over the RPC graph.

package com.github.rschatz

import com.twitter.finagle.context.Contexts
import com.twitter.io.{Buf, Charsets}
import com.twitter.util.Try

import com.github.rschatz.LogLevelName.LogLevelName

/**
 * Encapsulates the name of a log level override.
 *
 * @author rschatz
 */
case class LogLevelOverride(logLevelName: LogLevelName) {
  def let[T](f: => T): T = Contexts.broadcast.let(LogLevelOverride, this)(f)
}

object LogLevelName extends Enumeration {
  type LogLevelName = Value
  val TRACE, DEBUG, INFO, WARN, ERROR = Value
}

object LogLevelOverride extends Contexts.broadcast.Key[LogLevelOverride]("com.github.rschatz.LogLevelOverride") {
  def current: Option[LogLevelOverride] = Contexts.broadcast.get(LogLevelOverride)

  override def marshal(logLevelOverride: LogLevelOverride): Buf = {
    Buf.ByteArray.Owned(logLevelOverride.logLevelName.toString.getBytes(Charsets.Utf8))
  }

  override def tryUnmarshal(buf: Buf): Try[LogLevelOverride] = {
    Try {
      val bytes = Buf.ByteArray.Owned.extract(buf)
      LogLevelOverride(LogLevelName.withName(new String(bytes, Charsets.Utf8)))
    }
  }
}


Essentially each Key needs to implement two methods, marshal and tryUnmarshal, so that Finagle knows how to convert the Key to and from a byte array. I am not sharing this code here, but if you want to see how to unit test your Key, Finagle has an example. Now that we have a class for the log level override defined, we need code to set the override into the broadcast context as well as code to read it back out.

In most system architectures you have one system on the outside of your cluster. Here at DICE we call this system the gateway and it is the only service that is accessible from the public internet. All requests arrive at the gateway and it is the root node in the RPC graph. In other words, the gateway calls other microservices, which might call other microservices and so on. The most logical place to set a log level override would be inside a Finagle Filter. I haven't actually written the Filter yet, but it would look similar to this.

package com.github.rschatz

import com.twitter.finagle.http._
import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.util.Future

import com.github.rschatz.LogLevelOverride
import com.github.rschatz.LogLevelName.LogLevelName

class LogLevelOverrideFilter extends SimpleFilter[Request, Response] {
  override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
    val logLevelOverride: Option[LogLevelName] = ??? // figure out if we should override

    logLevelOverride match {
      case Some(logLevelName) =>
        LogLevelOverride(logLevelName).let {
          service(request)
        }
      case _ =>
        service(request)
    }
  }
}
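
To give an idea of what the "figure out if we should override" part could look like, here is one possibility based on an HTTP header. This is only a sketch: the helper object and the header name are made up for illustration, and in practice you would restrict the override to trusted test users.

package com.github.rschatz

import scala.util.Try

import com.twitter.finagle.http.Request

import com.github.rschatz.LogLevelName.LogLevelName

object LogLevelOverrideHeader {
  // hypothetical header name - pick whatever convention fits your gateway
  val Name = "X-Log-Level-Override"

  // returns a log level if the request carries a valid override header
  def from(request: Request): Option[LogLevelName] =
    request.headerMap.get(Name).flatMap { value =>
      Try(LogLevelName.withName(value.trim.toUpperCase)).toOption
    }
}

Inside the filter, val logLevelOverride = LogLevelOverrideHeader.from(request) would then do the trick, ideally combined with a check that the request actually belongs to a test user.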


You have to be very careful with the Filter as this code is executed for every request entering your system! Now that we have code to set a log level override into the broadcast context, let's actually use it somewhere. To make this as seamless as possible for the developers, it is helpful if all your microservices share the same logging setup. For instance, we use slf4j with Logback, and the LoggerContext is set up programmatically inside a trait that every microservice is using (btw. our services follow the Twitter Server template).

package com.github.rschatz

import com.twitter.app.App
// other imports

trait LoggingSetup { self: App =>
  private val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger]
  private val context = root.getLoggerContext

  private[this] val logLevel = flag(
    "log.level",
    "info",
    "Default log level (root logger). One of {trace, debug, info, warn, error}"
  )

  init {
    context.reset()
    context.setName("whatever")
    context.addTurboFilter(BroadcastContextOverrideFilter) // explained below
  }

  premain {
    val level = ??? // convert logLevel() to a ch.qos.logback.classic.Level
    root.setLevel(level)
    // setup appenders etc.
  }

  onExit {
    // stop context
  }
}


As you can probably guess by now, reading from the broadcast context and actually using the override is wrapped inside a Logback TurboFilter. Logback consults the filter for every log event, and you can use this to decide whether something should be logged or not. The following filter reads from the broadcast context and then makes a decision based on a potential override.

package com.github.rschatz

import ch.qos.logback.classic.turbo.TurboFilter
import ch.qos.logback.classic.{Level, Logger, LoggerContext}
import ch.qos.logback.core.spi.FilterReply
import org.slf4j.{LoggerFactory, Marker}

import com.github.rschatz.{LogLevelName, LogLevelOverride}

object BroadcastContextOverrideFilter extends TurboFilter {
  override def decide(marker: Marker, logger: Logger, level: Level, format: String, params: Array[AnyRef], t: Throwable): FilterReply = {
    LogLevelOverride.current.map { logLevelOverride =>
      if (logLevelOverride.logLevelName == LogLevelName.ERROR && level.isGreaterOrEqual(Level.ERROR)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.WARN && level.isGreaterOrEqual(Level.WARN)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.INFO && level.isGreaterOrEqual(Level.INFO)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.DEBUG && level.isGreaterOrEqual(Level.DEBUG)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.TRACE) {
        FilterReply.ACCEPT
      } else {
        // log event uses a level below the specified override
        FilterReply.NEUTRAL
      }
    }.getOrElse {
      // no override in broadcast context
      FilterReply.NEUTRAL
    }
  }

  override def isStarted: Boolean = true
}


Conclusion: you can use Finagle's broadcast context to transport a log level override through an RPC graph. You need some service to set the override into the context. It is helpful if this system is on the outside of your architecture and preferably speaks HTTP. With HTTP it is easy to write a Finagle Filter and base the override on the HTTP request, e.g. by looking at the HTTP headers. Finagle transports the override magically through your RPC call graph, and any microservice can use the override for logging decisions. To make this as simple as possible, encapsulate the decision logic in some code that is shared between all your microservices.

Two Scala Serialization Examples

In the last two days I’ve been looking into ways to serialize and deserialize some Scala objects. I tested a few suggestions that were mentioned in this post on Stackoverflow. As a reference for myself (and because sometimes it is hard to find good examples) I am adding two examples for Scala Pickling and Twitter Chill. Let’s have a basic SBT project first.

import sbt._
import sbt.Keys._

organization := "me"

name := "scala-ser"

version := "0.1-SNAPSHOT"

scalaVersion := "2.10.4"

resolvers += Resolver.sonatypeRepo("snapshots")

libraryDependencies ++= Seq(
  "com.twitter" %% "chill" % "0.3.6",
  "org.scalamacros" % "quasiquotes_2.10.4" % "2.0.0-M6",
  "org.scala-lang" %% "scala-pickling" % "0.8.0-SNAPSHOT"
)


Since I work with the Battlefield franchise, let’s create some domain classes that we are going to serialize and deserialize.

package me

import java.util.UUID

import scala.util.Random

class WeaponAccessory(
    var guid: String,
    var slug: Option[String],
    var nameSID: Option[String],
    var descriptionSID: Option[String],
    var categorySID: Option[String],
    var requirements: Option[List[UnlockRequirement]],
    var weaponData: Option[WeaponData]) {

  def this() = this("", None, None, None, None, None, None)
}

class UnlockRequirement(
    var requirementType: RequirementType,
    var valueNeeded: Float,
    var codeNeeded: String) {

  def this() = this(Bucket, 0.0F, "")
}

class WeaponData(
    var statDamage: Option[Float],
    var statAccuracy: Option[Float],
    var statMobility: Option[Float],
    var statRange: Option[Float],
    var statHandling: Option[Float]) {

  def this() = this(None, None, None, None, None)
}

sealed trait RequirementType {
  def name() = getClass.getSimpleName
}

object Bucket extends RequirementType

object Create {
  def randomWeaponAccessory() = {
    val accessory = new WeaponAccessory()
    accessory.guid = "%s".format(UUID.randomUUID())
    accessory.slug = randomOption(randString(10))
    accessory.nameSID = randomOption(randString(25))
    accessory.descriptionSID = randomOption(randString(25))
    accessory.categorySID = randomOption(randString(25))
    accessory.requirements = Some(
      List.fill(Random.nextInt(5))(randomRequirement())
    )
    accessory.weaponData = randomOption(randomWeaponData())
    accessory
  }

  private def randomRequirement() = {
    val requirement = new UnlockRequirement()
    requirement.requirementType = Bucket
    requirement.codeNeeded = randString(5)
    requirement.valueNeeded = Random.nextFloat()
    requirement
  }

  private def randomWeaponData() = {
    val weaponData = new WeaponData()
    weaponData.statAccuracy = randomOption(Random.nextFloat())
    weaponData.statDamage = randomOption(Random.nextFloat())
    weaponData.statHandling = randomOption(Random.nextFloat())
    weaponData.statMobility = randomOption(Random.nextFloat())
    weaponData.statRange = randomOption(Random.nextFloat())
    weaponData
  }

  private def randomOption[T](value: T) = {
    if (Random.nextBoolean()) {
      Some(value)
    } else {
      None
    }
  }

  private def randString(x: Int) = Random.alphanumeric.take(x).mkString
}


The first candidate will be Scala Pickling. The following code pickles a List of 3000 random WeaponAccessory instances.

package me

import scala.pickling._
import json._

object Pickling {
  def main(args: Array[String]) {
    val items = List.fill(3000)(Create.randomWeaponAccessory())
    val pckl = items.pickle
    val json = pckl.value
    val jsonPckl = new JSONPickle(json)
    val deser = jsonPckl.unpickle[List[WeaponAccessory]]
    assert(deser.size == 3000)
  }
}


Unfortunately this code doesn't even compile. Scala Pickling uses macros and advanced Scala compiler features, and trying to compile Pickling.scala simply fails. Also, people are encouraged to depend on a SNAPSHOT version, which means you are always depending on the latest patches - when I wrote this blog post I hit this issue. Verdict: scala-pickling is very easy to use and works great for very simple stuff, but as soon as your object graph gets a bit more complicated you will hit weird errors. Another problem is the lack of a non-SNAPSHOT release.

The second test candidate was Twitter Chill, which is based on Kryo. chill-scala adds some Scala-specific extensions. Your SBT project should depend on chill directly, which contains the code from chill-scala (which isn’t published separately). Even though they don’t have Scala examples in their GitHub documentation, and I got some cryptic errors at first when doing things wrong, I have to say this is an awesome library that works great! Also the authors reply fast on Twitter. Verdict: highly recommended!

package me

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.ScalaKryoInstantiator

object Chill {
  def main(args: Array[String]) {
    val items = List.fill(3000)(Create.randomWeaponAccessory())

    val instantiator = new ScalaKryoInstantiator
    instantiator.setRegistrationRequired(false)
    val kryo = instantiator.newKryo()

    val baos = new ByteArrayOutputStream
    val output = new Output(baos, 4096)
    kryo.writeObject(output, items)
    // make sure the Kryo buffer is flushed into the stream before reading it back
    output.close()

    val input = new Input(baos.toByteArray)
    val deser = kryo.readObject(input, classOf[List[WeaponAccessory]])
    assert(deser.size == 3000)
  }
}

SBT and faster RPM packaging

We do a lot of Scala coding nowadays and I am trying to introduce SBT as the build tool for all our new Scala projects. When we deploy these applications to Amazon EC2 nodes, we use Chef Solo and the Instance User Data feature to install an RPM file. We don’t use custom AMIs. The RPM file is hosted in S3 and made available as a package via this yum plugin. Each time we build our project via our continuous integration server (Bamboo), a new RPM package is created and uploaded to S3.

It became more and more of a problem that building that particular application in Bamboo took a long time. The build plan ran for more than 10 minutes. So yesterday I spent some time making it build a bit faster.

First of all I have to say it is pretty lame that the SBT plugin has been broken in Bamboo since version 4.4.3 and no one from Atlassian has been interested in fixing it since August 2013! I tried to fix the Bamboo plugin myself, but Atlassian has some non-public Maven repositories so I couldn’t even build it. Given that the top four Java/Scala build tools are Ant, Maven, Gradle and SBT, you could also say that Bamboo is currently somewhat 25% broken. Anyway, a workaround is to use the Script Task in a Job and run SBT from there, which is what we do currently.

When I looked at our build there were basically two steps which took a long time. First, we were creating a big one-jar (sometimes also called an uber-jar). This is a single jar file that contains all compiled classes from all dependencies as well as our own classes. To create the uber-jar we used the sbt-assembly plugin, which can run for quite a while if you have a lot of dependencies. But you actually don’t need a single big jar file, as you can add an entire directory to the Java classpath when starting an application. So I switched to a plugin called sbt-pack, which dumps the jar files of all managed dependencies into a folder under target along with your project jar. This folder is then used later when building the RPM. Not using the sbt-assembly plugin to create a single uber-jar already saved us about 2 minutes of build time.
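
For reference, wiring sbt-pack into the build is just a plugin plus one extra line. This is a minimal sketch; the version number is only an example and the exact setting names depend on the sbt-pack release you pick, so check the plugin's README.

// project/plugins.sbt - version is an example, use whatever is current
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.6.2")

// build.sbt - bring in the plugin's default settings
packSettings

// running "sbt pack" then collects the project jar and all managed
// dependencies under target/pack/lib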

The second change was addressing the creation of the actual RPM package. Previously we were using the SBT native packager to assemble the RPM file. Unfortunately it was also not running very fast. Another big issue in Bamboo was that the sbt-native-packager logs some stuff to stderr. This failed the build because Bamboo scans the build log for errors. (Our hack around this issue was to write an SBT task that logs 250 lines of “Build Successful” into the Bamboo log - what a mess.) Today the RPM is built using fpm. On your Bamboo server you need to install fpm, which is a Ruby gem (gem install fpm). Then install Python and the fabric library.

And here is how we use fabric and fpm. In the root of your Scala project create a folder called build. Inside this folder store the following file:

#!/usr/bin/env python
import os
import os.path
import shutil

from fabric.api import *

versionNumber = ""
with open('build/version.txt', 'r') as f:
    versionNumber = f.readline()

RPM_INCLUDE = "projectname"
INSTALL_DIR = "/opt"
WORK_DIR = os.getcwd()
BUILD_NUMBER = os.getenv('BUILD_NUMBER', '1')

@task
def build():
    """ Creates a build with an optional RPM package """
    print("Starting build %s... in %s" % (BUILD_NUMBER, WORK_DIR))
    # folder created by the sbt-pack plugin
    pack_dir = os.path.join(WORK_DIR, 'target/pack')
    # folder containing all jars
    lib_dir = os.path.join(pack_dir, 'lib')
    # project folder which will be installed to /opt/projectname later
    opt_app_folder = os.path.join(pack_dir, 'projectname')
    if not os.path.exists(opt_app_folder):
        print("Creating %s" % opt_app_folder)
        os.makedirs(opt_app_folder)
    # move all jars into the project folder
    for file in os.listdir(lib_dir):
        print("Moving %s to %s" % (file, opt_app_folder))
        shutil.move(os.path.join(lib_dir, file), opt_app_folder)
    print("Creating RPM...")
    # delete old RPM files if any
    for file in os.listdir(WORK_DIR):
        if ".rpm" in file:
            os.remove(file)
    # creates an RPM using fpm, aborts if problems
    out = local('fpm -a all -s dir -t rpm -C %s --vendor "VENDOR NAME HERE" --prefix %s \
        --license "(C) 2011-2014, License Statement Bla Bla. All Rights Reserved." \
        --url "http://www.yourcompany.se/" --maintainer "<mail@address.com>" \
        --description "Project Name" \
        -n "packagename" -v %s --iteration %s %s' % (pack_dir, INSTALL_DIR, versionNumber, BUILD_NUMBER, RPM_INCLUDE))
    if out.failed:
        abort("RPM creation failed, check logs")


You probably want to adapt projectname, packagename and the fpm settings to match your own project. To invoke the script during a build, create a Script task in Bamboo that executes: fab -f build/fabfile.py build. When the script is executed from Bamboo, it looks for a file called version.txt in the build folder. The file version.txt needs to be created upfront via SBT to propagate the project version to the Python script. This is what the custom rpmPrepare task does.

import java.io.PrintWriter
import java.nio.file.{Files, Paths}

lazy val branchName = settingKey[String]("The name of the GitHub branch which will prefix the RPM")

branchName := sys.props.getOrElse("branchName", sys.env.getOrElse("branchName", "master"))

lazy val rpmPrepare = taskKey[Unit]("Executes steps which are needed before the RPM can be built")

rpmPrepare := {
  val dir = Paths.get("build")
  val versionFile = dir.resolve("version.txt")
  Files.deleteIfExists(versionFile)
  Files.createFile(versionFile)
  Some(new PrintWriter(versionFile.toFile)).foreach { p =>
    p.write("%s_%s".format(branchName.value, version.value))
    p.close()
  }
}


The rpmPrepare task reuses a SettingKey called branchName which contains the name of the branch in GitHub. The name of the RPM package will contain the branch name, so that you can build multiple branches of the same project in Bamboo in parallel without having to worry about version clashes. The branchName setting in SBT is retrieved via either a system property or an environment variable called “branchName”. This variable is set from Bamboo. Each build plan in Bamboo is made of individual tasks, and for a task you can set individual environment variables. So just add -DbranchName=${bamboo.repository.branch.name} and Bamboo will feed the GitHub branch name into the task.

So after running the Python script you will have the RPM file in the WORK_DIR folder. For running Java command-line applications we use Supervisor. Here is an example of how to invoke a Main class, given that the RPM installs your project in /opt/projectname.

[program:projectname]
stdout_logfile=/var/log/projectname/projectname.log
command=/usr/java/latest/bin/java -cp "/opt/projectname/*" me.MainClass
directory=/opt/projectname
autostart=true
autorestart=true
redirect_stderr=true

Publishing from SBT to Nexus

I am pretty new to SBT. Yesterday, for the first time, we wanted to publish the jar artifact of an in-house utility library to our private Nexus repository. This is an internal Nexus repository which we mostly use for Java projects built with Maven. While the task of publishing an artifact from SBT is well documented, it was not working right away. We hit some problems; some answers we found on Stackoverflow, but some things we needed to figure out ourselves.

To prepare your build in SBT you basically do two things: add values for the publishTo setting and the credentials task. I recommend using a credentials file not under version control, for obvious reasons. The first thing you want to verify is that you are using the correct “realm” value, which can be either a property in the credentials file or the first argument to the constructor of the Credentials class. Use curl to figure out the correct value as explained here. Send a POST to the Nexus repository which you want to publish to, without any authentication arguments. For us this was the call.

curl -X POST http://10.20.108.220:8081/nexus/content/repositories/releases -v


Look for the WWW-Authenticate header and use the realm value. I think the default is “Sonatype Nexus Repository Manager”.

This was a step in the right direction but we still got the following error in SBT:

[trace] Stack trace suppressed: run last *:publish for the full output.
[error] (*:publish) java.io.IOException: Access to URL http://10.20.108.220:8081/nexus/content/repositories/releases/se/foo/operations/nexustest_2.10/0.1/nexustest_2.10-0.1.pom was refused by the server: Unauthorized


Not super useful, but more info is actually available in the Nexus logfiles. Make sure you set the log level to DEBUG via the Nexus admin GUI first, then tail nexus.log while you try to publish from SBT. Here is some output from nexus.log, basically saying that SBT did not send a username and password as part of the Basic Authentication.

2014-02-12 13:03:53 DEBUG DefaultSessionManager - Unable to resolve session ID from SessionKey. Returning null to indicate a session could not be found.
2014-02-12 13:03:53 DEBUG NexusContentAuthenticationFilter - No authorization found (header or request parameter)
2014-02-12 13:03:53 DEBUG NexusContentAuthenticationFilter - No authorization found (header or request parameter)
2014-02-12 13:03:53 DEBUG NexusContentAuthenticationFilter - Attempting to authenticate Subject as Anonymous request...
And I was using the following build.sbt file:

// next line contains the error
credentials += Credentials("Sonatype Nexus Repository Manager", "10.20.108.220:8081", "username", "password")

publishTo := {
  val nexus = "http://10.20.108.220:8081/nexus/"
  if (version.value.trim.endsWith("SNAPSHOT"))
    Some("snapshots" at nexus + "content/repositories/snapshots")
  else
    Some("releases" at nexus + "content/repositories/releases")
}


After running a few tests, I figured out that the second argument to the sbt.Credentials class should only be the host and must not include the port – doh! After fixing this, everything worked just fine (the corrected setup is shown below). Another thing you want to check via the Nexus admin GUI is the Access Settings of your repository. For “Deployment Policy” we have set it to “Allow Redeploy”.
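
For completeness, the corrected line looks like this. The file-based variant underneath is the commonly used sbt alternative I mentioned above - the file path and values are placeholders.

// works: host only, no port
credentials += Credentials("Sonatype Nexus Repository Manager", "10.20.108.220", "username", "password")

// alternative: read the credentials from a file that is not under version control
credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")

// ~/.ivy2/.credentials is a plain properties file:
//   realm=Sonatype Nexus Repository Manager
//   host=10.20.108.220
//   user=username
//   password=password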

Dynamic Type System Trouble

This week I really learned to appreciate my Java compiler. I learned it the hard way – by not using it. In the last game that we released (Battlefield 4) I implemented a feature for our players which suggests 3 game items to progress on, e.g. a weapon to unlock, an assignment that should be finished etc. Our internal name for this feature is “Suggestions”. A player would not only see these 3 items but also see his own progress towards reaching the suggestion goal of each item. The code that calculates the 3 items has become quite complex, since there are a lot of different item types that we can pick from and we need to match each player individually. The code is written in Python, my favorite language at this point, which uses a dynamic type system.

The “Suggestions” feature was tested thoroughly and worked quite well in production. I implemented some additional functionality on top: players now also had the opportunity to manually pick individual items, so they could see their progress in the game and on our companion website Battlelog. Unfortunately, after a few weeks players complained about strange problems. These players would see completely random items being suggested to them – with the progression totally off. In some cases, players got items suggested that they had already completed or unlocked. These errors happened completely at random, and we were not able to reproduce them in any of our test systems. But it was happening mostly to players that played the game a lot. So I started to investigate.

No unit test was broken, and a long code review did not surface any problems either. Fortunately we have very short release cycles, so I added some additional logging to this functionality, which was released to production earlier this week. This finally got me something! I could see that in some rare cases the function which calculates the suggested items of a player returns not just 3 but more: 4, 5, 6, sometimes 9 items! I am posting a ridiculously simplified version of the code below. Try to spot the problem.

class ProgressSuggestions(TransportModel):
    def __init__(self):
        self.suggestions = []


class SuggestionService(object):
    def suggest(self, playerId):
        current = DB.load(playerId)
        currentIds = [current.sug1, current.sug2, current.sug3]
        progress = ProgressSuggestions
        progress.suggestions = []
        for index, suggestedId in enumerate(currentIds):
            suggestedItem = DB.loadItem(suggestedId)
            if not suggestedItem.stillExists():
                suggestedItem = self.calculateNew(playerId)
            progress.suggestions.append(suggestedItem)
        return progress


I should also tell you that an instance of the SuggestionService is shared. The service is used in an application which uses gevent. There are many Greenlets (lightweight threads) which call the suggest method simultaneously. Ring ring – multithreading issue! The problem is in line 10, where two parentheses are missing. Instead of creating an instance of the ProgressSuggestions class every time the suggest method is called, the code gets a reference to the ProgressSuggestions class itself and assigns it to a variable called progress. Then, on the first invocation, it dynamically adds a suggestions attribute to that class. Something that would neither be possible nor compile in a statically typed language like Java. All Greenlets modify the same class object, so players’ suggestions can overwrite each other. The simple fix is to create an instance of the ProgressSuggestions class as it was intended. I am surprised that this bug could live so long. In a real multithreaded application this would have affected many more players. Greenlets are only semi-parallel and must yield at a bad time to trigger this problem. Here is the correct version.

class ProgressSuggestions(TransportModel):
    def __init__(self):
        self.suggestions = []


class SuggestionService(object):
    def suggest(self, playerId):
        current = DB.load(playerId)
        currentIds = [current.sug1, current.sug2, current.sug3]
        progress = ProgressSuggestions()
        progress.suggestions = []
        for index, suggestedId in enumerate(currentIds):
            suggestedItem = DB.loadItem(suggestedId)
            if not suggestedItem.stillExists():
                suggestedItem = self.calculateNew(playerId)
            progress.suggestions.append(suggestedItem)
        return progress

Akka and parameterized mutable Actor state

After doing just Python for almost one year, I am back on the JVM with some recent Scala projects. In one of the projects I had the chance to try Akka for the first time – which is an amazing library. In one of my Actors, and I think this is quite a common use case, I needed to run some initialization logic based on the Actor's constructor arguments. During construction, the Actor would initialize an object that was expensive to create. This object would then be re-used in the receive method of the Actor.

I knew that Actor instances were shared, i.e. multiple calls to the receive method would be made on the same Actor object. So, being new to Akka, I was afraid of having shared mutable state within my Actor and was researching a better way to do the initialization, other than just having a mutable field. This is when I found out about the FSM (Finite State Machine) trait. It is a perfect way to model initialization. I created two states for my Actor (if you want to do initialization in multiple Actors it's a good idea to keep the common states, data holders and initialization messages in a separate object):

trait State
case object New extends State
case object Initialized extends State

trait Data
case object NotSet extends Data

object FSMStateProtocol {
  case class Initialize(environment: String)
}


Individual states and data holders are then created in the individual Actors. The parent (supervisor) would create the Actor and send an Initialize message, which would in turn create the expensive object. The Actor would then move itself to the next state and be ready to receive further messages.

import akka.actor.FSM

import FSMStateProtocol._
import ActorWithInitializationProtocol._

case class ActorContext(expensive: Reuseable) extends Data

class ActorWithInitialization extends FSM[State, Data] {
  startWith(New, NotSet)

  when(New) {
    case Event(Initialize(environment: String), NotSet) =>
      val expensive = doExpensiveComputation(environment)
      goto(Initialized).using(ActorContext(expensive))
  }

  when(Initialized) {
    case Event(MyMessage(), ActorContext(expensive)) => {
      println("Received message. Actor initialized with %s".format(expensive.toString))
      stay()
    }
  }

  private def doExpensiveComputation(environment: String): Reuseable = {
    // do expensive computation and return a Reuseable
    new Reuseable()
  }
}

object ActorWithInitializationProtocol {
  case class MyMessage()
}

class Reuseable() {
  // not shown
}

While this is a very nice way to model initialization, one big problem became apparent – restarts. As soon as my Actor failed with an exception in the Initialized state, the parent Actor would restart it in the New state. This made the Actor pretty much unusable. One potential solution to this is probably the lifecycle methods: I could have overridden the postRestart method in my Actor, where I have access to the constructor arguments, and sent an initialization message to myself. But against my gut feeling I decided to use a mutable field instead.
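
For reference, the lifecycle-method idea would have looked roughly like this - a sketch I never actually wrote, assuming the environment is also passed in as a constructor argument (the class name is made up):

import akka.actor.FSM

import FSMStateProtocol._
import ActorWithInitializationProtocol._

class SelfInitializingActor(environment: String) extends FSM[State, Data] {
  startWith(New, NotSet)

  when(New) {
    case Event(Initialize(env: String), NotSet) =>
      goto(Initialized).using(ActorContext(doExpensiveComputation(env)))
  }

  when(Initialized) {
    case Event(MyMessage(), ActorContext(expensive)) => stay()
  }

  // after a restart the Actor is back in the New state, but the constructor
  // argument is still available, so it can re-trigger its own initialization
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    self ! Initialize(environment)
  }

  private def doExpensiveComputation(env: String): Reuseable = {
    // expensive computation elided
    new Reuseable()
  }
}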

As I learned later, even though multiple threads share the same Actor instance, Akka guarantees that only a single thread will handle a message in the receive method at a time (also called the actor subsequent processing rule). So now I set the mutable field to None (an Option type), and on the first message that arrives the field is initialized properly to a Some. This works fine but throws up some interesting questions. Since Akka is using dispatchers (thread pools), subsequent messages in an Actor are most likely handled by different threads. In Java, changes to fields of shared objects done in one thread are not always visible to other threads (unless the field is volatile, the modification is done in a synchronized code section or in a section guarded by a Lock). Apparently this is not a problem for Akka.
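
Stripped down, the field-based pattern looks roughly like this (a minimal sketch, not the actual project code; Reuseable is the placeholder class from above):

import akka.actor.Actor

class LazyInitActor(environment: String) extends Actor {
  // mutable field, populated on the first message only
  private var expensive: Option[Reuseable] = None

  def receive = {
    case message =>
      if (expensive.isEmpty) {
        expensive = Some(doExpensiveComputation(environment))
      }
      // handle the message using expensive.get
  }

  private def doExpensiveComputation(env: String): Reuseable = {
    // expensive computation elided
    new Reuseable()
  }
}

The guarantee that makes this safe is described like this: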

In layman’s terms this means that changes to internal fields of the actor are visible when the next message is processed by that actor and you don’t need to make the fields volatile.


Unfortunately it is not further explained how Akka achieves this. The visibility problem DOES exist in Akka if Actors contain fields that are modified when receiving a message (e.g. a val field holding a mutable ArrayBuffer where elements are added and removed in the receive method). In that case, how does Akka make sure that those changes are seen by other threads when the next message arrives? In my application at least, I had one issue which seemed to be a visibility problem. Unfortunately, until now I wasn't able to isolate and reproduce this problem in a unit test :( This is what I have so far (some parts still need to be added).

import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinRouter

import MutableStateProtocol._

object App {
  implicit var system: Option[ActorSystem] = None

  def main(args: Array[String]) = {
    system = Some(ActorSystem("as"))
    system.foreach(s => register(s))
  }

  private def register(implicit system: ActorSystem) = {
    val msa = system.actorOf(
      Props[MutableStateActor](new MutableStateActor(1000)).withRouter(RoundRobinRouter(nrOfInstances = 3)),
      name = "subscriber")
    for (i <- 0 until 100000000) {
      msa ! AddElement(i)
    }
    system.shutdown()
  }
}

class MutableStateActor(batchSize: Int) extends Actor {
  val batch = new scala.collection.mutable.ArrayBuffer[Int](batchSize)

  def receive = {
    case AddElement(i) => {
      batch += i
      if (batch.size >= batchSize) {
        println("Size is: %d".format(batch.size))
        // todo: convert batch into a json of about 6 MB size
        // and HTTP post asynchronously via the Dispatch library
        batch.clear()
      }
    }
  }
}

object MutableStateProtocol {
  case class AddElement(i: Int)
}


I still have to fill the gap and do the HTTP POST. What I have seen is a print indicating that a smaller batch has been pushed out – which can ultimately only be a visibility issue. My guess is that the culprit is either my asynchronous POST using the Dispatch library or the way clear() is implemented in the ArrayBuffer class. I am still investigating. For now, this change got rid of the problem for me.

class MutableStateActor(batchSize: Int) extends Actor {
  @volatile var batch = new scala.collection.mutable.ArrayBuffer[Int](batchSize)

  def receive = {
    case AddElement(i) => {
      batch += i
      if (batch.size >= batchSize) {
        println("Size is: %d".format(batch.size))
        // todo: convert batch into a json of about 6 MB size
        // and HTTP post asynchronously via the Dispatch library
        // atomic ref assignment on volatile field
        batch = new scala.collection.mutable.ArrayBuffer[Int](batchSize)
      }
    }
  }
}

Generating REST docs with Scala and Finatra

More than 2 years ago I wrote a blog post about Enunciate - a tool which helps you generate nice documentation for your REST API if you use Java and JAX-RS. I like documentation that lives very close to the code and is created and updated while you implement the main functionality. This kind of documentation has also been recommended in the Pragmatic Programmer book.

I have not been using JAX-RS and Servlets in a while. We are currently implementing most of our REST APIs on top of Finagle, an RPC system created at Twitter that runs on Netty. While it is possible to use Finagle directly together with Scala pattern matching on the request path for the routes, I could not find a clever way to keep self-updating documentation close to the code. Fortunately there is another Twitter project called Finatra, which puts a Sinatra/Flask-like web framework on top of Finagle. Finatra will not only make it easier to define resources and routes but also help you with the documentation.

Here is how you typically define a route in Finatra:

object ExampleRestApp {

  class RestController(statsReceiver: StatsReceiver = NullStatsReceiver) extends Controller(statsReceiver) {
    get("/user/:userId") { request =>
      val userId: Long = request.routeParams.getOrElse("userId", "0").toLong
      val user = someService.lookupUser(userId)
      if (user == null) {
        render.notFound.plain("").toFuture
      } else {
        render.body(user).header("Content-Type", "application/json").toFuture
      }
    }
  }
}


For the documentation itself I am using Swagger, which can generate HTML from annotations. Swagger already comes with a bunch of useful annotations. Unfortunately some annotations, like a @Path equivalent, were missing, so I was forced to use some JSR-311 (JAX-RS) annotations instead, even though we are not using JAX-RS for the API. Here is the evolution of the Finatra controller from above with the Swagger and JSR-311 annotations added. As you can see, it was necessary to move the routes from the constructor into separate methods that can be annotated. This makes the Scala code a bit uglier and harder to read, especially if you have a lot of annotations in place. But hey, you will love the outcome.

object ExampleRestApp {

  @Api(value = "/", description = "Docs for our example controller")
  @Path("/")
  class RestController(statsReceiver: StatsReceiver = NullStatsReceiver) extends Controller(statsReceiver) {
    get_user_id()

    @ApiOperation(value = "Returns a user in json format.", notes = "Will only return a subset of the attributes.", responseClass = "java.lang.String", httpMethod = "GET")
    @Path("user/{userId}")
    @ApiErrors(Array(new ApiError(code = 404, reason = "No such user")))
    def get_user_id(@ApiParam(name = "UserId", value = "A valid user id", required = true) env: Long = 0L) {
      get("/user/:userId") { request =>
        val userId: Long = request.routeParams.getOrElse("userId", "0").toLong
        val user = someService.lookupUser(userId)
        if (user == null) {
          render.notFound.plain("").toFuture
        } else {
          render.body(user).header("Content-Type", "application/json").toFuture
        }
      }
    }
  }
}


The final step is to generate the documentation during our Maven build. We are using the swagger-maven-plugin for that. I even copied the strapdown.html.mustache template from the plugin into our project and customized it, so that we could tweak the generated documentation and use another Twitter Bootstrap theme instead.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <!-- not all shown -->
        <dependency>
            <groupId>com.wordnik</groupId>
            <artifactId>swagger-annotations_2.10.0</artifactId>
            <version>1.2.4</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>javax.ws.rs</groupId>
            <artifactId>jsr311-api</artifactId>
            <version>1.1.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- not all shown -->
            <plugin>
                <groupId>com.github.kongchen</groupId>
                <artifactId>swagger-maven-plugin</artifactId>
                <version>1.1-SNAPSHOT</version>
                <configuration>
                    <apiSources>
                        <apiSource>
                            <locations>your.scala.package.here;</locations>
                            <apiVersion>${project.version}</apiVersion>
                            <basePath>http://autoreplaced.com</basePath>
                            <outputTemplate>${basedir}/doc/modified-strapdown.html.mustache</outputTemplate>
                            <outputPath>${project.build.outputDirectory}/docs.html</outputPath>
                            <withFormatSuffix>false</withFormatSuffix>
                        </apiSource>
                    </apiSources>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


The outcome will be a generated docs.html file in the target folder of your build. The docs.html will contain autoreplaced.com as the path - which was specified in the swagger-maven-plugin configuration. I normally replace “autoreplaced.com” with JavaScript (something that can easily be done if you use your own Mustache template). It is also nice to have Finatra render the docs.html file.

object ExampleRestApp {

  @Api(value = "/", description = "Docs for our example controller")
  @Path("/")
  class RestController(statsReceiver: StatsReceiver = NullStatsReceiver) extends Controller(statsReceiver) {
    get("/docs") { request =>
      val content = Source.fromURL(getClass.getResource("/docs.html")).mkString
      render.html(content).toFuture
    }

    get_user_id()

    @ApiOperation(value = "Returns a user in json format.", notes = "Will only return a subset of the attributes.", responseClass = "java.lang.String", httpMethod = "GET")
    @Path("user/{userId}")
    @ApiErrors(Array(new ApiError(code = 404, reason = "No such user")))
    def get_user_id(@ApiParam(name = "UserId", value = "A valid user id", required = true) env: Long = 0L) {
      get("/user/:userId") { request =>
        val userId: Long = request.routeParams.getOrElse("userId", "0").toLong
        val user = someService.lookupUser(userId)
        if (user == null) {
          render.notFound.plain("").toFuture
        } else {
          render.body(user).header("Content-Type", "application/json").toFuture
        }
      }
    }
  }
}