 

Flink Custom Partition Function

I am using Scala on Flink with the DataSet API. I want to re-partition my data across the nodes. Spark has a function that lets the user re-partition the data with a given numberOfPartitions parameter (link), and I believe Flink does not support such a function. Thus, I wanted to achieve this by implementing a custom partitioning function.

My data is of type DataSet[(Double, SparseVector)]. An example line from the data:

(1.0, SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))

Since my "Double" is binary (1 or -1), I want to partition my data according to the length of the SparseVector. My custom partitioner is as follows:

    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.ml.math.SparseVector // assuming FlinkML's SparseVector

    class myPartitioner extends Partitioner[SparseVector] {
      // Partition by the vector length modulo the number of partitions.
      override def partition(key: SparseVector, numPartitions: Int): Int = {
        key.size % numPartitions
      }
    }

I call this custom partitioner as follows:

    data.partitionCustom(new myPartitioner(), 1)

Can somebody please help me understand how to specify the number of partitions (the "numPartitions" argument) when calling myPartitioner in Scala?

Thank you.

asked Jan 14 '19 by Batuhan Tüter

3 Answers

In Flink you can set the parallelism for a single operator with setParallelism, or for all operators using environment.setParallelism. I hope this link helps you.
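
For illustration, a minimal sketch of both options (the sample job and the parallelism values here are made up for the example):

    import org.apache.flink.api.scala._

    object ParallelismExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Default parallelism for every operator in this job.
        env.setParallelism(4)

        val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)

        // Override the parallelism for just this one operator.
        val doubled = numbers
          .map(_ * 2)
          .setParallelism(2)

        doubled.print()
      }
    }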

answered by Kashif Rabbani


Spark uses the repartition(n: Int) function to redistribute data into n partitions, which will then be processed by n tasks. From my perspective, this involves two changes: data redistribution and the number of downstream tasks.

Therefore, in Apache Flink, I think the Partitioner maps to data redistribution and the parallelism maps to the number of downstream tasks, which means you can use setParallelism to determine the "numPartitions".
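
A minimal sketch of that idea, reusing data and myPartitioner from the question (the parallelism value 8 is just a placeholder): the parallelism of the operator that consumes the partitioned data is what Flink passes to your partitioner as numPartitions.

    import org.apache.flink.api.scala._

    // `data` and `myPartitioner` are taken from the question above.
    val repartitioned = data
      .partitionCustom(new myPartitioner(), 1) // partition on the SparseVector field (tuple index 1)
      .map(record => record)                   // placeholder downstream operator
      .setParallelism(8)                       // the partitioner should see numPartitions = 8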

answered by Jiayi Liao


I'm assuming you're using the length of the SparseVector just to have something that gives you relatively random values to use for partitioning. If that's true, then you can just do a DataSet.rebalance(). If you follow that by any operator (including a Sink) where you set the parallelism to numPartitions, then you should get nicely repartitioned data.
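
A minimal sketch of that approach, assuming data: DataSet[(Double, SparseVector)] from the question and an arbitrary placeholder value for numPartitions:

    import org.apache.flink.api.scala._

    val numPartitions = 8 // placeholder: choose the parallelism you want

    val rebalanced = data
      .rebalance()              // round-robin redistribution, no partitioner or key needed
      .map(record => record)    // placeholder operator; a Sink would work the same way
      .setParallelism(numPartitions)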

But your description of "...want to re-partition my data across the nodes" makes me think you're trying to apply Spark's concept of RDDs to Flink, which isn't really valid. E.g., assuming you have numPartitions parallel operators processing the (re-partitioned) data in your DataSet, these operators will run in slots provided by the available TaskManagers, and those slots may or may not be on different physical servers.

answered by kkrugler