Interesting patterns to consume ZIO Streams

In this post, we will look at some ZStream consumption patterns that compose multiple ZSinks in different ways.

run

graph LR
    Stream -->|run| Sink
    Sink -->|returns| Result

To run a stream, you need a sink. The sink may consume a single value, several values, or all the values of the stream, and it produces a result once it is done.

val stream: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

val sink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft(0)(_ + _)

val result: ZIO[Any, Nothing, Int] = stream run sink
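
To make the "one, some, or all" point concrete, here is a minimal sketch, assuming ZIO 2.x and Scala 3. The object name RunExample and the value names are hypothetical; ZSink.head, ZSink.take and ZSink.sum are built-in sinks that stop after one element, after n elements, and at the end of the stream respectively.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object RunExample extends ZIOAppDefault:
  val stream: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // A stream is a description, so it can be run several times with different sinks.
  val program: UIO[(Option[Int], Chunk[Int], Int)] =
    for
      first    <- stream.run(ZSink.head[Int])    // consumes a single element
      firstTwo <- stream.run(ZSink.take[Int](2)) // consumes two elements
      total    <- stream.run(ZSink.sum[Int])     // consumes every element
    yield (first, firstTwo, total)

  // Prints (Some(1),Chunk(1,2),6)
  def run = program.flatMap(result => Console.printLine(result))
```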

Tip

New to ZIO-Streams? Check out ZIO-Streams 101 and Introduction to ZIO Streams

zipPar: consuming a stream in parallel

When you want two sinks to consume the same stream in parallel, you can use zipPar. The combined sink feeds "ALL" elements of the stream to both sinks and then zips their results into a tuple. For example, when an HTTP request arrives as a stream of bytes, you can apply two sinks to it: one to process the request and another to log it.

graph LR
    Stream -->|run| Sink1
    Stream -->|run| Sink2
    Sink1 -->|returns| Result1
    Sink2 -->|returns| Result2
    Result1 -->|merge| Result1,Result2
    Result2 -->|merge| Result1,Result2

val requestStream: ZStream[Any, Nothing, Byte] = ???

val requestProcessorSink: ZSink[Any, Exception, Byte, Nothing, Unit] = ???

val requestLoggerSink: ZSink[Any, Exception, Byte, Nothing, Unit] = ???

val result: ZIO[Any, Exception, Unit] = 
  requestStream.run(requestProcessorSink zipPar requestLoggerSink)
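
The request example above leaves its sinks unimplemented, so here is a small runnable sketch of the same pattern, assuming ZIO 2.x and Scala 3. The names ZipParExample, numbers and sumAndCount are hypothetical; the two sinks compute a sum and a count over the same elements.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object ZipParExample extends ZIOAppDefault:
  val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // Both sinks see every element; their results are zipped into a tuple.
  val sumAndCount: ZSink[Any, Nothing, Int, Nothing, (Int, Long)] =
    ZSink.sum[Int].zipPar(ZSink.count)

  val program: UIO[(Int, Long)] = numbers.run(sumAndCount)

  // Prints (6,3)
  def run = program.flatMap(result => Console.printLine(result))
```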

zip: consuming a stream sequentially

Sink.zip is useful when you want one sink to consume "some elements" of a stream and another sink to consume the rest. The first sink decides how many elements it wants to consume; whatever is left over is consumed by the second sink. This can be chained across multiple sinks.

For example, suppose you want to consume a stream of bytes, parsing the first few bytes as a header and the rest as a body. You can use Sink.zip to do that.

graph LR
    Stream -->|run| Sink1
    Sink1 -->|returns| LeftoverStream
    Sink1 -->|returns| Result1
    LeftoverStream -->|run| Sink2
    Sink2 -->|returns| Result2
    Result1 -->|merge| Result1,Result2
    Result2 -->|merge| Result1,Result2

val inputStream: ZStream[Any, Nothing, String] = ???

case class Header(str: String)

case class Body(str: String)

case class Request(header: Header, body: Body)

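// head yields None for an empty stream, so fall back to an empty header
// instead of calling .get, which would throw.
val requestHeaderSink: ZSink[Any, Nothing, String, String, Header] =
  ZSink
    .head[String]
    .map(line => Header(line.getOrElse("")))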

val requestBodySink: ZSink[Any, Nothing, String, Nothing, Body] =
  ZSink.collectAll[String].map(chunk => Body(chunk.mkString))

val run: ZIO[Any, Nothing, Request] = inputStream
  .run(requestHeaderSink zip requestBodySink)
  .map(Request.apply)
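
The remark that zip can be chained across several sinks can be sketched as follows, assuming ZIO 2.x and Scala 3. The names ZipChainExample and threeParts are hypothetical. Each sink takes what it needs and hands its leftovers to the next one, and the results are flattened into a single tuple.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object ZipChainExample extends ZIOAppDefault:
  val threeParts =
    ZSink.head[Int]                  // takes the first element
      .zip(ZSink.take[Int](2))       // takes the next two elements
      .zip(ZSink.collectAll[Int])    // takes whatever is left

  val program: UIO[(Option[Int], Chunk[Int], Chunk[Int])] =
    ZStream(1, 2, 3, 4, 5).run(threeParts)

  // Prints (Some(1),Chunk(2,3),Chunk(4,5))
  def run = program.flatMap(result => Console.printLine(result))
```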

peel & flatMap: consuming a stream sequentially using previous result

ZStream.peel is a lower-level approach to doing the same thing as Sink.zip.

graph LR
  Stream -->|peel| Sink
  Sink -->|returns| LeftoverStream
  Sink -->|returns| Result1

But peel offers more flexibility. With peel, you get access to Result1 together with the remaining stream, which is useful if Sink2 depends on Result1. For example, suppose you want to parse the header and then, based on the header, parse the body in a streaming fashion.

graph LR
  Stream -->|peel| Sink1
  Sink1 -->|returns| LeftoverStream
  Sink1 -->|returns| Result1
  Result1 -->|feed| Sink2
  LeftoverStream -->|run| Sink2
  Sink2 -->|returns| Result2

val inputStream: ZStream[Any, Nothing, String] = ???

enum MessageType:
  case Json, Xml

enum MessageBody:
  case JsonBody(str: String)
  case XmlBody(str: String)

val headerParserSink =
  ZSink
    .head[String]
    .mapZIO {
      case Some("json") => ZIO.succeed(MessageType.Json)
      case Some("xml")  => ZIO.succeed(MessageType.Xml)
      case _            => ZIO.fail(new Exception("Invalid header"))
    }

def bodyParserSink(messageType: MessageType) =
  ZSink
    .collectAll[String]
    .map(chunk =>
      messageType match
        case MessageType.Json => MessageBody.JsonBody(chunk.mkString)
        case MessageType.Xml  => MessageBody.XmlBody(chunk.mkString)
    )

val runPeel: ZIO[Scope, Exception, MessageBody] =
  for {
    peeled <- inputStream.peel(headerParserSink)
    (msgType: MessageType, bodyStream: ZStream[Any, Nothing, String]) = peeled
    body: MessageBody <- bodyStream run bodyParserSink(msgType)
  } yield body
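
Note that runPeel still requires a Scope in its environment. One way to discharge it is to wrap the whole interaction in ZIO.scoped, as in this minimal sketch (assuming ZIO 2.x and Scala 3; PeelExample and frames are hypothetical names). The leftover stream is only valid while the scope is open, so it is consumed inside the scoped block.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object PeelExample extends ZIOAppDefault:
  val frames: ZStream[Any, Nothing, Int] = ZStream(2, 10, 20, 30)

  val program: UIO[(Option[Int], Chunk[Int])] =
    ZIO.scoped {
      frames.peel(ZSink.head[Int]).flatMap { case (first, rest) =>
        // `rest` is the leftover stream; run it before the scope closes.
        rest.runCollect.map(remaining => (first, remaining))
      }
    }

  // Prints (Some(2),Chunk(10,20,30))
  def run = program.flatMap(result => Console.printLine(result))
```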

A similar outcome can be achieved using flatMap. flatMap is a higher-level combinator that chains sinks together, passing the result of the previous sink to the function that builds the next sink. This is more convenient than peel, as you don't have to manually feed the result of the first sink and the leftover stream to the second sink, and the resulting effect does not require a Scope.

val runFlatMap: ZIO[Any, Exception, MessageBody] =
  val combinedSink = headerParserSink flatMap bodyParserSink
  inputStream run combinedSink
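
As a further illustration of a second sink that depends on the first sink's result, here is a sketch of a length-prefixed payload, assuming ZIO 2.x and Scala 3. The names FlatMapExample and payloadSink are hypothetical: the first element says how many of the following elements belong to the payload.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object FlatMapExample extends ZIOAppDefault:
  // Builds the second sink from the result of the first one.
  def payloadSink(length: Option[Int]): ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
    length match
      case Some(n) => ZSink.take[Int](n)              // read exactly n elements
      case None    => ZSink.succeed(Chunk.empty[Int]) // empty stream, empty payload

  val lengthPrefixed = ZSink.head[Int].flatMap(payloadSink)

  // The first element (2) is the length; 30 is left unconsumed.
  val program: UIO[Chunk[Int]] = ZStream(2, 10, 20, 30).run(lengthPrefixed)

  // Prints Chunk(10,20)
  def run = program.flatMap(result => Console.printLine(result))
```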

race: racing multiple sinks in parallel

race runs two sinks in parallel over the same stream and returns the result of the sink that finishes first.

graph LR
  Stream -->|race| Sink1
  Stream -->|race| Sink2

  Sink1 -.....-x|slow| Result1
  Sink2 -->|fast| Result2

  style Result1 fill:#ff95a3,stroke:red,color:#8a0000;
  style Result2 fill:#abffa7,stroke:green,color:#008200;

The slower sink is cancelled.

val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

case class Sum(value: Int)
case class Product(value: Int)

val addition =
  ZSink.foldLeft[Int, Sum](Sum(0))((acc, value) => Sum(acc.value + value))

val multiplication =
  ZSink.foldLeft[Int, Product](Product(1))((acc, value) =>
    Product(acc.value * value)
  )

val result: UIO[Sum | Product] =
  inputStream run (addition race multiplication)

val run = result.flatMap {
  case Sum(value)     => Console.printLine(s"Sum: $value")
  case Product(value) => Console.printLine(s"Product: $value")
}
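
If you are not on Scala 3, or simply want to know which side won, raceBoth returns an Either instead of a union type. The following is a sketch assuming ZIO 2.x; RaceBothExample, firstThree and everything are hypothetical names. Since this is a race, which branch wins is not guaranteed, so the example handles both outcomes rather than assuming one.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object RaceBothExample extends ZIOAppDefault:
  val firstThree = ZSink.take[Int](3)    // can finish after three elements
  val everything = ZSink.collectAll[Int] // finishes only when the stream ends

  // Left means firstThree finished first, Right means everything did.
  val program: UIO[Either[Chunk[Int], Chunk[Int]]] =
    ZStream.fromIterable(1 to 10).run(firstThree.raceBoth(everything))

  def run = program.flatMap {
    case Left(chunk)  => Console.printLine(s"take(3) finished first: $chunk")
    case Right(chunk) => Console.printLine(s"collectAll finished first: $chunk")
  }
```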

orElse: chain sinks together as fallback strategy

orElse chains sinks together as fallbacks. If the first sink fails, the second sink is run; if the second sink fails, the third sink is run, and so on. The result of the first sink that succeeds is returned. If all sinks fail, the error of the last sink is returned.

graph LR
  ST("Stream")
  SN1{"Sink 1"}
  SN2{"Sink 2"}
  SN3{"Sink 3"}

  ST -->|run| SN1
  SN1 -->|pass| Result1
  SN1 -->|error| SN2

  SN2 -->|pass| Result2
  SN2 -->|error| SN3

  SN3 -->|FAILED| ERROR
  SN3 -->|pass| Result3

  style Result1 fill:#abffa7,stroke:green,color:#008200;
  style Result2 fill:#abffa7,stroke:green,color:#008200;
  style Result3 fill:#abffa7,stroke:green,color:#008200;
  style ERROR fill:#ff95a3,stroke:red,color:#8a0000;

case class Error(message: String)

val inputStream: ZStream[Any, Nothing, Byte] = ???

val sink1: ZSink[Any, Error, Byte, Nothing, String] = ???

val sink2: ZSink[Any, Error, Byte, Nothing, Int] = ???

val sink3: ZSink[Any, Error, Byte, Nothing, Boolean] = ???

val run: IO[Error, String | Int | Boolean] =
  inputStream run (sink1 orElse sink2 orElse sink3)
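
Since the sinks above are left as ???, here is a small runnable sketch of the fallback behavior, assuming ZIO 2.x and Scala 3. The names OrElseExample, alwaysFails and fallback are hypothetical. The first sink fails immediately, before consuming any input, so the fallback sink processes the whole stream.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object OrElseExample extends ZIOAppDefault:
  // Fails immediately, without consuming any input.
  val alwaysFails: ZSink[Any, String, Int, Nothing, Int] =
    ZSink.fail("primary sink unavailable")

  val fallback: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.sum[Int]

  val program: IO[String, Int] =
    ZStream(1, 2, 3).run(alwaysFails.orElse(fallback))

  // Prints 6
  def run = program.flatMap(result => Console.printLine(result))
```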

splitWhere & foreachWhile: consume partial data

splitWhere creates a new sink from an existing sink such that the new sink consumes only the initial elements of the stream, stopping as soon as an element satisfies the predicate. That element and everything after it are returned as leftovers.

foreachWhile supports the same use case without first creating a complete sink. Let's see examples of both.

val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

val printSink: ZSink[Any, IOException, Int, Nothing, Unit] = 
  ZSink.foreach[Any, IOException, Int](Console.printLine(_))

val partialPrintSink: ZSink[Any, IOException, Int, Int, Unit] = 
  printSink.splitWhere(_ == 5)

val decoratedPrintSink: ZSink[Any, IOException, Int, Nothing, Unit] =
  ZSink.foreach[Any, IOException, Int](value =>
    Console.printLine(s"Value: $value")
  )

val run = inputStream run (partialPrintSink zip decoratedPrintSink)

The same program using foreachWhile:

val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

val partialPrintSink = ZSink.foreachWhile((int: Int) =>
  if int < 5 then Console.printLine(int) as true
  else ZIO.succeed(false)
)

val decoratedPrintSink: ZSink[Any, IOException, Int, Nothing, Unit] =
  ZSink.foreach[Any, IOException, Int](value =>
    Console.printLine(s"Value: $value")
  )

val run = inputStream run (partialPrintSink zip decoratedPrintSink)

Both programs above produce the same output:

1
2
3
4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10

flowchart LR
    subgraph create sink
    direction LR
    CompleteSink --->|splitWhere| predicate  --->|returns| PartialSink
    end

    Stream --->|run| PartialSink

    PartialSink -->|returns| output

    subgraph output
    direction TB
    Result
    LeftOverStream
    end
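
The Result and LeftOverStream boxes in the diagram can be made visible by collecting values instead of printing them. This is a sketch assuming ZIO 2.x and Scala 3; SplitWhereExample and beforeFive are hypothetical names. The split sink collects the elements before 5, and a second collectAll picks up the leftovers.

```scala
import zio.*
import zio.stream.*

// Hypothetical example object; names are illustrative only.
object SplitWhereExample extends ZIOAppDefault:
  // Collects elements until one equals 5; 5 itself becomes a leftover.
  val beforeFive: ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
    ZSink.collectAll[Int].splitWhere[Int](_ == 5)

  val program: UIO[(Chunk[Int], Chunk[Int])] =
    ZStream.fromIterable(1 to 10).run(beforeFive.zip(ZSink.collectAll[Int]))

  // Prints (Chunk(1,2,3,4),Chunk(5,6,7,8,9,10))
  def run = program.flatMap(result => Console.printLine(result))
```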