I am using Scala on Flink with the DataSet API. I want to re-partition my data across the nodes. Spark has a function that lets the user re-partition the data with a given numberOfPartitions parameter (link), and I believe Flink does not support such a function. Thus, I wanted to achieve this by implementing a custom partitioning function.
My data is of type DataSet[(Double, SparseVector)]. An example line from the data:
(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))
Since my "Double" is binary (1 or -1), I want to partition my data according to the length of the SparseVector. My custom partitioner is as follows:
class myPartitioner extends Partitioner[SparseVector] {
  override def partition(key: SparseVector, numPartitions: Int): Int = {
    key.size % numPartitions
  }
}
I call this custom partitioner as follows:
data.partitionCustom(new myPartitioner(), 1)
Can somebody please help me understand how to specify the number of partitions (the "numPartitions" argument) when calling the myPartitioner function in Scala?
Thank you.
In Flink you can set the parallelism for a single operator with setParallelism, or for all operators with environment.setParallelism. I hope this link will help you.
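For example, here is a minimal sketch of both options using Flink's Scala DataSet API; the object name, parallelism values, and placeholder elements are purely illustrative:

import org.apache.flink.api.scala._

object ParallelismExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // default parallelism for every operator in this job
    env.setParallelism(4)

    val data: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)

    // parallelism for a single operator only
    val doubled = data
      .map(_ * 2)
      .setParallelism(8)

    doubled.print()
  }
}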
Spark uses the repartition(n: Int) function to redistribute data into n partitions, which will then be processed by n tasks. From my perspective, this involves two changes: redistributing the data and changing the number of downstream tasks.
Therefore, in Apache Flink, I think the Partitioner maps to the data redistribution and the parallelism maps to the number of downstream tasks, which means you can use setParallelism to determine the "numPartitions".
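As a rough sketch of that combination (a String key stands in for the question's SparseVector so the snippet runs without extra dependencies; the class and object names and the sample elements are illustrative only):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Partitions tuples by the length of the String in field 1;
// with the question's data, SparseVector.size would be used the same way.
class LengthPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    key.length % numPartitions
}

object RepartitionExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data: DataSet[(Double, String)] =
      env.fromElements((1.0, "ab"), (-1.0, "abc"), (1.0, "abcd"))

    val repartitioned = data
      // redistribute the data using the custom partitioner, keyed on field 1
      .partitionCustom(new LengthPartitioner, 1)
      // the parallelism of the downstream operator plays the role of "numPartitions"
      .map(x => x)
      .setParallelism(3)

    repartitioned.print()
  }
}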
I'm assuming you're using the length of the SparseVector just to have something that gives you relatively random values to use for partitioning. If that's true, then you can just do a DataSet.rebalance(). If you follow that by any operator (including a Sink) where you set the parallelism to numPartitions, then you should get nicely repartitioned data.
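Something along these lines, with placeholder (Double, String) elements in place of the question's (Double, SparseVector) data and an illustrative numPartitions value:

import org.apache.flink.api.scala._

object RebalanceExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val numPartitions = 4

    val data: DataSet[(Double, String)] =
      env.fromElements((1.0, "a"), (-1.0, "bb"), (1.0, "ccc"), (-1.0, "dddd"))

    data
      .rebalance()                   // round-robin redistribution across tasks
      .map(x => x)                   // any downstream operator works here
      .setParallelism(numPartitions) // its parallelism is the number of partitions
      .print()
  }
}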
But your description of "...want to re-partition my data across the nodes" makes me think that you're trying to apply Spark's concept of RDDs to Flink, which isn't really valid. E.g. assuming you have numPartitions parallel operators processing the (re-partitioned) data in your DataSet, then these operators will be running in slots provided by the available TaskManagers, and these slots might or might not be on different physical servers.