
MongoDB Spark Connector - aggregation is slow

I am running the same aggregation pipeline with a Spark application and in the mongos console. In the console, the data is fetched in the blink of an eye, and only a second use of "it" is needed to retrieve all expected data. According to the Spark WebUI, however, the Spark application takes almost two minutes.

[Screenshot: Spark WebUI stage overview showing 242 tasks]

As you can see, 242 tasks are launched to fetch the result. I am not sure why such a high number of tasks is launched when only 40 documents are returned by the MongoDB aggregation. It looks like there is a lot of overhead.

The query I run in the mongos console:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

The Spark application code:

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

After that, I use hdfs dfs -getmerge /user/spark/output/ output.csv and compare the results.

Why is the aggregation so slow? Isn't the call to withPipeline meant to reduce the amount of data that needs to be transferred to Spark? It does not seem to be doing the same aggregation the mongos console does; there it is blazing fast. I am using Spark 1.6.1 and mongo-spark-connector_2.10 version 1.1.0.
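One quick way to see whether the partitioning, rather than the aggregation itself, is causing the overhead is to print the partition count of the RDD before triggering any action. A diagnostic sketch based on the code above (it needs the same Spark/MongoDB setup as the question, so it is not standalone-runnable):

```java
// Diagnostic sketch: the number of partitions equals the number of tasks
// Spark launches for the stage, regardless of how few documents the
// pipeline actually returns.
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(pipeline); // same pipeline as above

// getNumPartitions() is available on JavaRDD as of Spark 1.6
System.out.println("partitions: " + aggregatedRdd.getNumPartitions());
```

If this prints 242, the tasks come from the partitioner, not from the volume of aggregated data.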

Edit: Another thing I am wondering about: two executors are launched (because I am using the default execution settings atm), but only one executor does all the work. Why isn't the second executor doing any work?

[Screenshot: Spark WebUI executors tab, showing only one executor doing work]

Edit 2: When using a different aggregation pipeline and calling .count() instead of saveAsTextFile(..), 242 tasks are also created, this time with 65,000 documents returned.

asked Oct 17 '22 by j9dy

1 Answer

The high number of tasks is caused by the default Mongo Spark partitioner strategy. It ignores the aggregation pipeline when calculating the partitions, for two main reasons:

  1. It reduces the cost of calculating partitions.
  2. It ensures the same behaviour for sharded and non-sharded deployments.

However, as you've found, this can generate empty partitions, which in your case is costly.

The options for fixing this are:

  1. Change the partitioning strategy.

    Choose an alternative partitioner to reduce the number of partitions. For example, the MongoPaginateByCountPartitioner will split the collection into a set number of partitions.

    Alternatively, create your own partitioner - implement the MongoPartitioner trait and you will be able to apply the aggregation pipeline and partition up the results. See the HalfwayPartitioner and the custom partitioner test for an example.

  2. Pre-aggregate the results into a collection using $out and read from there.

  3. Use coalesce(N) to merge the partitions and reduce the number of tasks.
  4. Increase the spark.mongodb.input.partitionerOptions.partitionSizeMB configuration to produce fewer partitions.
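Options 1 and 4 are both set through the connector's read configuration. A minimal sketch, assuming the option keys of the mongo-spark-connector 1.x line (the URI and partition counts below are placeholders - verify the keys against your connector version's documentation):

```java
SparkConf conf = new SparkConf()
        .setAppName("MongoAggregation")
        // placeholder connection string - replace with your own
        .set("spark.mongodb.input.uri", "mongodb://mongos-host:27017/mydb.data")
        // Option 1: use a paginating partitioner instead of the default...
        .set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
        // ...and split the collection into a fixed number of partitions
        .set("spark.mongodb.input.partitionerOptions.numberOfPartitions", "8");
        // Option 4 (with the default partitioner) would instead raise
        // spark.mongodb.input.partitionerOptions.partitionSizeMB, e.g. to "256".

JavaSparkContext sc = new JavaSparkContext(conf);
```

Either route trades partition granularity for less scheduling overhead, which matters when most partitions would match no documents.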

A custom partitioner should produce the best solution but there are ways to make better use of the available default partitioners.
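Of the quick fixes, coalesce needs only a one-line change to the application from the question. A sketch reusing the aggregatedRdd defined there (again cluster-bound, so not standalone-runnable):

```java
// coalesce() merges existing partitions without a shuffle, so the 242
// mostly-empty partitions collapse into a handful of tasks; each task
// then reads several of the original Mongo partitions.
JavaRDD<Document> coalesced = aggregatedRdd.coalesce(4);
// run the map/saveAsTextFile steps on 'coalesced' instead of 'aggregatedRdd'
```

Note that coalesce still plans the original partitions; it only reduces task-scheduling overhead, whereas a better partitioner avoids creating the empty partitions in the first place.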

If you think there should be a default partitioner that uses the aggregation pipeline to calculate the partitions then please add a ticket to the MongoDB Spark Jira project.

answered Oct 21 '22 by Ross