
Apache Flink: Return type of function could not be determined automatically due to type erasure

I wrote a simple Flink program in Java that takes either a file or a socket text stream as input and then prints every word using a flatMap function.

This is my code:

        final ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(params);
        // show user defined parameters in the apache flink dashboard

        DataStream<String> dataStream;

        if(params.has("input")) 
        {
            System.out.println("Executing Words example with file input");
            dataStream = env.readTextFile(params.get("input"));
        }else if (params.has("host") && params.has("port")) 
        {
            System.out.println("Executing Words example with socket stream");
            dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
        }
        else {
            System.exit(1);
            return;
        }

        DataStream<String> wordDataStream = dataStream.flatMap(
                (String sentence, Collector<String> out) -> {
                    for(String word: sentence.split(" "))
                        out.collect(word);
        });

        wordDataStream.print();

        env.execute("Word Split");  

But when I run it with this command:

bin/flink run -c Words FlinkExample-0.0.1-SNAPSHOT.jar --host localhost --port 9999

I get the following error:

The program fails with the following exception:

The return type of function 'main(Words.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

(Line 32 refers to the declaration of the second DataStream)

Gatsby asked Jun 20 '18 10:06



1 Answer

I think the short description of the error message is quite good, but let me expand it a bit.

To execute a program, Flink needs to know the types of the values it processes, because it has to serialize and deserialize them. Flink's type system is based on TypeInformation, which describes a data type. When you specify a function, Flink tries to infer that function's return type. For the FlatMapFunction in your example, this is the type of the objects that are passed to the Collector.

Unfortunately, some lambda functions lose this information due to type erasure, so Flink cannot infer the type automatically. In that case you have to declare the return type explicitly.
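To make the erasure point concrete, here is a minimal plain-JDK sketch that does not use Flink at all. The interface FlatMapper and the class ErasureDemo are hypothetical stand-ins invented for this illustration, and the reflection call is only an approximation of the kind of lookup a type extractor performs, not Flink's actual extraction code. It compares what reflection can see for an anonymous class and for an equivalent lambda.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ErasureDemo {

    // Hypothetical stand-in for a generic function interface; NOT a Flink type.
    interface FlatMapper<IN, OUT> {
        void flatMap(IN value, Consumer<OUT> out);
    }

    // Anonymous class: the compiler records FlatMapper<String, String>
    // in the generated class file, so reflection can read it back.
    static FlatMapper<String, String> anonymousSplitter() {
        return new FlatMapper<String, String>() {
            @Override
            public void flatMap(String sentence, Consumer<String> out) {
                for (String word : sentence.split(" ")) {
                    out.accept(word);
                }
            }
        };
    }

    // Lambda: same behavior, but the runtime-generated class implements
    // the raw interface, so the type arguments are not recorded on it.
    static FlatMapper<String, String> lambdaSplitter() {
        return (sentence, out) -> {
            for (String word : sentence.split(" ")) {
                out.accept(word);
            }
        };
    }

    // Returns the names of the type arguments that reflection can see on the
    // interfaces of fn's class, or an empty list if none are recorded.
    static List<String> visibleTypeArguments(FlatMapper<?, ?> fn) {
        List<String> names = new ArrayList<>();
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                for (Type arg : ((ParameterizedType) iface).getActualTypeArguments()) {
                    names.add(arg.getTypeName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + visibleTypeArguments(anonymousSplitter()));
        System.out.println("lambda:          " + visibleTypeArguments(lambdaSplitter()));
    }
}
```

Both functions split a sentence the same way, yet only the anonymous class exposes its type arguments to reflection. That gap is what an explicit type hint fills in, and it is also why writing the function as an anonymous class instead of a lambda is usually enough for the type to be inferred.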

You can provide the TypeInformation as follows:

// requires: import org.apache.flink.api.common.typeinfo.Types;
DataStream<String> wordDataStream = dataStream.flatMap(
    (String sentence, Collector<String> out) -> {
        for (String word : sentence.split(" ")) {
            out.collect(word); // collect objects of type String
        }
    }
).returns(Types.STRING); // declare return type of flatmap lambda function as String

Fabian Hueske answered Nov 04 '22 16:11
