
Why is my Spark DataFrame much slower than RDD?

I have a very simple Spark DataFrame, and when running a DataFrame groupby, the performance is terrible - about 8x slower than the (in my head) equivalent RDD reduceByKey...

My cached DF is just two columns, customer and name with only 50k rows:

== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None

When I run the following two snippets I'd expect similar performance, not the RDD version running in 10s and the DF version in 85s...

rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect()

rawtempDF2.groupby('name').count().collect()

Am I missing something really fundamental here? FWIW, the RDD version runs 54 stages, and the DF version is 227 :/

Edit: I'm using Spark 1.6.1 and Python 3.4.2.

Edit2: Also, the source parquet was partitioned by customer/day/name - currently 27 customers, 1 day, c. 45 names.

RichD asked Jan 06 '23 20:01


1 Answer

Both numbers seem relatively high, and it is not exactly clear how you create the DataFrame or measure the time, but in general a difference like this can be explained by a low number of records compared to the number of partitions.

The default value of spark.sql.shuffle.partitions is 200, which is the number of tasks you get in the stage after the shuffle. With 50K records, the overhead of starting a task will be higher than the speedup you can gain from parallel execution. Let's illustrate that with a simple example. First, let's create some example data:

import string
import random

random.seed(323)

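def random_string():
    # Return a 1-tuple so that each element becomes a single-column row.
    n = random.randint(3, 6)
    return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )

# 50,000 random names in 8 partitions, cached so repeated runs reuse the same data.
df = (sc
    .parallelize([random_string() for _ in range(50000)], 8)
    .toDF(["name"])
    .cache())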

Then measure the time for different values of spark.sql.shuffle.partitions:

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 504 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 451 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 624 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 778 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10  df.groupby('name').count().collect()
## 10 loops, best of 3: 1.75 s per loop
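
A toy cost model is consistent with the trend in these timings. The sketch below is not how Spark actually schedules work. It assumes that tasks run in waves of `cores` at a time, that every task pays a fixed startup cost, and that the useful work splits evenly across tasks. The function name and every constant in it (8 cores, 2 ms per task, 10 ms of total work) are made up for illustration.

```python
import math


def estimated_seconds(num_tasks, cores=8, per_task_overhead=0.002, total_work=0.010):
    """Toy estimate of stage time; all defaults are hypothetical values."""
    # Tasks beyond the number of cores have to wait for a later wave.
    waves = math.ceil(num_tasks / cores)
    # Each task pays the fixed overhead plus its even share of the real work.
    per_task_time = per_task_overhead + total_work / num_tasks
    return waves * per_task_time


for n in (1, 8, 100, 200, 1000):
    print("tasks={:>4}  estimated={:.4f}s".format(n, estimated_seconds(n)))
```

Under these assumptions the estimate stops improving once the task count passes the core count, and from there the fixed per-task cost dominates, which is the shape of the measurements above.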

Although these timings are not comparable to the ones you report, and they were collected in local mode, the pattern is fairly clear. The same applies to the RDD version:

from operator import add

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop

In a proper distributed environment these numbers will be higher due to the cost of network I/O.
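
One reason the extra partitions cannot pay off here: a hash-partitioned shuffle sends all rows for a given key to a single partition, so the number of non-empty partitions is bounded by the number of distinct keys. With c. 45 names, as in the question, at least 155 of the default 200 partitions would receive nothing, but a task is still launched for each of them. A minimal pure-Python sketch of that bound, assuming `zlib.crc32` as a stand-in for Spark's own hash function and using hypothetical key names:

```python
import zlib


def non_empty_partitions(keys, num_partitions):
    """Count partitions that receive at least one key under hash partitioning."""
    # crc32 is only a deterministic stand-in; Spark uses its own hash function.
    used = {zlib.crc32(k.encode("utf-8")) % num_partitions for k in set(keys)}
    return len(used)


# Hypothetical key set: 45 distinct names, matching the count given in the question.
names = ["name_{:02d}".format(i) for i in range(45)]

for n in (1, 10, 100, 200, 1000):
    used = non_empty_partitions(names, n)
    print("partitions={:>4}  non-empty={:>3}  empty={:>4}".format(n, used, n - used))
```

Whatever hash function is used, the non-empty count can never exceed the number of distinct keys, so raising the partition count past that point only adds empty tasks.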

Just for comparison, let's check how long it takes to execute this task locally without Spark:

from collections import Counter

data = df.rdd.flatMap(lambda x: x).collect()

%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop

You should also take a look at data locality. Depending on the storage you use and its configuration, this can add delay to your jobs even with an input as small as this.

zero323 answered Jan 08 '23 09:01