I am running the same aggregation pipeline with a Spark Application and on the Mongos console. On the console, the data is fetched within the blink of an eye, and only a second use of "it" is needed to retrieve all expected data. The Spark Application however takes almost two minutes according to the Spark WebUI.
242 tasks are launched to fetch the result. I am not sure why such a high number of tasks is launched when the MongoDB aggregation returns only 40 documents. It looks like there is a lot of overhead.
The query I run on the Mongos console:
db.data.aggregate([
    {
        $match: {
            signals: {
                $elemMatch: {
                    signal: "SomeSignal",
                    value: {
                        $gt: 0,
                        $lte: 100
                    }
                }
            }
        }
    },
    {
        $group: {
            _id: "$root_document",
            firstTimestamp: {
                $min: "$ts"
            },
            lastTimestamp: {
                $max: "$ts"
            },
            count: {
                $sum: 1
            }
        }
    }
])
The Spark Application code
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
        Document.parse(
                "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
        Document.parse(
                "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
    @Override
    public String call(Document arg0) throws Exception {
        String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                arg0.get("count").toString());
        return output;
    }
});
outputRdd.saveAsTextFile("/user/spark/output");
After that, I use hdfs dfs -getmerge /user/spark/output/ output.csv and compare the results.
Why is the aggregation so slow? Isn't the call to withPipeline meant to reduce the amount of data that needs to be transferred to Spark? It looks like it isn't doing the same aggregation the Mongos console does. On the Mongos console it is blazing fast. I am using Spark 1.6.1 and mongo-spark-connector_2.10 version 1.1.0.
Edit: Another thing I am wondering about is that two executors get launched (because I am using the default execution settings atm), but only one executor does all the work. Why isn't the second executor doing any work?
Edit 2: When using a different aggregation pipeline and calling .count() instead of saveAsTextFile(..), 242 tasks are created as well. This time 65,000 documents are returned.
The high number of tasks is caused by the default Mongo Spark partitioner strategy. It ignores the aggregation pipeline when calculating the partitions, for two main reasons:
However, as you've found, this can generate empty partitions, which is costly in your case.
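To illustrate why the task count does not depend on the 40 documents the pipeline returns, here is a rough model of size-based partitioning. It is a simplified sketch, not the connector's actual algorithm. The class `PartitionEstimate`, its method, and the 15,488 MB collection size are all hypothetical, chosen only so that a 64 MB partition size (which I believe is the default for `partitionSizeMB`) gives 242 partitions.

```java
public class PartitionEstimate {

    // Rough model (an assumption, not the connector's real code): partitions are
    // derived from the size of the whole collection, so the aggregation pipeline
    // never enters the calculation.
    static long estimatePartitions(long collectionSizeMB, long partitionSizeMB) {
        if (partitionSizeMB <= 0) {
            throw new IllegalArgumentException("partitionSizeMB must be positive");
        }
        // Ceiling division: a partially filled last partition still counts as one.
        return (collectionSizeMB + partitionSizeMB - 1) / partitionSizeMB;
    }

    public static void main(String[] args) {
        long collectionSizeMB = 15_488; // hypothetical collection size

        // Every partition becomes one Spark task, even if the pipeline
        // matches nothing inside it.
        System.out.println(estimatePartitions(collectionSizeMB, 64));  // prints 242
        System.out.println(estimatePartitions(collectionSizeMB, 512)); // prints 31
    }
}
```

In this model, most of the 242 tasks would open a cursor, run the pipeline on their slice, and return nothing, which is where the overhead comes from.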
The choices for fixing this could be:
1. Change the partitioning strategy.
Choose an alternative partitioner to reduce the number of partitions. For example, the PaginateByCount partitioner will split the collection into a set number of partitions.
Create your own partitioner: simply implement the trait and you will be able to apply the aggregation pipeline and partition up the results. See the HalfwayPartitioner and custom partitioner test for an example.
2. Pre-aggregate the results into a collection using $out and read from there.
3. Use coalesce(N) to coalesce the partitions together and reduce the number of partitions.
4. Increase the spark.mongodb.input.partitionerOptions.partitionSizeMB configuration to produce fewer partitions.
A custom partitioner should produce the best solution, but there are ways to make better use of the available default partitioners.
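As a sketch of the two configuration-based options above, the properties can be collected in a plain map and then applied to the `SparkConf` with `conf.set(key, value)` before the context is created. The class `PartitionerSettings` and its methods are hypothetical helpers, and the values 8 and 512 are arbitrary. The partitioner name `MongoPaginateByCountPartitioner` and the `numberOfPartitions` option are what I understand the connector to expect, so please check them against the documentation for your connector version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionerSettings {

    static final String PREFIX = "spark.mongodb.input.";

    // Option A: switch to a partitioner that yields a fixed number of partitions.
    // The partitioner and option names are assumptions to verify for your version.
    static Map<String, String> paginateByCount(int numberOfPartitions) {
        Map<String, String> props = new LinkedHashMap<String, String>();
        props.put(PREFIX + "partitioner", "MongoPaginateByCountPartitioner");
        props.put(PREFIX + "partitionerOptions.numberOfPartitions",
                String.valueOf(numberOfPartitions));
        return props;
    }

    // Option B: keep the default partitioner but ask for larger partitions.
    static Map<String, String> largerPartitions(int partitionSizeMB) {
        Map<String, String> props = new LinkedHashMap<String, String>();
        props.put(PREFIX + "partitionerOptions.partitionSizeMB",
                String.valueOf(partitionSizeMB));
        return props;
    }

    public static void main(String[] args) {
        // In the Spark application, each entry would be passed to
        // SparkConf.set(key, value); here the properties are only printed.
        for (Map.Entry<String, String> e : paginateByCount(8).entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
        for (Map.Entry<String, String> e : largerPartitions(512).entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

The same key/value pairs could also be supplied with `--conf` on `spark-submit`, which avoids recompiling while you experiment with the partition count.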
If you think there should be a default partitioner that uses the aggregation pipeline to calculate the partitions then please add a ticket to the MongoDB Spark Jira project.