I knew that Actor instances are shared, i.e. all calls to the receive method are made on the same Actor object. Being new to Akka, I was afraid of having shared mutable state within my Actor, so I was looking for a better way to do the initialization than just having a mutable field. This is when I found out about the FSM (Finite State Machine) trait, which is a perfect way to model initialization. I created two states for my Actor (if you want to do initialization in multiple Actors, it's a good idea to keep the common states, data holders and initialization messages in a separate object):
```scala
trait State
case object New extends State
case object Initialized extends State

trait Data
case object NotSet extends Data

object FSMStateProtocol {
  case class Initialize(environment: String)
}
```
The states and data holders specific to each actor are then defined alongside the individual actors. The parent (supervisor) creates the actor and sends it an Initialize message, which in turn creates the expensive object. The actor then moves itself to the next state and is ready to receive further messages.
```scala
import akka.actor.FSM
import FSMStateProtocol._
import ActorWithInitializationProtocol._

// Actor-specific data holder that carries the expensive object once initialized
case class ActorContext(expensive: Reuseable) extends Data

class ActorWithInitialization extends FSM[State, Data] {
  startWith(New, NotSet)

  when(New) {
    // Build the expensive object once, then switch state and keep it as state data
    case Event(Initialize(environment), NotSet) =>
      val expensive = doExpensiveComputation(environment)
      goto(Initialized).using(ActorContext(expensive))
  }

  when(Initialized) {
    case Event(MyMessage(), ActorContext(expensive)) =>
      println("Received message. Actor initialized with %s".format(expensive.toString))
      stay()
  }

  // Required by Akka's FSM: performs the transition into the state given to startWith
  initialize()

  private def doExpensiveComputation(environment: String): Reuseable = {
    // do expensive computation and return a Reuseable
    new Reuseable()
  }
}

object ActorWithInitializationProtocol {
  case class MyMessage()
}

class Reuseable() {
  // not shown
}
```
As I learned later, even though different threads may run the same Actor instance, Akka guarantees that an actor processes only one message at a time, so its receive method is never executed concurrently. So I went back to a mutable field after all: it starts as None (an Option) and is set to a Some when the first message arrives. This works fine, but it raises some interesting questions. Since Akka uses Dispatchers (thread pools), consecutive messages to an actor are most likely handled by different threads. In Java, a change to a field of a shared object made in one thread is not guaranteed to be visible to other threads unless the field is volatile, or the write and the later read are both done in synchronized sections or in sections guarded by the same Lock. Apparently this is not a problem for Akka: its documentation on the Java Memory Model states the actor subsequent processing rule, which says that processing of one message happens before processing of the next message by the same actor.
In layman’s terms this means that changes to internal fields of the actor are visible when the next message is processed by that actor and you don’t need to make the fields volatile.
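A minimal sketch of the Option-based initialization described above, written as a plain class so it runs without Akka. The names `LazyInitWorker`, `ExpensiveResource` and `Work` are hypothetical; inside a real Actor the same `var` would live in the actor itself and rely on Akka's one-message-at-a-time processing instead of `@volatile`.

```scala
// Hypothetical stand-in for the post's expensive Reuseable object.
final class ExpensiveResource(val environment: String)

// Hypothetical message type for this sketch.
final case class Work(environment: String, payload: Int)

// Plain class shaped like an Akka Actor (a receive PartialFunction), so the
// sketch runs without Akka. In a real actor, Akka's one-message-at-a-time
// processing is what makes the unsynchronized var below safe.
class LazyInitWorker {
  // Starts as None and becomes Some(...) while handling the first message.
  private var expensive: Option[ExpensiveResource] = None
  private var initCount = 0

  private def resourceFor(environment: String): ExpensiveResource =
    expensive.getOrElse {
      initCount += 1 // the expensive path runs only on the first message
      val created = new ExpensiveResource(environment)
      expensive = Some(created)
      created
    }

  val receive: PartialFunction[Any, Unit] = {
    case Work(environment, payload) =>
      val resource = resourceFor(environment)
      println(s"payload $payload handled with resource for ${resource.environment}")
  }

  def initializations: Int = initCount
}
```

Once the resource exists, the environment carried by later messages is ignored; the FSM version instead takes it from a dedicated Initialize message and makes the "not yet initialized" case an explicit state.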
Unfortunately, the documentation does not explain further how Akka achieves this visibility guarantee. The visibility question DOES arise in Akka when actors contain fields that are modified while receiving a message (e.g. an immutable val holding an ArrayBuffer whose elements are added and removed in the receive method). In that case, how does Akka make sure that those changes are seen by other threads when the next message arrives? In my application, at least, I had one issue that looked like a visibility problem. Unfortunately, so far I haven't been able to isolate and reproduce it in a unit test :( Here is what I have so far (some parts still need to be added):
```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinRouter
import MutableStateProtocol._

object App {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("as")
    register(system)
  }

  private def register(system: ActorSystem): Unit = {
    // three MutableStateActor instances behind a round-robin router; each has its own batch
    val msa = system.actorOf(
      Props(new MutableStateActor(1000)).withRouter(RoundRobinRouter(nrOfInstances = 3)),
      name = "subscriber")
    for (i <- 0 until 100000000) {
      msa ! AddElement(i)
    }
    // note: shutdown() does not wait for the queued messages to be processed
    system.shutdown()
  }
}

class MutableStateActor(batchSize: Int) extends Actor {
  // the reference is a val, but the buffer's contents are mutated in receive
  val batch = new scala.collection.mutable.ArrayBuffer[Int](batchSize)

  def receive = {
    case AddElement(i) =>
      batch += i
      if (batch.size >= batchSize) {
        println("Size is: %d".format(batch.size))
        // todo: convert batch into a json of about 6 MB size
        // and HTTP post asynchronously via the Dispatch library
        batch.clear()
      }
  }
}

object MutableStateProtocol {
  case class AddElement(i: Int)
}
```
I still have to fill in the gap and do the HTTP POST. What I have seen is a print indicating that a smaller batch was pushed out, which ultimately can only be a visibility issue. My guess is that the culprit is either my asynchronous POST using the Dispatch library or the way clear() is implemented in the ArrayBuffer class. I'm investigating further. For now, this change got rid of the problem for me:
```scala
import akka.actor.Actor
import MutableStateProtocol._

class MutableStateActor(batchSize: Int) extends Actor {
  @volatile var batch = new scala.collection.mutable.ArrayBuffer[Int](batchSize)

  def receive = {
    case AddElement(i) =>
      batch += i
      if (batch.size >= batchSize) {
        println("Size is: %d".format(batch.size))
        // todo: convert batch into a json of about 6 MB size
        // and HTTP post asynchronously via the Dispatch library
        // atomic ref assignment on volatile field
        batch = new scala.collection.mutable.ArrayBuffer[Int](batchSize)
      }
  }
}
```
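One possible explanation for the short batch, offered only as a guess that matches the first suspect named above: if the asynchronous POST keeps a reference to the live ArrayBuffer and serializes it only after receive has already called clear(), it sees fewer elements (or none). That would be a race caused by sharing the buffer with another thread rather than a visibility problem inside the actor, and it would also fit the fact that the second version helps: if the POST was handed the old buffer object, reassigning the field leaves that object untouched, whereas clear() empties it. The sketch below is hypothetical: `postAsync` stands in for the Dispatch call (it is not Dispatch's API), and a `CountDownLatch` forces the bad interleaving so the outcome is deterministic.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncCaptureDemo {
  // Hypothetical stand-in for an asynchronous HTTP POST: the payload is only
  // serialized (here: counted) later, on another thread.
  def postAsync(payload: Iterable[Int], clearedLatch: CountDownLatch): Future[Int] =
    Future {
      clearedLatch.await() // force the "slow" POST to run after the actor's clear()
      payload.size
    }

  // snapshot = false hands the live ArrayBuffer to the POST;
  // snapshot = true hands it an immutable copy taken before clear().
  def run(snapshot: Boolean): Int = {
    val batch = ArrayBuffer.empty[Int]
    batch ++= (1 to 1000)
    val cleared = new CountDownLatch(1)
    val payload: Iterable[Int] = if (snapshot) batch.toVector else batch
    val posted = postAsync(payload, cleared)
    batch.clear() // what the first MutableStateActor does after starting the POST
    cleared.countDown()
    Await.result(posted, 5.seconds)
  }
}
```

Handing the POST an immutable snapshot (`batch.toVector` here) or an already serialized JSON string avoids this kind of race regardless of how the actor's own field is declared.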
1 comment:
This is a very interesting observation, and I must say that I have thought about this possibility ever since I began learning and working with Akka. I am very happy to hear that someone else has thought it through and has been able to demonstrate (at least) once that a non-volatile _var_ may have a problem in an Actor.
Have you done any further research on this? Any blog post on that? I would love to know more about it.
BTW, I have also been involved in building the backend stack for MMOGs in the past. In at least 3 cases, I have successfully implemented the core server/game logic in Java using a hand-crafted, table-driven FSM (a hangover from my earlier 'C' programming, Lex/Yacc days)! Just wanted to share.