
Why does df.limit keep changing in Pyspark?

I'm creating a data sample from some dataframe df with

rdd = df.limit(10000).rdd

This operation takes quite some time (why, actually? Can it not short-circuit after 10000 rows?), so I assume I have a new RDD now.

However, when I now work on rdd, it contains different rows every time I access it, as if it were resampled on each access. Caching the RDD helps a bit, but surely that's not safe?

What is the reason behind it?

Update: Here is a reproduction on Spark 1.5.2

from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row:row.i).reduce(add))

The output is

499500
19955500
49651500

I'm surprised that .rdd doesn't fix the data.

EDIT: To show that it gets trickier than the re-execution issue, here is a single action which produces incorrect results on Spark 2.0.0.2.5.0

from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

Basically, whenever you use limit, your results might be wrong. I don't mean "just one of many samples", but really incorrect (since in this case the result should always be 12345).

Gerenuk asked May 10 '16 19:05



3 Answers

Because Spark is distributed, in general it's not safe to assume deterministic results. Your example is taking the "first" 10,000 rows of a DataFrame. Here, there's ambiguity (and hence non-determinism) in what "first" means. That will depend on the internals of Spark. For example, it could be the first partition that responds to the driver. That partition could change with networking, data locality, etc.

Even once you cache the data, I still wouldn't rely on getting the same data back every time, though I certainly would expect it to be more consistent than reading from disk.
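
If the sample is small enough to fit on the driver, one way to pin the rows is to collect them once and build a new DataFrame from that local list. The following is only a sketch under that assumption: freeze_sample is a hypothetical helper name, and spark is assumed to be a SparkSession (or a SQLContext on Spark 1.x), both of which provide createDataFrame(data, schema).

```python
def freeze_sample(spark, df, n):
    """Take n rows once on the driver and rebuild a DataFrame from them.

    Hypothetical helper. `spark` is assumed to be a SparkSession (or a
    SQLContext on Spark 1.x); `df` is the DataFrame to sample from.
    Only sensible when n rows fit comfortably in driver memory.
    """
    # A single action: after this line the rows are a plain Python list,
    # so later actions cannot re-run the limit.
    rows = df.limit(n).collect()
    # The new DataFrame is built from fixed local data, not from df's plan.
    return spark.createDataFrame(rows, schema=df.schema)
```

Usage would look like sample = freeze_sample(spark, df, 10000) followed by rdd = sample.rdd. Unlike cache(), this does not depend on cached partitions staying in memory, at the cost of routing the sample through the driver.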

santon answered Oct 09 '22 04:10


Spark is lazy, so each action you run recomputes the data returned by limit(). If the underlying data is split across multiple partitions, then every time you evaluate it, limit might pull from a different partition (for example, if your data is stored across 10 Parquet files, the first evaluation might pull from file 1, the second from file 7, and so on).
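
The effect can be imitated without Spark. The toy model below is plain Python, not Spark's actual implementation: lazy_limit and make_partitions are hypothetical names, and the shuffled partition order is an assumption standing in for scheduling and data-locality effects. It shows why re-running a limit per action can give different rows, while a result that was materialized once stays fixed.

```python
import random


def make_partitions(n_rows, n_parts):
    """Split range(n_rows) into n_parts contiguous chunks."""
    size = n_rows // n_parts
    return [list(range(p * size, (p + 1) * size)) for p in range(n_parts)]


def lazy_limit(partitions, n, rng):
    """Toy stand-in for limit(n): recomputed on every call, reading
    partitions in whatever order they happen to become available."""
    order = list(range(len(partitions)))
    rng.shuffle(order)  # assumption: models scheduling / locality effects
    taken = []
    for p in order:
        taken.extend(partitions[p][: n - len(taken)])
        if len(taken) == n:
            break
    return taken


rng = random.Random(0)
parts = make_partitions(1_000_000, 100)

# Three "actions", each of which re-runs the limit from scratch.
sums = [sum(lazy_limit(parts, 1000, rng)) for _ in range(3)]
print(sums)  # the sums generally differ from one action to the next

# Materialize once, then reuse: every later "action" sees the same rows.
frozen = lazy_limit(parts, 1000, rng)
frozen_sums = [sum(frozen) for _ in range(3)]
print(frozen_sums)  # three identical values
```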

alexgbelov answered Oct 09 '22 05:10


From the Spark docs:

The LIMIT clause is used to constrain the number of rows returned by the SELECT statement. In general, this clause is used in conjunction with ORDER BY to ensure that the results are deterministic.

So you need to sort the rows beforehand if you want the call to .limit() to be deterministic. But there is a catch! If you sort by a column that doesn't have a unique value for every row, the so-called "tied" rows (rows with the same sorting key value) will not be deterministically ordered, so the .limit() might still be nondeterministic.

You have two options to work around this:

  • Make sure you include a unique row id in the sorting call.
    For example df.orderBy('someCol', 'rowId').limit(n)
  • If you only need deterministic results within a single run, you can simply cache the result of the limit, df.limit(n).cache(), so that at least consecutive actions reuse that result instead of recomputing the limit and getting different rows.
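
The tie problem from the first option can be illustrated in plain Python, without Spark. This is a sketch, not Spark code: the rows, the top_n and run helpers, and the shuffle that models rows arriving in a different order are all assumptions made for illustration. Sorting by someCol alone leaves the tied rows in arrival order, while adding the unique rowId to the key makes the order total.

```python
import random


def top_n(rows, n, key):
    """Sort with Python's stable sort and keep the first n rows."""
    return sorted(rows, key=key)[:n]


# Hypothetical rows of the form (rowId, someCol); someCol has many ties.
rows = [(row_id, row_id % 5) for row_id in range(100)]


def run(key, seed):
    arrived = rows[:]
    random.Random(seed).shuffle(arrived)  # models a different arrival order
    return top_n(arrived, 10, key)


# Collect the distinct results seen over ten differently ordered runs.
by_col_only = {tuple(run(lambda r: r[1], seed)) for seed in range(10)}
by_col_and_id = {tuple(run(lambda r: (r[1], r[0]), seed)) for seed in range(10)}

print(len(by_col_only))    # typically more than 1: ties follow arrival order
print(len(by_col_and_id))  # 1: the unique rowId breaks every tie
```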

dsalaj answered Oct 09 '22 06:10