Efficient grouping by key using mapPartitions or partitioner in Spark

So, I have data like the following,

[ (1, data1), (1, data2), (2, data3), (1, data4), (2, data5) ]

which I want to convert to the following, for further processing.

[ (1, [data1, data2, data4]), (2, [data3, data5]) ]

I used groupByKey and reduceByKey, but due to the really large amount of data they fail. The data is not tall but wide. In other words, keys go from 1 up to 10000, but the value list for a key ranges from 100k to 900k.
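For reference, this is roughly what I am doing now (a minimal sketch with toy data; the real input is much larger):

    val pairs = sc.parallelize(Seq((1, "data1"), (1, "data2"), (2, "data3"), (1, "data4"), (2, "data5")))

    // groupByKey gives (1, [data1, data2, data4]) and (2, [data3, data5]), but shuffles every single value
    val grouped = pairs.groupByKey()

    // reduceByKey variant that builds the same lists explicitly
    val reduced = pairs.mapValues(List(_)).reduceByKey(_ ++ _)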

I am struggling with this issue and plan to apply mapPartitions or a (Hash)Partitioner.

So, if one of these may work, I'd like to know:

  1. Using mapPartitions, could you please give a code snippet?
  2. Using a (Hash)Partitioner, could you please give an example of how to control partitions by some element such as the key? E.g., is there a way to create each partition based on the key (i.e. 1, 2, ... above) with no need to shuffle?

This is the exception thrown when the job fails:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 9 (flatMap at TSUMLR.scala:209) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
asked Jan 26 '16 by joshsuihn


People also ask

What is the difference between map and mapPartitions in Spark?

mapPartitions() transforms the data just like map(); the difference is that mapPartitions() receives a whole partition as an iterator, so heavy initialization (for example, a database connection) can be done once per partition instead of once per row.
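A minimal sketch of that pattern; the "heavy setup" here is just a per-partition tag standing in for something like a connection, and the app name is illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("mapPartitionsSketch").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 10, numSlices = 3)

    val tagged = rdd.mapPartitions { iter =>
      // expensive setup runs once per partition, not once per element
      val tag = s"setup-${java.util.UUID.randomUUID()}"
      iter.map(x => s"$tag:$x")
    }
    tagged.collect().foreach(println)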

Which method is used to reduce the number of partitions after processing in Spark?

Spark's coalesce() is used only to reduce the number of partitions. It is an optimized alternative to repartition() for this case: it merges existing partitions rather than performing a full shuffle, so less data moves across the cluster.

How do you control the number of partitions of a Spark DataFrame across the application?

repartition() can be used to increase or decrease the number of partitions of a Spark DataFrame. However, repartition() involves a full shuffle, which is a costly operation.

What is the difference between repartition and coalesce?

repartition() can be used to increase or decrease the number of partitions, but it involves heavy data shuffling across the cluster. On the other hand, coalesce() can only decrease the number of partitions. In most cases, coalesce() does not trigger a shuffle.
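As a small illustration of both calls (assuming an existing SparkContext sc):

    val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

    val wider = rdd.repartition(200)   // full shuffle; can increase or decrease the partition count
    val narrower = rdd.coalesce(10)    // merges existing partitions; no full shuffle by default

    println(wider.getNumPartitions)    // 200
    println(narrower.getNumPartitions) // 10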


1 Answer

None of the proposed methods would work. A partitioner by definition has to shuffle the data and will suffer from the same limitations as groupByKey. mapPartitions cannot move data to another partition, so it is completely useless here. Since your description of the problem is rather vague, it is hard to give specific advice, but in general I would try the following steps:

  • try to rethink the problem. Do you really need all the values at once? How do you plan to utilize them? Can you obtain the same results without collecting everything for a key into a single partition?
  • is it possible to reduce the traffic? How many unique values do you expect? Is it possible to compress the data before the shuffle (for example count values or use RLE)? See the sketch after this list.
  • consider using larger executors. Spark has to keep in memory only the values for a single key and can spill processed keys to disk.
  • split your data by key:

    // collect the distinct keys to the driver (feasible here: at most ~10000 keys)
    val keys = rdd.keys.distinct.collect
    // one filtered RDD per key; no shuffle, but each filter scans the full dataset
    val rdds = keys.map(k => rdd.filter(_._1 == k))
    

    and process each RDD separately.
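For the second bullet, here is a sketch of the pre-shuffle compression idea, assuming values repeat often enough that per-key (value, count) pairs are acceptable for the downstream processing:

    // count (key, value) pairs first, so the shuffle moves one record per distinct
    // (key, value) pair instead of one record per occurrence
    val counted = rdd
      .map { case (k, v) => ((k, v), 1L) }
      .reduceByKey(_ + _)                        // map-side combine shrinks the shuffle
      .map { case ((k, v), n) => (k, (v, n)) }   // back to key -> (value, count)

Grouping counted afterwards yields at most one entry per distinct (key, value), which is usually far smaller than the raw value lists.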

answered Sep 18 '22 by zero323