Why Mongo Spark connector returns different and incorrect counts for a query?

I'm evaluating the Mongo Spark connector for a project and I'm getting inconsistent results. I run MongoDB server 3.4.5, Spark 2.2.0 (via PySpark), and Mongo Spark Connector 2.2.0 (the Scala 2.11 build) locally on my laptop. My test database is the Enron dataset: http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/

I'm interested in Spark SQL queries, and when I ran simple count queries I got a different count on each run. Here is the output from my mongo shell:

> db.messages.count({'headers.To': '[email protected]'})
203

Here is some output from my PySpark shell:

In [1]: df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/enron_mail.messages").load()
In [2]: df.registerTempTable("messages")
In [3]: res = spark.sql("select count(*) from messages where headers.To='[email protected]'")
In [4]: res.show()
+--------+                                                                      
|count(1)|
+--------+
|     162|
+--------+
In [5]: res.show()
+--------+                                                                      
|count(1)|
+--------+
|     160|
+--------+
In [6]: res = spark.sql("select count(_id) from messages where headers.To='[email protected]'")
In [7]: res.show()
+----------+                                                                    
|count(_id)|
+----------+
|       161|
+----------+
In [8]: res.show()
+----------+                                                                    
|count(_id)|
+----------+
|       162|
+----------+

I searched Google for this issue but found nothing helpful. If anyone has an idea why this happens and how to handle it correctly, please share. I suspect I either missed something or misconfigured something.

UPDATE: I solved my issue. The inconsistent counts were caused by MongoDefaultPartitioner, which wraps MongoSamplePartitioner, which in turn uses random sampling. To be honest, this seems like quite a weird default to me; I would personally prefer a slow but consistent partitioner. Details on the partitioner options can be found in the official configuration options documentation.

UPDATE: Copied the solution into an answer.

artemdevel asked Oct 09 '17 17:10

1 Answer

I solved my issue. The inconsistent counts were caused by MongoDefaultPartitioner, which wraps MongoSamplePartitioner, which in turn uses random sampling. To be honest, this seems like quite a weird default to me; I would personally prefer a slow but consistent partitioner. Details on the partitioner options can be found in the official configuration options documentation.

code:

val df = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", "mongodb://127.0.0.1/enron_mail.messages")
  // The value is the partitioner class name: no config-key prefix and no trailing space
  .option("partitioner", "MongoPaginateBySizePartitioner")
  .load()
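
Since the question uses PySpark, here is a rough Python sketch of the same reader, plus a small check that repeats a count and collects the distinct results. The helper names (mongo_read_options, load_collection, distinct_counts) are hypothetical and only for illustration. It assumes the connector is already on the Spark classpath and that the partitioner is passed by its class name, which is how I read the connector's configuration documentation.

```python
# Sketch only: a PySpark version of the Scala reader above, plus a repeat-count check.
# All function names here are hypothetical helpers, not part of the connector API.


def mongo_read_options(uri, partitioner="MongoPaginateBySizePartitioner"):
    """Build the reader options: the collection URI and the partitioner class name."""
    return {"uri": uri, "partitioner": partitioner}


def load_collection(spark, uri):
    """Load a MongoDB collection as a DataFrame with an explicitly chosen partitioner."""
    reader = spark.read.format("com.mongodb.spark.sql.DefaultSource")
    for key, value in mongo_read_options(uri).items():
        reader = reader.option(key, value)
    return reader.load()


def distinct_counts(run_count, runs=5):
    """Call run_count() `runs` times and return the set of values it produced."""
    return {run_count() for _ in range(runs)}
```

In a PySpark shell this could be used as df = load_collection(spark, "mongodb://127.0.0.1/enron_mail.messages"), followed by df.registerTempTable("messages") as in the question. Passing something like lambda: spark.sql(query).collect()[0][0] to distinct_counts, where query is the count query from the question, should then return a set with a single value if the counts have become stable.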
artemdevel answered Dec 22 '22 00:12