I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB of memory. I'd like to iterate over the values of the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative way. There is the iterator() method, but it requires additional information that I can't provide.
UPD: settled on the toLocalIterator method.
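For reference, here is a minimal runnable sketch of that approach (the data and partition count are made up for illustration); toLocalIterator pulls one partition at a time to the driver, so peak local memory is bounded by the largest partition rather than by the whole RDD:

import org.apache.spark.{SparkConf, SparkContext}

object ToLocalIteratorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("toLocalIterator-demo").setMaster("local[2]"))
    // Many small partitions keep the per-partition footprint low on the driver
    val rdd = sc.parallelize(1 to 1000000, numSlices = 100)
    rdd.toLocalIterator.foreach { value =>
      // Process each value locally, e.g. append it to a file
      if (value % 100000 == 0) println(value)
    }
    sc.stop()
  }
}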
RDD – the RDD API is the slowest of the three at simple operations like grouping and aggregating data. DataFrame – the DataFrame API is easy to use and performs aggregation faster than both RDDs and Datasets, which makes it well suited for exploratory analysis and for computing aggregated statistics on large data sets. Dataset – Datasets perform aggregation faster than RDDs, but a bit slower than DataFrames.
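To make the comparison concrete, here is a hedged sketch (the column names and sample data are invented) of the same count-by-key written against the RDD API and the DataFrame API:

import org.apache.spark.sql.SparkSession

object GroupingComparison {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("grouping-demo").master("local[2]").getOrCreate()
    import spark.implicits._

    val pairs = Seq(("a", 1), ("b", 2), ("a", 3))

    // RDD API: manual key-value handling, no query optimizer involved
    val rddCounts = spark.sparkContext.parallelize(pairs)
      .map { case (key, _) => (key, 1L) }
      .reduceByKey(_ + _)

    // DataFrame API: declarative grouping, planned by the Catalyst optimizer
    val dfCounts = pairs.toDF("key", "value").groupBy("key").count()

    rddCounts.collect().foreach(println)
    dfCounts.show()
    spark.stop()
  }
}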
You can save an RDD using the saveAsObjectFile and saveAsTextFile methods, and read it back using the textFile and sequenceFile functions on SparkContext.
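A small sketch of those calls (the /tmp paths are placeholders); note that output written with saveAsObjectFile can also be read back with SparkContext.objectFile, which restores the original element type:

import org.apache.spark.{SparkConf, SparkContext}

object SaveAndReadRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-read-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

    // Save as plain text and as serialized objects (paths are placeholders)
    rdd.saveAsTextFile("/tmp/demo-text")
    rdd.saveAsObjectFile("/tmp/demo-objects")

    // Read back: textFile yields strings, objectFile restores the element type
    val fromText = sc.textFile("/tmp/demo-text")
    val fromObjects = sc.objectFile[Int]("/tmp/demo-objects")

    println(s"text: ${fromText.count()}, objects: ${fromObjects.count()}")
    sc.stop()
  }
}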
In terms of data size, Spark has been shown to work well up to petabytes. It has been used to sort 100 TB of data 3X faster than Hadoop MapReduce on 1/10th of the machines, winning the 2014 Daytona GraySort Benchmark, as well as to sort 1 PB.
Update: the RDD.toLocalIterator method, which appeared after the original answer was written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition at each step.
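One caveat worth knowing: since toLocalIterator fetches each partition with a separate job, an RDD produced by expensive transformations is best cached first, otherwise its lineage may be recomputed once per partition. A short sketch, where rdd stands for the large RDD from the question:

rdd.cache() // avoid recomputing the lineage for every partition
rdd.toLocalIterator.foreach { value =>
  println(value) // handle one value at a time on the driver
}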
TL;DR: use toLocalIterator. The original answer below gives a rough idea of how it works:
First of all, get the array of the RDD's partitions:
val parts = rdd.partitions
Then create smaller RDDs by filtering out everything but a single partition. Collect the data from each smaller RDD and iterate over the values of one partition at a time:
for (p <- parts) {
  val idx = p.index
  // The second argument is true to avoid reshuffling the RDD
  val partRdd = rdd.mapPartitionsWithIndex(
    (index, it) => if (index == idx) it else Iterator(),
    preservesPartitioning = true)
  // data contains all values from a single partition, in the form of an array
  val data = partRdd.collect()
  // Now you can do whatever you want with the data: iterate, save to a file, etc.
}
I didn't try this code, but it should work. Please write a comment if it won't compile. Of course, it will work only if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).