I have a very simple Spark DataFrame, and when running a DataFrame groupby, the performance is terrible - about 8x slower than the (in my head) equivalent RDD reduceByKey...
My cached DF is just two columns, customer and name, with only 50k rows:
== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None
When I run the following two snippets, I'd expect similar performance, yet the RDD version runs in 10s and the DF version in 85s...
rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect()
rawtempDF2.groupby('name').count().collect()
Am I missing something really fundamental here? FWIW, the RDD version runs 54 stages, and the DF version is 227 :/
Edit: I'm using Spark 1.6.1 and Python 3.4.2.
Edit 2: Also, the source parquet was partitioned by customer/day/name - currently 27 customers, 1 day, c. 45 names.
Both numbers seem relatively high, and it is not exactly clear how you create the DataFrame or measure the time, but in general a difference like this can be explained by a low number of records compared to the number of partitions.

The default value of spark.sql.shuffle.partitions is 200, which is the number of tasks you get. With 50K records, the overhead of starting a task will be higher than the speedup you can gain from parallel execution. Let's illustrate that with a simple example. First, let's create some example data:
import string
import random

random.seed(323)

def random_string():
    # A single-column tuple holding a random uppercase string of length 3-6
    n = random.randint(3, 6)
    return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )

# 50k rows spread over 8 partitions, cached like the DF in the question
df = (sc
    .parallelize([random_string() for _ in range(50000)], 8)
    .toDF(["name"])
    .cache())
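As a quick sanity check (my addition, not part of the original timings, and assuming the same sc and df as above), you can confirm what the example gives you:

# Verify the example setup before timing anything
df.count()                 ## 50000 rows
df.rdd.getNumPartitions()  ## 8, the value passed to parallelize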
And measure the time depending on the number of spark.sql.shuffle.partitions:
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 504 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 451 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 624 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 778 ms per loop

sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 1.75 s per loop
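To make the task-count connection explicit (my addition, reusing the df defined above), note that in Spark 1.6, without adaptive execution, spark.sql.shuffle.partitions directly sets how many partitions, and therefore tasks, the aggregation stage produces:

# Each post-shuffle partition becomes one task in the aggregation stage
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
df.groupby('name').count().rdd.getNumPartitions()  ## 200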
Although the timings above are not directly comparable to what you report, and this data has been collected in local mode, you can see a relatively clear pattern. The same applies to the RDD version:
from operator import add

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop
In a proper distributed environment these numbers will be higher due to the cost of network IO.
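Applied to the RDD version from your question, the same idea just means passing an explicit, small numPartitions to reduceByKey instead of relying on the default (a sketch, assuming your cached rawtempDF2; 8 is only an illustrative value):

from operator import add

# With c. 45 distinct names, a handful of reduce partitions is enough;
# more partitions mostly add scheduling overhead.
rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 8).collect()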
Just for comparison, let's check how long it takes to execute this task locally without Spark:
from collections import Counter

# Pull every name to the driver and count occurrences with a plain Counter
data = df.rdd.flatMap(lambda x: x).collect()
%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop
You should also take a look at data locality. Depending on the storage you use and its configuration, this can add additional delay to your jobs, even with a small input like this.
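For the DataFrame version in your question, the simplest experiment is therefore to lower spark.sql.shuffle.partitions to something closer to your c. 45 distinct names before the aggregation (a sketch, assuming your sqlContext and cached rawtempDF2):

# The default of 200 shuffle partitions buys little at this scale;
# a small value keeps the number of tasks close to the number of groups.
sqlContext.setConf("spark.sql.shuffle.partitions", "8")
rawtempDF2.groupby('name').count().collect()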