I wrote a simple Flink program in Java that takes either a file or a socket text stream as input and then prints all words using a flatMap function.
This is my code:
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Words {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // show user-defined parameters in the Apache Flink dashboard
        env.getConfig().setGlobalJobParameters(params);

        DataStream<String> dataStream;
        if (params.has("input")) {
            System.out.println("Executing Words example with file input");
            dataStream = env.readTextFile(params.get("input"));
        } else if (params.has("host") && params.has("port")) {
            System.out.println("Executing Words example with socket stream");
            dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
        } else {
            // neither --input nor --host/--port was given
            System.exit(1);
            return;
        }

        DataStream<String> wordDataStream = dataStream.flatMap(
            (String sentence, Collector<String> out) -> {
                for (String word : sentence.split(" "))
                    out.collect(word);
            });

        wordDataStream.print();
        env.execute("Word Split");
    }
}
But when I run it with this command:
bin/flink run -c Words FlinkExample-0.0.1-SNAPSHOT.jar --host localhost --port 9999
I get the following error:
The program fails with the following exception:
The return type of function 'main(Words.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
(Line 32 of Words.java is the declaration of the second DataStream, wordDataStream.)
I think the short description of the error message is quite good, but let me expand it a bit.
In order to execute a program, Flink needs to know the type of the values that are processed, because it needs to serialize and deserialize them. Flink's type system is based on TypeInformation,
which describes a data type. When you specify a function, Flink tries to infer the return type of that function. In the case of the FlatMapFunction in your example, the return type is the type of the objects that are passed to the Collector.
Unfortunately, some lambda functions lose this information due to type erasure, so Flink cannot infer the type automatically. A function with the signature void flatMap(IN value, Collector<OUT> out) written as a lambda is compiled by the Java compiler into void flatMap(IN value, Collector out), so the OUT type (String in your case) is no longer available at runtime. Therefore, you have to declare the return type explicitly.
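To make the erasure effect visible without a Flink cluster, here is a minimal stdlib-only sketch. The Collector and FlatMapFunction interfaces below are hypothetical stand-ins that only mirror the generic shape of Flink's classes, and inferOutputType is a deliberately simplified illustration of reflective type extraction, not Flink's actual TypeExtractor. It shows that an anonymous class keeps its generic interface (so OUT can be read back at runtime), while a lambda's runtime class only exposes the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-ins mirroring the generic signatures of Flink's
    // Collector and FlatMapFunction. These are NOT the Flink classes.
    interface Collector<T> {
        void collect(T record);
    }

    interface FlatMapFunction<IN, OUT> {
        void flatMap(IN value, Collector<OUT> out);
    }

    // Simplified "type extraction": find FlatMapFunction<IN, OUT> among the
    // function class's generic interfaces and return OUT, or null when only
    // the raw interface is visible (i.e. the type arguments were erased).
    static Type inferOutputType(Object function) {
        for (Type t : function.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == FlatMapFunction.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    // Same splitting logic as in the question, written as a lambda.
    static FlatMapFunction<String, String> lambdaSplitter() {
        return (sentence, out) -> {
            for (String word : sentence.split(" ")) out.collect(word);
        };
    }

    // Same logic as an anonymous class; its generic interface is recorded
    // in the class file, so OUT = String survives at runtime.
    static FlatMapFunction<String, String> anonymousSplitter() {
        return new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String sentence, Collector<String> out) {
                for (String word : sentence.split(" ")) out.collect(word);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("lambda:    " + inferOutputType(lambdaSplitter()));
        System.out.println("anonymous: " + inferOutputType(anonymousSplitter()));

        // Both versions behave identically; only the runtime type info differs.
        List<String> words = new ArrayList<>();
        lambdaSplitter().flatMap("to be or not", words::add);
        System.out.println(words);
    }
}
```

Running main should report null for the lambda and java.lang.String for the anonymous class. The same reasoning is why rewriting the lambda as an anonymous FlatMapFunction class is generally an alternative to declaring the type with returns(...).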
You can provide the TypeInformation as follows:
// requires: import org.apache.flink.api.common.typeinfo.Types;
DataStream<String> wordDataStream = dataStream.flatMap(
    (String sentence, Collector<String> out) -> {
        for (String word : sentence.split(" "))
            out.collect(word); // collect objects of type String
    }
).returns(Types.STRING); // declare the return type of the flatMap lambda as String