spark inconsistency when running count command

A question about inconsistency in Spark calculations: does this exist? For example, I am running EXACTLY the same command twice, e.g.:

imp_sample.where(col("location").isNotNull()).count()

And I am getting slightly different results every time I run it (141,830, then 142,314)! Or this:

imp_sample.where(col("location").isNull()).count()

and getting 2,587,013, and then 2,586,943. How is it even possible? Thank you!

asked Dec 02 '17 by user3245256

People also ask

How do I count the number of rows in a spark data frame?

To get the number of rows of a PySpark DataFrame, use the count() function. It returns the total number of rows in the DataFrame. Calling it triggers all transformations on the DataFrame to execute.
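For example, a minimal sketch (assuming an existing SparkSession named spark; the sample data here is made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("NY",), ("LA",), (None,)], ["location"])
print(df.count())  # 3 - total number of rows, nulls included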

How does count work in PySpark?

The count() function counts the rows and returns the result to the driver, which makes it an action in PySpark. It is used to count the number of rows present in the DataFrame before or after data analysis.

How can I make PySpark run faster?

Consider caching to speed up PySpark. If your dataset is large, processing it may take quite some time, especially if caching is not enabled and Spark has to start by reading the input data from a remote source, such as a database cluster or cloud object storage like S3.
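For instance, a small sketch of caching before running several actions (the DataFrame df and its location column are assumed to already exist):

from pyspark.sql.functions import col

df.cache()                                      # mark df for in-memory caching
df.count()                                      # the first action materialises the cache
df.where(col("location").isNotNull()).count()   # later actions reuse the cached rows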

How do you count in PySpark?

In PySpark, there are two ways to get the count of distinct values. We can use the distinct() and count() functions of DataFrame to get the distinct count of a PySpark DataFrame. Another way is to use the SQL countDistinct() function, which returns the distinct count of the selected columns.
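For example, both approaches side by side (assuming a DataFrame df with a location column):

from pyspark.sql.functions import countDistinct

df.select("location").distinct().count()     # distinct() followed by count()
df.select(countDistinct("location")).show()  # the countDistinct() aggregate function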


2 Answers

As per your comment, you are using sampleBy in your pipeline. sampleBy doesn't guarantee you'll get the exact fractions of rows. It takes a sample in which each record is included with a probability equal to the specified fraction, so the result can vary from run to run.
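To illustrate, a minimal sketch (the DataFrame df, the country column and the fractions are made up for this example): the sampled count fluctuates around the requested fraction rather than matching it exactly.

# Each "US" record is kept with probability 0.1, independently, so the sample
# size is only approximately 10% of that stratum.
sample = df.sampleBy("country", fractions={"US": 0.1, "UK": 0.2})
print(sample.count())
print(sample.count())  # can differ when the lineage is re-evaluated, e.g. after a re-read or re-shuffle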

Regarding your monotonically_increasing_id question in the comments, it only guarantees that each id is larger than the previous one; it does not guarantee that the ids are consecutive (i, i+1, i+2, etc.).
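A small sketch of what those ids look like (assuming a DataFrame df with more than one partition):

from pyspark.sql.functions import monotonically_increasing_id

with_id = df.withColumn("id", monotonically_increasing_id())
with_id.show()
# Ids might come out as 0, 1, 2, 8589934592, 8589934593, ... because the upper
# bits encode the partition number: increasing, but not consecutive.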

Finally, you can persist a DataFrame by calling persist() on it.
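Applied to the question, a sketch of that (assuming the sampled DataFrame is called imp_sample, as in the question): persist it, trigger one action to materialise it, and the repeated counts should then agree.

from pyspark import StorageLevel
from pyspark.sql.functions import col

imp_sample.persist(StorageLevel.MEMORY_AND_DISK)        # pin the sampled rows
imp_sample.count()                                      # materialise the persisted data
imp_sample.where(col("location").isNotNull()).count()   # now stable across repeated calls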

answered Oct 02 '22 by Alex


Ok, I have suffered majorly from this in the past. I had a seven or eight stage pipeline that normalised a couple of tables, added ids, joined them and grouped them. Consecutive runs of the same pipeline gave different results, although not in any coherent pattern I could understand.

Long story short, I traced this behaviour to my usage of the function monotonically_increasing_id, supposedly resolved by this JIRA ticket, but still evident in Spark 2.2.

I do not know exactly what your pipeline does, but please understand that my fix was to force Spark to persist the results after calling monotonically_increasing_id. I never saw the issue again after I started doing this.
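Roughly, the shape of that fix (a sketch only; the DataFrame and column names are made up):

from pyspark.sql.functions import monotonically_increasing_id

with_ids = normalised_table.withColumn("row_id", monotonically_increasing_id())
with_ids.persist()   # freeze the generated ids before any joins or groupings
with_ids.count()     # an action, to force materialisation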

Let me know if a judicious persist resolves this issue.

To persist an RDD or DataFrame, call either df.cache() (which defaults to in-memory persistence) or df.persist(<some storage level>), for example:

from pyspark import StorageLevel
df.persist(StorageLevel.DISK_ONLY)

Again, it may not help you, but in my case it forced Spark to flush out and write id values which were behaving non-deterministically given repeated invocations of the pipeline.

answered Oct 02 '22 by Chondrops