 

run reduceByKey on huge data in spark

Tags:

apache-spark

I'm running reduceByKey in Spark. My program is the simplest Spark example:

val counts = textFile.flatMap(line => line.split(" "))
                     .repartition(20000)
                     .map(word => (word, 1))
                     .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")

but it always runs out of memory...

I'm using 50 servers, 35 executors per server, and 140 GB of memory per server.

The data volume is 8 TB of documents: 20 billion documents and about 1,000 billion words in total. After the reduce there will be roughly 100 million distinct words.

How should I configure Spark for this job?

In particular, what values should these parameters have?

1. The number of map partitions? 20000, for example?
2. The number of reduce partitions? 10000, for example?
3. Any other parameters?
asked Jun 30 '15 16:06 by user2848932


People also ask

Can we use reduceByKey on a Spark DataFrame?

To use reduceByKey with a DataFrame, the data is first parallelized/converted to a pair RDD and then reducebykey() is applied, after which the output can be displayed. With reduceByKey, unnecessary data transfer over the network does not happen; the shuffle occurs in a controlled way because values are combined within each partition first.
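For illustration, a minimal sketch of that pattern (the SparkSession setup, column name, and sample data here are assumptions, not from the snippet above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("reduceByKeyOnDF").getOrCreate()
import spark.implicits._

// Assumed sample data: one word per row.
val df = Seq("spark", "hadoop", "spark").toDF("word")

// reduceByKey is an RDD operation, so drop down to the DataFrame's underlying RDD.
val counts = df.rdd
  .map(row => (row.getString(0), 1))
  .reduceByKey(_ + _)

counts.collect().foreach(println)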

Why is groupByKey less efficient than reduceByKey on large datasets?

groupByKey can cause out-of-disk problems, because every value is sent over the network and collected on the reducer workers; you can see the sketch below. With reduceByKey, data are combined at each partition, so only one output per key per partition is sent over the network.
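As a minimal sketch of the difference (sample data assumed, with sc being the SparkContext as in the question's code):

val pairs = sc.parallelize(Seq("a", "b", "a", "c", "a")).map(w => (w, 1))

// groupByKey: every (word, 1) pair is shuffled across the network,
// and the values are only summed afterwards on the reducer side.
val viaGroupByKey = pairs.groupByKey().mapValues(_.sum)

// reduceByKey: values are pre-combined within each partition (map-side combine),
// so only one partial sum per key per partition crosses the network.
val viaReduceByKey = pairs.reduceByKey(_ + _)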

Why is groupByKey better than reduceByKey?

Both reduceByKey and groupByKey result in wide transformations, which means both trigger a shuffle operation. The key difference between reduceByKey and groupByKey is that reduceByKey does a map-side combine and groupByKey does not.

How many RDDs can cogroup() work on at once?

cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
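A minimal sketch with three RDDs (sample data assumed, sc being the SparkContext):

val a = sc.parallelize(Seq((1, "a1"), (2, "a2")))
val b = sc.parallelize(Seq((1, "b1"), (3, "b3")))
val c = sc.parallelize(Seq((1, "c1"), (2, "c2")))

// Groups the values of all three RDDs by key in a single pass.
// Result type: RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))]
val grouped = a.cogroup(b, c)
grouped.collect().foreach(println)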


1 Answer

It would be helpful if you posted the logs, but one option would be to specify a larger number of partitions when reading in the initial text file (e.g. sc.textFile(path, 200000)) rather than repartitioning after reading. Another important thing is to make sure that your input file is splittable (some compression options make it not splittable, and in that case Spark may have to read it on a single machine, causing OOMs).
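A sketch of that change against the code in the question (the path is left as the question's placeholder and the partition counts are only examples):

// Ask for more input partitions up front instead of repartitioning afterwards.
// The second argument to textFile is a minimum number of partitions, and it only
// helps if the input format is splittable.
val textFile = sc.textFile("hdfs://...", 200000)

val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")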

Another option, since you aren't caching any of the data, is to reduce the amount of memory Spark sets aside for caching (controlled with spark.storage.memoryFraction). Also, since you are only working with tuples of strings, I'd recommend using the org.apache.spark.serializer.KryoSerializer serializer.
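A minimal configuration sketch for those two suggestions (the 0.1 value and app name are illustrative assumptions, not recommendations from the answer):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("wordCount")
  // Nothing is cached, so shrink the storage fraction (0.6 by default in Spark 1.x)
  // and leave more memory for the shuffle. 0.1 is an assumed value.
  .set("spark.storage.memoryFraction", "0.1")
  // Kryo serialization is more compact and faster than Java serialization
  // for simple (String, Int) tuples.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(conf)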

answered Oct 19 '22 15:10 by Holden