I'm running reduceByKey in Spark. My program is about the simplest Spark example there is:
val counts = textFile.flatMap(line => line.split(" "))
  .repartition(20000)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")
but it always runs out of memory...
I'm using 50 servers, with 35 executors per server and 140 GB of memory per server.
The data volume is 8 TB of documents: 20 billion documents and 1,000 billion words in total. After the reduce there will be about 100 million distinct words.
How should I configure Spark? In particular, what values should these parameters have?
1. the number of map tasks? 20000, for example?
2. the number of reduce tasks? 10000, for example?
3. any other parameters?
Further, the data is parallelized into an RDD and then aggregated with the reduceByKey() function before the output is written. With the reduceByKey implementation, unnecessary data transfer over the network does not happen; the shuffle occurs in a controlled way because values are pre-combined locally.
groupByKey can cause out-of-disk problems because all values for a key are sent over the network and collected on the reducing workers; see the sketch below. With reduceByKey, by contrast, values are combined within each partition first, so only one output per key per partition is sent over the network.
Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle. The key difference is that reduceByKey does a map-side combine and groupByKey does not.
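A minimal sketch of that difference, using made-up sample data and a local master (both are illustrative, not part of the original setup):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: local[*] and the sample words are illustrative only.
val sc = new SparkContext(new SparkConf().setAppName("reduceVsGroup").setMaster("local[*]"))

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(word => (word, 1))

// reduceByKey: values are pre-combined on each partition (map-side combine),
// so only one (key, partial sum) pair per key per partition crosses the network.
val reduced = words.reduceByKey(_ + _)

// groupByKey: every (key, value) pair is shuffled and only then summed,
// so far more data crosses the network for the same result.
val grouped = words.groupByKey().mapValues(_.sum)

reduced.collect()  // Array((a,3), (b,2), (c,1)) -- order may vary
grouped.collect()  // same result, heavier shuffle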
cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
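A minimal sketch of intersect-by-key with cogroup(), reusing the sc from the sketch above; the RDDs and their contents are made up for illustration:

// Minimal sketch: intersect by key using cogroup().
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("b", 20), ("c", 30), ("d", 40)))

// cogroup groups the values from both RDDs by key; keeping only the keys
// that have values on both sides gives an intersect-by-key.
val intersected = rdd1.cogroup(rdd2)
  .filter { case (_, (left, right)) => left.nonEmpty && right.nonEmpty }
  .keys

intersected.collect()  // Array(b, c) -- order may vary

// cogroup also accepts more than one other RDD, e.g. rdd1.cogroup(rdd2, rdd3)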
It would be helpful if you posted the logs, but one option is to specify a larger number of partitions when reading in the initial text file (e.g. sc.textFile(path, 200000)) rather than repartitioning after reading. Another important thing is to make sure that your input file is splittable (some compression options make it not splittable, in which case Spark may have to read it on a single machine, causing OOMs).
Some other options: since you aren't caching any of the data, you could reduce the amount of memory Spark sets aside for caching (controlled with spark.storage.memoryFraction), and since you are only working with tuples of strings, I'd recommend using the org.apache.spark.serializer.KryoSerializer serializer.
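A minimal sketch of those suggestions put together, assuming Spark 1.x-style configuration as in the answer; the partition counts and memory fraction are illustrative values, not tuned recommendations:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the concrete numbers are illustrative, not tuned for this workload.
val conf = new SparkConf()
  .setAppName("wordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // cheaper serialization for (String, Int) tuples
  .set("spark.storage.memoryFraction", "0.2") // reserve less memory for caching since nothing is cached

val sc = new SparkContext(conf)

// Ask for many partitions up front instead of calling repartition() afterwards.
val textFile = sc.textFile("hdfs://...", 20000)

val counts = textFile
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 10000)

counts.saveAsTextFile("hdfs://...")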