 

Spark: Best practice for retrieving big data from RDD to local machine

Tags:

apache-spark

I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB. I'd like to iterate over the values of the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative way. There is the method iterator(), but it requires additional information that I can't provide.

UPD: the toLocalIterator method has since been committed.

asked Feb 11 '14 by epahomov


1 Answer

Update: the RDD.toLocalIterator method, which appeared after the original answer was written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.
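As a minimal sketch of that approach (the variable name rdd and the per-value processing are placeholders, not part of the original answer), iterating with toLocalIterator could look like this:

// Pull the RDD to the driver one partition at a time.
// toLocalIterator only needs enough driver memory for a single
// partition at once, not for the whole dataset.
val it: Iterator[String] = rdd.toLocalIterator
it.foreach { value =>
  // Process each value locally: print it, write it to a file, etc.
  println(value)
}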

TL;DR: the original answer below gives a rough idea of how it works.

First of all, get the array of partition indexes:

val parts = rdd.partitions 

Then create smaller RDDs by filtering out everything but a single partition. Collect the data from each smaller RDD and iterate over the values of that single partition:

for (p <- parts) {
  val idx = p.index
  // The second argument is true to avoid reshuffling the RDD
  val partRdd = rdd.mapPartitionsWithIndex(
    (i, iter) => if (i == idx) iter else Iterator.empty,
    preservesPartitioning = true)
  // data contains all values from a single partition, in the form of an array
  val data = partRdd.collect()
  // Now you can do with the data whatever you want: iterate, save to a file, etc.
}

I haven't tried this code, but it should work. Please write a comment if it won't compile. Of course, it will only work if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).
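As a rough sketch, assuming the same rdd as above (the target partition count numParts is a made-up example value), repartitioning before the per-partition loop could look like this:

// Increase the partition count so that each partition fits in driver memory.
// coalesce with shuffle = true redistributes the data across numParts partitions.
val numParts = 64
val smallPartsRdd = rdd.coalesce(numParts, shuffle = true)
// Then run the same per-partition loop over smallPartsRdd.partitions.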

answered Oct 24 '22 by Wildfire