
How to get the Partitioner in Apache Flink?

We are trying to create an extension for Apache Flink that uses custom partitioning. For some operators we want to check or retrieve the partitioner in use. Unfortunately, I could not find any way to do this on a given DataSet. Have I missed something, or is there a workaround for this?

I would start with something like this:

class MyPartitioner[..](..) extends Partitioner[..] {..}
[..]
val myP = new MyPartitioner(...)
val ds = in.partitionCustom(myP, 0)
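For concreteness, here is a minimal sketch of what such a partitioner could look like. It assumes String keys in tuple field 0; `MyPartitioner` is the question's placeholder name, while the hash-based routing and the `PartitionExample` object are hypothetical. It needs the flink-scala DataSet API on the classpath.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Hypothetical partitioner: routes each String key by its hash code.
class MyPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    // floorMod keeps the result in [0, numPartitions) even for negative hash codes
    Math.floorMod(key.hashCode, numPartitions)
}

object PartitionExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val in: DataSet[(String, Int)] = env.fromElements(("a", 1), ("b", 2), ("c", 3))

    val myP = new MyPartitioner()
    // Partition on tuple field 0 (the String key) using the custom partitioner
    val ds = in.partitionCustom(myP, 0)
    ds.print()
  }
}
```

Using `Math.floorMod` instead of `%` avoids negative partition indices for keys whose hash code is negative.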

Now, from another class, I would like to access the partitioner (if one is defined). In Spark I would do it on an RDD as follows:

val myP = ds.partitioner.get.asInstanceOf[MyPartitioner]

However, I could not find an equivalent in Flink.


Edit1:

It seems to be possible with Fabian's suggestion. However, there are two limitations:

(1) When using Scala, you first have to retrieve the underlying Java DataSet before you can cast it to a PartitionOperator.

(2) The partitioning must be the last operation, so one cannot apply other operations between setting and getting the partitioner. For example, the following is not possible:

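import org.apache.flink.api.java.operators.PartitionOperator

val in: DataSet[(String, Int)] = ???

val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val ds2 = ds.map(x => x)

// Fails with a ClassCastException: ds2 wraps a MapOperator, not a PartitionOperator
val myP2 = ds2.javaSet.asInstanceOf[PartitionOperator[(String, Int)]].getCustomPartitioner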

Thank you and best regards, Philipp

asked Oct 30 '22 at 18:10 by Phil Go


1 Answer

You can cast the DataSet returned by partitionCustom() (in the Scala API, its underlying Java DataSet) to a PartitionOperator and call PartitionOperator.getCustomPartitioner():

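import org.apache.flink.api.java.operators.PartitionOperator

val in: DataSet[(String, Int)] = ???

val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)

// In Scala, cast the wrapped Java DataSet; PartitionOperator needs a type argument
val myP2 = ds.javaSet.asInstanceOf[PartitionOperator[(String, Int)]].getCustomPartitioner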

Note that

  1. getCustomPartitioner() is an internal method (i.e., not part of the public API) and might change in future versions of Flink.
  2. PartitionOperator is also used for other partitioning types, such as DataSet.partitionByHash(). In these cases getCustomPartitioner() might return null.
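Putting the two notes and the question's limitations together, a defensive lookup could look like the following sketch. `PartitionerLookup` and `customPartitionerOf` are hypothetical names. The sketch assumes the Scala `DataSet.javaSet` accessor is reachable from your code in your Flink version, and it relies on the internal getCustomPartitioner(), so it may break on upgrade.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.java.operators.PartitionOperator
import org.apache.flink.api.scala._

object PartitionerLookup {
  /** Returns the custom partitioner if `ds` was produced directly by partitionCustom(). */
  def customPartitionerOf[T](ds: DataSet[T]): Option[Partitioner[_]] =
    ds.javaSet match {
      // partitionCustom() and partitionByHash() both create a PartitionOperator;
      // getCustomPartitioner() returns null for the non-custom variants, hence Option(...)
      case op: PartitionOperator[_] => Option(op.getCustomPartitioner)
      // Any other operator (e.g. a map() applied afterwards) hides the partitioning
      case _ => None
    }
}
```

With the question's example, `PartitionerLookup.customPartitionerOf(ds)` should yield `Some(myP)`, while a DataSet obtained after a further `map` (limitation 2) or via `partitionByHash` yields `None` instead of throwing.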

answered Nov 15 '22 at 06:11 by Fabian Hueske