Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flink error: Specifying keys via field positions is only valid for tuple data types

I am using the Scala API of Flink. I have some transformations over a reports = DataStream[Tuple15] (the Tuple15 is a Scala Tuple and all the fields are Int). The issue is located here:

reports
  .filter(_._1 == 0) // some filter
  .map( x => (x._3, x._4, x._5, x._7, x._8))
      (TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
  .keyBy(2,3,4) // the error is in apply, but I think related to this somehow
  .timeWindow(Time.minutes(5), Time.minutes(1))
  // the line under is line 107, where the error is
  .apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
       ... 
  })

The error states:

InvalidProgramException: Specifying keys via field positions is only valid for 
tuple data types. Type: GenericType<scala.Tuple5>

Whole error trace (I marked the line pointing to the error, line 107, corresponding to the apply method on the code above):

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
    at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
    at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
    at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

But this doesn't make sense to me. I am using a tuple type, am I not? Or what is the deal with the GenericType<...>?

And how should I fix the map to make the keyBy work?

like image 477
houcros Avatar asked Mar 12 '23 19:03

houcros


2 Answers

The reason is that the TypeInformation belongs to the Java API and, thus, does not know the Scala tuples. Therefore, it returns a GenericType which cannot be used as the input for a keyBy operation with field positions.

If you want to generate the Scala tuple type information manually, you have to uses the createTypeInformation method which is contained in the org.apache.flink.api.scala/org.apache.flink.streaming.api.scala package object.

But if you import the package object, then there is no need to specify the type information manually, since the TypeInformation is a context bound of the map operation and createTypeInformation is an implicit function.

The following code snippet shows the idiomatic way to deal with TypeInformations.

import org.apache.flink.streaming.api.scala._

reports
  .filter(_._1 == 0) // some filter
  .map( x => (x._3, x._4, x._5, x._7, x._8))
  .keyBy(2,3,4) // the error is in apply, but I think related to this somehow
  .timeWindow(Time.minutes(5), Time.minutes(1))
  // the line under is line 107, where the error is
  .apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
       ... 
  })
like image 189
Till Rohrmann Avatar answered Mar 15 '23 03:03

Till Rohrmann


I also encountered the same problem and able to fix it as follows:

Use Tuple2 class from Flink API i.e., [import org.apache.flink.api.java.tuple.Tuple15] instead of scala.Tuple15

Please see your import section and correct it.

Here I used Flink Java API. In case of Scala, import org.apache.flink.api.scala._ package

[Apache Flink]

like image 27
Naga Avatar answered Mar 15 '23 03:03

Naga