
FS2 Stream exception handling not working

I am having a problem with FS2 and exception handling. Given a Stream[IO,A], when I map over it with an f: A => B that can throw an exception, I want to obtain a Stream[IO,Either[Throwable,B]].

I tried the following, and it works as expected:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

It prints:

Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)

However, my problems start when I try to do anything with that Stream.

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.map(identity)

x1.compile.toVector.unsafeRunSync().foreach(println)

This blows up with the exception and kills the application:

java.lang.RuntimeException: I don't like 9s
    at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
    ...

Even weirder, using take so that the Stream returns only the elements I know to be OK still blows up in the same way:

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.take(2)

x1.compile.toVector.unsafeRunSync().foreach(println)

Can anybody clarify why this is happening? Is this a bug or (un)expected behaviour?

N.B. This behaviour is present in FS2 0.10.0-M7 and 0.10.0

asked Feb 14 '26 09:02 by mdm


1 Answer

The problem here is that fs2 expects you to write pure code. Throwing an exception isn't pure, so if a step in your pipeline can fail, you need to make that failure explicit. Here are two ways:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
x1.compile.toVector.unsafeRunSync().foreach(println)
// Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen

OR

import cats.effect.IO
import fs2.Stream // needed for the unqualified Stream.raiseError / Stream.emit below
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

Of these, the first is preferable: flatMap with emit produces size-1 chunks, which are generally less efficient. If you want to stop processing at the first error, add .rethrow to the end of the stream.
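
If the throwing function can't easily be rewritten, one option is to wrap it so the failure becomes a value before it reaches the stream. The sketch below uses only the standard library. `safely` and `noNines` are hypothetical names introduced for illustration, and a plain Vector stands in for the stream to keep the example dependency-free.

```scala
import scala.util.Try

object SafeMapSketch {
  // Hypothetical helper: lifts a function that may throw into one that
  // returns Either. Note that Try only catches non-fatal exceptions.
  def safely[A, B](f: A => B): A => Either[Throwable, B] =
    a => Try(f(a)).toEither

  // The throwing step from the question, unchanged.
  val noNines: Int => Int =
    i => if (i == 9) throw new RuntimeException("I don't like 9s") else i

  // Every element is processed; the failure shows up as a Left
  // instead of aborting the whole pipeline.
  val results: Vector[Either[Throwable, Int]] =
    Vector(1, 2, 3, 4).map(x => x * x).map(safely(noNines))
}
```

The same wrapped function should be usable as the argument to the stream's .map in place of the throwing one, since it is then an ordinary pure function returning Either.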

answered Feb 16 '26 01:02 by Daenyth


