Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is collect in SparkR so slow?

I have a 500K row spark DataFrame that lives in a parquet file. I'm using spark 2.0.0 and the SparkR package inside Spark (RStudio and R 3.3.1), all running on a local machine with 4 cores and 8gb of RAM.

To facilitate construction of a dataset I can work on in R, I use the collect() method to bring the spark DataFrame into R. Doing so takes about 3 minutes, which is far longer than it'd take to read an equivalently sized CSV file using the data.table package.

Admittedly, the parquet file is compressed and the time needed for decompression could be part of the issue, but I've found other comments on the internet about the collect method being particularly slow, and little in the way of explanation.

I've tried the same operation in sparklyr, and it's much faster. Unfortunately, sparklyr doesn't have the ability to do date path inside joins and filters as easily as SparkR, and so I'm stuck using SparkR. In addition, I don't believe I can use both packages at the same time (i.e. run queries using SparkR calls, and then access those spark objects using sparklyr).

Does anyone have a similar experience, an explanation for the relative slowness of SparkR's collect() method, and/or any solutions?

like image 489
Wil Van Cleve Avatar asked Sep 19 '16 15:09

Wil Van Cleve


People also ask

What does collect () do in Spark?

PySpark Collect() – Retrieve data from DataFrame Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.

What can I use instead of Spark collect?

Collect action will try to move all data in RDD/DataFrame to the machine with the driver and where it may run out of memory and crash. Instead, you can make sure that the number of items returned is sampled by calling take or takeSample , or perhaps by filtering your RDD/DataFrame.

How do I reduce the GC time on my Spark?

To avoid full GC in G1 GC, there are two commonly-used approaches: Decrease the InitiatingHeapOccupancyPercent value ( default is 45), to let G1 GC starts initial concurrent marking at an earlier time, so that we have higher chances to avoid full GC.


1 Answers

@Will

I don't know whether the following comment actually answers your question or not but Spark does lazy operations. All the transformations done in Spark (or SparkR) doesn't really create any data they just create a logical plan to follow.

When you run Actions like collect, it has to fetch data directly from source RDDs (assuming you haven't cached or persisted data).

If your data is not large enough and can be handled by local R easily then there is no need for going with SparkR. Other solution can be to cache your data for frequent uses.

like image 115
Mohit Bansal Avatar answered Sep 21 '22 17:09

Mohit Bansal