How to use the Flink fold function in Scala

This is a non-working attempt at using Flink's fold with a Scala anonymous function:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])

It compiles fine, but at execution I get a "type erasure" error (see below). Doing the same in Java works, but is of course more verbose. I like concise and clear lambdas. How can I do this in Scala?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
sthiers asked Oct 19 '22 03:10


1 Answer

The problem you encountered is a bug in Flink [1]. The problem originates from Flink's TypeExtractor and the way the Scala DataStream API is implemented on top of the Java implementation. The TypeExtractor cannot generate a TypeInformation for the Scala type and thus returns a MissingTypeInformation. This missing type information is manually set after creating the StreamFold operator. However, the StreamFold operator is implemented in a way that it does not accept a MissingTypeInformation and, consequently, fails before setting the right type information.
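
To illustrate why the result type has to be supplied from the Scala side, here is a minimal plain-Scala sketch with no Flink dependency. `foldWithTag` and `ErasureSketch` are hypothetical names made up for this illustration and are not part of any Flink API. The implicit `ClassTag` plays a role loosely analogous to the implicit `TypeInformation` and `ClassTag` parameters visible in the `fold` signature in the stack trace: the compiler fills in the result type at the call site, so it does not need to be recovered by reflection on the `Function2` object.

```scala
import scala.reflect.ClassTag

object ErasureSketch {
  // Hypothetical stand-in for an API that needs the result type R at runtime.
  // The compiler supplies the ClassTag[R] at the call site, so the type is
  // known even though the generic parameters of `f` are erased.
  def foldWithTag[T, R](items: Seq[T], zero: R)(f: (R, T) => R)(
      implicit tag: ClassTag[R]): (R, Class[_]) =
    (items.foldLeft(zero)(f), tag.runtimeClass)

  def main(args: Array[String]): Unit = {
    // Same lambda as in the question.
    val myFoldFunction = (x: Double, t: (Double, String, String)) => x + t._1

    // Made-up sample data for the sketch.
    val data = Seq((1.5, "a", "k"), (2.5, "b", "k"))

    val (sum, resultClass) = foldWithTag(data, 0.0)(myFoldFunction)
    println(s"sum=$sum, result type captured at compile time: $resultClass")
  }
}
```

This only demonstrates the general mechanism. It does not reproduce the bug itself, which lies in the order in which the StreamFold operator checks and receives its type information.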

I've opened a pull request [2] to fix this problem. It should be merged within the next two days. Once it is merged, using the latest 0.10 snapshot version should fix your problem.

  • [1] https://issues.apache.org/jira/browse/FLINK-2631
  • [2] https://github.com/apache/flink/pull/1101
Till Rohrmann answered Oct 22 '22 00:10