I'm running a fairly small Spark program with a few map and reduceByKey operations, over a very small data set of less than 400MB.
At some point I have an RDD of tuples that I want to sort, and I call sortByKey. This is the slowest part of my program. Everything else seems to run almost instantly, but this takes up to 20 seconds.
The problem is, it takes 20 seconds on my laptop as well as on a cluster of AWS m3.large machines. I've tried with 1, 2 and 3 slaves, and the differences in execution time are very small. Ganglia and the Spark web console indicate that CPU and memory are being used to maximum capacity on all slaves, so I think the config is OK.
I also found the issue of the execution happening before I expected, but then I read this thread, which points to an open issue in Spark. I don't think that's entirely related though.
Is sortByKey inherently slow, so that no matter how many nodes I add it dictates the minimum execution time of my program? Hopefully not, and there is just something I'm doing wrong that can be fixed.
EDIT
Turns out that what I was seeing was related to that link I posted. sortByKey just happened to be the first call that triggered execution (even though it is documented as a transformation), so it looked as if the program was slow at sorting, but sorting is actually quite fast. The problem is in a previous join operation.
Still, everything I said applies if you replace sort with join. Why is the execution time not dropping when I add more nodes (or pass numTasks to the join function), and why is it not even better than a plain SQL join? I found someone else having this problem before, but no answer other than a suggestion to tune serialisation, which I really don't think applies to my case.
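The effect described in the edit (time being attributed to whichever call first triggers execution) can be reproduced with a small sketch. It assumes Spark 2.0 or later for longAccumulator; the object name, method name and data are made up for illustration. An accumulator counts how many records the map function has actually processed, before and after an action runs:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyEvalSketch {
  // Returns the number of records processed by the map function
  // (before the action, after the action).
  def countersAroundAction(sc: SparkContext): (Long, Long) = {
    val processed = sc.longAccumulator("processed")

    // map is a transformation: this line only records the computation.
    val mapped = sc.parallelize(1 to 1000, 4).map { x =>
      processed.add(1)
      (x % 10, x)
    }
    val before = processed.sum // nothing has run yet

    // count is an action: the map function runs now, so its cost
    // shows up here rather than on the line that declared it.
    mapped.count()
    val after = processed.sum

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lazy-eval-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(countersAroundAction(sc))
    finally sc.stop()
  }
}
```

The first value should be 0, which is why wall-clock timing of individual lines can point at the wrong operation. The stage view in the Spark web console is usually a more reliable way to see where the time goes.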
A join is inherently a heavy operation, because values with identical keys must be moved to the same machine (a network shuffle). With a data set this small, adding more nodes mostly adds extra network IO overhead rather than reducing the run time.
I can think of 2 things:
Option 1
If you are joining a large dataset with a smaller one, it can pay off to broadcast the smaller dataset:
val large = sc.textFile("large.txt").map(...)  // RDD of (key, value) pairs
val smaller = sc.textFile("smaller.txt").map(...).collect().toMap  // (key, value) pairs, collected to the driver
val bc = sc.broadcast(smaller)
And then do a 'manual join':
large.flatMap { case (k, v) => bc.value.get(k).map(w => (k, (v, w))) }  // keys missing from the small set are dropped, like an inner join
This is described in more detail in this Advanced Spark presentation.
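As a fuller illustration of Option 1, here is a minimal self-contained sketch. It assumes both inputs are already RDDs of (key, value) pairs, that the smaller one fits in memory on the driver and on every executor, and that its keys are unique. The object name, method name and sample data are made up for the example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastJoinSketch {
  // Map-side inner join: the small side is shipped to every executor as a
  // broadcast variable and looked up locally, so the large RDD is not shuffled.
  def broadcastJoin(
      sc: SparkContext,
      large: RDD[(String, Int)],
      small: RDD[(String, String)]): RDD[(String, (Int, String))] = {
    // Assumption: the small side fits in memory and has one value per key.
    val lookup = sc.broadcast(small.collectAsMap())

    large.flatMap { case (k, v) =>
      // Keys absent from the small side produce no output (inner join).
      lookup.value.get(k).map(w => (k, (v, w)))
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-join-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val large = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))
      val small = sc.parallelize(Seq(("a", "x"), ("b", "y")))
      broadcastJoin(sc, large, small).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Using flatMap with an Option lookup avoids an exception when a key from the large set has no match in the small one. If the small set can have several values per key, it would need to be grouped before being broadcast.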
Option 2
You could repartition the small dataset using the same partitioner as the large one (i.e. make sure that identical keys end up on the same machine). In other words, adjust the partitioning of the small set to match the partitioning of the large one.
This will trigger a shuffle of the small set only. Once the partitioning is correct, the join should be fairly fast, since it will run locally on each cluster node.
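Option 2 could look roughly like the sketch below. It assumes the large RDD is hash-partitioned once up front and persisted (that first step is itself a shuffle), so that later joins only need to move the small side. The object name, method names and the choice of HashPartitioner are assumptions for illustration:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoPartitionedJoinSketch {
  // One-time cost: shuffle the large side into a known layout and keep it,
  // so that subsequent joins can reuse the partitioning.
  def partitionLarge(large: RDD[(String, Int)], numPartitions: Int): RDD[(String, Int)] =
    large.partitionBy(new HashPartitioner(numPartitions)).persist()

  // Repartition only the small side with the large side's partitioner, then join.
  def joinCoPartitioned(
      largePartitioned: RDD[(String, Int)],
      small: RDD[(String, String)]): RDD[(String, (Int, String))] = {
    val partitioner = largePartitioned.partitioner.getOrElse(
      throw new IllegalArgumentException("large RDD must already be partitioned"))

    // Both sides now share a partitioner, so matching keys are in
    // partitions with the same index and the large side is not reshuffled.
    largePartitioned.join(small.partitionBy(partitioner))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("copartitioned-join-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val large = partitionLarge(sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4))), 4)
      val small = sc.parallelize(Seq(("a", "x"), ("b", "y")))
      joinCoPartitioned(large, small).collect().foreach(println)
    } finally sc.stop()
  }
}
```

This tends to pay off when the same large RDD is joined several times, because the up-front shuffle is then amortised over all the joins.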