Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

FS2: is it possible to complete Queue gracefully?

Tags:

scala

fs2

Suppose that I want to convert some legacy asynchronous API into FS2 Streams. The API provides an interface with 3 callbacks: next element, success, error. I'd like the Stream to emit all the elements and then complete upon receiving success or error callback.

FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html) suggests using fs2.Queue for such situations, and it works great for enqueueing, but all the examples I've seen so far expect that the stream that queue.dequeue returns will never complete - there's no obvious way to handle success/error callback in my situation. I've tried to use queue.dequeue.interruptWhen(...here goes the signal...), but if success/error callback arrives before the client has read the data from the stream, stream gets terminated prematurely - there are still unread elements. I'd like the consumer to finish reading them before completing the stream.

Is it possible to do that with FS2? With Akka Streams it's trivial - SourceQueueWithComplete has complete and fail methods.

UPDATE: I was able to get good enough result by wrapping elements in Option and considering None as a signal to stop reading the stream, and additionally by using a Promise to propagate errors:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

However, did I overlook more natural way of doing such things?

like image 563
Paul Lysak Avatar asked Dec 07 '22 14:12

Paul Lysak


2 Answers

One idiomatic way to do this is to create a Queue[Option[A]] instead of Queue[A]. When enqueueing, wrap in Some, and you can explicitly enqueue None to signal completion. On the dequeueing side, do q.dequeue.unNoneTerminate, which gives you a Stream[F, A] that terminates once the Queue emits None

like image 138
Daenyth Avatar answered Dec 28 '22 00:12

Daenyth


Answer to your update: Combine unNoneTerminate with rethrow, which takes a Stream[F, Either[Throwable, A]] and returns a Stream[F, A] that errors out with Stream.raiseError when it encouters a throwable.

Your complete stack would then be a Stream[F, Either[Throwable, Option[A]]] and you unwrap into Stream[F,A] by calling .rethrow.unNoneTerminate

like image 26
adhominem Avatar answered Dec 28 '22 00:12

adhominem