 

How to handle errors in custom MapFunction correctly?

Tags:

apache-flink

I have implemented a MapFunction for my Apache Flink flow. It parses incoming elements and converts them to another format, but sometimes an error occurs (e.g. when the incoming data is not valid).

I see two possible ways to handle it:

  • Ignore invalid elements, but it seems I can't, because a MapFunction must produce an outgoing element for every incoming element.
  • Split incoming elements into valid and invalid ones, but it seems I would need a different function for this.
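To make the one-to-one constraint in the first bullet concrete: Flink's `MapFunction<T, O>` declares a single method, `O map(T value)`, so every input has to produce exactly one output. One possible workaround, which is not from this thread, is to encode failures in the output type and filter them out afterwards. The sketch below shows that idea in plain Java without Flink; `OneToOneMapSketch` and its methods are hypothetical names, and `Optional.empty()` is just one assumed way to mark an invalid element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-Java stand-in for a one-to-one map step (hypothetical, not Flink code):
// every input yields exactly one output, so failures must be encoded in the output type.
class OneToOneMapSketch {
    // Hypothetical map logic: invalid input becomes Optional.empty() instead of being dropped.
    static Optional<Long> parse(String line) {
        try {
            return Optional.of(Long.parseLong(line));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Applies the map to every element: the result always has the same size as the input.
    static List<Optional<Long>> mapAll(List<String> inputs) {
        List<Optional<Long>> out = new ArrayList<>();
        for (String in : inputs) {
            out.add(parse(in));
        }
        return out;
    }

    // A separate filter step that keeps only the successfully parsed values.
    static List<Long> keepValid(List<Optional<Long>> mapped) {
        List<Long> valid = new ArrayList<>();
        for (Optional<Long> o : mapped) {
            o.ifPresent(valid::add);
        }
        return valid;
    }
}
```

The map step never shrinks the data, which is exactly why ignoring invalid elements needs a second step (or a different function type).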

So, I have two questions:

  1. How do I handle errors correctly in my MapFunction?
  2. How do I implement such transformation functions correctly?
Maxim asked Mar 18 '16 13:03



2 Answers

You could use a FlatMapFunction instead of a MapFunction. This would allow you to only emit an element if it is valid. The following shows an example implementation:

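import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Assumes input is a DataStream<String>; only values that parse as a Long are emitted
DataStream<Long> numbers = input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String value, Collector<Long> collector) throws Exception {
        try {
            collector.collect(Long.parseLong(value));
        } catch (NumberFormatException e) {
            // ignore invalid data: nothing is emitted for this element
        }
    }
});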
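The zero-or-one behavior of the flatMap above can be checked without running a Flink job. The following is a plain-Java sketch, assuming `java.util.function.Consumer` as a stand-in for Flink's `Collector`; `FlatMapParseSketch` is a hypothetical name and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the flatMap body above. A Consumer stands in for Flink's
// Collector (an assumption for illustration) so the logic runs without Flink.
class FlatMapParseSketch {
    // Emits zero or one value per input: valid numbers are collected, invalid ones skipped.
    static void flatMap(String line, Consumer<Long> collector) {
        try {
            collector.accept(Long.parseLong(line));
        } catch (NumberFormatException e) {
            // ignore invalid data, as in the answer above
        }
    }

    // Runs the sketch over a batch of inputs and returns everything that was emitted.
    static List<Long> run(List<String> inputs) {
        List<Long> emitted = new ArrayList<>();
        for (String line : inputs) {
            flatMap(line, emitted::add);
        }
        return emitted;
    }
}
```

Running it on `"7"`, `"abc"`, `""` and `"-3"` collects only `7` and `-3`. The trade-off of this approach is that invalid records vanish silently; if they need to be inspected later, routing them to a separate output, as the next answer suggests, keeps them.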
Till Rohrmann answered Oct 11 '22 13:10



This builds on @Till Rohrmann's idea above. I'm adding it as an answer instead of a comment for better formatting.

I think one way to implement "split + select" could be to use a ProcessFunction with a side output. My graph would look something like this:

Source --> ValidateProcessFunction ---good data--> UDF--->SinkToOutput
                                    \
                                     \---bad data----->SinkToErrorChannel
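The ValidateProcessFunction in the diagram could look roughly like this. In Flink itself, a `ProcessFunction` would call `out.collect(...)` for good records and `ctx.output(tag, value)` with an `OutputTag` for bad ones, and the bad stream would be obtained with `getSideOutput(tag)` on the operator returned by `process(...)`. To keep the sketch runnable without Flink, it uses two `Consumer`s as stand-ins for the main and side outputs; `ValidateRoutingSketch`, `Routed` and `route` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the validation step: each input goes either to the main output
// (good data) or to a side output (bad data). In Flink, mainOut would be the Collector
// and sideOut would be ctx.output(outputTag, value); here both are hypothetical stand-ins.
class ValidateRoutingSketch {
    static void processElement(String line, Consumer<Long> mainOut, Consumer<String> sideOut) {
        try {
            mainOut.accept(Long.parseLong(line));
        } catch (NumberFormatException e) {
            sideOut.accept(line); // keep the raw record so it can reach the error sink
        }
    }

    // Holder for the two streams produced by the routing step.
    static final class Routed {
        final List<Long> good = new ArrayList<>();
        final List<String> bad = new ArrayList<>();
    }

    // Routes every input through processElement and returns both outputs.
    static Routed route(List<String> inputs) {
        Routed r = new Routed();
        for (String line : inputs) {
            processElement(line, r.good::add, r.bad::add);
        }
        return r;
    }
}
```

For the inputs `"10"`, `"oops"` and `"20"`, `good` ends up as `[10, 20]` and `bad` as `["oops"]`, so unlike the plain flatMap, nothing is lost: the error channel still receives the raw record.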

Would this work? Is there a better way?

victtim answered Oct 11 '22 12:10
