Composable concurrency scope of ZIO Semaphore
I was recently working on a "Durable Key-Value Store" implementation in ZIO. The implementation was pretty straightforward, but I had to make sure that the log file was not accessed concurrently by multiple fibers. I could have used a Ref to keep track of the state, but I came across Semaphore in ZIO and decided to give it a try. I was pleasantly surprised by how easy it was to use. In this post, we will take a look at how to use Semaphore to implement concurrency control in ZIO.
Durable key-value store
A durable key-value store is a key-value store that can survive system crashes. It can be implemented using a write-ahead log.
A write-ahead log records every change in an append-only log file before the change is applied to the main database. Because each change is on disk before it takes effect, the database can be brought back to a consistent state after a crash by replaying the log file.
What is a Semaphore?
A semaphore is a concurrency primitive that is used to control access to a shared resource. It is similar to a mutex. The difference is that a mutex is binary, i.e. it is either locked or unlocked, while a semaphore can have a capacity of more than one, so it can be held by multiple parties at the same time. The units of that capacity are called permits. A semaphore with 1 permit behaves like a mutex.
In ZIO's context, the parties acquiring and releasing permits are fibers. Once a fiber acquires a permit, it can perform its task. When it is done, it releases the permit. If there are no permits available, the fiber is suspended until a permit is released by another fiber.
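To make the permit behaviour concrete, here is a minimal sketch assuming ZIO 2 and Scala 3. The names SemaphoreDemo, task, active and maxSeen are made up for illustration. Ten fibers compete for a semaphore with 2 permits, and a counter records the highest number of fibers that were inside the guarded section at once.

```scala
import zio.*

object SemaphoreDemo extends ZIOAppDefault:
  // Hypothetical task: counts how many fibers are inside the guarded section.
  def task(sem: Semaphore, active: Ref[Int], maxSeen: Ref[Int]): UIO[Unit] =
    sem.withPermit {
      for
        now <- active.updateAndGet(_ + 1) // entered the guarded section
        _   <- maxSeen.update(_ max now)  // remember the highest count seen
        _   <- ZIO.yieldNow               // give other fibers a chance to run
        _   <- active.update(_ - 1)       // about to leave the guarded section
      yield ()
    }

  // Runs 10 fibers against a semaphore with 2 permits and returns the peak count.
  val program: UIO[Int] =
    for
      sem     <- Semaphore.make(permits = 2)
      active  <- Ref.make(0)
      maxSeen <- Ref.make(0)
      _       <- ZIO.foreachParDiscard(1 to 10)(_ => task(sem, active, maxSeen))
      peak    <- maxSeen.get
    yield peak

  val run = program.flatMap(peak => Console.printLine(s"peak concurrency: $peak"))
```

Since only 2 permits exist, the reported peak should never exceed 2, no matter how the fibers are scheduled.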
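Using Semaphore
Here's the planned interface for the append-only log:

import zio.*
import java.io.IOException

trait AppendOnlyLog[A]:
  // Appends one entry to the end of the log file
  def append(a: A): ZIO[Any, IOException, Unit]
  // Reads every entry in the log, in the order it was written
  def readAll: ZIO[Any, IOException, Seq[A]]
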
Below is the interface for the KV store:

trait KVReader[-K, +V]:
  def get(key: K): UIO[Option[V]]

trait KVWriter[-K, -V]:
  def set(key: K, value: V): Task[Unit]
  def delete(key: K): Task[Unit]

trait DurableKVStore[K, V] extends KVReader[K, V] with KVWriter[K, V]

The set operation in the writer needs to call append on the log. If the append succeeds, it can then safely update the in-memory state. The delete operation is similar to set, while the get operation is just a read from the in-memory state. The readAll operation is used to initialize the in-memory state from the log file.
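The initialization step described above can be sketched as a fold over the logged commands. This is only an illustration: the post shows KVCommand.Set(key, value) and KVCommand.Delete(key) being constructed, but the enum definition and the replay helper below are assumptions, not the original implementation.

```scala
// Hypothetical shape of the command type; only the two constructors appear in the post.
enum KVCommand[K, V]:
  case Set(key: K, value: V)
  case Delete(key: K)

// Rebuilds the in-memory map by applying the logged commands in the order they were written.
def replay[K, V](log: Seq[KVCommand[K, V]]): Map[K, V] =
  log.foldLeft(Map.empty[K, V]) {
    case (state, KVCommand.Set(k, v)) => state.updated(k, v) // later writes win
    case (state, KVCommand.Delete(k)) => state.removed(k)    // deleting a missing key is a no-op
  }
```

Feeding the result of readAll into a function like this gives the state the store had just before the crash.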
To keep things consistent, we need the following guarantees:
- at start, while the in-memory state is being initialized (readAll), no other fiber should be able to write to the log file
- while one set or delete operation is in progress, any other set or delete operation should wait
This can be summarized as: only one fiber is allowed to touch the log file at a time. We can achieve this using a Semaphore. But the question is where the semaphore should live. We have two choices: either in AppendOnlyLog or in KVWriter.
Putting the semaphore in KVWriter seems to meet our requirement, because we want the "scope" of the semaphore to last not only until the log write for a set or delete operation is complete, but also until the in-memory state is updated. The downside of this approach is that it leaves the AppendOnlyLog implementation open to concurrent access if it is used outside of KVWriter. To protect against this, we would have to implement yet another lock in AppendOnlyLog, which would be both complex and inefficient.
The solution I found was to implement the semaphore in AppendOnlyLog but keep its scope extendable with the help of ZIO's Scope. Instead of using the semaphore's non-scoped withPermit method, we can use the withPermitScoped method.
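
class AppendOnlyLogJsonImpl[A: JsonCodec](
  path: Path, sem: Semaphore
) extends AppendOnlyLog[A]:
  def append(a: A): ZIO[Scope, IOException, Unit] = // (1)
    // acquire the permit; its release is tied to the caller's Scope
    sem.withPermitScoped *> appendToFile(path, a.toJson)

- (1) Scope is added to the requirement (the environment type) of the append method. This is the key to the solution. For the override to compile, the append signature in the AppendOnlyLog trait has to change from ZIO[Any, IOException, Unit] to ZIO[Scope, IOException, Unit] as well.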
Here, the append method uses the semaphore to acquire a permit and then appends the given data to the file. But it does not release the permit. Instead, it hands an added Scope "requirement" back to the caller. withPermitScoped ties the release of the permit to the closing of that scope. So the permit is released when the scope is closed, not when the append method returns.
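The difference in permit lifetime can be observed directly. The following is a small sketch assuming ZIO 2 and Scala 3; the value name permitLifetime is made up for illustration. It reads the number of free permits once while the scope is still open and once after it has closed.

```scala
import zio.*

// Reports the number of free permits (inside the scope, after the scope).
val permitLifetime: UIO[(Long, Long)] =
  for
    sem    <- Semaphore.make(permits = 1)
    inside <- ZIO.scoped {
                // withPermitScoped has completed, yet the permit is still held here
                sem.withPermitScoped *> sem.available
              }
    after  <- sem.available // the scope has closed, so the permit is free again
  yield (inside, after)
```

With a single permit, the first reading is taken while the permit is held and the second after it has been returned, which is exactly the window the KV store needs for its in-memory update.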

class DurableKVStoreImpl[K, V](
  memoryState: MemoryState[K, V],
  fileLog: AppendOnlyLog[KVCommand[K, V]]
) extends DurableKVStore[K, V]:
  def set(key: K, value: V): Task[Unit] = // (1)
    ZIO.scoped { // (2)
      fileLog.append(KVCommand.Set(key, value)) *> memoryState.set(key, value)
    }

  def delete(key: K): Task[Unit] = ZIO.scoped { // (3)
    fileLog.append(KVCommand.Delete(key)) *> memoryState.delete(key)
  }

- (1) Notice that there is no Scope requirement in the signature here.
- (2) ZIO.scoped is used to create a new ephemeral scope.
- (3) delete is implemented in a similar way.
DurableKVStoreImpl creates a new ephemeral scope and provides it to the append call. Then it updates the in-memory state. After that, the scope is closed and the permit is released automatically. The scope is closed regardless of whether the set operation succeeds or fails, which ensures that the permit is always released. The KV store does not know about the semaphore and does not have to worry about releasing the permit.
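The release-on-failure behaviour can be checked with a small sketch, again assuming ZIO 2 and Scala 3. The value name releasedOnFailure and the error message are made up; ZIO.fail stands in for a failing in-memory update.

```scala
import zio.*

// Fails inside the scope, then checks how many permits are free afterwards.
val releasedOnFailure: UIO[(Either[String, Nothing], Long)] =
  for
    sem    <- Semaphore.make(permits = 1)
    result <- ZIO.scoped {
                // acquire the permit, then simulate a failing step
                sem.withPermitScoped *> ZIO.fail("in-memory update failed")
              }.either
    free   <- sem.available // the scope closed despite the failure
  yield (result, free)
```

The effect inside the scope fails, but the permit count is back to 1 afterwards, so a failed write cannot leave the log locked.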
I am very impressed by ZIO's Scope and its apt use in Semaphore. So much power, yet so simple to compose.