Composable concurrency scope of ZIO Semaphore

I was recently working on a "Durable Key-Value Store" implementation in ZIO. The implementation was pretty straightforward, but I had to make sure that the log file is not accessed concurrently by multiple fibers. I could have used a Ref to keep track of the state, but I came across Semaphore in ZIO and decided to give it a try. I was pleasantly surprised by how easy it was to use. In this post, we will take a look at how to use Semaphore to implement concurrency control in ZIO.

Durable key-value store

A durable key-value store is one that can survive system crashes. It can be implemented using a write-ahead log.

A write-ahead log records every change in an append-only log file before the change is applied to the main database. Because each change reaches the log first, the database can always be brought back to a consistent state: if the system crashes, it is recovered by replaying the log file.
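As a rough illustration of replaying, here is a minimal sketch in plain Scala 3. The `Command` type and the `replay` function are hypothetical names made up for this example (the post's own `KVCommand` is not shown), and the log is just an in-memory `Seq` rather than a file.

```scala
// Hypothetical command type for illustration; not the post's KVCommand.
enum Command:
  case Set(key: String, value: String)
  case Delete(key: String)

// Rebuild the in-memory state by applying every logged command in order.
def replay(log: Seq[Command]): Map[String, String] =
  log.foldLeft(Map.empty[String, String]) {
    case (state, Command.Set(k, v)) => state.updated(k, v)
    case (state, Command.Delete(k)) => state.removed(k)
  }

@main def replayDemo(): Unit =
  val log = Seq(
    Command.Set("a", "1"),
    Command.Set("b", "2"),
    Command.Delete("a"),
    Command.Set("b", "3")
  )
  println(replay(log)) // Map(b -> 3)
```

Since the state is a pure function of the log, replaying the same log after a crash always yields the same state.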

What is a Semaphore?

A semaphore is a concurrency primitive used to control access to a shared resource. It is similar to a mutex. The difference is that a mutex is binary, i.e. it is either locked or unlocked, whereas a semaphore can have a capacity greater than one and so can be held by multiple parties at the same time. The units of that capacity are called permits. A semaphore with 1 permit is equivalent to a mutex.

In ZIO's context, the parties acquiring and releasing permits are fibers. Once a fiber acquires a permit, it can perform its task. When it is done, it releases the permit. If no permits are available, the fiber is suspended until another fiber releases one.
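To make the permit behaviour concrete, here is a small sketch assuming ZIO 2.x. The object name, `task` and `maxConcurrency` are invented for this example. It pushes ten fibers through a semaphore with `withPermit` and records the highest number of fibers that were inside the guarded region at once.

```scala
import zio.*

object SemaphoreDemo extends ZIOAppDefault:

  // Simulated work that records how many fibers are inside the guarded region at once.
  def task(active: Ref[Int], maxSeen: Ref[Int]): UIO[Unit] =
    for
      now <- active.updateAndGet(_ + 1)
      _   <- maxSeen.update(m => math.max(m, now))
      _   <- ZIO.sleep(20.millis)
      _   <- active.update(_ - 1)
    yield ()

  // Runs 10 fibers through a semaphore and returns the highest concurrency observed.
  def maxConcurrency(permits: Long): UIO[Int] =
    for
      sem     <- Semaphore.make(permits)
      active  <- Ref.make(0)
      maxSeen <- Ref.make(0)
      _       <- ZIO.foreachParDiscard(1 to 10)(_ => sem.withPermit(task(active, maxSeen)))
      max     <- maxSeen.get
    yield max

  val run =
    for
      one <- maxConcurrency(1)
      two <- maxConcurrency(2)
      _   <- Console.printLine(s"1 permit: max $one, 2 permits: max $two")
    yield ()
```

With 1 permit the observed maximum is 1, which is the mutex case. With 2 permits it never exceeds 2.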

Using Semaphore

Here's the planned interface for the append-only log:

trait AppendOnlyLog[A]:
  def append(a: A): ZIO[Any, IOException, Unit]
  def readAll: ZIO[Any, IOException, Seq[A]]

Below is the interface for KV store.

trait KVReader[-K, +V]:
  def get(key: K): UIO[Option[V]]

trait KVWriter[-K, -V]:
  def set(key: K, value: V): Task[Unit]
  def delete(key: K): Task[Unit]

trait DurableKVStore[K, V] extends KVReader[K, V] with KVWriter[K, V]

The set operation in the writer needs to call append on the log. If the append succeeds, it can then safely update the in-memory state. The delete operation works the same way as set, while get is just a read from the in-memory state. The readAll operation is used to initialize the in-memory state from the log file.

To keep things consistent, we need the following guarantees:

  • at start, while the in-memory state is being initialized (readAll), no other fiber should be able to write to the log file
  • while one set or delete operation is in progress, any other set or delete operation should wait

This can be summarized as: only one fiber is allowed to touch the log file at a time. We can achieve this using a Semaphore. But the question is, where should the semaphore live? We have two choices: in AppendOnlyLog or in KVWriter.

Putting the semaphore in KVWriter seems to meet our requirement, because we want the "scope" of the semaphore to last not only until the log write for a set or delete is complete, but also until the in-memory state is updated. The downside of this approach is that it leaves the AppendOnlyLog implementation open to concurrent access if it is used outside of KVWriter. To protect against this, we would have to implement yet another lock in AppendOnlyLog, which would be both complex and inefficient.

The solution I found was to implement the semaphore in AppendOnlyLog but keep its scope extendable with the help of ZIO's Scope. Instead of using the semaphore's non-scoped withPermit method, we can use its withPermitScoped method.

AppendOnlyLog.scala
class AppendOnlyLogJsonImpl[A: JsonCodec](
    path: Path, sem: Semaphore
) extends AppendOnlyLog[A]:

  def append(a: A): ZIO[Scope, IOException, Unit] = // (1)!
    sem.withPermitScoped *> appendToFile(path, a.toJson)
  1. Scope added to the Requirement of append method. This is the key to the solution.

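Here, the append method uses the semaphore to acquire a permit and then appends the given data to the file, but it does not release the permit itself. Instead, it adds a Scope "Requirement" to its return type, which the caller has to satisfy. withPermitScoped ties the release of the permit to the closing of that scope, so the permit is released when the scope is closed, not when the append method returns. Note that for this to compile, the append signature in the AppendOnlyLog trait has to carry the same Scope requirement, since ZIO[Scope, IOException, Unit] is not a subtype of ZIO[Any, IOException, Unit].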
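The hold-until-scope-closes behaviour can be observed directly. The sketch below assumes ZIO 2.x and uses `Semaphore#available` to count free permits. The object name, `freeWhileHeld` and `program` are made up for this example.

```scala
import zio.*

object ScopedPermitDemo extends ZIOAppDefault:

  // Takes the permit inside a scope and reports the free permits while it is still held.
  def freeWhileHeld(sem: Semaphore): UIO[Long] =
    ZIO.scoped(sem.withPermitScoped *> sem.available)

  // Free permits before, inside, and after the scope.
  val program: UIO[(Long, Long, Long)] =
    for
      sem    <- Semaphore.make(permits = 1)
      before <- sem.available
      inside <- freeWhileHeld(sem)
      after  <- sem.available
    yield (before, inside, after)

  val run = program.flatMap { case (before, inside, after) =>
    Console.printLine(s"before=$before inside=$inside after=$after")
  }
```

This should print `before=1 inside=0 after=1`: the permit stays taken for everything sequenced after `withPermitScoped` within the scope, and comes back once `ZIO.scoped` closes it.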

DurableKVStore.scala
class DurableKVStoreImpl[K, V](
    memoryState: MemoryState[K, V],
    fileLog: AppendOnlyLog[KVCommand[K, V]]
) extends DurableKVStore[K, V]:

  def set(key: K, value: V): Task[Unit] = // (1)!
    ZIO.scoped { // (2)!
      fileLog.append(KVCommand.Set(key, value)) *> memoryState.set(key, value)
    }

  def delete(key: K): Task[Unit] = ZIO.scoped { // (3)!
    fileLog.append(KVCommand.Delete(key)) *> memoryState.delete(key)
  }  
  1. Notice that there is no Scope requirement in the signature here.
  2. ZIO.scoped is used to create a new ephemeral scope.
  3. delete is implemented in a similar way.

DurableKVStoreImpl creates a new ephemeral scope, and append acquires the permit within it. The in-memory state is then updated, after which the scope is closed and the permit is released automatically. The scope is closed whether the operation succeeds or fails, so the permit is always released. The KV store does not know about the semaphore and does not have to worry about releasing the permit.
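The failure case is worth checking explicitly. In this sketch (ZIO 2.x assumed), `append` is a hypothetical in-memory stand-in for the post's file-backed append, and `failingSet` is an invented operation whose in-memory update always fails. Neither is the real implementation.

```scala
import zio.*

object ReleaseOnFailureDemo extends ZIOAppDefault:

  // Hypothetical stand-in for the post's append: takes the permit for the caller's scope.
  def append(sem: Semaphore, log: Ref[Vector[String]], entry: String): ZIO[Scope, Nothing, Unit] =
    sem.withPermitScoped *> log.update(_ :+ entry)

  // A set whose in-memory update always fails, to exercise the failure path.
  def failingSet(sem: Semaphore, log: Ref[Vector[String]]): IO[String, Unit] =
    ZIO.scoped {
      append(sem, log, "set a=1") *> ZIO.fail("in-memory update failed")
    }

  // Returns whether the set failed and how many permits are free afterwards.
  val program: UIO[(Boolean, Long)] =
    for
      sem   <- Semaphore.make(permits = 1)
      log   <- Ref.make(Vector.empty[String])
      exit  <- failingSet(sem, log).exit
      after <- sem.available
    yield (exit.isFailure, after)

  val run = program.flatMap { case (failed, free) =>
    Console.printLine(s"failed=$failed freePermits=$free")
  }
```

This should print `failed=true freePermits=1`: even though the operation fails after the log entry is written, closing the scope returns the permit, so later writers are not blocked.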

I am very impressed by ZIO's Scope and its apt use in Semaphore. So much power, yet so simple to compose ❤