ZIO-Streams 101
ZIO-Streams is a library for purely functional, asynchronous, concurrent stream processing in Scala. It is built on top of ZIO. It can also interoperate with the Reactive Streams specification through a separate interop module (zio-interop-reactivestreams), so ZIO-Streams can be used with other libraries that implement that specification.
This is an introductory post on ZIO-Streams, and I will cover the basics.
Introduction
A stream represents the emission of data over time. It can be finite or infinite.
Let's take a look at a simple example of a stream that emits integers from 1 to 10.
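A minimal sketch of such a stream, assuming ZIO 2.x with the zio-streams module on the classpath; the object name OneToTen is just illustrative. ZStream.fromIterable turns a Scala collection into a stream, and runCollect gathers every emitted element into a Chunk:

```scala
import zio._
import zio.stream._

object OneToTen extends ZIOAppDefault {
  // A finite stream that emits the integers 1, 2, ..., 10
  val numbers: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

  // Collect every emitted element into a Chunk and print it on one line
  def run = numbers.runCollect.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```

Running OneToTen prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10.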
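It's similar to the ZIO data structure: ZStream[-R, +E, +A]. As with ZIO[R, E, A], R is the environment the stream requires and E is the type of error it may fail with. A here represents the type of elements that the stream emits.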
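To make the three type parameters concrete, here is a small sketch assuming ZIO 2.x. Greeter is a hypothetical service, used only to show a stream whose environment type is not Any:

```scala
import zio._
import zio.stream._

object StreamTypes {
  // ZIO[R, E, A]: an effect that succeeds with exactly one A (or fails with an E)
  val one: ZIO[Any, Nothing, Int] = ZIO.succeed(42)

  // ZStream[R, E, A]: emits zero or more A values over time
  val many: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // E: this stream emits 1 and 2, then fails with a String error
  val failing: ZStream[Any, String, Int] = ZStream(1, 2) ++ ZStream.fail("boom")

  // R: a hypothetical service the stream must get from its environment
  trait Greeter { def greeting: String }
  val greetings: ZStream[Greeter, Nothing, String] =
    ZStream.fromZIO(ZIO.service[Greeter].map(_.greeting))
}
```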
The above stream is called a cold stream because it's finite and all of its data is already available. This is unlike a hot stream, where the amount of data is unknown and potentially infinite, so all the data can never be available at once. Here's an example:
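A sketch of such a hot source, assuming ZIO 2.x: ZStream.fromInputStream wraps a java.io.InputStream (here System.in) into a ZStream[Any, IOException, Byte]. The object name and the linesFrom helper are illustrative; taking the InputStream as a parameter makes the same code easy to try against an in-memory stream.

```scala
import java.io.{IOException, InputStream}
import zio._
import zio.stream._

object StdinStream extends ZIOAppDefault {
  // Emits bytes as they arrive; for System.in this continues while the user types
  val stdinBytes: ZStream[Any, IOException, Byte] = ZStream.fromInputStream(System.in)

  // Illustrative helper: decode bytes as UTF-8 and emit one element per line
  def linesFrom(in: => InputStream): ZStream[Any, IOException, String] =
    ZStream.fromInputStream(in).via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)

  // Echo every typed line until the input ends (Ctrl-D in most Unix terminals)
  def run = linesFrom(System.in).foreach(line => Console.printLine(s"got: $line"))
}
```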
This is a byte stream that reads from System.in. The user can keep on typing, and the stream will keep on emitting bytes.
It's generally easier to work with cold streams because you can process or consume the data at your own speed. Reading a file from disk is an example of a cold stream. Hot streams are when you are plugged into a live data source, like HTTP calls, and you can't control the speed at which data is emitted. If your consuming speed is slower than the emitting speed, you have to either buffer some data or drop some data.
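ZIO-Streams has operators for exactly these two choices. A sketch assuming ZIO 2.x, where a finite range stands in for a fast producer and the capacity of 16 is an arbitrary choice:

```scala
import zio._
import zio.stream._

object BufferingStrategies {
  // Illustrative fast producer
  val source: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 1000)

  // Bounded buffer: the producer can run ahead of the consumer by a limited
  // amount, then it waits (back-pressure); no element is lost
  val buffered: ZStream[Any, Nothing, Int] = source.buffer(16)

  // Dropping buffer: when the buffer is full, newly produced elements are discarded
  val dropping: ZStream[Any, Nothing, Int] = source.bufferDropping(16)

  // Sliding buffer: when the buffer is full, the oldest buffered elements are discarded
  val sliding: ZStream[Any, Nothing, Int] = source.bufferSliding(16)
}
```

buffer keeps every element at the cost of slowing the producer down, while bufferDropping and bufferSliding let the producer keep going and accept that a slow consumer will miss some elements.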
Components of a ZIO stream
There are three fundamental components of a stream: ZStream, ZSink and ZPipeline.
To put it visually, data flows through the components like this:
ZStream --> ZPipeline --> ZSink
ZStream - Think of ZStream as the source of data. It is where data is read from before being passed on to the next stage in the flow. It can be a file, a database, an HTTP call, etc. It can also be another stream.
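A few ways to build a source, sketched with ZIO 2.x constructors. The file name data.txt is hypothetical, and the ZIO.succeed call merely stands in for a real HTTP request:

```scala
import zio._
import zio.stream._

object Sources {
  // From an in-memory collection
  val fromCollection: ZStream[Any, Nothing, String] = ZStream.fromIterable(List("a", "b", "c"))

  // From a file on disk (hypothetical path); emits the raw bytes lazily when run
  val fromFile: ZStream[Any, Throwable, Byte] = ZStream.fromFileName("data.txt")

  // From a single effect, e.g. a stand-in for an HTTP call
  val fromEffect: ZStream[Any, Nothing, String] = ZStream.fromZIO(ZIO.succeed("response body"))

  // From other streams: transform one stream and concatenate it with another
  val fromOtherStreams: ZStream[Any, Nothing, String] =
    fromCollection.map(_.toUpperCase) ++ fromEffect
}
```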
ZPipeline - ZPipeline is the middle layer where you process your data. Processing may include filtering, transforming, etc., and it can be pure or effectful (like an HTTP call).
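A sketch of pure and effectful pipelines, assuming ZIO 2.x. fakeHttpLookup is a hypothetical effect standing in for a real HTTP call:

```scala
import zio._
import zio.stream._

object Pipelines {
  // Pure transformation: String => Int
  val lengths: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](_.length)

  // Pure filtering: keeps only non-empty strings
  val nonEmpty: ZPipeline[Any, Nothing, String, String] = ZPipeline.filter[String](_.nonEmpty)

  // Hypothetical effectful lookup; a real one might call an HTTP client
  def fakeHttpLookup(s: String): Task[String] = ZIO.succeed(s"<$s>")

  // Effectful transformation: runs the effect for every element, in order
  val lookedUp: ZPipeline[Any, Throwable, String, String] =
    ZPipeline.mapZIO((s: String) => fakeHttpLookup(s))
}
```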
ZSink - ZSink is the final stage where you consume the data. It can be writing to a file, writing to a database, or simply aggregating it in memory.
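A sketch of a few sinks, assuming ZIO 2.x. Printing to the console stands in for writing to a file or database:

```scala
import zio._
import zio.stream._

object Sinks {
  // Aggregate in memory: collect every element into a Chunk
  val collect: ZSink[Any, Nothing, Int, Nothing, Chunk[Int]] = ZSink.collectAll[Int]

  // Aggregate in memory: fold all elements into a single value
  val total: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft[Int, Int](0)(_ + _)

  // Side-effecting consumer: print each element as it arrives
  val printEach = ZSink.foreach((n: Int) => Console.printLine(n))
}
```

Each sink is only a description; it consumes data once a stream is run into it, e.g. ZStream(1, 2, 3).run(Sinks.total).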
Examples
Let's create a stream that reads from a file, filters out all the lines that start with #, and then performs a word count of all the remaining lines.
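A sketch of the first step, assuming ZIO 2.x: a byte stream built from a file, with two pipelines attached. The helper name lines and its fileName parameter are illustrative:

```scala
import zio._
import zio.stream._

object CreateStream {
  // Illustrative helper: read a file and emit its content line by line
  def lines(fileName: String): ZStream[Any, Throwable, String] = {
    // Raw bytes read from the file
    val bytes: ZStream[Any, Throwable, Byte] = ZStream.fromFileName(fileName)
    // Two pipelines: Byte -> UTF-8 text chunks -> one String per line
    bytes >>> ZPipeline.utf8Decode >>> ZPipeline.splitLines
  }
}
```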
>>> is an alias for .via(ZPipeline).
Here, we created a byte stream from a file source and then attached two pipelines to it. Notice that when we attach a pipeline to a stream, the type of the stream changes: from ZStream[Any, Throwable, Byte] to ZStream[Any, Throwable, String]. It is still a ZStream; only the type of elements it emits has changed.
Tip: ZStream + [ZPipelines] = ZStream
Now let's write the remaining pipelines to further filter and transform the data.
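A sketch of the two pipelines, assuming ZIO 2.x. The names filterLines and countLineWords come from the text; treating a word as any run of non-whitespace characters is my assumption:

```scala
import zio._
import zio.stream._

object LinePipelines {
  // Drops comment lines, i.e. lines that start with '#'
  val filterLines: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.filter[String](line => !line.startsWith("#"))

  // Maps each line to its word count (assumption: words are separated by whitespace)
  val countLineWords: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map[String, Int](line => line.split("\\s+").count(_.nonEmpty))
}
```

Splitting on whitespace and counting only non-empty pieces makes an empty line count as 0 words rather than 1.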
filterLines is a pipeline that takes a stream of Strings and returns a stream of Strings, possibly with fewer elements. countLineWords is a pipeline that takes a stream of Strings and returns a stream of Ints, where each Int is the number of words in a line.
We can combine these two pipelines into one pipeline using the >>> operator.
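A sketch of the combined pipeline, assuming ZIO 2.x. filterLines and countLineWords are redefined here (with a whitespace-based word count, an assumption) so the snippet compiles on its own:

```scala
import zio._
import zio.stream._

object CombinePipelines {
  val filterLines: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.filter[String](line => !line.startsWith("#"))

  val countLineWords: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map[String, Int](line => line.split("\\s+").count(_.nonEmpty))

  // Pipeline >>> pipeline = pipeline: Strings in, Ints out
  val filterAndCount: ZPipeline[Any, Nothing, String, Int] = filterLines >>> countLineWords
}
```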
Let's create the final sink that will consume the data.
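A sketch of such a sink, assuming ZIO 2.x and that the goal is the total word count. The name sumSink is hypothetical:

```scala
import zio._
import zio.stream._

object SumSink {
  // Hypothetical name: consumes Ints (word counts per line) and adds them up
  val sumSink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft[Int, Int](0)(_ + _)
}
```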
ZIO-Streams composes well: we can combine the filterAndCount pipeline with the sink to get a new, combined sink.
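A sketch of a pipeline attached to a sink, assuming ZIO 2.x. filterAndCount is restated inline and sumSink is a hypothetical folding sink, so the snippet stands alone:

```scala
import zio._
import zio.stream._

object CombinedSink {
  val filterAndCount: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.filter[String](line => !line.startsWith("#")) >>>
      ZPipeline.map[String, Int](line => line.split("\\s+").count(_.nonEmpty))

  // Hypothetical sink that sums Ints
  val sumSink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft[Int, Int](0)(_ + _)

  // Pipeline >>> sink = sink: this one consumes Strings, not Ints
  val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] = filterAndCount >>> sumSink
}
```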
Tip: [ZPipelines] + ZSink = ZSink
Notice that, unlike the previous sink, this combined sink takes a stream of Strings and not Ints.
Now we can run the stream with the sink and get the result. To run a stream, we need two things: a ZStream and a ZSink. ZPipelines are optional.
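A sketch of running a stream into a sink, assuming ZIO 2.x. The helpers lines, wordCountSink and wordCount are illustrative, and input.txt is a hypothetical path:

```scala
import zio._
import zio.stream._

object RunStream extends ZIOAppDefault {
  def lines(fileName: String): ZStream[Any, Throwable, String] =
    ZStream.fromFileName(fileName) >>> ZPipeline.utf8Decode >>> ZPipeline.splitLines

  val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] =
    (ZPipeline.filter[String](line => !line.startsWith("#")) >>>
      ZPipeline.map[String, Int](line => line.split("\\s+").count(_.nonEmpty))) >>>
      ZSink.foldLeft[Int, Int](0)(_ + _)

  // ZStream + ZSink = ZIO: nothing is read until this effect is executed
  def wordCount(fileName: String): ZIO[Any, Throwable, Int] = lines(fileName).run(wordCountSink)

  // Prints the result with a "word count" prefix
  def run = wordCount("input.txt").debug("word count")
}
```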
Tip: ZStream + ZSink = ZIO
You can also choose to combine all the individual components at the end and run the stream.
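A sketch of the whole flow assembled in one expression, assuming ZIO 2.x; wordCount and its fileName parameter are illustrative:

```scala
import zio._
import zio.stream._

object AllInOne {
  // Source >>> pipelines, then run into a sink; each output type feeds the next input type
  def wordCount(fileName: String): ZIO[Any, Throwable, Int] =
    (ZStream.fromFileName(fileName) >>>
      ZPipeline.utf8Decode >>>
      ZPipeline.splitLines >>>
      ZPipeline.filter[String](line => !line.startsWith("#")) >>>
      ZPipeline.map[String, Int](line => line.split("\\s+").count(_.nonEmpty)))
      .run(ZSink.foldLeft[Int, Int](0)(_ + _))
}
```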
It all works as long as the output type of one component matches the input type of the next component.
I wrote very verbose code to demonstrate the individual components, but you can also write it more concisely.
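A concise sketch of the same word count using stream operators, assuming ZIO 2.x; the helper name wordCount is illustrative:

```scala
import zio._
import zio.stream._

object Concise {
  def wordCount(fileName: String): ZIO[Any, Throwable, Int] =
    ZStream
      .fromFileName(fileName)
      .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines) // bytes -> lines
      .filter(line => !line.startsWith("#"))                // drop comment lines
      .map(line => line.split("\\s+").count(_.nonEmpty))    // words per line
      .runSum                                               // total word count
}
```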
.filter is equivalent to .via(ZPipeline.filter(...)).
.map is equivalent to .via(ZPipeline.map(...)).
.runSum is equivalent to .runFold(0)(_ + _), which in turn is equivalent to .run(ZSink.foldLeft(0)(_ + _)).
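These equivalences can be checked on a tiny stream. A sketch assuming ZIO 2.x; the object and value names are illustrative:

```scala
import zio._
import zio.stream._

object Equivalences {
  val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4)

  // Operator vs. explicit pipeline: both keep only even numbers
  val evensOperator = numbers.filter(_ % 2 == 0)
  val evensPipeline = numbers.via(ZPipeline.filter[Int](_ % 2 == 0))

  // Three ways to compute the same sum
  val sumShorthand = numbers.runSum
  val sumFold      = numbers.runFold(0)(_ + _)
  val sumSink      = numbers.run(ZSink.foldLeft[Int, Int](0)(_ + _))
}
```

All three sums evaluate to 10, and both filtered streams emit 2 and 4.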
Concluding thoughts
ZIO-Streams is a very powerful library, and it has much more to offer than what I have covered here. Please refer to the official documentation for more details.
If you have any questions or feedback, please feel free to reach out to me on Twitter.