
How to achieve dynamic load-balancing of tasks in Apache Spark

I know that in Spark I can split my computation by using multiple partitions. If, say, I can split my input RDD into 1000 partitions and I have 100 machines, Spark will split the computation into 1000 tasks and dynamically allocate them to my 100 machines in some smart way.

Now suppose I can initially split my data into only 2 partitions, but I still have 100 machines. Naturally, 98 of my machines will be idle. But as I process each task, I could probably split it into sub-tasks that might potentially be executed on different machines. This is easy to achieve in plain Java with a queue, but I am not sure what the best way to attack it in Apache Spark is.

Consider the following Java pseudo-code:

BlockingQueue<Task> queue = new LinkedBlockingQueue<Task>();
queue.put(myInitialTask);
...
// On each worker thread: take a task, process it, and push any sub-tasks back.
while (!queue.isEmpty()) {
    Task nextTask = queue.take();
    List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
    queue.addAll(newTasks);
}

The above Java code will keep all my 100 threads busy, assuming the method 'process_task_and_split_to_sub_tasks()' can split any large task into a number of smaller ones.

Is there a way to achieve the same in Spark, maybe in combination with other tools?


Update: It has been correctly pointed out that one way to attack this is simply to:

  1. Generate finer-grained keys, and
  2. then use a smart Partitioner that will assign those keys to partitions (see the sketch below).

I guess this is the 'classic' way to attack this problem, but it requires me to be able to correctly estimate the amount of work per key in order to partition it properly. What if I don't have a good way to know the amount of work per key in advance? I might end up with a very unfortunate partitioning where most of my machines stay idle waiting for a few unfortunate ones.
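
For reference, the 'smart Partitioner' part could look roughly like the sketch below, assuming a per-key cost estimate is somehow available up front. Everything here (the class name, the greedy assignment, the cost map) is illustrative rather than an existing Spark facility:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Partitioner;

// Illustrative only: assigns each key to a partition by greedy bin packing
// over an up-front cost estimate per key. If the estimates are wrong, the
// skew problem described in the example below comes right back.
public class WorkAwarePartitioner extends Partitioner {
    private final int numPartitions;
    private final Map<String, Integer> keyToPartition = new HashMap<>();

    public WorkAwarePartitioner(int numPartitions, Map<String, Long> estimatedCostPerKey) {
        this.numPartitions = numPartitions;
        long[] load = new long[numPartitions];
        estimatedCostPerKey.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .forEach(e -> {
                // Always assign the next most expensive key to the least loaded partition.
                int lightest = 0;
                for (int i = 1; i < numPartitions; i++) {
                    if (load[i] < load[lightest]) lightest = i;
                }
                load[lightest] += e.getValue();
                keyToPartition.put(e.getKey(), lightest);
            });
    }

    @Override
    public int numPartitions() { return numPartitions; }

    @Override
    public int getPartition(Object key) {
        // Unknown keys fall back to plain hash partitioning.
        Integer p = keyToPartition.get((String) key);
        return p != null ? p : Math.floorMod(key.hashCode(), numPartitions);
    }
}

The catch, of course, is that estimatedCostPerKey has to come from somewhere, which is exactly the part I don't know how to do reliably.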

Example: Let's take simplified frequent itemset mining as an example.
Suppose my file contains lines with letters from a to j (10 letters); all letters in each line are sorted alphabetically and appear without repetitions, e.g. 'abcf', and the task is to find all letter combinations that occur in at least 50% of all lines. E.g. if many lines match the pattern 'ab.*f', then the output will contain {'a', 'b', 'f', 'ab', 'af', 'bf', 'abf'}.
One way to implement it is to send all lines starting with 'a' to one mapper (machine), all lines starting with 'b' to another, etc. By the way, this is how frequent pattern mining is implemented in Spark. Now suppose I have 100 machines (but only 10 letters). Then 90 of my machines will stay idle.
With the finer-grained-keys solution I could generate 10,000 4-letter prefixes and then somehow partition them based on the estimated work per prefix. But my partitioning can be very wrong: if the majority of the lines start with 'abcd', then all the work will be done by the machine responsible for this prefix (and probably for other prefixes in addition to it), again yielding a situation where most of my machines stay idle waiting for some unfortunate one.
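
For concreteness, the finer-grained-keys workaround would look roughly like this with the Java RDD API (a sketch: 'sc', the input path and the prefix length of 4 are placeholders, and I fall back to a plain HashPartitioner precisely because I have no reliable cost estimate):

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// 'sc' is an existing JavaSparkContext; the path is a placeholder.
JavaRDD<String> lines = sc.textFile("hdfs:///input/lines.txt");

// Key each line by up to its first 4 letters (finer-grained than 1 letter).
JavaPairRDD<String, String> keyed = lines.mapToPair(line ->
        new Tuple2<>(line.substring(0, Math.min(4, line.length())), line));

// Spread the keys over 100 partitions. With plain hash partitioning the skew
// problem described above remains: a hot prefix still lands on a single machine.
JavaPairRDD<String, String> distributed = keyed.partitionBy(new HashPartitioner(100));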

The dynamic load-balancing in this case would be something like this: the mapper that has received the lines starting with 'a' might want to further split its lines into those starting with 'ab', 'ac', 'ad', ... and then send them to 10 other machines, which might decide to further split their work into more tasks.
I understand that standard Apache Spark does not have an answer out of the box, but I wonder whether there is a way to achieve this nonetheless.

Kafka (i.e. the queue, as above) + Spark Streaming looks promising. Do you think I will be able to achieve the dynamic load-balancing by using these tools in a relatively straightforward way? Could you recommend other tools instead?
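
To make the idea concrete, what I have in mind is roughly the worker below, using plain Kafka as the shared queue (no Spark Streaming yet). Everything here is a sketch: the topic name 'tasks', the string serialization of a task, and the splitting method are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Each machine runs one of these workers: take a task from the 'tasks' topic,
// process it, and push any sub-tasks back onto the same topic.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "task-workers");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
     KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
    consumer.subscribe(Collections.singletonList("tasks"));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            // process_task_and_split_to_sub_tasks is the same placeholder as above,
            // here operating on a serialized task.
            List<String> subTasks = process_task_and_split_to_sub_tasks(record.value());
            for (String sub : subTasks) {
                producer.send(new ProducerRecord<>("tasks", sub));
            }
        }
    }
}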

asked Jan 07 '18 by Alexander

3 Answers

Now suppose I have 100 machines (but only 10 letters). The mapper that has received the lines starting with 'a' might want to further split its lines - to those starting with 'ab', 'ac', 'ad' etc. and then send them to 10 other machines.

It is just not how Spark works. A "mapper" (task) is mostly ignorant of the distributed context. At this level there is no access to the SparkContext and we no longer have RDDs, just the input as a local Iterator and the code to be executed on it. It can neither start nor create new tasks.

At the same time, your problem definition is artificial. To find frequent patterns you have to aggregate data, therefore you need a shuffle. At that point, records corresponding to a given pattern have to be shuffled to the same machine. Ensuring that the data is properly distributed is the job of the Partitioner, and there is really no place for "splits" here.
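
To illustrate, the kind of aggregation meant here is something like the following (a sketch; 'patterns' is an assumed JavaRDD<String> of candidate patterns already extracted from the lines):

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Count in how many lines each candidate pattern occurs. reduceByKey shuffles
// all records with the same key to a single partition; the Partitioner decides
// *where* each key goes, but it cannot split one key's work across machines.
JavaPairRDD<String, Long> ones = patterns.mapToPair(p -> new Tuple2<>(p, 1L));
JavaPairRDD<String, Long> support = ones.reduceByKey((a, b) -> a + b);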

answered Oct 17 '22 by user9198485


Spark's own dynamic allocation can, to some limited extent, emulate what you want. However, if you need a detailed, high-performance approach with low-level control, then Spark is not for you. For starters, you won't be able to split tasks dynamically; you can only adjust the overall resources assigned to the application.
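
For reference, turning it on is just application-level configuration, e.g. (a sketch; the executor bounds are arbitrary, and whether the external shuffle service is needed depends on your cluster manager):

import org.apache.spark.SparkConf;

// Dynamic allocation only grows/shrinks the number of executors for the
// whole application; it does not split individual tasks.
SparkConf conf = new SparkConf()
        .setAppName("dynamic-allocation-sketch")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "2")
        .set("spark.dynamicAllocation.maxExecutors", "100")
        .set("spark.shuffle.service.enabled", "true");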

You should rather consider a low-level scheduler and implement your own solution from scratch.

answered Oct 17 '22 by user9184913


To achieve your requirement, you can just repartition your data from two partitions to whatever number of partitions you want.

See https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#repartition-int-
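
For example, assuming the input is already loaded into a JavaRDD<String> called lines:

// Go from 2 partitions to 100 so that all 100 machines can receive tasks.
// Note that repartition() triggers a full shuffle of the dataset.
JavaRDD<String> spread = lines.repartition(100);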

BTW, Spark Streaming is unrelated to your problem.

Note that the level of parallelism does not just depend on the partitioning of the dataset, but also on your job/algorithm.

answered Oct 17 '22 by petertc