ThriftMux Unit Test Roundtrip

I have decided to write this blog post mainly for my own reference. Today I was working on a Thrift service that I wanted to start and test within a unit test. All our services use the Finagle RPC framework with ThriftMux as the transport. I will show you how to find a free port on your machine to start the server on and then create a client to invoke that server. We start with this Thrift IDL file:

namespace java com.github.reikje

service MicroService {
  string request()
}


Using Scrooge or the scrooge-sbt-plugin to generate the service interface, this is a ScalaTest spec that does exactly that:

package com.github.reikje

import java.net.{InetAddress, InetSocketAddress}

import com.twitter.finagle.{Address, Name, ThriftMux, param}
import com.twitter.util.{Await, Future}
import org.scalatest.{Matchers, WordSpec}

class RoundtripSpec extends WordSpec with Matchers {
  "Microservice" should {
    "respond to requests" in {
      val service = new MicroServiceImpl("foo")
      val server = ThriftMux.server
        .configured(param.Label("zuul-thrift"))
        .serveIface(new InetSocketAddress(InetAddress.getLoopbackAddress, 0), service)

      val client = ThriftMux.client.newIface[MicroService.FutureIface](
        Name.bound(Address(server.boundAddress.asInstanceOf[InetSocketAddress])), "microservice-client"
      )

      val response = Await.result(client.request())
      response shouldBe "foo"
    }
  }
}

class MicroServiceImpl(response: String) extends MicroService.FutureIface {
  override def request(): Future[String] = Future.value(response)
}
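In case you are wondering why binding to port 0 finds a free port: that is the operating system's convention for "assign me any free ephemeral port", and you read the actual port back from the bound address afterwards - exactly what server.boundAddress does above. The same mechanism can be demonstrated with a plain java.net.ServerSocket, independent of Finagle:

```scala
import java.net.ServerSocket

// Binding to port 0 asks the OS to pick any free ephemeral port.
val socket = new ServerSocket(0)
val port =
  try socket.getLocalPort
  finally socket.close()

println(s"OS assigned port: $port")
```

Note that closing the socket and then reusing the port (instead of binding the server to port 0 directly, as in the test above) would open a small race window where another process grabs the port first.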

Dynamic Per-Request Log Level

Looks like it took me three years to write something new for this blog. Kids and other cool stuff got in the way - you know the score! I should also say that I haven't done anything in Java in the past three years (almost). To be more accurate, I should rename the blog to jvmsplitter.blogspot.com, but whatever - I just drop my Scala blog posts here anyway!

This week I implemented a great idea that a co-worker came up with. Have you ever been in a situation where you have a running system with a bunch of microservices in production and suddenly something doesn't work as expected? In the world of Battlefield, examples could be that players cannot purchase Battlepacks on PS4 or that matchmaking stopped working. So how do you find the root cause of the problem? Right, you want to look at some logs to get additional information. The only problem is that your log level in production is usually quite high, for instance WARN or even ERROR - otherwise the amount of logging would just be too much. Wouldn't it be great to alter the log level dynamically on a per-request basis? This would allow you to test in production using TRACE logging - for just your test user. Finagle contexts to the rescue!

Here at DICE we have built all our Scala backend services on top of Twitter's Finagle framework - which is similar to the Tokio framework in Rust, if you have used that. In a nutshell, Finagle is an RPC framework for the JVM with built-in support for various transport protocols, load balancing, service discovery, backoff etc. One semi-hidden feature of Finagle is the broadcast context. Think of the broadcast context as a ThreadLocal that is sent along with every request through an RPC graph - from microservice to microservice. Finagle itself uses this internally, for instance to send a unique trace id along with each request. In my implementation, I have used the broadcast context to allow for a per-request log level override. Let's get our hands dirty! The first thing you want to implement is a new Key that Finagle can send over the RPC graph.
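If the ThreadLocal analogy feels abstract, the single-process equivalent of the broadcast context is Scala's standard-library scala.util.DynamicVariable: a value is bound for the dynamic extent of a block and is visible to everything called within it. Finagle's Contexts.broadcast behaves the same way, except the value is also marshalled and forwarded across RPC hops. A quick sketch of the dynamic-scoping part:

```scala
import scala.util.DynamicVariable

// A dynamically scoped value - the single-process analogue of a broadcast-context key.
val traceId = new DynamicVariable[Option[String]](None)

def deepInTheCallStack(): String =
  traceId.value.getOrElse("no-trace")

// Within withValue, every call on this thread sees the bound value...
val inside = traceId.withValue(Some("abc-123")) {
  deepInTheCallStack()
}
// ...and outside the block, the binding is gone again.
val outside = deepInTheCallStack()

println(s"inside=$inside, outside=$outside") // inside=abc-123, outside=no-trace
```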

package com.github.rschatz

import com.twitter.finagle.context.Contexts
import com.twitter.io.{Buf, Charsets}
import com.twitter.util.Try

import com.github.rschatz.LogLevelName.LogLevelName

/**
 * Encapsulates the name of a log level override.
 *
 * @author rschatz
 */
case class LogLevelOverride(logLevelName: LogLevelName) {
  def let[T](f: => T): T = Contexts.broadcast.let(LogLevelOverride, this)(f)
}

object LogLevelName extends Enumeration {
  type LogLevelName = Value
  val TRACE, DEBUG, INFO, WARN, ERROR = Value
}

object LogLevelOverride extends Contexts.broadcast.Key[LogLevelOverride]("com.github.rschatz.LogLevelOverride") {
  def current: Option[LogLevelOverride] = Contexts.broadcast.get(LogLevelOverride)

  override def marshal(logLevelOverride: LogLevelOverride): Buf = {
    Buf.ByteArray.Owned(logLevelOverride.logLevelName.toString.getBytes(Charsets.Utf8))
  }

  override def tryUnmarshal(buf: Buf): Try[LogLevelOverride] = {
    Try {
      val bytes = Buf.ByteArray.Owned.extract(buf)
      LogLevelOverride(LogLevelName.withName(new String(bytes, Charsets.Utf8)))
    }
  }
}


Essentially each Key needs to implement two methods, marshal and tryUnmarshal, so that Finagle knows how to convert the Key to and from a byte array. I am not sharing that code here, but if you want to see how to unit test your Key, Finagle has an example. Now that we have a class for the log level override defined, we need code to set the override into the broadcast context, as well as code to read the override back out of it.
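For reference, the property such a unit test checks is simply that marshal and tryUnmarshal are inverses of each other, and that garbage input fails gracefully. Here is a dependency-free sketch of that roundtrip, using plain byte arrays in place of Finagle's Buf (the helper names are made up for illustration):

```scala
import scala.util.Try

// Stand-in for the LogLevelName enumeration from the post.
object LogLevelName extends Enumeration {
  val TRACE, DEBUG, INFO, WARN, ERROR = Value
}

// Marshal/unmarshal pair mirroring the Buf-based versions above,
// over plain byte arrays so this runs without Finagle.
def marshal(name: LogLevelName.Value): Array[Byte] =
  name.toString.getBytes("UTF-8")

def tryUnmarshal(bytes: Array[Byte]): Try[LogLevelName.Value] =
  Try(LogLevelName.withName(new String(bytes, "UTF-8")))

// The roundtrip property a unit test would assert:
LogLevelName.values.foreach { name =>
  assert(tryUnmarshal(marshal(name)).get == name)
}
// Garbage input should produce a Failure rather than blow up.
assert(tryUnmarshal("BOGUS".getBytes("UTF-8")).isFailure)
println("roundtrip ok")
```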

In most system architectures you have one system on the outside of your cluster. Here at DICE we call this system the gateway, and it is the only service that is accessible from the public internet. All requests arrive at the gateway, and it is the root node in the RPC graph. In other words, the gateway calls other microservices, which might call other microservices, and so on. The most logical place to set a log level override is inside a Finagle Filter on the gateway. I haven't actually written the Filter yet, but it would look similar to this (the header name and the trigger logic here are just examples):

package com.github.rschatz

import com.twitter.finagle.http._
import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.util.Future

import com.github.rschatz.LogLevelName.LogLevelName

class LogLevelOverrideFilter extends SimpleFilter[Request, Response] {
  override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
    // Figure out if we should override - for example based on an HTTP header.
    // Unknown or missing level names simply result in no override.
    val logLevelOverride: Option[LogLevelName] =
      request.headerMap.get("X-Log-Level-Override").flatMap { name =>
        scala.util.Try(LogLevelName.withName(name.toUpperCase)).toOption
      }

    logLevelOverride match {
      case Some(logLevelName) =>
        LogLevelOverride(logLevelName).let {
          service(request)
        }
      case _ =>
        service(request)
    }
  }
}


You have to be very careful with the Filter, as this code is executed for every request entering your system! Now that we have code to set a log level override into the broadcast context, let's actually use it somewhere. To make this as seamless as possible for the developers, it is helpful if all your microservices share the same logging setup. For instance, we use slf4j with logback, and the LoggerContext is set up programmatically inside a trait that every microservice uses (by the way, our services follow the Twitter Server template).

package com.github.rschatz

import ch.qos.logback.classic.{Level, Logger}
import com.twitter.app.App
import org.slf4j.LoggerFactory
// other imports

trait LoggingSetup { self: App =>
  private val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger]
  private val context = root.getLoggerContext

  private[this] val logLevel = flag(
    "log.level",
    "info",
    "Default log level (root logger). One of {trace, debug, info, warn, error}"
  )

  init {
    context.reset()
    context.setName("whatever")
    context.addTurboFilter(BroadcastContextOverrideFilter) // explained below
  }

  premain {
    // Level.toLevel falls back to the given default for unknown strings.
    val level = Level.toLevel(logLevel(), Level.INFO)
    root.setLevel(level)
    // setup appenders etc.
  }

  onExit {
    // stop the LoggerContext
    context.stop()
  }
}


As you can probably guess by now, reading from the broadcast context and actually applying the override is wrapped inside a logback TurboFilter. Logback consults the filter for every log event, and you can use this to decide whether something should be logged or not. The following filter reads from the broadcast context and then makes a decision based on a potential override.

package com.github.rschatz

import ch.qos.logback.classic.turbo.TurboFilter
import ch.qos.logback.classic.{Level, Logger}
import ch.qos.logback.core.spi.FilterReply
import org.slf4j.Marker

object BroadcastContextOverrideFilter extends TurboFilter {
  override def decide(marker: Marker, logger: Logger, level: Level, format: String, params: Array[AnyRef], t: Throwable): FilterReply = {
    LogLevelOverride.current.map { logLevelOverride =>
      if (logLevelOverride.logLevelName == LogLevelName.ERROR && level.isGreaterOrEqual(Level.ERROR)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.WARN && level.isGreaterOrEqual(Level.WARN)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.INFO && level.isGreaterOrEqual(Level.INFO)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.DEBUG && level.isGreaterOrEqual(Level.DEBUG)) {
        FilterReply.ACCEPT
      } else if (logLevelOverride.logLevelName == LogLevelName.TRACE) {
        FilterReply.ACCEPT
      } else {
        // log event uses a level below the specified override
        FilterReply.NEUTRAL
      }
    }.getOrElse {
      // no override in broadcast context
      FilterReply.NEUTRAL
    }
  }

  override def isStarted: Boolean = true
}
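The cascade of comparisons above implements a single rule: accept the event if its level is at or above the override, otherwise stay neutral. Since the LogLevelName enumeration is declared in ascending order, the same decision can be expressed as one comparison on the Enumeration's built-in ordering. A dependency-free sketch of that decision table (using strings in place of logback's FilterReply):

```scala
// TRACE < DEBUG < INFO < WARN < ERROR via the declaration order of the Enumeration.
object LogLevelName extends Enumeration {
  val TRACE, DEBUG, INFO, WARN, ERROR = Value
}

// ACCEPT iff the event's level is at or above the override; otherwise stay NEUTRAL.
def decide(overrideLevel: LogLevelName.Value, eventLevel: LogLevelName.Value): String =
  if (eventLevel >= overrideLevel) "ACCEPT" else "NEUTRAL"

assert(decide(LogLevelName.TRACE, LogLevelName.DEBUG) == "ACCEPT")  // TRACE override lets everything through
assert(decide(LogLevelName.WARN, LogLevelName.ERROR) == "ACCEPT")   // ERROR is at or above WARN
assert(decide(LogLevelName.INFO, LogLevelName.DEBUG) == "NEUTRAL")  // DEBUG is below the INFO override
println("decision logic ok")
```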


Conclusion: you can use Finagle's broadcast context to transport a log level override through an RPC graph. You need some service to set the override in the context. It is helpful if this system is on the outside of your architecture and preferably speaks HTTP: with HTTP it is easy to write a Finagle Filter and base the override on the HTTP request, e.g. by looking at the HTTP headers. Finagle transports the override magically through your RPC call graph, and any microservice can use the override for logging decisions. To make this as simple as possible, encapsulate the decision logic in code that is shared between all your microservices.