Interesting patterns to consume ZIO Streams
In this post, we will look at some ZStream consumption patterns built by composing multiple ZSinks together in different ways.
run
graph LR
Stream -->|run| Sink
Sink -->|returns| Result
To run a stream, you need a sink. A sink may consume only some of the stream's elements or all of them.
import zio.*
import zio.stream.*
val stream: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)
val sink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft[Int, Int](0)(_ + _)
val result: ZIO[Any, Nothing, Int] = stream run sink
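The sink decides how much of the stream it reads. A small sketch, assuming ZIO 2 with zio-streams (the `//> using` line is a scala-cli directive, and its version was chosen for this sketch), runs one stream into a fold that reads everything and into `ZSink.take`, which stops after two elements. The value names are illustrative.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5)

// Reads every element and folds them into a sum.
val sumAll: UIO[Int] =
  numbers.run(ZSink.foldLeft[Int, Int](0)(_ + _))

// Stops after the first two elements; run discards whatever the sink leaves over.
val firstTwo: UIO[Chunk[Int]] =
  numbers.run(ZSink.take[Int](2))
```

Either effect can be executed from a `ZIOAppDefault`, or directly with `Unsafe.unsafe { implicit u => Runtime.default.unsafe.run(firstTwo) }`.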
Tip
New to ZIO-Streams? Check out ZIO-Streams 101 and Introduction to ZIO Streams
zipPar: consuming a stream in parallel
When you want to consume one stream with two sinks in parallel, you can use zipPar.
Every element of the stream is fed to both sinks, and once both have finished their results are zipped together (as a tuple; `Unit` results are dropped, which is why the example below has type `Unit`). For example, when an HTTP request body arrives as a stream, you can apply one sink to process the request and another to log it.
graph LR
Stream -->|run| Sink1
Stream -->|run| Sink2
Sink1 -->|returns| Result1
Sink2 -->|returns| Result2
Result1 -->|merge| Result1,Result2
Result2 -->|merge| Result1,Result2
val requestStream: ZStream[Any, Nothing, Byte] = ???
val requestProcessorSink: ZSink[Any, Exception, Byte, Nothing, Unit] = ???
val requestLoggerSink: ZSink[Any, Exception, Byte, Nothing, Unit] = ???
val result: ZIO[Any, Exception, Unit] =
  requestStream.run(requestProcessorSink zipPar requestLoggerSink)
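A concrete version of the same idea, assuming ZIO 2 (the scala-cli directive and its version were chosen for this sketch): one run feeds every element to a counting sink and a summing sink, which is enough to compute an average without reading the stream twice. The names are illustrative.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

val readings: ZStream[Any, Nothing, Int] = ZStream(3, 5, 7, 9)

// Counts the elements it sees.
val countSink: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft[Int, Int](0)((count, _) => count + 1)

// Sums the elements it sees.
val sumSink: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft[Int, Int](0)(_ + _)

// Both sinks receive every element; their results are zipped into (count, sum).
val countAndSum: UIO[(Int, Int)] =
  readings.run(countSink zipPar sumSink)
```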
zip: consuming a stream sequentially
Sink.zip is useful when you want one sink to consume some elements of a stream and another sink to consume the rest. The first sink decides how many elements it consumes; its leftovers and the rest of the stream are then consumed by the second sink. More than two sinks can be chained this way.
For example, you may want to consume a stream of lines, parsing the first line as a header and the rest as a body. You can use Sink.zip to do that.
graph LR
Stream -->|run| Sink1
Sink1 -->|returns| LeftoverStream
Sink1 -->|returns| Result1
LeftoverStream -->|run| Sink2
Sink2 -->|returns| Result2
Result1 -->|merge| Result1,Result2
Result2 -->|merge| Result1,Result2
val inputStream: ZStream[Any, Nothing, String] = ???
case class Header(str: String)
case class Body(str: String)
case class Request(header: Header, body: Body)
val requestHeaderSink =
  ZSink
    .head[String]
    .map(line => Header(line.get)) // .get throws (a defect) if the stream is empty
val requestBodySink: ZSink[Any, Nothing, String, Nothing, Body] =
  ZSink.collectAll[String].map(chunk => Body(chunk.mkString))
val run: ZIO[Any, Nothing, Request] = inputStream
  .run(requestHeaderSink zip requestBodySink)
  .map((header, body) => Request(header, body))
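The hand-off of leftovers is easier to see with numbers. A sketch assuming ZIO 2 (scala-cli directive, version chosen for this sketch), using `ZSink.take` as the first sink because it returns the elements it did not need as leftovers:

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

val input: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5)

// take(2) keeps two elements and hands the rest back as leftovers;
// zip feeds those leftovers, then the remaining stream, to collectAll.
val headAndTail: UIO[(Chunk[Int], Chunk[Int])] =
  input.run(ZSink.take[Int](2) zip ZSink.collectAll[Int])
```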
peel & flatMap: consuming a stream sequentially using the previous result
ZStream.peel is a lower-level way to do the same thing as Sink.zip.
graph LR
Stream -->|peel| Sink
Sink -->|returns| LeftoverStream
Sink -->|returns| Result1
But peel offers more flexibility: it gives you access to Result1 together with the leftover stream, which is useful when Sink2 depends on Result1. For example, suppose you want to parse the header and then, based on the header, parse the body in a streaming fashion.
graph LR
Stream -->|peel| Sink1
Sink1 -->|returns| LeftoverStream
Sink1 -->|returns| Result1
Result1 -->|feed| Sink2
LeftoverStream -->|run| Sink2
Sink2 -->|returns| Result2
val inputStream: ZStream[Any, Nothing, String] = ???
enum MessageType:
  case Json, Xml
enum MessageBody:
  case JsonBody(str: String)
  case XmlBody(str: String)
val headerParserSink =
  ZSink
    .head[String]
    .mapZIO {
      case Some("json") => ZIO.succeed(MessageType.Json)
      case Some("xml")  => ZIO.succeed(MessageType.Xml)
      case _            => ZIO.fail(new Exception("Invalid header"))
    }
def bodyParserSink(messageType: MessageType) =
  ZSink
    .collectAll[String]
    .map(chunk =>
      messageType match
        case MessageType.Json => MessageBody.JsonBody(chunk.mkString)
        case MessageType.Xml  => MessageBody.XmlBody(chunk.mkString)
    )
val runPeel: ZIO[Scope, Exception, MessageBody] =
  for {
    peeled <- inputStream.peel(headerParserSink)
    (msgType: MessageType, bodyStream: ZStream[Any, Nothing, String]) = peeled
    body: MessageBody <- bodyStream run bodyParserSink(msgType)
  } yield body
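The Scope requirement of peel is easy to miss. A minimal runnable sketch, assuming ZIO 2 (scala-cli directive, version chosen for this sketch): `ZIO.scoped` provides the scope, and the leftover stream is consumed inside it, before the scope closes.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

val input: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5)

// peel returns the sink's result together with the rest of the stream.
// The rest must be consumed while the Scope is open, so both steps run
// inside ZIO.scoped.
val peeled: UIO[(Chunk[Int], Chunk[Int])] =
  ZIO.scoped {
    input.peel(ZSink.take[Int](2)).flatMap { case (firstTwo, rest) =>
      rest.runCollect.map(remaining => (firstTwo, remaining))
    }
  }
```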
A similar outcome can be achieved with flatMap. flatMap is a higher-level combinator: it chains sinks together, passing the result of the previous sink to a function that returns the next sink, and feeding the previous sink's leftovers into that next sink. This is more convenient than peel, as you don't have to wire the leftover stream into the second sink yourself. Unlike runPeel, runFlatMap also doesn't need a Scope.
val runFlatMap: ZIO[Any, Exception, MessageBody] =
  val combinedSink = headerParserSink flatMap bodyParserSink
  inputStream run combinedSink
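A smaller runnable case of a sink that depends on the previous result, assuming ZIO 2 (scala-cli directive, version chosen for this sketch): a hypothetical length-prefixed frame, where the first element says how many payload elements follow.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

// Hypothetical frame: a length prefix (2) followed by the payload.
val framed: ZStream[Any, Nothing, Int] = ZStream(2, 10, 20, 30)

// Reads the prefix; assumes the stream is non-empty (_.head throws otherwise).
val lengthSink: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.take[Int](1).map(_.head)

// The second sink is built from the first sink's result.
val payloadSink: ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
  lengthSink.flatMap(n => ZSink.take[Int](n))

val payload: UIO[Chunk[Int]] = framed.run(payloadSink)
```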
race: racing multiple sinks in parallel
race can be used to run two sinks in parallel and return the result of the sink that finishes first.
graph LR
Stream -->|race| Sink1
Stream -->|race| Sink2
Sink1 -.....-x|slow| Result1
Sink2 -->|fast| Result2
style Result1 fill:#ff95a3,stroke:red,color:#8a0000;
style Result2 fill:#abffa7,stroke:green,color:#008200;
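The slower sink is interrupted. Note that in the example below both sinks have to read the whole stream before they can finish, so which result gets printed is not deterministic.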
val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)
case class Sum(value: Int)
case class Product(value: Int)
val addition =
  ZSink.foldLeft[Int, Sum](Sum(0))((acc, value) => Sum(acc.value + value))
val multiplication =
  ZSink.foldLeft[Int, Product](Product(1))((acc, value) =>
    Product(acc.value * value)
  )
val result: UIO[Sum | Product] =
  inputStream run (addition race multiplication)
val run = result.flatMap {
  case Sum(value)     => Console.printLine(s"Sum: $value")
  case Product(value) => Console.printLine(s"Product: $value")
}
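For a race with a clear winner, here is a sketch assuming ZIO 2 (scala-cli directive, version chosen for this sketch): on an infinite stream, a sink that stops after three elements must beat a fold that waits for the end of the stream, which never comes.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

// An infinite stream: 1, 2, 3, ...
val naturals: ZStream[Any, Nothing, Int] = ZStream.iterate(1)(_ + 1)

// Finishes after reading three elements.
val sumOfFirstThree: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.take[Int](3).map(_.sum)

// Needs the end of the stream, so on an infinite stream it never finishes.
val sumOfEverything: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft[Int, Int](0)(_ + _)

// race returns the result of whichever sink completes first.
val winner: UIO[Int] = naturals.run(sumOfFirstThree race sumOfEverything)
```

Because `sumOfEverything` can never complete here, the result is always 6 (1 + 2 + 3), and the fold is interrupted instead of being left running.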
orElse: chaining sinks together as a fallback strategy
orElse can be used to chain sinks together as a fallback. If the first sink fails, the second sink is run; if the second sink also fails, the third sink is run, and so on. The result of the first sink that succeeds is returned. If all sinks fail, the error of the last sink is returned.
graph LR
ST("Stream")
SN1{"Sink 1"}
SN2{"Sink 2"}
SN3{"Sink 3"}
ST -->|run| SN1
SN1 -->|pass| Result1
SN1 -->|error| SN2
SN2 -->|pass| Result2
SN2 -->|error| SN3
SN3 -->|FAILED| ERROR
SN3 -->|pass| Result3
style Result1 fill:#abffa7,stroke:green,color:#008200;
style Result2 fill:#abffa7,stroke:green,color:#008200;
style Result3 fill:#abffa7,stroke:green,color:#008200;
style ERROR fill:#ff95a3,stroke:red,color:#8a0000;
case class Error(message: String)
val inputStream: ZStream[Any, Nothing, Byte] = ???
val sink1: ZSink[Any, Error, Byte, Nothing, String] = ???
val sink2: ZSink[Any, Error, Byte, Nothing, Int] = ???
val sink3: ZSink[Any, Error, Byte, Nothing, Boolean] = ???
val run: IO[Error, String | Int | Boolean] =
  inputStream run (sink1 orElse sink2 orElse sink3)
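A runnable sketch of both outcomes, assuming ZIO 2 (scala-cli directive, version chosen for this sketch). The failing sinks are built with `ZSink.fail`, which fails before reading any input, so the example does not depend on what a fallback sees of elements that a failed sink had already consumed. The names are illustrative.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

val input: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

// Fails immediately, without reading any element.
val unavailable: ZSink[Any, String, Any, Nothing, Nothing] =
  ZSink.fail("primary sink unavailable")

// The first sink fails, so the fallback runs and collects the elements.
val withFallback: IO[String, Chunk[Int]] =
  input.run(unavailable orElse ZSink.collectAll[Int])

// Every sink fails, so the error of the last one is reported.
val allFailing: IO[String, Nothing] =
  input.run(unavailable orElse ZSink.fail("fallback sink unavailable"))
```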
splitWhere & foreachWhile: consuming partial data
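splitWhere can be used to create a new sink from an existing sink such that the new sink consumes elements only until an element (after the first one) satisfies the predicate. That element and the rest of the stream are returned as leftovers, so a following sink can consume them.
foreachWhile supports the same use case without first creating a sink: it runs an effect for each element until the effect returns false, and that element and everything after it become leftovers. Let's see examples of both.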
import java.io.IOException
val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)
val printSink: ZSink[Any, IOException, Int, Nothing, Unit] =
  ZSink.foreach[Any, IOException, Int](Console.printLine(_))
val partialPrintSink: ZSink[Any, IOException, Int, Int, Unit] =
  printSink.splitWhere(_ == 5)
val decoratedPrintSink: ZSink[Any, IOException, Int, Nothing, Unit] =
  ZSink.foreach[Any, IOException, Int](value => Console.printLine(s"Value: $value"))
val run = inputStream run (partialPrintSink zip decoratedPrintSink)
import java.io.IOException
val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)
val partialPrintSink = ZSink.foreachWhile((int: Int) =>
  if int < 5 then Console.printLine(int) as true
  else ZIO.succeed(false)
)
val decoratedPrintSink: ZSink[Any, IOException, Int, Nothing, Unit] =
  ZSink.foreach[Any, IOException, Int](value =>
    Console.printLine(s"Value: $value")
  )
val run = inputStream run (partialPrintSink zip decoratedPrintSink)
Both programs above produce the same output:
1
2
3
4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
flowchart LR
subgraph create sink
direction LR
CompleteSink --->|splitWhere| predicate --->|returns| PartialSink
end
Stream --->|run| PartialSink
PartialSink -->|returns| output
subgraph output
direction TB
Result
LeftOverStream
end
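Printed output is hard to check, so here is a variant of the splitWhere program that collects both halves instead, assuming ZIO 2 (scala-cli directive, version chosen for this sketch). It should produce the same 1 to 4 / 5 to 10 split as the output above.

```scala
//> using dep "dev.zio::zio-streams:2.0.21"
import zio.*
import zio.stream.*

val input: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

// On its own, collectAll would consume everything; splitWhere makes it stop
// at the element equal to 5 and return 5, 6, ..., 10 as leftovers.
val untilFive: ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
  ZSink.collectAll[Int].splitWhere(_ == 5)

// zip hands those leftovers to the second sink.
val splitResult: UIO[(Chunk[Int], Chunk[Int])] =
  input.run(untilFive zip ZSink.collectAll[Int])
```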