 

Spark dataframe reduceByKey

I am using Spark 1.5/1.6, and I want to do a reduceByKey-style operation on a DataFrame; I don't want to convert the DataFrame to an RDD.

Each row looks like the following, and there are multiple rows for each id1:

id1, id2, score, time

I want to have something like:

id1, [ (id21, score21, time21), (id22, score22, time22), (id23, score23, time23) ]

So, for each "id1", I want all of its records collected into a list.

By the way, the reason I don't want to convert the DataFrame to an RDD is that I have to join this (reduced) DataFrame to another DataFrame, and I am repartitioning on the join key, which makes the join faster; I guess the same cannot be done with an RDD.
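Roughly, the repartition-before-join pattern I have in mind looks like this sketch (otherDf is a hypothetical second DataFrame, and the column name is taken from the schema above; DataFrame.repartition on columns is available from Spark 1.6):

 import org.apache.spark.sql.functions.col

 // Sketch: repartition both DataFrames on the join key before joining,
 // so the join can avoid an extra shuffle of each side
 val left   = df.repartition(col("id1"))
 val right  = otherDf.repartition(col("id1"))
 val joined = left.join(right, "id1")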

Any help will be appreciated.

asked by user2200660

1 Answer

To preserve the partitioning already achieved, re-use the parent RDD's partitioner in the reduceByKey invocation:

 // df.rdd gives an RDD[Row]; key it by id1 (column 0, assumed here to be a String)
 // so that reduceByKey becomes available
 val pairRdd = df.rdd.map(row => (row.getString(0), row))
 val parentRdd = pairRdd.dependencies(0).rdd // Assuming the first parent has the
                                             // desired partitioning: adjust as needed
 val parentPartitioner = parentRdd.partitioner.get // partitioner is an Option; assumes one is set
 val optimizedReducedRdd = pairRdd.reduceByKey(parentPartitioner, reduceFn)

If instead you do not specify a partitioner:

 pairRdd.reduceByKey(reduceFn)  // Non-optimized: triggers a full shuffle

then a full shuffle occurs: without an explicit partitioner, the default HashPartitioner is used instead.
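For reference, a no-argument reduceByKey falls back to Spark's default partitioner, which is normally a HashPartitioner sized from spark.default.parallelism (or from the largest upstream RDD). The call above is therefore roughly equivalent to this sketch (pairRdd and reduceFn are the placeholders from the snippet above):

 import org.apache.spark.HashPartitioner

 // Roughly what reduceByKey(reduceFn) does under the hood: pick a HashPartitioner
 // and shuffle all records to the partitions it assigns
 val numPartitions = pairRdd.sparkContext.defaultParallelism
 val shuffled = pairRdd.reduceByKey(new HashPartitioner(numPartitions), reduceFn)

 // toDebugString shows the resulting ShuffledRDD stage in the lineage
 println(shuffled.toDebugString)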

answered by WestCoastProjects

