I have a flow graph with a broadcast and a zip inside. If anything fails inside this flow, regardless of what it is, I'd like to drop the problematic element passed to it and resume. I came up with the following solution:
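import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, FlowShape, Supervision}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

// Needed to materialize the stream (provides the materializer in Akka 2.6+)
implicit val system: ActorSystem = ActorSystem()

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Throws for the element 5, passes everything else through
  val dangerousFlow = Flow[Int].map {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }
  val safeFlow = Flow[Int]

  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Int]())

  bcast ~> dangerousFlow ~> zip.in0
  bcast ~> safeFlow ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
  .via(flow)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.foreach(println))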
I'd expect it to print:
(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)
However, it deadlocks, printing only:
(1,1)
(2,2)
(3,3)
(4,4)
We've done some debugging, and it turns out it applied the "resume" strategy to its children, which caused dangerousFlow to resume after the failure and thus to demand an element from bcast. bcast won't emit an element until safeFlow demands another element, which never happens (because safeFlow is waiting for demand from zip).
Is there a way to resume the graph regardless of what went wrong inside one of the stages?
I think you understood the problem well. When your element 5 crashes dangerousFlow, the element 5 that is going through safeFlow should be stopped as well, because if it reaches the zip stage you get the problem you describe. I don't know how to solve your problem between the broadcast and zip stages, but what about pushing the problem further downstream, where it is easier to handle?
Consider using the following dangerousFlow:
import scala.util._

val dangerousFlow = Flow[Int].map {
  case 5 => Failure(new RuntimeException("BOOM!"))
  case x => Success(x)
}
Even when a problem occurs, dangerousFlow would still emit an element. You can then zip as you are currently doing, and you would just need to add a collect stage as the last step of your graph. On a flow, this would look like:
Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
}
Now if, as you wrote, you really expect it to output the (5, 5) tuple, use the following:
Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
  case (_, i) => i -> i
}
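For reference, here is a minimal sketch of how the pieces above might fit together in one graph, using the first collect variant (the failed pair is dropped). It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. The names ResumeSketch, risky and dropFailures are made up for illustration, and wrapping the call in Try(...) is a swapped-in generalization of the explicit Failure/Success cases shown earlier.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.util.{Success, Try}

object ResumeSketch {
  // Assumption: Akka 2.6+, so the implicit system also supplies the materializer
  implicit val system: ActorSystem = ActorSystem("resume-sketch")

  // Hypothetical stand-in for any computation that may throw
  def risky(x: Int): Int =
    if (x == 5) throw new RuntimeException("BOOM!") else x

  val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Try(...) turns a non-fatal exception into a Failure element,
    // so this branch always emits exactly one element per input
    val dangerousFlow = Flow[Int].map(x => Try(risky(x)))
    val safeFlow = Flow[Int]

    val bcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Try[Int], Int]())
    // Last step of the graph: keep only the pairs whose left side succeeded
    val dropFailures = builder.add(Flow[(Try[Int], Int)].collect {
      case (Success(s), i) => s -> i
    })

    bcast ~> dangerousFlow ~> zip.in0
    bcast ~> safeFlow ~> zip.in1
    zip.out ~> dropFailures

    FlowShape(bcast.in, dropFailures.out)
  })

  def main(args: Array[String]): Unit = {
    import system.dispatcher
    Source(1 to 9)
      .via(flow)
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())
  }
}
```

Because both branches now emit one element for every input, zip never waits on a missing partner, and no supervision strategy is needed. Run as written, this should print the pairs (1,1) through (9,9) with (5,5) left out; swapping in the second collect variant would keep (5,5) as well.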