It's well documented that merging with an empty fs2.Stream should produce the same fs2.Stream. Here is the quote from the Scaladoc:
Has the property that
merge(Stream.empty, s) == s
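The documented property is about which elements come out. Here is a minimal check of it for a finite stream with no shared mutable state. It assumes fs2 2.x and cats-effect 2, as in the programs below, and the object and member names (MergeEmptyProperty, source, original, viaMerge) are hypothetical. Note what it does not show: when the source is pulled relative to downstream effects.

```scala
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object MergeEmptyProperty {
  // In cats-effect 2, the Concurrent[IO] that merge needs is derived from a ContextShift
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // A finite stream whose elements do not depend on any shared state
  val source: fs2.Stream[IO, Int] = fs2.Stream(1, 2, 3).covary[IO]

  def original: List[Int] = source.compile.toList.unsafeRunSync()

  // merge(Stream.empty, s) written in method form
  def viaMerge: List[Int] =
    fs2.Stream.empty.covaryAll[IO, Int].merge(source).compile.toList.unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(s"original = $original, merged with empty = $viaMerge")
}
```

For a stream like this both lists contain the same elements. The programs below differ because their source reads a Ref that the downstream writes, so the timing of each pull matters.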
Consider the following complete Scala program using fs2.Stream:
Emitting elements
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })
  program.flatMap(_.compile.drain).unsafeRunSync()
}
The program prints the following:
Got value 0
Got value 1
Got value 2
...
And it looks OK. Applying the Scaladoc property quoted above, I concluded that replacing
fs2.Stream.repeatEval(ref.get)
with
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])
would leave the behavior unchanged. Here is the updated program:
Emitting elements and merging with empty fs2.Stream
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  val program = Ref.of[IO, Int](0).map(ref => {
    // The only change: the source is merged with an empty stream
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })
  program.flatMap(_.compile.drain).unsafeRunSync()
}
The program output is:
Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...
QUESTION: Why does merging with an empty fs2.Stream change the behavior of the program, duplicating the elements of the original fs2.Stream?
The documentation of merge also says:
The implementation always tries to pull one chunk from each side before waiting for it to be consumed by resulting stream. As such, there may be up to two chunks (one from each stream) waiting to be processed while the resulting stream is processing elements.
If I understand this correctly, that would mean that while the resulting stream is busy processing value 0, a new value is already pulled from the source before ref has been updated.
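One way to observe this is to run the question's pipeline for a fixed number of elements and collect the values the downstream sees, with and without the extra merge. This is a sketch assuming fs2 2.x and cats-effect 2; CompareSeenValues, seenValues and mergeWithEmpty are hypothetical names, and the 100 ms sleep and take(n) replace the original 1 second sleep and infinite run only to keep it short. Only the version without merge is deterministic; whether the merged version repeats values depends on timing and on the fs2 version.

```scala
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import cats.syntax.flatMap._

object CompareSeenValues {
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Runs the question's pipeline for n elements and returns the values the
  // downstream evalMap observed; mergeWithEmpty toggles the extra merge.
  def seenValues(n: Int, mergeWithEmpty: Boolean): IO[List[Int]] =
    Ref.of[IO, Int](0).flatMap { ref =>
      val source = fs2.Stream.repeatEval(ref.get)
      val upstream =
        if (mergeWithEmpty) source.merge(fs2.Stream.empty.covaryAll[IO, Int])
        else source
      upstream
        .evalMap { value =>
          // The sleep widens the window in which a value read ahead goes stale
          IO.sleep(100.millis) >> ref.set(value + 1).map(_ => value)
        }
        .take(n.toLong)
        .compile
        .toList
    }

  def main(args: Array[String]): Unit = {
    println(s"without merge: ${seenValues(6, mergeWithEmpty = false).unsafeRunSync()}")
    println(s"with merge:    ${seenValues(6, mergeWithEmpty = true).unsafeRunSync()}")
  }
}
```

Without merge, the next ref.get only runs after evalMap has finished the previous element, so the result is always 0 to 5. With merge, a read that happens while evalMap is still sleeping returns the old value, which then shows up a second time.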
Strictly speaking I don't think this behavior violates any invariants. But for you it makes a difference because
To solve the second point, you could use a 1-element queue instead of a Ref.
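A minimal sketch of the 1-element queue idea, assuming fs2 2.x, where fs2.concurrent.Queue provides bounded, dequeue and enqueue1. QueueInsteadOfRef and seenValues are hypothetical names, and the shorter sleep and take(n) are only there so the example terminates. The downstream hands the next value back through the queue instead of overwriting a Ref, so any pull-ahead simply waits on the empty queue rather than re-reading a stale value.

```scala
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import fs2.concurrent.Queue

object QueueInsteadOfRef {
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def seenValues(n: Int): IO[List[Int]] =
    Queue.bounded[IO, Int](1).flatMap { queue =>
      val pipeline = queue.dequeue
        // The merge is kept on purpose: reading ahead now blocks on the empty
        // queue until the downstream has produced the next value
        .merge(fs2.Stream.empty.covaryAll[IO, Int])
        .evalMap { value =>
          IO(println(s"Got value $value")) >>
            IO.sleep(100.millis) >>
            // Hand the next value to the source instead of setting a Ref
            queue.enqueue1(value + 1).map(_ => value)
        }
        .take(n.toLong)
        .compile
        .toList
      // Seed the queue with the initial value, then run the pipeline
      queue.enqueue1(0) >> pipeline
    }

  def main(args: Array[String]): Unit =
    println(seenValues(5).unsafeRunSync())
}
```

Because only one value is ever outstanding, the capacity of 1 is never exceeded and each value is emitted exactly once.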
AFAICT the same issue could occur without using merge. The stream is free to pull as many elements from the source as it sees fit before processing them, as long as the source can emit them. You basically got lucky in your first piece of code because you have a pretty simple stream with 1-element chunks.
It turned out to be a bug.
mpilquist described the reason behind the behavior in a comment:
It pulls the next chunk from the source stream and then acquires the semaphore permit, which is blocked until previous chunk is processed from the queue. Hence, it's always reading 1 chunk ahead.
Following mpilquist's advice, I created a pull request fixing the issue, which has just been merged.