Why does collect() on a DataFrame with 1 row use 2000 executors?

This is the simplest DataFrame I could think of. I'm using PySpark 1.6.1.

# one row of data
rows = [ (1,   2) ]
cols = [ "a", "b" ]
df   = sqlContext.createDataFrame(rows, cols)

So the data frame completely fits in memory, has no references to any files and looks quite trivial to me.

Yet when I collect the data, it uses 2000 executors:

df.collect()

During the collect, the progress bar shows 2000 tasks:

[Stage 2:===================================================>(1985 + 15) / 2000]

and then the expected output:

[Row(a=1, b=2)]

Why is this happening? Shouldn't the DataFrame be completely in memory on the driver?

asked Jun 21 '16 by Corey


1 Answer

So I looked into the code a bit to figure out what was going on. It turns out that sqlContext.createDataFrame makes no attempt to choose a sensible number of partitions based on the data it is given.

Why 2000 tasks?

Spark uses 2000 tasks because my data frame had 2000 partitions. (Even though it seems like clear nonsense to have more partitions than rows.)

This can be seen by:

>>> df.rdd.getNumPartitions()
2000

Why did the DataFrame have 2000 partitions?

This happens because sqlContext.createDataFrame winds up using the default number of partitions (2000 in my case), irrespective of how the data is organized or how many rows it has.
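
For a quick sanity check (assuming the SparkContext is available as sc, as it is in the PySpark shell), the default parallelism matches the partition count seen above:

>>> sc.defaultParallelism
2000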

The code trail is as follows.

In sql/context.py, the sqlContext.createDataFrame function calls (in this example):

rdd, schema = self._createFromLocal(data, schema)

which in turn calls:

return self._sc.parallelize(data), schema

And SparkContext.parallelize, defined in context.py, picks the number of slices like this:

numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism

No check is done on the number of rows, and it is not possible to specify the number of slices from sqlContext.createDataFrame.
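
One workaround is to sidestep the local-data path and call parallelize yourself with an explicit numSlices, since createDataFrame also accepts an RDD. This is just a sketch using the same rows and cols from the question, and df1 is an illustrative name:

>>> rdd = sc.parallelize(rows, numSlices=1)      # force a single partition up front
>>> df1 = sqlContext.createDataFrame(rdd, cols)
>>> df1.rdd.getNumPartitions()
1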

How can I change how many partitions the DataFrame has?

Using DataFrame.coalesce.

>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]
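
DataFrame.repartition(1) would also bring the partition count down, but unlike coalesce it performs a full shuffle; that is harmless for a single row, though it matters on real data. A minimal sketch:

>>> df.repartition(1).rdd.getNumPartitions()
1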
answered Sep 28 '22 by Corey