Why does merging with empty fs2.Stream change program's behavior

It's well documented that merging with an empty fs2.Stream should produce the same fs2.Stream. Here is the quote from the Scaladoc for merge:

Has the property that merge(Stream.empty, s) == s

Consider the following complete Scala program with fs2.Stream:

Emitting elements

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  // Timer backs IO.sleep; ContextShift backs IO's concurrency operations
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val contextShiftIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

The program prints the following:

Got value 0
Got value 1
Got value 2
...

and it looks OK. Applying the Scaladoc quote above, I concluded that after replacing

fs2.Stream.repeatEval(ref.get)

with

fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])

the behavior should be the same. Here is the updated program:

Emitting elements and merging with empty fs2.Stream

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  // Timer backs IO.sleep; ContextShift yields the Concurrent[IO] instance that merge requires
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val contextShiftIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

The program output is

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

QUESTION: Why does merging with an empty fs2.Stream change the behavior of the program, duplicating the elements of the original fs2.Stream?

Some Name asked Aug 13 '20 19:08


2 Answers

The documentation of merge also says:

The implementation always tries to pull one chunk from each side before waiting for it to be consumed by resulting stream. As such, there may be up to two chunks (one from each stream) waiting to be processed while the resulting stream is processing elements.

If I understand this correctly, that would mean that while the resulting stream is busy processing value 0, a new value is already pulled from the source before ref has been updated. That second pull still reads 0, so the stale value is processed again.
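To make the interleaving concrete, here is a minimal single-threaded model of that read-ahead. It is only a sketch of the ordering, not fs2's implementation: `ReadAheadModel`, `run` and the `readAhead` flag are names invented for illustration, and a plain `var` stands in for the Ref.

```scala
object ReadAheadModel {
  // Models a consumer that processes `steps` values.
  // When readAhead is true, the next value is pulled from the "ref"
  // BEFORE the consumer has written back its update (the merge case).
  // When false, the next value is pulled only after the update.
  def run(steps: Int, readAhead: Boolean): List[Int] = {
    var ref = 0                  // stand-in for Ref[IO, Int]
    val seen = scala.collection.mutable.ListBuffer.empty[Int]
    var next = ref               // the first pull from the source
    for (_ <- 0 until steps) {
      val current = next
      if (readAhead) next = ref  // prefetch: ref has not been updated yet
      seen += current            // "Got value ..."
      ref = current + 1          // ref.set(value + 1)
      if (!readAhead) next = ref // pull only after the update
    }
    seen.toList
  }
}
```

With `readAhead = false` the model yields 0, 1, 2, ... as in the first program; with `readAhead = true` every value appears twice, matching the output of the merged version.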

Strictly speaking, I don't think this behavior violates any invariants. But for you it makes a difference because:

  • your stream mutates the source from which it is pulling
  • your source stream is always ready to emit an element

To solve the second point you could use a 1-element queue instead of a Ref.
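A rough sketch of the queue variant, assuming fs2 2.x on cats-effect 2 (the versions implied by the imports in the question). `QueueSketch` and `values` are illustrative names, the sleep is shortened, and `take(3)` is added only so the example terminates. Because the source now blocks until the next value has been enqueued, reading one chunk ahead can no longer observe a stale value.

```scala
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue

object QueueSketch {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val contextShiftIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Collects the first three processed values instead of running forever.
  val values: IO[List[Int]] =
    Queue.bounded[IO, Int](1).flatMap { queue =>
      val processed = queue.dequeue
        .merge(Stream.empty.covaryAll[IO, Int])
        .evalMap { value =>
          for {
            _ <- IO(println(s"Got value $value"))
            _ <- IO.sleep(100.millis)
            _ <- queue.enqueue1(value + 1) // hand the next value to the source
          } yield value
        }
        .take(3)
        .compile
        .toList
      // Seed the queue with the initial value, then run the stream.
      queue.enqueue1(0).flatMap(_ => processed)
    }
}
```

Running `QueueSketch.values.unsafeRunSync()` should give each value once, even though merge is still in the pipeline.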

AFAICT the same issue could occur without using merge. The stream is free to pull as many elements from the source as it sees fit before processing them, as long as the source can emit them. You basically got lucky in your first piece of code because you have a pretty simple stream with 1-element chunks.
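As an illustration of that last point, here is a sketch (again assuming fs2 2.x on cats-effect 2; `ChunkedPullSketch` is a made-up name) that uses no merge at all. Regrouping the source into 2-element chunks forces two reads of the Ref before either element is processed, which should reproduce the duplication.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object ChunkedPullSketch {
  // No merge here: chunkN(2) pulls two elements from the source
  // before anything downstream runs, so both reads see the same ref value.
  val values: IO[List[Int]] =
    Ref.of[IO, Int](0).flatMap { ref =>
      Stream
        .repeatEval(ref.get)
        .chunkN(2)
        .flatMap(chunk => Stream.chunk(chunk)) // re-emit the chunk's elements
        .evalMap(value => ref.set(value + 1).map(_ => value))
        .take(6)
        .compile
        .toList
    }
}
```

The duplicated values come purely from how many elements are pulled before the Ref is updated, not from merge as such.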

Jasper-M answered Nov 06 '22 12:11


It turned out to be a bug.

mpilquist described the reason behind the behavior in a comment:

It pulls the next chunk from the source stream and then acquires the semaphore permit, which is blocked until previous chunk is processed from the queue. Hence, it's always reading 1 chunk ahead.

Following mpilquist's advice, I created a pull request fixing the issue, which has just been merged.

Some Name answered Nov 06 '22 12:11