A question about inconsistency in Spark calculations: does this really happen? For example, I am running EXACTLY the same command twice, e.g.:
imp_sample.where(col("location").isNotNull()).count()
And I am getting slightly different results every time I run it (141,830, then 142,314)! Or this:
imp_sample.where(col("location").isNull()).count()
and getting 2,587,013, and then 2,586,943. How is it even possible? Thank you!
To get the number of rows in a PySpark DataFrame, use the count() function. count() is an action: calling it triggers execution of every transformation that produced the DataFrame, counts the resulting rows, and returns the total to the driver. It is commonly used to check row counts before and after a data-analysis step.
If your dataset is large, a count can take quite some time, so consider caching to speed it up. This is especially true when caching is not enabled and Spark has to start by reading the input data from a remote source, such as a database cluster or cloud object storage like S3.
In PySpark, there are two ways to get a distinct count. You can chain the DataFrame's distinct() and count() functions, or you can use the countDistinct() aggregate function, which returns the distinct count over the selected columns.
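A minimal sketch of the above, assuming a SparkSession called spark and a hypothetical input file data.parquet (neither appears in the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct

spark = SparkSession.builder.appName("count-example").getOrCreate()
df = spark.read.parquet("data.parquet")  # hypothetical input

total = df.count()  # count() is an action: it executes the whole plan

df.cache()  # cache so repeated counts reuse the materialised data
non_null = df.where(col("location").isNotNull()).count()

distinct_a = df.select("location").distinct().count()              # distinct() + count()
distinct_b = df.select(countDistinct("location")).collect()[0][0]  # countDistinct() aggregate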
As per your comment, you are using sampleBy in your pipeline. sampleBy doesn't guarantee you'll get the exact fractions of rows: each record is included with a probability equal to the fraction for its stratum, so the result can vary from run to run.
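For illustration, a hedged sketch of sampleBy (the DataFrame df, the country column, and the fractions are hypothetical): the fractions are per-key inclusion probabilities, not exact proportions, so the sampled row count drifts between runs unless a seed is fixed, and even then only if the input data and partitioning are unchanged.

fractions = {"US": 0.1, "EU": 0.2}                      # per-key sampling probabilities
sample_a = df.sampleBy("country", fractions)            # no seed: counts vary run to run
sample_b = df.sampleBy("country", fractions, seed=42)   # fixed seed: repeatable given identical input
print(sample_a.count(), sample_b.count())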
Regarding your monotonically_increasing_id question in the comments, it only guarantees that each id is larger than the previous one; it does not guarantee that the ids are consecutive (i, i+1, i+2, etc.).
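A small sketch (assuming an existing SparkSession named spark) that makes the gaps visible: the current implementation encodes the partition index in the upper bits of the 64-bit id, so ids increase but jump between partitions.

from pyspark.sql.functions import monotonically_increasing_id

ids = spark.range(6).repartition(3).withColumn("row_id", monotonically_increasing_id())
ids.show()
# typical output: 0, 1 in one partition, then 8589934592, 8589934593 in the next;
# increasing, but clearly not consecutive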
Finally, you can persist a data frame by calling persist() on it.
Ok, I have suffered majorly from this in the past. I had a seven or eight stage pipeline that normalised a couple of tables, added ids, joined them and grouped them. Consecutive runs of the same pipeline gave different results, although not in any coherent pattern I could understand.
Long story short, I traced this behaviour to my usage of the function monotonically_increasing_id, supposedly resolved by this JIRA ticket, but still evident in Spark 2.2.
I do not know exactly what your pipeline does, but please understand that my fix was to force Spark to persist results after calling monotonically_increasing_id. I never saw the issue again after I started doing this.
Let me know if a judicious persist resolves this issue.
To persist an RDD or DataFrame, call either df.cache() (which uses the default storage level) or df.persist(<storage level>), for example:
from pyspark import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
Again, it may not help you, but in my case it forced Spark to flush out and write id values which were behaving non-deterministically given repeated invocations of the pipeline.
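Putting that together, a minimal sketch of the workaround, under the assumption that imp_sample is the DataFrame from the question and that a row id is being added to it (the column names are hypothetical):

from pyspark import StorageLevel
from pyspark.sql.functions import col, monotonically_increasing_id

with_ids = imp_sample.withColumn("row_id", monotonically_increasing_id())
with_ids.persist(StorageLevel.DISK_ONLY)  # or .cache()
with_ids.count()                          # action: materialises the persisted ids

# subsequent counts and joins now see one stable set of ids
with_ids.where(col("location").isNotNull()).count()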