Suppose that I want to convert some legacy asynchronous API into FS2 Streams. The API provides an interface with 3 callbacks: next element, success, error. I'd like the Stream to emit all the elements and then complete upon receiving success or error callback.
FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html) suggests using fs2.Queue
for such situations,
and it works great for enqueueing, but all the examples I've seen so far expect that the stream that queue.dequeue
returns will never complete -
there's no obvious way to handle success/error callback in my situation.
I've tried to use queue.dequeue.interruptWhen(...here goes the signal...)
, but if success/error callback arrives before the client has read the data from the stream,
stream gets terminated prematurely - there are still unread elements. I'd like the consumer to finish reading them before completing the stream.
Is it possible to do that with FS2? With Akka Streams it's trivial - SourceQueueWithComplete
has complete
and fail
methods.
UPDATE: I was able to get good enough result by wrapping elements in Option and considering None as a signal to stop reading the stream, and additionally by using a Promise to propagate errors:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
However, did I overlook more natural way of doing such things?
One idiomatic way to do this is to create a Queue[Option[A]]
instead of Queue[A]
. When enqueueing, wrap in Some
, and you can explicitly enqueue None
to signal completion. On the dequeueing side, do q.dequeue.unNoneTerminate
, which gives you a Stream[F, A]
that terminates once the Queue emits None
Answer to your update: Combine unNoneTerminate
with rethrow
, which takes a Stream[F, Either[Throwable, A]]
and returns a Stream[F, A]
that errors out with Stream.raiseError
when it encouters a throwable.
Your complete stack would then be a Stream[F, Either[Throwable, Option[A]]]
and you unwrap into Stream[F,A]
by calling .rethrow.unNoneTerminate
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With