We are trying to create an extension for Apache Flink that uses custom partitioning. For some operators we want to check or retrieve the partitioner that was used. Unfortunately, I could not find any way to do this on a given DataSet. Have I missed something, or is there another workaround?
I would start with something like this:
class MyPartitioner[..](..) extends Partitioner[..] {..}
[..]
val myP = new MyPartitioner(...)
val ds = in.partitionCustom(myP, 0)
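The question leaves the partitioner's body out. Purely as a hypothetical illustration (the class name StringHashPartitioner and the hashing rule are assumptions, not the author's code), a partitioner for the String key at field 0 of the (String, Int) tuples used in the edit implements Flink's Partitioner interface. Its single method receives the key and the number of partitions and must return an index in [0, numPartitions):

```scala
import org.apache.flink.api.common.functions.Partitioner

// Hypothetical example partitioner: routes each String key to a partition
// derived from its hash code. Flink calls partition(key, numPartitions) per
// record and expects a result in [0, numPartitions).
class StringHashPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    // floorMod keeps the index non-negative even for negative hash codes
    Math.floorMod(key.hashCode, numPartitions)
}
```

It would be passed the same way as MyPartitioner in the question, e.g. in.partitionCustom(new StringHashPartitioner, 0).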
Now, from another class, I would like to access the partitioner (if one is defined). In Spark I would do it like this:
val myP = ds.partitioner.get.asInstanceOf[MyPartitioner]
However, I could not find an equivalent in Flink.
Edit 1:
Fabian's suggestion seems to work. However, there are two limitations:
(1) When using Scala, you first have to retrieve the underlying Java DataSet (ds.javaSet) before you can cast it to a PartitionOperator.
(2) The partitioning must be the last operation, so no other operations can be applied between setting and getting the partitioner. For example, the following does not work:
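val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val ds2 = ds.map(x => x)
// Throws a ClassCastException: ds2.javaSet is a MapOperator, not a PartitionOperator
val myP2 = ds2.javaSet.asInstanceOf[PartitionOperator[(String, Int)]].getCustomPartitioner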
Thank you and best regards, Philipp
You can cast the returned DataSet into a PartitionOperator and call PartitionOperator.getCustomPartitioner():
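import org.apache.flink.api.java.operators.PartitionOperator
import org.apache.flink.api.scala._

val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
// In the Scala API, unwrap the underlying Java DataSet before casting
val myP2 = ds.javaSet.asInstanceOf[PartitionOperator[(String, Int)]].getCustomPartitioner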
Note that getCustomPartitioner() is an internal method (i.e., not part of the public API) and might change in future versions of Flink. PartitionOperator is also used for other partitioning types, such as DataSet.partitionByHash(). In these cases getCustomPartitioner() might return null.
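Combining the cast, the Scala unwrapping mentioned in the question's edit, and the null check gives a small helper that returns an Option, much like Spark's partitioner. This is a sketch assuming a Flink 1.x release that still ships the Scala DataSet API; PartitionerLookup and customPartitionerOf are hypothetical names, and because the helper relies on the internal getCustomPartitioner(), it may break on upgrades:

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.java.operators.PartitionOperator
import org.apache.flink.api.scala._

object PartitionerLookup {
  // Hypothetical helper: returns the custom partitioner if `ds` was produced
  // directly by partitionCustom, and None otherwise (never throws on the cast).
  def customPartitionerOf[T](ds: DataSet[T]): Option[Partitioner[_]] =
    ds.javaSet match {
      // partitionCustom and partitionByHash both yield a PartitionOperator,
      // but only partitionCustom sets a non-null custom partitioner.
      case p: PartitionOperator[_] => Option(p.getCustomPartitioner)
      // Any other operator (e.g. the result of a map) carries no partitioner.
      case _ => None
    }
}
```

For example, PartitionerLookup.customPartitionerOf(ds).map(_.asInstanceOf[MyPartitioner]) plays the role of Spark's ds.partitioner.get.asInstanceOf[MyPartitioner]. For ds2 from the question's edit it returns None instead of throwing, but it does not lift limitation (2): the partitioner is still only visible on the operator created by partitionCustom itself.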