ZIO-Streams is a library for purely functional, asynchronous, concurrent stream processing in Scala, built on top of ZIO. It can also interoperate with the Reactive Streams specification (through the separate zio-interop-reactivestreams module), which means ZIO-Streams can be used with other libraries that implement the same specification.
This is an introductory post on ZIO-Streams that covers the basics.
The core data type, ZStream[-R, +E, +A], is similar to ZIO's own ZIO[-R, +E, +A]: R is the environment the stream requires, E is the type of error it can fail with, and A is the type of elements the stream emits.
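As a minimal sketch, assuming ZIO 2.x with the zio-streams module on the classpath, here is a finite stream whose elements are all known up front. R is Any because it needs no environment, E is Nothing because it cannot fail, and A is Int. The object name FiniteStream is illustrative only.

```scala
import zio._
import zio.stream._

object FiniteStream {
  // R = Any (no environment), E = Nothing (cannot fail), A = Int (emitted elements)
  val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5)

  // Running the stream collects every emitted element into a Chunk
  val collected: UIO[Chunk[Int]] = numbers.runCollect
}
```

Defining the stream does nothing by itself; elements are only produced when the stream is run, for example with runCollect.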
The above stream is called a cold stream, because it is finite and all of its data is already available. A hot stream, by contrast, produces an unknown and potentially infinite amount of data, so all of it can never be available at once. Here's an example:
This is a byte stream that reads from System.in. The user can keep on typing and the stream will keep on emitting bytes.
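A sketch of such a stream, assuming ZIO 2.x, could use ZStream.fromInputStream. The helper bytesFrom and the object name StdinStream are hypothetical; the helper only exists so that any InputStream (not just stdin) can back the stream. Note that System.in is written as java.lang.System.in because `import zio._` brings ZIO's own System service into scope.

```scala
import java.io.{IOException, InputStream}
import zio._
import zio.stream._

object StdinStream {
  // Any InputStream can back a byte stream; bytes are read lazily as the stream is consumed
  def bytesFrom(in: => InputStream): ZStream[Any, IOException, Byte] =
    ZStream.fromInputStream(in)

  // Emits bytes as input arrives; it ends only when stdin is closed (EOF)
  val stdinBytes: ZStream[Any, IOException, Byte] =
    bytesFrom(java.lang.System.in)

  // Decode the bytes as UTF-8 text and echo it back to the console
  val echo: ZIO[Any, IOException, Unit] =
    stdinBytes
      .via(ZPipeline.utf8Decode)
      .foreach(text => Console.print(text))
}
```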
It is generally easier to work with cold streams because you can process or consume the data at your own pace. Reading a file from disk is an example of a cold stream. Hot streams, on the other hand, are plugged into a live data source, like HTTP calls, so you can't control the speed at which data is emitted. If the consumer is slower than the producer, you have to either buffer some of the data or drop some of it.
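ZStream offers operators for both strategies. The sketch below assumes ZIO 2.x, and fastSource is a hypothetical infinite producer standing in for a live data source. buffer applies backpressure once the buffer is full, bufferDropping discards new elements, and bufferSliding discards the oldest buffered ones.

```scala
import zio._
import zio.stream._

object Backpressure {
  // Hypothetical fast, infinite producer standing in for a live source
  val fastSource: ZStream[Any, Nothing, Int] = ZStream.iterate(0)(_ + 1)

  // Buffer up to 16 elements; the producer waits when the buffer is full
  val buffered: ZStream[Any, Nothing, Int] = fastSource.buffer(16)

  // Buffer up to 16 elements; new elements are dropped while the buffer is full
  val dropping: ZStream[Any, Nothing, Int] = fastSource.bufferDropping(16)

  // Buffer up to 16 elements; the oldest buffered elements are evicted to make room
  val sliding: ZStream[Any, Nothing, Int] = fastSource.bufferSliding(16)
}
```

Which strategy fits depends on the data: buffer when nothing may be lost, sliding when only the latest values matter, dropping when the backlog is more valuable than fresh arrivals.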
There are three fundamental components of a stream: ZStream, ZPipeline and ZSink. Visually, data flows from a ZStream, through zero or more ZPipelines, and into a ZSink:
ZStream → ZPipeline → ZSink
ZStream
Think of ZStream as the source of data. It is where data is read from before being passed on to the next stage in the flow. The source can be a file, a database, an HTTP call, etc.
It can also be another stream.
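A few ways to build a source, sketched under the assumption of ZIO 2.x on the JVM. The file name data.txt and the effect in fromEffect are hypothetical stand-ins for a real file, database query, or HTTP call. Because streams are lazy, defining fromFile does not touch the file until the stream is run.

```scala
import zio._
import zio.stream._

object Sources {
  // From an in-memory collection
  val fromList: ZStream[Any, Nothing, String] =
    ZStream.fromIterable(List("a", "b", "c"))

  // From a single effect (hypothetical stand-in for a database or HTTP call)
  val fromEffect: ZStream[Any, Nothing, String] =
    ZStream.fromZIO(ZIO.succeed("d"))

  // From a file on disk (JVM only); emits raw bytes. "data.txt" is hypothetical
  val fromFile: ZStream[Any, Throwable, Byte] =
    ZStream.fromFileName("data.txt")

  // From other streams: concatenating two streams yields a new stream
  val combined: ZStream[Any, Nothing, String] = fromList ++ fromEffect
}
```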
ZPipeline
ZPipeline is the middle layer where you process your data. Processing may include filtering, transforming, etc. It can be pure or effectful (like an HTTP call).
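Here is a sketch of pure and effectful pipelines, assuming ZIO 2.x. The lookup function is hypothetical: it returns a constant-built string where a real application might make an HTTP call.

```scala
import zio._
import zio.stream._

object Pipelines {
  // Pure filtering: keep only even numbers
  val evens: ZPipeline[Any, Nothing, Int, Int] =
    ZPipeline.filter((n: Int) => n % 2 == 0)

  // Pure transformation: double each element
  val doubled: ZPipeline[Any, Nothing, Int, Int] =
    ZPipeline.map((n: Int) => n * 2)

  // Hypothetical effectful lookup standing in for an HTTP call
  def lookup(n: Int): UIO[String] = ZIO.succeed(s"item-$n")

  // Effectful transformation: run an effect for each element
  val enriched: ZPipeline[Any, Nothing, Int, String] =
    ZPipeline.mapZIO((n: Int) => lookup(n))
}
```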
ZSink
ZSink is the final stage where you consume the data. This can mean writing to a file, writing to a database, or simply aggregating it in memory.
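A sketch of a few built-in sinks, assuming ZIO 2.x. Running a stream into a sink produces a ZIO effect whose result is whatever the sink computes. Printing to the console stands in here for writing to a file or database.

```scala
import java.io.IOException
import zio._
import zio.stream._

object Sinks {
  val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4)

  // Aggregate in memory: sum all elements
  val total: UIO[Int] = numbers.run(ZSink.sum[Int])

  // Aggregate in memory: count the elements
  val count: UIO[Long] = numbers.run(ZSink.count)

  // Perform a side effect for each element (stand-in for a file or database write)
  val printed: IO[IOException, Unit] =
    numbers.run(ZSink.foreach((n: Int) => Console.printLine(n)))
}
```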
Here, we create a byte stream from a file source and then attach two pipelines to it. Notice that attaching a pipeline changes the type of the stream: it goes from ZStream[Any, Throwable, Byte] to ZStream[Any, Throwable, String]. The result is still a ZStream; only the type of the elements it emits has changed.
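The original snippet is not reproduced here, so the following is only a plausible sketch, assuming ZIO 2.x and assuming the two pipelines are ZPipeline.utf8Decode and ZPipeline.splitLines. The helper readLines is hypothetical. The comments show how the element type changes at each step while the outer type stays a ZStream.

```scala
import zio._
import zio.stream._

object FileLines {
  // Hypothetical helper: file bytes -> UTF-8 text -> one element per line
  def readLines(fileName: String): ZStream[Any, Throwable, String] =
    ZStream
      .fromFileName(fileName)     // ZStream[Any, Throwable, Byte]
      .via(ZPipeline.utf8Decode)  // ZStream[Any, Throwable, String], arbitrary text chunks
      .via(ZPipeline.splitLines)  // ZStream[Any, Throwable, String], one line per element
}
```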
Tip
ZStream + [ZPipelines] = ZStream
Now let's write the remaining pipelines to further filter and transform the data.
filterLines is a pipeline that takes a stream of String and returns a stream of String, possibly with fewer elements. countLineWords is a pipeline that takes a stream of String and returns a stream of Int, where each Int is the number of words in a line.
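A sketch of the two pipelines, assuming ZIO 2.x. The text does not say which lines filterLines keeps, so dropping blank lines is an assumed criterion; words are assumed to be separated by whitespace.

```scala
import zio._
import zio.stream._

object LinePipelines {
  // String in, String out, possibly fewer elements (assumed criterion: drop blank lines)
  val filterLines: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.filter((line: String) => line.trim.nonEmpty)

  // String in, Int out: number of whitespace-separated words in each line
  val countLineWords: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map((line: String) => line.trim.split("\\s+").length)
}
```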
We can combine these two pipelines into one pipeline using the >>> operator.
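Putting everything together, here is an end-to-end sketch assuming ZIO 2.x: a file source, the fused pipelines, and a summing sink that counts the total number of words. As before, the blank-line filter is an assumed criterion, and both input.txt and the helper totalWords are hypothetical.

```scala
import zio._
import zio.stream._

object WordCount extends ZIOAppDefault {
  // Assumed criterion: drop blank lines
  val filterLines: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.filter((line: String) => line.trim.nonEmpty)

  val countLineWords: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map((line: String) => line.trim.split("\\s+").length)

  // Two pipelines fused into one: String in, Int out
  val wordsPerLine: ZPipeline[Any, Nothing, String, Int] =
    filterLines >>> countLineWords

  // Stream -> pipelines -> sink: total number of words in the file
  def totalWords(fileName: String): ZIO[Any, Throwable, Int] =
    ZStream
      .fromFileName(fileName)
      .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)
      .via(wordsPerLine)
      .run(ZSink.sum[Int])

  // "input.txt" is a hypothetical file name
  def run = totalWords("input.txt").flatMap(n => Console.printLine(s"Total words: $n"))
}
```

Because >>> produces an ordinary ZPipeline, the fused wordsPerLine can be reused with any stream of String, not just one read from a file.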
ZIO-Streams is a powerful library, and it has much more to offer than what I have covered here. Please refer to the official documentation for more details.
If you have any questions or feedback, please feel free to reach out to me on Twitter.