Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to handle exception properly in flink operator?

Tags:

apache-flink

If I throw runtime exception in flink operator, how it will be handled?

I just want to ignore this exception and continue handle the stream, but I do not know the side effect if I just ignore them. Will this exception stop the whole data stream?

like image 976
xingbin Avatar asked Dec 21 '25 21:12

xingbin


1 Answers

If one of your operators will throw an exception the entire job will fail. It doesn't differ that much from a normal application: If an exception is not handled by anybody the application will fail.

Think of this this way: You throw an exception if you don't know how to handle a certain situation - this somehow indicates whoever called me should take care of this. At least I am not aware of any way how you could tell Flink: Please ignore my exceptions.

My proposal: Handle the exceptions within your operators. This might mean you have to change the type of the operator (or at least the return type).

E.g.

case class MyMapper() extends MapFunction[Double, Double] {
    override def map(in : Double) : Double {
       try {
            1/in
       }
       catch {
            case e : java.lang.ArithmeticException:
                throw new RuntimeException(e)
       }
    }
}

Might become a FlatMap operator just returning nothing. Or you change the return type of this operator to return an Option of Double instead of a Double.

Hope that helps

like image 147
TobiSH Avatar answered Dec 24 '25 07:12

TobiSH