Skip to content

ZIO-Streams 101#

hero

ZIO-Streams is a library for purely functional, asynchronous, concurrent stream processing in Scala. It is built on top of ZIO. Additionally, it implements the Reactive Streams specification. This means that ZIO-Streams can be used with other libraries that implement the same specification.

This is an introductory post to ZIO-Streams. I will cover the basics of ZIO-Streams.

Introduction#

A stream represents emission of data over time. It can be infinite or finite.

Let's take a look at a simple example of a stream that emits integers from 1 to 10.

val coldStream: ZStream[Any, Nothing, Int] = ZStream.range(1, 10)

Its similar to ZIO data structure: #ZStream[-R, +E, +A]. A here represents the type of elements that the stream emits.

The above stream is called a cold stream. Its because it's finite and all the data is already available. Unlike a hot stream, where amount of data is unknown and potentially infinite. And hence all the data can never be available at once . Here's an example:

val hotStream: ZStream[Any, IOException, Byte] = 
  ZStream.fromInputStream(java.lang.System.in)

This is a byte stream that reads from System.in. The user can keep on typing and the stream will keep on emitting bytes.

It generally easier to work with cold streams because you can process or consume the data at your speed. Reading a file from a disk is an example of a cold stream. Hot streams are when you are plugged into a live data source like http calls. You can't control the speed at which data is emitted. You can can either buffer some data or drop some data if consuming speed is slower than the emitting speed.

Components of a zio-stream#

There are three fundamental components of a stream - ZStream, ZSink and ZPipeline.

To put it visually, it looks something like this:

graph LR
  ZStream --> ZPipeline
  ZPipeline --> ZSink
ZStraem

Think of ZStream as the source of data. It is where is data is read from and then passed on to next stage in the flow. It can be a file, a database, a http call, etc. It can also be another stream.

ZPipeline

ZPipeline is the middle layer where you process your data. Processing may include filtering, transforming, etc. Processing can be pure or effectful (like http call).

ZSink

ZSink is the final stage where you consume the data. It can be writing to a file, writing to a database, or simply aggregate it in memory.

Examples#

Let's create a stream that reads from a file, filters out all the lines that start with # and then performs a word count of all the remaining lines.

Create stream
//source: read data
val byteStream: ZStream[Any, Throwable, Byte] = ZStream.fromFileName("lines.txt")

//transformed stream
val stringStream: ZStream[Any, Throwable, String] = 
  byteStream >>> ZPipeline.utfDecode >>> ZPipeline.splitLines // (1)!
  1. >>> is an alias for .via(ZPipeline)

Here, we created a byte stream from a file source. And then attached two pipelines to it. Notice that when we attach a pipeline to a stream, the type of the stream changes. It changes from ZStream[Any, Throwable, Byte] to ZStream[Any, Throwable, String]. But the ZStream type is still the same. It's just that the type of elements that it emits has changed.

Tip

ZStream + [ZPipelines] = ZStream

Now lets write the remaining pipelines to further filter and transform the data

Pipelines
// pipeline: filter
val filterLines: ZPipeline[Any, Nothing, String, String] = 
  ZPipeline.filter(!_.startsWith("#"))

// pipeline: transform  
val countLineWords: ZPipeline[Any, Nothing, String, Int] =
  ZPipeline.map(_.split(" ").length)

filterLines is a pipeline that takes a stream of String and returns a stream of String. With possibly less number of elements. countLineWords is a pipeline that takes a stream of String and returns a stream of Int. Each Int represents the number of words in a line.

We can combine these two pipelines into one pipeline using >>> operator.

Combine pipelines
val filterAndCount: ZPipeline[Any, Nothing, String, Int] = 
  filterLines >>> countLineWords

Let's create the final sink that will consume the data.

Sink
//sink: aggregate
val sink: ZSink[Any, Nothing, Int, Nothing, Int] = 
  ZSink.foldLeft(0)(_ + _)

ZIO-Streams offers good composition. We can combine filterAndCount pipeline with the sink and get a new combined sink.

Combine sink and pipeline
val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] = 
  filterAndCount >>> sink

Tip

[ZPipelines] + ZSink = ZSink

Notice that unlike previous sink, this combined sink takes a stream of Strings and not Ints.

Now we can run the stream with the sink and get the result. To run a stream, we need two things - a ZStream and a ZSink. ZPipelines are optional.

Run stream
//run: execute and get aggregated result
val result: ZIO[Any, Throwable, Int] = 
  stringStream >>> wordCountSink

Tip

ZStream + ZSink = ZIO

You can also chose to combine all individual components in the end and run the stream.

Combine all components
val result: ZIO[Any, Throwable, Int] = 
  stringStream >>> filterLines >>> countLineWords >>> sink

It all works as long as output of one component matches the input of the next component.

I wrote a very verbose code to demonstrate the individual components. But you can also write it in a more concise way.

Concise code
val result: ZIO[Any, Throwable, Int] = 
  ZStream.fromFileName("lines.txt")
    .via(ZPipeline.utfDecode)
    .via(ZPipeline.splitLines)
    .filter(!_.startsWith("#")) // (1)!
    .map(_.split(" ").length) // (2)!
    .runSum // (3)!
  1. .filter is an abstraction over .via(ZPipeline.filter)
  2. .map is an abstraction over .via(ZPipeline.map)
  3. .runSum is an abstraction over .runFold(0)(_ + _) which is an abstraction over .run(ZSink.foldLeft(0)(_ + _)

Concluding thoughts#

ZIO-Streams is a very powerful library and it has much more to offer than what I have covered here. Please refer to the official documentation for more details.

If you have any questions or feedback, please feel free to reach out to me on twitter .