ZIO-Streams 101
ZIO-Streams is a library for purely functional, asynchronous, concurrent stream processing in Scala, built on top of ZIO. It can also interoperate with other libraries that implement the Reactive Streams specification; as far as I know, this goes through the separate zio-interop-reactivestreams module rather than the core library.
This introductory post covers the basics of ZIO-Streams.
Introduction
A stream represents the emission of data over time. It can be finite or infinite.
Let's take a look at a simple example of a stream that emits integers from 1 to 10.
val coldStream: ZStream[Any, Nothing, Int] = ZStream.range(1, 11) // upper bound is exclusive
It's similar to the ZIO data structure: ZStream[-R, +E, +A]. R is the environment the stream requires, E is the type of error it can fail with, and A is the type of elements it emits.
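To see the elements a stream emits, you have to run it. Here is a minimal sketch, assuming ZIO 2.x on the classpath; the object name RangeExample is mine and purely illustrative.

```scala
import zio._
import zio.stream._

object RangeExample extends ZIOAppDefault {
  // ZStream.range excludes its upper bound, so this emits 1 through 10.
  val numbers: ZStream[Any, Nothing, Int] = ZStream.range(1, 11)

  // runCollect turns the stream description into a ZIO that gathers
  // every emitted element into a Chunk.
  val collected: ZIO[Any, Nothing, Chunk[Int]] = numbers.runCollect

  // Prints: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
  def run = collected.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```

Nothing is emitted until the resulting ZIO is executed; the ZStream value itself is only a description.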
The above stream is called a cold stream, because it is finite and all of its data is already available. With a hot stream, by contrast, the amount of data is unknown and potentially infinite, so all of it can never be available at once. Here's an example:
val hotStream: ZStream[Any, IOException, Byte] =
ZStream.fromInputStream(java.lang.System.in)
This is a byte stream that reads from System.in. The user can keep on typing and the stream will keep on emitting bytes.
It is generally easier to work with cold streams because you can process or consume the data at your own speed. Reading a file from disk is an example of a cold stream. A hot stream is what you get when you are plugged into a live data source, such as incoming HTTP calls. You can't control the speed at which data is emitted, so if you consume more slowly than the source emits, you have to either buffer some data or drop some.
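The buffer-or-drop choice can be sketched with ZStream's buffering operators. This is only an illustration, assuming ZIO 2.x: fastSource and process are made-up stand-ins for a live source and a slow consumer, and the capacities and delays are arbitrary.

```scala
import zio._
import zio.stream._

object SlowConsumerSketch extends ZIOAppDefault {
  // Stand-in for a live source (hypothetical): one element every 10 ms.
  val fastSource: ZStream[Any, Nothing, Long] =
    ZStream.iterate(0L)(_ + 1).schedule(Schedule.spaced(10.millis))

  // Stand-in for slow processing (hypothetical): 100 ms per element.
  def process(n: Long): UIO[Long] = ZIO.sleep(100.millis).as(n)

  // Option 1: buffer up to 16 elements. Nothing is lost; once the buffer
  // is full the producer has to wait for the consumer.
  val buffered: UIO[Chunk[Long]] =
    fastSource.buffer(16).mapZIO(process).take(3).runCollect

  // Option 2: buffer up to 16 elements and drop newly arriving ones while
  // the buffer is full, so the producer never waits.
  val dropping: UIO[Chunk[Long]] =
    fastSource.bufferDropping(16).mapZIO(process).take(3).runCollect

  def run =
    for {
      a <- buffered
      b <- dropping
      _ <- Console.printLine(s"buffered: ${a.mkString(", ")}")
      _ <- Console.printLine(s"dropping: ${b.mkString(", ")}")
    } yield ()
}
```

Which elements survive in the dropping variant depends on timing, so treat its output as indicative only.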
Components of a ZIO stream
There are three fundamental components of a stream: ZStream, ZSink and ZPipeline.
To put it visually, it looks something like this:
graph LR
ZStream --> ZPipeline
ZPipeline --> ZSink
ZStream - Think of ZStream as the source of data. It is where data is read from and then passed on to the next stage in the flow. It can be a file, a database, an HTTP call, etc. It can also be another stream.
ZPipeline - ZPipeline is the middle layer where you process your data. Processing may include filtering, transforming, etc. It can be pure or effectful (like an HTTP call).
ZSink - ZSink is the final stage where you consume the data. It can write to a file, write to a database, or simply aggregate the data in memory.
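Before moving to a file-based example, here is a minimal in-memory sketch of the three components wired together, assuming ZIO 2.x. The names source, double and total are illustrative only.

```scala
import zio._
import zio.stream._

object ThreeComponentsSketch extends ZIOAppDefault {
  // Source: emits five integers.
  val source: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5)

  // Pipeline: a pure transformation of each element.
  val double: ZPipeline[Any, Nothing, Int, Int] = ZPipeline.map[Int, Int](_ * 2)

  // Sink: aggregates everything it receives into a single Int.
  val total: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.sum[Int]

  // Source -> pipeline -> sink; running a stream into a sink yields a ZIO.
  val program: ZIO[Any, Nothing, Int] = source.via(double).run(total)

  // Prints: 30
  def run = program.flatMap(n => Console.printLine(n.toString))
}
```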
Examples
Let's create a stream that reads from a file, filters out all the lines that start with # and then performs a word count of all the remaining lines.
//source: read data
val byteStream: ZStream[Any, Throwable, Byte] = ZStream.fromFileName("lines.txt")
//transformed stream
val stringStream: ZStream[Any, Throwable, String] =
byteStream >>> ZPipeline.utfDecode >>> ZPipeline.splitLines // (1)!
(1) >>> is an alias for .via(ZPipeline).
Here, we created a byte stream from a file source and then attached two pipelines to it. Notice that when we attach a pipeline to a stream, the type of the stream changes: from ZStream[Any, Throwable, Byte] to ZStream[Any, Throwable, String]. The result is still a ZStream; only the type of the elements it emits has changed.
Tip
ZStream + [ZPipelines] = ZStream
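If you want to try the decoding step without a lines.txt on disk, an in-memory byte stream works the same way. This is a sketch assuming ZIO 2.x; the sample text is made up, and I use ZPipeline.utf8Decode here, which always decodes as UTF-8.

```scala
import zio._
import zio.stream._
import java.nio.charset.StandardCharsets

object DecodeLinesSketch extends ZIOAppDefault {
  // Made-up stand-in for the contents of a file.
  val sample: String = "# a comment\nhello world\nfoo bar baz"

  // Source of raw bytes, analogous to ZStream.fromFileName.
  val bytes: ZStream[Any, Nothing, Byte] =
    ZStream.fromChunk(Chunk.fromArray(sample.getBytes(StandardCharsets.UTF_8)))

  // Byte -> String (decoded text) -> String (one element per line).
  val lines: ZStream[Any, Throwable, String] =
    bytes.via(ZPipeline.utf8Decode).via(ZPipeline.splitLines)

  val collected: ZIO[Any, Throwable, Chunk[String]] = lines.runCollect

  // Prints the three lines separated by " | ".
  def run = collected.flatMap(ls => Console.printLine(ls.mkString(" | ")))
}
```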
Now let's write the remaining pipelines to further filter and transform the data.
// pipeline: filter
val filterLines: ZPipeline[Any, Nothing, String, String] =
ZPipeline.filter(!_.startsWith("#"))
// pipeline: transform
val countLineWords: ZPipeline[Any, Nothing, String, Int] =
ZPipeline.map(_.split(" ").length)
filterLines is a pipeline that takes a stream of String and returns a stream of String, possibly with fewer elements. countLineWords is a pipeline that takes a stream of String and returns a stream of Int, where each Int is the number of words in a line.
We can combine these two pipelines into one pipeline using the >>> operator.
val filterAndCount: ZPipeline[Any, Nothing, String, Int] =
filterLines >>> countLineWords
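A composed pipeline can be checked on its own, without a file or a sink. Here is a small sketch, assuming ZIO 2.x, that applies it to a few made-up lines; the type arguments are spelled out explicitly so type inference has nothing to guess.

```scala
import zio._
import zio.stream._

object ComposePipelinesSketch extends ZIOAppDefault {
  val filterLines: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.filter[String](!_.startsWith("#"))

  val countLineWords: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map[String, Int](_.split(" ").length)

  // The output type of filterLines (String) matches the input type of countLineWords.
  val filterAndCount: ZPipeline[Any, Nothing, String, Int] =
    filterLines >>> countLineWords

  // Made-up sample lines; the comment line is filtered out.
  val counts: UIO[Chunk[Int]] =
    ZStream("# a comment", "hello world", "foo bar baz").via(filterAndCount).runCollect

  // Prints: 2, 3
  def run = counts.flatMap(c => Console.printLine(c.mkString(", ")))
}
```

Note that splitting on a single space counts an empty line as one word, so real input may need a stricter tokenizer.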
Let's create the final sink that will consume the data.
//sink: aggregate
val sink: ZSink[Any, Nothing, Int, Nothing, Int] =
ZSink.foldLeft(0)(_ + _)
ZIO-Streams composes well. We can combine the filterAndCount pipeline with the sink and get a new combined sink.
val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] =
filterAndCount >>> sink
Tip
[ZPipelines] + ZSink = ZSink
Notice that, unlike the previous sink, this combined sink consumes Strings rather than Ints.
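Because the combined sink is an ordinary value that accepts Strings, it can be reused against any String stream. A sketch assuming ZIO 2.x, with made-up input lines and ZSink.sum standing in for the fold:

```scala
import zio._
import zio.stream._

object ReusableSinkSketch extends ZIOAppDefault {
  val filterAndCount: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.filter[String](!_.startsWith("#")) >>>
      ZPipeline.map[String, Int](_.split(" ").length)

  // Pipeline >>> sink gives a sink whose input type is the pipeline's input type.
  val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] =
    filterAndCount >>> ZSink.sum[Int]

  // The same sink consumes two unrelated String streams.
  val first: UIO[Int]  = ZStream("# a comment", "hello world", "foo bar baz").run(wordCountSink)
  val second: UIO[Int] = ZStream("one", "# skip", "two three").run(wordCountSink)

  // Prints: 5 and 3
  def run = first.zip(second).flatMap { case (a, b) => Console.printLine(s"$a and $b") }
}
```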
Now we can run the stream with the sink and get the result. To run a stream, we need two things - a ZStream and a ZSink. ZPipelines are optional.
//run: execute and get aggregated result
val result: ZIO[Any, Throwable, Int] =
stringStream >>> wordCountSink
Tip
ZStream + ZSink = ZIO
You can also choose to combine all the individual components at the end and run the stream.
val result: ZIO[Any, Throwable, Int] =
stringStream >>> filterLines >>> countLineWords >>> sink
It all works as long as the output type of one component matches the input type of the next.
I wrote deliberately verbose code to demonstrate the individual components, but you can also write it more concisely.
val result: ZIO[Any, Throwable, Int] =
ZStream.fromFileName("lines.txt")
.via(ZPipeline.utfDecode)
.via(ZPipeline.splitLines)
.filter(!_.startsWith("#")) // (1)!
.map(_.split(" ").length) // (2)!
.runSum // (3)!
(1) .filter is shorthand for .via(ZPipeline.filter).
(2) .map is shorthand for .via(ZPipeline.map).
(3) .runSum is equivalent to .runFold(0)(_ + _), which in turn is equivalent to .run(ZSink.foldLeft(0)(_ + _)).
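The three spellings of the final aggregation can be compared side by side. A minimal sketch, assuming ZIO 2.x and a made-up three-element stream:

```scala
import zio._
import zio.stream._

object RunVariantsSketch extends ZIOAppDefault {
  val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // Three ways to sum the stream; each yields 6.
  val viaRunSum: UIO[Int]  = numbers.runSum
  val viaRunFold: UIO[Int] = numbers.runFold(0)(_ + _)
  val viaSink: UIO[Int]    = numbers.run(ZSink.foldLeft[Int, Int](0)(_ + _))

  // Prints: 6 6 6
  def run =
    for {
      a <- viaRunSum
      b <- viaRunFold
      c <- viaSink
      _ <- Console.printLine(s"$a $b $c")
    } yield ()
}
```

The stream value is reused three times here, which is fine because a ZStream is a description that is evaluated afresh on each run.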
Concluding thoughts
ZIO-Streams is a very powerful library and it has much more to offer than what I have covered here. Please refer to the official documentation for more details.
If you have any questions or feedback, please feel free to reach out to me on Twitter.