I'm creating a data sample from some dataframe df
with
rdd = df.limit(10000).rdd
This operation takes quite some time (why actually? can it not short-cut after 10000 rows?), so I assume I have a new RDD now.
However, when I now work on rdd, I get different rows every time I access it, as if it resamples over and over again. Caching the RDD helps a bit, but surely that's not safe?
What is the reason behind it?
Update: Here is a reproduction on Spark 1.5.2
from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row: row.i).reduce(add))
The output is
499500
19955500
49651500
I'm surprised that .rdd
doesn't fix the data.
EDIT: To show that it gets more tricky than the re-execution issue, here is a single action which produces incorrect results on Spark 2.0.0.2.5.0
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join
Basically, whenever you use limit, your results might be wrong. I don't mean "just one of many samples", but really incorrect (since in this case the result should always be 12345).
For background: in PySpark, limit() is a DataFrame transformation that returns a new DataFrame with the first N rows (in Spark with Scala/Java it returns a Dataset). DataFrames are immutable; while PySpark derives its basic data types from Python, its own distributed data structures are limited to RDDs, DataFrames and GraphFrames. head() is used to retrieve the top rows of a DataFrame, and collect() is the action used to retrieve the data of an RDD or DataFrame: it fetches all the elements of the rows from each partition and brings them to the driver node/program.
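A minimal sketch of that difference, reusing the question's setup (it assumes a SparkContext named sc with Spark SQL available, as in the snippets above): limit() alone triggers no work, while head() and collect() are actions that do.
from pyspark.sql import Row

df = sc.parallelize([Row(i=i) for i in range(1000000)], 100).toDF()

limited = df.limit(10)        # transformation only: nothing is executed yet
first_rows = limited.head(5)  # action: returns up to 5 Row objects to the driver
all_rows = limited.collect()  # action: brings every row of `limited` to the driver
print(len(all_rows))          # 10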
Because Spark is distributed, in general it's not safe to assume deterministic results. Your example is taking the "first" 10,000 rows of a DataFrame. Here, there's ambiguity (and hence non-determinism) in what "first" means. That will depend on the internals of Spark. For example, it could be the first partition that responds to the driver. That partition could change with networking, data locality, etc.
Even once you cache the data, I still wouldn't rely on getting the same data back every time, though I certainly would expect it to be more consistent than reading from disk.
Spark is lazy, so each action you take recalculates the data returned by limit(). If the underlying data is split across multiple partitions, then every time you evaluate it, limit might be pulling from a different partition (i.e. if your data is stored across 10 Parquet files, the first limit call might pull from file 1, the second from file 7, and so on).
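A rough sketch of this behaviour, reusing the setup from the question (so it assumes a SparkContext named sc, as above): without caching each action re-evaluates the limit, while cache() pins one materialized sample.
from operator import add
from pyspark.sql import Row

rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 100)

# Without caching, every action re-plans and re-executes the limit, so
# each of the three sums may be computed over a different set of 1000 rows.
uncached = rdd.toDF().limit(1000).rdd
print([uncached.map(lambda row: row.i).reduce(add) for _ in range(3)])

# cache() materializes one outcome of the limit; subsequent actions reuse
# the cached partitions (unless they are evicted or lost, in which case
# they are recomputed and the rows may change again).
cached = rdd.toDF().limit(1000).rdd.cache()
print([cached.map(lambda row: row.i).reduce(add) for _ in range(3)])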
From the Spark docs:
The LIMIT clause is used to constrain the number of rows returned by the SELECT statement. In general, this clause is used in conjunction with ORDER BY to ensure that the results are deterministic.
So you need to sort the rows beforehand if you want the call to .limit() to be deterministic. But there is a catch! If you sort by a column that doesn't have unique values for every row, the so-called "tied" rows (rows with the same sorting key value) will not be deterministically ordered, so the .limit() might still be nondeterministic.
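As a rough illustration (the data and column names here are made up): sorting on a non-unique key still leaves the tied rows unordered, whereas adding a unique column as a tiebreaker pins the limited rows down.
from pyspark.sql import Row

# "grp" is deliberately non-unique, so rows within a group are tied.
df = sc.parallelize([Row(grp=i % 10, i=i) for i in range(100000)], 50).toDF()

# Ties mean the 1000 surviving rows are not pinned down, so the two sums
# below can still differ between evaluations.
ambiguous = df.orderBy('grp').limit(1000).rdd
print(ambiguous.map(lambda row: row.i).sum())
print(ambiguous.map(lambda row: row.i).sum())

# Adding the unique column "i" as a tiebreaker makes the ordering total,
# so the limited result is deterministic.
deterministic = df.orderBy('grp', 'i').limit(1000).rdd
print(deterministic.map(lambda row: row.i).sum())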
You have two options to work around this (both are sketched below):

1. Sort on a key that is unique for every row, e.g. df.orderBy('someCol', 'rowId').limit(n), where rowId breaks the ties.
2. Cache the result of the limit with df.limit(n).cache(), so that at least the results from that limit do not change due to the consecutive action calls that would otherwise recompute the results of limit
and mess up the results.
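For completeness, here is a rough sketch of both workarounds. df, 'someCol' and n are placeholders, and monotonically_increasing_id is used only as one possible way to add a unique (though not consecutive) row id; the generated id is itself only as stable as the partitioning of df.
from pyspark.sql import functions as F

n = 12345

# Option 1: add a unique tiebreaker column and make the ordering total,
# so the limit always keeps the same rows.
stable = (df.withColumn('rowId', F.monotonically_increasing_id())
            .orderBy('someCol', 'rowId')
            .limit(n))

# Option 2: cache the limited DataFrame so that later actions reuse one
# materialized sample instead of recomputing the limit each time.
sample = df.limit(n).cache()
sample.count()  # run one action to materialize the cache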