I am using Flink and Java8. When using lambda functions with Tuples and generic types, my compiler ends up with an exception
/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java -Didea.launcher.port=7536 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/tools.jar:/Users/hasan.guercan/Git/flink-java-project/target/classes:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-java/1.0.3/flink-java-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-core/1.0.3/flink-core-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-annotations/1.0.3/flink-annotations-1.0.3.jar:/Users/hasan.guercan/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/hasan.guercan/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/hasan.guercan/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/hasan.guercan/.m2/repository/org/apache/avro/avro/1.7.6/avro-1.7.6.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.0.3/flink-shaded-hadoop2-1.0.3.jar:/Users/hasan.guercan/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/Users/hasan.guercan/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/hasan.guercan/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/hasan.guercan/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar:/Users/hasan.guercan/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/Users/hasan.guercan/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/hasan.guercan/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/hasan.guercan/.m2/repository/com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar:/Users/hasan.guercan/.m2/repository/commons-el/commons-el/1.0/commons-el-1.0.jar:/Users/hasan.guercan/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar:/Users/hasan.guercan/.m2/repository/com/jamesmurty/utils/java-xmlbuilder/0.4/java-xmlbuilder-0.4.jar:/Users/hasan.guercan/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/Users/hasan.guercan/.m2/repository/commons-configuration/commons-configuration/1.7/commons-configuration-1.7.jar:/Users/hasan.guercan/.m2/repository/commons-digester/commons-digester/1.8.1/commons-digester-1.8.1.jar:/Users/hasan.guercan/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar:/Users/hasan.guercan/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar:/Users/hasan.guercan/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/Users/hasan.guercan/.m2/repository/org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar:/Users/hasan.guercan/.m2/repository/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar:/Users/hasan.guercan/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/Users/hasan.guercan/.m2/repository/io/netty/netty/3.7.0.Final/netty-3.7.0.Final.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar:/Users/hasan.guercan/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar:/Users/hasan.guercan/.m2/repository/commons-beanutils/commons-beanutils-bean-collections/1.8.3/commons-beanutils-bean-collections-1.8.3.jar:/Users/hasan.guercan/.m2/repository/commons-daemon/commons-daemon/1.0.13/commons-daemon-1.0.13.jar:/Users/hasan.guercan/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/Users/hasan.guercan/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/Users/hasan.guercan/.m2/repository/javax/activation/activation/1.1/activation-1.1.jar:/Users/hasan.guercan/.m2/repository/com/google/inject/guice/3.0/guice-3.0.jar:/Users/hasan.guercan/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/hasan.guercan/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/Users/hasan.guercan/.m2/repository/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar:/Users/hasan.guercan/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/Users/hasan.guercan/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/force-shading/1.0.3/force-shading-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-streaming-java_2.10/1.0.3/flink-streaming-java_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-runtime_2.10/1.0.3/flink-runtime_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/io/netty/netty-all/4.0.27.Final/netty-all-4.0.27.Final.jar:/Users/hasan.guercan/.m2/repository/org/javassist/javassist/3.18.2-GA/javassist-3.18.2-GA.jar:/Users/hasan.guercan/.m2/repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-actor_2.10/2.3.7/akka-actor_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/config/1.2.1/config-1.2.1.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-remote_2.10/2.3.7/akka-remote_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/Users/hasan.guercan/.m2/repository/org/uncommons/maths/uncommons-maths/1.2.2a/uncommons-maths-1.2.2a.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-slf4j_2.10/2.3.7/akka-slf4j_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/org/clapper/grizzled-slf4j_2.10/1.0.2/grizzled-slf4j_2.10-1.0.2.jar:/Users/hasan.guercan/.m2/repository/com/github/scopt/scopt_2.10/3.2.0/scopt_2.10-3.2.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-jvm/3.1.0/metrics-jvm-3.1.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-json/3.1.0/metrics-json-3.1.0.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.4.2/jackson-databind-2.4.2.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.4.0/jackson-annotations-2.4.0.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.4.2/jackson-core-2.4.2.jar:/Users/hasan.guercan/.m2/repository/com/twitter/chill_2.10/0.7.4/chill_2.10-0.7.4.jar:/Users/hasan.guercan/.m2/repository/com/twitter/chill-java/0.7.4/chill-java-0.7.4.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-math/2.2/commons-math-2.2.jar:/Users/hasan.guercan/.m2/repository/org/apache/sling/org.apache.sling.commons.json/2.0.6/org.apache.sling.commons.json-2.0.6.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-clients_2.10/1.0.3/flink-clients_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-optimizer_2.10/1.0.3/flink-optimizer_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-lang3/3.0.1/commons-lang3-3.0.1.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain org.apache.flink.quickstart.exercise2.ReplyGraph
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'retrieve(ReplyGraph.java:33)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.apache.flink.quickstart.exercise2.ReplyGraph.retrieve(ReplyGraph.java:41)
at org.apache.flink.quickstart.exercise2.ReplyGraph.main(ReplyGraph.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple3' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1316)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1302)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:346)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:304)
at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:119)
at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
at org.apache.flink.quickstart.exercise2.ReplyGraph.retrieve(ReplyGraph.java:33)
... 6 more
So i have to create at least an anonymous class to solve the problem. First code snippet represents code that leads to described exception:
DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
String sender = entryTuple.getField(1).toString();
return !sender.contains("git@") && !sender.contains("jira@");
}).map((entry -> {
MailEntry mailEntry = new MailEntry();
mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
mailEntry.sender = entry.f1;
mailEntry.replyTo = entry.f2;
return mailEntry;
});
Next one is working when creating an anonymous class:
DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
String sender = entryTuple.getField(1).toString();
return !sender.contains("git@") && !sender.contains("jira@");
}).map(new MapFunction<Tuple3<String, String, String>, MailEntry>() {
@Override
public MailEntry map(Tuple3<String, String, String> entry) throws Exception {
MailEntry mailEntry = new MailEntry();
mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
mailEntry.sender = entry.f1;
mailEntry.replyTo = entry.f2;
return mailEntry;
}
});
Javas lambda function is very neat. How can I solve this problem without creating an anonymous class?
Try to use returns method after map:
DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
String sender = entryTuple.getField(1).toString();
return !sender.contains("git@") && !sender.contains("jira@");
}).map(entry -> {
MailEntry mailEntry = new MailEntry();
mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
mailEntry.sender = entry.f1;
mailEntry.replyTo = entry.f2;
return mailEntry;
}).returns(MailEntry.class);
If you want to use Lambda expression in Flink you can't use the javac compiler, see https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/java8.html
Currently, Flink only supports jobs containing Lambda Expressions completely if they are compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above).
In order to make it work, you need to compile it using the Eclipse compiler. You can do it following these steps:
Modify the "pom.xml" file, uncommenting the following lines (or adding them in the section if they aren't present)
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source> <!-- ${java.version} -->
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
This will ensure you use the JDT compiler instead of the javac
Modify the running configuration you use to launch the project. Go to "Edit Configurations" (see the following picture), and then remove all the "Before launch" tasks (selecting them and then clicking on the '-' button) and add a new "Run Maven Goal" target (using the '+' button), setting "compile" in the "command line" field (see the following picture).
Now clicking on the run button you will be able to run Flink projects
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With