I am having a problem with FS2 and exception handling. What I want is that, given a Stream[IO,A], when I map on it using an f: A => B that can throw exception, I obtain a Stream[IO,Either[Throwable,B]].
I tried the following, and it works as expected:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
It prints:
Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)
However, my problems start when I try to do anything with that Stream.
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.map(identity)
x1.compile.toVector.unsafeRunSync().foreach(println)
Blows up with the exception and kills the application:
java.lang.RuntimeException: I don't like 9s
at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
...
Even weirder, using take to have the Stream return only the elements that I know to be OK, still blows up in the same way:
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.take(2)
x1.compile.toVector.unsafeRunSync().foreach(println)
Can anybody clarify why this is happening? Is this a bug or (un)expected behaviour?
N.B. This behaviour is present in FS2 0.10.0-M7 and 0.10.0
The problem here is that to use fs2 you must write pure code. Throwing an exception isn't pure, so if you want a step in your pipeline that can fail, you need to make it explicit. Here's two ways:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
x1.compile.toVector.unsafeRunSync().foreach(println)
// Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen
OR
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
Of these the first is preferable, because flatMap with emit will result in size-1 chunks which are less efficient generally. If you want to stop processing at the first error, add a .rethrow to the end of the stream.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With