Actors using Scala & Akka — Part 2 : AskPattern and SpawnProtocol#
This is the second post in the series “Actors using Scala & Akka”. In Part 1, we saw how to create new actors and modify their state by sending messages.
In this article we will take a look at how to get a reply from actors (i.e. the Ask pattern) and how to spawn user actors (Spawn protocol) instead of system actors.
Before we jump into the ask pattern, we should have a basic understanding of some Scala concepts:
- Futures
- Execution Context
- Implicit parameters
If you are already familiar with Scala, feel free to jump to the “Ask Pattern” section.
Scala Basics#
Note
This tutorial uses Scala 2.13. This is not a full tutorial on Scala. To learn Scala in depth, I recommend the book Programming in Scala.
Futures#
Futures in Scala are similar to Task<T> in C#, a Promise in JavaScript or a CompletableFuture<T> in Java. Their objective is to help developers write asynchronous code. A Future[Int] represents a value which will be ready at some point in the future, when the future is completed. A future may complete with a successful value or fail with an exception. Futures are ideally returned from methods which perform an async operation such as network IO, file IO, etc. This is an evolved design compared to callbacks, which are difficult to compose. Since futures are “return values” instead of method parameters, it is easy to chain multiple futures together (compose) and also to pass them around the application.
A program with callbacks
IO.readFile("/someFile.txt", fileContents =>
IO.uploadToService(fileContents, response =>
IO.writeToFile("/another_file.txt", response, _ =>
sys.exit(0), _ => sys.exit(1)
), _ => sys.exit(1)
), _ => sys.exit(1)
)
A program with Futures
IO.readFile("/someFile.txt")
.flatMap(fileContents => IO.uploadToService(fileContents))
.flatMap(response => IO.writeToFile("/another_file.txt", response))
.onComplete {
case Success(_) => sys.exit(0)
case Failure(_) => sys.exit(1)
}
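The IO object above is illustrative and is not defined in this post. As a minimal runnable sketch of the same chaining, the example below replaces it with three hypothetical stand-in methods (readFile, uploadToService, writeToFile) that each return a Future straight away. The names and return types are assumptions made for this sketch only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt

object FutureChainSketch {
  // Hypothetical stand-ins for the IO calls above (assumptions, not a real library).
  def readFile(path: String): Future[String] = Future(s"contents of $path")
  def uploadToService(contents: String): Future[Int] = Future(contents.length)
  def writeToFile(path: String, response: Int): Future[String] =
    Future(s"wrote $response to $path")

  // Each step starts only after the previous future has completed successfully.
  // If any step fails, the remaining steps are skipped and the failure is propagated.
  def pipeline(): Future[String] =
    readFile("/someFile.txt")
      .flatMap(contents => uploadToService(contents))
      .flatMap(response => writeToFile("/another_file.txt", response))

  def main(args: Array[String]): Unit =
    // Blocking with Await is only for the demo, so the JVM does not exit early.
    println(Await.result(pipeline(), 2.seconds))
}
```

Because every step returns a Future, the pipeline itself is just another Future that can be returned, stored or composed further.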
It is important to know about Future because when you send a message to an actor and expect a reply, even though the actor sends a reply of type T, what you get back is always a Future[T]. This is because message based communication is asynchronous & non-blocking.
Execution Context#
Callbacks are a way to tell the program what you want to do after an async operation is complete. When you use a callback based API, you lose control over which thread or thread-pool the callback is executed on.
When you use futures, you have this control. Mapping & flatMapping over a Future is like attaching a callback to a future, and it requires providing an ExecutionContext. This execution context can be backed by a single thread or any of the various types of thread pools. In other words, whatever you write in the map or flatMap will be executed by the execution context provided by you.
Scala provides an out of the box ExecutionContext at scala.concurrent.ExecutionContext.Implicits.global, and it does a good job with async work, and with blocking code when used appropriately. It is recommended to use different thread-pools (and execution contexts) for logically different areas of the application. You can read about the global execution context in the official documentation.
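To make the idea of a dedicated execution context concrete, here is a small sketch that builds one from a fixed thread pool and passes it explicitly to Future and map. The pool size, the thread name io-pool-thread and the object name are arbitrary choices for this illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt

object ExecutionContextSketch {
  // A dedicated pool of 2 daemon threads, all named "io-pool-thread" (assumed name).
  private val pool = Executors.newFixedThreadPool(2, (r: Runnable) => {
    val thread = new Thread(r, "io-pool-thread")
    thread.setDaemon(true) // daemon threads do not keep the JVM alive
    thread
  })

  val ioContext: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // The future body and both map callbacks run on ioContext, which is passed explicitly
  // in the second parameter list instead of being picked up implicitly.
  def threadNameOfCallback(): String = {
    val future: Future[String] =
      Future(21)(ioContext)
        .map(_ * 2)(ioContext)
        .map(_ => Thread.currentThread().getName)(ioContext)
    Await.result(future, 2.seconds)
  }

  def main(args: Array[String]): Unit =
    println(threadNameOfCallback())
}
```

Passing the context explicitly is verbose on purpose here; in real code it would usually be declared as an implicit value, which is what the next section is about.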
Implicit Parameters#
Scala has this feature where you can define a parameter list as “implicit” and it will be passed by compiler automatically to the function, if an implicit variable of the required type is present in the caller’s scope. Let’s see an example
sendData is a method which accepts a parameter data, of type Any. Any is a “super type” of all types in Scala, so you can pass anything to sendData. This method has one more parameter list which begins with implicit and requires a parameter of type Serializer. Looking at the body of the method, we can see that it can access the serializer argument in the same way it can access the data argument. When we call this method without a serializer, it fails to compile, as seen in the screenshot above.
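For readers who cannot see the screenshot, the method described above might look roughly like the sketch below. The body of Serializer, the String return type and the Person fields are assumptions made for illustration; only the shape of the two parameter lists is taken from the description.

```scala
object ImplicitSketch {
  final case class Person(name: String, age: Int)

  // Hypothetical serializer: the real implementation is not shown in this post.
  class Serializer {
    def serialize(data: Any): String = data.toString
  }

  // The second parameter list is implicit: the compiler fills it in
  // when an implicit Serializer is available in the caller's scope.
  def sendData(data: Any)(implicit serializer: Serializer): String =
    serializer.serialize(data)

  def main(args: Array[String]): Unit = {
    // Without an implicit Serializer in scope, sendData(Person("john", 10))
    // would not compile, so here the serializer is passed explicitly.
    println(sendData(Person("john", 10))(new Serializer()))
  }
}
```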
We can open another pair of parentheses and pass in the serializer like any other parameter, and it would compile and work just fine.
sendData(Person(name = "john", age = 10))(new Serializer())
But since it is marked as implicit, it is designed and intended to be passed automatically - implicitly.
implicit val serializer: Serializer = new Serializer()
sendData(Person(name = "john", age = 10))
Here, I have declared a variable and marked it as implicit, and now I don’t need to pass it anymore. Since it has the required type and is in the current scope, the compiler does the job for me. If I enable the magic view in IntelliJ by pressing Option CTRL Shift + +, I can see the implicit parameter being passed!
I talked about implicit parameters because .map and .flatMap require an ExecutionContext to be passed implicitly. There is a lot more to implicits than this in Scala, and you can learn more about them in the official documentation.
Ask Pattern#
The Ask pattern is used when you want to send a message to an actor and also expect a reply. Since this is message driven and asynchronous communication, we get back a Future of the reply. The future completes when the actor decides to send a reply and the reply reaches the destination.
Do you remember the code of bank account actor where we ended part 1?
Code from part 1
package tech.bilal
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
object Main extends App {
def behavior(balance: Int): Behavior[BankAccountMessage] = Behaviors.receiveMessage {
case Deposit(amount) => behavior(balance + amount)
case Withdraw(amount) => behavior(balance - amount)
case PrintBalance =>
println(s"balance = $balance")
behavior(balance)
}
val actorSystem = ActorSystem(Behaviors.empty, name = "MyBankActorSystem")
val account1: ActorRef[BankAccountMessage] = actorSystem.systemActorOf(behavior(balance = 0), "account1")
println(account1)
account1 ! PrintBalance
account1 ! Deposit(200)
account1 ! Withdraw(50)
account1 ! PrintBalance
actorSystem.terminate()
}
Here, to know the account balance, we make the actor print it on console. But what if we want to get the balance as a response? Let’s make that happen.
The first thing we will need to do is — change the protocol. Here’s our existing protocol
Protocol from Part 1
sealed trait BankAccountMessage
case class Deposit(amount: Int) extends BankAccountMessage
case class Withdraw(amount: Int) extends BankAccountMessage
case object PrintBalance extends BankAccountMessage
We need to add a “replyTo” parameter (the name of the parameter doesn’t matter) to the message which needs a reply. It should be of type ActorRef[T], where T is the type of reply you want. Note that we changed the name from PrintBalance to GetBalance and also changed it from a case object to a case class, since it now has parameters and cannot be a singleton anymore.
sealed trait BankAccountMessage
case class Deposit(amount: Int) extends BankAccountMessage
case class Withdraw(amount: Int) extends BankAccountMessage
case class GetBalance(replyTo: ActorRef[Int]) extends BankAccountMessage
Let’s take a look at how behaviour needs to change.
def behavior(balance: Int): Behavior[BankAccountMessage] =
Behaviors.receiveMessage {
case Deposit(amount) => behavior(balance + amount)
case Withdraw(amount) => behavior(balance - amount)
case GetBalance(replyTo) =>
replyTo ! balance
Behaviors.same
}
Note
We are using akka version 2.6.5. You can find the latest version on github.
This was pretty straightforward (hopefully!). While handling the message in the behaviour, we get hold of the “replyTo” address and simply send (!) the balance.
Now let’s see how we can receive the reply outside of the actor world. This is where the ask pattern comes to the rescue.
First we need to add an import statement to enable some extension methods
import akka.actor.typed.scaladsl.AskPattern.Askable
This provides an ask extension method on the account1 ActorRef. Let’s observe its signature.
It requires one explicit parameter, i.e. a function from ActorRef[Int] to BankAccountMessage. This is easy to construct.
We don’t have to worry about how we get this actorRef; we just need to be concerned with creating a new BankAccountMessage. In this case it is GetBalance. GetBalance requires an ActorRef[Int], which is provided to us by the ask method. The ask method internally spawns a temporary actor and provides its actor ref to us. But we don’t have to worry too much about it, it’s just good to know.
We can reduce this code. ask takes a function, and GetBalance’s constructor is also a function which takes an actor ref. We can pass it directly and let the compiler expand it at compile time.
val balanceFuture: Future[Int] = account1.ask(GetBalance)
ask requires two more parameters, which are marked as implicit: a timeout and a scheduler. The timeout tells ask how long it should wait before failing the future with a timeout exception. A scheduler is a utility which, as the name suggests, is used to schedule function executions. In this case, ask requires the scheduler to fire the timeout exception in case the actor does not send a reply or the reply does not arrive in time. To pass these implicit values, we just have to create their instances in the scope of ask and mark them as implicit, as shown below.
implicit val timeout: Timeout = Timeout(2.seconds)
implicit val scheduler: Scheduler = actorSystem.scheduler
val balanceFuture: Future[Int] = account1.ask(GetBalance)
We don’t have to create a new scheduler instance; we can just use the scheduler of our actor system. For defining the timeout, you need to pass an instance of FiniteDuration, which can be created using extension methods such as .seconds or .minutes. You need to import scala.concurrent.duration.DurationInt for these extensions.
Akka also provides an operator function for ask, and it is ?. It is similar to !, which is an operator function for tell. Using this operator function, the code becomes as shown below
val balanceFuture: Future[Int] = account1 ? GetBalance
Now that we have the future of balance, we can use .foreach to get access to actual balance and then print it.
import actorSystem.executionContext
balanceFuture.foreach(balance => println(s"balance = $balance"))
This requires an implicit execution context. We can use the global execution context by importing scala.concurrent.ExecutionContext.Implicits.global, or we can use the one in the actor system.
Notice that for scheduler I had to create a new implicit variable, but for executionContext I could just import it and it worked. That’s because the actor system’s execution context is already marked implicit.
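Putting the pieces of this section together, the sketch below asks an account for its balance twice: once with the explicit lambda form and once with the ? operator. To stay self-contained it uses a reduced protocol and makes the bank account the guardian of a throwaway actor system, which is a simplification chosen for this example and not the setup used in the rest of this post. It assumes Akka 2.6.x (akka-actor-typed) is on the classpath.

```scala
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Scheduler}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt

object AskSketch {
  // Reduced protocol for the sketch: only the message that needs a reply.
  sealed trait BankAccountMessage
  final case class GetBalance(replyTo: ActorRef[Int]) extends BankAccountMessage

  def behavior(balance: Int): Behavior[BankAccountMessage] = Behaviors.receiveMessage {
    case GetBalance(replyTo) =>
      replyTo ! balance
      Behaviors.same
  }

  // Starts a throwaway actor system, asks for the balance twice and returns both replies.
  def askTwice(initialBalance: Int): (Int, Int) = {
    val system: ActorSystem[BankAccountMessage] =
      ActorSystem(behavior(initialBalance), "AskSketch")
    implicit val timeout: Timeout = Timeout(2.seconds)
    implicit val scheduler: Scheduler = system.scheduler
    try {
      // Explicit form: ask hands us a temporary ActorRef[Int] to put into the message.
      val viaLambda: Future[Int] = system.ask[Int](ref => GetBalance(ref))
      // Short form: GetBalance itself is used as the function ActorRef[Int] => GetBalance.
      val viaOperator: Future[Int] = system ? GetBalance
      // Blocking with Await is only for the demo.
      (Await.result(viaLambda, 2.seconds), Await.result(viaOperator, 2.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(askTwice(initialBalance = 100))
}
```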
Spawn Protocol#
In part 1, we saw how to spawn a new “system” actor. But system actors are not what we are supposed to spawn for normal (domain) entities. We need to spawn a user actor. A user actor can only be spawned by another actor, which can be a user actor or a system actor. The most common pattern to use is SpawnProtocol. SpawnProtocol is a behaviour given out of the box by Akka. The pattern is
- Use spawn protocol behaviour as guardian behaviour. This will create a system actor with this protocol.
- Send spawn message to this actor with desired behaviour as a parameter
- This actor will spawn the new actor and return actor ref of new actor
val actorSystem = ActorSystem(SpawnProtocol(), name = "MyBankActorSystem")
SpawnProtocol() returns Behavior[Command]. Command is a trait which has only one subtype: Spawn[T]. This means that when you use this behavior for spawning an actor or actor system, the only message you can send to the target actor is Spawn[T].
T represents the type of behavior you will spawn using the spawn protocol. In other words, the “type of message” your spawned actor will accept after it is spawned.
Since we expect a reply (actor ref) from guardian actor, we need to use ask pattern here. Let’s look at the code.
val bankAccountFuture: Future[ActorRef[BankAccountMessage]] =
actorSystem.ask[ActorRef[BankAccountMessage]]{ ref =>
Spawn[BankAccountMessage](
behavior = behavior(balance = 0),
name = "account1",
props = Props.empty,
replyTo = ref
)
}
To send a message to bankAccount, we can use the foreach method of the future.
bankAccountFuture.foreach(bankAccount => bankAccount ! Deposit(200))
Scala allows reducing the code by using underscores (_). An underscore in this context stands for the lambda’s parameter, so _ ! Deposit(200) means x => x ! Deposit(200), and we don’t need the lambda notation.
bankAccountFuture.foreach(_ ! Deposit(200))
Those who have used lambdas for a long time and are new to this underscore notation can take a while to get used to this syntax.
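If the underscore notation is new to you, a tiny example without any actors may help. The sketch below writes the same transformation both ways; the list and the names are made up for illustration.

```scala
object UnderscoreSketch {
  val numbers: List[Int] = List(1, 2, 3)

  // Full lambda notation: the parameter is named explicitly.
  val withLambda: List[Int] = numbers.map(x => x + 1)

  // Underscore notation: _ stands for the single parameter of the lambda.
  val withUnderscore: List[Int] = numbers.map(_ + 1)

  def main(args: Array[String]): Unit =
    println(withLambda == withUnderscore)
}
```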
To ask the bank account, we need to flatMap, because ask also returns a Future[T].
val balanceFuture: Future[Int] = bankAccountFuture.flatMap(_ ? GetBalance)
If we have to perform a series of sequential interactions with bank account, they now involve mapping and flatMapping over futures.
bankAccountFuture
.flatMap(_ ? GetBalance)
.map(println)
.map(_ => bankAccountFuture.foreach(_ ! Deposit(200)))
.map(_ => bankAccountFuture.foreach(_ ! Withdraw(50)))
.flatMap(_ => bankAccountFuture.flatMap(_ ? GetBalance))
.map(println)
Output:
0
150
This syntax is very complicated though. Let’s simplify it.
Using “For Comprehension” with Futures#
Scala provides for comprehensions. These are syntactic sugar for flatMap and map. The above code gets simplified if we use a for comprehension.
val future = for {
bankAccount <- bankAccountFuture
balance1 <- bankAccount ? GetBalance
_ = println(balance1)
_ = bankAccount ! Deposit(200)
_ = bankAccount ! Withdraw(50)
balance2 <- bankAccount ? GetBalance
_ = println(balance2)
} yield ()
Note that a for comprehension is not just for Future; it works for all types with map & flatMap methods.
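As a small illustration of both points, the sketch below uses Option instead of Future and writes the same logic twice: once as a for comprehension and once as the flatMap and map calls it roughly corresponds to. The parse helper and the method names are made up for this example.

```scala
object ForComprehensionSketch {
  // Hypothetical helper: Some(number) if the string is a valid Int, None otherwise.
  def parse(s: String): Option[Int] = s.toIntOption

  // For comprehension over Option.
  def sumWithFor(a: String, b: String): Option[Int] =
    for {
      x <- parse(a)
      y <- parse(b)
    } yield x + y

  // Roughly what the for comprehension above is rewritten to.
  def sumDesugared(a: String, b: String): Option[Int] =
    parse(a).flatMap(x => parse(b).map(y => x + y))

  def main(args: Array[String]): Unit = {
    println(sumWithFor("1", "2"))
    println(sumDesugared("1", "oops"))
  }
}
```

Each <- becomes a flatMap, except the last one before yield, which becomes a map.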
Caution
Akka does not guarantee order of delivery for messages when using a thread-pool directly to send messages. The safest way to ensure ordering is to use “ask” instead of “tell” for all the steps of a sequenced communication.
For comprehensions solve a major problem by giving the following expressions access to all the “resolved” variables. For instance, in the previous code without a for comprehension, we had to write bankAccountFuture.foreach twice and bankAccountFuture.flatMap once, even though we know that the future is already complete because these operations are sequential. We could have carried the values manually, but that would have been super unreadable boilerplate code. That’s why for comprehensions are one of my favourite Scala features.
End of Part 2#
Here’s the entire Main.scala contents
package tech.bilal
import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Props, Scheduler, SpawnProtocol}
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
object Main extends App {
def behavior(balance: Int): Behavior[BankAccountMessage] = Behaviors.receiveMessage {
case Deposit(amount) => behavior(balance + amount)
case Withdraw(amount) => behavior(balance - amount)
case GetBalance(replyTo) =>
replyTo ! balance
Behaviors.same
}
val actorSystem = ActorSystem(SpawnProtocol(), name = "MyBankActorSystem")
implicit val timeout: Timeout = Timeout(2.seconds)
implicit val scheduler: Scheduler = actorSystem.scheduler
import actorSystem.executionContext
val bankAccountFuture: Future[ActorRef[BankAccountMessage]] =
actorSystem.ask[ActorRef[BankAccountMessage]] { ref =>
Spawn[BankAccountMessage](
behavior = behavior(balance = 0),
name = "account1",
props = Props.empty,
replyTo = ref
)
}
val future = for {
bankAccount <- bankAccountFuture
balance1 <- bankAccount ? GetBalance
_ = println(balance1)
_ = bankAccount ! Deposit(200)
_ = bankAccount ! Withdraw(50)
balance2 <- bankAccount ? GetBalance
_ = println(balance2)
} yield ()
Await.result(future, 2.seconds)
actorSystem.terminate()
}
All the project code can be found on github repo on branch “part-2”.