
Actors using Scala & Akka — Part 2 : AskPattern and SpawnProtocol#


This is the second post in the “Actors using Scala & Akka” series. In Part 1, we saw how to create new actors and modify their state by sending messages.

In this article we will take a look at how to get a reply from actors (i.e. the Ask Pattern) and how to spawn user actors instead of system actors (the Spawn Protocol).

Before we jump into the ask pattern, we should have a basic understanding of some Scala concepts:

  • Futures
  • Execution Context
  • Implicit parameters

If you are already familiar with Scala, feel free to jump to the “Ask Pattern” section.

Scala Basics#

Note

This tutorial uses Scala 2.13. It is not a full tutorial on Scala. To learn Scala in depth, I recommend the book Programming in Scala.

Futures#

Futures in Scala are similar to Task<T> in C#, a Promise in JavaScript or a CompletableFuture<T> in Java. Their objective is to help developers write asynchronous code. A Future[Int] represents an Int value that will become available when the future completes. A future may complete with a successful value or fail with an exception. Futures are ideally returned from methods which perform an async operation such as network IO, file IO, etc. This is an evolved design compared to callbacks, which are difficult to compose. Since futures are “return values” instead of method parameters, it is easy to chain multiple futures together (compose) and also to pass them around the application.

A program with callbacks

IO.readFile("/someFile.txt", fileContents => 
  IO.uploadToService(fileContents, response => 
    IO.writeToFile("/another_file.txt", response, _ =>
      sys.exit(0), _ => sys.exit(1)
    ), _ => sys.exit(1)
  ), _ => sys.exit(1)
)

A program with Futures

IO.readFile("/someFile.txt")
  .flatMap(fileContents => IO.uploadToService(fileContents))
  .flatMap(response => IO.writeToFile("/another_file.txt", response))
  .onComplete {
    case Success(_) => sys.exit(0)
    case Failure(_) => sys.exit(1)
  }

It is important to know about Future because when you send a message to an actor and expect a reply, even though the actor sends a reply of type T, what you get back is always a Future[T]. This is because message-based communication is asynchronous and non-blocking.
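To make the chaining idea concrete without any actors, here is a minimal sketch that uses only the Scala standard library. The methods readFile, upload and failingUpload are hypothetical stand-ins for the IO calls used earlier, and Await is used only so that the demo can return plain values.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.util.Try

object FutureChaining {
  // Hypothetical stand-ins for async IO calls; each returns a Future right away.
  def readFile(path: String): Future[String] = Future(s"contents of $path")
  def upload(contents: String): Future[Int]  = Future(contents.length)
  def failingUpload(contents: String): Future[Int] =
    Future(throw new IllegalStateException("service unavailable"))

  // flatMap starts the second step only after the first one has produced a value.
  def uploadedBytes(): Int =
    Await.result(readFile("/someFile.txt").flatMap(upload), 2.seconds)

  // A failure anywhere in the chain fails the resulting future.
  def failedUpload(): Try[Int] =
    Await.ready(readFile("/someFile.txt").flatMap(failingUpload), 2.seconds).value.get
}
```

Blocking with Await is acceptable at the very edge of a demo program, but inside an application you would normally keep composing the futures instead.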

Execution Context#

Callbacks are a way to tell the program what you want to do after an async operation is complete. When you use a callback-based API, you lose control over which thread or thread pool the callback is executed on.

When you use futures, you have this control. Mapping and flatMapping over a Future is like attaching a callback to it, and it requires providing an ExecutionContext. This execution context can be backed by a single thread or by any of the various types of thread pools. In other words, whatever you write in the map or flatMap will be executed by the execution context you provide.

Scala provides an out-of-the-box ExecutionContext at scala.concurrent.ExecutionContext.Implicits.global. It does a good job of running async work, and also blocking code when used appropriately. It is recommended to use different thread pools (and execution contexts) for logically different areas of an application. You can read about the global execution context in the official documentation.
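As a rough illustration of using a separate pool for one area of an application, the sketch below builds an ExecutionContext from a plain Java fixed thread pool and checks which thread ran the map callback. The pool size and the thread name "blocking-io-pool" are arbitrary choices made for this example.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt

object DedicatedPool {
  // Names the pool's threads so we can see where a callback ran.
  private val namedThreads: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "blocking-io-pool")
      t.setDaemon(true)
      t
    }
  }

  // Returns the name of the thread that executed the map callback.
  def callbackThreadName(): String = {
    val executor = Executors.newFixedThreadPool(2, namedThreads)
    // Picked up implicitly by Future.apply and by map below.
    implicit val ioContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
    try {
      val name = Future("payload").map(_ => Thread.currentThread().getName)
      Await.result(name, 2.seconds)
    } finally executor.shutdown()
  }
}
```

Because the implicit context in scope is the dedicated one, neither the Future body nor the map callback touches the global pool.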

Implicit Parameters#

Scala has a feature where you can define a parameter list as “implicit”, and the compiler will pass it to the function automatically if an implicit value of the required type is present in the caller’s scope. Let’s see an example:

Implicit parameters

sendData is a method which accepts a parameter data of type Any. Any is a “super type” of all types in Scala, so you can pass anything to sendData. This method has one more parameter list, which begins with implicit and requires a parameter of type Serializer. Looking at the body of the method, we can see that it can access the serializer argument in the same way it can access the data argument. When we call this method without a serializer, it fails to compile, as seen in the screenshot above.
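Since the screenshot is not reproduced as text, here is a small sketch of what such definitions could look like. The bodies of Person, Serializer and sendData are my assumptions for illustration (in particular, this sendData returns the serialized String so the result is easy to inspect); only the shape of the parameter lists matters.

```scala
// Hypothetical definitions, written only to illustrate implicit parameter lists.
final case class Person(name: String, age: Int)

class Serializer {
  def serialize(data: Any): String = data.toString
}

object ImplicitDemo {
  // The second parameter list is implicit: the compiler fills it in
  // when an implicit Serializer is in scope at the call site.
  def sendData(data: Any)(implicit serializer: Serializer): String =
    serializer.serialize(data)

  // Passing the serializer by hand in a second pair of parentheses.
  def explicitCall(): String =
    sendData(Person(name = "john", age = 10))(new Serializer())

  // Letting the compiler find the implicit value in the local scope.
  def implicitCall(): String = {
    implicit val serializer: Serializer = new Serializer()
    sendData(Person(name = "john", age = 10))
  }
}
```

Both calls run exactly the same code; the only difference is who writes the second argument list, you or the compiler.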

We can open another pair of parentheses and pass in the serializer like any other parameter and it would compile and work just fine.

sendData(Person(name = "john", age = 10))(new Serializer())

But since it is marked as implicit, it is designed and intended to be passed automatically - implicitly.

implicit val serializer:Serializer = new Serializer()
sendData(Person(name = "john", age = 10))

Here, I have declared a variable and marked it as implicit, so I don’t need to pass it explicitly anymore. Since it has the required type and is in the current scope, the compiler does the job for me. If I enable the magic view in IntelliJ by pressing Option CTRL Shift + +, I can see the implicit parameter being passed!

implicit parameters 2

I talked about implicit parameters because .map and .flatMap require an ExecutionContext to be passed implicitly. There is a lot more to implicits in Scala than this, and you can learn more about them in the official documentation.

Ask Pattern#

The ask pattern is used when you want to send a message to an actor and also expect a reply. Since this is message-driven, asynchronous communication, we get back a Future of the reply. The future completes when the actor decides to send a reply and the reply reaches its destination.

Do you remember the code of bank account actor where we ended part 1?

Code from part 1

package tech.bilal

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}

object Main extends App {

  def behavior(balance: Int): Behavior[BankAccountMessage] = Behaviors.receiveMessage {
    case Deposit(amount)  => behavior(balance + amount)
    case Withdraw(amount) => behavior(balance - amount)
    case PrintBalance =>
      println(s"balance = $balance")
      behavior(balance)
  }

  val actorSystem                            = ActorSystem(Behaviors.empty, name = "MyBankActorSystem")
  val account1: ActorRef[BankAccountMessage] = actorSystem.systemActorOf(behavior(balance = 0), "account1")
  println(account1)

  account1 ! PrintBalance
  account1 ! Deposit(200)
  account1 ! Withdraw(50)
  account1 ! PrintBalance

  actorSystem.terminate()
}

Here, to know the account balance, we make the actor print it on console. But what if we want to get the balance as a response? Let’s make that happen.

The first thing we will need to do is — change the protocol. Here’s our existing protocol

Protocol from Part 1

sealed trait BankAccountMessage

case class Deposit(amount: Int)  extends BankAccountMessage
case class Withdraw(amount: Int) extends BankAccountMessage
case object PrintBalance         extends BankAccountMessage

We need to add a “replyTo” parameter (the name of the parameter doesn’t matter) to the message which needs a reply. It should be of type ActorRef[T], where T is the type of reply you want. Note that we changed the name from PrintBalance to GetBalance and also changed it from a case object to a case class, since it now has parameters and cannot be a singleton anymore.

sealed trait BankAccountMessage

case class Deposit(amount: Int)                   extends BankAccountMessage
case class Withdraw(amount: Int)                  extends BankAccountMessage
case class GetBalance(replyTo: ActorRef[Int])     extends BankAccountMessage

Let’s take a look at how behaviour needs to change.

def behavior(balance: Int): Behavior[BankAccountMessage] =
  Behaviors.receiveMessage {
    case Deposit(amount)     => behavior(balance + amount)
    case Withdraw(amount)    => behavior(balance - amount)
    case GetBalance(replyTo) =>
      replyTo ! balance
      Behaviors.same
  }

Note

We are using Akka version 2.6.5. You can find the latest version on GitHub.

This was pretty straightforward (hopefully!). While handling the message in the behaviour, we get hold of the “replyTo” address and simply send (!) the balance to it.

Now let’s see how we can receive the reply outside of the actor world. This is where the ask pattern comes to the rescue.

First we need to add an import statement to enable some extension methods:

import akka.actor.typed.scaladsl.AskPattern.Askable

This provides an ask extension method on the account1 actor ref. Let’s look at its signature.

ask signature 1

It requires one explicit parameter i.e. a function from ActorRef[Int] to BankAccountMessage. This is easy to construct.

ask signature 2

We don’t have to worry about how we get this actor ref; we just need to be concerned with creating a new BankAccountMessage. In this case it is GetBalance. GetBalance requires an ActorRef[Int], which is provided to us by the ask method. Internally, ask spawns a temporary actor and provides its actor ref to us. We don’t have to worry too much about it; it’s just good to know.
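Because the screenshots are not available as text, here is a self-contained sketch of the explicit lambda form of ask, assuming Akka 2.6.x with the akka-actor-typed module on the classpath. To keep it short, the bank account is used directly as the guardian actor, which is a shortcut for this example only, and the protocol is trimmed to two messages.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Scheduler}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt

object AskLambdaDemo {
  sealed trait BankAccountMessage
  final case class Deposit(amount: Int)               extends BankAccountMessage
  final case class GetBalance(replyTo: ActorRef[Int]) extends BankAccountMessage

  def behavior(balance: Int): Behavior[BankAccountMessage] =
    Behaviors.receiveMessage {
      case Deposit(amount) => behavior(balance + amount)
      case GetBalance(replyTo) =>
        replyTo ! balance
        Behaviors.same
    }

  def balanceAfterDeposit(): Int = {
    // Shortcut for the sketch: the account itself is the guardian actor.
    val system = ActorSystem(behavior(balance = 0), "AskLambdaDemo")
    implicit val timeout: Timeout     = Timeout(2.seconds)
    implicit val scheduler: Scheduler = system.scheduler
    try {
      system ! Deposit(200)
      // ask hands us `ref`, the address of the temporary actor that completes the Future.
      val balanceFuture: Future[Int] = system.ask[Int](ref => GetBalance(ref))
      Await.result(balanceFuture, 3.seconds)
    } finally system.terminate()
  }
}
```

The lambda never creates the ref itself; it only describes which message to build once ask supplies one.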

We can reduce this code. ask takes a function, and GetBalance’s constructor is also a function which takes an actor ref. We can pass it directly and let the compiler expand it at compile time.

val balanceFuture: Future[Int] = account1.ask(GetBalance)

ask requires two more parameters, which are marked as implicit: a timeout and a scheduler. The timeout tells ask how long it should wait before failing the future with a timeout exception. A scheduler is a utility which, as the name suggests, is used to schedule function executions. In this case, ask needs the scheduler to fire the timeout exception in case the actor does not send a reply or the reply does not arrive in time. To pass these implicit values we just have to create their instances in the scope of ask and mark them as implicit, as shown below.

implicit val timeout: Timeout = Timeout(2.seconds)
implicit val scheduler: Scheduler = actorSystem.scheduler

val balanceFuture: Future[Int] = account1.ask(GetBalance)

We don’t have to create a new scheduler instance; we can just use the scheduler of our actor system. To define the timeout, you need to pass an instance of FiniteDuration, which can be created using extension methods such as .seconds or .minutes. You need to import scala.concurrent.duration.DurationInt for these extensions.
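To see the timeout in action, the following sketch asks an actor that never replies, again assuming Akka 2.6.x. The Ping message and the use of Behaviors.ignore as the guardian are hypothetical choices made only to force the timeout path.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
import scala.util.Try

object AskTimeoutDemo {
  // Hypothetical message; the actor below never answers it.
  final case class Ping(replyTo: ActorRef[String])

  def askSilentActor(): Try[String] = {
    // Behaviors.ignore drops every message, so no reply is ever sent.
    val system = ActorSystem(Behaviors.ignore[Ping], "AskTimeoutDemo")
    implicit val timeout: Timeout     = Timeout(200.millis)
    implicit val scheduler: Scheduler = system.scheduler
    try {
      val reply: Future[String] = system.ask[String](ref => Ping(ref))
      // Await.ready waits for completion without rethrowing the failure.
      Await.ready(reply, 2.seconds).value.get
    } finally system.terminate()
  }
}
```

The future fails once the 200 millisecond ask timeout elapses, so the caller can react with recover or onComplete instead of waiting forever.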

Akka also provides an operator function for ask and it is ?. It is similar to ! which is an operator function for tell. Using this operator function, the code becomes as shown below

val balanceFuture: Future[Int] = account1 ? GetBalance

Now that we have the future of balance, we can use .foreach to get access to actual balance and then print it.

import actorSystem.executionContext
balanceFuture.foreach(balance => println(s"balance = $balance"))

This requires an implicit execution context. We can use the global execution context by importing scala.concurrent.ExecutionContext.Implicits.global or we can use the one in actor system.

Notice that for the scheduler I had to create a new implicit variable, but for the executionContext I could just import it and it worked. That’s because the actor system’s execution context is already marked implicit.

Spawn Protocol#

In part 1, we saw how to spawn a new “system” actor. But system actors are not what we are supposed to spawn for normal (domain) entities. We need to spawn a user actor. A user actor can only be spawned by another actor, which can be either a user actor or a system actor. The most common pattern is SpawnProtocol, a behaviour provided out of the box by Akka. The pattern is:

  • Use the spawn protocol behaviour as the guardian behaviour. This creates the guardian actor with this protocol.
  • Send a Spawn message to this actor with the desired behaviour as a parameter.
  • This actor will spawn the new actor and reply with the actor ref of the new actor.

val actorSystem = ActorSystem(SpawnProtocol(), name = "MyBankActorSystem")

SpawnProtocol() returns a Behavior[Command]. Command is a trait with only one subtype, Spawn[T]. This means that when you use this behavior to create an actor or an actor system, the only message you can send to that actor is Spawn[T].

T represents the type of behavior you will spawn using spawn protocol. In other words, the “type of message” your spawned actor will accept after it is spawned.

Since we expect a reply (actor ref) from guardian actor, we need to use ask pattern here. Let’s look at the code.

val bankAccountFuture: Future[ActorRef[BankAccountMessage]] =
  actorSystem.ask[ActorRef[BankAccountMessage]]{ ref =>
    Spawn[BankAccountMessage](
      behavior = behavior(balance = 0),
      name = "account1",
      props = Props.empty,
      replyTo = ref
    )
  }

To send a message to bankAccount, we can use the foreach method of the future.

bankAccountFuture.foreach(bankAccount => bankAccount ! Deposit(200))

Scala allows reducing the code by using underscores (_). An underscore in this context stands for the lambda’s parameter, so _ ! Deposit(200) means x => x ! Deposit(200) and we don’t need the lambda notation.

bankAccountFuture.foreach(_ ! Deposit(200))

Those who have used the lambda notation for a long time and are new to the underscore notation may need a while to get used to this syntax.
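If the underscore still looks odd, it may help to see it on a plain collection first. This is a tiny sketch with no actors involved; the names are made up for the example.

```scala
object UnderscoreDemo {
  val numbers: List[Int] = List(1, 2, 3)

  // Full lambda notation: the parameter is named explicitly.
  def withLambda: List[Int] = numbers.map(x => x + 1)

  // Placeholder notation: each underscore stands for the lambda's parameter.
  def withUnderscore: List[Int] = numbers.map(_ + 1)
}
```

Both methods compile to the same function, so the choice between them is purely about readability.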

To ask bank account, we need to flatMap because ask also returns a Future[T].

val balanceFuture: Future[Int] = bankAccountFuture.flatMap(_ ? GetBalance)

If we have to perform a series of sequential interactions with bank account, they now involve mapping and flatMapping over futures.

bankAccountFuture
  .flatMap(_ ? GetBalance)
  .map(println)
  .map(_ => bankAccountFuture.foreach(_ ! Deposit(200)))
  .map(_ => bankAccountFuture.foreach(_ ! Withdraw(50)))
  .flatMap(_ => bankAccountFuture.flatMap(_ ? GetBalance))
  .map(println)

Output:

0
150

This syntax is very complicated though. Let’s simplify it.

Using “For Comprehension” with Futures#

Scala provides for comprehensions. These are syntactic sugar for flatMap and map. The above code gets simpler if we use a for comprehension.

val future = for {
  bankAccount <- bankAccountFuture
  balance1 <- bankAccount ? GetBalance
  _ = println(balance1)
  _ = bankAccount ! Deposit(200)
  _ = bankAccount ! Withdraw(50)
  balance2 <- bankAccount ? GetBalance
  _ = println(balance2)
} yield ()

Note that for comprehensions are not just for Future; they work for all types with map and flatMap methods.
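For example, the same syntax works on Option. The sketch below shows a for comprehension next to the flatMap and map calls it roughly corresponds to; parse is a hypothetical helper built on the standard toIntOption method from Scala 2.13.

```scala
object ForDesugarDemo {
  // Hypothetical helper: None when the string is not a valid Int.
  def parse(s: String): Option[Int] = s.toIntOption

  // Reads top to bottom, and each step can use the values bound above it.
  def withFor(a: String, b: String): Option[Int] =
    for {
      x <- parse(a)
      y <- parse(b)
    } yield x + y

  // Roughly what the compiler generates from the for comprehension above.
  def withFlatMap(a: String, b: String): Option[Int] =
    parse(a).flatMap(x => parse(b).map(y => x + y))
}
```

Every generator except the last becomes a flatMap, and the last one together with yield becomes a map.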

Caution

Akka does not guarantee the order of delivery when messages are sent directly from a thread pool, for example from callbacks of different futures. The safest way to enforce an order is to use “ask” instead of “tell” for every step of a sequenced communication, so that each step starts only after the previous one has been acknowledged.
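One possible way to follow this advice is to give the state-changing messages a replyTo as well, so that every step can be awaited before the next one is sent. The sketch below assumes Akka 2.6.x; the Command protocol with an acknowledged Deposit is hypothetical and differs from the protocol used in the rest of this article, and the account is used as the guardian only to keep the example short.

```scala
import akka.Done
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Scheduler}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt

object AckDemo {
  // Hypothetical protocol: Deposit carries a replyTo so the caller can wait for an acknowledgement.
  sealed trait Command
  final case class Deposit(amount: Int, replyTo: ActorRef[Done]) extends Command
  final case class GetBalance(replyTo: ActorRef[Int])            extends Command

  def account(balance: Int): Behavior[Command] =
    Behaviors.receiveMessage {
      case Deposit(amount, replyTo) =>
        replyTo ! Done
        account(balance + amount)
      case GetBalance(replyTo) =>
        replyTo ! balance
        Behaviors.same
    }

  def run(): Int = {
    val system = ActorSystem(account(balance = 0), "AckDemo")
    implicit val timeout: Timeout     = Timeout(2.seconds)
    implicit val scheduler: Scheduler = system.scheduler
    import system.executionContext
    try {
      // Each ask is sent only after the previous reply has arrived.
      val balance: Future[Int] = for {
        _ <- system.ask[Done](ref => Deposit(200, ref))
        _ <- system.ask[Done](ref => Deposit(50, ref))
        b <- system.ask[Int](ref => GetBalance(ref))
      } yield b
      Await.result(balance, 5.seconds)
    } finally system.terminate()
  }
}
```

The price of this ordering is one extra round trip per step, so it is worth using only where the sequence really matters.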

For comprehensions solve a major problem by giving the following expressions access to all the “resolved” variables. For instance, in the previous code without a for comprehension, we had to write bankAccountFuture.foreach twice and bankAccountFuture.flatMap once, even though we know the future is already complete, because these operations are sequential. We could have carried the values along manually, but that would have been unreadable boilerplate. That’s why for comprehensions are one of my favourite Scala features.

End of Part 2#

Here’s the entire Main.scala contents

package tech.bilal

import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Props, Scheduler, SpawnProtocol}
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt

object Main extends App {

  def behavior(balance: Int): Behavior[BankAccountMessage] = Behaviors.receiveMessage {
    case Deposit(amount)  => behavior(balance + amount)
    case Withdraw(amount) => behavior(balance - amount)
    case GetBalance(replyTo) =>
      replyTo ! balance
      Behaviors.same
  }

  val actorSystem = ActorSystem(SpawnProtocol(), name = "MyBankActorSystem")

  implicit val timeout: Timeout     = Timeout(2.seconds)
  implicit val scheduler: Scheduler = actorSystem.scheduler
  import actorSystem.executionContext

  val bankAccountFuture: Future[ActorRef[BankAccountMessage]] =
    actorSystem.ask[ActorRef[BankAccountMessage]] { ref =>
      Spawn[BankAccountMessage](
        behavior = behavior(balance = 0),
        name = "account1",
        props = Props.empty,
        replyTo = ref
      )
    }

  val future = for {
    bankAccount <- bankAccountFuture
    balance1    <- bankAccount ? GetBalance
    _           = println(balance1)
    _           = bankAccount ! Deposit(200)
    _           = bankAccount ! Withdraw(50)
    balance2    <- bankAccount ? GetBalance
    _           = println(balance2)
  } yield ()

  Await.result(future, 2.seconds)

  actorSystem.terminate()
}

All the project code can be found in the GitHub repo on the “part-2” branch.